// vault_audit_tools/utils/processor.rs

1//! Unified file processing abstraction for audit log analysis.
2//!
3//! This module provides a DRY abstraction for processing multiple audit log files
4//! with consistent progress tracking, error handling, and support for both
5//! parallel and sequential processing modes.
6
7use crate::audit::types::AuditEntry;
8use crate::utils::progress::ProgressBar;
9use crate::utils::reader::open_file;
10use anyhow::{Context, Result};
11use rayon::prelude::*;
12use std::io::{BufRead, BufReader};
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::sync::{Arc, Mutex};
15
/// Processing mode for file processing
///
/// `PartialEq`/`Eq` are derived so callers can compare resolved modes
/// directly (e.g. in tests or logging) instead of matching.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessingMode {
    /// Process files in parallel using all available CPU cores
    Parallel,
    /// Process files sequentially (one at a time)
    Sequential,
    /// Automatically choose based on file count and size
    Auto,
}
26
27/// Configuration for file processing
28#[derive(Debug)]
29pub struct ProcessorConfig {
30    /// Processing mode to use
31    pub mode: ProcessingMode,
32    /// Progress update frequency (lines)
33    pub progress_frequency: usize,
34    /// Whether to show detailed per-file completion messages
35    pub show_file_completion: bool,
36    /// Custom progress label
37    pub progress_label: String,
38    /// Whether to use strict JSON parsing (fail on any parse error)
39    pub strict_parsing: bool,
40}
41
42impl Default for ProcessorConfig {
43    fn default() -> Self {
44        Self {
45            mode: ProcessingMode::Auto,
46            progress_frequency: 2000,
47            show_file_completion: true,
48            progress_label: "Processing".to_string(),
49            strict_parsing: false,
50        }
51    }
52}
53
/// Statistics collected during processing
///
/// All counters are cumulative across files once merged. `PartialEq`/`Eq`
/// are derived so results can be compared directly in tests.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ProcessStats {
    /// Total number of lines processed across all files
    pub total_lines: usize,
    /// Number of successfully parsed audit entries
    pub parsed_entries: usize,
    /// Number of lines skipped due to parse errors
    pub skipped_lines: usize,
    /// Number of files processed
    pub files_processed: usize,
}
66
67impl ProcessStats {
68    /// Merge another stats object into this one
69    pub fn merge(&mut self, other: &Self) {
70        self.total_lines += other.total_lines;
71        self.parsed_entries += other.parsed_entries;
72        self.skipped_lines += other.skipped_lines;
73        self.files_processed += other.files_processed;
74    }
75
76    /// Print a summary of processing statistics
77    pub fn report(&self) {
78        eprintln!("\nProcessing Summary:");
79        eprintln!("  Files processed: {}", self.files_processed);
80        eprintln!(
81            "  Total lines: {}",
82            crate::utils::format::format_number(self.total_lines)
83        );
84        eprintln!(
85            "  Parsed entries: {}",
86            crate::utils::format::format_number(self.parsed_entries)
87        );
88        if self.skipped_lines > 0 {
89            let skip_percentage = (self.skipped_lines as f64 / self.total_lines as f64) * 100.0;
90            eprintln!(
91                "  Skipped lines: {} ({:.2}%)",
92                crate::utils::format::format_number(self.skipped_lines),
93                skip_percentage
94            );
95        }
96    }
97}
98
/// Unified file processor that handles both parallel and sequential processing
///
/// Construct with [`FileProcessor::new`], [`FileProcessor::with_config`], or
/// fluently via [`ProcessorBuilder`].
pub struct FileProcessor {
    /// Behavior settings: mode, progress cadence, completion messages, strictness.
    config: ProcessorConfig,
}
103
104impl FileProcessor {
105    /// Create a new file processor with default configuration
106    pub fn new() -> Self {
107        Self {
108            config: ProcessorConfig::default(),
109        }
110    }
111
112    /// Create a new file processor with custom configuration
113    pub const fn with_config(config: ProcessorConfig) -> Self {
114        Self { config }
115    }
116
117    /// Process multiple files with a streaming line-by-line processor
118    ///
119    /// This is the main entry point for file processing. It automatically handles:
120    /// - Progress tracking with accurate totals and ETA
121    /// - Parallel or sequential processing based on configuration
122    /// - Error handling and context
123    /// - Memory-efficient streaming
124    ///
125    /// # Arguments
126    /// * `files` - List of file paths to process
127    /// * `line_processor` - Function that processes individual audit entries
128    /// * `aggregator` - Function that combines results from all files
129    /// * `initial` - Initial value for the aggregator
130    ///
131    /// # Example
132    /// ```rust,ignore
133    /// let processor = FileProcessor::new();
134    /// let result = processor.process_files_streaming(
135    ///     &files,
136    ///     |entry, state| {
137    ///         // Process each audit entry
138    ///         state.counter += 1;
139    ///     },
140    ///     |acc, file_result| {
141    ///         // Combine results from each file
142    ///         acc.merge(file_result)
143    ///     },
144    ///     MyState::new(),
145    /// )?;
146    /// ```
147    pub fn process_files_streaming<T, F, A>(
148        &self,
149        files: &[String],
150        line_processor: F,
151        aggregator: A,
152        initial: T,
153    ) -> Result<(T, ProcessStats)>
154    where
155        T: Send + Clone + Sync + 'static,
156        F: FnMut(&AuditEntry, &mut T) + Send + Sync + Clone,
157        A: Fn(T, T) -> T + Send + Sync,
158    {
159        if files.is_empty() {
160            return Ok((initial, ProcessStats::default()));
161        }
162
163        let mode = self.determine_processing_mode(files);
164
165        match mode {
166            ProcessingMode::Parallel => {
167                self.process_parallel_streaming(files, line_processor, aggregator, initial)
168            }
169            ProcessingMode::Sequential => {
170                self.process_sequential_streaming(files, line_processor, aggregator, initial)
171            }
172            ProcessingMode::Auto => unreachable!(), // determine_processing_mode resolves this
173        }
174    }
175
176    /// Process multiple files and collect results into a collection
177    ///
178    /// This is a convenience method for cases where you want to collect
179    /// individual results from each file rather than aggregating.
180    #[allow(dead_code)]
181    pub fn process_files_collect<T, F>(
182        &self,
183        files: &[String],
184        processor: F,
185    ) -> Result<(Vec<T>, ProcessStats)>
186    where
187        T: Send + 'static,
188        F: Fn(&str) -> Result<T> + Send + Sync,
189    {
190        if files.is_empty() {
191            return Ok((Vec::new(), ProcessStats::default()));
192        }
193
194        let mode = self.determine_processing_mode(files);
195
196        match mode {
197            ProcessingMode::Parallel => self.process_parallel_collect(files, processor),
198            ProcessingMode::Sequential => self.process_sequential_collect(files, processor),
199            ProcessingMode::Auto => unreachable!(),
200        }
201    }
202
203    /// Determine the optimal processing mode based on files and configuration
204    const fn determine_processing_mode(&self, files: &[String]) -> ProcessingMode {
205        match self.config.mode {
206            ProcessingMode::Auto => {
207                if files.len() == 1 {
208                    ProcessingMode::Sequential
209                } else if files.len() >= 2 {
210                    ProcessingMode::Parallel
211                } else {
212                    ProcessingMode::Sequential
213                }
214            }
215            mode => mode,
216        }
217    }
218
219    /// Process files in parallel with streaming
220    fn process_parallel_streaming<T, F, A>(
221        &self,
222        files: &[String],
223        line_processor: F,
224        aggregator: A,
225        initial: T,
226    ) -> Result<(T, ProcessStats)>
227    where
228        T: Send + Clone + Sync + 'static,
229        F: FnMut(&AuditEntry, &mut T) + Send + Sync + Clone,
230        A: Fn(T, T) -> T + Send + Sync,
231    {
232        eprintln!("Processing {} files in parallel...", files.len());
233
234        // Pre-scan to determine total work
235        eprintln!("Scanning files to determine total work...");
236        let total_lines: usize = files
237            .par_iter()
238            .map(|file_path| count_file_lines(file_path).unwrap_or(0))
239            .sum();
240
241        eprintln!(
242            "Total lines to process: {}",
243            crate::utils::format::format_number(total_lines)
244        );
245
246        let processed_lines = Arc::new(AtomicUsize::new(0));
247        let progress = Arc::new(Mutex::new(ProgressBar::new(
248            total_lines,
249            &self.config.progress_label,
250        )));
251
252        // Process files in parallel
253        let results: Result<Vec<_>> = files
254            .par_iter()
255            .enumerate()
256            .map(|(idx, file_path)| -> Result<(T, ProcessStats)> {
257                let mut file_state = initial.clone();
258                let mut local_processor = line_processor.clone();
259
260                let progress_ref = (processed_lines.clone(), progress.clone());
261                let stats = self.process_single_file_streaming(
262                    file_path,
263                    &mut local_processor,
264                    &mut file_state,
265                    Some(&progress_ref),
266                )?;
267
268                if self.config.show_file_completion {
269                    let lines_count = count_file_lines(file_path)?;
270                    if let Ok(mut progress) = progress.lock() {
271                        eprint!("\r");
272                        eprint!("{}", " ".repeat(100));
273                        eprint!("\r");
274                        eprintln!(
275                            "[{}/{}] ✓ Completed: {} ({} lines)",
276                            idx + 1,
277                            files.len(),
278                            file_path.split('/').next_back().unwrap_or(file_path),
279                            crate::utils::format::format_number(lines_count)
280                        );
281                        progress.render();
282                    }
283                }
284
285                Ok((file_state, stats))
286            })
287            .collect();
288
289        let results = results?;
290
291        // Clear progress and show final message
292        if let Ok(mut progress) = progress.lock() {
293            eprint!("\r");
294            eprint!("{}", " ".repeat(100));
295            eprint!("\r");
296            progress.finish_with_message(&format!(
297                "Processed {} total lines",
298                crate::utils::format::format_number(processed_lines.load(Ordering::Relaxed))
299            ));
300        }
301
302        // Aggregate all results
303        let mut combined_stats = ProcessStats::default();
304        let final_result = results
305            .into_iter()
306            .fold(initial, |acc, (file_result, file_stats)| {
307                combined_stats.merge(&file_stats);
308                aggregator(acc, file_result)
309            });
310
311        Ok((final_result, combined_stats))
312    }
313
314    /// Process files sequentially with streaming
315    fn process_sequential_streaming<T, F, A>(
316        &self,
317        files: &[String],
318        mut line_processor: F,
319        aggregator: A,
320        initial: T,
321    ) -> Result<(T, ProcessStats)>
322    where
323        T: Send + Clone + Sync,
324        F: FnMut(&AuditEntry, &mut T) + Send + Sync,
325        A: Fn(T, T) -> T + Send + Sync,
326    {
327        eprintln!("Processing {} files sequentially...", files.len());
328
329        let mut combined_result = initial;
330        let mut combined_stats = ProcessStats::default();
331
332        for (file_idx, file_path) in files.iter().enumerate() {
333            eprintln!(
334                "[{}/{}] Processing: {}",
335                file_idx + 1,
336                files.len(),
337                file_path
338            );
339
340            let mut file_state = combined_result.clone();
341            let single_file_stats = self.process_single_file_streaming(
342                file_path,
343                &mut line_processor,
344                &mut file_state,
345                None, // No shared progress for sequential
346            )?;
347
348            combined_result = aggregator(combined_result, file_state);
349            combined_stats.merge(&single_file_stats);
350
351            if self.config.show_file_completion {
352                eprintln!(
353                    "[{}/{}] ✓ Completed: {} ({} lines)",
354                    file_idx + 1,
355                    files.len(),
356                    file_path.split('/').next_back().unwrap_or(file_path),
357                    crate::utils::format::format_number(single_file_stats.total_lines)
358                );
359            }
360        }
361
362        Ok((combined_result, combined_stats))
363    }
364
365    /// Process files in parallel and collect individual results
366    #[allow(dead_code)]
367    #[allow(clippy::unused_self)]
368    fn process_parallel_collect<T, F>(
369        &self,
370        files: &[String],
371        processor: F,
372    ) -> Result<(Vec<T>, ProcessStats)>
373    where
374        T: Send + 'static,
375        F: Fn(&str) -> Result<T> + Send + Sync,
376    {
377        eprintln!("Processing {} files in parallel...", files.len());
378
379        let total_lines: usize = files
380            .par_iter()
381            .map(|file_path| count_file_lines(file_path).unwrap_or(0))
382            .sum();
383
384        let results: Result<Vec<_>> = files
385            .par_iter()
386            .map(|file_path| processor(file_path))
387            .collect();
388
389        let stats = ProcessStats {
390            total_lines,
391            parsed_entries: 0, // Unknown for collected results
392            skipped_lines: 0,
393            files_processed: files.len(),
394        };
395
396        Ok((results?, stats))
397    }
398
399    /// Process files sequentially and collect individual results
400    #[allow(dead_code)]
401    fn process_sequential_collect<T, F>(
402        &self,
403        files: &[String],
404        processor: F,
405    ) -> Result<(Vec<T>, ProcessStats)>
406    where
407        F: Fn(&str) -> Result<T>,
408    {
409        eprintln!("Processing {} files sequentially...", files.len());
410
411        let mut results = Vec::new();
412        let mut total_lines = 0;
413
414        for (file_idx, file_path) in files.iter().enumerate() {
415            eprintln!(
416                "[{}/{}] Processing: {}",
417                file_idx + 1,
418                files.len(),
419                file_path
420            );
421
422            let result = processor(file_path)?;
423            let lines_processed = count_file_lines(file_path)?;
424
425            results.push(result);
426            total_lines += lines_processed;
427
428            if self.config.show_file_completion {
429                eprintln!(
430                    "[{}/{}] ✓ Completed: {} ({} lines)",
431                    file_idx + 1,
432                    files.len(),
433                    file_path.split('/').next_back().unwrap_or(file_path),
434                    crate::utils::format::format_number(lines_processed)
435                );
436            }
437        }
438
439        let stats = ProcessStats {
440            total_lines,
441            parsed_entries: 0, // Unknown for collected results
442            skipped_lines: 0,
443            files_processed: files.len(),
444        };
445
446        Ok((results, stats))
447    }
448
449    /// Process a single file with streaming and optional progress tracking
450    fn process_single_file_streaming<T, F>(
451        &self,
452        file_path: &str,
453        line_processor: &mut F,
454        state: &mut T,
455        progress: Option<&(Arc<AtomicUsize>, Arc<Mutex<ProgressBar>>)>,
456    ) -> Result<ProcessStats>
457    where
458        F: FnMut(&AuditEntry, &mut T),
459    {
460        let file =
461            open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
462        let reader = BufReader::new(file);
463
464        let mut file_stats = ProcessStats::default();
465
466        for line_result in reader.lines() {
467            let line =
468                line_result.with_context(|| format!("Failed to read line from {}", file_path))?;
469
470            file_stats.total_lines += 1;
471
472            // Update progress if in parallel mode
473            if file_stats.total_lines % self.config.progress_frequency == 0 {
474                if let Some((processed_lines, progress_bar)) = &progress {
475                    processed_lines.fetch_add(self.config.progress_frequency, Ordering::Relaxed);
476                    if let Ok(mut p) = progress_bar.lock() {
477                        p.update(processed_lines.load(Ordering::Relaxed));
478                    }
479                }
480            }
481
482            // Skip empty lines
483            if line.trim().is_empty() {
484                continue;
485            }
486
487            // Parse and process entry
488            match serde_json::from_str::<AuditEntry>(&line) {
489                Ok(entry) => {
490                    file_stats.parsed_entries += 1;
491                    line_processor(&entry, state);
492                }
493                Err(e) => {
494                    file_stats.skipped_lines += 1;
495                    if self.config.strict_parsing {
496                        return Err(e).with_context(|| {
497                            format!(
498                                "Failed to parse JSON at line {} in {}",
499                                file_stats.total_lines, file_path
500                            )
501                        });
502                    }
503                    // Skip invalid lines and continue in non-strict mode
504                }
505            }
506        }
507
508        // Update progress for remaining lines
509        if let Some((processed_lines, progress_bar)) = &progress {
510            let remaining = file_stats.total_lines % self.config.progress_frequency;
511            if remaining > 0 {
512                processed_lines.fetch_add(remaining, Ordering::Relaxed);
513                if let Ok(mut p) = progress_bar.lock() {
514                    p.update(processed_lines.load(Ordering::Relaxed));
515                }
516            }
517        }
518
519        file_stats.files_processed = 1;
520        Ok(file_stats)
521    }
522}
523
524impl Default for FileProcessor {
525    fn default() -> Self {
526        Self::new()
527    }
528}
529
530/// Count lines in a file for progress tracking
531fn count_file_lines(file_path: &str) -> Result<usize> {
532    let file =
533        open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
534    let reader = BufReader::new(file);
535
536    let mut count = 0;
537    for line_result in reader.lines() {
538        line_result.with_context(|| format!("Failed to read line from {}", file_path))?;
539        count += 1;
540    }
541
542    Ok(count)
543}
544
/// Convenience builder for creating configured processors
///
/// Chain the setter methods and call `build()` to obtain a [`FileProcessor`].
pub struct ProcessorBuilder {
    /// Configuration accumulated by the setter methods; handed to
    /// [`FileProcessor::with_config`] by `build()`.
    config: ProcessorConfig,
}
549
550impl ProcessorBuilder {
551    /// Create a new processor builder
552    pub fn new() -> Self {
553        Self {
554            config: ProcessorConfig::default(),
555        }
556    }
557
558    /// Set the processing mode
559    #[must_use]
560    pub const fn mode(mut self, mode: ProcessingMode) -> Self {
561        self.config.mode = mode;
562        self
563    }
564
565    /// Set progress update frequency
566    #[must_use]
567    #[allow(dead_code)]
568    pub const fn progress_frequency(mut self, frequency: usize) -> Self {
569        self.config.progress_frequency = frequency;
570        self
571    }
572
573    /// Set whether to show file completion messages
574    #[must_use]
575    #[allow(dead_code)]
576    pub const fn show_file_completion(mut self, show: bool) -> Self {
577        self.config.show_file_completion = show;
578        self
579    }
580
581    /// Set custom progress label
582    #[must_use]
583    pub fn progress_label<S: Into<String>>(mut self, label: S) -> Self {
584        self.config.progress_label = label.into();
585        self
586    }
587
588    /// Enable strict JSON parsing
589    #[must_use]
590    #[allow(dead_code)]
591    pub const fn strict_parsing(mut self, strict: bool) -> Self {
592        self.config.strict_parsing = strict;
593        self
594    }
595
596    /// Build the file processor
597    #[must_use]
598    pub fn build(self) -> FileProcessor {
599        FileProcessor::with_config(self.config)
600    }
601}
602
603impl Default for ProcessorBuilder {
604    fn default() -> Self {
605        Self::new()
606    }
607}