vault_audit_tools/commands/
airflow_polling.rs1use crate::audit::types::AuditEntry;
44use crate::utils::progress::ProgressBar;
45use crate::utils::reader::open_file;
46use crate::utils::time::parse_timestamp;
47use anyhow::Result;
48use chrono::{DateTime, Utc};
49use std::collections::HashMap;
50use std::io::{BufRead, BufReader};
51
/// Renders `n` in decimal with a comma between every group of three digits,
/// counted from the right (for example `1234567` becomes `"1,234,567"`).
///
/// Values below 1000 are returned without any separator.
pub fn format_number(n: usize) -> String {
    let digits = n.to_string();
    let len = digits.len();
    // Room for every digit plus at most one separator per three digits.
    let mut grouped = String::with_capacity(len + len / 3);

    for (idx, ch) in digits.chars().enumerate() {
        // A separator goes before a digit when the number of digits still to
        // be written (including this one) is a multiple of three.
        if idx > 0 && (len - idx) % 3 == 0 {
            grouped.push(',');
        }
        grouped.push(ch);
    }

    grouped
}
63
64struct PathData {
65 operations: usize,
66 entities: std::collections::HashSet<String>,
67 operations_by_entity: HashMap<String, usize>,
68 timestamps: Vec<DateTime<Utc>>,
69}
70
71impl PathData {
72 fn new() -> Self {
73 Self {
74 operations: 0,
75 entities: std::collections::HashSet::new(),
76 operations_by_entity: HashMap::new(),
77 timestamps: Vec::new(),
78 }
79 }
80}
81
82pub fn run(log_files: &[String], output: Option<&str>) -> Result<()> {
83 let mut airflow_operations = 0;
84 let mut airflow_paths: HashMap<String, PathData> = HashMap::new();
85 let mut total_lines = 0;
86
87 for (file_idx, log_file) in log_files.iter().enumerate() {
89 eprintln!(
90 "[{}/{}] Processing: {}",
91 file_idx + 1,
92 log_files.len(),
93 log_file
94 );
95
96 let file_size = std::fs::metadata(log_file).ok().map(|m| m.len() as usize);
98 let mut progress = if let Some(size) = file_size {
99 ProgressBar::new(size, "Processing")
100 } else {
101 ProgressBar::new_spinner("Processing")
102 };
103
104 let file = open_file(log_file)?;
105 let reader = BufReader::new(file);
106
107 let mut file_lines = 0;
108 let mut bytes_read = 0;
109
110 for line in reader.lines() {
111 file_lines += 1;
112 total_lines += 1;
113 let line = line?;
114 bytes_read += line.len() + 1; if file_lines % 10_000 == 0 {
118 if let Some(size) = file_size {
119 progress.update(bytes_read.min(size)); } else {
121 progress.update(file_lines);
122 }
123 }
124
125 let entry: AuditEntry = match serde_json::from_str(&line) {
126 Ok(e) => e,
127 Err(_) => continue,
128 };
129
130 let request = match &entry.request {
132 Some(r) => r,
133 None => continue,
134 };
135
136 let path = match &request.path {
137 Some(p) => p.as_str(),
138 None => continue,
139 };
140
141 if path.to_lowercase().contains("airflow") {
142 airflow_operations += 1;
143
144 let entity_id = entry
145 .auth
146 .as_ref()
147 .and_then(|a| a.entity_id.as_deref())
148 .unwrap_or("no-entity");
149
150 let path_data = airflow_paths
152 .entry(path.to_string())
153 .or_insert_with(PathData::new);
154 path_data.operations += 1;
155 path_data.entities.insert(entity_id.to_string());
156 *path_data
157 .operations_by_entity
158 .entry(entity_id.to_string())
159 .or_insert(0) += 1;
160
161 if let Ok(ts) = parse_timestamp(&entry.time) {
163 path_data.timestamps.push(ts);
164 }
165 }
166 }
167
168 if let Some(size) = file_size {
170 progress.update(size);
171 }
172
173 progress.finish_with_message(&format!(
174 "Processed {} lines from this file",
175 format_number(file_lines)
176 ));
177 }
178
179 eprintln!(
180 "\nTotal: Processed {} lines, found {} Airflow operations",
181 format_number(total_lines),
182 format_number(airflow_operations)
183 );
184
185 println!("\nSummary:");
186 println!(" Total lines processed: {}", format_number(total_lines));
187 println!(
188 " Airflow operations: {}",
189 format_number(airflow_operations)
190 );
191 println!(" Unique paths: {}", format_number(airflow_paths.len()));
192
193 let total_entities: std::collections::HashSet<_> = airflow_paths
194 .values()
195 .flat_map(|data| data.entities.iter().cloned())
196 .collect();
197 println!(
198 " Entities involved: {}",
199 format_number(total_entities.len())
200 );
201
202 println!("\n1. TOP AIRFLOW PATHS BY OPERATIONS");
204 println!("{}", "-".repeat(100));
205 println!("{:<80} {:<12} {:<10}", "Path", "Operations", "Entities");
206 println!("{}", "-".repeat(100));
207
208 let mut sorted_paths: Vec<_> = airflow_paths.iter().collect();
209 sorted_paths.sort_by(|a, b| b.1.operations.cmp(&a.1.operations));
210
211 for (path, data) in sorted_paths.iter().take(30) {
212 let display_path = if path.len() <= 78 {
213 path.as_str()
214 } else {
215 &path[..75]
216 };
217 println!(
218 "{:<80} {:<12} {:<10}",
219 display_path,
220 format_number(data.operations),
221 format_number(data.entities.len())
222 );
223 }
224
225 println!("\n2. ENTITIES ACCESSING AIRFLOW SECRETS");
227 println!("{}", "-".repeat(100));
228 println!(
229 "{:<50} {:<12} {:<15}",
230 "Entity ID", "Operations", "Unique Paths"
231 );
232 println!("{}", "-".repeat(100));
233
234 let mut entity_patterns: HashMap<String, (usize, std::collections::HashSet<String>)> =
235 HashMap::new();
236 for (path, data) in &airflow_paths {
237 for entity in &data.entities {
238 let entry = entity_patterns
239 .entry(entity.clone())
240 .or_insert((0, std::collections::HashSet::new()));
241 entry.0 += data.operations_by_entity.get(entity).unwrap_or(&0);
242 entry.1.insert(path.clone());
243 }
244 }
245
246 let mut sorted_entities: Vec<_> = entity_patterns.iter().collect();
247 sorted_entities.sort_by(|a, b| b.1 .0.cmp(&a.1 .0));
248
249 for (entity, (ops, paths)) in sorted_entities.iter().take(20) {
250 let display_entity = if entity.len() <= 48 {
251 entity.as_str()
252 } else {
253 &entity[..45]
254 };
255 println!(
256 "{:<50} {:<12} {:<15}",
257 display_entity,
258 format_number(*ops),
259 format_number(paths.len())
260 );
261 }
262
263 println!("\n3. BURST RATE ANALYSIS (Paths with Time Data)");
265 println!(" NOTE: Rates calculated over actual time span - high rates indicate bursty access");
266 println!("{}", "-".repeat(100));
267 println!(
268 "{:<60} {:<12} {:<12} {:<15}",
269 "Path", "Operations", "Time Span", "Avg Interval"
270 );
271 println!("{}", "-".repeat(100));
272
273 struct PollingPattern {
274 path: String,
275 operations: usize,
276 time_span_hours: f64,
277 ops_per_hour: f64,
278 avg_interval_seconds: f64,
279 }
280
281 let mut polling_patterns = Vec::new();
282
283 for (path, data) in &airflow_paths {
284 if data.timestamps.len() < 2 {
285 continue;
286 }
287
288 let mut timestamps = data.timestamps.clone();
289 timestamps.sort();
290 let time_span_seconds = timestamps[timestamps.len() - 1]
291 .signed_duration_since(timestamps[0])
292 .num_seconds() as f64;
293 let time_span_hours = time_span_seconds / 3600.0;
294
295 if time_span_hours > 0.0 {
296 let ops_per_hour = data.operations as f64 / time_span_hours;
297 let avg_interval_seconds = time_span_seconds / data.operations as f64;
298
299 polling_patterns.push(PollingPattern {
300 path: path.clone(),
301 operations: data.operations,
302 time_span_hours,
303 ops_per_hour,
304 avg_interval_seconds,
305 });
306 }
307 }
308
309 polling_patterns.sort_by(|a, b| b.ops_per_hour.partial_cmp(&a.ops_per_hour).unwrap());
311
312 for pattern in polling_patterns.iter().take(25) {
313 let path_display = if pattern.path.len() <= 58 {
314 &pattern.path
315 } else {
316 &pattern.path[..55]
317 };
318 let time_span = format!("{:.1}h", pattern.time_span_hours);
319 let interval = format!("{:.1}s", pattern.avg_interval_seconds);
320
321 println!(
322 "{:<60} {:<12} {:<12} {:<15}",
323 path_display,
324 format_number(pattern.operations),
325 time_span,
326 interval
327 );
328 }
329
330 println!("\n4. ENTITY-PATH POLLING BEHAVIOR (Top 30)");
332 println!("{}", "-".repeat(100));
333 println!("{:<40} {:<45} {:<15}", "Entity", "Path", "Operations");
334 println!("{}", "-".repeat(100));
335
336 struct EntityPathCombo {
337 entity: String,
338 path: String,
339 operations: usize,
340 }
341
342 let mut entity_path_combos = Vec::new();
343 for (path, data) in &airflow_paths {
344 for (entity_id, ops) in &data.operations_by_entity {
345 entity_path_combos.push(EntityPathCombo {
346 entity: entity_id.clone(),
347 path: path.clone(),
348 operations: *ops,
349 });
350 }
351 }
352
353 entity_path_combos.sort_by(|a, b| b.operations.cmp(&a.operations));
354
355 for combo in entity_path_combos.iter().take(30) {
356 let entity_display = if combo.entity.len() <= 38 {
357 &combo.entity
358 } else {
359 &combo.entity[..35]
360 };
361 let path_display = if combo.path.len() <= 43 {
362 &combo.path
363 } else {
364 &combo.path[..40]
365 };
366
367 println!(
368 "{:<40} {:<45} {:<15}",
369 entity_display,
370 path_display,
371 format_number(combo.operations)
372 );
373 }
374
375 println!("\n5. OPTIMIZATION RECOMMENDATIONS");
377 println!("{}", "-".repeat(100));
378
379 let high_frequency_paths: Vec<_> = polling_patterns
380 .iter()
381 .filter(|p| p.ops_per_hour > 100.0)
382 .collect();
383 let total_high_freq_ops: usize = high_frequency_paths.iter().map(|p| p.operations).sum();
384
385 println!(
386 "Total Airflow operations: {}",
387 format_number(airflow_operations)
388 );
389 println!(
390 "Paths with >100 ops/hour burst rate: {}",
391 format_number(high_frequency_paths.len())
392 );
393 println!(
394 "Operations from high-frequency paths: {} ({:.1}%)",
395 format_number(total_high_freq_ops),
396 (total_high_freq_ops as f64 / airflow_operations as f64) * 100.0
397 );
398 println!();
399 println!("Recommended Actions:");
400 println!();
401 println!("1. IMPLEMENT AIRFLOW CONNECTION CACHING");
402 println!(" - Configure Airflow to cache connection objects");
403 println!(" - Expected reduction: 80-90% of reads");
404 println!(
405 " - Potential savings: {} operations/day",
406 format_number((airflow_operations as f64 * 0.85) as usize)
407 );
408 println!();
409 println!("2. DEPLOY VAULT AGENT WITH AIRFLOW");
410 println!(" - Run Vault agent as sidecar/daemon");
411 println!(" - Configure template rendering for connections");
412 println!(" - Expected reduction: 95% of reads");
413 println!(
414 " - Potential savings: {} operations/day",
415 format_number((airflow_operations as f64 * 0.95) as usize)
416 );
417 println!();
418 println!("3. USE AIRFLOW SECRETS BACKEND EFFICIENTLY");
419 println!(" - Review connection lookup patterns in DAGs");
420 println!(" - Implement connection object reuse within tasks");
421 println!(" - Cache connections at DAG level where appropriate");
422 println!();
423
424 if !polling_patterns.is_empty() {
425 println!("4. PRIORITY PATHS FOR IMMEDIATE OPTIMIZATION (by burst rate):");
426 for (i, pattern) in polling_patterns.iter().take(10).enumerate() {
427 let path_name = pattern.path.split('/').next_back().unwrap_or(&pattern.path);
428 println!(
429 " {}. {}: {} operations ({:.0}/hour burst rate)",
430 i + 1,
431 path_name,
432 format_number(pattern.operations),
433 pattern.ops_per_hour
434 );
435 }
436 }
437
438 println!("\n{}", "=".repeat(100));
439
440 if let Some(output_file) = output {
441 use std::fs::File;
442 use std::io::Write;
443 let mut file = File::create(output_file)?;
444 writeln!(file, "entity_id,path,operation_count")?;
445 for (path, data) in &airflow_paths {
446 for (entity, count) in &data.operations_by_entity {
447 writeln!(file, "{},{},{}", entity, path, count)?;
448 }
449 }
450 println!("\nOutput written to: {}", output_file);
451 }
452
453 Ok(())
454}