vault_audit_tools/utils/parallel.rs

//! Parallel file processing utilities.
//!
//! This module provides high-performance parallel processing of multiple audit log files
//! using Rayon for CPU-bound workloads. Files are processed concurrently with proper
//! progress tracking and error handling.

use crate::audit::types::AuditEntry;
use crate::utils::progress::ProgressBar;
use crate::utils::reader::open_file;
use anyhow::{Context, Result};
use rayon::prelude::*;
use std::io::{BufRead, BufReader};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Result of processing a single file
#[derive(Debug)]
#[allow(dead_code)]
pub struct FileProcessResult<T> {
    pub file_path: String,
    pub lines_processed: usize,
    pub data: T,
}

/// Process multiple files in parallel with memory-efficient streaming
///
/// Files are processed concurrently; each `processor` is expected to stream entries
/// line-by-line rather than loading the whole file into memory, which prevents
/// memory allocation failures on large files.
///
/// # Arguments
/// * `files` - List of file paths to process
/// * `processor` - Function that processes a single file with a streaming callback
/// * `combiner` - Function that combines the per-file results
///
/// # Returns
/// The combined result from all files plus the total number of lines processed
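///
/// # Example
///
/// A sketch of a typical call, assuming a caller that returns `anyhow::Result<()>`;
/// the paths and closure bodies are illustrative only and the doctest is marked
/// `ignore`, so it is never compiled:
///
/// ```ignore
/// let files = vec!["audit-1.log".to_string(), "audit-2.log".to_string()];
///
/// // Each processor call counts non-empty lines in one file; the combiner
/// // sums the per-file counts into a single total.
/// let (total_entries, _lines) = process_files_parallel(
///     &files,
///     |path| -> Result<usize> {
///         let reader = BufReader::new(open_file(path)?);
///         let mut count = 0;
///         for line in reader.lines() {
///             if !line?.trim().is_empty() {
///                 count += 1;
///             }
///         }
///         Ok(count)
///     },
///     |results| results.into_iter().map(|r| r.data).sum::<usize>(),
/// )?;
/// ```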
pub fn process_files_parallel<T, F, C, R>(
    files: &[String],
    processor: F,
    combiner: C,
) -> Result<(R, usize)>
where
    T: Send + 'static,
    R: Send + 'static,
    F: Fn(&str) -> Result<T> + Send + Sync,
    C: Fn(Vec<FileProcessResult<T>>) -> R + Send + Sync,
{
    if files.is_empty() {
        return Err(anyhow::anyhow!("No files provided for processing"));
    }

    eprintln!("Processing {} files in parallel...", files.len());

    // First pass: count total lines across all files for accurate progress
    eprintln!("Scanning files to determine total work...");
    let total_lines_to_process: usize = files
        .par_iter()
        .map(|file_path| count_file_lines(file_path).unwrap_or(0))
        .sum();

    eprintln!(
        "Total lines to process: {}",
        crate::utils::format::format_number(total_lines_to_process)
    );

    let processed_lines = Arc::new(AtomicUsize::new(0));
    let progress = Arc::new(Mutex::new(ProgressBar::new(
        total_lines_to_process,
        "Processing",
    )));

    // Initialize global progress for system_overview streaming
    crate::commands::system_overview::init_parallel_progress(
        processed_lines.clone(),
        progress.clone(),
    );

    // Process files in parallel
    let results: Result<Vec<_>> = files
        .par_iter()
        .enumerate()
        .map(|(idx, file_path)| -> Result<FileProcessResult<T>> {
            // Don't print starting messages to avoid interfering with progress bar

            // Process file using streaming approach (progress updated internally)
            let data = processor(file_path)
                .with_context(|| format!("Failed to process file: {}", file_path))?;

            // Count lines for completion message
            let lines_count = count_file_lines(file_path)?;

            // Print completion message without interfering with progress
            if let Ok(mut progress) = progress.lock() {
                eprint!("\r"); // Clear current line
                eprint!("{}", " ".repeat(100)); // Clear with spaces
                eprint!("\r"); // Return to start
                eprintln!(
                    "[{}/{}] ✓ Completed: {} ({} lines)",
                    idx + 1,
                    files.len(),
                    file_path.split('/').next_back().unwrap_or(file_path),
                    crate::utils::format::format_number(lines_count)
                );
                // Re-render progress bar on new line
                progress.render();
            }

            Ok(FileProcessResult {
                file_path: file_path.clone(),
                lines_processed: lines_count,
                data,
            })
        })
        .collect();

    let results = results?;
    let total_lines_processed = processed_lines.load(Ordering::Relaxed);

    if let Ok(mut progress) = progress.lock() {
        // Clear the progress line before final message
        eprint!("\r");
        eprint!("{}", " ".repeat(80));
        eprint!("\r");
        progress.finish_with_message(&format!("Processed {} total lines", total_lines_processed));
    }

    // Combine results
    let result = combiner(results);

    Ok((result, total_lines_processed))
}

/// Count lines in a file for progress tracking (lightweight)
fn count_file_lines(file_path: &str) -> Result<usize> {
    let file =
        open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
    let reader = BufReader::new(file);

    let mut count = 0;
    for line_result in reader.lines() {
        line_result.with_context(|| format!("Failed to read line from {}", file_path))?;
        count += 1;
    }

    Ok(count)
}

/// Process a file with streaming to minimize memory usage
///
/// This function parses and processes audit entries one-by-one instead of loading
/// the entire file into memory, making it suitable for very large files. Results are
/// accumulated in state captured by the `processor` closure; the function itself
/// returns `T::default()`.
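///
/// # Example
///
/// A sketch of counting entries by mutating state captured by the closure, assuming a
/// caller that returns `anyhow::Result<()>`; the path is illustrative and the doctest
/// is marked `ignore`, so it is never compiled:
///
/// ```ignore
/// let mut count = 0usize;
/// process_file_streaming::<(), _>("audit.log", |_entry| count += 1)?;
/// println!("{} entries parsed", count);
/// ```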
#[allow(dead_code)]
pub fn process_file_streaming<T, F>(file_path: &str, mut processor: F) -> Result<T>
where
    F: FnMut(&AuditEntry),
    T: Default,
{
    let file =
        open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
    let reader = BufReader::new(file);

    for (line_num, line_result) in reader.lines().enumerate() {
        let line = line_result
            .with_context(|| format!("Failed to read line {} from {}", line_num + 1, file_path))?;

        // Skip empty lines
        if line.trim().is_empty() {
            continue;
        }

        // Parse and process entry immediately
        if let Ok(entry) = serde_json::from_str::<AuditEntry>(&line) {
            processor(&entry);
        }
    }

    Ok(T::default())
}

/// Read all entries from a single file (kept for compatibility but not recommended for large files)
#[allow(dead_code)]
fn read_file_entries(file_path: &str) -> Result<Vec<AuditEntry>> {
    let file =
        open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
    let reader = BufReader::new(file);

    let mut entries = Vec::new();

    for (line_num, line) in reader.lines().enumerate() {
        let line = line
            .with_context(|| format!("Failed to read line {} from {}", line_num + 1, file_path))?;

        // Skip empty lines
        if line.trim().is_empty() {
            continue;
        }

        // Parse JSON entry
        if let Ok(entry) = serde_json::from_str::<AuditEntry>(&line) {
            entries.push(entry);
        }
    }

    Ok(entries)
}

/// Process multiple files with simple aggregation (sum, count, etc.)
///
/// This is a simpler version for cases where you just need to aggregate
/// simple metrics across files.
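///
/// # Example
///
/// A sketch of summing one metric per file, assuming a caller that returns
/// `anyhow::Result<()>`; the closures are illustrative and the doctest is marked
/// `ignore`, so it is never compiled:
///
/// ```ignore
/// // Sum the line counts of all files, starting from 0.
/// let (total, _lines) = process_files_aggregate(
///     &files,
///     |path| count_file_lines(path),
///     |acc, per_file| acc + per_file,
///     0usize,
/// )?;
/// ```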
#[allow(dead_code)]
pub fn process_files_aggregate<T, F, A>(
    files: &[String],
    processor: F,
    aggregator: A,
    initial: T,
) -> Result<(T, usize)>
where
    T: Send + Clone + Sync + 'static,
    F: Fn(&str) -> Result<T> + Send + Sync,
    A: Fn(T, T) -> T + Send + Sync,
{
    process_files_parallel(files, processor, |results| {
        results
            .into_iter()
            .fold(initial.clone(), |acc, result| aggregator(acc, result.data))
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    #[test]
    fn test_parallel_processing() {
        // Create test files
        let mut files = Vec::new();
        let _temp_files: Vec<NamedTempFile> = (0..3).map(|i| {
            let mut temp_file = NamedTempFile::new().unwrap();
            writeln!(temp_file, r#"{{"type":"response","time":"2025-10-07T10:00:0{}Z","auth":{{"entity_id":"entity-{}"}}}}"#, i, i).unwrap();
            writeln!(temp_file, r#"{{"type":"response","time":"2025-10-07T10:00:0{}Z","auth":{{"entity_id":"entity-{}"}}}}"#, i+1, i).unwrap();

            files.push(temp_file.path().to_str().unwrap().to_string());
            temp_file
        }).collect();

        // Process files to count entries per file
        let (results, _total_lines) = process_files_parallel(
            &files,
            |file_path| -> Result<usize> {
                let file = open_file(file_path)?;
                let reader = BufReader::new(file);
                let mut count = 0;
                for line_result in reader.lines() {
                    let line = line_result?;
                    if line.trim().is_empty() {
                        continue;
                    }
                    if serde_json::from_str::<AuditEntry>(&line).is_ok() {
                        count += 1;
                    }
                }
                Ok(count)
            },
            |results| results.into_iter().map(|r| r.data).sum::<usize>(),
        )
        .unwrap();

        assert_eq!(results, 6); // 2 entries per file * 3 files
        // Note: total_lines from the atomic counter is only updated by streaming
        // processors that explicitly call the global progress tracker.
    }
}