vault_audit_tools/commands/
path_hotspots.rs1use crate::audit::types::AuditEntry;
40use crate::utils::progress::ProgressBar;
41use crate::utils::reader::open_file;
42use crate::utils::time::parse_timestamp;
43use anyhow::Result;
44use chrono::DateTime;
45use chrono::Utc;
46use std::collections::{HashMap, HashSet};
47use std::io::{BufRead, BufReader};
48
49#[derive(Debug)]
51struct PathStats {
52 operations: usize,
53 entities: HashSet<String>,
54 operations_by_type: HashMap<String, usize>,
55 timestamps: Vec<DateTime<Utc>>,
56 entity_operations: HashMap<String, usize>,
57}
58
59impl PathStats {
60 fn new() -> Self {
61 Self {
62 operations: 0,
63 entities: HashSet::new(),
64 operations_by_type: HashMap::new(),
65 timestamps: Vec::new(),
66 entity_operations: HashMap::new(),
67 }
68 }
69}
70
/// Renders `n` in decimal with a comma between each group of three digits
/// (for example `1234567` becomes `"1,234,567"`).
fn format_number(n: usize) -> String {
    let digits = n.to_string();
    let len = digits.len();
    let mut grouped = String::with_capacity(len + len / 3);

    for (idx, digit) in digits.chars().enumerate() {
        // A separator goes before every digit that has a multiple of three digits
        // remaining after it, except at the very start of the number.
        if idx > 0 && (len - idx) % 3 == 0 {
            grouped.push(',');
        }
        grouped.push(digit);
    }

    grouped
}
82
83pub fn run(log_files: &[String], top: usize) -> Result<()> {
84 let mut path_stats: HashMap<String, PathStats> = HashMap::new();
85 let mut total_lines = 0;
86 let mut total_operations = 0;
87
88 for (file_idx, log_file) in log_files.iter().enumerate() {
90 eprintln!(
91 "[{}/{}] Processing: {}",
92 file_idx + 1,
93 log_files.len(),
94 log_file
95 );
96
97 let file_size = std::fs::metadata(log_file).ok().map(|m| m.len() as usize);
99 let mut progress = if let Some(size) = file_size {
100 ProgressBar::new(size, "Processing")
101 } else {
102 ProgressBar::new_spinner("Processing")
103 };
104
105 let mut file_lines = 0;
106 let mut bytes_read = 0;
107
108 let file = open_file(log_file)?;
109 let reader = BufReader::new(file);
110
111 for line in reader.lines() {
112 file_lines += 1;
113 total_lines += 1;
114 let line = line?;
115 bytes_read += line.len() + 1; if file_lines % 10_000 == 0 {
118 if let Some(size) = file_size {
119 progress.update(bytes_read.min(size));
120 } else {
121 progress.update(file_lines);
122 }
123 }
124
125 let entry: AuditEntry = match serde_json::from_str(&line) {
126 Ok(e) => e,
127 Err(_) => continue,
128 };
129
130 let path = match &entry.request {
131 Some(r) => match &r.path {
132 Some(p) => p.as_str(),
133 None => continue,
134 },
135 None => continue,
136 };
137
138 let operation = match &entry.request {
139 Some(r) => match &r.operation {
140 Some(o) => o.as_str(),
141 None => continue,
142 },
143 None => continue,
144 };
145
146 total_operations += 1;
147
148 let entity_id = entry
149 .auth
150 .as_ref()
151 .and_then(|a| a.entity_id.as_deref())
152 .unwrap_or("no-entity");
153
154 let ts = parse_timestamp(&entry.time).ok();
156
157 let stats = path_stats
159 .entry(path.to_string())
160 .or_insert_with(PathStats::new);
161 stats.operations += 1;
162 stats.entities.insert(entity_id.to_string());
163 *stats
164 .operations_by_type
165 .entry(operation.to_string())
166 .or_insert(0) += 1;
167 *stats
168 .entity_operations
169 .entry(entity_id.to_string())
170 .or_insert(0) += 1;
171 if let Some(t) = ts {
172 stats.timestamps.push(t);
173 }
174 }
175
176 if let Some(size) = file_size {
178 progress.update(size);
179 }
180
181 progress.finish_with_message(&format!(
182 "Processed {} lines from this file",
183 format_number(file_lines)
184 ));
185 }
186
187 eprintln!(
188 "\nTotal: Processed {} lines, {} operations",
189 format_number(total_lines),
190 format_number(total_operations)
191 );
192
193 let mut sorted_paths: Vec<_> = path_stats.iter().collect();
195 sorted_paths.sort_by(|a, b| b.1.operations.cmp(&a.1.operations));
196
197 println!("\n{}", "=".repeat(120));
199 println!("TOP {} PATH HOT SPOTS ANALYSIS", top);
200 println!("{}", "=".repeat(120));
201
202 println!(
203 "\n{:<5} {:<60} {:<12} {:<10} {:<10} {:<10}",
204 "#", "Path", "Ops", "Entities", "Top Op", "%"
205 );
206 println!("{}", "-".repeat(120));
207
208 for (i, (path, data)) in sorted_paths.iter().take(top).enumerate() {
209 let ops = data.operations;
210 let entity_count = data.entities.len();
211 let percentage = (ops as f64 / total_operations as f64) * 100.0;
212
213 let top_op = data
214 .operations_by_type
215 .iter()
216 .max_by_key(|x| x.1)
217 .map(|x| x.0.as_str())
218 .unwrap_or("N/A");
219
220 let display_path = if path.len() <= 58 {
221 path.to_string()
222 } else {
223 format!("{}...", &path[..55])
224 };
225
226 println!(
227 "{:<5} {:<60} {:<12} {:<10} {:<10} {:<10.2}%",
228 i + 1,
229 display_path,
230 format_number(ops),
231 format_number(entity_count),
232 top_op,
233 percentage
234 );
235 }
236
237 println!("\n\nDETAILED ANALYSIS OF TOP {} PATHS", top.min(20));
239 println!("{}", "=".repeat(120));
240
241 for (i, (path, data)) in sorted_paths.iter().take(top.min(20)).enumerate() {
242 println!("\n{}. PATH: {}", i + 1, path);
243 println!("{}", "-".repeat(120));
244
245 let ops = data.operations;
246 let entity_count = data.entities.len();
247 let percentage = (ops as f64 / total_operations as f64) * 100.0;
248
249 println!(
250 " Total Operations: {} ({:.2}% of all traffic)",
251 format_number(ops),
252 percentage
253 );
254 println!(" Unique Entities: {}", format_number(entity_count));
255
256 if data.timestamps.len() >= 2 {
258 let mut sorted_ts = data.timestamps.clone();
259 sorted_ts.sort();
260 let time_span = (sorted_ts
261 .last()
262 .unwrap()
263 .signed_duration_since(*sorted_ts.first().unwrap()))
264 .num_seconds() as f64
265 / 3600.0;
266 if time_span > 0.0 {
267 let ops_per_hour = ops as f64 / time_span;
268 println!(
269 " Access Rate: {:.1} operations/hour ({:.2}/minute)",
270 ops_per_hour,
271 ops_per_hour / 60.0
272 );
273 }
274 }
275
276 println!(" Operations by type:");
278 let mut ops_by_type: Vec<_> = data.operations_by_type.iter().collect();
279 ops_by_type.sort_by(|a, b| b.1.cmp(a.1));
280 for (op, count) in ops_by_type.iter().take(5) {
281 let op_pct = (**count as f64 / ops as f64) * 100.0;
282 println!(
283 " - {}: {} ({:.1}%)",
284 op,
285 format_number(**count),
286 op_pct
287 );
288 }
289
290 let mut top_entities: Vec<_> = data.entity_operations.iter().collect();
292 top_entities.sort_by(|a, b| b.1.cmp(a.1));
293 if !top_entities.is_empty() {
294 println!(" Top {} entities:", top_entities.len().min(5));
295 for (entity_id, entity_ops) in top_entities.iter().take(5) {
296 let entity_pct = (**entity_ops as f64 / ops as f64) * 100.0;
297 let entity_display = if entity_id.len() <= 40 {
298 entity_id.to_string()
299 } else {
300 format!("{}...", &entity_id[..37])
301 };
302 println!(
303 " - {}: {} ops ({:.1}%)",
304 entity_display,
305 format_number(**entity_ops),
306 entity_pct
307 );
308 }
309 }
310
311 print!(" Category: ");
313 let mut recommendations = Vec::new();
314
315 if path.contains("token/lookup") {
316 println!("TOKEN LOOKUP");
317 recommendations
318 .push("Implement client-side token TTL tracking to eliminate polling".to_string());
319 recommendations.push(format!(
320 "Potential reduction: 80-90% ({} operations)",
321 format_number((ops as f64 * 0.85) as usize)
322 ));
323 } else if path.to_lowercase().contains("airflow") {
324 println!("AIRFLOW SECRET");
325 recommendations
326 .push("Deploy Vault agent with template rendering for Airflow".to_string());
327 recommendations.push("Configure connection caching in Airflow".to_string());
328 recommendations.push(format!(
329 "Potential reduction: 95% ({} operations)",
330 format_number((ops as f64 * 0.95) as usize)
331 ));
332 } else if path.contains("approle/login") {
333 println!("APPROLE AUTHENTICATION");
334 if entity_count == 1 {
335 recommendations.push(format!(
336 "⚠️ CRITICAL: Single entity making all {} login requests",
337 format_number(ops)
338 ));
339 recommendations
340 .push("Review token TTL configuration - may be too short".to_string());
341 recommendations.push("Consider SecretID caching if appropriate".to_string());
342 }
343 } else if path.to_lowercase().contains("openshift")
344 || path.to_lowercase().contains("kubernetes")
345 {
346 println!("KUBERNETES/OPENSHIFT AUTH");
347 recommendations.push("Review pod authentication token TTLs".to_string());
348 recommendations.push("Consider increasing default token lifetime".to_string());
349 recommendations.push("Implement token renewal strategy in applications".to_string());
350 } else if path.to_lowercase().contains("github") && path.contains("login") {
351 println!("GITHUB AUTHENTICATION");
352 recommendations.push("Review GitHub auth token TTLs".to_string());
353 if entity_count == 1 {
354 recommendations.push(format!(
355 "⚠️ Single entity ({}) - investigate why",
356 entity_count
357 ));
358 }
359 } else if path.contains("data/") || path.contains("metadata/") {
360 println!("KV SECRET ENGINE");
361 if entity_count <= 3 && ops > 10000 {
362 recommendations.push(format!(
363 "⚠️ HIGH-FREQUENCY ACCESS: {} operations from only {} entities",
364 format_number(ops),
365 entity_count
366 ));
367 recommendations.push("Implement caching layer or Vault agent".to_string());
368 recommendations.push("Review if secret needs this frequency of access".to_string());
369 } else {
370 recommendations
371 .push("Consider Vault agent for high-frequency consumers".to_string());
372 }
373 } else {
374 println!("OTHER");
375 if ops > 5000 {
376 recommendations.push(format!(
377 "High-volume path ({} operations) - review necessity",
378 format_number(ops)
379 ));
380 }
381 }
382
383 if let Some((_, top_entity_ops)) = top_entities.first() {
385 let top_entity_pct = (**top_entity_ops as f64 / ops as f64) * 100.0;
386 if top_entity_pct > 50.0 && !recommendations.iter().any(|r| r.contains("CRITICAL")) {
387 recommendations.push(format!(
388 "⚠️ Entity concentration: Single entity responsible for {:.1}% of access",
389 top_entity_pct
390 ));
391 }
392 }
393
394 if !recommendations.is_empty() {
395 println!(" Recommendations:");
396 for rec in recommendations {
397 println!(" • {}", rec);
398 }
399 }
400 }
401
402 println!("\n\nSUMMARY BY PATH CATEGORY");
404 println!("{}", "=".repeat(120));
405
406 let mut categories: HashMap<&str, usize> = HashMap::new();
407 categories.insert("Token Operations", 0);
408 categories.insert("KV Secret Access", 0);
409 categories.insert("Authentication", 0);
410 categories.insert("Airflow Secrets", 0);
411 categories.insert("System/Admin", 0);
412 categories.insert("Other", 0);
413
414 for (path, stats) in path_stats.iter() {
415 let ops = stats.operations;
416 if path.contains("token/") {
417 *categories.get_mut("Token Operations").unwrap() += ops;
418 } else if path.contains("/data/") || path.contains("/metadata/") {
419 if path.to_lowercase().contains("airflow") {
420 *categories.get_mut("Airflow Secrets").unwrap() += ops;
421 } else {
422 *categories.get_mut("KV Secret Access").unwrap() += ops;
423 }
424 } else if path.contains("/login") || path.contains("/auth/") {
425 *categories.get_mut("Authentication").unwrap() += ops;
426 } else if path.contains("sys/") {
427 *categories.get_mut("System/Admin").unwrap() += ops;
428 } else {
429 *categories.get_mut("Other").unwrap() += ops;
430 }
431 }
432
433 println!(
434 "{:<30} {:<15} {:<15}",
435 "Category", "Operations", "% of Total"
436 );
437 println!("{}", "-".repeat(120));
438
439 let mut sorted_categories: Vec<_> = categories.iter().collect();
440 sorted_categories.sort_by(|a, b| b.1.cmp(a.1));
441
442 for (category, ops) in sorted_categories {
443 let percentage = (*ops as f64 / total_operations as f64) * 100.0;
444 println!(
445 "{:<30} {:<15} {:<15.2}%",
446 category,
447 format_number(*ops),
448 percentage
449 );
450 }
451
452 println!("\n{}", "=".repeat(120));
453
454 println!("\nTOP OPTIMIZATION OPPORTUNITIES (by impact)");
456 println!("{}", "=".repeat(120));
457
458 struct Opportunity {
459 name: String,
460 current_ops: usize,
461 potential_reduction: usize,
462 effort: String,
463 priority: u8,
464 }
465
466 let mut opportunities = Vec::new();
467
468 let token_lookup_ops: usize = path_stats
470 .iter()
471 .filter(|(path, _)| path.contains("token/lookup"))
472 .map(|(_, stats)| stats.operations)
473 .sum();
474
475 if token_lookup_ops > 10000 {
476 opportunities.push(Opportunity {
477 name: "Eliminate Token Lookup Polling".to_string(),
478 current_ops: token_lookup_ops,
479 potential_reduction: (token_lookup_ops as f64 * 0.85) as usize,
480 effort: "Medium".to_string(),
481 priority: 1,
482 });
483 }
484
485 let airflow_ops: usize = path_stats
487 .iter()
488 .filter(|(path, _)| path.to_lowercase().contains("airflow"))
489 .map(|(_, stats)| stats.operations)
490 .sum();
491
492 if airflow_ops > 10000 {
493 opportunities.push(Opportunity {
494 name: "Deploy Vault Agent for Airflow".to_string(),
495 current_ops: airflow_ops,
496 potential_reduction: (airflow_ops as f64 * 0.95) as usize,
497 effort: "Medium".to_string(),
498 priority: 2,
499 });
500 }
501
502 let high_freq_ops: usize = path_stats
504 .iter()
505 .filter(|(_, stats)| stats.operations > 5000 && stats.operations < 100000)
506 .map(|(_, stats)| stats.operations)
507 .sum();
508
509 let high_freq_count = path_stats
510 .iter()
511 .filter(|(_, stats)| stats.operations > 5000 && stats.operations < 100000)
512 .count();
513
514 if high_freq_ops > 10000 {
515 opportunities.push(Opportunity {
516 name: format!("Cache High-Frequency Paths ({} paths)", high_freq_count),
517 current_ops: high_freq_ops,
518 potential_reduction: (high_freq_ops as f64 * 0.70) as usize,
519 effort: "Low-Medium".to_string(),
520 priority: 3,
521 });
522 }
523
524 opportunities.sort_by_key(|o| o.priority);
525
526 println!(
527 "\n{:<10} {:<50} {:<15} {:<15} {:<15}",
528 "Priority", "Opportunity", "Current Ops", "Savings", "Effort"
529 );
530 println!("{}", "-".repeat(120));
531
532 let mut total_current_ops = 0;
533 let mut total_savings = 0;
534
535 for opp in &opportunities {
536 println!(
537 "{:<10} {:<50} {:<15} {:<15} {:<15}",
538 opp.priority,
539 opp.name,
540 format_number(opp.current_ops),
541 format_number(opp.potential_reduction),
542 opp.effort
543 );
544 total_current_ops += opp.current_ops;
545 total_savings += opp.potential_reduction;
546 }
547
548 println!("{}", "-".repeat(120));
549 println!(
550 "{:<10} {:<50} {:<15} {:<15}",
551 "TOTAL POTENTIAL SAVINGS",
552 "",
553 format_number(total_current_ops),
554 format_number(total_savings)
555 );
556
557 let projected_reduction = (total_savings as f64 / total_operations as f64) * 100.0;
558 println!(
559 "\nProjected reduction: {:.1}% of all Vault operations",
560 projected_reduction
561 );
562 println!("{}", "=".repeat(120));
563
564 Ok(())
565}