vault_audit_tools/commands/
airflow_polling.rs

//! Airflow polling pattern detection.
//!
//! Identifies Apache Airflow workloads that read Vault secrets often enough
//! to suggest polling, which can cause performance issues.
//! Supports multi-file analysis for pattern detection over time.
//!
//! # Usage
//!
//! ```bash
//! # Single file
//! vault-audit airflow-polling audit.log
//!
//! # Multi-day analysis across several log files
//! vault-audit airflow-polling logs/*.log
//! ```
//!
//! # Detection Logic
//!
//! An audit entry is counted when its request path contains `airflow`
//! (case-insensitive). For every matching path the command records:
//! - the number of operations,
//! - which entities performed them (entries without an entity are grouped
//!   under `no-entity`),
//! - the request timestamps.
//!
//! Polling is inferred from the burst rate: operations per hour over the span
//! between a path's first and last request. Paths above 100 ops/hour are
//! flagged as high-frequency.
//!
//! # Output
//!
//! Prints a report containing:
//! - Top paths by operation count, with the number of entities per path
//! - Entities ranked by operations, with the number of unique paths accessed
//! - Burst-rate analysis per path (operations, time span, average interval)
//! - Top entity/path combinations
//! - Optimization recommendations with estimated savings
//!
//! Optionally writes an `entity_id,path,operation_count` CSV file.
//!
//! Helps optimize:
//! - Airflow connection pooling
//! - Vault performance
//! - Database credential caching
42
43use crate::audit::types::AuditEntry;
44use crate::utils::format::format_number;
45use crate::utils::processor::{ProcessingMode, ProcessorBuilder};
46use crate::utils::time::parse_timestamp;
47use anyhow::Result;
48use chrono::{DateTime, Utc};
49use std::collections::{HashMap, HashSet};
50
51#[derive(Debug, Clone)]
52struct PathData {
53    operations: usize,
54    entities: HashSet<String>,
55    operations_by_entity: HashMap<String, usize>,
56    timestamps: Vec<DateTime<Utc>>,
57}
58
59impl PathData {
60    fn new() -> Self {
61        Self {
62            operations: 0,
63            entities: HashSet::new(),
64            operations_by_entity: HashMap::new(),
65            timestamps: Vec::new(),
66        }
67    }
68
69    fn merge(&mut self, other: Self) {
70        self.operations += other.operations;
71        self.entities.extend(other.entities);
72        for (entity, count) in other.operations_by_entity {
73            *self.operations_by_entity.entry(entity).or_insert(0) += count;
74        }
75        self.timestamps.extend(other.timestamps);
76    }
77}
78
79#[derive(Debug, Clone)]
80struct AirflowState {
81    airflow_operations: usize,
82    airflow_paths: HashMap<String, PathData>,
83}
84
85impl AirflowState {
86    fn new() -> Self {
87        Self {
88            airflow_operations: 0,
89            airflow_paths: HashMap::new(),
90        }
91    }
92
93    fn merge(mut self, other: Self) -> Self {
94        self.airflow_operations += other.airflow_operations;
95        for (path, other_data) in other.airflow_paths {
96            self.airflow_paths
97                .entry(path)
98                .and_modify(|data| data.merge(other_data.clone()))
99                .or_insert(other_data);
100        }
101        self
102    }
103}
104
105pub fn run(log_files: &[String], output: Option<&str>) -> Result<()> {
106    let processor = ProcessorBuilder::new()
107        .mode(ProcessingMode::Auto)
108        .progress_label("Processing".to_string())
109        .build();
110
111    let (result, stats) = processor.process_files_streaming(
112        log_files,
113        |entry: &AuditEntry, state: &mut AirflowState| {
114            // Filter for Airflow-related paths (case-insensitive)
115            let Some(request) = &entry.request else {
116                return;
117            };
118
119            let path = match &request.path {
120                Some(p) => p.as_str(),
121                None => return,
122            };
123
124            if path.to_lowercase().contains("airflow") {
125                state.airflow_operations += 1;
126
127                let entity_id = entry
128                    .auth
129                    .as_ref()
130                    .and_then(|a| a.entity_id.as_deref())
131                    .unwrap_or("no-entity");
132
133                // Track path statistics
134                let path_data = state
135                    .airflow_paths
136                    .entry(path.to_string())
137                    .or_insert_with(PathData::new);
138                path_data.operations += 1;
139                path_data.entities.insert(entity_id.to_string());
140                *path_data
141                    .operations_by_entity
142                    .entry(entity_id.to_string())
143                    .or_insert(0) += 1;
144
145                // Track timestamp if available
146                if let Ok(ts) = parse_timestamp(&entry.time) {
147                    path_data.timestamps.push(ts);
148                }
149            }
150        },
151        AirflowState::merge,
152        AirflowState::new(),
153    )?;
154
155    let total_lines = stats.total_lines;
156    let airflow_operations = result.airflow_operations;
157    let airflow_paths = result.airflow_paths;
158
159    eprintln!(
160        "\nTotal: Processed {} lines, found {} Airflow operations",
161        format_number(total_lines),
162        format_number(airflow_operations)
163    );
164
165    println!("\nSummary:");
166    println!("  Total lines processed: {}", format_number(total_lines));
167    println!(
168        "  Airflow operations: {}",
169        format_number(airflow_operations)
170    );
171    println!("  Unique paths: {}", format_number(airflow_paths.len()));
172
173    let total_entities: std::collections::HashSet<_> = airflow_paths
174        .values()
175        .flat_map(|data| data.entities.iter().cloned())
176        .collect();
177    println!(
178        "  Entities involved: {}",
179        format_number(total_entities.len())
180    );
181
182    // 1. Top Airflow paths by operations
183    println!("\n1. TOP AIRFLOW PATHS BY OPERATIONS");
184    println!("{}", "-".repeat(100));
185    println!("{:<80} {:<12} {:<10}", "Path", "Operations", "Entities");
186    println!("{}", "-".repeat(100));
187
188    let mut sorted_paths: Vec<_> = airflow_paths.iter().collect();
189    sorted_paths.sort_by(|a, b| b.1.operations.cmp(&a.1.operations));
190
191    for (path, data) in sorted_paths.iter().take(30) {
192        let display_path = if path.len() <= 78 {
193            path.as_str()
194        } else {
195            &path[..75]
196        };
197        println!(
198            "{:<80} {:<12} {:<10}",
199            display_path,
200            format_number(data.operations),
201            format_number(data.entities.len())
202        );
203    }
204
205    // 2. Entity access patterns
206    println!("\n2. ENTITIES ACCESSING AIRFLOW SECRETS");
207    println!("{}", "-".repeat(100));
208    println!(
209        "{:<50} {:<12} {:<15}",
210        "Entity ID", "Operations", "Unique Paths"
211    );
212    println!("{}", "-".repeat(100));
213
214    let mut entity_patterns: HashMap<String, (usize, std::collections::HashSet<String>)> =
215        HashMap::new();
216    for (path, data) in &airflow_paths {
217        for entity in &data.entities {
218            let entry = entity_patterns
219                .entry(entity.clone())
220                .or_insert_with(|| (0, std::collections::HashSet::new()));
221            entry.0 += data.operations_by_entity.get(entity).unwrap_or(&0);
222            entry.1.insert(path.clone());
223        }
224    }
225
226    let mut sorted_entities: Vec<_> = entity_patterns.iter().collect();
227    sorted_entities.sort_by(|a, b| b.1 .0.cmp(&a.1 .0));
228
229    for (entity, (ops, paths)) in sorted_entities.iter().take(20) {
230        let display_entity = if entity.len() <= 48 {
231            entity.as_str()
232        } else {
233            &entity[..45]
234        };
235        println!(
236            "{:<50} {:<12} {:<15}",
237            display_entity,
238            format_number(*ops),
239            format_number(paths.len())
240        );
241    }
242
243    // 3. Polling pattern analysis with BURST RATES
244    println!("\n3. BURST RATE ANALYSIS (Paths with Time Data)");
245    println!("   NOTE: Rates calculated over actual time span - high rates indicate bursty access");
246    println!("{}", "-".repeat(100));
247    println!(
248        "{:<60} {:<12} {:<12} {:<15}",
249        "Path", "Operations", "Time Span", "Avg Interval"
250    );
251    println!("{}", "-".repeat(100));
252
253    struct PollingPattern {
254        path: String,
255        operations: usize,
256        time_span_hours: f64,
257        ops_per_hour: f64,
258        avg_interval_seconds: f64,
259    }
260
261    let mut polling_patterns = Vec::new();
262
263    for (path, data) in &airflow_paths {
264        if data.timestamps.len() < 2 {
265            continue;
266        }
267
268        let mut timestamps = data.timestamps.clone();
269        timestamps.sort();
270        let time_span_seconds = timestamps[timestamps.len() - 1]
271            .signed_duration_since(timestamps[0])
272            .num_seconds() as f64;
273        let time_span_hours = time_span_seconds / 3600.0;
274
275        if time_span_hours > 0.0 {
276            let ops_per_hour = data.operations as f64 / time_span_hours;
277            let avg_interval_seconds = time_span_seconds / data.operations as f64;
278
279            polling_patterns.push(PollingPattern {
280                path: path.clone(),
281                operations: data.operations,
282                time_span_hours,
283                ops_per_hour,
284                avg_interval_seconds,
285            });
286        }
287    }
288
289    // Sort by operations per hour (highest burst rate)
290    polling_patterns.sort_by(|a, b| b.ops_per_hour.partial_cmp(&a.ops_per_hour).unwrap());
291
292    for pattern in polling_patterns.iter().take(25) {
293        let path_display = if pattern.path.len() <= 58 {
294            &pattern.path
295        } else {
296            &pattern.path[..55]
297        };
298        let time_span = format!("{:.1}h", pattern.time_span_hours);
299        let interval = format!("{:.1}s", pattern.avg_interval_seconds);
300
301        println!(
302            "{:<60} {:<12} {:<12} {:<15}",
303            path_display,
304            format_number(pattern.operations),
305            time_span,
306            interval
307        );
308    }
309
310    // 4. Entity-path combinations
311    println!("\n4. ENTITY-PATH POLLING BEHAVIOR (Top 30)");
312    println!("{}", "-".repeat(100));
313    println!("{:<40} {:<45} {:<15}", "Entity", "Path", "Operations");
314    println!("{}", "-".repeat(100));
315
316    struct EntityPathCombo {
317        entity: String,
318        path: String,
319        operations: usize,
320    }
321
322    let mut entity_path_combos = Vec::new();
323    for (path, data) in &airflow_paths {
324        for (entity_id, ops) in &data.operations_by_entity {
325            entity_path_combos.push(EntityPathCombo {
326                entity: entity_id.clone(),
327                path: path.clone(),
328                operations: *ops,
329            });
330        }
331    }
332
333    entity_path_combos.sort_by(|a, b| b.operations.cmp(&a.operations));
334
335    for combo in entity_path_combos.iter().take(30) {
336        let entity_display = if combo.entity.len() <= 38 {
337            &combo.entity
338        } else {
339            &combo.entity[..35]
340        };
341        let path_display = if combo.path.len() <= 43 {
342            &combo.path
343        } else {
344            &combo.path[..40]
345        };
346
347        println!(
348            "{:<40} {:<45} {:<15}",
349            entity_display,
350            path_display,
351            format_number(combo.operations)
352        );
353    }
354
355    // 5. Recommendations
356    println!("\n5. OPTIMIZATION RECOMMENDATIONS");
357    println!("{}", "-".repeat(100));
358
359    let high_frequency_paths: Vec<_> = polling_patterns
360        .iter()
361        .filter(|p| p.ops_per_hour > 100.0)
362        .collect();
363    let total_high_freq_ops: usize = high_frequency_paths.iter().map(|p| p.operations).sum();
364
365    println!(
366        "Total Airflow operations: {}",
367        format_number(airflow_operations)
368    );
369    println!(
370        "Paths with >100 ops/hour burst rate: {}",
371        format_number(high_frequency_paths.len())
372    );
373    println!(
374        "Operations from high-frequency paths: {} ({:.1}%)",
375        format_number(total_high_freq_ops),
376        (total_high_freq_ops as f64 / airflow_operations as f64) * 100.0
377    );
378    println!();
379    println!("Recommended Actions:");
380    println!();
381    println!("1. IMPLEMENT AIRFLOW CONNECTION CACHING");
382    println!("   - Configure Airflow to cache connection objects");
383    println!("   - Expected reduction: 80-90% of reads");
384    println!(
385        "   - Potential savings: {} operations/day",
386        format_number((airflow_operations as f64 * 0.85) as usize)
387    );
388    println!();
389    println!("2. DEPLOY VAULT AGENT WITH AIRFLOW");
390    println!("   - Run Vault agent as sidecar/daemon");
391    println!("   - Configure template rendering for connections");
392    println!("   - Expected reduction: 95% of reads");
393    println!(
394        "   - Potential savings: {} operations/day",
395        format_number((airflow_operations as f64 * 0.95) as usize)
396    );
397    println!();
398    println!("3. USE AIRFLOW SECRETS BACKEND EFFICIENTLY");
399    println!("   - Review connection lookup patterns in DAGs");
400    println!("   - Implement connection object reuse within tasks");
401    println!("   - Cache connections at DAG level where appropriate");
402    println!();
403
404    if !polling_patterns.is_empty() {
405        println!("4. PRIORITY PATHS FOR IMMEDIATE OPTIMIZATION (by burst rate):");
406        for (i, pattern) in polling_patterns.iter().take(10).enumerate() {
407            let path_name = pattern.path.split('/').next_back().unwrap_or(&pattern.path);
408            println!(
409                "   {}. {}: {} operations ({:.0}/hour burst rate)",
410                i + 1,
411                path_name,
412                format_number(pattern.operations),
413                pattern.ops_per_hour
414            );
415        }
416    }
417
418    println!("\n{}", "=".repeat(100));
419
420    if let Some(output_file) = output {
421        use std::fs::File;
422        use std::io::Write;
423        let mut file = File::create(output_file)?;
424        writeln!(file, "entity_id,path,operation_count")?;
425        for (path, data) in &airflow_paths {
426            for (entity, count) in &data.operations_by_entity {
427                writeln!(file, "{},{},{}", entity, path, count)?;
428            }
429        }
430        println!("\nOutput written to: {}", output_file);
431    }
432
433    Ok(())
434}