vault_audit_tools/commands/
path_hotspots.rs

1//! Path hotspot analysis command.
2//!
3//! Identifies the most frequently accessed paths in Vault to help
4//! understand usage patterns and potential performance bottlenecks.
5//! Supports multi-file analysis (compressed or uncompressed) for long-term trending.
6//!
7//! # Usage
8//!
9//! ```bash
10//! # Single file - show top 20 hotspots (default)
11//! vault-audit path-hotspots audit.log
12//! vault-audit path-hotspots audit.log.gz
13//!
14//! # Multi-day analysis with top 50 (compressed files)
15//! vault-audit path-hotspots logs/*.log.gz --top 50
16//!
17//! # Filter by mount point across multiple files
18//! vault-audit path-hotspots day*.log --mount secret
19//! ```
20//!
21//! **Compressed File Support**: Works seamlessly with `.gz` and `.zst` files.
22//!
23//! # Output
24//!
25//! Displays top accessed paths with:
26//! - Path name
27//! - Total operations
28//! - Unique entities accessing
29//! - Operation breakdown (read/write/list/delete)
30//! - Access rate (ops per hour)
31//! - Top entity contributors
32//!
33//! Helps identify:
34//! - Performance bottlenecks
35//! - Heavily used secrets
36//! - Caching opportunities
37//! - Load distribution
38
39use crate::audit::types::AuditEntry;
40use crate::utils::format::format_number;
41use crate::utils::processor::{ProcessingMode, ProcessorBuilder};
42use crate::utils::time::parse_timestamp;
43use anyhow::Result;
44use chrono::DateTime;
45use chrono::Utc;
46use std::collections::{HashMap, HashSet};
47
48/// Statistics for a single path
49#[derive(Debug, Clone)]
50struct PathStats {
51    operations: usize,
52    entities: HashSet<String>,
53    operations_by_type: HashMap<String, usize>,
54    timestamps: Vec<DateTime<Utc>>,
55    entity_operations: HashMap<String, usize>,
56}
57
58impl PathStats {
59    fn new() -> Self {
60        Self {
61            operations: 0,
62            entities: HashSet::with_capacity(50), // Typical: dozens of entities per popular path
63            operations_by_type: HashMap::with_capacity(10), // Few operation types: read, write, list, delete
64            timestamps: Vec::with_capacity(1000),           // Pre-allocate for timestamps
65            entity_operations: HashMap::with_capacity(50),  // Entities accessing this path
66        }
67    }
68
69    fn merge(&mut self, other: Self) {
70        self.operations += other.operations;
71        self.entities.extend(other.entities);
72        for (op, count) in other.operations_by_type {
73            *self.operations_by_type.entry(op).or_insert(0) += count;
74        }
75        self.timestamps.extend(other.timestamps);
76        for (entity, count) in other.entity_operations {
77            *self.entity_operations.entry(entity).or_insert(0) += count;
78        }
79    }
80}
81
82#[derive(Debug, Clone)]
83struct HotspotsState {
84    path_stats: HashMap<String, PathStats>,
85    total_operations: usize,
86}
87
88impl HotspotsState {
89    fn new() -> Self {
90        Self {
91            path_stats: HashMap::with_capacity(5000),
92            total_operations: 0,
93        }
94    }
95
96    fn merge(mut self, other: Self) -> Self {
97        self.total_operations += other.total_operations;
98        for (path, other_stats) in other.path_stats {
99            self.path_stats
100                .entry(path)
101                .and_modify(|stats| stats.merge(other_stats.clone()))
102                .or_insert(other_stats);
103        }
104        self
105    }
106}
107
108pub fn run(log_files: &[String], top: usize) -> Result<()> {
109    let processor = ProcessorBuilder::new()
110        .mode(ProcessingMode::Auto)
111        .progress_label("Processing".to_string())
112        .build();
113
114    let (result, stats) = processor.process_files_streaming(
115        log_files,
116        |entry: &AuditEntry, state: &mut HotspotsState| {
117            let path = match &entry.request {
118                Some(r) => match &r.path {
119                    Some(p) => p.as_str(),
120                    None => return,
121                },
122                None => return,
123            };
124
125            let operation = match &entry.request {
126                Some(r) => match &r.operation {
127                    Some(o) => o.as_str(),
128                    None => return,
129                },
130                None => return,
131            };
132
133            state.total_operations += 1;
134
135            let entity_id = entry
136                .auth
137                .as_ref()
138                .and_then(|a| a.entity_id.as_deref())
139                .unwrap_or("no-entity");
140
141            // Parse timestamp
142            let ts = parse_timestamp(&entry.time).ok();
143
144            // Track path statistics
145            let path_stats = state
146                .path_stats
147                .entry(path.to_string())
148                .or_insert_with(PathStats::new);
149            path_stats.operations += 1;
150            path_stats.entities.insert(entity_id.to_string());
151            *path_stats
152                .operations_by_type
153                .entry(operation.to_string())
154                .or_insert(0) += 1;
155            *path_stats
156                .entity_operations
157                .entry(entity_id.to_string())
158                .or_insert(0) += 1;
159            if let Some(t) = ts {
160                path_stats.timestamps.push(t);
161            }
162        },
163        HotspotsState::merge,
164        HotspotsState::new(),
165    )?;
166
167    let total_lines = stats.total_lines;
168    let total_operations = result.total_operations;
169    let path_stats = result.path_stats;
170
171    eprintln!(
172        "\nTotal: Processed {} lines, {} operations",
173        format_number(total_lines),
174        format_number(total_operations)
175    );
176
177    // Sort paths by operation count
178    let mut sorted_paths: Vec<_> = path_stats.iter().collect();
179    sorted_paths.sort_by(|a, b| b.1.operations.cmp(&a.1.operations));
180
181    // 1. Summary table
182    println!("\n{}", "=".repeat(120));
183    println!("TOP {} PATH HOT SPOTS ANALYSIS", top);
184    println!("{}", "=".repeat(120));
185
186    println!(
187        "\n{:<5} {:<60} {:<12} {:<10} {:<10} {:<10}",
188        "#", "Path", "Ops", "Entities", "Top Op", "%"
189    );
190    println!("{}", "-".repeat(120));
191
192    for (i, (path, data)) in sorted_paths.iter().take(top).enumerate() {
193        let ops = data.operations;
194        let entity_count = data.entities.len();
195        let percentage = (ops as f64 / total_operations as f64) * 100.0;
196
197        let top_op = data
198            .operations_by_type
199            .iter()
200            .max_by_key(|x| x.1)
201            .map_or("N/A", |x| x.0.as_str());
202
203        let display_path = if path.len() <= 58 {
204            (*path).clone()
205        } else {
206            format!("{}...", &path[..55])
207        };
208
209        println!(
210            "{:<5} {:<60} {:<12} {:<10} {:<10} {:<10.2}%",
211            i + 1,
212            display_path,
213            format_number(ops),
214            format_number(entity_count),
215            top_op,
216            percentage
217        );
218    }
219
220    // 2. Detailed analysis for top 20 paths
221    println!("\n\nDETAILED ANALYSIS OF TOP {} PATHS", top.min(20));
222    println!("{}", "=".repeat(120));
223
224    for (i, (path, data)) in sorted_paths.iter().take(top.min(20)).enumerate() {
225        println!("\n{}. PATH: {}", i + 1, path);
226        println!("{}", "-".repeat(120));
227
228        let ops = data.operations;
229        let entity_count = data.entities.len();
230        let percentage = (ops as f64 / total_operations as f64) * 100.0;
231
232        println!(
233            "   Total Operations: {} ({:.2}% of all traffic)",
234            format_number(ops),
235            percentage
236        );
237        println!("   Unique Entities: {}", format_number(entity_count));
238
239        // Calculate time span and rate
240        if data.timestamps.len() >= 2 {
241            let mut sorted_ts = data.timestamps.clone();
242            sorted_ts.sort();
243            let time_span = (sorted_ts
244                .last()
245                .unwrap()
246                .signed_duration_since(*sorted_ts.first().unwrap()))
247            .num_seconds() as f64
248                / 3600.0;
249            if time_span > 0.0 {
250                let ops_per_hour = ops as f64 / time_span;
251                println!(
252                    "   Access Rate: {:.1} operations/hour ({:.2}/minute)",
253                    ops_per_hour,
254                    ops_per_hour / 60.0
255                );
256            }
257        }
258
259        // Operation breakdown
260        println!("   Operations by type:");
261        let mut ops_by_type: Vec<_> = data.operations_by_type.iter().collect();
262        ops_by_type.sort_by(|a, b| b.1.cmp(a.1));
263        for (op, count) in ops_by_type.iter().take(5) {
264            let op_pct = (**count as f64 / ops as f64) * 100.0;
265            println!(
266                "      - {}: {} ({:.1}%)",
267                op,
268                format_number(**count),
269                op_pct
270            );
271        }
272
273        // Top entities
274        let mut top_entities: Vec<_> = data.entity_operations.iter().collect();
275        top_entities.sort_by(|a, b| b.1.cmp(a.1));
276        if !top_entities.is_empty() {
277            println!("   Top {} entities:", top_entities.len().min(5));
278            for (entity_id, entity_ops) in top_entities.iter().take(5) {
279                let entity_pct = (**entity_ops as f64 / ops as f64) * 100.0;
280                let entity_display = if entity_id.len() <= 40 {
281                    (*entity_id).clone()
282                } else {
283                    format!("{}...", &entity_id[..37])
284                };
285                println!(
286                    "      - {}: {} ops ({:.1}%)",
287                    entity_display,
288                    format_number(**entity_ops),
289                    entity_pct
290                );
291            }
292        }
293
294        // Categorize and provide recommendations
295        print!("   Category: ");
296        let mut recommendations = Vec::new();
297
298        if path.contains("token/lookup") {
299            println!("TOKEN LOOKUP");
300            recommendations
301                .push("Implement client-side token TTL tracking to eliminate polling".to_string());
302            recommendations.push(format!(
303                "Potential reduction: 80-90% ({} operations)",
304                format_number((ops as f64 * 0.85) as usize)
305            ));
306        } else if path.to_lowercase().contains("airflow") {
307            println!("AIRFLOW SECRET");
308            recommendations
309                .push("Deploy Vault agent with template rendering for Airflow".to_string());
310            recommendations.push("Configure connection caching in Airflow".to_string());
311            recommendations.push(format!(
312                "Potential reduction: 95% ({} operations)",
313                format_number((ops as f64 * 0.95) as usize)
314            ));
315        } else if path.contains("approle/login") {
316            println!("APPROLE AUTHENTICATION");
317            if entity_count == 1 {
318                recommendations.push(format!(
319                    "⚠️  CRITICAL: Single entity making all {} login requests",
320                    format_number(ops)
321                ));
322                recommendations
323                    .push("Review token TTL configuration - may be too short".to_string());
324                recommendations.push("Consider SecretID caching if appropriate".to_string());
325            }
326        } else if path.to_lowercase().contains("openshift")
327            || path.to_lowercase().contains("kubernetes")
328        {
329            println!("KUBERNETES/OPENSHIFT AUTH");
330            recommendations.push("Review pod authentication token TTLs".to_string());
331            recommendations.push("Consider increasing default token lifetime".to_string());
332            recommendations.push("Implement token renewal strategy in applications".to_string());
333        } else if path.to_lowercase().contains("github") && path.contains("login") {
334            println!("GITHUB AUTHENTICATION");
335            recommendations.push("Review GitHub auth token TTLs".to_string());
336            if entity_count == 1 {
337                recommendations.push(format!(
338                    "⚠️  Single entity ({}) - investigate why",
339                    entity_count
340                ));
341            }
342        } else if path.contains("data/") || path.contains("metadata/") {
343            println!("KV SECRET ENGINE");
344            if entity_count <= 3 && ops > 10000 {
345                recommendations.push(format!(
346                    "⚠️  HIGH-FREQUENCY ACCESS: {} operations from only {} entities",
347                    format_number(ops),
348                    entity_count
349                ));
350                recommendations.push("Implement caching layer or Vault agent".to_string());
351                recommendations.push("Review if secret needs this frequency of access".to_string());
352            } else {
353                recommendations
354                    .push("Consider Vault agent for high-frequency consumers".to_string());
355            }
356        } else {
357            println!("OTHER");
358            if ops > 5000 {
359                recommendations.push(format!(
360                    "High-volume path ({} operations) - review necessity",
361                    format_number(ops)
362                ));
363            }
364        }
365
366        // Entity concentration check
367        if let Some((_, top_entity_ops)) = top_entities.first() {
368            let top_entity_pct = (**top_entity_ops as f64 / ops as f64) * 100.0;
369            if top_entity_pct > 50.0 && !recommendations.iter().any(|r| r.contains("CRITICAL")) {
370                recommendations.push(format!(
371                    "⚠️  Entity concentration: Single entity responsible for {:.1}% of access",
372                    top_entity_pct
373                ));
374            }
375        }
376
377        if !recommendations.is_empty() {
378            println!("   Recommendations:");
379            for rec in recommendations {
380                println!("      • {}", rec);
381            }
382        }
383    }
384
385    // 3. Summary by category
386    println!("\n\nSUMMARY BY PATH CATEGORY");
387    println!("{}", "=".repeat(120));
388
389    let mut categories: HashMap<&str, usize> = HashMap::with_capacity(10); // Small fixed set of categories
390    categories.insert("Token Operations", 0);
391    categories.insert("KV Secret Access", 0);
392    categories.insert("Authentication", 0);
393    categories.insert("Airflow Secrets", 0);
394    categories.insert("System/Admin", 0);
395    categories.insert("Other", 0);
396
397    for (path, stats) in &path_stats {
398        let ops = stats.operations;
399        if path.contains("token/") {
400            *categories.get_mut("Token Operations").unwrap() += ops;
401        } else if path.contains("/data/") || path.contains("/metadata/") {
402            if path.to_lowercase().contains("airflow") {
403                *categories.get_mut("Airflow Secrets").unwrap() += ops;
404            } else {
405                *categories.get_mut("KV Secret Access").unwrap() += ops;
406            }
407        } else if path.contains("/login") || path.contains("/auth/") {
408            *categories.get_mut("Authentication").unwrap() += ops;
409        } else if path.contains("sys/") {
410            *categories.get_mut("System/Admin").unwrap() += ops;
411        } else {
412            *categories.get_mut("Other").unwrap() += ops;
413        }
414    }
415
416    println!(
417        "{:<30} {:<15} {:<15}",
418        "Category", "Operations", "% of Total"
419    );
420    println!("{}", "-".repeat(120));
421
422    let mut sorted_categories: Vec<_> = categories.iter().collect();
423    sorted_categories.sort_by(|a, b| b.1.cmp(a.1));
424
425    for (category, ops) in sorted_categories {
426        let percentage = (*ops as f64 / total_operations as f64) * 100.0;
427        println!(
428            "{:<30} {:<15} {:<15.2}%",
429            category,
430            format_number(*ops),
431            percentage
432        );
433    }
434
435    println!("\n{}", "=".repeat(120));
436
437    // 4. Overall recommendations
438    println!("\nTOP OPTIMIZATION OPPORTUNITIES (by impact)");
439    println!("{}", "=".repeat(120));
440
441    struct Opportunity {
442        name: String,
443        current_ops: usize,
444        potential_reduction: usize,
445        effort: String,
446        priority: u8,
447    }
448
449    let mut opportunities = Vec::new();
450
451    // Calculate token lookup impact
452    let token_lookup_ops: usize = path_stats
453        .iter()
454        .filter(|(path, _)| path.contains("token/lookup"))
455        .map(|(_, stats)| stats.operations)
456        .sum();
457
458    if token_lookup_ops > 10000 {
459        opportunities.push(Opportunity {
460            name: "Eliminate Token Lookup Polling".to_string(),
461            current_ops: token_lookup_ops,
462            potential_reduction: (token_lookup_ops as f64 * 0.85) as usize,
463            effort: "Medium".to_string(),
464            priority: 1,
465        });
466    }
467
468    // Calculate Airflow impact
469    let airflow_ops: usize = path_stats
470        .iter()
471        .filter(|(path, _)| path.to_lowercase().contains("airflow"))
472        .map(|(_, stats)| stats.operations)
473        .sum();
474
475    if airflow_ops > 10000 {
476        opportunities.push(Opportunity {
477            name: "Deploy Vault Agent for Airflow".to_string(),
478            current_ops: airflow_ops,
479            potential_reduction: (airflow_ops as f64 * 0.95) as usize,
480            effort: "Medium".to_string(),
481            priority: 2,
482        });
483    }
484
485    // Calculate high-frequency path caching opportunities
486    let high_freq_ops: usize = path_stats
487        .iter()
488        .filter(|(_, stats)| stats.operations > 5000 && stats.operations < 100_000)
489        .map(|(_, stats)| stats.operations)
490        .sum();
491
492    let high_freq_count = path_stats
493        .iter()
494        .filter(|(_, stats)| stats.operations > 5000 && stats.operations < 100_000)
495        .count();
496
497    if high_freq_ops > 10000 {
498        opportunities.push(Opportunity {
499            name: format!("Cache High-Frequency Paths ({} paths)", high_freq_count),
500            current_ops: high_freq_ops,
501            potential_reduction: (high_freq_ops as f64 * 0.70) as usize,
502            effort: "Low-Medium".to_string(),
503            priority: 3,
504        });
505    }
506
507    opportunities.sort_by_key(|o| o.priority);
508
509    println!(
510        "\n{:<10} {:<50} {:<15} {:<15} {:<15}",
511        "Priority", "Opportunity", "Current Ops", "Savings", "Effort"
512    );
513    println!("{}", "-".repeat(120));
514
515    let mut total_current_ops = 0;
516    let mut total_savings = 0;
517
518    for opp in &opportunities {
519        println!(
520            "{:<10} {:<50} {:<15} {:<15} {:<15}",
521            opp.priority,
522            opp.name,
523            format_number(opp.current_ops),
524            format_number(opp.potential_reduction),
525            opp.effort
526        );
527        total_current_ops += opp.current_ops;
528        total_savings += opp.potential_reduction;
529    }
530
531    println!("{}", "-".repeat(120));
532    println!(
533        "{:<10} {:<50} {:<15} {:<15}",
534        "TOTAL POTENTIAL SAVINGS",
535        "",
536        format_number(total_current_ops),
537        format_number(total_savings)
538    );
539
540    let projected_reduction = (total_savings as f64 / total_operations as f64) * 100.0;
541    println!(
542        "\nProjected reduction: {:.1}% of all Vault operations",
543        projected_reduction
544    );
545    println!("{}", "=".repeat(120));
546
547    Ok(())
548}