vault_audit_tools/commands/
path_hotspots.rs1use crate::audit::types::AuditEntry;
37use crate::utils::progress::ProgressBar;
38use crate::utils::reader::open_file;
39use crate::utils::time::parse_timestamp;
40use anyhow::Result;
41use chrono::DateTime;
42use chrono::Utc;
43use std::collections::{HashMap, HashSet};
44use std::io::{BufRead, BufReader};
45
46#[derive(Debug)]
48struct PathStats {
49 operations: usize,
50 entities: HashSet<String>,
51 operations_by_type: HashMap<String, usize>,
52 timestamps: Vec<DateTime<Utc>>,
53 entity_operations: HashMap<String, usize>,
54}
55
56impl PathStats {
57 fn new() -> Self {
58 Self {
59 operations: 0,
60 entities: HashSet::new(),
61 operations_by_type: HashMap::new(),
62 timestamps: Vec::new(),
63 entity_operations: HashMap::new(),
64 }
65 }
66}
67
/// Renders `n` in decimal with a comma between every group of three digits,
/// e.g. `1234567` becomes `"1,234,567"`.
fn format_number(n: usize) -> String {
    let digits = n.to_string();
    let len = digits.len();
    let mut grouped = String::with_capacity(len + len / 3);

    for (idx, ch) in digits.chars().enumerate() {
        // A separator goes in front of every digit that has a multiple of
        // three digits remaining (itself included), except the leading one.
        if idx > 0 && (len - idx) % 3 == 0 {
            grouped.push(',');
        }
        grouped.push(ch);
    }

    grouped
}
79
80pub fn run(log_files: &[String], top: usize) -> Result<()> {
81 let mut path_stats: HashMap<String, PathStats> = HashMap::new();
82 let mut total_lines = 0;
83 let mut total_operations = 0;
84
85 for (file_idx, log_file) in log_files.iter().enumerate() {
87 eprintln!(
88 "[{}/{}] Processing: {}",
89 file_idx + 1,
90 log_files.len(),
91 log_file
92 );
93
94 let file_size = std::fs::metadata(log_file).ok().map(|m| m.len() as usize);
96 let mut progress = if let Some(size) = file_size {
97 ProgressBar::new(size, "Processing")
98 } else {
99 ProgressBar::new_spinner("Processing")
100 };
101
102 let mut file_lines = 0;
103 let mut bytes_read = 0;
104
105 let file = open_file(log_file)?;
106 let reader = BufReader::new(file);
107
108 for line in reader.lines() {
109 file_lines += 1;
110 total_lines += 1;
111 let line = line?;
112 bytes_read += line.len() + 1; if file_lines % 10_000 == 0 {
115 if let Some(size) = file_size {
116 progress.update(bytes_read.min(size));
117 } else {
118 progress.update(file_lines);
119 }
120 }
121
122 let entry: AuditEntry = match serde_json::from_str(&line) {
123 Ok(e) => e,
124 Err(_) => continue,
125 };
126
127 let path = match &entry.request {
128 Some(r) => match &r.path {
129 Some(p) => p.as_str(),
130 None => continue,
131 },
132 None => continue,
133 };
134
135 let operation = match &entry.request {
136 Some(r) => match &r.operation {
137 Some(o) => o.as_str(),
138 None => continue,
139 },
140 None => continue,
141 };
142
143 total_operations += 1;
144
145 let entity_id = entry
146 .auth
147 .as_ref()
148 .and_then(|a| a.entity_id.as_deref())
149 .unwrap_or("no-entity");
150
151 let ts = parse_timestamp(&entry.time).ok();
153
154 let stats = path_stats
156 .entry(path.to_string())
157 .or_insert_with(PathStats::new);
158 stats.operations += 1;
159 stats.entities.insert(entity_id.to_string());
160 *stats
161 .operations_by_type
162 .entry(operation.to_string())
163 .or_insert(0) += 1;
164 *stats
165 .entity_operations
166 .entry(entity_id.to_string())
167 .or_insert(0) += 1;
168 if let Some(t) = ts {
169 stats.timestamps.push(t);
170 }
171 }
172
173 if let Some(size) = file_size {
175 progress.update(size);
176 }
177
178 progress.finish_with_message(&format!(
179 "Processed {} lines from this file",
180 format_number(file_lines)
181 ));
182 }
183
184 eprintln!(
185 "\nTotal: Processed {} lines, {} operations",
186 format_number(total_lines),
187 format_number(total_operations)
188 );
189
190 let mut sorted_paths: Vec<_> = path_stats.iter().collect();
192 sorted_paths.sort_by(|a, b| b.1.operations.cmp(&a.1.operations));
193
194 println!("\n{}", "=".repeat(120));
196 println!("TOP {} PATH HOT SPOTS ANALYSIS", top);
197 println!("{}", "=".repeat(120));
198
199 println!(
200 "\n{:<5} {:<60} {:<12} {:<10} {:<10} {:<10}",
201 "#", "Path", "Ops", "Entities", "Top Op", "%"
202 );
203 println!("{}", "-".repeat(120));
204
205 for (i, (path, data)) in sorted_paths.iter().take(top).enumerate() {
206 let ops = data.operations;
207 let entity_count = data.entities.len();
208 let percentage = (ops as f64 / total_operations as f64) * 100.0;
209
210 let top_op = data
211 .operations_by_type
212 .iter()
213 .max_by_key(|x| x.1)
214 .map(|x| x.0.as_str())
215 .unwrap_or("N/A");
216
217 let display_path = if path.len() <= 58 {
218 path.to_string()
219 } else {
220 format!("{}...", &path[..55])
221 };
222
223 println!(
224 "{:<5} {:<60} {:<12} {:<10} {:<10} {:<10.2}%",
225 i + 1,
226 display_path,
227 format_number(ops),
228 format_number(entity_count),
229 top_op,
230 percentage
231 );
232 }
233
234 println!("\n\nDETAILED ANALYSIS OF TOP {} PATHS", top.min(20));
236 println!("{}", "=".repeat(120));
237
238 for (i, (path, data)) in sorted_paths.iter().take(top.min(20)).enumerate() {
239 println!("\n{}. PATH: {}", i + 1, path);
240 println!("{}", "-".repeat(120));
241
242 let ops = data.operations;
243 let entity_count = data.entities.len();
244 let percentage = (ops as f64 / total_operations as f64) * 100.0;
245
246 println!(
247 " Total Operations: {} ({:.2}% of all traffic)",
248 format_number(ops),
249 percentage
250 );
251 println!(" Unique Entities: {}", format_number(entity_count));
252
253 if data.timestamps.len() >= 2 {
255 let mut sorted_ts = data.timestamps.clone();
256 sorted_ts.sort();
257 let time_span = (sorted_ts
258 .last()
259 .unwrap()
260 .signed_duration_since(*sorted_ts.first().unwrap()))
261 .num_seconds() as f64
262 / 3600.0;
263 if time_span > 0.0 {
264 let ops_per_hour = ops as f64 / time_span;
265 println!(
266 " Access Rate: {:.1} operations/hour ({:.2}/minute)",
267 ops_per_hour,
268 ops_per_hour / 60.0
269 );
270 }
271 }
272
273 println!(" Operations by type:");
275 let mut ops_by_type: Vec<_> = data.operations_by_type.iter().collect();
276 ops_by_type.sort_by(|a, b| b.1.cmp(a.1));
277 for (op, count) in ops_by_type.iter().take(5) {
278 let op_pct = (**count as f64 / ops as f64) * 100.0;
279 println!(
280 " - {}: {} ({:.1}%)",
281 op,
282 format_number(**count),
283 op_pct
284 );
285 }
286
287 let mut top_entities: Vec<_> = data.entity_operations.iter().collect();
289 top_entities.sort_by(|a, b| b.1.cmp(a.1));
290 if !top_entities.is_empty() {
291 println!(" Top {} entities:", top_entities.len().min(5));
292 for (entity_id, entity_ops) in top_entities.iter().take(5) {
293 let entity_pct = (**entity_ops as f64 / ops as f64) * 100.0;
294 let entity_display = if entity_id.len() <= 40 {
295 entity_id.to_string()
296 } else {
297 format!("{}...", &entity_id[..37])
298 };
299 println!(
300 " - {}: {} ops ({:.1}%)",
301 entity_display,
302 format_number(**entity_ops),
303 entity_pct
304 );
305 }
306 }
307
308 print!(" Category: ");
310 let mut recommendations = Vec::new();
311
312 if path.contains("token/lookup") {
313 println!("TOKEN LOOKUP");
314 recommendations
315 .push("Implement client-side token TTL tracking to eliminate polling".to_string());
316 recommendations.push(format!(
317 "Potential reduction: 80-90% ({} operations)",
318 format_number((ops as f64 * 0.85) as usize)
319 ));
320 } else if path.to_lowercase().contains("airflow") {
321 println!("AIRFLOW SECRET");
322 recommendations
323 .push("Deploy Vault agent with template rendering for Airflow".to_string());
324 recommendations.push("Configure connection caching in Airflow".to_string());
325 recommendations.push(format!(
326 "Potential reduction: 95% ({} operations)",
327 format_number((ops as f64 * 0.95) as usize)
328 ));
329 } else if path.contains("approle/login") {
330 println!("APPROLE AUTHENTICATION");
331 if entity_count == 1 {
332 recommendations.push(format!(
333 "⚠️ CRITICAL: Single entity making all {} login requests",
334 format_number(ops)
335 ));
336 recommendations
337 .push("Review token TTL configuration - may be too short".to_string());
338 recommendations.push("Consider SecretID caching if appropriate".to_string());
339 }
340 } else if path.to_lowercase().contains("openshift")
341 || path.to_lowercase().contains("kubernetes")
342 {
343 println!("KUBERNETES/OPENSHIFT AUTH");
344 recommendations.push("Review pod authentication token TTLs".to_string());
345 recommendations.push("Consider increasing default token lifetime".to_string());
346 recommendations.push("Implement token renewal strategy in applications".to_string());
347 } else if path.to_lowercase().contains("github") && path.contains("login") {
348 println!("GITHUB AUTHENTICATION");
349 recommendations.push("Review GitHub auth token TTLs".to_string());
350 if entity_count == 1 {
351 recommendations.push(format!(
352 "⚠️ Single entity ({}) - investigate why",
353 entity_count
354 ));
355 }
356 } else if path.contains("data/") || path.contains("metadata/") {
357 println!("KV SECRET ENGINE");
358 if entity_count <= 3 && ops > 10000 {
359 recommendations.push(format!(
360 "⚠️ HIGH-FREQUENCY ACCESS: {} operations from only {} entities",
361 format_number(ops),
362 entity_count
363 ));
364 recommendations.push("Implement caching layer or Vault agent".to_string());
365 recommendations.push("Review if secret needs this frequency of access".to_string());
366 } else {
367 recommendations
368 .push("Consider Vault agent for high-frequency consumers".to_string());
369 }
370 } else {
371 println!("OTHER");
372 if ops > 5000 {
373 recommendations.push(format!(
374 "High-volume path ({} operations) - review necessity",
375 format_number(ops)
376 ));
377 }
378 }
379
380 if let Some((_, top_entity_ops)) = top_entities.first() {
382 let top_entity_pct = (**top_entity_ops as f64 / ops as f64) * 100.0;
383 if top_entity_pct > 50.0 && !recommendations.iter().any(|r| r.contains("CRITICAL")) {
384 recommendations.push(format!(
385 "⚠️ Entity concentration: Single entity responsible for {:.1}% of access",
386 top_entity_pct
387 ));
388 }
389 }
390
391 if !recommendations.is_empty() {
392 println!(" Recommendations:");
393 for rec in recommendations {
394 println!(" • {}", rec);
395 }
396 }
397 }
398
399 println!("\n\nSUMMARY BY PATH CATEGORY");
401 println!("{}", "=".repeat(120));
402
403 let mut categories: HashMap<&str, usize> = HashMap::new();
404 categories.insert("Token Operations", 0);
405 categories.insert("KV Secret Access", 0);
406 categories.insert("Authentication", 0);
407 categories.insert("Airflow Secrets", 0);
408 categories.insert("System/Admin", 0);
409 categories.insert("Other", 0);
410
411 for (path, stats) in path_stats.iter() {
412 let ops = stats.operations;
413 if path.contains("token/") {
414 *categories.get_mut("Token Operations").unwrap() += ops;
415 } else if path.contains("/data/") || path.contains("/metadata/") {
416 if path.to_lowercase().contains("airflow") {
417 *categories.get_mut("Airflow Secrets").unwrap() += ops;
418 } else {
419 *categories.get_mut("KV Secret Access").unwrap() += ops;
420 }
421 } else if path.contains("/login") || path.contains("/auth/") {
422 *categories.get_mut("Authentication").unwrap() += ops;
423 } else if path.contains("sys/") {
424 *categories.get_mut("System/Admin").unwrap() += ops;
425 } else {
426 *categories.get_mut("Other").unwrap() += ops;
427 }
428 }
429
430 println!(
431 "{:<30} {:<15} {:<15}",
432 "Category", "Operations", "% of Total"
433 );
434 println!("{}", "-".repeat(120));
435
436 let mut sorted_categories: Vec<_> = categories.iter().collect();
437 sorted_categories.sort_by(|a, b| b.1.cmp(a.1));
438
439 for (category, ops) in sorted_categories {
440 let percentage = (*ops as f64 / total_operations as f64) * 100.0;
441 println!(
442 "{:<30} {:<15} {:<15.2}%",
443 category,
444 format_number(*ops),
445 percentage
446 );
447 }
448
449 println!("\n{}", "=".repeat(120));
450
451 println!("\nTOP OPTIMIZATION OPPORTUNITIES (by impact)");
453 println!("{}", "=".repeat(120));
454
455 struct Opportunity {
456 name: String,
457 current_ops: usize,
458 potential_reduction: usize,
459 effort: String,
460 priority: u8,
461 }
462
463 let mut opportunities = Vec::new();
464
465 let token_lookup_ops: usize = path_stats
467 .iter()
468 .filter(|(path, _)| path.contains("token/lookup"))
469 .map(|(_, stats)| stats.operations)
470 .sum();
471
472 if token_lookup_ops > 10000 {
473 opportunities.push(Opportunity {
474 name: "Eliminate Token Lookup Polling".to_string(),
475 current_ops: token_lookup_ops,
476 potential_reduction: (token_lookup_ops as f64 * 0.85) as usize,
477 effort: "Medium".to_string(),
478 priority: 1,
479 });
480 }
481
482 let airflow_ops: usize = path_stats
484 .iter()
485 .filter(|(path, _)| path.to_lowercase().contains("airflow"))
486 .map(|(_, stats)| stats.operations)
487 .sum();
488
489 if airflow_ops > 10000 {
490 opportunities.push(Opportunity {
491 name: "Deploy Vault Agent for Airflow".to_string(),
492 current_ops: airflow_ops,
493 potential_reduction: (airflow_ops as f64 * 0.95) as usize,
494 effort: "Medium".to_string(),
495 priority: 2,
496 });
497 }
498
499 let high_freq_ops: usize = path_stats
501 .iter()
502 .filter(|(_, stats)| stats.operations > 5000 && stats.operations < 100000)
503 .map(|(_, stats)| stats.operations)
504 .sum();
505
506 let high_freq_count = path_stats
507 .iter()
508 .filter(|(_, stats)| stats.operations > 5000 && stats.operations < 100000)
509 .count();
510
511 if high_freq_ops > 10000 {
512 opportunities.push(Opportunity {
513 name: format!("Cache High-Frequency Paths ({} paths)", high_freq_count),
514 current_ops: high_freq_ops,
515 potential_reduction: (high_freq_ops as f64 * 0.70) as usize,
516 effort: "Low-Medium".to_string(),
517 priority: 3,
518 });
519 }
520
521 opportunities.sort_by_key(|o| o.priority);
522
523 println!(
524 "\n{:<10} {:<50} {:<15} {:<15} {:<15}",
525 "Priority", "Opportunity", "Current Ops", "Savings", "Effort"
526 );
527 println!("{}", "-".repeat(120));
528
529 let mut total_current_ops = 0;
530 let mut total_savings = 0;
531
532 for opp in &opportunities {
533 println!(
534 "{:<10} {:<50} {:<15} {:<15} {:<15}",
535 opp.priority,
536 opp.name,
537 format_number(opp.current_ops),
538 format_number(opp.potential_reduction),
539 opp.effort
540 );
541 total_current_ops += opp.current_ops;
542 total_savings += opp.potential_reduction;
543 }
544
545 println!("{}", "-".repeat(120));
546 println!(
547 "{:<10} {:<50} {:<15} {:<15}",
548 "TOTAL POTENTIAL SAVINGS",
549 "",
550 format_number(total_current_ops),
551 format_number(total_savings)
552 );
553
554 let projected_reduction = (total_savings as f64 / total_operations as f64) * 100.0;
555 println!(
556 "\nProjected reduction: {:.1}% of all Vault operations",
557 projected_reduction
558 );
559 println!("{}", "=".repeat(120));
560
561 Ok(())
562}