1use crate::audit::types::AuditEntry;
8use crate::utils::progress::ProgressBar;
9use crate::utils::reader::open_file;
10use anyhow::{Context, Result};
11use rayon::prelude::*;
12use std::io::{BufRead, BufReader};
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::sync::{Arc, Mutex};
15
/// Strategy used to schedule work across the input files.
///
/// `PartialEq`/`Eq` are derived so a mode can be compared directly
/// (for example in assertions) instead of only through pattern matching.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessingMode {
    /// Process the files concurrently.
    Parallel,
    /// Process the files one after another.
    Sequential,
    /// Let the processor choose between the two based on the number of files.
    Auto,
}
26
27#[derive(Debug)]
29pub struct ProcessorConfig {
30 pub mode: ProcessingMode,
32 pub progress_frequency: usize,
34 pub show_file_completion: bool,
36 pub progress_label: String,
38 pub strict_parsing: bool,
40}
41
42impl Default for ProcessorConfig {
43 fn default() -> Self {
44 Self {
45 mode: ProcessingMode::Auto,
46 progress_frequency: 2000,
47 show_file_completion: true,
48 progress_label: "Processing".to_string(),
49 strict_parsing: false,
50 }
51 }
52}
53
/// Counters describing one processing run (or one file of it).
///
/// `PartialEq`/`Eq` are derived so two sets of statistics can be compared
/// as a whole.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ProcessStats {
    /// Lines read, including blank lines and lines that failed to parse.
    pub total_lines: usize,
    /// Lines that were parsed successfully.
    pub parsed_entries: usize,
    /// Non-blank lines that could not be parsed.
    pub skipped_lines: usize,
    /// Number of files these counters cover.
    pub files_processed: usize,
}
66
67impl ProcessStats {
68 pub fn merge(&mut self, other: &Self) {
70 self.total_lines += other.total_lines;
71 self.parsed_entries += other.parsed_entries;
72 self.skipped_lines += other.skipped_lines;
73 self.files_processed += other.files_processed;
74 }
75
76 pub fn report(&self) {
78 eprintln!("\nProcessing Summary:");
79 eprintln!(" Files processed: {}", self.files_processed);
80 eprintln!(
81 " Total lines: {}",
82 crate::utils::format::format_number(self.total_lines)
83 );
84 eprintln!(
85 " Parsed entries: {}",
86 crate::utils::format::format_number(self.parsed_entries)
87 );
88 if self.skipped_lines > 0 {
89 let skip_percentage = (self.skipped_lines as f64 / self.total_lines as f64) * 100.0;
90 eprintln!(
91 " Skipped lines: {} ({:.2}%)",
92 crate::utils::format::format_number(self.skipped_lines),
93 skip_percentage
94 );
95 }
96 }
97}
98
99pub struct FileProcessor {
101 config: ProcessorConfig,
102}
103
104impl FileProcessor {
105 pub fn new() -> Self {
107 Self {
108 config: ProcessorConfig::default(),
109 }
110 }
111
112 pub const fn with_config(config: ProcessorConfig) -> Self {
114 Self { config }
115 }
116
117 pub fn process_files_streaming<T, F, A>(
148 &self,
149 files: &[String],
150 line_processor: F,
151 aggregator: A,
152 initial: T,
153 ) -> Result<(T, ProcessStats)>
154 where
155 T: Send + Clone + Sync + 'static,
156 F: FnMut(&AuditEntry, &mut T) + Send + Sync + Clone,
157 A: Fn(T, T) -> T + Send + Sync,
158 {
159 if files.is_empty() {
160 return Ok((initial, ProcessStats::default()));
161 }
162
163 let mode = self.determine_processing_mode(files);
164
165 match mode {
166 ProcessingMode::Parallel => {
167 self.process_parallel_streaming(files, line_processor, aggregator, initial)
168 }
169 ProcessingMode::Sequential => {
170 self.process_sequential_streaming(files, line_processor, aggregator, initial)
171 }
172 ProcessingMode::Auto => unreachable!(), }
174 }
175
176 #[allow(dead_code)]
181 pub fn process_files_collect<T, F>(
182 &self,
183 files: &[String],
184 processor: F,
185 ) -> Result<(Vec<T>, ProcessStats)>
186 where
187 T: Send + 'static,
188 F: Fn(&str) -> Result<T> + Send + Sync,
189 {
190 if files.is_empty() {
191 return Ok((Vec::new(), ProcessStats::default()));
192 }
193
194 let mode = self.determine_processing_mode(files);
195
196 match mode {
197 ProcessingMode::Parallel => self.process_parallel_collect(files, processor),
198 ProcessingMode::Sequential => self.process_sequential_collect(files, processor),
199 ProcessingMode::Auto => unreachable!(),
200 }
201 }
202
203 const fn determine_processing_mode(&self, files: &[String]) -> ProcessingMode {
205 match self.config.mode {
206 ProcessingMode::Auto => {
207 if files.len() == 1 {
208 ProcessingMode::Sequential
209 } else if files.len() >= 2 {
210 ProcessingMode::Parallel
211 } else {
212 ProcessingMode::Sequential
213 }
214 }
215 mode => mode,
216 }
217 }
218
219 fn process_parallel_streaming<T, F, A>(
221 &self,
222 files: &[String],
223 line_processor: F,
224 aggregator: A,
225 initial: T,
226 ) -> Result<(T, ProcessStats)>
227 where
228 T: Send + Clone + Sync + 'static,
229 F: FnMut(&AuditEntry, &mut T) + Send + Sync + Clone,
230 A: Fn(T, T) -> T + Send + Sync,
231 {
232 eprintln!("Processing {} files in parallel...", files.len());
233
234 eprintln!("Scanning files to determine total work...");
236 let total_lines: usize = files
237 .par_iter()
238 .map(|file_path| count_file_lines(file_path).unwrap_or(0))
239 .sum();
240
241 eprintln!(
242 "Total lines to process: {}",
243 crate::utils::format::format_number(total_lines)
244 );
245
246 let processed_lines = Arc::new(AtomicUsize::new(0));
247 let progress = Arc::new(Mutex::new(ProgressBar::new(
248 total_lines,
249 &self.config.progress_label,
250 )));
251
252 let results: Result<Vec<_>> = files
254 .par_iter()
255 .enumerate()
256 .map(|(idx, file_path)| -> Result<(T, ProcessStats)> {
257 let mut file_state = initial.clone();
258 let mut local_processor = line_processor.clone();
259
260 let progress_ref = (processed_lines.clone(), progress.clone());
261 let stats = self.process_single_file_streaming(
262 file_path,
263 &mut local_processor,
264 &mut file_state,
265 Some(&progress_ref),
266 )?;
267
268 if self.config.show_file_completion {
269 let lines_count = count_file_lines(file_path)?;
270 if let Ok(progress) = progress.lock() {
271 progress.println(format!(
272 "[{}/{}] ✓ Completed: {} ({} lines)",
273 idx + 1,
274 files.len(),
275 file_path.split('/').next_back().unwrap_or(file_path),
276 crate::utils::format::format_number(lines_count)
277 ));
278 }
279 }
280
281 Ok((file_state, stats))
282 })
283 .collect();
284
285 let results = results?;
286
287 if let Ok(progress) = progress.lock() {
289 progress.finish_with_message(&format!(
290 "Processed {} total lines",
291 crate::utils::format::format_number(processed_lines.load(Ordering::Relaxed))
292 ));
293 }
294
295 let mut combined_stats = ProcessStats::default();
297 let final_result = results
298 .into_iter()
299 .fold(initial, |acc, (file_result, file_stats)| {
300 combined_stats.merge(&file_stats);
301 aggregator(acc, file_result)
302 });
303
304 Ok((final_result, combined_stats))
305 }
306
307 fn process_sequential_streaming<T, F, A>(
309 &self,
310 files: &[String],
311 mut line_processor: F,
312 aggregator: A,
313 initial: T,
314 ) -> Result<(T, ProcessStats)>
315 where
316 T: Send + Clone + Sync,
317 F: FnMut(&AuditEntry, &mut T) + Send + Sync,
318 A: Fn(T, T) -> T + Send + Sync,
319 {
320 eprintln!("Processing {} files sequentially...", files.len());
321
322 let mut combined_result = initial;
323 let mut combined_stats = ProcessStats::default();
324
325 for (file_idx, file_path) in files.iter().enumerate() {
326 eprintln!(
327 "[{}/{}] Processing: {}",
328 file_idx + 1,
329 files.len(),
330 file_path
331 );
332
333 let mut file_state = combined_result.clone();
334 let single_file_stats = self.process_single_file_streaming(
335 file_path,
336 &mut line_processor,
337 &mut file_state,
338 None, )?;
340
341 combined_result = aggregator(combined_result, file_state);
342 combined_stats.merge(&single_file_stats);
343
344 if self.config.show_file_completion {
345 eprintln!(
346 "[{}/{}] ✓ Completed: {} ({} lines)",
347 file_idx + 1,
348 files.len(),
349 file_path.split('/').next_back().unwrap_or(file_path),
350 crate::utils::format::format_number(single_file_stats.total_lines)
351 );
352 }
353 }
354
355 Ok((combined_result, combined_stats))
356 }
357
358 #[allow(dead_code)]
360 #[allow(clippy::unused_self)]
361 fn process_parallel_collect<T, F>(
362 &self,
363 files: &[String],
364 processor: F,
365 ) -> Result<(Vec<T>, ProcessStats)>
366 where
367 T: Send + 'static,
368 F: Fn(&str) -> Result<T> + Send + Sync,
369 {
370 eprintln!("Processing {} files in parallel...", files.len());
371
372 let total_lines: usize = files
373 .par_iter()
374 .map(|file_path| count_file_lines(file_path).unwrap_or(0))
375 .sum();
376
377 let results: Result<Vec<_>> = files
378 .par_iter()
379 .map(|file_path| processor(file_path))
380 .collect();
381
382 let stats = ProcessStats {
383 total_lines,
384 parsed_entries: 0, skipped_lines: 0,
386 files_processed: files.len(),
387 };
388
389 Ok((results?, stats))
390 }
391
392 #[allow(dead_code)]
394 fn process_sequential_collect<T, F>(
395 &self,
396 files: &[String],
397 processor: F,
398 ) -> Result<(Vec<T>, ProcessStats)>
399 where
400 F: Fn(&str) -> Result<T>,
401 {
402 eprintln!("Processing {} files sequentially...", files.len());
403
404 let mut results = Vec::new();
405 let mut total_lines = 0;
406
407 for (file_idx, file_path) in files.iter().enumerate() {
408 eprintln!(
409 "[{}/{}] Processing: {}",
410 file_idx + 1,
411 files.len(),
412 file_path
413 );
414
415 let result = processor(file_path)?;
416 let lines_processed = count_file_lines(file_path)?;
417
418 results.push(result);
419 total_lines += lines_processed;
420
421 if self.config.show_file_completion {
422 eprintln!(
423 "[{}/{}] ✓ Completed: {} ({} lines)",
424 file_idx + 1,
425 files.len(),
426 file_path.split('/').next_back().unwrap_or(file_path),
427 crate::utils::format::format_number(lines_processed)
428 );
429 }
430 }
431
432 let stats = ProcessStats {
433 total_lines,
434 parsed_entries: 0, skipped_lines: 0,
436 files_processed: files.len(),
437 };
438
439 Ok((results, stats))
440 }
441
442 fn process_single_file_streaming<T, F>(
444 &self,
445 file_path: &str,
446 line_processor: &mut F,
447 state: &mut T,
448 progress: Option<&(Arc<AtomicUsize>, Arc<Mutex<ProgressBar>>)>,
449 ) -> Result<ProcessStats>
450 where
451 F: FnMut(&AuditEntry, &mut T),
452 {
453 let file =
454 open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
455 let reader = BufReader::new(file);
456
457 let mut file_stats = ProcessStats::default();
458
459 for line_result in reader.lines() {
460 let line =
461 line_result.with_context(|| format!("Failed to read line from {}", file_path))?;
462
463 file_stats.total_lines += 1;
464
465 if file_stats.total_lines % self.config.progress_frequency == 0 {
467 if let Some((processed_lines, progress_bar)) = &progress {
468 processed_lines.fetch_add(self.config.progress_frequency, Ordering::Relaxed);
469 if let Ok(p) = progress_bar.lock() {
470 p.update(processed_lines.load(Ordering::Relaxed));
471 }
472 }
473 }
474
475 if line.trim().is_empty() {
477 continue;
478 }
479
480 match serde_json::from_str::<AuditEntry>(&line) {
482 Ok(entry) => {
483 file_stats.parsed_entries += 1;
484 line_processor(&entry, state);
485 }
486 Err(e) => {
487 file_stats.skipped_lines += 1;
488 if self.config.strict_parsing {
489 return Err(e).with_context(|| {
490 format!(
491 "Failed to parse JSON at line {} in {}",
492 file_stats.total_lines, file_path
493 )
494 });
495 }
496 }
498 }
499 }
500
501 if let Some((processed_lines, progress_bar)) = &progress {
503 let remaining = file_stats.total_lines % self.config.progress_frequency;
504 if remaining > 0 {
505 processed_lines.fetch_add(remaining, Ordering::Relaxed);
506 if let Ok(p) = progress_bar.lock() {
507 p.update(processed_lines.load(Ordering::Relaxed));
508 }
509 }
510 }
511
512 file_stats.files_processed = 1;
513 Ok(file_stats)
514 }
515}
516
517impl Default for FileProcessor {
518 fn default() -> Self {
519 Self::new()
520 }
521}
522
523fn count_file_lines(file_path: &str) -> Result<usize> {
525 let file =
526 open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
527 let reader = BufReader::new(file);
528
529 let mut count = 0;
530 for line_result in reader.lines() {
531 line_result.with_context(|| format!("Failed to read line from {}", file_path))?;
532 count += 1;
533 }
534
535 Ok(count)
536}
537
538pub struct ProcessorBuilder {
540 config: ProcessorConfig,
541}
542
543impl ProcessorBuilder {
544 pub fn new() -> Self {
546 Self {
547 config: ProcessorConfig::default(),
548 }
549 }
550
551 #[must_use]
553 pub const fn mode(mut self, mode: ProcessingMode) -> Self {
554 self.config.mode = mode;
555 self
556 }
557
558 #[must_use]
560 #[allow(dead_code)]
561 pub const fn progress_frequency(mut self, frequency: usize) -> Self {
562 self.config.progress_frequency = frequency;
563 self
564 }
565
566 #[must_use]
568 #[allow(dead_code)]
569 pub const fn show_file_completion(mut self, show: bool) -> Self {
570 self.config.show_file_completion = show;
571 self
572 }
573
574 #[must_use]
576 pub fn progress_label<S: Into<String>>(mut self, label: S) -> Self {
577 self.config.progress_label = label.into();
578 self
579 }
580
581 #[must_use]
583 #[allow(dead_code)]
584 pub const fn strict_parsing(mut self, strict: bool) -> Self {
585 self.config.strict_parsing = strict;
586 self
587 }
588
589 #[must_use]
591 pub fn build(self) -> FileProcessor {
592 FileProcessor::with_config(self.config)
593 }
594}
595
596impl Default for ProcessorBuilder {
597 fn default() -> Self {
598 Self::new()
599 }
600}