vault_audit_tools/commands/
client_traffic_analysis.rs

1//! Client traffic analysis for understanding request patterns and client behavior.
2//!
3//! Analyzes aggregated audit logs to provide insights into:
4//! - Client-to-Vault traffic patterns (top clients, request volumes)
5//! - Request distribution analysis (temporal patterns, operation types)
6//! - Client behavior clustering (automated vs interactive patterns)
7//!
8//! # Usage
9//!
10//! ```bash
11//! # Analyze all audit logs
12//! vault-audit client-traffic-analysis audit*.log
13//!
14//! # Export detailed metrics to CSV
15//! vault-audit client-traffic-analysis audit*.log --output traffic.csv --format csv
16//!
17//! # Analyze compressed logs
18//! vault-audit client-traffic-analysis logs/*.log.gz
19//! ```
20
21use crate::audit::types::AuditEntry;
22use crate::utils::format::format_number;
23use crate::utils::parallel::process_files_parallel;
24use crate::utils::progress::ProgressBar;
25use anyhow::{Context, Result};
26use chrono::{DateTime, Timelike, Utc};
27use serde::Serialize;
28use std::collections::HashMap;
29use std::io::{BufRead, BufReader};
30use std::sync::atomic::{AtomicUsize, Ordering};
31use std::sync::{Arc, Mutex, OnceLock};
32
33/// Detailed error instance linking entity, error type, and path
34#[derive(Debug, Clone)]
35struct ErrorInstance {
36    entity_id: String,
37    display_name: String,
38    error_type: String,
39    path: String,
40    timestamp: String,
41}
42
43/// Statistics for a single client
44#[derive(Debug, Clone)]
45struct ClientStats {
46    /// Total number of requests from this client
47    request_count: usize,
48    /// Breakdown by operation type (read, write, list, delete)
49    operations: HashMap<String, usize>,
50    /// Breakdown by path accessed
51    paths: HashMap<String, usize>,
52    /// Breakdown by mount point
53    mount_points: HashMap<String, usize>,
54    /// Unique entities accessing from this client
55    entities: HashMap<String, String>, // entity_id -> display_name
56    /// First seen timestamp
57    first_seen: Option<String>,
58    /// Last seen timestamp
59    last_seen: Option<String>,
60    /// Requests with errors
61    error_count: usize,
62    /// Breakdown of error types
63    error_types: HashMap<String, usize>,
64    /// Paths that generated errors
65    error_paths: HashMap<String, usize>,
66    /// Detailed error instances (entity + error + path + timestamp)
67    error_instances: Vec<ErrorInstance>,
68    /// Requests by hour of day (0-23)
69    hourly_distribution: HashMap<u32, usize>,
70}
71
72/// Export structure for client metrics
73#[derive(Debug, Serialize)]
74struct ClientExport {
75    client_ip: String,
76    total_requests: usize,
77    unique_entities: usize,
78    unique_paths: usize,
79    unique_mount_points: usize,
80    error_count: usize,
81    error_rate: f64,
82    first_seen: String,
83    last_seen: String,
84    top_operation: String,
85    top_operation_count: usize,
86    top_path: String,
87    top_path_count: usize,
88    // Error details
89    top_error_type: String,
90    top_error_type_count: usize,
91    top_error_type_percentage: f64,
92    second_error_type: String,
93    second_error_type_count: usize,
94    third_error_type: String,
95    third_error_type_count: usize,
96    top_error_path: String,
97    top_error_path_count: usize,
98    classification: String,
99}
100
101impl ClientStats {
102    fn new() -> Self {
103        Self {
104            request_count: 0,
105            operations: HashMap::new(),
106            paths: HashMap::new(),
107            mount_points: HashMap::new(),
108            entities: HashMap::new(),
109            first_seen: None,
110            last_seen: None,
111            error_count: 0,
112            error_types: HashMap::new(),
113            error_paths: HashMap::new(),
114            error_instances: Vec::new(),
115            hourly_distribution: HashMap::new(),
116        }
117    }
118
119    /// Update stats with a new entry
120    fn update(&mut self, entry: &AuditEntry) {
121        self.request_count += 1;
122
123        // Track operation type
124        if let Some(op) = entry.operation() {
125            *self.operations.entry(op.to_string()).or_insert(0) += 1;
126        }
127
128        // Track path
129        if let Some(path) = entry.path() {
130            *self.paths.entry(path.to_string()).or_insert(0) += 1;
131        }
132
133        // Track mount point
134        if let Some(mp) = entry.mount_point() {
135            *self.mount_points.entry(mp.to_string()).or_insert(0) += 1;
136        }
137
138        // Track entity
139        if let Some(entity_id) = entry.entity_id() {
140            if let Some(display_name) = entry.display_name() {
141                self.entities
142                    .entry(entity_id.to_string())
143                    .or_insert_with(|| display_name.to_string());
144            }
145        }
146
147        // Track timestamps
148        if self.first_seen.is_none() {
149            self.first_seen = Some(entry.time.clone());
150        }
151        self.last_seen = Some(entry.time.clone());
152
153        // Track errors with detailed information
154        if let Some(error_msg) = &entry.error {
155            self.error_count += 1;
156
157            // Clean and categorize error message
158            let cleaned_error = error_msg.trim().replace(['\n', '\t'], " ");
159
160            // Extract the core error type
161            let error_type = if cleaned_error.contains("permission denied") {
162                "permission denied"
163            } else if cleaned_error.contains("service account name not authorized") {
164                "service account not authorized"
165            } else if cleaned_error.contains("namespace not authorized") {
166                "namespace not authorized"
167            } else if cleaned_error.contains("invalid credentials") {
168                "invalid credentials"
169            } else if cleaned_error.contains("wrapping token") {
170                "invalid wrapping token"
171            } else if cleaned_error.contains("internal error") {
172                "internal error"
173            } else if cleaned_error.contains("unsupported operation") {
174                "unsupported operation"
175            } else if cleaned_error.contains("max TTL") {
176                "max TTL exceeded"
177            } else if cleaned_error.is_empty() || cleaned_error == "null" {
178                "unknown error"
179            } else {
180                // Use first 50 chars of error message as type
181                if cleaned_error.len() > 50 {
182                    &cleaned_error[..50]
183                } else {
184                    &cleaned_error
185                }
186            };
187
188            *self.error_types.entry(error_type.to_string()).or_insert(0) += 1;
189
190            // Track which path generated the error
191            let path = entry.path().unwrap_or("unknown").to_string();
192            *self.error_paths.entry(path.clone()).or_insert(0) += 1;
193
194            // Create detailed error instance linking entity, error, and path
195            let entity_id = entry.entity_id().unwrap_or("unknown").to_string();
196            let display_name = entry.display_name().unwrap_or("unknown").to_string();
197
198            self.error_instances.push(ErrorInstance {
199                entity_id,
200                display_name,
201                error_type: error_type.to_string(),
202                path,
203                timestamp: entry.time.clone(),
204            });
205        }
206
207        // Track hourly distribution
208        if let Ok(dt) = entry.time.parse::<DateTime<Utc>>() {
209            let hour = dt.hour();
210            *self.hourly_distribution.entry(hour).or_insert(0) += 1;
211        }
212    }
213
214    /// Merge another `ClientStats` into this one
215    fn merge(&mut self, other: Self) {
216        self.request_count += other.request_count;
217        self.error_count += other.error_count;
218
219        // Merge operations
220        for (op, count) in other.operations {
221            *self.operations.entry(op).or_insert(0) += count;
222        }
223
224        // Merge paths
225        for (path, count) in other.paths {
226            *self.paths.entry(path).or_insert(0) += count;
227        }
228
229        // Merge mount points
230        for (mp, count) in other.mount_points {
231            *self.mount_points.entry(mp).or_insert(0) += count;
232        }
233
234        // Merge entities
235        for (entity_id, display_name) in other.entities {
236            self.entities.entry(entity_id).or_insert(display_name);
237        }
238
239        // Merge error types
240        for (error_type, count) in other.error_types {
241            *self.error_types.entry(error_type).or_insert(0) += count;
242        }
243
244        // Merge error paths
245        for (path, count) in other.error_paths {
246            *self.error_paths.entry(path).or_insert(0) += count;
247        }
248
249        // Merge error instances
250        self.error_instances.extend(other.error_instances);
251
252        // Merge hourly distribution
253        for (hour, count) in other.hourly_distribution {
254            *self.hourly_distribution.entry(hour).or_insert(0) += count;
255        }
256
257        // Update timestamps
258        if self.first_seen.is_none()
259            || (other.first_seen.is_some() && other.first_seen < self.first_seen)
260        {
261            self.first_seen = other.first_seen;
262        }
263        if self.last_seen.is_none()
264            || (other.last_seen.is_some() && other.last_seen > self.last_seen)
265        {
266            self.last_seen = other.last_seen;
267        }
268    }
269
270    /// Classify client behavior
271    fn classify_behavior(&self) -> String {
272        let paths_per_request = self.paths.len() as f64 / self.request_count as f64;
273        if self.request_count > 1000 || paths_per_request < 0.1 {
274            "automated".to_string()
275        } else {
276            "interactive".to_string()
277        }
278    }
279
280    /// Convert to export format
281    fn to_export(&self, client_ip: String) -> ClientExport {
282        let error_rate = if self.request_count > 0 {
283            (self.error_count as f64 / self.request_count as f64) * 100.0
284        } else {
285            0.0
286        };
287
288        let (top_operation, top_operation_count) = self
289            .operations
290            .iter()
291            .max_by_key(|(_, count)| *count)
292            .map_or_else(
293                || ("none".to_string(), 0),
294                |(op, count)| (op.clone(), *count),
295            );
296
297        let (top_path, top_path_count) = self
298            .paths
299            .iter()
300            .max_by_key(|(_, count)| *count)
301            .map_or_else(
302                || ("none".to_string(), 0),
303                |(path, count)| (path.clone(), *count),
304            );
305
306        // Get top 3 error types
307        let mut error_types_sorted: Vec<_> = self.error_types.iter().collect();
308        error_types_sorted.sort_by(|a, b| b.1.cmp(a.1));
309
310        let (top_error_type, top_error_type_count) = error_types_sorted
311            .first()
312            .map_or_else(|| ("none".to_string(), 0), |(t, c)| ((*t).clone(), **c));
313
314        let top_error_type_percentage = if self.error_count > 0 {
315            (top_error_type_count as f64 / self.error_count as f64) * 100.0
316        } else {
317            0.0
318        };
319
320        let (second_error_type, second_error_type_count) = error_types_sorted
321            .get(1)
322            .map_or_else(|| ("none".to_string(), 0), |(t, c)| ((*t).clone(), **c));
323
324        let (third_error_type, third_error_type_count) = error_types_sorted
325            .get(2)
326            .map_or_else(|| ("none".to_string(), 0), |(t, c)| ((*t).clone(), **c));
327
328        // Get top error path
329        let (top_error_path, top_error_path_count) = self
330            .error_paths
331            .iter()
332            .max_by_key(|(_, count)| *count)
333            .map_or_else(
334                || ("none".to_string(), 0),
335                |(path, count)| (path.clone(), *count),
336            );
337
338        ClientExport {
339            client_ip,
340            total_requests: self.request_count,
341            unique_entities: self.entities.len(),
342            unique_paths: self.paths.len(),
343            unique_mount_points: self.mount_points.len(),
344            error_count: self.error_count,
345            error_rate,
346            first_seen: self
347                .first_seen
348                .clone()
349                .unwrap_or_else(|| "unknown".to_string()),
350            last_seen: self
351                .last_seen
352                .clone()
353                .unwrap_or_else(|| "unknown".to_string()),
354            top_operation,
355            top_operation_count,
356            top_path,
357            top_path_count,
358            top_error_type,
359            top_error_type_count,
360            top_error_type_percentage,
361            second_error_type,
362            second_error_type_count,
363            third_error_type,
364            third_error_type_count,
365            top_error_path,
366            top_error_path_count,
367            classification: self.classify_behavior(),
368        }
369    }
370}
371
372/// Global progress tracking for parallel processing
373static PARALLEL_PROGRESS: OnceLock<(Arc<AtomicUsize>, Arc<Mutex<ProgressBar>>)> = OnceLock::new();
374
375/// Initialize parallel progress tracking (called by parallel processor)
376pub fn init_parallel_progress(processed: Arc<AtomicUsize>, progress: Arc<Mutex<ProgressBar>>) {
377    let _ = PARALLEL_PROGRESS.set((processed, progress));
378}
379
380/// Overall traffic statistics
381#[derive(Debug)]
382struct TrafficStats {
383    /// Stats per client IP
384    clients: HashMap<String, ClientStats>,
385    /// Total requests processed
386    total_requests: usize,
387}
388
389impl TrafficStats {
390    fn new() -> Self {
391        Self {
392            clients: HashMap::new(),
393            total_requests: 0,
394        }
395    }
396
397    fn merge(&mut self, other: Self) {
398        self.total_requests += other.total_requests;
399
400        for (client_ip, stats) in other.clients {
401            self.clients
402                .entry(client_ip)
403                .or_insert_with(ClientStats::new)
404                .merge(stats);
405        }
406    }
407}
408
409/// Process a single file and extract client traffic stats
410fn process_file(file_path: &str) -> Result<TrafficStats> {
411    let file = crate::utils::reader::open_file(file_path)?;
412    let reader = BufReader::new(file);
413
414    let mut stats = TrafficStats::new();
415    let mut lines_processed = 0usize;
416
417    // Check if we're in parallel mode with progress tracking
418    let parallel_progress = PARALLEL_PROGRESS.get();
419
420    for line_result in reader.lines() {
421        let line = line_result?;
422        lines_processed += 1;
423
424        // Update progress every 1000 lines to reduce contention
425        if lines_processed % 1000 == 0 {
426            if let Some((processed_lines, progress)) = parallel_progress {
427                processed_lines.fetch_add(1000, Ordering::Relaxed);
428                if let Ok(progress) = progress.lock() {
429                    progress.inc(1000);
430                }
431            }
432        }
433
434        // Skip empty lines
435        if line.trim().is_empty() {
436            continue;
437        }
438
439        // Parse JSON entry
440        let entry: AuditEntry = match serde_json::from_str(&line) {
441            Ok(entry) => entry,
442            Err(_) => continue,
443        };
444
445        // Only process request entries (responses are duplicates)
446        if entry.entry_type != "request" {
447            continue;
448        }
449
450        // Get client IP
451        let Some(client_ip) = entry.remote_address() else {
452            continue;
453        };
454
455        // Update client stats
456        stats
457            .clients
458            .entry(client_ip.to_string())
459            .or_insert_with(ClientStats::new)
460            .update(&entry);
461
462        stats.total_requests += 1;
463    }
464
465    // Update progress with any remaining lines
466    let remainder = lines_processed % 1000;
467    if remainder > 0 {
468        if let Some((processed_lines, progress)) = parallel_progress {
469            processed_lines.fetch_add(remainder, Ordering::Relaxed);
470            if let Ok(progress) = progress.lock() {
471                progress.inc(remainder as u64);
472            }
473        }
474    }
475
476    Ok(stats)
477}
478
479/// Main command function
480#[allow(clippy::too_many_arguments, clippy::fn_params_excessive_bools)]
481pub fn run(
482    log_files: &[String],
483    output: Option<String>,
484    format: Option<&str>,
485    error_details_output: Option<String>,
486    top_n: usize,
487    show_temporal: bool,
488    min_requests: usize,
489    show_operations: bool,
490    show_errors: bool,
491    show_details: bool,
492) -> Result<()> {
493    if log_files.len() == 1 {
494        eprintln!("Analyzing client traffic patterns from 1 file...");
495    } else {
496        eprintln!(
497            "Analyzing client traffic patterns from {} files...",
498            log_files.len()
499        );
500    }
501
502    // Process files in parallel
503    let (combined_stats, _total_lines) =
504        process_files_parallel(log_files, process_file, |results| {
505            let mut combined = TrafficStats::new();
506            for result in results {
507                combined.merge(result.data);
508            }
509            combined
510        })?;
511
512    // Filter clients by minimum request threshold
513    let filtered_stats = if min_requests > 1 {
514        let mut filtered = TrafficStats::new();
515        filtered.total_requests = combined_stats.total_requests;
516        for (ip, stats) in combined_stats.clients {
517            if stats.request_count >= min_requests {
518                filtered.clients.insert(ip, stats);
519            }
520        }
521        filtered
522    } else {
523        combined_stats
524    };
525
526    // Export summary data if requested
527    if let Some(output_file) = output {
528        export_data(&filtered_stats, &output_file, format)?;
529        eprintln!("Exported summary data to {}", output_file);
530    }
531
532    // Export detailed error analysis with entity information if requested
533    if let Some(error_output_file) = error_details_output {
534        export_error_details(&filtered_stats, &error_output_file)?;
535        eprintln!(
536            "Exported detailed error analysis (with entities) to {}",
537            error_output_file
538        );
539    }
540
541    // Generate report
542    print_summary(&filtered_stats);
543    print_top_clients(&filtered_stats, top_n);
544    print_client_behavior_analysis(&filtered_stats);
545
546    if show_operations {
547        print_operation_breakdown(&filtered_stats, top_n.min(10));
548    }
549
550    if show_errors {
551        print_error_analysis(&filtered_stats, top_n.min(10));
552    }
553
554    if show_details {
555        print_detailed_client_analysis(&filtered_stats, top_n.min(10));
556    }
557
558    if show_temporal {
559        print_temporal_analysis(&filtered_stats, top_n.min(10));
560    }
561
562    Ok(())
563}
564
565/// Print overall summary
566fn print_summary(stats: &TrafficStats) {
567    println!("\n{}", "=".repeat(100));
568    println!("Client Traffic Analysis Summary");
569    println!("{}", "=".repeat(100));
570    println!("Total Requests: {}", format_number(stats.total_requests));
571    println!("Unique Clients: {}", format_number(stats.clients.len()));
572    println!(
573        "Avg Requests per Client: {:.2}",
574        stats.total_requests as f64 / stats.clients.len() as f64
575    );
576}
577
578/// Print top clients by request volume
579fn print_top_clients(stats: &TrafficStats, top_n: usize) {
580    println!("\n{}", "=".repeat(100));
581    println!("Top {} Clients by Request Volume", top_n);
582    println!("{}", "=".repeat(100));
583    println!(
584        "{:<20} {:>15} {:>15} {:>15} {:>15}",
585        "Client IP", "Requests", "Entities", "Errors", "Error %"
586    );
587    println!("{}", "-".repeat(100));
588
589    let mut clients: Vec<_> = stats.clients.iter().collect();
590    clients.sort_by(|a, b| b.1.request_count.cmp(&a.1.request_count));
591
592    for (ip, client_stats) in clients.iter().take(top_n) {
593        let error_pct = if client_stats.request_count > 0 {
594            (client_stats.error_count as f64 / client_stats.request_count as f64) * 100.0
595        } else {
596            0.0
597        };
598
599        println!(
600            "{:<20} {:>15} {:>15} {:>15} {:>14.2}%",
601            ip,
602            format_number(client_stats.request_count),
603            format_number(client_stats.entities.len()),
604            format_number(client_stats.error_count),
605            error_pct
606        );
607    }
608}
609
610/// Analyze and print client behavior patterns
611fn print_client_behavior_analysis(stats: &TrafficStats) {
612    println!("\n{}", "=".repeat(100));
613    println!("Client Behavior Analysis");
614    println!("{}", "=".repeat(100));
615
616    // Categorize clients
617    let mut automated_clients = Vec::new();
618    let mut interactive_clients = Vec::new();
619
620    for (ip, client_stats) in &stats.clients {
621        // Heuristic: Automated clients typically have higher request volumes
622        // and access fewer unique paths per request
623        let paths_per_request = client_stats.paths.len() as f64 / client_stats.request_count as f64;
624
625        if client_stats.request_count > 1000 || paths_per_request < 0.1 {
626            automated_clients.push((ip, client_stats));
627        } else {
628            interactive_clients.push((ip, client_stats));
629        }
630    }
631
632    println!(
633        "Automated Clients (likely services): {}",
634        automated_clients.len()
635    );
636    println!(
637        "Interactive Clients (likely users): {}",
638        interactive_clients.len()
639    );
640
641    // Show top automated clients
642    if !automated_clients.is_empty() {
643        println!("\nTop Automated Clients:");
644        println!(
645            "{:<20} {:>15} {:>15}",
646            "Client IP", "Requests", "Unique Paths"
647        );
648        println!("{}", "-".repeat(60));
649
650        automated_clients.sort_by(|a, b| b.1.request_count.cmp(&a.1.request_count));
651        for (ip, stats) in automated_clients.iter().take(10) {
652            println!(
653                "{:<20} {:>15} {:>15}",
654                ip,
655                format_number(stats.request_count),
656                format_number(stats.paths.len())
657            );
658        }
659    }
660}
661
662/// Print operation type breakdown for top clients
663fn print_operation_breakdown(stats: &TrafficStats, top_n: usize) {
664    println!("\n{}", "=".repeat(100));
665    println!("Operation Type Breakdown - Top {} Clients", top_n);
666    println!("{}", "=".repeat(100));
667
668    let mut clients: Vec<_> = stats.clients.iter().collect();
669    clients.sort_by(|a, b| b.1.request_count.cmp(&a.1.request_count));
670
671    for (ip, client_stats) in clients.iter().take(top_n) {
672        println!(
673            "\nClient: {} (Total: {})",
674            ip,
675            format_number(client_stats.request_count)
676        );
677        println!("{}", "-".repeat(80));
678
679        let mut operations: Vec<_> = client_stats.operations.iter().collect();
680        operations.sort_by(|a, b| b.1.cmp(a.1));
681
682        println!("{:<30} {:>15} {:>15}", "Operation", "Count", "Percentage");
683        println!("{}", "-".repeat(60));
684
685        for (op, count) in operations {
686            let percentage = (*count as f64 / client_stats.request_count as f64) * 100.0;
687            println!(
688                "{:<30} {:>15} {:>14.2}%",
689                op,
690                format_number(*count),
691                percentage
692            );
693        }
694    }
695}
696
697/// Print error analysis for clients with significant errors
698fn print_error_analysis(stats: &TrafficStats, top_n: usize) {
699    println!("\n{}", "=".repeat(100));
700    println!("Error Analysis - Clients with Errors");
701    println!("{}", "=".repeat(100));
702
703    let mut clients_with_errors: Vec<_> = stats
704        .clients
705        .iter()
706        .filter(|(_, client)| client.error_count > 0)
707        .collect();
708
709    clients_with_errors.sort_by(|a, b| b.1.error_count.cmp(&a.1.error_count));
710
711    if clients_with_errors.is_empty() {
712        println!("No errors detected in the analyzed logs.");
713        return;
714    }
715
716    println!(
717        "{:<20} {:>15} {:>15} {:>15}",
718        "Client IP", "Total Requests", "Errors", "Error Rate"
719    );
720    println!("{}", "-".repeat(80));
721
722    for (ip, client_stats) in clients_with_errors.iter().take(top_n) {
723        let error_rate =
724            (client_stats.error_count as f64 / client_stats.request_count as f64) * 100.0;
725        println!(
726            "{:<20} {:>15} {:>15} {:>14.2}%",
727            ip,
728            format_number(client_stats.request_count),
729            format_number(client_stats.error_count),
730            error_rate
731        );
732    }
733
734    // Print detailed error type breakdown
735    println!("\n{}", "=".repeat(100));
736    println!("Error Type Breakdown by Client");
737    println!("{}", "=".repeat(100));
738
739    for (ip, client_stats) in clients_with_errors.iter().take(top_n) {
740        if client_stats.error_types.is_empty() {
741            continue;
742        }
743
744        println!(
745            "\nClient: {} (Total Errors: {})",
746            ip,
747            format_number(client_stats.error_count)
748        );
749        println!("{}", "-".repeat(80));
750
751        let mut error_types: Vec<_> = client_stats.error_types.iter().collect();
752        error_types.sort_by(|a, b| b.1.cmp(a.1));
753
754        println!("{:<50} {:>15} {:>15}", "Error Type", "Count", "Percentage");
755        println!("{}", "-".repeat(80));
756
757        for (error_type, count) in error_types.iter().take(10) {
758            let percentage = (**count as f64 / client_stats.error_count as f64) * 100.0;
759            let truncated = if error_type.len() > 50 {
760                format!("{}...", &error_type[..47])
761            } else {
762                (*error_type).clone()
763            };
764            println!(
765                "{:<50} {:>15} {:>14.2}%",
766                truncated,
767                format_number(**count),
768                percentage
769            );
770        }
771
772        // Print top error paths
773        if !client_stats.error_paths.is_empty() {
774            println!("\nTop Paths Generating Errors:");
775            println!("{:<60} {:>15}", "Path", "Error Count");
776            println!("{}", "-".repeat(80));
777
778            let mut error_paths: Vec<_> = client_stats.error_paths.iter().collect();
779            error_paths.sort_by(|a, b| b.1.cmp(a.1));
780
781            for (path, count) in error_paths.iter().take(5) {
782                let truncated_path = if path.len() > 60 {
783                    format!("{}...", &path[..57])
784                } else {
785                    (*path).clone()
786                };
787                println!("{:<60} {:>15}", truncated_path, format_number(**count));
788            }
789        }
790    }
791
792    // Overall error distribution
793    println!("\n{}", "=".repeat(100));
794    println!("Overall Error Type Distribution");
795    println!("{}", "=".repeat(100));
796
797    let mut overall_errors: HashMap<String, usize> = HashMap::new();
798    let mut total_errors = 0usize;
799
800    for (_, client_stats) in &clients_with_errors {
801        for (error_type, count) in &client_stats.error_types {
802            *overall_errors.entry(error_type.clone()).or_insert(0) += count;
803            total_errors += count;
804        }
805    }
806
807    let mut sorted_errors: Vec<_> = overall_errors.iter().collect();
808    sorted_errors.sort_by(|a, b| b.1.cmp(a.1));
809
810    println!("{:<50} {:>15} {:>15}", "Error Type", "Count", "Percentage");
811    println!("{}", "-".repeat(80));
812
813    for (error_type, count) in sorted_errors.iter().take(15) {
814        let percentage = (**count as f64 / total_errors as f64) * 100.0;
815        let truncated = if error_type.len() > 50 {
816            format!("{}...", &error_type[..47])
817        } else {
818            (*error_type).clone()
819        };
820        println!(
821            "{:<50} {:>15} {:>14.2}%",
822            truncated,
823            format_number(**count),
824            percentage
825        );
826    }
827}
828
829/// Print detailed per-client analysis
830fn print_detailed_client_analysis(stats: &TrafficStats, top_n: usize) {
831    println!("\n{}", "=".repeat(100));
832    println!("Detailed Client Analysis - Top {} Clients", top_n);
833    println!("{}", "=".repeat(100));
834
835    let mut clients: Vec<_> = stats.clients.iter().collect();
836    clients.sort_by(|a, b| b.1.request_count.cmp(&a.1.request_count));
837
838    for (ip, client_stats) in clients.iter().take(top_n) {
839        println!("\n{}", "=".repeat(100));
840        println!("Client: {}", ip);
841        println!("{}", "=".repeat(100));
842        println!(
843            "Total Requests: {}",
844            format_number(client_stats.request_count)
845        );
846        println!(
847            "Unique Entities: {}",
848            format_number(client_stats.entities.len())
849        );
850        println!("Unique Paths: {}", format_number(client_stats.paths.len()));
851        println!(
852            "Unique Mount Points: {}",
853            format_number(client_stats.mount_points.len())
854        );
855        println!("Error Count: {}", format_number(client_stats.error_count));
856        println!("Classification: {}", client_stats.classify_behavior());
857        println!(
858            "First Seen: {}",
859            client_stats.first_seen.as_deref().unwrap_or("unknown")
860        );
861        println!(
862            "Last Seen: {}",
863            client_stats.last_seen.as_deref().unwrap_or("unknown")
864        );
865
866        // Top paths accessed
867        println!("\nTop Paths Accessed:");
868        println!("{:<60} {:>15}", "Path", "Count");
869        println!("{}", "-".repeat(80));
870
871        let mut paths: Vec<_> = client_stats.paths.iter().collect();
872        paths.sort_by(|a, b| b.1.cmp(a.1));
873
874        for (path, count) in paths.iter().take(10) {
875            let truncated_path = if path.len() > 60 {
876                format!("{}...", &path[..57])
877            } else {
878                (*path).clone()
879            };
880            println!("{:<60} {:>15}", truncated_path, format_number(**count));
881        }
882
883        // Top mount points
884        println!("\nTop Mount Points:");
885        println!("{:<60} {:>15}", "Mount Point", "Count");
886        println!("{}", "-".repeat(80));
887
888        let mut mount_points: Vec<_> = client_stats.mount_points.iter().collect();
889        mount_points.sort_by(|a, b| b.1.cmp(a.1));
890
891        for (mp, count) in mount_points.iter().take(10) {
892            println!("{:<60} {:>15}", mp, format_number(**count));
893        }
894
895        // Associated entities
896        if !client_stats.entities.is_empty() {
897            println!("\nAssociated Entities:");
898            println!("{:<40} {:<}", "Entity ID", "Display Name");
899            println!("{}", "-".repeat(80));
900
901            for (entity_id, display_name) in client_stats.entities.iter().take(10) {
902                println!("{:<40} {}", entity_id, display_name);
903            }
904        }
905    }
906}
907
908/// Print temporal analysis (hourly distribution)
909fn print_temporal_analysis(stats: &TrafficStats, top_n: usize) {
910    println!("\n{}", "=".repeat(100));
911    println!("Temporal Analysis - Hourly Request Distribution");
912    println!("{}", "=".repeat(100));
913
914    let mut clients: Vec<_> = stats.clients.iter().collect();
915    clients.sort_by(|a, b| b.1.request_count.cmp(&a.1.request_count));
916
917    for (ip, client_stats) in clients.iter().take(top_n) {
918        println!(
919            "\nClient: {} (Total: {})",
920            ip,
921            format_number(client_stats.request_count)
922        );
923        println!("{}", "-".repeat(80));
924
925        // Create sorted list of hours
926        let mut hourly: Vec<_> = client_stats.hourly_distribution.iter().collect();
927        hourly.sort_by_key(|(hour, _)| *hour);
928
929        // Print hourly distribution
930        for (hour, count) in hourly {
931            let percentage = (*count as f64 / client_stats.request_count as f64) * 100.0;
932            let bar_length = (percentage / 2.0) as usize; // Scale to max 50 chars
933            let bar = "#".repeat(bar_length);
934            println!(
935                "{:02}:00 {:>8} {:>6.2}% {}",
936                hour,
937                format_number(*count),
938                percentage,
939                bar
940            );
941        }
942    }
943}
944
945/// Export data to CSV or JSON
946fn export_data(stats: &TrafficStats, output_file: &str, format: Option<&str>) -> Result<()> {
947    let format = format.unwrap_or("csv");
948
949    // Convert stats to export format
950    let mut exports: Vec<ClientExport> = stats
951        .clients
952        .iter()
953        .map(|(ip, stats)| stats.to_export(ip.clone()))
954        .collect();
955
956    // Sort by request count descending
957    exports.sort_by(|a, b| b.total_requests.cmp(&a.total_requests));
958
959    match format {
960        "csv" => export_csv(&exports, output_file),
961        "json" => export_json(&exports, output_file),
962        _ => Err(anyhow::anyhow!("Unsupported format: {}", format)),
963    }
964}
965
966/// Export to CSV format
967fn export_csv(data: &[ClientExport], output_file: &str) -> Result<()> {
968    let file = std::fs::File::create(output_file)
969        .context(format!("Failed to create output file: {}", output_file))?;
970    let mut writer = csv::Writer::from_writer(file);
971
972    for record in data {
973        writer.serialize(record)?;
974    }
975
976    writer.flush()?;
977    Ok(())
978}
979
980/// Export to JSON format
981fn export_json(data: &[ClientExport], output_file: &str) -> Result<()> {
982    let json = serde_json::to_string_pretty(data)?;
983    std::fs::write(output_file, json)
984        .context(format!("Failed to write to output file: {}", output_file))?;
985    Ok(())
986}
987
988/// Detailed error export with entity information
989#[derive(Debug, Serialize)]
990struct DetailedErrorExport {
991    client_ip: String,
992    entity_id: String,
993    display_name: String,
994    error_type: String,
995    path: String,
996    timestamp: String,
997}
998
999/// Export detailed error analysis with entity-level granularity
1000fn export_error_details(stats: &TrafficStats, output_file: &str) -> Result<()> {
1001    let file = std::fs::File::create(output_file)
1002        .context(format!("Failed to create output file: {}", output_file))?;
1003    let mut writer = csv::Writer::from_writer(file);
1004
1005    // Collect all error instances from all clients
1006    let mut all_errors = Vec::new();
1007
1008    for (client_ip, client_stats) in &stats.clients {
1009        for error_instance in &client_stats.error_instances {
1010            all_errors.push(DetailedErrorExport {
1011                client_ip: client_ip.clone(),
1012                entity_id: error_instance.entity_id.clone(),
1013                display_name: error_instance.display_name.clone(),
1014                error_type: error_instance.error_type.clone(),
1015                path: error_instance.path.clone(),
1016                timestamp: error_instance.timestamp.clone(),
1017            });
1018        }
1019    }
1020
1021    // Sort by timestamp (most recent first)
1022    all_errors.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
1023
1024    // Write all error records
1025    for record in all_errors {
1026        writer.serialize(record)?;
1027    }
1028
1029    writer.flush()?;
1030    Ok(())
1031}