vault_audit_tools/utils/parallel.rs

//! Parallel file processing utilities.
//!
//! This module provides high-performance parallel processing of multiple audit log files
//! using Rayon for CPU-bound workloads. Files are processed concurrently with
//! progress tracking and error handling.

use crate::audit::types::AuditEntry;
use crate::utils::progress::ProgressBar;
use crate::utils::reader::open_file;
use anyhow::{Context, Result};
use rayon::prelude::*;
use std::io::{BufRead, BufReader};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Result of processing a single file
#[derive(Debug)]
#[allow(dead_code)]
pub struct FileProcessResult<T> {
    pub file_path: String,
    pub lines_processed: usize,
    pub data: T,
}

/// Process multiple files in parallel with memory-efficient streaming
///
/// This function processes files concurrently using a streaming approach: entries
/// are read line-by-line, so entire files are never loaded into memory. This
/// prevents memory allocation failures on large files.
///
/// # Arguments
/// * `files` - List of file paths to process
/// * `processor` - Function that processes a single file with streaming callback
/// * `combiner` - Function that combines results from all files
///
/// # Returns
/// Combined result from all files plus total lines processed
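///
/// # Example
///
/// A minimal usage sketch (not compiled as a doctest; the file names are
/// placeholders): the per-file processor counts lines with this module's
/// `count_file_lines`, and the combiner sums the per-file results.
///
/// ```ignore
/// let files = vec!["audit_a.log".to_string(), "audit_b.log".to_string()];
/// let (total_entries, _lines) = process_files_parallel(
///     &files,
///     |path| count_file_lines(path),                                // per-file result
///     |results| results.into_iter().map(|r| r.data).sum::<usize>(), // combine
/// )?;
/// ```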
pub fn process_files_parallel<T, F, C, R>(
    files: &[String],
    processor: F,
    combiner: C,
) -> Result<(R, usize)>
where
    T: Send + 'static,
    R: Send + 'static,
    F: Fn(&str) -> Result<T> + Send + Sync,
    C: Fn(Vec<FileProcessResult<T>>) -> R + Send + Sync,
{
    if files.is_empty() {
        return Err(anyhow::anyhow!("No files provided for processing"));
    }

    if files.len() == 1 {
        eprintln!("Processing file...");
    } else {
        eprintln!("Processing {} files in parallel...", files.len());
    }

    // First pass: count total lines across all files for accurate progress
    eprintln!("Scanning files to determine total work...");
    let total_lines_to_process: usize = files
        .par_iter()
        .map(|file_path| count_file_lines(file_path).unwrap_or(0))
        .sum();
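    // Note: this pre-scan reads every file once up front just to size the
    // progress bar; the processor and the per-file completion message below
    // each read the file again.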

    eprintln!(
        "Total lines to process: {}",
        crate::utils::format::format_number(total_lines_to_process)
    );

    let processed_lines = Arc::new(AtomicUsize::new(0));
    let progress = Arc::new(Mutex::new(ProgressBar::new(
        total_lines_to_process,
        "Processing",
    )));

    // Initialize global progress for commands that use streaming
    crate::commands::system_overview::init_parallel_progress(
        processed_lines.clone(),
        progress.clone(),
    );
    crate::commands::client_traffic_analysis::init_parallel_progress(
        processed_lines.clone(),
        progress.clone(),
    );

    // Process files in parallel
    let results: Result<Vec<_>> = files
        .par_iter()
        .enumerate()
        .map(|(idx, file_path)| -> Result<FileProcessResult<T>> {
            // Don't print starting messages to avoid interfering with progress bar

            // Process file using streaming approach (progress updated internally)
            let data = processor(file_path)
                .with_context(|| format!("Failed to process file: {}", file_path))?;

            // Count lines for completion message
            let lines_count = count_file_lines(file_path)?;

            // Print completion message above progress bar
            if let Ok(progress) = progress.lock() {
                progress.println(format!(
                    "[{}/{}] ✓ Completed: {} ({} lines)",
                    idx + 1,
                    files.len(),
                    file_path.split('/').next_back().unwrap_or(file_path),
                    crate::utils::format::format_number(lines_count)
                ));
            }

            Ok(FileProcessResult {
                file_path: file_path.clone(),
                lines_processed: lines_count,
                data,
            })
        })
        .collect();

    let results = results?;
    let total_lines_processed = processed_lines.load(Ordering::Relaxed);

    if let Ok(progress) = progress.lock() {
        progress.finish_with_message(&format!("Processed {} total lines", total_lines_processed));
    }

    // Combine results
    let result = combiner(results);

    Ok((result, total_lines_processed))
}

/// Count lines in a file for progress tracking (streams the file; entries are not parsed)
pub fn count_file_lines(file_path: &str) -> Result<usize> {
    let file =
        open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
    let reader = BufReader::new(file);

    let mut count = 0;
    for line_result in reader.lines() {
        line_result.with_context(|| format!("Failed to read line from {}", file_path))?;
        count += 1;
    }

    Ok(count)
}
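
// A possible allocation-free variant (illustrative sketch only, not used by the
// pipeline; the name `count_file_lines_alloc_free` is not part of the original
// module). `BufRead::read_until` reuses a single byte buffer instead of
// allocating a `String` per line, which can matter on multi-gigabyte logs.
#[allow(dead_code)]
pub fn count_file_lines_alloc_free(file_path: &str) -> Result<usize> {
    let file =
        open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
    let mut reader = BufReader::new(file);

    let mut buf = Vec::new();
    let mut count = 0;
    loop {
        // Reuse the same buffer for every line; we only care about the count.
        buf.clear();
        let bytes_read = reader
            .read_until(b'\n', &mut buf)
            .with_context(|| format!("Failed to read from {}", file_path))?;
        if bytes_read == 0 {
            break; // EOF reached
        }
        count += 1;
    }

    Ok(count)
}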

/// Process a file with streaming to minimize memory usage
///
/// This function processes audit entries one-by-one instead of loading
/// the entire file into memory, making it suitable for very large files.
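///
/// # Example
///
/// A minimal sketch (not compiled as a doctest; `audit.log` is a placeholder
/// path): count parsed entries without keeping any of them in memory.
///
/// ```ignore
/// let mut entries = 0usize;
/// process_file_streaming::<(), _>("audit.log", |_entry| entries += 1)?;
/// println!("parsed {} entries", entries);
/// ```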
#[allow(dead_code)]
pub fn process_file_streaming<T, F>(file_path: &str, mut processor: F) -> Result<T>
where
    F: FnMut(&AuditEntry),
    T: Default,
{
    let file =
        open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
    let reader = BufReader::new(file);

    for (line_num, line_result) in reader.lines().enumerate() {
        let line = line_result
            .with_context(|| format!("Failed to read line {} from {}", line_num + 1, file_path))?;

        // Skip empty lines
        if line.trim().is_empty() {
            continue;
        }

        // Parse and process entry immediately; lines that fail to parse are skipped
        if let Ok(entry) = serde_json::from_str::<AuditEntry>(&line) {
            processor(&entry);
        }
    }

    // Accumulated state lives in the caller's closure; the return value is just a default
    Ok(T::default())
}

/// Read all entries from a single file (kept for compatibility but not recommended for large files)
#[allow(dead_code)]
fn read_file_entries(file_path: &str) -> Result<Vec<AuditEntry>> {
    let file =
        open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
    let reader = BufReader::new(file);

    let mut entries = Vec::new();

    for (line_num, line) in reader.lines().enumerate() {
        let line = line
            .with_context(|| format!("Failed to read line {} from {}", line_num + 1, file_path))?;

        // Skip empty lines
        if line.trim().is_empty() {
            continue;
        }

        // Parse JSON entry
        if let Ok(entry) = serde_json::from_str::<AuditEntry>(&line) {
            entries.push(entry);
        }
    }

    Ok(entries)
}

/// Process multiple files with simple aggregation (sum, count, etc.)
///
/// This is a convenience wrapper around [`process_files_parallel`] for cases
/// where you only need to fold simple metrics across files.
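///
/// # Example
///
/// Sketch (not compiled as a doctest; `files` is assumed to be a `Vec<String>`
/// of audit log paths): sum per-file line counts, starting from zero.
///
/// ```ignore
/// let (total_lines, _) = process_files_aggregate(
///     &files,
///     |path| count_file_lines(path), // per-file metric
///     |a, b| a + b,                  // aggregator
///     0usize,                        // initial value
/// )?;
/// ```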
#[allow(dead_code)]
pub fn process_files_aggregate<T, F, A>(
    files: &[String],
    processor: F,
    aggregator: A,
    initial: T,
) -> Result<(T, usize)>
where
    T: Send + Clone + Sync + 'static,
    F: Fn(&str) -> Result<T> + Send + Sync,
    A: Fn(T, T) -> T + Send + Sync,
{
    process_files_parallel(files, processor, |results| {
        results
            .into_iter()
            .fold(initial.clone(), |acc, result| aggregator(acc, result.data))
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::TempDir;

    #[test]
    fn test_parallel_processing() {
        // Create test files
        let dir = TempDir::new().unwrap();
        let mut files = Vec::new();

        for i in 0..3 {
            let path = dir.path().join(format!("test_{}.log", i));
            {
                let mut temp_file = std::fs::File::create(&path).unwrap();
                writeln!(temp_file, r#"{{"type":"response","time":"2025-10-07T10:00:0{}Z","auth":{{"entity_id":"entity-{}"}}}}"#, i, i).unwrap();
                writeln!(temp_file, r#"{{"type":"response","time":"2025-10-07T10:00:0{}Z","auth":{{"entity_id":"entity-{}"}}}}"#, i + 1, i).unwrap();
            }
            files.push(path.to_str().unwrap().to_string());
        }

        // Process files to count entries per file
        let (results, _total_lines) = process_files_parallel(
            &files,
            |file_path| -> Result<usize> {
                let file = open_file(file_path)?;
                let reader = BufReader::new(file);
                let mut count = 0;
                for line_result in reader.lines() {
                    let line = line_result?;
                    if line.trim().is_empty() {
                        continue;
                    }
                    if serde_json::from_str::<AuditEntry>(&line).is_ok() {
                        count += 1;
                    }
                }
                Ok(count)
            },
            |results| results.into_iter().map(|r| r.data).sum::<usize>(),
        )
        .unwrap();

        // Note: total_lines from the atomic counter is only updated by streaming
        // processors that explicitly call the global progress tracker.
        assert_eq!(results, 6); // 2 entries per file * 3 files
    }
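
    // An extra sketch test (not part of the original suite): exercises
    // count_file_lines on a small plain-text file. Assumes `writeln!`
    // terminates every line with '\n'.
    #[test]
    fn test_count_file_lines() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("count.log");
        {
            let mut temp_file = std::fs::File::create(&path).unwrap();
            writeln!(temp_file, "line one").unwrap();
            writeln!(temp_file, "line two").unwrap();
            writeln!(temp_file, "line three").unwrap();
        }

        let count = count_file_lines(path.to_str().unwrap()).unwrap();
        assert_eq!(count, 3);
    }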
}