vault_audit_tools/utils/parallel.rs

use crate::audit::types::AuditEntry;
use crate::utils::progress::ProgressBar;
use crate::utils::reader::open_file;
use anyhow::{Context, Result};
use rayon::prelude::*;
use std::io::{BufRead, BufReader};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

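/// Outcome of processing a single file: its path, how many lines it
/// contained, and the data produced by the per-file processor.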
#[derive(Debug)]
#[allow(dead_code)]
pub struct FileProcessResult<T> {
    pub file_path: String,
    pub lines_processed: usize,
    pub data: T,
}

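/// Processes `files` in parallel on rayon's thread pool: `processor` runs
/// once per file, then `combiner` folds the per-file results into a single
/// value. Returns the combined value and the total number of lines
/// processed, as reported through the shared progress counter.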
pub fn process_files_parallel<T, F, C, R>(
    files: &[String],
    processor: F,
    combiner: C,
) -> Result<(R, usize)>
where
    T: Send + 'static,
    R: Send + 'static,
    F: Fn(&str) -> Result<T> + Send + Sync,
    C: Fn(Vec<FileProcessResult<T>>) -> R + Send + Sync,
{
    if files.is_empty() {
        return Err(anyhow::anyhow!("No files provided for processing"));
    }

    if files.len() == 1 {
        eprintln!("Processing file...");
    } else {
        eprintln!("Processing {} files in parallel...", files.len());
    }

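    // First pass: count each file's lines so the progress bar has an
    // accurate total. Unreadable files count as zero here; the processing
    // pass below surfaces the actual error.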
    eprintln!("Scanning files to determine total work...");
    let total_lines_to_process: usize = files
        .par_iter()
        .map(|file_path| count_file_lines(file_path).unwrap_or(0))
        .sum();

    eprintln!(
        "Total lines to process: {}",
        crate::utils::format::format_number(total_lines_to_process)
    );

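    // Shared progress state: an atomic counter of processed lines and a
    // mutex-guarded progress bar for thread-safe printing.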
    let processed_lines = Arc::new(AtomicUsize::new(0));
    let progress = Arc::new(Mutex::new(ProgressBar::new(
        total_lines_to_process,
        "Processing",
    )));

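    // Hand the shared counter and bar to the command modules whose
    // processors report per-line progress while they run.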
    crate::commands::system_overview::init_parallel_progress(
        processed_lines.clone(),
        progress.clone(),
    );
    crate::commands::client_traffic_analysis::init_parallel_progress(
        processed_lines.clone(),
        progress.clone(),
    );

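    // Main pass: process each file on the thread pool. Collecting into
    // Result<Vec<_>> fails if any single file fails.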
    let results: Result<Vec<_>> = files
        .par_iter()
        .enumerate()
        .map(|(idx, file_path)| -> Result<FileProcessResult<T>> {
            let data = processor(file_path)
                .with_context(|| format!("Failed to process file: {}", file_path))?;

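            // Count the file's lines again for the completion message;
            // this is a second read of the file.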
            let lines_count = count_file_lines(file_path)?;

            if let Ok(progress) = progress.lock() {
                progress.println(format!(
                    "[{}/{}] ✓ Completed: {} ({} lines)",
                    idx + 1,
                    files.len(),
                    file_path.split('/').next_back().unwrap_or(file_path),
                    crate::utils::format::format_number(lines_count)
                ));
            }

            Ok(FileProcessResult {
                file_path: file_path.clone(),
                lines_processed: lines_count,
                data,
            })
        })
        .collect();

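    // The line total comes from the shared counter, so it reflects what
    // the per-file processors reported while running.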
    let results = results?;
    let total_lines_processed = processed_lines.load(Ordering::Relaxed);

    if let Ok(progress) = progress.lock() {
        progress.finish_with_message(&format!("Processed {} total lines", total_lines_processed));
    }

    let result = combiner(results);

    Ok((result, total_lines_processed))
}

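/// Counts the lines in a file, propagating read errors with file context.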
pub fn count_file_lines(file_path: &str) -> Result<usize> {
    let file =
        open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
    let reader = BufReader::new(file);

    let mut count = 0;
    for line_result in reader.lines() {
        line_result.with_context(|| format!("Failed to read line from {}", file_path))?;
        count += 1;
    }

    Ok(count)
}

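/// Streams a file line by line, calling `processor` on every line that
/// parses as an `AuditEntry`. Empty and unparseable lines are skipped;
/// accumulation happens through the processor's side effects, and the
/// function itself returns `T::default()`.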
#[allow(dead_code)]
pub fn process_file_streaming<T, F>(file_path: &str, mut processor: F) -> Result<T>
where
    F: FnMut(&AuditEntry),
    T: Default,
{
    let file =
        open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
    let reader = BufReader::new(file);

    for (line_num, line_result) in reader.lines().enumerate() {
        let line = line_result
            .with_context(|| format!("Failed to read line {} from {}", line_num + 1, file_path))?;

        if line.trim().is_empty() {
            continue;
        }

        if let Ok(entry) = serde_json::from_str::<AuditEntry>(&line) {
            processor(&entry);
        }
    }

    Ok(T::default())
}

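/// Reads every parseable `AuditEntry` in a file into memory; empty and
/// unparseable lines are skipped. Prefer `process_file_streaming` when the
/// file is large.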
#[allow(dead_code)]
fn read_file_entries(file_path: &str) -> Result<Vec<AuditEntry>> {
    let file =
        open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
    let reader = BufReader::new(file);

    let mut entries = Vec::new();

    for (line_num, line) in reader.lines().enumerate() {
        let line = line
            .with_context(|| format!("Failed to read line {} from {}", line_num + 1, file_path))?;

        if line.trim().is_empty() {
            continue;
        }

        if let Ok(entry) = serde_json::from_str::<AuditEntry>(&line) {
            entries.push(entry);
        }
    }

    Ok(entries)
}

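/// Convenience wrapper around `process_files_parallel` that folds the
/// per-file results into one value with `aggregator`, starting from
/// `initial`.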
#[allow(dead_code)]
pub fn process_files_aggregate<T, F, A>(
    files: &[String],
    processor: F,
    aggregator: A,
    initial: T,
) -> Result<(T, usize)>
where
    T: Send + Clone + Sync + 'static,
    F: Fn(&str) -> Result<T> + Send + Sync,
    A: Fn(T, T) -> T + Send + Sync,
{
    process_files_parallel(files, processor, |results| {
        results
            .into_iter()
            .fold(initial.clone(), |acc, result| aggregator(acc, result.data))
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::TempDir;

    #[test]
    fn test_parallel_processing() {
        let dir = TempDir::new().unwrap();
        let mut files = Vec::new();

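        // Three files, each containing two valid JSON response entries.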
        for i in 0..3 {
            let path = dir.path().join(format!("test_{}.log", i));
            {
                let mut temp_file = std::fs::File::create(&path).unwrap();
                writeln!(temp_file, r#"{{"type":"response","time":"2025-10-07T10:00:0{}Z","auth":{{"entity_id":"entity-{}"}}}}"#, i, i).unwrap();
                writeln!(temp_file, r#"{{"type":"response","time":"2025-10-07T10:00:0{}Z","auth":{{"entity_id":"entity-{}"}}}}"#, i + 1, i).unwrap();
            }
            files.push(path.to_str().unwrap().to_string());
        }

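        // Processor: count parseable entries per file. Combiner: sum them.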
        let (results, _total_lines) = process_files_parallel(
            &files,
            |file_path| -> Result<usize> {
                let file = open_file(file_path)?;
                let reader = BufReader::new(file);
                let mut count = 0;
                for line_result in reader.lines() {
                    let line = line_result?;
                    if line.trim().is_empty() {
                        continue;
                    }
                    if serde_json::from_str::<AuditEntry>(&line).is_ok() {
                        count += 1;
                    }
                }
                Ok(count)
            },
            |results| results.into_iter().map(|r| r.data).sum::<usize>(),
        )
        .unwrap();

        assert_eq!(results, 6);
    }
}