vault_audit_tools/utils/parallel.rs

use crate::audit::types::AuditEntry;
use crate::utils::progress::ProgressBar;
use crate::utils::reader::open_file;
use anyhow::{Context, Result};
use rayon::prelude::*;
use std::io::{BufRead, BufReader};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

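/// Outcome of processing a single file: the path, how many lines it
/// contained, and the data produced by the caller's processor.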
#[derive(Debug)]
#[allow(dead_code)]
pub struct FileProcessResult<T> {
    pub file_path: String,
    pub lines_processed: usize,
    pub data: T,
}

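/// Processes `files` concurrently on the rayon thread pool. Each file is
/// handed to `processor`, which maps a path to a `T`; once every file has
/// succeeded, `combiner` folds the per-file results into a single `R`.
/// Returns the combined value together with the total number of lines
/// processed. Fails fast: the first processor error aborts the whole run.
///
/// A minimal sketch of a caller (assuming a processor that re-uses this
/// module's `count_file_lines` and a combiner that sums the counts; the
/// file names are illustrative):
///
/// ```ignore
/// let files = vec!["audit-1.log".to_string(), "audit-2.log".to_string()];
/// let (total, _lines) = process_files_parallel(
///     &files,
///     |path| count_file_lines(path),
///     |results| results.into_iter().map(|r| r.data).sum::<usize>(),
/// )?;
/// ```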
pub fn process_files_parallel<T, F, C, R>(
    files: &[String],
    processor: F,
    combiner: C,
) -> Result<(R, usize)>
where
    T: Send + 'static,
    R: Send + 'static,
    F: Fn(&str) -> Result<T> + Send + Sync,
    C: Fn(Vec<FileProcessResult<T>>) -> R + Send + Sync,
{
    if files.is_empty() {
        return Err(anyhow::anyhow!("No files provided for processing"));
    }

    eprintln!("Processing {} files in parallel...", files.len());

    eprintln!("Scanning files to determine total work...");
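    // Pre-scan every file so the progress bar knows the total amount of work.
    // Files that cannot be read count as zero here; the processing pass below
    // surfaces the actual error.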
    let total_lines_to_process: usize = files
        .par_iter()
        .map(|file_path| count_file_lines(file_path).unwrap_or(0))
        .sum();

    eprintln!(
        "Total lines to process: {}",
        crate::utils::format::format_number(total_lines_to_process)
    );

    let processed_lines = Arc::new(AtomicUsize::new(0));
    let progress = Arc::new(Mutex::new(ProgressBar::new(
        total_lines_to_process,
        "Processing",
    )));

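    // Publish the shared counter and progress bar so they can be updated
    // while the files are being processed.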
    crate::commands::system_overview::init_parallel_progress(
        processed_lines.clone(),
        progress.clone(),
    );

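    // Fan the files out across the rayon thread pool. Collecting into
    // Result<Vec<_>> short-circuits the whole run on the first error.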
    let results: Result<Vec<_>> = files
        .par_iter()
        .enumerate()
        .map(|(idx, file_path)| -> Result<FileProcessResult<T>> {
            let data = processor(file_path)
                .with_context(|| format!("Failed to process file: {}", file_path))?;

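            // Count the file's lines again so the completion message and the
            // returned FileProcessResult carry an exact per-file figure.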
            let lines_count = count_file_lines(file_path)?;

            if let Ok(progress) = progress.lock() {
                progress.println(format!(
                    "[{}/{}] ✓ Completed: {} ({} lines)",
                    idx + 1,
                    files.len(),
                    file_path.split('/').next_back().unwrap_or(file_path),
                    crate::utils::format::format_number(lines_count)
                ));
            }

            Ok(FileProcessResult {
                file_path: file_path.clone(),
                lines_processed: lines_count,
                data,
            })
        })
        .collect();

    let results = results?;
    let total_lines_processed = processed_lines.load(Ordering::Relaxed);

    if let Ok(progress) = progress.lock() {
        progress.finish_with_message(&format!("Processed {} total lines", total_lines_processed));
    }

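    // Fold the per-file results into the caller's combined value.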
    let result = combiner(results);

    Ok((result, total_lines_processed))
}

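/// Counts the lines in a file by reading it once from start to finish,
/// propagating any I/O error with the offending path attached.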
pub fn count_file_lines(file_path: &str) -> Result<usize> {
    let file =
        open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
    let reader = BufReader::new(file);

    let mut count = 0;
    for line_result in reader.lines() {
        line_result.with_context(|| format!("Failed to read line from {}", file_path))?;
        count += 1;
    }

    Ok(count)
}

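/// Streams a file line by line, invoking `processor` on every line that
/// parses as an `AuditEntry`; blank and unparsable lines are skipped
/// silently. Always returns `T::default()`, so callers are expected to
/// accumulate state inside the `FnMut` closure.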
#[allow(dead_code)]
pub fn process_file_streaming<T, F>(file_path: &str, mut processor: F) -> Result<T>
where
    F: FnMut(&AuditEntry),
    T: Default,
{
    let file =
        open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
    let reader = BufReader::new(file);

    for (line_num, line_result) in reader.lines().enumerate() {
        let line = line_result
            .with_context(|| format!("Failed to read line {} from {}", line_num + 1, file_path))?;

        if line.trim().is_empty() {
            continue;
        }

        if let Ok(entry) = serde_json::from_str::<AuditEntry>(&line) {
            processor(&entry);
        }
    }

    Ok(T::default())
}

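/// Reads an entire file into memory, returning every line that parses as an
/// `AuditEntry` and skipping blank or malformed lines.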
#[allow(dead_code)]
fn read_file_entries(file_path: &str) -> Result<Vec<AuditEntry>> {
    let file =
        open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
    let reader = BufReader::new(file);

    let mut entries = Vec::new();

    for (line_num, line) in reader.lines().enumerate() {
        let line = line
            .with_context(|| format!("Failed to read line {} from {}", line_num + 1, file_path))?;

        if line.trim().is_empty() {
            continue;
        }

        if let Ok(entry) = serde_json::from_str::<AuditEntry>(&line) {
            entries.push(entry);
        }
    }

    Ok(entries)
}

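/// Convenience wrapper over `process_files_parallel` for the common case
/// where the per-file results are folded pairwise into one value: `initial`
/// seeds the fold and `aggregator` merges each file's `T` into the running
/// accumulator. Returns the folded value plus the total line count.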
#[allow(dead_code)]
pub fn process_files_aggregate<T, F, A>(
    files: &[String],
    processor: F,
    aggregator: A,
    initial: T,
) -> Result<(T, usize)>
where
    T: Send + Clone + Sync + 'static,
    F: Fn(&str) -> Result<T> + Send + Sync,
    A: Fn(T, T) -> T + Send + Sync,
{
    process_files_parallel(files, processor, |results| {
        results
            .into_iter()
            .fold(initial.clone(), |acc, result| aggregator(acc, result.data))
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    #[test]
    fn test_parallel_processing() {
        let mut files = Vec::new();
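        // Write three temp files, each holding two valid audit entries. The
        // handles are kept alive so the files are not deleted mid-test.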
        let _temp_files: Vec<NamedTempFile> = (0..3).map(|i| {
            let mut temp_file = NamedTempFile::new().unwrap();
            writeln!(temp_file, r#"{{"type":"response","time":"2025-10-07T10:00:0{}Z","auth":{{"entity_id":"entity-{}"}}}}"#, i, i).unwrap();
            writeln!(temp_file, r#"{{"type":"response","time":"2025-10-07T10:00:0{}Z","auth":{{"entity_id":"entity-{}"}}}}"#, i + 1, i).unwrap();

            files.push(temp_file.path().to_str().unwrap().to_string());
            temp_file
        }).collect();

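        // Count the valid JSON entries in each file, then sum across files:
        // 3 files x 2 entries should yield 6.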
        let (results, _total_lines) = process_files_parallel(
            &files,
            |file_path| -> Result<usize> {
                let file = open_file(file_path)?;
                let reader = BufReader::new(file);
                let mut count = 0;
                for line_result in reader.lines() {
                    let line = line_result?;
                    if line.trim().is_empty() {
                        continue;
                    }
                    if serde_json::from_str::<AuditEntry>(&line).is_ok() {
                        count += 1;
                    }
                }
                Ok(count)
            },
            |results| results.into_iter().map(|r| r.data).sum::<usize>(),
        )
        .unwrap();

        assert_eq!(results, 6);
    }
}