vault_audit_tools/commands/
path_hotspots.rs

1//! Path hotspot analysis command.
2//!
3//! Identifies the most frequently accessed paths in Vault to help
4//! understand usage patterns and potential performance bottlenecks.
5//! Supports multi-file analysis (compressed or uncompressed) for long-term trending.
6//!
7//! # Usage
8//!
9//! ```bash
10//! # Single file - show top 20 hotspots (default)
11//! vault-audit path-hotspots audit.log
12//! vault-audit path-hotspots audit.log.gz
13//!
14//! # Multi-day analysis with top 50 (compressed files)
15//! vault-audit path-hotspots logs/*.log.gz --top 50
16//!
17//! # Filter by mount point across multiple files
18//! vault-audit path-hotspots day*.log --mount secret
19//! ```
20//!
21//! **Compressed File Support**: Works seamlessly with `.gz` and `.zst` files.
22//!
23//! # Output
24//!
25//! Displays top accessed paths with:
26//! - Path name
27//! - Total operations
28//! - Unique entities accessing
29//! - Operation breakdown (read/write/list/delete)
30//! - Access rate (ops per hour)
31//! - Top entity contributors
32//!
33//! Helps identify:
34//! - Performance bottlenecks
35//! - Heavily used secrets
36//! - Caching opportunities
37//! - Load distribution
38
39use crate::audit::types::AuditEntry;
40use crate::utils::progress::ProgressBar;
41use crate::utils::reader::open_file;
42use crate::utils::time::parse_timestamp;
43use anyhow::Result;
44use chrono::DateTime;
45use chrono::Utc;
46use std::collections::{HashMap, HashSet};
47use std::io::{BufRead, BufReader};
48
49/// Statistics for a single path
50#[derive(Debug)]
51struct PathStats {
52    operations: usize,
53    entities: HashSet<String>,
54    operations_by_type: HashMap<String, usize>,
55    timestamps: Vec<DateTime<Utc>>,
56    entity_operations: HashMap<String, usize>,
57}
58
59impl PathStats {
60    fn new() -> Self {
61        Self {
62            operations: 0,
63            entities: HashSet::new(),
64            operations_by_type: HashMap::new(),
65            timestamps: Vec::new(),
66            entity_operations: HashMap::new(),
67        }
68    }
69}
70
/// Renders `n` in decimal with a comma between each group of three digits
/// (for example `1234567` becomes `"1,234,567"`).
fn format_number(n: usize) -> String {
    let digits = n.to_string();
    let len = digits.len();
    let mut grouped = String::with_capacity(len + len / 3);

    for (idx, ch) in digits.chars().enumerate() {
        // A separator goes in front of every digit that starts a full
        // trailing group of three, except at the very start of the number.
        let remaining = len - idx;
        if idx != 0 && remaining % 3 == 0 {
            grouped.push(',');
        }
        grouped.push(ch);
    }

    grouped
}
82
83pub fn run(log_files: &[String], top: usize) -> Result<()> {
84    let mut path_stats: HashMap<String, PathStats> = HashMap::new();
85    let mut total_lines = 0;
86    let mut total_operations = 0;
87
88    // Process each log file sequentially
89    for (file_idx, log_file) in log_files.iter().enumerate() {
90        eprintln!(
91            "[{}/{}] Processing: {}",
92            file_idx + 1,
93            log_files.len(),
94            log_file
95        );
96
97        // Get file size for progress tracking
98        let file_size = std::fs::metadata(log_file).ok().map(|m| m.len() as usize);
99        let mut progress = if let Some(size) = file_size {
100            ProgressBar::new(size, "Processing")
101        } else {
102            ProgressBar::new_spinner("Processing")
103        };
104
105        let mut file_lines = 0;
106        let mut bytes_read = 0;
107
108        let file = open_file(log_file)?;
109        let reader = BufReader::new(file);
110
111        for line in reader.lines() {
112            file_lines += 1;
113            total_lines += 1;
114            let line = line?;
115            bytes_read += line.len() + 1; // +1 for newline
116
117            if file_lines % 10_000 == 0 {
118                if let Some(size) = file_size {
119                    progress.update(bytes_read.min(size));
120                } else {
121                    progress.update(file_lines);
122                }
123            }
124
125            let entry: AuditEntry = match serde_json::from_str(&line) {
126                Ok(e) => e,
127                Err(_) => continue,
128            };
129
130            let path = match &entry.request {
131                Some(r) => match &r.path {
132                    Some(p) => p.as_str(),
133                    None => continue,
134                },
135                None => continue,
136            };
137
138            let operation = match &entry.request {
139                Some(r) => match &r.operation {
140                    Some(o) => o.as_str(),
141                    None => continue,
142                },
143                None => continue,
144            };
145
146            total_operations += 1;
147
148            let entity_id = entry
149                .auth
150                .as_ref()
151                .and_then(|a| a.entity_id.as_deref())
152                .unwrap_or("no-entity");
153
154            // Parse timestamp
155            let ts = parse_timestamp(&entry.time).ok();
156
157            // Track path statistics
158            let stats = path_stats
159                .entry(path.to_string())
160                .or_insert_with(PathStats::new);
161            stats.operations += 1;
162            stats.entities.insert(entity_id.to_string());
163            *stats
164                .operations_by_type
165                .entry(operation.to_string())
166                .or_insert(0) += 1;
167            *stats
168                .entity_operations
169                .entry(entity_id.to_string())
170                .or_insert(0) += 1;
171            if let Some(t) = ts {
172                stats.timestamps.push(t);
173            }
174        }
175
176        // Ensure 100% progress for this file
177        if let Some(size) = file_size {
178            progress.update(size);
179        }
180
181        progress.finish_with_message(&format!(
182            "Processed {} lines from this file",
183            format_number(file_lines)
184        ));
185    }
186
187    eprintln!(
188        "\nTotal: Processed {} lines, {} operations",
189        format_number(total_lines),
190        format_number(total_operations)
191    );
192
193    // Sort paths by operation count
194    let mut sorted_paths: Vec<_> = path_stats.iter().collect();
195    sorted_paths.sort_by(|a, b| b.1.operations.cmp(&a.1.operations));
196
197    // 1. Summary table
198    println!("\n{}", "=".repeat(120));
199    println!("TOP {} PATH HOT SPOTS ANALYSIS", top);
200    println!("{}", "=".repeat(120));
201
202    println!(
203        "\n{:<5} {:<60} {:<12} {:<10} {:<10} {:<10}",
204        "#", "Path", "Ops", "Entities", "Top Op", "%"
205    );
206    println!("{}", "-".repeat(120));
207
208    for (i, (path, data)) in sorted_paths.iter().take(top).enumerate() {
209        let ops = data.operations;
210        let entity_count = data.entities.len();
211        let percentage = (ops as f64 / total_operations as f64) * 100.0;
212
213        let top_op = data
214            .operations_by_type
215            .iter()
216            .max_by_key(|x| x.1)
217            .map(|x| x.0.as_str())
218            .unwrap_or("N/A");
219
220        let display_path = if path.len() <= 58 {
221            path.to_string()
222        } else {
223            format!("{}...", &path[..55])
224        };
225
226        println!(
227            "{:<5} {:<60} {:<12} {:<10} {:<10} {:<10.2}%",
228            i + 1,
229            display_path,
230            format_number(ops),
231            format_number(entity_count),
232            top_op,
233            percentage
234        );
235    }
236
237    // 2. Detailed analysis for top 20 paths
238    println!("\n\nDETAILED ANALYSIS OF TOP {} PATHS", top.min(20));
239    println!("{}", "=".repeat(120));
240
241    for (i, (path, data)) in sorted_paths.iter().take(top.min(20)).enumerate() {
242        println!("\n{}. PATH: {}", i + 1, path);
243        println!("{}", "-".repeat(120));
244
245        let ops = data.operations;
246        let entity_count = data.entities.len();
247        let percentage = (ops as f64 / total_operations as f64) * 100.0;
248
249        println!(
250            "   Total Operations: {} ({:.2}% of all traffic)",
251            format_number(ops),
252            percentage
253        );
254        println!("   Unique Entities: {}", format_number(entity_count));
255
256        // Calculate time span and rate
257        if data.timestamps.len() >= 2 {
258            let mut sorted_ts = data.timestamps.clone();
259            sorted_ts.sort();
260            let time_span = (sorted_ts
261                .last()
262                .unwrap()
263                .signed_duration_since(*sorted_ts.first().unwrap()))
264            .num_seconds() as f64
265                / 3600.0;
266            if time_span > 0.0 {
267                let ops_per_hour = ops as f64 / time_span;
268                println!(
269                    "   Access Rate: {:.1} operations/hour ({:.2}/minute)",
270                    ops_per_hour,
271                    ops_per_hour / 60.0
272                );
273            }
274        }
275
276        // Operation breakdown
277        println!("   Operations by type:");
278        let mut ops_by_type: Vec<_> = data.operations_by_type.iter().collect();
279        ops_by_type.sort_by(|a, b| b.1.cmp(a.1));
280        for (op, count) in ops_by_type.iter().take(5) {
281            let op_pct = (**count as f64 / ops as f64) * 100.0;
282            println!(
283                "      - {}: {} ({:.1}%)",
284                op,
285                format_number(**count),
286                op_pct
287            );
288        }
289
290        // Top entities
291        let mut top_entities: Vec<_> = data.entity_operations.iter().collect();
292        top_entities.sort_by(|a, b| b.1.cmp(a.1));
293        if !top_entities.is_empty() {
294            println!("   Top {} entities:", top_entities.len().min(5));
295            for (entity_id, entity_ops) in top_entities.iter().take(5) {
296                let entity_pct = (**entity_ops as f64 / ops as f64) * 100.0;
297                let entity_display = if entity_id.len() <= 40 {
298                    entity_id.to_string()
299                } else {
300                    format!("{}...", &entity_id[..37])
301                };
302                println!(
303                    "      - {}: {} ops ({:.1}%)",
304                    entity_display,
305                    format_number(**entity_ops),
306                    entity_pct
307                );
308            }
309        }
310
311        // Categorize and provide recommendations
312        print!("   Category: ");
313        let mut recommendations = Vec::new();
314
315        if path.contains("token/lookup") {
316            println!("TOKEN LOOKUP");
317            recommendations
318                .push("Implement client-side token TTL tracking to eliminate polling".to_string());
319            recommendations.push(format!(
320                "Potential reduction: 80-90% ({} operations)",
321                format_number((ops as f64 * 0.85) as usize)
322            ));
323        } else if path.to_lowercase().contains("airflow") {
324            println!("AIRFLOW SECRET");
325            recommendations
326                .push("Deploy Vault agent with template rendering for Airflow".to_string());
327            recommendations.push("Configure connection caching in Airflow".to_string());
328            recommendations.push(format!(
329                "Potential reduction: 95% ({} operations)",
330                format_number((ops as f64 * 0.95) as usize)
331            ));
332        } else if path.contains("approle/login") {
333            println!("APPROLE AUTHENTICATION");
334            if entity_count == 1 {
335                recommendations.push(format!(
336                    "⚠️  CRITICAL: Single entity making all {} login requests",
337                    format_number(ops)
338                ));
339                recommendations
340                    .push("Review token TTL configuration - may be too short".to_string());
341                recommendations.push("Consider SecretID caching if appropriate".to_string());
342            }
343        } else if path.to_lowercase().contains("openshift")
344            || path.to_lowercase().contains("kubernetes")
345        {
346            println!("KUBERNETES/OPENSHIFT AUTH");
347            recommendations.push("Review pod authentication token TTLs".to_string());
348            recommendations.push("Consider increasing default token lifetime".to_string());
349            recommendations.push("Implement token renewal strategy in applications".to_string());
350        } else if path.to_lowercase().contains("github") && path.contains("login") {
351            println!("GITHUB AUTHENTICATION");
352            recommendations.push("Review GitHub auth token TTLs".to_string());
353            if entity_count == 1 {
354                recommendations.push(format!(
355                    "⚠️  Single entity ({}) - investigate why",
356                    entity_count
357                ));
358            }
359        } else if path.contains("data/") || path.contains("metadata/") {
360            println!("KV SECRET ENGINE");
361            if entity_count <= 3 && ops > 10000 {
362                recommendations.push(format!(
363                    "⚠️  HIGH-FREQUENCY ACCESS: {} operations from only {} entities",
364                    format_number(ops),
365                    entity_count
366                ));
367                recommendations.push("Implement caching layer or Vault agent".to_string());
368                recommendations.push("Review if secret needs this frequency of access".to_string());
369            } else {
370                recommendations
371                    .push("Consider Vault agent for high-frequency consumers".to_string());
372            }
373        } else {
374            println!("OTHER");
375            if ops > 5000 {
376                recommendations.push(format!(
377                    "High-volume path ({} operations) - review necessity",
378                    format_number(ops)
379                ));
380            }
381        }
382
383        // Entity concentration check
384        if let Some((_, top_entity_ops)) = top_entities.first() {
385            let top_entity_pct = (**top_entity_ops as f64 / ops as f64) * 100.0;
386            if top_entity_pct > 50.0 && !recommendations.iter().any(|r| r.contains("CRITICAL")) {
387                recommendations.push(format!(
388                    "⚠️  Entity concentration: Single entity responsible for {:.1}% of access",
389                    top_entity_pct
390                ));
391            }
392        }
393
394        if !recommendations.is_empty() {
395            println!("   Recommendations:");
396            for rec in recommendations {
397                println!("      • {}", rec);
398            }
399        }
400    }
401
402    // 3. Summary by category
403    println!("\n\nSUMMARY BY PATH CATEGORY");
404    println!("{}", "=".repeat(120));
405
406    let mut categories: HashMap<&str, usize> = HashMap::new();
407    categories.insert("Token Operations", 0);
408    categories.insert("KV Secret Access", 0);
409    categories.insert("Authentication", 0);
410    categories.insert("Airflow Secrets", 0);
411    categories.insert("System/Admin", 0);
412    categories.insert("Other", 0);
413
414    for (path, stats) in path_stats.iter() {
415        let ops = stats.operations;
416        if path.contains("token/") {
417            *categories.get_mut("Token Operations").unwrap() += ops;
418        } else if path.contains("/data/") || path.contains("/metadata/") {
419            if path.to_lowercase().contains("airflow") {
420                *categories.get_mut("Airflow Secrets").unwrap() += ops;
421            } else {
422                *categories.get_mut("KV Secret Access").unwrap() += ops;
423            }
424        } else if path.contains("/login") || path.contains("/auth/") {
425            *categories.get_mut("Authentication").unwrap() += ops;
426        } else if path.contains("sys/") {
427            *categories.get_mut("System/Admin").unwrap() += ops;
428        } else {
429            *categories.get_mut("Other").unwrap() += ops;
430        }
431    }
432
433    println!(
434        "{:<30} {:<15} {:<15}",
435        "Category", "Operations", "% of Total"
436    );
437    println!("{}", "-".repeat(120));
438
439    let mut sorted_categories: Vec<_> = categories.iter().collect();
440    sorted_categories.sort_by(|a, b| b.1.cmp(a.1));
441
442    for (category, ops) in sorted_categories {
443        let percentage = (*ops as f64 / total_operations as f64) * 100.0;
444        println!(
445            "{:<30} {:<15} {:<15.2}%",
446            category,
447            format_number(*ops),
448            percentage
449        );
450    }
451
452    println!("\n{}", "=".repeat(120));
453
454    // 4. Overall recommendations
455    println!("\nTOP OPTIMIZATION OPPORTUNITIES (by impact)");
456    println!("{}", "=".repeat(120));
457
458    struct Opportunity {
459        name: String,
460        current_ops: usize,
461        potential_reduction: usize,
462        effort: String,
463        priority: u8,
464    }
465
466    let mut opportunities = Vec::new();
467
468    // Calculate token lookup impact
469    let token_lookup_ops: usize = path_stats
470        .iter()
471        .filter(|(path, _)| path.contains("token/lookup"))
472        .map(|(_, stats)| stats.operations)
473        .sum();
474
475    if token_lookup_ops > 10000 {
476        opportunities.push(Opportunity {
477            name: "Eliminate Token Lookup Polling".to_string(),
478            current_ops: token_lookup_ops,
479            potential_reduction: (token_lookup_ops as f64 * 0.85) as usize,
480            effort: "Medium".to_string(),
481            priority: 1,
482        });
483    }
484
485    // Calculate Airflow impact
486    let airflow_ops: usize = path_stats
487        .iter()
488        .filter(|(path, _)| path.to_lowercase().contains("airflow"))
489        .map(|(_, stats)| stats.operations)
490        .sum();
491
492    if airflow_ops > 10000 {
493        opportunities.push(Opportunity {
494            name: "Deploy Vault Agent for Airflow".to_string(),
495            current_ops: airflow_ops,
496            potential_reduction: (airflow_ops as f64 * 0.95) as usize,
497            effort: "Medium".to_string(),
498            priority: 2,
499        });
500    }
501
502    // Calculate high-frequency path caching opportunities
503    let high_freq_ops: usize = path_stats
504        .iter()
505        .filter(|(_, stats)| stats.operations > 5000 && stats.operations < 100000)
506        .map(|(_, stats)| stats.operations)
507        .sum();
508
509    let high_freq_count = path_stats
510        .iter()
511        .filter(|(_, stats)| stats.operations > 5000 && stats.operations < 100000)
512        .count();
513
514    if high_freq_ops > 10000 {
515        opportunities.push(Opportunity {
516            name: format!("Cache High-Frequency Paths ({} paths)", high_freq_count),
517            current_ops: high_freq_ops,
518            potential_reduction: (high_freq_ops as f64 * 0.70) as usize,
519            effort: "Low-Medium".to_string(),
520            priority: 3,
521        });
522    }
523
524    opportunities.sort_by_key(|o| o.priority);
525
526    println!(
527        "\n{:<10} {:<50} {:<15} {:<15} {:<15}",
528        "Priority", "Opportunity", "Current Ops", "Savings", "Effort"
529    );
530    println!("{}", "-".repeat(120));
531
532    let mut total_current_ops = 0;
533    let mut total_savings = 0;
534
535    for opp in &opportunities {
536        println!(
537            "{:<10} {:<50} {:<15} {:<15} {:<15}",
538            opp.priority,
539            opp.name,
540            format_number(opp.current_ops),
541            format_number(opp.potential_reduction),
542            opp.effort
543        );
544        total_current_ops += opp.current_ops;
545        total_savings += opp.potential_reduction;
546    }
547
548    println!("{}", "-".repeat(120));
549    println!(
550        "{:<10} {:<50} {:<15} {:<15}",
551        "TOTAL POTENTIAL SAVINGS",
552        "",
553        format_number(total_current_ops),
554        format_number(total_savings)
555    );
556
557    let projected_reduction = (total_savings as f64 / total_operations as f64) * 100.0;
558    println!(
559        "\nProjected reduction: {:.1}% of all Vault operations",
560        projected_reduction
561    );
562    println!("{}", "=".repeat(120));
563
564    Ok(())
565}