1use crate::audit::types::AuditEntry;
8use crate::utils::progress::ProgressBar;
9use crate::utils::reader::open_file;
10use anyhow::{Context, Result};
11use rayon::prelude::*;
12use std::io::{BufRead, BufReader};
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::sync::{Arc, Mutex};
15
/// How a [`FileProcessor`] schedules work across the input files.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessingMode {
    /// Process files concurrently with rayon's parallel iterators.
    Parallel,
    /// Process files one after another on the calling thread.
    Sequential,
    /// Let the processor decide: parallel for more than one file, sequential otherwise.
    Auto,
}
26
27#[derive(Debug)]
29pub struct ProcessorConfig {
30 pub mode: ProcessingMode,
32 pub progress_frequency: usize,
34 pub show_file_completion: bool,
36 pub progress_label: String,
38 pub strict_parsing: bool,
40}
41
42impl Default for ProcessorConfig {
43 fn default() -> Self {
44 Self {
45 mode: ProcessingMode::Auto,
46 progress_frequency: 2000,
47 show_file_completion: true,
48 progress_label: "Processing".to_string(),
49 strict_parsing: false,
50 }
51 }
52}
53
/// Line and file counters gathered while processing.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ProcessStats {
    /// Every line read, including blank and unparseable ones.
    pub total_lines: usize,
    /// Lines successfully deserialized into an entry.
    pub parsed_entries: usize,
    /// Non-blank lines that failed to deserialize.
    pub skipped_lines: usize,
    /// Number of files that finished processing.
    pub files_processed: usize,
}
66
67impl ProcessStats {
68 pub fn merge(&mut self, other: &Self) {
70 self.total_lines += other.total_lines;
71 self.parsed_entries += other.parsed_entries;
72 self.skipped_lines += other.skipped_lines;
73 self.files_processed += other.files_processed;
74 }
75
76 pub fn report(&self) {
78 eprintln!("\nProcessing Summary:");
79 eprintln!(" Files processed: {}", self.files_processed);
80 eprintln!(
81 " Total lines: {}",
82 crate::utils::format::format_number(self.total_lines)
83 );
84 eprintln!(
85 " Parsed entries: {}",
86 crate::utils::format::format_number(self.parsed_entries)
87 );
88 if self.skipped_lines > 0 {
89 let skip_percentage = (self.skipped_lines as f64 / self.total_lines as f64) * 100.0;
90 eprintln!(
91 " Skipped lines: {} ({:.2}%)",
92 crate::utils::format::format_number(self.skipped_lines),
93 skip_percentage
94 );
95 }
96 }
97}
98
99pub struct FileProcessor {
101 config: ProcessorConfig,
102}
103
104impl FileProcessor {
105 pub fn new() -> Self {
107 Self {
108 config: ProcessorConfig::default(),
109 }
110 }
111
112 pub const fn with_config(config: ProcessorConfig) -> Self {
114 Self { config }
115 }
116
117 pub fn process_files_streaming<T, F, A>(
148 &self,
149 files: &[String],
150 line_processor: F,
151 aggregator: A,
152 initial: T,
153 ) -> Result<(T, ProcessStats)>
154 where
155 T: Send + Clone + Sync + 'static,
156 F: FnMut(&AuditEntry, &mut T) + Send + Sync + Clone,
157 A: Fn(T, T) -> T + Send + Sync,
158 {
159 if files.is_empty() {
160 return Ok((initial, ProcessStats::default()));
161 }
162
163 let mode = self.determine_processing_mode(files);
164
165 match mode {
166 ProcessingMode::Parallel => {
167 self.process_parallel_streaming(files, line_processor, aggregator, initial)
168 }
169 ProcessingMode::Sequential => {
170 self.process_sequential_streaming(files, line_processor, aggregator, initial)
171 }
172 ProcessingMode::Auto => unreachable!(), }
174 }
175
176 #[allow(dead_code)]
181 pub fn process_files_collect<T, F>(
182 &self,
183 files: &[String],
184 processor: F,
185 ) -> Result<(Vec<T>, ProcessStats)>
186 where
187 T: Send + 'static,
188 F: Fn(&str) -> Result<T> + Send + Sync,
189 {
190 if files.is_empty() {
191 return Ok((Vec::new(), ProcessStats::default()));
192 }
193
194 let mode = self.determine_processing_mode(files);
195
196 match mode {
197 ProcessingMode::Parallel => self.process_parallel_collect(files, processor),
198 ProcessingMode::Sequential => self.process_sequential_collect(files, processor),
199 ProcessingMode::Auto => unreachable!(),
200 }
201 }
202
203 const fn determine_processing_mode(&self, files: &[String]) -> ProcessingMode {
205 match self.config.mode {
206 ProcessingMode::Auto => {
207 if files.len() == 1 {
208 ProcessingMode::Sequential
209 } else if files.len() >= 2 {
210 ProcessingMode::Parallel
211 } else {
212 ProcessingMode::Sequential
213 }
214 }
215 mode => mode,
216 }
217 }
218
219 fn process_parallel_streaming<T, F, A>(
221 &self,
222 files: &[String],
223 line_processor: F,
224 aggregator: A,
225 initial: T,
226 ) -> Result<(T, ProcessStats)>
227 where
228 T: Send + Clone + Sync + 'static,
229 F: FnMut(&AuditEntry, &mut T) + Send + Sync + Clone,
230 A: Fn(T, T) -> T + Send + Sync,
231 {
232 eprintln!("Processing {} files in parallel...", files.len());
233
234 eprintln!("Scanning files to determine total work...");
236 let total_lines: usize = files
237 .par_iter()
238 .map(|file_path| count_file_lines(file_path).unwrap_or(0))
239 .sum();
240
241 eprintln!(
242 "Total lines to process: {}",
243 crate::utils::format::format_number(total_lines)
244 );
245
246 let processed_lines = Arc::new(AtomicUsize::new(0));
247 let progress = Arc::new(Mutex::new(ProgressBar::new(
248 total_lines,
249 &self.config.progress_label,
250 )));
251
252 let results: Result<Vec<_>> = files
254 .par_iter()
255 .enumerate()
256 .map(|(idx, file_path)| -> Result<(T, ProcessStats)> {
257 let mut file_state = initial.clone();
258 let mut local_processor = line_processor.clone();
259
260 let progress_ref = (processed_lines.clone(), progress.clone());
261 let stats = self.process_single_file_streaming(
262 file_path,
263 &mut local_processor,
264 &mut file_state,
265 Some(&progress_ref),
266 )?;
267
268 if self.config.show_file_completion {
269 let lines_count = count_file_lines(file_path)?;
270 if let Ok(mut progress) = progress.lock() {
271 eprint!("\r");
272 eprint!("{}", " ".repeat(100));
273 eprint!("\r");
274 eprintln!(
275 "[{}/{}] ✓ Completed: {} ({} lines)",
276 idx + 1,
277 files.len(),
278 file_path.split('/').next_back().unwrap_or(file_path),
279 crate::utils::format::format_number(lines_count)
280 );
281 progress.render();
282 }
283 }
284
285 Ok((file_state, stats))
286 })
287 .collect();
288
289 let results = results?;
290
291 if let Ok(mut progress) = progress.lock() {
293 eprint!("\r");
294 eprint!("{}", " ".repeat(100));
295 eprint!("\r");
296 progress.finish_with_message(&format!(
297 "Processed {} total lines",
298 crate::utils::format::format_number(processed_lines.load(Ordering::Relaxed))
299 ));
300 }
301
302 let mut combined_stats = ProcessStats::default();
304 let final_result = results
305 .into_iter()
306 .fold(initial, |acc, (file_result, file_stats)| {
307 combined_stats.merge(&file_stats);
308 aggregator(acc, file_result)
309 });
310
311 Ok((final_result, combined_stats))
312 }
313
314 fn process_sequential_streaming<T, F, A>(
316 &self,
317 files: &[String],
318 mut line_processor: F,
319 aggregator: A,
320 initial: T,
321 ) -> Result<(T, ProcessStats)>
322 where
323 T: Send + Clone + Sync,
324 F: FnMut(&AuditEntry, &mut T) + Send + Sync,
325 A: Fn(T, T) -> T + Send + Sync,
326 {
327 eprintln!("Processing {} files sequentially...", files.len());
328
329 let mut combined_result = initial;
330 let mut combined_stats = ProcessStats::default();
331
332 for (file_idx, file_path) in files.iter().enumerate() {
333 eprintln!(
334 "[{}/{}] Processing: {}",
335 file_idx + 1,
336 files.len(),
337 file_path
338 );
339
340 let mut file_state = combined_result.clone();
341 let single_file_stats = self.process_single_file_streaming(
342 file_path,
343 &mut line_processor,
344 &mut file_state,
345 None, )?;
347
348 combined_result = aggregator(combined_result, file_state);
349 combined_stats.merge(&single_file_stats);
350
351 if self.config.show_file_completion {
352 eprintln!(
353 "[{}/{}] ✓ Completed: {} ({} lines)",
354 file_idx + 1,
355 files.len(),
356 file_path.split('/').next_back().unwrap_or(file_path),
357 crate::utils::format::format_number(single_file_stats.total_lines)
358 );
359 }
360 }
361
362 Ok((combined_result, combined_stats))
363 }
364
365 #[allow(dead_code)]
367 #[allow(clippy::unused_self)]
368 fn process_parallel_collect<T, F>(
369 &self,
370 files: &[String],
371 processor: F,
372 ) -> Result<(Vec<T>, ProcessStats)>
373 where
374 T: Send + 'static,
375 F: Fn(&str) -> Result<T> + Send + Sync,
376 {
377 eprintln!("Processing {} files in parallel...", files.len());
378
379 let total_lines: usize = files
380 .par_iter()
381 .map(|file_path| count_file_lines(file_path).unwrap_or(0))
382 .sum();
383
384 let results: Result<Vec<_>> = files
385 .par_iter()
386 .map(|file_path| processor(file_path))
387 .collect();
388
389 let stats = ProcessStats {
390 total_lines,
391 parsed_entries: 0, skipped_lines: 0,
393 files_processed: files.len(),
394 };
395
396 Ok((results?, stats))
397 }
398
399 #[allow(dead_code)]
401 fn process_sequential_collect<T, F>(
402 &self,
403 files: &[String],
404 processor: F,
405 ) -> Result<(Vec<T>, ProcessStats)>
406 where
407 F: Fn(&str) -> Result<T>,
408 {
409 eprintln!("Processing {} files sequentially...", files.len());
410
411 let mut results = Vec::new();
412 let mut total_lines = 0;
413
414 for (file_idx, file_path) in files.iter().enumerate() {
415 eprintln!(
416 "[{}/{}] Processing: {}",
417 file_idx + 1,
418 files.len(),
419 file_path
420 );
421
422 let result = processor(file_path)?;
423 let lines_processed = count_file_lines(file_path)?;
424
425 results.push(result);
426 total_lines += lines_processed;
427
428 if self.config.show_file_completion {
429 eprintln!(
430 "[{}/{}] ✓ Completed: {} ({} lines)",
431 file_idx + 1,
432 files.len(),
433 file_path.split('/').next_back().unwrap_or(file_path),
434 crate::utils::format::format_number(lines_processed)
435 );
436 }
437 }
438
439 let stats = ProcessStats {
440 total_lines,
441 parsed_entries: 0, skipped_lines: 0,
443 files_processed: files.len(),
444 };
445
446 Ok((results, stats))
447 }
448
449 fn process_single_file_streaming<T, F>(
451 &self,
452 file_path: &str,
453 line_processor: &mut F,
454 state: &mut T,
455 progress: Option<&(Arc<AtomicUsize>, Arc<Mutex<ProgressBar>>)>,
456 ) -> Result<ProcessStats>
457 where
458 F: FnMut(&AuditEntry, &mut T),
459 {
460 let file =
461 open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
462 let reader = BufReader::new(file);
463
464 let mut file_stats = ProcessStats::default();
465
466 for line_result in reader.lines() {
467 let line =
468 line_result.with_context(|| format!("Failed to read line from {}", file_path))?;
469
470 file_stats.total_lines += 1;
471
472 if file_stats.total_lines % self.config.progress_frequency == 0 {
474 if let Some((processed_lines, progress_bar)) = &progress {
475 processed_lines.fetch_add(self.config.progress_frequency, Ordering::Relaxed);
476 if let Ok(mut p) = progress_bar.lock() {
477 p.update(processed_lines.load(Ordering::Relaxed));
478 }
479 }
480 }
481
482 if line.trim().is_empty() {
484 continue;
485 }
486
487 match serde_json::from_str::<AuditEntry>(&line) {
489 Ok(entry) => {
490 file_stats.parsed_entries += 1;
491 line_processor(&entry, state);
492 }
493 Err(e) => {
494 file_stats.skipped_lines += 1;
495 if self.config.strict_parsing {
496 return Err(e).with_context(|| {
497 format!(
498 "Failed to parse JSON at line {} in {}",
499 file_stats.total_lines, file_path
500 )
501 });
502 }
503 }
505 }
506 }
507
508 if let Some((processed_lines, progress_bar)) = &progress {
510 let remaining = file_stats.total_lines % self.config.progress_frequency;
511 if remaining > 0 {
512 processed_lines.fetch_add(remaining, Ordering::Relaxed);
513 if let Ok(mut p) = progress_bar.lock() {
514 p.update(processed_lines.load(Ordering::Relaxed));
515 }
516 }
517 }
518
519 file_stats.files_processed = 1;
520 Ok(file_stats)
521 }
522}
523
524impl Default for FileProcessor {
525 fn default() -> Self {
526 Self::new()
527 }
528}
529
530fn count_file_lines(file_path: &str) -> Result<usize> {
532 let file =
533 open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
534 let reader = BufReader::new(file);
535
536 let mut count = 0;
537 for line_result in reader.lines() {
538 line_result.with_context(|| format!("Failed to read line from {}", file_path))?;
539 count += 1;
540 }
541
542 Ok(count)
543}
544
545pub struct ProcessorBuilder {
547 config: ProcessorConfig,
548}
549
550impl ProcessorBuilder {
551 pub fn new() -> Self {
553 Self {
554 config: ProcessorConfig::default(),
555 }
556 }
557
558 #[must_use]
560 pub const fn mode(mut self, mode: ProcessingMode) -> Self {
561 self.config.mode = mode;
562 self
563 }
564
565 #[must_use]
567 #[allow(dead_code)]
568 pub const fn progress_frequency(mut self, frequency: usize) -> Self {
569 self.config.progress_frequency = frequency;
570 self
571 }
572
573 #[must_use]
575 #[allow(dead_code)]
576 pub const fn show_file_completion(mut self, show: bool) -> Self {
577 self.config.show_file_completion = show;
578 self
579 }
580
581 #[must_use]
583 pub fn progress_label<S: Into<String>>(mut self, label: S) -> Self {
584 self.config.progress_label = label.into();
585 self
586 }
587
588 #[must_use]
590 #[allow(dead_code)]
591 pub const fn strict_parsing(mut self, strict: bool) -> Self {
592 self.config.strict_parsing = strict;
593 self
594 }
595
596 #[must_use]
598 pub fn build(self) -> FileProcessor {
599 FileProcessor::with_config(self.config)
600 }
601}
602
603impl Default for ProcessorBuilder {
604 fn default() -> Self {
605 Self::new()
606 }
607}