vault_audit_tools/commands/
airflow_polling.rs

1//! Airflow polling pattern detection.
2//!
3//! Identifies Apache Airflow instances that are polling Vault connections
4//! excessively, which can cause performance issues.
5//! Supports multi-file analysis for pattern detection over time.
6//!
7//! # Usage
8//!
9//! ```bash
10//! # Single file - detect default Airflow patterns
11//! vault-audit airflow-polling audit.log
12//!
13//! # Multi-day analysis with custom thresholds
14//! vault-audit airflow-polling logs/*.log --threshold 100
15//! ```
16//!
//! # Detection Logic
//!
//! Selects audit entries whose request path contains `airflow`
//! (case-insensitive) and aggregates them per path and per entity.
//!
//! Polling behavior is then characterized by:
//! - High frequency access (paths above 100 ops/hour are flagged)
//! - Average interval between operations over the observed time span
//! - Repeated access to the same paths by the same entity
28//!
29//! # Output
30//!
31//! Displays entities with polling patterns:
32//! - Entity ID and display name
33//! - Total operations count
34//! - Polling rate (ops per hour)
35//! - Paths being polled
36//! - Time span of activity
37//!
38//! Helps optimize:
39//! - Airflow connection pooling
40//! - Vault performance
41//! - Database credential caching
42
43use crate::audit::types::AuditEntry;
44use crate::utils::progress::ProgressBar;
45use crate::utils::time::parse_timestamp;
46use anyhow::Result;
47use chrono::{DateTime, Utc};
48use std::collections::HashMap;
49use std::fs::File;
50use std::io::{BufRead, BufReader};
51
/// Renders `n` in decimal with a comma between every group of three digits.
///
/// For example `1234567` becomes `"1,234,567"`, while values below one
/// thousand are returned without any separator.
pub fn format_number(n: usize) -> String {
    let digits = n.to_string();
    let len = digits.len();
    let mut grouped = String::with_capacity(len + len / 3);

    for (idx, digit) in digits.chars().enumerate() {
        // A separator goes before every digit that starts a full group of
        // three counted from the right, except at the very front.
        if idx > 0 && (len - idx) % 3 == 0 {
            grouped.push(',');
        }
        grouped.push(digit);
    }

    grouped
}
63
64struct PathData {
65    operations: usize,
66    entities: std::collections::HashSet<String>,
67    operations_by_entity: HashMap<String, usize>,
68    timestamps: Vec<DateTime<Utc>>,
69}
70
71impl PathData {
72    fn new() -> Self {
73        Self {
74            operations: 0,
75            entities: std::collections::HashSet::new(),
76            operations_by_entity: HashMap::new(),
77            timestamps: Vec::new(),
78        }
79    }
80}
81
82pub fn run(log_files: &[String], output: Option<&str>) -> Result<()> {
83    let mut airflow_operations = 0;
84    let mut airflow_paths: HashMap<String, PathData> = HashMap::new();
85    let mut total_lines = 0;
86
87    // Process each log file sequentially
88    for (file_idx, log_file) in log_files.iter().enumerate() {
89        eprintln!(
90            "[{}/{}] Processing: {}",
91            file_idx + 1,
92            log_files.len(),
93            log_file
94        );
95
96        // Get file size for progress tracking
97        let file_size = std::fs::metadata(log_file).ok().map(|m| m.len() as usize);
98        let mut progress = if let Some(size) = file_size {
99            ProgressBar::new(size, "Processing")
100        } else {
101            ProgressBar::new_spinner("Processing")
102        };
103
104        let file = File::open(log_file)?;
105        let reader = BufReader::new(file);
106
107        let mut file_lines = 0;
108        let mut bytes_read = 0;
109
110        for line in reader.lines() {
111            file_lines += 1;
112            total_lines += 1;
113            let line = line?;
114            bytes_read += line.len() + 1; // +1 for newline
115
116            // Update progress every 10k lines for smooth animation
117            if file_lines % 10_000 == 0 {
118                if let Some(size) = file_size {
119                    progress.update(bytes_read.min(size)); // Cap at file size
120                } else {
121                    progress.update(file_lines);
122                }
123            }
124
125            let entry: AuditEntry = match serde_json::from_str(&line) {
126                Ok(e) => e,
127                Err(_) => continue,
128            };
129
130            // Filter for Airflow-related paths (case-insensitive)
131            let request = match &entry.request {
132                Some(r) => r,
133                None => continue,
134            };
135
136            let path = match &request.path {
137                Some(p) => p.as_str(),
138                None => continue,
139            };
140
141            if path.to_lowercase().contains("airflow") {
142                airflow_operations += 1;
143
144                let entity_id = entry
145                    .auth
146                    .as_ref()
147                    .and_then(|a| a.entity_id.as_deref())
148                    .unwrap_or("no-entity");
149
150                // Track path statistics
151                let path_data = airflow_paths
152                    .entry(path.to_string())
153                    .or_insert_with(PathData::new);
154                path_data.operations += 1;
155                path_data.entities.insert(entity_id.to_string());
156                *path_data
157                    .operations_by_entity
158                    .entry(entity_id.to_string())
159                    .or_insert(0) += 1;
160
161                // Track timestamp if available
162                if let Ok(ts) = parse_timestamp(&entry.time) {
163                    path_data.timestamps.push(ts);
164                }
165            }
166        }
167
168        // Ensure 100% progress for this file
169        if let Some(size) = file_size {
170            progress.update(size);
171        }
172
173        progress.finish_with_message(&format!(
174            "Processed {} lines from this file",
175            format_number(file_lines)
176        ));
177    }
178
179    eprintln!(
180        "\nTotal: Processed {} lines, found {} Airflow operations",
181        format_number(total_lines),
182        format_number(airflow_operations)
183    );
184
185    println!("\nSummary:");
186    println!("  Total lines processed: {}", format_number(total_lines));
187    println!(
188        "  Airflow operations: {}",
189        format_number(airflow_operations)
190    );
191    println!("  Unique paths: {}", format_number(airflow_paths.len()));
192
193    let total_entities: std::collections::HashSet<_> = airflow_paths
194        .values()
195        .flat_map(|data| data.entities.iter().cloned())
196        .collect();
197    println!(
198        "  Entities involved: {}",
199        format_number(total_entities.len())
200    );
201
202    // 1. Top Airflow paths by operations
203    println!("\n1. TOP AIRFLOW PATHS BY OPERATIONS");
204    println!("{}", "-".repeat(100));
205    println!("{:<80} {:<12} {:<10}", "Path", "Operations", "Entities");
206    println!("{}", "-".repeat(100));
207
208    let mut sorted_paths: Vec<_> = airflow_paths.iter().collect();
209    sorted_paths.sort_by(|a, b| b.1.operations.cmp(&a.1.operations));
210
211    for (path, data) in sorted_paths.iter().take(30) {
212        let display_path = if path.len() <= 78 {
213            path.as_str()
214        } else {
215            &path[..75]
216        };
217        println!(
218            "{:<80} {:<12} {:<10}",
219            display_path,
220            format_number(data.operations),
221            format_number(data.entities.len())
222        );
223    }
224
225    // 2. Entity access patterns
226    println!("\n2. ENTITIES ACCESSING AIRFLOW SECRETS");
227    println!("{}", "-".repeat(100));
228    println!(
229        "{:<50} {:<12} {:<15}",
230        "Entity ID", "Operations", "Unique Paths"
231    );
232    println!("{}", "-".repeat(100));
233
234    let mut entity_patterns: HashMap<String, (usize, std::collections::HashSet<String>)> =
235        HashMap::new();
236    for (path, data) in &airflow_paths {
237        for entity in &data.entities {
238            let entry = entity_patterns
239                .entry(entity.clone())
240                .or_insert((0, std::collections::HashSet::new()));
241            entry.0 += data.operations_by_entity.get(entity).unwrap_or(&0);
242            entry.1.insert(path.clone());
243        }
244    }
245
246    let mut sorted_entities: Vec<_> = entity_patterns.iter().collect();
247    sorted_entities.sort_by(|a, b| b.1 .0.cmp(&a.1 .0));
248
249    for (entity, (ops, paths)) in sorted_entities.iter().take(20) {
250        let display_entity = if entity.len() <= 48 {
251            entity.as_str()
252        } else {
253            &entity[..45]
254        };
255        println!(
256            "{:<50} {:<12} {:<15}",
257            display_entity,
258            format_number(*ops),
259            format_number(paths.len())
260        );
261    }
262
263    // 3. Polling pattern analysis with BURST RATES
264    println!("\n3. BURST RATE ANALYSIS (Paths with Time Data)");
265    println!("   NOTE: Rates calculated over actual time span - high rates indicate bursty access");
266    println!("{}", "-".repeat(100));
267    println!(
268        "{:<60} {:<12} {:<12} {:<15}",
269        "Path", "Operations", "Time Span", "Avg Interval"
270    );
271    println!("{}", "-".repeat(100));
272
273    struct PollingPattern {
274        path: String,
275        operations: usize,
276        time_span_hours: f64,
277        ops_per_hour: f64,
278        avg_interval_seconds: f64,
279    }
280
281    let mut polling_patterns = Vec::new();
282
283    for (path, data) in &airflow_paths {
284        if data.timestamps.len() < 2 {
285            continue;
286        }
287
288        let mut timestamps = data.timestamps.clone();
289        timestamps.sort();
290        let time_span_seconds = timestamps[timestamps.len() - 1]
291            .signed_duration_since(timestamps[0])
292            .num_seconds() as f64;
293        let time_span_hours = time_span_seconds / 3600.0;
294
295        if time_span_hours > 0.0 {
296            let ops_per_hour = data.operations as f64 / time_span_hours;
297            let avg_interval_seconds = time_span_seconds / data.operations as f64;
298
299            polling_patterns.push(PollingPattern {
300                path: path.clone(),
301                operations: data.operations,
302                time_span_hours,
303                ops_per_hour,
304                avg_interval_seconds,
305            });
306        }
307    }
308
309    // Sort by operations per hour (highest burst rate)
310    polling_patterns.sort_by(|a, b| b.ops_per_hour.partial_cmp(&a.ops_per_hour).unwrap());
311
312    for pattern in polling_patterns.iter().take(25) {
313        let path_display = if pattern.path.len() <= 58 {
314            &pattern.path
315        } else {
316            &pattern.path[..55]
317        };
318        let time_span = format!("{:.1}h", pattern.time_span_hours);
319        let interval = format!("{:.1}s", pattern.avg_interval_seconds);
320
321        println!(
322            "{:<60} {:<12} {:<12} {:<15}",
323            path_display,
324            format_number(pattern.operations),
325            time_span,
326            interval
327        );
328    }
329
330    // 4. Entity-path combinations
331    println!("\n4. ENTITY-PATH POLLING BEHAVIOR (Top 30)");
332    println!("{}", "-".repeat(100));
333    println!("{:<40} {:<45} {:<15}", "Entity", "Path", "Operations");
334    println!("{}", "-".repeat(100));
335
336    struct EntityPathCombo {
337        entity: String,
338        path: String,
339        operations: usize,
340    }
341
342    let mut entity_path_combos = Vec::new();
343    for (path, data) in &airflow_paths {
344        for (entity_id, ops) in &data.operations_by_entity {
345            entity_path_combos.push(EntityPathCombo {
346                entity: entity_id.clone(),
347                path: path.clone(),
348                operations: *ops,
349            });
350        }
351    }
352
353    entity_path_combos.sort_by(|a, b| b.operations.cmp(&a.operations));
354
355    for combo in entity_path_combos.iter().take(30) {
356        let entity_display = if combo.entity.len() <= 38 {
357            &combo.entity
358        } else {
359            &combo.entity[..35]
360        };
361        let path_display = if combo.path.len() <= 43 {
362            &combo.path
363        } else {
364            &combo.path[..40]
365        };
366
367        println!(
368            "{:<40} {:<45} {:<15}",
369            entity_display,
370            path_display,
371            format_number(combo.operations)
372        );
373    }
374
375    // 5. Recommendations
376    println!("\n5. OPTIMIZATION RECOMMENDATIONS");
377    println!("{}", "-".repeat(100));
378
379    let high_frequency_paths: Vec<_> = polling_patterns
380        .iter()
381        .filter(|p| p.ops_per_hour > 100.0)
382        .collect();
383    let total_high_freq_ops: usize = high_frequency_paths.iter().map(|p| p.operations).sum();
384
385    println!(
386        "Total Airflow operations: {}",
387        format_number(airflow_operations)
388    );
389    println!(
390        "Paths with >100 ops/hour burst rate: {}",
391        format_number(high_frequency_paths.len())
392    );
393    println!(
394        "Operations from high-frequency paths: {} ({:.1}%)",
395        format_number(total_high_freq_ops),
396        (total_high_freq_ops as f64 / airflow_operations as f64) * 100.0
397    );
398    println!();
399    println!("Recommended Actions:");
400    println!();
401    println!("1. IMPLEMENT AIRFLOW CONNECTION CACHING");
402    println!("   - Configure Airflow to cache connection objects");
403    println!("   - Expected reduction: 80-90% of reads");
404    println!(
405        "   - Potential savings: {} operations/day",
406        format_number((airflow_operations as f64 * 0.85) as usize)
407    );
408    println!();
409    println!("2. DEPLOY VAULT AGENT WITH AIRFLOW");
410    println!("   - Run Vault agent as sidecar/daemon");
411    println!("   - Configure template rendering for connections");
412    println!("   - Expected reduction: 95% of reads");
413    println!(
414        "   - Potential savings: {} operations/day",
415        format_number((airflow_operations as f64 * 0.95) as usize)
416    );
417    println!();
418    println!("3. USE AIRFLOW SECRETS BACKEND EFFICIENTLY");
419    println!("   - Review connection lookup patterns in DAGs");
420    println!("   - Implement connection object reuse within tasks");
421    println!("   - Cache connections at DAG level where appropriate");
422    println!();
423
424    if !polling_patterns.is_empty() {
425        println!("4. PRIORITY PATHS FOR IMMEDIATE OPTIMIZATION (by burst rate):");
426        for (i, pattern) in polling_patterns.iter().take(10).enumerate() {
427            let path_name = pattern.path.split('/').next_back().unwrap_or(&pattern.path);
428            println!(
429                "   {}. {}: {} operations ({:.0}/hour burst rate)",
430                i + 1,
431                path_name,
432                format_number(pattern.operations),
433                pattern.ops_per_hour
434            );
435        }
436    }
437
438    println!("\n{}", "=".repeat(100));
439
440    if let Some(output_file) = output {
441        use std::fs::File;
442        use std::io::Write;
443        let mut file = File::create(output_file)?;
444        writeln!(file, "entity_id,path,operation_count")?;
445        for (path, data) in &airflow_paths {
446            for (entity, count) in &data.operations_by_entity {
447                writeln!(file, "{},{},{}", entity, path, count)?;
448            }
449        }
450        println!("\nOutput written to: {}", output_file);
451    }
452
453    Ok(())
454}