vault_audit_tools/commands/
airflow_polling.rs1use crate::audit::types::AuditEntry;
44use crate::utils::format::format_number;
45use crate::utils::processor::{ProcessingMode, ProcessorBuilder};
46use crate::utils::time::parse_timestamp;
47use anyhow::Result;
48use chrono::{DateTime, Utc};
49use std::collections::{HashMap, HashSet};
50
51#[derive(Debug, Clone)]
52struct PathData {
53 operations: usize,
54 entities: HashSet<String>,
55 operations_by_entity: HashMap<String, usize>,
56 timestamps: Vec<DateTime<Utc>>,
57}
58
59impl PathData {
60 fn new() -> Self {
61 Self {
62 operations: 0,
63 entities: HashSet::new(),
64 operations_by_entity: HashMap::new(),
65 timestamps: Vec::new(),
66 }
67 }
68
69 fn merge(&mut self, other: Self) {
70 self.operations += other.operations;
71 self.entities.extend(other.entities);
72 for (entity, count) in other.operations_by_entity {
73 *self.operations_by_entity.entry(entity).or_insert(0) += count;
74 }
75 self.timestamps.extend(other.timestamps);
76 }
77}
78
79#[derive(Debug, Clone)]
80struct AirflowState {
81 airflow_operations: usize,
82 airflow_paths: HashMap<String, PathData>,
83}
84
85impl AirflowState {
86 fn new() -> Self {
87 Self {
88 airflow_operations: 0,
89 airflow_paths: HashMap::new(),
90 }
91 }
92
93 fn merge(mut self, other: Self) -> Self {
94 self.airflow_operations += other.airflow_operations;
95 for (path, other_data) in other.airflow_paths {
96 self.airflow_paths
97 .entry(path)
98 .and_modify(|data| data.merge(other_data.clone()))
99 .or_insert(other_data);
100 }
101 self
102 }
103}
104
105pub fn run(log_files: &[String], output: Option<&str>) -> Result<()> {
106 let processor = ProcessorBuilder::new()
107 .mode(ProcessingMode::Auto)
108 .progress_label("Processing".to_string())
109 .build();
110
111 let (result, stats) = processor.process_files_streaming(
112 log_files,
113 |entry: &AuditEntry, state: &mut AirflowState| {
114 let Some(request) = &entry.request else {
116 return;
117 };
118
119 let path = match &request.path {
120 Some(p) => p.as_str(),
121 None => return,
122 };
123
124 if path.to_lowercase().contains("airflow") {
125 state.airflow_operations += 1;
126
127 let entity_id = entry
128 .auth
129 .as_ref()
130 .and_then(|a| a.entity_id.as_deref())
131 .unwrap_or("no-entity");
132
133 let path_data = state
135 .airflow_paths
136 .entry(path.to_string())
137 .or_insert_with(PathData::new);
138 path_data.operations += 1;
139 path_data.entities.insert(entity_id.to_string());
140 *path_data
141 .operations_by_entity
142 .entry(entity_id.to_string())
143 .or_insert(0) += 1;
144
145 if let Ok(ts) = parse_timestamp(&entry.time) {
147 path_data.timestamps.push(ts);
148 }
149 }
150 },
151 AirflowState::merge,
152 AirflowState::new(),
153 )?;
154
155 let total_lines = stats.total_lines;
156 let airflow_operations = result.airflow_operations;
157 let airflow_paths = result.airflow_paths;
158
159 eprintln!(
160 "\nTotal: Processed {} lines, found {} Airflow operations",
161 format_number(total_lines),
162 format_number(airflow_operations)
163 );
164
165 println!("\nSummary:");
166 println!(" Total lines processed: {}", format_number(total_lines));
167 println!(
168 " Airflow operations: {}",
169 format_number(airflow_operations)
170 );
171 println!(" Unique paths: {}", format_number(airflow_paths.len()));
172
173 let total_entities: std::collections::HashSet<_> = airflow_paths
174 .values()
175 .flat_map(|data| data.entities.iter().cloned())
176 .collect();
177 println!(
178 " Entities involved: {}",
179 format_number(total_entities.len())
180 );
181
182 println!("\n1. TOP AIRFLOW PATHS BY OPERATIONS");
184 println!("{}", "-".repeat(100));
185 println!("{:<80} {:<12} {:<10}", "Path", "Operations", "Entities");
186 println!("{}", "-".repeat(100));
187
188 let mut sorted_paths: Vec<_> = airflow_paths.iter().collect();
189 sorted_paths.sort_by(|a, b| b.1.operations.cmp(&a.1.operations));
190
191 for (path, data) in sorted_paths.iter().take(30) {
192 let display_path = if path.len() <= 78 {
193 path.as_str()
194 } else {
195 &path[..75]
196 };
197 println!(
198 "{:<80} {:<12} {:<10}",
199 display_path,
200 format_number(data.operations),
201 format_number(data.entities.len())
202 );
203 }
204
205 println!("\n2. ENTITIES ACCESSING AIRFLOW SECRETS");
207 println!("{}", "-".repeat(100));
208 println!(
209 "{:<50} {:<12} {:<15}",
210 "Entity ID", "Operations", "Unique Paths"
211 );
212 println!("{}", "-".repeat(100));
213
214 let mut entity_patterns: HashMap<String, (usize, std::collections::HashSet<String>)> =
215 HashMap::new();
216 for (path, data) in &airflow_paths {
217 for entity in &data.entities {
218 let entry = entity_patterns
219 .entry(entity.clone())
220 .or_insert_with(|| (0, std::collections::HashSet::new()));
221 entry.0 += data.operations_by_entity.get(entity).unwrap_or(&0);
222 entry.1.insert(path.clone());
223 }
224 }
225
226 let mut sorted_entities: Vec<_> = entity_patterns.iter().collect();
227 sorted_entities.sort_by(|a, b| b.1 .0.cmp(&a.1 .0));
228
229 for (entity, (ops, paths)) in sorted_entities.iter().take(20) {
230 let display_entity = if entity.len() <= 48 {
231 entity.as_str()
232 } else {
233 &entity[..45]
234 };
235 println!(
236 "{:<50} {:<12} {:<15}",
237 display_entity,
238 format_number(*ops),
239 format_number(paths.len())
240 );
241 }
242
243 println!("\n3. BURST RATE ANALYSIS (Paths with Time Data)");
245 println!(" NOTE: Rates calculated over actual time span - high rates indicate bursty access");
246 println!("{}", "-".repeat(100));
247 println!(
248 "{:<60} {:<12} {:<12} {:<15}",
249 "Path", "Operations", "Time Span", "Avg Interval"
250 );
251 println!("{}", "-".repeat(100));
252
253 struct PollingPattern {
254 path: String,
255 operations: usize,
256 time_span_hours: f64,
257 ops_per_hour: f64,
258 avg_interval_seconds: f64,
259 }
260
261 let mut polling_patterns = Vec::new();
262
263 for (path, data) in &airflow_paths {
264 if data.timestamps.len() < 2 {
265 continue;
266 }
267
268 let mut timestamps = data.timestamps.clone();
269 timestamps.sort();
270 let time_span_seconds = timestamps[timestamps.len() - 1]
271 .signed_duration_since(timestamps[0])
272 .num_seconds() as f64;
273 let time_span_hours = time_span_seconds / 3600.0;
274
275 if time_span_hours > 0.0 {
276 let ops_per_hour = data.operations as f64 / time_span_hours;
277 let avg_interval_seconds = time_span_seconds / data.operations as f64;
278
279 polling_patterns.push(PollingPattern {
280 path: path.clone(),
281 operations: data.operations,
282 time_span_hours,
283 ops_per_hour,
284 avg_interval_seconds,
285 });
286 }
287 }
288
289 polling_patterns.sort_by(|a, b| b.ops_per_hour.partial_cmp(&a.ops_per_hour).unwrap());
291
292 for pattern in polling_patterns.iter().take(25) {
293 let path_display = if pattern.path.len() <= 58 {
294 &pattern.path
295 } else {
296 &pattern.path[..55]
297 };
298 let time_span = format!("{:.1}h", pattern.time_span_hours);
299 let interval = format!("{:.1}s", pattern.avg_interval_seconds);
300
301 println!(
302 "{:<60} {:<12} {:<12} {:<15}",
303 path_display,
304 format_number(pattern.operations),
305 time_span,
306 interval
307 );
308 }
309
310 println!("\n4. ENTITY-PATH POLLING BEHAVIOR (Top 30)");
312 println!("{}", "-".repeat(100));
313 println!("{:<40} {:<45} {:<15}", "Entity", "Path", "Operations");
314 println!("{}", "-".repeat(100));
315
316 struct EntityPathCombo {
317 entity: String,
318 path: String,
319 operations: usize,
320 }
321
322 let mut entity_path_combos = Vec::new();
323 for (path, data) in &airflow_paths {
324 for (entity_id, ops) in &data.operations_by_entity {
325 entity_path_combos.push(EntityPathCombo {
326 entity: entity_id.clone(),
327 path: path.clone(),
328 operations: *ops,
329 });
330 }
331 }
332
333 entity_path_combos.sort_by(|a, b| b.operations.cmp(&a.operations));
334
335 for combo in entity_path_combos.iter().take(30) {
336 let entity_display = if combo.entity.len() <= 38 {
337 &combo.entity
338 } else {
339 &combo.entity[..35]
340 };
341 let path_display = if combo.path.len() <= 43 {
342 &combo.path
343 } else {
344 &combo.path[..40]
345 };
346
347 println!(
348 "{:<40} {:<45} {:<15}",
349 entity_display,
350 path_display,
351 format_number(combo.operations)
352 );
353 }
354
355 println!("\n5. OPTIMIZATION RECOMMENDATIONS");
357 println!("{}", "-".repeat(100));
358
359 let high_frequency_paths: Vec<_> = polling_patterns
360 .iter()
361 .filter(|p| p.ops_per_hour > 100.0)
362 .collect();
363 let total_high_freq_ops: usize = high_frequency_paths.iter().map(|p| p.operations).sum();
364
365 println!(
366 "Total Airflow operations: {}",
367 format_number(airflow_operations)
368 );
369 println!(
370 "Paths with >100 ops/hour burst rate: {}",
371 format_number(high_frequency_paths.len())
372 );
373 println!(
374 "Operations from high-frequency paths: {} ({:.1}%)",
375 format_number(total_high_freq_ops),
376 (total_high_freq_ops as f64 / airflow_operations as f64) * 100.0
377 );
378 println!();
379 println!("Recommended Actions:");
380 println!();
381 println!("1. IMPLEMENT AIRFLOW CONNECTION CACHING");
382 println!(" - Configure Airflow to cache connection objects");
383 println!(" - Expected reduction: 80-90% of reads");
384 println!(
385 " - Potential savings: {} operations/day",
386 format_number((airflow_operations as f64 * 0.85) as usize)
387 );
388 println!();
389 println!("2. DEPLOY VAULT AGENT WITH AIRFLOW");
390 println!(" - Run Vault agent as sidecar/daemon");
391 println!(" - Configure template rendering for connections");
392 println!(" - Expected reduction: 95% of reads");
393 println!(
394 " - Potential savings: {} operations/day",
395 format_number((airflow_operations as f64 * 0.95) as usize)
396 );
397 println!();
398 println!("3. USE AIRFLOW SECRETS BACKEND EFFICIENTLY");
399 println!(" - Review connection lookup patterns in DAGs");
400 println!(" - Implement connection object reuse within tasks");
401 println!(" - Cache connections at DAG level where appropriate");
402 println!();
403
404 if !polling_patterns.is_empty() {
405 println!("4. PRIORITY PATHS FOR IMMEDIATE OPTIMIZATION (by burst rate):");
406 for (i, pattern) in polling_patterns.iter().take(10).enumerate() {
407 let path_name = pattern.path.split('/').next_back().unwrap_or(&pattern.path);
408 println!(
409 " {}. {}: {} operations ({:.0}/hour burst rate)",
410 i + 1,
411 path_name,
412 format_number(pattern.operations),
413 pattern.ops_per_hour
414 );
415 }
416 }
417
418 println!("\n{}", "=".repeat(100));
419
420 if let Some(output_file) = output {
421 use std::fs::File;
422 use std::io::Write;
423 let mut file = File::create(output_file)?;
424 writeln!(file, "entity_id,path,operation_count")?;
425 for (path, data) in &airflow_paths {
426 for (entity, count) in &data.operations_by_entity {
427 writeln!(file, "{},{},{}", entity, path, count)?;
428 }
429 }
430 println!("\nOutput written to: {}", output_file);
431 }
432
433 Ok(())
434}