vault_audit_tools/commands/
system_overview.rs

1//! System-wide audit log overview.
2//!
3//! Provides high-level statistics and insights about Vault usage
4//! across the entire audit log. Supports analyzing multiple log files
5//! (compressed or uncompressed) for long-term trend analysis.
6//!
7//! # Usage
8//!
9//! ```bash
10//! # Single file (plain or compressed)
11//! vault-audit system-overview audit.log
12//! vault-audit system-overview audit.log.gz
13//!
14//! # Multiple files for week-long analysis
15//! vault-audit system-overview day1.log day2.log day3.log
16//!
17//! # Using shell globbing with compressed files
18//! vault-audit system-overview logs/vault_audit.2025-10-*.log.gz
19//! ```
20//!
21//! **Compressed File Support**: Automatically detects and decompresses `.gz` (gzip)
22//! and `.zst` (zstandard) files with streaming processing - no temp files needed.
23//!
24//! # Output
25//!
26//! Displays comprehensive statistics:
27//! - Total entries processed
28//! - Unique entities
29//! - Unique paths accessed
30//! - Operation breakdown (read, write, list, delete)
31//! - Top paths by access count
32//! - Mount point usage
33//! - Authentication method breakdown
34//! - Time range covered
35//! - Error rate
36//!
37//! Useful for:
38//! - Understanding overall Vault usage
39//! - Capacity planning
40//! - Identifying hotspots
41//! - Security audits
42
43use crate::audit::types::AuditEntry;
44use crate::utils::format::format_number;
45use crate::utils::parallel::process_files_parallel;
46use crate::utils::progress::ProgressBar;
47use crate::utils::reader::open_file;
48use anyhow::Result;
49use std::collections::{HashMap, HashSet};
50use std::io::{BufRead, BufReader};
51
52/// Path access statistics
53#[derive(Debug)]
54struct PathData {
55    count: usize,
56    operations: HashMap<String, usize>,
57    entities: HashSet<String>,
58}
59
60impl PathData {
61    fn new() -> Self {
62        Self {
63            count: 0,
64            operations: HashMap::with_capacity(10), // Typical: few operation types per path
65            entities: HashSet::with_capacity(50),   // Typical: dozens of entities per popular path
66        }
67    }
68}
69
70/// Results from processing a single file
71#[derive(Debug)]
72struct FileAnalysisResult {
73    path_operations: HashMap<String, PathData>,
74    operation_types: HashMap<String, usize>,
75    path_prefixes: HashMap<String, usize>,
76    entity_paths: HashMap<String, HashMap<String, usize>>,
77    entity_names: HashMap<String, String>,
78}
79
80/// Global progress tracking for parallel processing
81static PARALLEL_PROGRESS: std::sync::OnceLock<(
82    std::sync::Arc<std::sync::atomic::AtomicUsize>,
83    std::sync::Arc<std::sync::Mutex<crate::utils::progress::ProgressBar>>,
84)> = std::sync::OnceLock::new();
85
86/// Set up global progress tracking
87pub fn init_parallel_progress(
88    processed: std::sync::Arc<std::sync::atomic::AtomicUsize>,
89    progress: std::sync::Arc<std::sync::Mutex<crate::utils::progress::ProgressBar>>,
90) {
91    let _ = PARALLEL_PROGRESS.set((processed, progress));
92}
93
94/// Process entries from a single file using streaming to reduce memory usage
95fn process_file_entries_streaming(file_path: &str) -> Result<FileAnalysisResult> {
96    use crate::utils::reader::open_file;
97    use std::io::{BufRead, BufReader};
98
99    let mut path_operations: HashMap<String, PathData> = HashMap::with_capacity(5000);
100    let mut operation_types: HashMap<String, usize> = HashMap::with_capacity(20);
101    let mut path_prefixes: HashMap<String, usize> = HashMap::with_capacity(100);
102    let mut entity_paths: HashMap<String, HashMap<String, usize>> = HashMap::with_capacity(2000);
103    let mut entity_names: HashMap<String, String> = HashMap::with_capacity(2000);
104
105    let file = open_file(file_path)?;
106    let reader = BufReader::new(file);
107
108    let mut lines_processed = 0;
109
110    for line_result in reader.lines() {
111        let line = line_result?;
112        lines_processed += 1;
113
114        // Update progress every 2000 lines for responsive feedback
115        if lines_processed % 2000 == 0 {
116            if let Some((processed_lines, progress)) = PARALLEL_PROGRESS.get() {
117                processed_lines.fetch_add(2000, std::sync::atomic::Ordering::Relaxed);
118                if let Ok(mut p) = progress.lock() {
119                    p.update(processed_lines.load(std::sync::atomic::Ordering::Relaxed));
120                }
121            }
122        }
123
124        // Skip empty lines
125        if line.trim().is_empty() {
126            continue;
127        }
128
129        // Parse JSON entry
130        let entry: AuditEntry = match serde_json::from_str(&line) {
131            Ok(entry) => entry,
132            Err(_) => continue, // Skip invalid JSON lines
133        };
134
135        let Some(request) = &entry.request else {
136            continue;
137        };
138
139        let path = match &request.path {
140            Some(p) => p.as_str(),
141            None => continue,
142        };
143
144        let operation = match &request.operation {
145            Some(o) => o.as_str(),
146            None => continue,
147        };
148
149        let entity_id = entry
150            .auth
151            .as_ref()
152            .and_then(|a| a.entity_id.as_deref())
153            .unwrap_or("no-entity");
154
155        let display_name = entry
156            .auth
157            .as_ref()
158            .and_then(|a| a.display_name.as_deref())
159            .unwrap_or("N/A");
160
161        if path.is_empty() || operation.is_empty() {
162            continue;
163        }
164
165        // Track by full path
166        let path_data = path_operations
167            .entry(path.to_string())
168            .or_insert_with(PathData::new);
169        path_data.count += 1;
170        *path_data
171            .operations
172            .entry(operation.to_string())
173            .or_insert(0) += 1;
174        path_data.entities.insert(entity_id.to_string());
175
176        // Track by operation type
177        *operation_types.entry(operation.to_string()).or_insert(0) += 1;
178
179        // Track by path prefix
180        let parts: Vec<&str> = path.trim_matches('/').split('/').collect();
181        let prefix = if parts.len() >= 2 {
182            format!("{}/{}", parts[0], parts[1])
183        } else if !parts.is_empty() {
184            parts[0].to_string()
185        } else {
186            "root".to_string()
187        };
188        *path_prefixes.entry(prefix).or_insert(0) += 1;
189
190        // Track entity usage
191        let entity_map = entity_paths.entry(entity_id.to_string()).or_default();
192        *entity_map.entry(path.to_string()).or_insert(0) += 1;
193        entity_names
194            .entry(entity_id.to_string())
195            .or_insert_with(|| display_name.to_string());
196    }
197
198    // Update progress for remaining lines
199    let remaining = lines_processed % 2000;
200    if remaining > 0 {
201        if let Some((processed_lines, progress)) = PARALLEL_PROGRESS.get() {
202            processed_lines.fetch_add(remaining, std::sync::atomic::Ordering::Relaxed);
203            if let Ok(mut p) = progress.lock() {
204                p.update(processed_lines.load(std::sync::atomic::Ordering::Relaxed));
205            }
206        }
207    }
208
209    Ok(FileAnalysisResult {
210        path_operations,
211        operation_types,
212        path_prefixes,
213        entity_paths,
214        entity_names,
215    })
216}
217
218/// Process entries from a single file (original non-streaming version)
219fn process_file_entries(_file_path: &str, entries: Vec<AuditEntry>) -> FileAnalysisResult {
220    let mut path_operations: HashMap<String, PathData> = HashMap::with_capacity(5000);
221    let mut operation_types: HashMap<String, usize> = HashMap::with_capacity(20);
222    let mut path_prefixes: HashMap<String, usize> = HashMap::with_capacity(100);
223    let mut entity_paths: HashMap<String, HashMap<String, usize>> = HashMap::with_capacity(2000);
224    let mut entity_names: HashMap<String, String> = HashMap::with_capacity(2000);
225
226    for entry in entries {
227        let Some(request) = &entry.request else {
228            continue;
229        };
230
231        let path = match &request.path {
232            Some(p) => p.as_str(),
233            None => continue,
234        };
235
236        let operation = match &request.operation {
237            Some(o) => o.as_str(),
238            None => continue,
239        };
240
241        let entity_id = entry
242            .auth
243            .as_ref()
244            .and_then(|a| a.entity_id.as_deref())
245            .unwrap_or("no-entity");
246
247        let display_name = entry
248            .auth
249            .as_ref()
250            .and_then(|a| a.display_name.as_deref())
251            .unwrap_or("N/A");
252
253        if path.is_empty() || operation.is_empty() {
254            continue;
255        }
256
257        // Track by full path
258        let path_data = path_operations
259            .entry(path.to_string())
260            .or_insert_with(PathData::new);
261        path_data.count += 1;
262        *path_data
263            .operations
264            .entry(operation.to_string())
265            .or_insert(0) += 1;
266        path_data.entities.insert(entity_id.to_string());
267
268        // Track by operation type
269        *operation_types.entry(operation.to_string()).or_insert(0) += 1;
270
271        // Track by path prefix
272        let parts: Vec<&str> = path.trim_matches('/').split('/').collect();
273        let prefix = if parts.len() >= 2 {
274            format!("{}/{}", parts[0], parts[1])
275        } else if !parts.is_empty() {
276            parts[0].to_string()
277        } else {
278            "root".to_string()
279        };
280        *path_prefixes.entry(prefix).or_insert(0) += 1;
281
282        // Track entity usage
283        let entity_map = entity_paths.entry(entity_id.to_string()).or_default();
284        *entity_map.entry(path.to_string()).or_insert(0) += 1;
285        entity_names
286            .entry(entity_id.to_string())
287            .or_insert_with(|| display_name.to_string());
288    }
289
290    FileAnalysisResult {
291        path_operations,
292        operation_types,
293        path_prefixes,
294        entity_paths,
295        entity_names,
296    }
297}
298
299/// Combine results from multiple files
300fn combine_results(
301    results: Vec<crate::utils::parallel::FileProcessResult<FileAnalysisResult>>,
302) -> FileAnalysisResult {
303    let mut combined = FileAnalysisResult {
304        path_operations: HashMap::with_capacity(5000),
305        operation_types: HashMap::with_capacity(20),
306        path_prefixes: HashMap::with_capacity(100),
307        entity_paths: HashMap::with_capacity(2000),
308        entity_names: HashMap::with_capacity(2000),
309    };
310
311    for file_result in results {
312        let result = file_result.data;
313
314        // Merge path operations
315        for (path, path_data) in result.path_operations {
316            if let Some(existing) = combined.path_operations.get_mut(&path) {
317                existing.count += path_data.count;
318                for (op, count) in path_data.operations {
319                    *existing.operations.entry(op).or_insert(0) += count;
320                }
321                existing.entities.extend(path_data.entities);
322            } else {
323                combined.path_operations.insert(path, path_data);
324            }
325        }
326
327        // Merge operation types
328        for (op, count) in result.operation_types {
329            *combined.operation_types.entry(op).or_insert(0) += count;
330        }
331
332        // Merge path prefixes
333        for (prefix, count) in result.path_prefixes {
334            *combined.path_prefixes.entry(prefix).or_insert(0) += count;
335        }
336
337        // Merge entity paths
338        for (entity_id, paths) in result.entity_paths {
339            let entity_map = combined.entity_paths.entry(entity_id).or_default();
340            for (path, count) in paths {
341                *entity_map.entry(path).or_insert(0) += count;
342            }
343        }
344
345        // Merge entity names (first occurrence wins)
346        for (entity_id, name) in result.entity_names {
347            combined.entity_names.entry(entity_id).or_insert(name);
348        }
349    }
350
351    combined
352}
353
354/// Sequential processing fallback for compatibility and single files
355fn run_sequential(log_files: &[String]) -> Result<(FileAnalysisResult, usize)> {
356    let mut combined_result = FileAnalysisResult {
357        path_operations: HashMap::with_capacity(5000),
358        operation_types: HashMap::with_capacity(20),
359        path_prefixes: HashMap::with_capacity(100),
360        entity_paths: HashMap::with_capacity(2000),
361        entity_names: HashMap::with_capacity(2000),
362    };
363    let mut total_lines = 0;
364
365    // Process each file sequentially
366    for (file_idx, log_file) in log_files.iter().enumerate() {
367        eprintln!(
368            "[{}/{}] Processing: {}",
369            file_idx + 1,
370            log_files.len(),
371            log_file
372        );
373
374        let file_size = std::fs::metadata(log_file).ok().map(|m| m.len() as usize);
375        let mut progress = if let Some(size) = file_size {
376            ProgressBar::new(size, "Processing")
377        } else {
378            ProgressBar::new_spinner("Processing")
379        };
380
381        let file = open_file(log_file)?;
382        let reader = BufReader::new(file);
383
384        let mut file_lines = 0;
385        let mut bytes_read = 0;
386        let mut entries = Vec::new();
387
388        for line in reader.lines() {
389            file_lines += 1;
390            total_lines += 1;
391            let line = line?;
392            bytes_read += line.len() + 1;
393
394            if file_lines % 10_000 == 0 {
395                if let Some(size) = file_size {
396                    progress.update(bytes_read.min(size));
397                } else {
398                    progress.update(file_lines);
399                }
400            }
401
402            if let Ok(entry) = serde_json::from_str::<AuditEntry>(&line) {
403                entries.push(entry);
404            }
405        }
406
407        if let Some(size) = file_size {
408            progress.update(size);
409        }
410
411        progress.finish_with_message(&format!(
412            "Processed {} lines from this file",
413            format_number(file_lines)
414        ));
415
416        // Process the entries from this file and merge directly
417        let file_result = process_file_entries(log_file, entries);
418
419        // Merge path operations
420        for (path, path_data) in file_result.path_operations {
421            if let Some(existing) = combined_result.path_operations.get_mut(&path) {
422                existing.count += path_data.count;
423                for (op, count) in path_data.operations {
424                    *existing.operations.entry(op).or_insert(0) += count;
425                }
426                existing.entities.extend(path_data.entities);
427            } else {
428                combined_result.path_operations.insert(path, path_data);
429            }
430        }
431
432        // Merge operation types
433        for (op, count) in file_result.operation_types {
434            *combined_result.operation_types.entry(op).or_insert(0) += count;
435        }
436
437        // Merge path prefixes
438        for (prefix, count) in file_result.path_prefixes {
439            *combined_result.path_prefixes.entry(prefix).or_insert(0) += count;
440        }
441
442        // Merge entity paths
443        for (entity_id, paths) in file_result.entity_paths {
444            let entity_map = combined_result.entity_paths.entry(entity_id).or_default();
445            for (path, count) in paths {
446                *entity_map.entry(path).or_insert(0) += count;
447            }
448        }
449
450        // Merge entity names
451        for (entity_id, name) in file_result.entity_names {
452            combined_result
453                .entity_names
454                .entry(entity_id)
455                .or_insert(name);
456        }
457    }
458
459    Ok((combined_result, total_lines))
460}
461
462pub fn run(
463    log_files: &[String],
464    top: usize,
465    min_operations: usize,
466    sequential: bool,
467) -> Result<()> {
468    let (combined_result, total_lines) = if sequential || log_files.len() == 1 {
469        // Use sequential processing for single files or when explicitly requested
470        eprintln!("Processing {} files sequentially...", log_files.len());
471        run_sequential(log_files)?
472    } else {
473        // Use parallel processing for multiple files with streaming
474        process_files_parallel(log_files, process_file_entries_streaming, combine_results)?
475    };
476
477    eprintln!("\nTotal: Processed {} lines", format_number(total_lines));
478
479    let path_operations = combined_result.path_operations;
480    let operation_types = combined_result.operation_types;
481    let path_prefixes = combined_result.path_prefixes;
482    let entity_paths = combined_result.entity_paths;
483    let entity_names = combined_result.entity_names;
484
485    let total_operations: usize = operation_types.values().sum();
486
487    // Print results
488    println!("\n{}", "=".repeat(100));
489    println!("High-Volume Vault Operations Analysis");
490    println!("{}", "=".repeat(100));
491
492    // 1. Operation Types Summary
493    println!("\n1. Operation Types (Overall)");
494    println!("{}", "-".repeat(100));
495    println!("{:<20} {:>15} {:>12}", "Operation", "Count", "Percentage");
496    println!("{}", "-".repeat(100));
497
498    let mut sorted_ops: Vec<_> = operation_types.iter().collect();
499    sorted_ops.sort_by(|a, b| b.1.cmp(a.1));
500
501    for (op, count) in sorted_ops {
502        let pct = if total_operations > 0 {
503            (*count as f64 / total_operations as f64) * 100.0
504        } else {
505            0.0
506        };
507        println!("{:<20} {:>15} {:>11.2}%", op, format_number(*count), pct);
508    }
509
510    println!("{}", "-".repeat(100));
511    println!(
512        "{:<20} {:>15} {:>11.2}%",
513        "TOTAL",
514        format_number(total_operations),
515        100.0
516    );
517
518    // 2. Top Path Prefixes
519    println!("\n2. Top Path Prefixes (First 2 components)");
520    println!("{}", "-".repeat(100));
521    println!(
522        "{:<40} {:>15} {:>12}",
523        "Path Prefix", "Operations", "Percentage"
524    );
525    println!("{}", "-".repeat(100));
526
527    let mut sorted_prefixes: Vec<_> = path_prefixes.iter().collect();
528    sorted_prefixes.sort_by(|a, b| b.1.cmp(a.1));
529
530    for (prefix, count) in sorted_prefixes.iter().take(top) {
531        let pct = if total_operations > 0 {
532            (**count as f64 / total_operations as f64) * 100.0
533        } else {
534            0.0
535        };
536        println!(
537            "{:<40} {:>15} {:>11.2}%",
538            prefix,
539            format_number(**count),
540            pct
541        );
542    }
543
544    // 3. Top Individual Paths
545    println!("\n3. Top {} Individual Paths (Highest Volume)", top);
546    println!("{}", "-".repeat(100));
547    println!(
548        "{:<60} {:>10} {:>10} {:>15}",
549        "Path", "Ops", "Entities", "Top Op"
550    );
551    println!("{}", "-".repeat(100));
552
553    let mut sorted_paths: Vec<_> = path_operations.iter().collect();
554    sorted_paths.sort_by(|a, b| b.1.count.cmp(&a.1.count));
555
556    for (path, data) in sorted_paths.iter().take(top) {
557        if data.count < min_operations {
558            break;
559        }
560        let top_op = data
561            .operations
562            .iter()
563            .max_by_key(|x| x.1)
564            .map_or("N/A", |x| x.0.as_str());
565        let path_display = if path.len() > 60 {
566            format!("{}...", &path[..58])
567        } else {
568            (*path).to_string()
569        };
570        println!(
571            "{:<60} {:>10} {:>10} {:>15}",
572            path_display,
573            format_number(data.count),
574            format_number(data.entities.len()),
575            top_op
576        );
577    }
578
579    // 4. Top Entities by Total Operations
580    println!("\n4. Top {} Entities by Total Operations", top);
581    println!("{}", "-".repeat(100));
582    println!(
583        "{:<50} {:<38} {:>10}",
584        "Display Name", "Entity ID", "Total Ops"
585    );
586    println!("{}", "-".repeat(100));
587
588    let mut entity_totals: HashMap<String, usize> = HashMap::with_capacity(entity_paths.len());
589    for (entity_id, paths) in &entity_paths {
590        let total: usize = paths.values().sum();
591        entity_totals.insert(entity_id.clone(), total);
592    }
593
594    let mut sorted_entities: Vec<_> = entity_totals.iter().collect();
595    sorted_entities.sort_by(|a, b| b.1.cmp(a.1));
596
597    for (entity_id, total) in sorted_entities.iter().take(top) {
598        let name = entity_names
599            .get(*entity_id)
600            .map_or("N/A", std::string::String::as_str);
601        let name_display = if name.len() > 48 { &name[..48] } else { name };
602        let entity_short = if entity_id.len() > 36 {
603            &entity_id[..36]
604        } else {
605            entity_id
606        };
607        println!(
608            "{:<50} {:<38} {:>10}",
609            name_display,
610            entity_short,
611            format_number(**total)
612        );
613    }
614
615    // 5. Potential Stress Points
616    println!("\n5. Potential System Stress Points");
617    println!("{}", "-".repeat(100));
618
619    #[derive(Debug)]
620    struct StressPoint {
621        path: String,
622        entity_name: String,
623        operations: usize,
624    }
625
626    let mut stress_points = Vec::new();
627
628    for (path, data) in &path_operations {
629        if data.count >= min_operations {
630            for entity_id in &data.entities {
631                if let Some(entity_ops_map) = entity_paths.get(entity_id) {
632                    if let Some(&entity_ops) = entity_ops_map.get(path) {
633                        if entity_ops >= min_operations {
634                            stress_points.push(StressPoint {
635                                path: path.clone(),
636                                entity_name: entity_names
637                                    .get(entity_id)
638                                    .cloned()
639                                    .unwrap_or_else(|| "N/A".to_string()),
640                                operations: entity_ops,
641                            });
642                        }
643                    }
644                }
645            }
646        }
647    }
648
649    stress_points.sort_by(|a, b| b.operations.cmp(&a.operations));
650
651    println!("{:<40} {:<40} {:>10}", "Entity", "Path", "Ops");
652    println!("{}", "-".repeat(100));
653
654    for sp in stress_points.iter().take(top) {
655        let entity_display = if sp.entity_name.len() > 38 {
656            &sp.entity_name[..38]
657        } else {
658            &sp.entity_name
659        };
660        let path_display = if sp.path.len() > 38 {
661            &sp.path[..38]
662        } else {
663            &sp.path
664        };
665        println!(
666            "{:<40} {:<40} {:>10}",
667            entity_display,
668            path_display,
669            format_number(sp.operations)
670        );
671    }
672
673    println!("{}", "=".repeat(100));
674    println!("\nTotal Lines Processed: {}", format_number(total_lines));
675    println!("Total Operations: {}", format_number(total_operations));
676    println!("{}", "=".repeat(100));
677
678    Ok(())
679}