vault_audit_tools/utils/
processor.rs

1//! Unified file processing abstraction for audit log analysis.
2//!
3//! This module provides a DRY abstraction for processing multiple audit log files
4//! with consistent progress tracking, error handling, and support for both
5//! parallel and sequential processing modes.
6
7use crate::audit::types::AuditEntry;
8use crate::utils::progress::ProgressBar;
9use crate::utils::reader::open_file;
10use anyhow::{Context, Result};
11use rayon::prelude::*;
12use std::io::{BufRead, BufReader};
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::sync::{Arc, Mutex};
15
16/// Processing mode for file processing
17#[derive(Debug, Clone, Copy)]
18pub enum ProcessingMode {
19    /// Process files in parallel using all available CPU cores
20    Parallel,
21    /// Process files sequentially (one at a time)
22    Sequential,
23    /// Automatically choose based on file count and size
24    Auto,
25}
26
27/// Configuration for file processing
28#[derive(Debug)]
29pub struct ProcessorConfig {
30    /// Processing mode to use
31    pub mode: ProcessingMode,
32    /// Progress update frequency (lines)
33    pub progress_frequency: usize,
34    /// Whether to show detailed per-file completion messages
35    pub show_file_completion: bool,
36    /// Custom progress label
37    pub progress_label: String,
38    /// Whether to use strict JSON parsing (fail on any parse error)
39    pub strict_parsing: bool,
40}
41
42impl Default for ProcessorConfig {
43    fn default() -> Self {
44        Self {
45            mode: ProcessingMode::Auto,
46            progress_frequency: 2000,
47            show_file_completion: true,
48            progress_label: "Processing".to_string(),
49            strict_parsing: false,
50        }
51    }
52}
53
54/// Statistics collected during processing
55#[derive(Debug, Default, Clone)]
56pub struct ProcessStats {
57    /// Total number of lines processed across all files
58    pub total_lines: usize,
59    /// Number of successfully parsed audit entries
60    pub parsed_entries: usize,
61    /// Number of lines skipped due to parse errors
62    pub skipped_lines: usize,
63    /// Number of files processed
64    pub files_processed: usize,
65}
66
67impl ProcessStats {
68    /// Merge another stats object into this one
69    pub fn merge(&mut self, other: &Self) {
70        self.total_lines += other.total_lines;
71        self.parsed_entries += other.parsed_entries;
72        self.skipped_lines += other.skipped_lines;
73        self.files_processed += other.files_processed;
74    }
75
76    /// Print a summary of processing statistics
77    pub fn report(&self) {
78        eprintln!("\nProcessing Summary:");
79        eprintln!("  Files processed: {}", self.files_processed);
80        eprintln!(
81            "  Total lines: {}",
82            crate::utils::format::format_number(self.total_lines)
83        );
84        eprintln!(
85            "  Parsed entries: {}",
86            crate::utils::format::format_number(self.parsed_entries)
87        );
88        if self.skipped_lines > 0 {
89            let skip_percentage = (self.skipped_lines as f64 / self.total_lines as f64) * 100.0;
90            eprintln!(
91                "  Skipped lines: {} ({:.2}%)",
92                crate::utils::format::format_number(self.skipped_lines),
93                skip_percentage
94            );
95        }
96    }
97}
98
99/// Unified file processor that handles both parallel and sequential processing
100pub struct FileProcessor {
101    config: ProcessorConfig,
102}
103
104impl FileProcessor {
105    /// Create a new file processor with default configuration
106    pub fn new() -> Self {
107        Self {
108            config: ProcessorConfig::default(),
109        }
110    }
111
112    /// Create a new file processor with custom configuration
113    pub const fn with_config(config: ProcessorConfig) -> Self {
114        Self { config }
115    }
116
117    /// Process multiple files with a streaming line-by-line processor
118    ///
119    /// This is the main entry point for file processing. It automatically handles:
120    /// - Progress tracking with accurate totals and ETA
121    /// - Parallel or sequential processing based on configuration
122    /// - Error handling and context
123    /// - Memory-efficient streaming
124    ///
125    /// # Arguments
126    /// * `files` - List of file paths to process
127    /// * `line_processor` - Function that processes individual audit entries
128    /// * `aggregator` - Function that combines results from all files
129    /// * `initial` - Initial value for the aggregator
130    ///
131    /// # Example
132    /// ```rust,ignore
133    /// let processor = FileProcessor::new();
134    /// let result = processor.process_files_streaming(
135    ///     &files,
136    ///     |entry, state| {
137    ///         // Process each audit entry
138    ///         state.counter += 1;
139    ///     },
140    ///     |acc, file_result| {
141    ///         // Combine results from each file
142    ///         acc.merge(file_result)
143    ///     },
144    ///     MyState::new(),
145    /// )?;
146    /// ```
147    pub fn process_files_streaming<T, F, A>(
148        &self,
149        files: &[String],
150        line_processor: F,
151        aggregator: A,
152        initial: T,
153    ) -> Result<(T, ProcessStats)>
154    where
155        T: Send + Clone + Sync + 'static,
156        F: FnMut(&AuditEntry, &mut T) + Send + Sync + Clone,
157        A: Fn(T, T) -> T + Send + Sync,
158    {
159        if files.is_empty() {
160            return Ok((initial, ProcessStats::default()));
161        }
162
163        let mode = self.determine_processing_mode(files);
164
165        match mode {
166            ProcessingMode::Parallel => {
167                self.process_parallel_streaming(files, line_processor, aggregator, initial)
168            }
169            ProcessingMode::Sequential => {
170                self.process_sequential_streaming(files, line_processor, aggregator, initial)
171            }
172            ProcessingMode::Auto => unreachable!(), // determine_processing_mode resolves this
173        }
174    }
175
176    /// Process multiple files and collect results into a collection
177    ///
178    /// This is a convenience method for cases where you want to collect
179    /// individual results from each file rather than aggregating.
180    #[allow(dead_code)]
181    pub fn process_files_collect<T, F>(
182        &self,
183        files: &[String],
184        processor: F,
185    ) -> Result<(Vec<T>, ProcessStats)>
186    where
187        T: Send + 'static,
188        F: Fn(&str) -> Result<T> + Send + Sync,
189    {
190        if files.is_empty() {
191            return Ok((Vec::new(), ProcessStats::default()));
192        }
193
194        let mode = self.determine_processing_mode(files);
195
196        match mode {
197            ProcessingMode::Parallel => self.process_parallel_collect(files, processor),
198            ProcessingMode::Sequential => self.process_sequential_collect(files, processor),
199            ProcessingMode::Auto => unreachable!(),
200        }
201    }
202
203    /// Determine the optimal processing mode based on files and configuration
204    const fn determine_processing_mode(&self, files: &[String]) -> ProcessingMode {
205        match self.config.mode {
206            ProcessingMode::Auto => {
207                if files.len() == 1 {
208                    ProcessingMode::Sequential
209                } else if files.len() >= 2 {
210                    ProcessingMode::Parallel
211                } else {
212                    ProcessingMode::Sequential
213                }
214            }
215            mode => mode,
216        }
217    }
218
219    /// Process files in parallel with streaming
220    fn process_parallel_streaming<T, F, A>(
221        &self,
222        files: &[String],
223        line_processor: F,
224        aggregator: A,
225        initial: T,
226    ) -> Result<(T, ProcessStats)>
227    where
228        T: Send + Clone + Sync + 'static,
229        F: FnMut(&AuditEntry, &mut T) + Send + Sync + Clone,
230        A: Fn(T, T) -> T + Send + Sync,
231    {
232        eprintln!("Processing {} files in parallel...", files.len());
233
234        // Pre-scan to determine total work
235        eprintln!("Scanning files to determine total work...");
236        let total_lines: usize = files
237            .par_iter()
238            .map(|file_path| count_file_lines(file_path).unwrap_or(0))
239            .sum();
240
241        eprintln!(
242            "Total lines to process: {}",
243            crate::utils::format::format_number(total_lines)
244        );
245
246        let processed_lines = Arc::new(AtomicUsize::new(0));
247        let progress = Arc::new(Mutex::new(ProgressBar::new(
248            total_lines,
249            &self.config.progress_label,
250        )));
251
252        // Process files in parallel
253        let results: Result<Vec<_>> = files
254            .par_iter()
255            .enumerate()
256            .map(|(idx, file_path)| -> Result<(T, ProcessStats)> {
257                let mut file_state = initial.clone();
258                let mut local_processor = line_processor.clone();
259
260                let progress_ref = (processed_lines.clone(), progress.clone());
261                let stats = self.process_single_file_streaming(
262                    file_path,
263                    &mut local_processor,
264                    &mut file_state,
265                    Some(&progress_ref),
266                )?;
267
268                if self.config.show_file_completion {
269                    let lines_count = count_file_lines(file_path)?;
270                    if let Ok(progress) = progress.lock() {
271                        progress.println(format!(
272                            "[{}/{}] ✓ Completed: {} ({} lines)",
273                            idx + 1,
274                            files.len(),
275                            file_path.split('/').next_back().unwrap_or(file_path),
276                            crate::utils::format::format_number(lines_count)
277                        ));
278                    }
279                }
280
281                Ok((file_state, stats))
282            })
283            .collect();
284
285        let results = results?;
286
287        // Finish progress bar with final message
288        if let Ok(progress) = progress.lock() {
289            progress.finish_with_message(&format!(
290                "Processed {} total lines",
291                crate::utils::format::format_number(processed_lines.load(Ordering::Relaxed))
292            ));
293        }
294
295        // Aggregate all results
296        let mut combined_stats = ProcessStats::default();
297        let final_result = results
298            .into_iter()
299            .fold(initial, |acc, (file_result, file_stats)| {
300                combined_stats.merge(&file_stats);
301                aggregator(acc, file_result)
302            });
303
304        Ok((final_result, combined_stats))
305    }
306
307    /// Process files sequentially with streaming
308    fn process_sequential_streaming<T, F, A>(
309        &self,
310        files: &[String],
311        mut line_processor: F,
312        aggregator: A,
313        initial: T,
314    ) -> Result<(T, ProcessStats)>
315    where
316        T: Send + Clone + Sync,
317        F: FnMut(&AuditEntry, &mut T) + Send + Sync,
318        A: Fn(T, T) -> T + Send + Sync,
319    {
320        eprintln!("Processing {} files sequentially...", files.len());
321
322        let mut combined_result = initial;
323        let mut combined_stats = ProcessStats::default();
324
325        for (file_idx, file_path) in files.iter().enumerate() {
326            eprintln!(
327                "[{}/{}] Processing: {}",
328                file_idx + 1,
329                files.len(),
330                file_path
331            );
332
333            let mut file_state = combined_result.clone();
334            let single_file_stats = self.process_single_file_streaming(
335                file_path,
336                &mut line_processor,
337                &mut file_state,
338                None, // No shared progress for sequential
339            )?;
340
341            combined_result = aggregator(combined_result, file_state);
342            combined_stats.merge(&single_file_stats);
343
344            if self.config.show_file_completion {
345                eprintln!(
346                    "[{}/{}] ✓ Completed: {} ({} lines)",
347                    file_idx + 1,
348                    files.len(),
349                    file_path.split('/').next_back().unwrap_or(file_path),
350                    crate::utils::format::format_number(single_file_stats.total_lines)
351                );
352            }
353        }
354
355        Ok((combined_result, combined_stats))
356    }
357
358    /// Process files in parallel and collect individual results
359    #[allow(dead_code)]
360    #[allow(clippy::unused_self)]
361    fn process_parallel_collect<T, F>(
362        &self,
363        files: &[String],
364        processor: F,
365    ) -> Result<(Vec<T>, ProcessStats)>
366    where
367        T: Send + 'static,
368        F: Fn(&str) -> Result<T> + Send + Sync,
369    {
370        eprintln!("Processing {} files in parallel...", files.len());
371
372        let total_lines: usize = files
373            .par_iter()
374            .map(|file_path| count_file_lines(file_path).unwrap_or(0))
375            .sum();
376
377        let results: Result<Vec<_>> = files
378            .par_iter()
379            .map(|file_path| processor(file_path))
380            .collect();
381
382        let stats = ProcessStats {
383            total_lines,
384            parsed_entries: 0, // Unknown for collected results
385            skipped_lines: 0,
386            files_processed: files.len(),
387        };
388
389        Ok((results?, stats))
390    }
391
392    /// Process files sequentially and collect individual results
393    #[allow(dead_code)]
394    fn process_sequential_collect<T, F>(
395        &self,
396        files: &[String],
397        processor: F,
398    ) -> Result<(Vec<T>, ProcessStats)>
399    where
400        F: Fn(&str) -> Result<T>,
401    {
402        eprintln!("Processing {} files sequentially...", files.len());
403
404        let mut results = Vec::new();
405        let mut total_lines = 0;
406
407        for (file_idx, file_path) in files.iter().enumerate() {
408            eprintln!(
409                "[{}/{}] Processing: {}",
410                file_idx + 1,
411                files.len(),
412                file_path
413            );
414
415            let result = processor(file_path)?;
416            let lines_processed = count_file_lines(file_path)?;
417
418            results.push(result);
419            total_lines += lines_processed;
420
421            if self.config.show_file_completion {
422                eprintln!(
423                    "[{}/{}] ✓ Completed: {} ({} lines)",
424                    file_idx + 1,
425                    files.len(),
426                    file_path.split('/').next_back().unwrap_or(file_path),
427                    crate::utils::format::format_number(lines_processed)
428                );
429            }
430        }
431
432        let stats = ProcessStats {
433            total_lines,
434            parsed_entries: 0, // Unknown for collected results
435            skipped_lines: 0,
436            files_processed: files.len(),
437        };
438
439        Ok((results, stats))
440    }
441
442    /// Process a single file with streaming and optional progress tracking
443    fn process_single_file_streaming<T, F>(
444        &self,
445        file_path: &str,
446        line_processor: &mut F,
447        state: &mut T,
448        progress: Option<&(Arc<AtomicUsize>, Arc<Mutex<ProgressBar>>)>,
449    ) -> Result<ProcessStats>
450    where
451        F: FnMut(&AuditEntry, &mut T),
452    {
453        let file =
454            open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
455        let reader = BufReader::new(file);
456
457        let mut file_stats = ProcessStats::default();
458
459        for line_result in reader.lines() {
460            let line =
461                line_result.with_context(|| format!("Failed to read line from {}", file_path))?;
462
463            file_stats.total_lines += 1;
464
465            // Update progress if in parallel mode
466            if file_stats.total_lines % self.config.progress_frequency == 0 {
467                if let Some((processed_lines, progress_bar)) = &progress {
468                    processed_lines.fetch_add(self.config.progress_frequency, Ordering::Relaxed);
469                    if let Ok(p) = progress_bar.lock() {
470                        p.update(processed_lines.load(Ordering::Relaxed));
471                    }
472                }
473            }
474
475            // Skip empty lines
476            if line.trim().is_empty() {
477                continue;
478            }
479
480            // Parse and process entry
481            match serde_json::from_str::<AuditEntry>(&line) {
482                Ok(entry) => {
483                    file_stats.parsed_entries += 1;
484                    line_processor(&entry, state);
485                }
486                Err(e) => {
487                    file_stats.skipped_lines += 1;
488                    if self.config.strict_parsing {
489                        return Err(e).with_context(|| {
490                            format!(
491                                "Failed to parse JSON at line {} in {}",
492                                file_stats.total_lines, file_path
493                            )
494                        });
495                    }
496                    // Skip invalid lines and continue in non-strict mode
497                }
498            }
499        }
500
501        // Update progress for remaining lines
502        if let Some((processed_lines, progress_bar)) = &progress {
503            let remaining = file_stats.total_lines % self.config.progress_frequency;
504            if remaining > 0 {
505                processed_lines.fetch_add(remaining, Ordering::Relaxed);
506                if let Ok(p) = progress_bar.lock() {
507                    p.update(processed_lines.load(Ordering::Relaxed));
508                }
509            }
510        }
511
512        file_stats.files_processed = 1;
513        Ok(file_stats)
514    }
515}
516
517impl Default for FileProcessor {
518    fn default() -> Self {
519        Self::new()
520    }
521}
522
523/// Count lines in a file for progress tracking
524fn count_file_lines(file_path: &str) -> Result<usize> {
525    let file =
526        open_file(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
527    let reader = BufReader::new(file);
528
529    let mut count = 0;
530    for line_result in reader.lines() {
531        line_result.with_context(|| format!("Failed to read line from {}", file_path))?;
532        count += 1;
533    }
534
535    Ok(count)
536}
537
538/// Convenience builder for creating configured processors
539pub struct ProcessorBuilder {
540    config: ProcessorConfig,
541}
542
543impl ProcessorBuilder {
544    /// Create a new processor builder
545    pub fn new() -> Self {
546        Self {
547            config: ProcessorConfig::default(),
548        }
549    }
550
551    /// Set the processing mode
552    #[must_use]
553    pub const fn mode(mut self, mode: ProcessingMode) -> Self {
554        self.config.mode = mode;
555        self
556    }
557
558    /// Set progress update frequency
559    #[must_use]
560    #[allow(dead_code)]
561    pub const fn progress_frequency(mut self, frequency: usize) -> Self {
562        self.config.progress_frequency = frequency;
563        self
564    }
565
566    /// Set whether to show file completion messages
567    #[must_use]
568    #[allow(dead_code)]
569    pub const fn show_file_completion(mut self, show: bool) -> Self {
570        self.config.show_file_completion = show;
571        self
572    }
573
574    /// Set custom progress label
575    #[must_use]
576    pub fn progress_label<S: Into<String>>(mut self, label: S) -> Self {
577        self.config.progress_label = label.into();
578        self
579    }
580
581    /// Enable strict JSON parsing
582    #[must_use]
583    #[allow(dead_code)]
584    pub const fn strict_parsing(mut self, strict: bool) -> Self {
585        self.config.strict_parsing = strict;
586        self
587    }
588
589    /// Build the file processor
590    #[must_use]
591    pub fn build(self) -> FileProcessor {
592        FileProcessor::with_config(self.config)
593    }
594}
595
596impl Default for ProcessorBuilder {
597    fn default() -> Self {
598        Self::new()
599    }
600}