vault_audit_tools/utils/parallel.rs

//! Parallel file processing utilities.
//!
//! This module provides high-performance parallel processing of multiple audit log files
//! using Rayon for CPU-bound workloads. Files are processed concurrently with proper
//! progress tracking and error handling.

use crate::audit::types::AuditEntry;
use crate::utils::progress::ProgressBar;
use crate::utils::reader::open_file;
use anyhow::{Context, Result};
use rayon::prelude::*;
use std::io::{BufRead, BufReader};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Result of processing a single file
#[derive(Debug)]
#[allow(dead_code)]
pub struct FileProcessResult<T> {
    pub file_path: String,
    pub lines_processed: usize,
    pub data: T,
}

/// Process multiple files in parallel with memory-efficient streaming
///
/// Files are processed concurrently; each processor callback is expected to
/// stream entries line-by-line rather than loading the entire file into
/// memory, which prevents memory allocation failures on very large files.
///
/// # Arguments
/// * `files` - List of file paths to process
/// * `processor` - Function that processes a single file with streaming callback
/// * `combiner` - Function that combines results from all files
///
/// # Returns
/// Combined result from all files plus total lines processed
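///
/// # Example
///
/// A minimal sketch (marked `ignore`, since the file paths are hypothetical and
/// the example assumes `process_files_parallel` and `open_file` are in scope):
/// each processor counts non-empty lines in its file, and the combiner sums the
/// per-file counts.
///
/// ```ignore
/// use anyhow::Result;
/// use std::io::{BufRead, BufReader};
///
/// // Hypothetical input paths.
/// let files = vec!["audit-1.log".to_string(), "audit-2.log".to_string()];
///
/// let (total_entries, _total_lines) = process_files_parallel(
///     &files,
///     |path| -> Result<usize> {
///         // Stream the file line-by-line instead of reading it whole.
///         let reader = BufReader::new(open_file(path)?);
///         let count = reader
///             .lines()
///             .filter_map(|line| line.ok())
///             .filter(|line| !line.trim().is_empty())
///             .count();
///         Ok(count)
///     },
///     // Combine the per-file results into a single total.
///     |results| results.into_iter().map(|r| r.data).sum::<usize>(),
/// )?;
///
/// println!("{} entries across {} files", total_entries, files.len());
/// ```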
pub fn process_files_parallel<T, F, C, R>(
    files: &[String],
    processor: F,
    combiner: C,
) -> Result<(R, usize)>
where
    T: Send + 'static,
    R: Send + 'static,
    F: Fn(&str) -> Result<T> + Send + Sync,
    C: Fn(Vec<FileProcessResult<T>>) -> R + Send + Sync,
{
    if files.is_empty() {
        return Err(anyhow::anyhow!("No files provided for processing"));
    }

    eprintln!("Processing {} files in parallel...", files.len());

    // First pass: count total lines across all files for accurate progress
    eprintln!("Scanning files to determine total work...");
    let total_lines_to_process: usize = files
        .par_iter()
        .map(|file_path| count_file_lines(file_path).unwrap_or(0))
        .sum();

    eprintln!(
        "Total lines to process: {}",
        crate::utils::format::format_number(total_lines_to_process)
    );

    let processed_lines = Arc::new(AtomicUsize::new(0));
    let progress = Arc::new(Mutex::new(ProgressBar::new(
        total_lines_to_process,
        "Processing",
    )));

    // Initialize global progress for system_overview streaming
    crate::commands::system_overview::init_parallel_progress(
        processed_lines.clone(),
        progress.clone(),
    );

    // Process files in parallel
    let results: Result<Vec<_>> = files
        .par_iter()
        .enumerate()
        .map(|(idx, file_path)| -> Result<FileProcessResult<T>> {
            // Don't print starting messages to avoid interfering with progress bar

            // Process file using streaming approach (progress updated internally)
            let data = processor(file_path)
                .with_context(|| format!("Failed to process file: {}", file_path))?;

            // Count lines for completion message
            let lines_count = count_file_lines(file_path)?;

            // Print completion message above progress bar
            if let Ok(progress) = progress.lock() {
                progress.println(format!(
                    "[{}/{}] ✓ Completed: {} ({} lines)",
                    idx + 1,
                    files.len(),
                    file_path.split('/').next_back().unwrap_or(file_path),
                    crate::utils::format::format_number(lines_count)
                ));
            }

            Ok(FileProcessResult {
                file_path: file_path.clone(),
                lines_processed: lines_count,
                data,
            })
        })
        .collect();

    let results = results?;
    let total_lines_processed = processed_lines.load(Ordering::Relaxed);

    if let Ok(progress) = progress.lock() {
        progress.finish_with_message(&format!("Processed {} total lines", total_lines_processed));
    }

    // Combine results
    let result = combiner(results);

    Ok((result, total_lines_processed))
}

/// Count lines in a file for progress tracking (lightweight)
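///
/// # Example
///
/// A minimal sketch (marked `ignore`; the path is hypothetical):
///
/// ```ignore
/// let lines = count_file_lines("vault_audit.log")?;
/// println!("{} lines to scan", lines);
/// ```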
pub fn count_file_lines(file_path: &str) -> Result<usize> {
    let file =
        open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
    let reader = BufReader::new(file);

    let mut count = 0;
    for line_result in reader.lines() {
        line_result.with_context(|| format!("Failed to read line from {}", file_path))?;
        count += 1;
    }

    Ok(count)
}

/// Process a file with streaming to minimize memory usage
///
/// This function processes audit entries one-by-one instead of loading
/// the entire file into memory, making it suitable for very large files.
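///
/// # Example
///
/// A minimal sketch (marked `ignore`; the path is hypothetical): the closure
/// only observes each entry, so any aggregation lives in captured state and
/// the `T::default()` return value is discarded.
///
/// ```ignore
/// // Count parsed audit entries while streaming the file.
/// let mut entry_count = 0usize;
/// let _: () = process_file_streaming("vault_audit.log", |_entry| {
///     entry_count += 1;
/// })?;
/// println!("{} audit entries", entry_count);
/// ```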
#[allow(dead_code)]
pub fn process_file_streaming<T, F>(file_path: &str, mut processor: F) -> Result<T>
where
    F: FnMut(&AuditEntry),
    T: Default,
{
    let file =
        open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
    let reader = BufReader::new(file);

    for (line_num, line_result) in reader.lines().enumerate() {
        let line = line_result
            .with_context(|| format!("Failed to read line {} from {}", line_num + 1, file_path))?;

        // Skip empty lines
        if line.trim().is_empty() {
            continue;
        }

        // Parse and process entry immediately
        if let Ok(entry) = serde_json::from_str::<AuditEntry>(&line) {
            processor(&entry);
        }
    }

    Ok(T::default())
}

/// Read all entries from a single file (kept for compatibility but not recommended for large files)
#[allow(dead_code)]
fn read_file_entries(file_path: &str) -> Result<Vec<AuditEntry>> {
    let file =
        open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
    let reader = BufReader::new(file);

    let mut entries = Vec::new();

    for (line_num, line) in reader.lines().enumerate() {
        let line = line
            .with_context(|| format!("Failed to read line {} from {}", line_num + 1, file_path))?;

        // Skip empty lines
        if line.trim().is_empty() {
            continue;
        }

        // Parse JSON entry
        if let Ok(entry) = serde_json::from_str::<AuditEntry>(&line) {
            entries.push(entry);
        }
    }

    Ok(entries)
}

/// Process multiple files with simple aggregation (sum, count, etc.)
///
/// This is a simpler version for cases where you just need to aggregate
/// simple metrics across files.
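///
/// # Example
///
/// A minimal sketch (marked `ignore`; the paths are hypothetical and the
/// example assumes `count_file_lines` is in scope): sum line counts across
/// files, starting from zero.
///
/// ```ignore
/// // Hypothetical input paths.
/// let files = vec!["audit-1.log".to_string(), "audit-2.log".to_string()];
///
/// let (total_lines, _) = process_files_aggregate(
///     &files,
///     |path| count_file_lines(path), // per-file metric
///     |acc, n| acc + n,              // how to merge two metrics
///     0usize,                        // initial value
/// )?;
/// println!("{} lines in total", total_lines);
/// ```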
#[allow(dead_code)]
pub fn process_files_aggregate<T, F, A>(
    files: &[String],
    processor: F,
    aggregator: A,
    initial: T,
) -> Result<(T, usize)>
where
    T: Send + Clone + Sync + 'static,
    F: Fn(&str) -> Result<T> + Send + Sync,
    A: Fn(T, T) -> T + Send + Sync,
{
    process_files_parallel(files, processor, |results| {
        results
            .into_iter()
            .fold(initial.clone(), |acc, result| aggregator(acc, result.data))
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    #[test]
    fn test_parallel_processing() {
        // Create test files
        let mut files = Vec::new();
        let _temp_files: Vec<NamedTempFile> = (0..3).map(|i| {
            let mut temp_file = NamedTempFile::new().unwrap();
            writeln!(temp_file, r#"{{"type":"response","time":"2025-10-07T10:00:0{}Z","auth":{{"entity_id":"entity-{}"}}}}"#, i, i).unwrap();
            writeln!(temp_file, r#"{{"type":"response","time":"2025-10-07T10:00:0{}Z","auth":{{"entity_id":"entity-{}"}}}}"#, i + 1, i).unwrap();

            files.push(temp_file.path().to_str().unwrap().to_string());
            temp_file
        }).collect();

        // Process files to count entries per file
        let (results, _total_lines) = process_files_parallel(
            &files,
            |file_path| -> Result<usize> {
                let file = open_file(file_path)?;
                let reader = BufReader::new(file);
                let mut count = 0;
                for line_result in reader.lines() {
                    let line = line_result?;
                    if line.trim().is_empty() {
                        continue;
                    }
                    if serde_json::from_str::<AuditEntry>(&line).is_ok() {
                        count += 1;
                    }
                }
                Ok(count)
            },
            |results| results.into_iter().map(|r| r.data).sum::<usize>(),
        )
        .unwrap();

        // 2 entries per file * 3 files
        assert_eq!(results, 6);
        // Note: total_lines from the atomic counter is only updated by streaming processors
        // that explicitly call the global progress tracker.
    }
}