// vault_audit_tools/utils/parallel.rs
use crate::audit::types::AuditEntry;
8use crate::utils::progress::ProgressBar;
9use crate::utils::reader::open_file;
10use anyhow::{Context, Result};
11use rayon::prelude::*;
12use std::io::{BufRead, BufReader};
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::sync::{Arc, Mutex};
15
/// Outcome of processing a single file in a parallel batch.
#[derive(Debug)]
#[allow(dead_code)]
pub struct FileProcessResult<T> {
    /// Path of the processed file, exactly as supplied by the caller.
    pub file_path: String,
    /// Total number of lines in the file (every line, not only parsed ones).
    pub lines_processed: usize,
    /// The per-file value produced by the caller's processor closure.
    pub data: T,
}
24
25pub fn process_files_parallel<T, F, C, R>(
39 files: &[String],
40 processor: F,
41 combiner: C,
42) -> Result<(R, usize)>
43where
44 T: Send + 'static,
45 R: Send + 'static,
46 F: Fn(&str) -> Result<T> + Send + Sync,
47 C: Fn(Vec<FileProcessResult<T>>) -> R + Send + Sync,
48{
49 if files.is_empty() {
50 return Err(anyhow::anyhow!("No files provided for processing"));
51 }
52
53 eprintln!("Processing {} files in parallel...", files.len());
54
55 eprintln!("Scanning files to determine total work...");
57 let total_lines_to_process: usize = files
58 .par_iter()
59 .map(|file_path| count_file_lines(file_path).unwrap_or(0))
60 .sum();
61
62 eprintln!(
63 "Total lines to process: {}",
64 crate::utils::format::format_number(total_lines_to_process)
65 );
66
67 let processed_lines = Arc::new(AtomicUsize::new(0));
68 let progress = Arc::new(Mutex::new(ProgressBar::new(
69 total_lines_to_process,
70 "Processing",
71 )));
72
73 crate::commands::system_overview::init_parallel_progress(
75 processed_lines.clone(),
76 progress.clone(),
77 );
78
79 let results: Result<Vec<_>> = files
81 .par_iter()
82 .enumerate()
83 .map(|(idx, file_path)| -> Result<FileProcessResult<T>> {
84 let data = processor(file_path)
88 .with_context(|| format!("Failed to process file: {}", file_path))?;
89
90 let lines_count = count_file_lines(file_path)?;
92
93 if let Ok(mut progress) = progress.lock() {
95 eprint!("\r"); eprint!("{}", " ".repeat(100)); eprint!("\r"); eprintln!(
99 "[{}/{}] ✓ Completed: {} ({} lines)",
100 idx + 1,
101 files.len(),
102 file_path.split('/').next_back().unwrap_or(file_path),
103 crate::utils::format::format_number(lines_count)
104 );
105 progress.render();
107 }
108
109 Ok(FileProcessResult {
110 file_path: file_path.clone(),
111 lines_processed: lines_count,
112 data,
113 })
114 })
115 .collect();
116
117 let results = results?;
118 let total_lines_processed = processed_lines.load(Ordering::Relaxed);
119
120 if let Ok(mut progress) = progress.lock() {
121 eprint!("\r");
123 eprint!("{}", " ".repeat(80));
124 eprint!("\r");
125 progress.finish_with_message(&format!("Processed {} total lines", total_lines_processed));
126 }
127
128 let result = combiner(results);
130
131 Ok((result, total_lines_processed))
132}
133
134fn count_file_lines(file_path: &str) -> Result<usize> {
136 let file =
137 open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
138 let reader = BufReader::new(file);
139
140 let mut count = 0;
141 for line_result in reader.lines() {
142 line_result.with_context(|| format!("Failed to read line from {}", file_path))?;
143 count += 1;
144 }
145
146 Ok(count)
147}
148
149#[allow(dead_code)]
154pub fn process_file_streaming<T, F>(file_path: &str, mut processor: F) -> Result<T>
155where
156 F: FnMut(&AuditEntry),
157 T: Default,
158{
159 let file =
160 open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
161 let reader = BufReader::new(file);
162
163 for (line_num, line_result) in reader.lines().enumerate() {
164 let line = line_result
165 .with_context(|| format!("Failed to read line {} from {}", line_num + 1, file_path))?;
166
167 if line.trim().is_empty() {
169 continue;
170 }
171
172 if let Ok(entry) = serde_json::from_str::<AuditEntry>(&line) {
174 processor(&entry);
175 }
176 }
177
178 Ok(T::default())
179}
180
181#[allow(dead_code)]
183fn read_file_entries(file_path: &str) -> Result<Vec<AuditEntry>> {
184 let file =
185 open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
186 let reader = BufReader::new(file);
187
188 let mut entries = Vec::new();
189
190 for (line_num, line) in reader.lines().enumerate() {
191 let line = line
192 .with_context(|| format!("Failed to read line {} from {}", line_num + 1, file_path))?;
193
194 if line.trim().is_empty() {
196 continue;
197 }
198
199 if let Ok(entry) = serde_json::from_str::<AuditEntry>(&line) {
201 entries.push(entry);
202 }
203 }
204
205 Ok(entries)
206}
207
208#[allow(dead_code)]
213pub fn process_files_aggregate<T, F, A>(
214 files: &[String],
215 processor: F,
216 aggregator: A,
217 initial: T,
218) -> Result<(T, usize)>
219where
220 T: Send + Clone + Sync + 'static,
221 F: Fn(&str) -> Result<T> + Send + Sync,
222 A: Fn(T, T) -> T + Send + Sync,
223{
224 process_files_parallel(files, processor, |results| {
225 results
226 .into_iter()
227 .fold(initial.clone(), |acc, result| aggregator(acc, result.data))
228 })
229}
230
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    #[test]
    fn test_parallel_processing() {
        // Build three temp files with two valid JSON audit entries each.
        // The temp files must outlive the call, so keep their handles around.
        let mut files = Vec::new();
        let mut keep_alive = Vec::new();
        for i in 0..3 {
            let mut tmp = NamedTempFile::new().unwrap();
            writeln!(tmp, r#"{{"type":"response","time":"2025-10-07T10:00:0{}Z","auth":{{"entity_id":"entity-{}"}}}}"#, i, i).unwrap();
            writeln!(tmp, r#"{{"type":"response","time":"2025-10-07T10:00:0{}Z","auth":{{"entity_id":"entity-{}"}}}}"#, i+1, i).unwrap();

            files.push(tmp.path().to_str().unwrap().to_string());
            keep_alive.push(tmp);
        }

        // Per-file processor: count non-blank lines that parse as AuditEntry.
        let count_valid_entries = |file_path: &str| -> Result<usize> {
            let reader = BufReader::new(open_file(file_path)?);
            let mut valid = 0;
            for read_result in reader.lines() {
                let text = read_result?;
                if !text.trim().is_empty()
                    && serde_json::from_str::<AuditEntry>(&text).is_ok()
                {
                    valid += 1;
                }
            }
            Ok(valid)
        };

        let (total, _total_lines) = process_files_parallel(
            &files,
            count_valid_entries,
            |file_results| file_results.into_iter().map(|r| r.data).sum::<usize>(),
        )
        .unwrap();

        // 3 files x 2 entries = 6 parsed entries in total.
        assert_eq!(total, 6);
    }
}