// vault_audit_tools/commands/system_overview.rs

1//! System-wide audit log overview.
2//!
3//! Provides high-level statistics and insights about Vault usage
4//! across the entire audit log. Supports analyzing multiple log files
5//! (compressed or uncompressed) for long-term trend analysis.
6//!
7//! # Usage
8//!
9//! ```bash
10//! # Single file (plain or compressed)
11//! vault-audit system-overview audit.log
12//! vault-audit system-overview audit.log.gz
13//!
14//! # Multiple files for week-long analysis
15//! vault-audit system-overview day1.log day2.log day3.log
16//!
17//! # Using shell globbing with compressed files
18//! vault-audit system-overview logs/vault_audit.2025-10-*.log.gz
19//! ```
20//!
21//! **Compressed File Support**: Automatically detects and decompresses `.gz` (gzip)
22//! and `.zst` (zstandard) files with streaming processing - no temp files needed.
23//!
//! # Output
//!
//! Displays comprehensive statistics:
//! - Operation type breakdown with counts and percentages
//! - Top path prefixes (first two path components)
//! - Top individual paths by volume, with entity counts and dominant operation
//! - Top entities by total operations
//! - Potential system stress points (high-volume entity/path pairs)
//! - Total lines and operations processed
//!
//! Useful for:
//! - Understanding overall Vault usage
//! - Capacity planning
//! - Identifying hotspots
//! - Security audits
42
43use crate::audit::types::AuditEntry;
44use crate::utils::format::format_number;
45use crate::utils::parallel::process_files_parallel;
46use anyhow::Result;
47use std::collections::{HashMap, HashSet};
48
/// Per-path access statistics: total hit count, a count per operation
/// string, and the set of entity IDs that touched the path.
#[derive(Debug)]
struct PathData {
    count: usize,
    operations: HashMap<String, usize>,
    entities: HashSet<String>,
}

impl PathData {
    /// Create an empty record, pre-sizing both containers for the typical
    /// cardinality seen in practice: a handful of operation types per path,
    /// and up to a few dozen entities for a popular path.
    fn new() -> Self {
        PathData {
            count: 0,
            operations: HashMap::with_capacity(10),
            entities: HashSet::with_capacity(50),
        }
    }
}
66
/// Results from processing a single file.
///
/// One instance is produced per audit log file; instances are merged into a
/// single aggregate by `combine_results` (parallel) or `run_sequential`.
#[derive(Debug)]
struct FileAnalysisResult {
    /// Per-path statistics, keyed by the full request path.
    path_operations: HashMap<String, PathData>,
    /// Entry count per operation string (values come from the log itself).
    operation_types: HashMap<String, usize>,
    /// Entry count per path prefix (first two path components).
    path_prefixes: HashMap<String, usize>,
    /// entity_id -> (path -> operation count).
    entity_paths: HashMap<String, HashMap<String, usize>>,
    /// entity_id -> display name (first occurrence seen wins).
    entity_names: HashMap<String, String>,
}
76
/// Global progress tracking for parallel processing.
///
/// Holds a shared line counter and a shared progress bar. Set at most once
/// via [`init_parallel_progress`]; worker threads read it through `get()`
/// and add the number of lines they have processed to the counter.
static PARALLEL_PROGRESS: std::sync::OnceLock<(
    std::sync::Arc<std::sync::atomic::AtomicUsize>,
    std::sync::Arc<std::sync::Mutex<crate::utils::progress::ProgressBar>>,
)> = std::sync::OnceLock::new();

/// Set up global progress tracking.
///
/// Installs the shared counter and progress bar used by parallel workers.
/// Calling this more than once is a no-op: `OnceLock::set` fails after the
/// first successful call, and that error is deliberately discarded.
pub fn init_parallel_progress(
    processed: std::sync::Arc<std::sync::atomic::AtomicUsize>,
    progress: std::sync::Arc<std::sync::Mutex<crate::utils::progress::ProgressBar>>,
) {
    // Ignore the result: only the first initialization wins.
    let _ = PARALLEL_PROGRESS.set((processed, progress));
}
90
/// Process entries from a single file using streaming to reduce memory usage.
///
/// Thin wrapper over [`process_file_entries_streaming_with_progress`] with
/// local progress reporting disabled; when running in parallel mode, workers
/// instead report through the global `PARALLEL_PROGRESS` tracker if it was
/// initialized.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or a read fails.
fn process_file_entries_streaming(
    file_path: &str,
    namespace_filter: Option<&str>,
) -> Result<FileAnalysisResult> {
    process_file_entries_streaming_with_progress(file_path, namespace_filter, None)
}
99
100/// Process entries with optional local progress tracking
101fn process_file_entries_streaming_with_progress(
102    file_path: &str,
103    namespace_filter: Option<&str>,
104    local_progress: Option<&(
105        std::sync::Arc<std::sync::atomic::AtomicUsize>,
106        std::sync::Arc<std::sync::Mutex<crate::utils::progress::ProgressBar>>,
107    )>,
108) -> Result<FileAnalysisResult> {
109    use crate::utils::reader::open_file;
110    use std::io::{BufRead, BufReader};
111
112    let mut path_operations: HashMap<String, PathData> = HashMap::with_capacity(5000);
113    let mut operation_types: HashMap<String, usize> = HashMap::with_capacity(20);
114    let mut path_prefixes: HashMap<String, usize> = HashMap::with_capacity(100);
115    let mut entity_paths: HashMap<String, HashMap<String, usize>> = HashMap::with_capacity(2000);
116    let mut entity_names: HashMap<String, String> = HashMap::with_capacity(2000);
117
118    let file = open_file(file_path)?;
119    let reader = BufReader::new(file);
120
121    let mut lines_processed = 0;
122
123    for line_result in reader.lines() {
124        let line = line_result?;
125        lines_processed += 1;
126
127        // Update progress every 2000 lines for responsive feedback
128        if lines_processed % 2000 == 0 {
129            // Try local progress first (sequential mode), then global (parallel mode)
130            if let Some((processed, progress)) = &local_progress {
131                processed.store(lines_processed, std::sync::atomic::Ordering::Relaxed);
132                if let Ok(p) = progress.lock() {
133                    p.update(lines_processed);
134                }
135            } else if let Some((processed_lines, progress)) = PARALLEL_PROGRESS.get() {
136                processed_lines.fetch_add(2000, std::sync::atomic::Ordering::Relaxed);
137                if let Ok(p) = progress.lock() {
138                    p.update(processed_lines.load(std::sync::atomic::Ordering::Relaxed));
139                }
140            }
141        }
142
143        // Skip empty lines
144        if line.trim().is_empty() {
145            continue;
146        }
147
148        // Parse JSON entry
149        let entry: AuditEntry = match serde_json::from_str(&line) {
150            Ok(entry) => entry,
151            Err(_) => continue, // Skip invalid JSON lines
152        };
153
154        // Apply namespace filter if specified
155        if let Some(filter_ns) = namespace_filter {
156            if entry.namespace_id() != Some(filter_ns) {
157                continue;
158            }
159        }
160
161        let Some(request) = &entry.request else {
162            continue;
163        };
164
165        let path = match &request.path {
166            Some(p) => p.as_str(),
167            None => continue,
168        };
169
170        let operation = match &request.operation {
171            Some(o) => o.as_str(),
172            None => continue,
173        };
174
175        let entity_id = entry
176            .auth
177            .as_ref()
178            .and_then(|a| a.entity_id.as_deref())
179            .unwrap_or("no-entity");
180
181        let display_name = entry
182            .auth
183            .as_ref()
184            .and_then(|a| a.display_name.as_deref())
185            .unwrap_or("N/A");
186
187        if path.is_empty() || operation.is_empty() {
188            continue;
189        }
190
191        // Track by full path
192        let path_data = path_operations
193            .entry(path.to_string())
194            .or_insert_with(PathData::new);
195        path_data.count += 1;
196        *path_data
197            .operations
198            .entry(operation.to_string())
199            .or_insert(0) += 1;
200        path_data.entities.insert(entity_id.to_string());
201
202        // Track by operation type
203        *operation_types.entry(operation.to_string()).or_insert(0) += 1;
204
205        // Track by path prefix
206        let parts: Vec<&str> = path.trim_matches('/').split('/').collect();
207        let prefix = if parts.len() >= 2 {
208            format!("{}/{}", parts[0], parts[1])
209        } else if !parts.is_empty() {
210            parts[0].to_string()
211        } else {
212            "root".to_string()
213        };
214        *path_prefixes.entry(prefix).or_insert(0) += 1;
215
216        // Track entity usage
217        let entity_map = entity_paths.entry(entity_id.to_string()).or_default();
218        *entity_map.entry(path.to_string()).or_insert(0) += 1;
219        entity_names
220            .entry(entity_id.to_string())
221            .or_insert_with(|| display_name.to_string());
222    }
223
224    // Update progress for remaining lines
225    let remaining = lines_processed % 2000;
226    if remaining > 0 {
227        if let Some((processed, progress)) = &local_progress {
228            processed.store(lines_processed, std::sync::atomic::Ordering::Relaxed);
229            if let Ok(p) = progress.lock() {
230                p.update(lines_processed);
231            }
232        } else if let Some((processed_lines, progress)) = PARALLEL_PROGRESS.get() {
233            processed_lines.fetch_add(remaining, std::sync::atomic::Ordering::Relaxed);
234            if let Ok(p) = progress.lock() {
235                p.update(processed_lines.load(std::sync::atomic::Ordering::Relaxed));
236            }
237        }
238    }
239
240    Ok(FileAnalysisResult {
241        path_operations,
242        operation_types,
243        path_prefixes,
244        entity_paths,
245        entity_names,
246    })
247}
248
249/// Combine results from multiple files
250fn combine_results(
251    results: Vec<crate::utils::parallel::FileProcessResult<FileAnalysisResult>>,
252) -> FileAnalysisResult {
253    let mut combined = FileAnalysisResult {
254        path_operations: HashMap::with_capacity(5000),
255        operation_types: HashMap::with_capacity(20),
256        path_prefixes: HashMap::with_capacity(100),
257        entity_paths: HashMap::with_capacity(2000),
258        entity_names: HashMap::with_capacity(2000),
259    };
260
261    for file_result in results {
262        let result = file_result.data;
263
264        // Merge path operations
265        for (path, path_data) in result.path_operations {
266            if let Some(existing) = combined.path_operations.get_mut(&path) {
267                existing.count += path_data.count;
268                for (op, count) in path_data.operations {
269                    *existing.operations.entry(op).or_insert(0) += count;
270                }
271                existing.entities.extend(path_data.entities);
272            } else {
273                combined.path_operations.insert(path, path_data);
274            }
275        }
276
277        // Merge operation types
278        for (op, count) in result.operation_types {
279            *combined.operation_types.entry(op).or_insert(0) += count;
280        }
281
282        // Merge path prefixes
283        for (prefix, count) in result.path_prefixes {
284            *combined.path_prefixes.entry(prefix).or_insert(0) += count;
285        }
286
287        // Merge entity paths
288        for (entity_id, paths) in result.entity_paths {
289            let entity_map = combined.entity_paths.entry(entity_id).or_default();
290            for (path, count) in paths {
291                *entity_map.entry(path).or_insert(0) += count;
292            }
293        }
294
295        // Merge entity names (first occurrence wins)
296        for (entity_id, name) in result.entity_names {
297            combined.entity_names.entry(entity_id).or_insert(name);
298        }
299    }
300
301    combined
302}
303
304/// Sequential processing fallback for compatibility and single files
305fn run_sequential(
306    log_files: &[String],
307    namespace_filter: Option<&str>,
308) -> Result<(FileAnalysisResult, usize)> {
309    let mut combined_result = FileAnalysisResult {
310        path_operations: HashMap::with_capacity(5000),
311        operation_types: HashMap::with_capacity(20),
312        path_prefixes: HashMap::with_capacity(100),
313        entity_paths: HashMap::with_capacity(2000),
314        entity_names: HashMap::with_capacity(2000),
315    };
316    let mut total_lines = 0;
317
318    // Process each file sequentially using streaming to avoid memory issues
319    for (file_idx, log_file) in log_files.iter().enumerate() {
320        eprintln!(
321            "[{}/{}] Processing: {}",
322            file_idx + 1,
323            log_files.len(),
324            log_file
325        );
326
327        // Count lines in file first for accurate progress tracking
328        // Works for both compressed and uncompressed files
329        eprintln!("Scanning file to determine total lines...");
330        let file_lines = crate::utils::parallel::count_file_lines(log_file)?;
331
332        // Set up progress bar with line count
333        let progress = crate::utils::progress::ProgressBar::new(file_lines, "Processing");
334        let progress = std::sync::Arc::new(std::sync::Mutex::new(progress));
335        let processed = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
336
337        // Use streaming processor with local progress tracking
338        let local_progress_tuple = (processed.clone(), progress.clone());
339        let file_result = process_file_entries_streaming_with_progress(
340            log_file,
341            namespace_filter,
342            Some(&local_progress_tuple),
343        )?;
344
345        total_lines += file_lines;
346
347        // Finish progress bar
348        if let Ok(p) = progress.lock() {
349            p.update(file_lines);
350            p.finish_with_message(&format!(
351                "Processed {} lines from this file",
352                format_number(file_lines)
353            ));
354        }
355
356        // Merge path operations
357        for (path, path_data) in file_result.path_operations {
358            if let Some(existing) = combined_result.path_operations.get_mut(&path) {
359                existing.count += path_data.count;
360                for (op, count) in path_data.operations {
361                    *existing.operations.entry(op).or_insert(0) += count;
362                }
363                existing.entities.extend(path_data.entities);
364            } else {
365                combined_result.path_operations.insert(path, path_data);
366            }
367        }
368
369        // Merge operation types
370        for (op, count) in file_result.operation_types {
371            *combined_result.operation_types.entry(op).or_insert(0) += count;
372        }
373
374        // Merge path prefixes
375        for (prefix, count) in file_result.path_prefixes {
376            *combined_result.path_prefixes.entry(prefix).or_insert(0) += count;
377        }
378
379        // Merge entity paths
380        for (entity_id, paths) in file_result.entity_paths {
381            let entity_map = combined_result.entity_paths.entry(entity_id).or_default();
382            for (path, count) in paths {
383                *entity_map.entry(path).or_insert(0) += count;
384            }
385        }
386
387        // Merge entity names
388        for (entity_id, name) in file_result.entity_names {
389            combined_result
390                .entity_names
391                .entry(entity_id)
392                .or_insert(name);
393        }
394    }
395
396    Ok((combined_result, total_lines))
397}
398
399pub fn run(
400    log_files: &[String],
401    top: usize,
402    min_operations: usize,
403    namespace_filter: Option<&str>,
404    sequential: bool,
405) -> Result<()> {
406    let (combined_result, total_lines) = if sequential || log_files.len() == 1 {
407        // Use sequential processing for single files or when explicitly requested
408        eprintln!("Processing {} files sequentially...", log_files.len());
409        run_sequential(log_files, namespace_filter)?
410    } else {
411        // Use parallel processing for multiple files with streaming
412        // Create a closure that captures namespace_filter
413        let processor =
414            |file_path: &str| process_file_entries_streaming(file_path, namespace_filter);
415        process_files_parallel(log_files, processor, combine_results)?
416    };
417
418    eprintln!("\nTotal: Processed {} lines", format_number(total_lines));
419
420    let path_operations = combined_result.path_operations;
421    let operation_types = combined_result.operation_types;
422    let path_prefixes = combined_result.path_prefixes;
423    let entity_paths = combined_result.entity_paths;
424    let entity_names = combined_result.entity_names;
425
426    let total_operations: usize = operation_types.values().sum();
427
428    // Print results
429    println!("\n{}", "=".repeat(100));
430    println!("High-Volume Vault Operations Analysis");
431    println!("{}", "=".repeat(100));
432
433    // 1. Operation Types Summary
434    println!("\n1. Operation Types (Overall)");
435    println!("{}", "-".repeat(100));
436    println!("{:<20} {:>15} {:>12}", "Operation", "Count", "Percentage");
437    println!("{}", "-".repeat(100));
438
439    let mut sorted_ops: Vec<_> = operation_types.iter().collect();
440    sorted_ops.sort_by(|a, b| b.1.cmp(a.1));
441
442    for (op, count) in sorted_ops {
443        let pct = if total_operations > 0 {
444            (*count as f64 / total_operations as f64) * 100.0
445        } else {
446            0.0
447        };
448        println!("{:<20} {:>15} {:>11.2}%", op, format_number(*count), pct);
449    }
450
451    println!("{}", "-".repeat(100));
452    println!(
453        "{:<20} {:>15} {:>11.2}%",
454        "TOTAL",
455        format_number(total_operations),
456        100.0
457    );
458
459    // 2. Top Path Prefixes
460    println!("\n2. Top Path Prefixes (First 2 components)");
461    println!("{}", "-".repeat(100));
462    println!(
463        "{:<40} {:>15} {:>12}",
464        "Path Prefix", "Operations", "Percentage"
465    );
466    println!("{}", "-".repeat(100));
467
468    let mut sorted_prefixes: Vec<_> = path_prefixes.iter().collect();
469    sorted_prefixes.sort_by(|a, b| b.1.cmp(a.1));
470
471    for (prefix, count) in sorted_prefixes.iter().take(top) {
472        let pct = if total_operations > 0 {
473            (**count as f64 / total_operations as f64) * 100.0
474        } else {
475            0.0
476        };
477        println!(
478            "{:<40} {:>15} {:>11.2}%",
479            prefix,
480            format_number(**count),
481            pct
482        );
483    }
484
485    // 3. Top Individual Paths
486    println!("\n3. Top {} Individual Paths (Highest Volume)", top);
487    println!("{}", "-".repeat(100));
488    println!(
489        "{:<60} {:>10} {:>10} {:>15}",
490        "Path", "Ops", "Entities", "Top Op"
491    );
492    println!("{}", "-".repeat(100));
493
494    let mut sorted_paths: Vec<_> = path_operations.iter().collect();
495    sorted_paths.sort_by(|a, b| b.1.count.cmp(&a.1.count));
496
497    for (path, data) in sorted_paths.iter().take(top) {
498        if data.count < min_operations {
499            break;
500        }
501        let top_op = data
502            .operations
503            .iter()
504            .max_by_key(|x| x.1)
505            .map_or("N/A", |x| x.0.as_str());
506        let path_display = if path.len() > 60 {
507            format!("{}...", &path[..58])
508        } else {
509            (*path).to_string()
510        };
511        println!(
512            "{:<60} {:>10} {:>10} {:>15}",
513            path_display,
514            format_number(data.count),
515            format_number(data.entities.len()),
516            top_op
517        );
518    }
519
520    // 4. Top Entities by Total Operations
521    println!("\n4. Top {} Entities by Total Operations", top);
522    println!("{}", "-".repeat(100));
523    println!(
524        "{:<50} {:<38} {:>10}",
525        "Display Name", "Entity ID", "Total Ops"
526    );
527    println!("{}", "-".repeat(100));
528
529    let mut entity_totals: HashMap<String, usize> = HashMap::with_capacity(entity_paths.len());
530    for (entity_id, paths) in &entity_paths {
531        let total: usize = paths.values().sum();
532        entity_totals.insert(entity_id.clone(), total);
533    }
534
535    let mut sorted_entities: Vec<_> = entity_totals.iter().collect();
536    sorted_entities.sort_by(|a, b| b.1.cmp(a.1));
537
538    for (entity_id, total) in sorted_entities.iter().take(top) {
539        let name = entity_names
540            .get(*entity_id)
541            .map_or("N/A", std::string::String::as_str);
542        let name_display = if name.len() > 48 { &name[..48] } else { name };
543        let entity_short = if entity_id.len() > 36 {
544            &entity_id[..36]
545        } else {
546            entity_id
547        };
548        println!(
549            "{:<50} {:<38} {:>10}",
550            name_display,
551            entity_short,
552            format_number(**total)
553        );
554    }
555
556    // 5. Potential Stress Points
557    println!("\n5. Potential System Stress Points");
558    println!("{}", "-".repeat(100));
559
560    #[derive(Debug)]
561    struct StressPoint {
562        path: String,
563        entity_name: String,
564        operations: usize,
565    }
566
567    let mut stress_points = Vec::new();
568
569    for (path, data) in &path_operations {
570        if data.count >= min_operations {
571            for entity_id in &data.entities {
572                if let Some(entity_ops_map) = entity_paths.get(entity_id) {
573                    if let Some(&entity_ops) = entity_ops_map.get(path) {
574                        if entity_ops >= min_operations {
575                            stress_points.push(StressPoint {
576                                path: path.clone(),
577                                entity_name: entity_names
578                                    .get(entity_id)
579                                    .cloned()
580                                    .unwrap_or_else(|| "N/A".to_string()),
581                                operations: entity_ops,
582                            });
583                        }
584                    }
585                }
586            }
587        }
588    }
589
590    stress_points.sort_by(|a, b| b.operations.cmp(&a.operations));
591
592    println!("{:<40} {:<40} {:>10}", "Entity", "Path", "Ops");
593    println!("{}", "-".repeat(100));
594
595    for sp in stress_points.iter().take(top) {
596        let entity_display = if sp.entity_name.len() > 38 {
597            &sp.entity_name[..38]
598        } else {
599            &sp.entity_name
600        };
601        let path_display = if sp.path.len() > 38 {
602            &sp.path[..38]
603        } else {
604            &sp.path
605        };
606        println!(
607            "{:<40} {:<40} {:>10}",
608            entity_display,
609            path_display,
610            format_number(sp.operations)
611        );
612    }
613
614    println!("{}", "=".repeat(100));
615    println!("\nTotal Lines Processed: {}", format_number(total_lines));
616    println!("Total Operations: {}", format_number(total_operations));
617    println!("{}", "=".repeat(100));
618
619    Ok(())
620}