1use crate::audit::types::AuditEntry;
22use crate::utils::format::format_number;
23use crate::utils::parallel::process_files_parallel;
24use crate::utils::progress::ProgressBar;
25use anyhow::{Context, Result};
26use chrono::{DateTime, Timelike, Utc};
27use serde::Serialize;
28use std::collections::HashMap;
29use std::io::{BufRead, BufReader};
30use std::sync::atomic::{AtomicUsize, Ordering};
31use std::sync::{Arc, Mutex, OnceLock};
32
33#[derive(Debug, Clone)]
35struct ErrorInstance {
36 entity_id: String,
37 display_name: String,
38 error_type: String,
39 path: String,
40 timestamp: String,
41}
42
43#[derive(Debug, Clone)]
45struct ClientStats {
46 request_count: usize,
48 operations: HashMap<String, usize>,
50 paths: HashMap<String, usize>,
52 mount_points: HashMap<String, usize>,
54 entities: HashMap<String, String>, first_seen: Option<String>,
58 last_seen: Option<String>,
60 error_count: usize,
62 error_types: HashMap<String, usize>,
64 error_paths: HashMap<String, usize>,
66 error_instances: Vec<ErrorInstance>,
68 hourly_distribution: HashMap<u32, usize>,
70}
71
72#[derive(Debug, Serialize)]
74struct ClientExport {
75 client_ip: String,
76 total_requests: usize,
77 unique_entities: usize,
78 unique_paths: usize,
79 unique_mount_points: usize,
80 error_count: usize,
81 error_rate: f64,
82 first_seen: String,
83 last_seen: String,
84 top_operation: String,
85 top_operation_count: usize,
86 top_path: String,
87 top_path_count: usize,
88 top_error_type: String,
90 top_error_type_count: usize,
91 top_error_type_percentage: f64,
92 second_error_type: String,
93 second_error_type_count: usize,
94 third_error_type: String,
95 third_error_type_count: usize,
96 top_error_path: String,
97 top_error_path_count: usize,
98 classification: String,
99}
100
101impl ClientStats {
102 fn new() -> Self {
103 Self {
104 request_count: 0,
105 operations: HashMap::new(),
106 paths: HashMap::new(),
107 mount_points: HashMap::new(),
108 entities: HashMap::new(),
109 first_seen: None,
110 last_seen: None,
111 error_count: 0,
112 error_types: HashMap::new(),
113 error_paths: HashMap::new(),
114 error_instances: Vec::new(),
115 hourly_distribution: HashMap::new(),
116 }
117 }
118
119 fn update(&mut self, entry: &AuditEntry) {
121 self.request_count += 1;
122
123 if let Some(op) = entry.operation() {
125 *self.operations.entry(op.to_string()).or_insert(0) += 1;
126 }
127
128 if let Some(path) = entry.path() {
130 *self.paths.entry(path.to_string()).or_insert(0) += 1;
131 }
132
133 if let Some(mp) = entry.mount_point() {
135 *self.mount_points.entry(mp.to_string()).or_insert(0) += 1;
136 }
137
138 if let Some(entity_id) = entry.entity_id() {
140 if let Some(display_name) = entry.display_name() {
141 self.entities
142 .entry(entity_id.to_string())
143 .or_insert_with(|| display_name.to_string());
144 }
145 }
146
147 if self.first_seen.is_none() {
149 self.first_seen = Some(entry.time.clone());
150 }
151 self.last_seen = Some(entry.time.clone());
152
153 if let Some(error_msg) = &entry.error {
155 self.error_count += 1;
156
157 let cleaned_error = error_msg.trim().replace(['\n', '\t'], " ");
159
160 let error_type = if cleaned_error.contains("permission denied") {
162 "permission denied"
163 } else if cleaned_error.contains("service account name not authorized") {
164 "service account not authorized"
165 } else if cleaned_error.contains("namespace not authorized") {
166 "namespace not authorized"
167 } else if cleaned_error.contains("invalid credentials") {
168 "invalid credentials"
169 } else if cleaned_error.contains("wrapping token") {
170 "invalid wrapping token"
171 } else if cleaned_error.contains("internal error") {
172 "internal error"
173 } else if cleaned_error.contains("unsupported operation") {
174 "unsupported operation"
175 } else if cleaned_error.contains("max TTL") {
176 "max TTL exceeded"
177 } else if cleaned_error.is_empty() || cleaned_error == "null" {
178 "unknown error"
179 } else {
180 if cleaned_error.len() > 50 {
182 &cleaned_error[..50]
183 } else {
184 &cleaned_error
185 }
186 };
187
188 *self.error_types.entry(error_type.to_string()).or_insert(0) += 1;
189
190 let path = entry.path().unwrap_or("unknown").to_string();
192 *self.error_paths.entry(path.clone()).or_insert(0) += 1;
193
194 let entity_id = entry.entity_id().unwrap_or("unknown").to_string();
196 let display_name = entry.display_name().unwrap_or("unknown").to_string();
197
198 self.error_instances.push(ErrorInstance {
199 entity_id,
200 display_name,
201 error_type: error_type.to_string(),
202 path,
203 timestamp: entry.time.clone(),
204 });
205 }
206
207 if let Ok(dt) = entry.time.parse::<DateTime<Utc>>() {
209 let hour = dt.hour();
210 *self.hourly_distribution.entry(hour).or_insert(0) += 1;
211 }
212 }
213
214 fn merge(&mut self, other: Self) {
216 self.request_count += other.request_count;
217 self.error_count += other.error_count;
218
219 for (op, count) in other.operations {
221 *self.operations.entry(op).or_insert(0) += count;
222 }
223
224 for (path, count) in other.paths {
226 *self.paths.entry(path).or_insert(0) += count;
227 }
228
229 for (mp, count) in other.mount_points {
231 *self.mount_points.entry(mp).or_insert(0) += count;
232 }
233
234 for (entity_id, display_name) in other.entities {
236 self.entities.entry(entity_id).or_insert(display_name);
237 }
238
239 for (error_type, count) in other.error_types {
241 *self.error_types.entry(error_type).or_insert(0) += count;
242 }
243
244 for (path, count) in other.error_paths {
246 *self.error_paths.entry(path).or_insert(0) += count;
247 }
248
249 self.error_instances.extend(other.error_instances);
251
252 for (hour, count) in other.hourly_distribution {
254 *self.hourly_distribution.entry(hour).or_insert(0) += count;
255 }
256
257 if self.first_seen.is_none()
259 || (other.first_seen.is_some() && other.first_seen < self.first_seen)
260 {
261 self.first_seen = other.first_seen;
262 }
263 if self.last_seen.is_none()
264 || (other.last_seen.is_some() && other.last_seen > self.last_seen)
265 {
266 self.last_seen = other.last_seen;
267 }
268 }
269
270 fn classify_behavior(&self) -> String {
272 let paths_per_request = self.paths.len() as f64 / self.request_count as f64;
273 if self.request_count > 1000 || paths_per_request < 0.1 {
274 "automated".to_string()
275 } else {
276 "interactive".to_string()
277 }
278 }
279
280 fn to_export(&self, client_ip: String) -> ClientExport {
282 let error_rate = if self.request_count > 0 {
283 (self.error_count as f64 / self.request_count as f64) * 100.0
284 } else {
285 0.0
286 };
287
288 let (top_operation, top_operation_count) = self
289 .operations
290 .iter()
291 .max_by_key(|(_, count)| *count)
292 .map_or_else(
293 || ("none".to_string(), 0),
294 |(op, count)| (op.clone(), *count),
295 );
296
297 let (top_path, top_path_count) = self
298 .paths
299 .iter()
300 .max_by_key(|(_, count)| *count)
301 .map_or_else(
302 || ("none".to_string(), 0),
303 |(path, count)| (path.clone(), *count),
304 );
305
306 let mut error_types_sorted: Vec<_> = self.error_types.iter().collect();
308 error_types_sorted.sort_by(|a, b| b.1.cmp(a.1));
309
310 let (top_error_type, top_error_type_count) = error_types_sorted
311 .first()
312 .map_or_else(|| ("none".to_string(), 0), |(t, c)| ((*t).clone(), **c));
313
314 let top_error_type_percentage = if self.error_count > 0 {
315 (top_error_type_count as f64 / self.error_count as f64) * 100.0
316 } else {
317 0.0
318 };
319
320 let (second_error_type, second_error_type_count) = error_types_sorted
321 .get(1)
322 .map_or_else(|| ("none".to_string(), 0), |(t, c)| ((*t).clone(), **c));
323
324 let (third_error_type, third_error_type_count) = error_types_sorted
325 .get(2)
326 .map_or_else(|| ("none".to_string(), 0), |(t, c)| ((*t).clone(), **c));
327
328 let (top_error_path, top_error_path_count) = self
330 .error_paths
331 .iter()
332 .max_by_key(|(_, count)| *count)
333 .map_or_else(
334 || ("none".to_string(), 0),
335 |(path, count)| (path.clone(), *count),
336 );
337
338 ClientExport {
339 client_ip,
340 total_requests: self.request_count,
341 unique_entities: self.entities.len(),
342 unique_paths: self.paths.len(),
343 unique_mount_points: self.mount_points.len(),
344 error_count: self.error_count,
345 error_rate,
346 first_seen: self
347 .first_seen
348 .clone()
349 .unwrap_or_else(|| "unknown".to_string()),
350 last_seen: self
351 .last_seen
352 .clone()
353 .unwrap_or_else(|| "unknown".to_string()),
354 top_operation,
355 top_operation_count,
356 top_path,
357 top_path_count,
358 top_error_type,
359 top_error_type_count,
360 top_error_type_percentage,
361 second_error_type,
362 second_error_type_count,
363 third_error_type,
364 third_error_type_count,
365 top_error_path,
366 top_error_path_count,
367 classification: self.classify_behavior(),
368 }
369 }
370}
371
372static PARALLEL_PROGRESS: OnceLock<(Arc<AtomicUsize>, Arc<Mutex<ProgressBar>>)> = OnceLock::new();
374
375pub fn init_parallel_progress(processed: Arc<AtomicUsize>, progress: Arc<Mutex<ProgressBar>>) {
377 let _ = PARALLEL_PROGRESS.set((processed, progress));
378}
379
380#[derive(Debug)]
382struct TrafficStats {
383 clients: HashMap<String, ClientStats>,
385 total_requests: usize,
387}
388
389impl TrafficStats {
390 fn new() -> Self {
391 Self {
392 clients: HashMap::new(),
393 total_requests: 0,
394 }
395 }
396
397 fn merge(&mut self, other: Self) {
398 self.total_requests += other.total_requests;
399
400 for (client_ip, stats) in other.clients {
401 self.clients
402 .entry(client_ip)
403 .or_insert_with(ClientStats::new)
404 .merge(stats);
405 }
406 }
407}
408
409fn process_file(file_path: &str) -> Result<TrafficStats> {
411 let file = crate::utils::reader::open_file(file_path)?;
412 let reader = BufReader::new(file);
413
414 let mut stats = TrafficStats::new();
415 let mut lines_processed = 0usize;
416
417 let parallel_progress = PARALLEL_PROGRESS.get();
419
420 for line_result in reader.lines() {
421 let line = line_result?;
422 lines_processed += 1;
423
424 if lines_processed % 1000 == 0 {
426 if let Some((processed_lines, progress)) = parallel_progress {
427 processed_lines.fetch_add(1000, Ordering::Relaxed);
428 if let Ok(progress) = progress.lock() {
429 progress.inc(1000);
430 }
431 }
432 }
433
434 if line.trim().is_empty() {
436 continue;
437 }
438
439 let entry: AuditEntry = match serde_json::from_str(&line) {
441 Ok(entry) => entry,
442 Err(_) => continue,
443 };
444
445 if entry.entry_type != "request" {
447 continue;
448 }
449
450 let Some(client_ip) = entry.remote_address() else {
452 continue;
453 };
454
455 stats
457 .clients
458 .entry(client_ip.to_string())
459 .or_insert_with(ClientStats::new)
460 .update(&entry);
461
462 stats.total_requests += 1;
463 }
464
465 let remainder = lines_processed % 1000;
467 if remainder > 0 {
468 if let Some((processed_lines, progress)) = parallel_progress {
469 processed_lines.fetch_add(remainder, Ordering::Relaxed);
470 if let Ok(progress) = progress.lock() {
471 progress.inc(remainder as u64);
472 }
473 }
474 }
475
476 Ok(stats)
477}
478
479#[allow(clippy::too_many_arguments, clippy::fn_params_excessive_bools)]
481pub fn run(
482 log_files: &[String],
483 output: Option<String>,
484 format: Option<&str>,
485 error_details_output: Option<String>,
486 top_n: usize,
487 show_temporal: bool,
488 min_requests: usize,
489 show_operations: bool,
490 show_errors: bool,
491 show_details: bool,
492) -> Result<()> {
493 if log_files.len() == 1 {
494 eprintln!("Analyzing client traffic patterns from 1 file...");
495 } else {
496 eprintln!(
497 "Analyzing client traffic patterns from {} files...",
498 log_files.len()
499 );
500 }
501
502 let (combined_stats, _total_lines) =
504 process_files_parallel(log_files, process_file, |results| {
505 let mut combined = TrafficStats::new();
506 for result in results {
507 combined.merge(result.data);
508 }
509 combined
510 })?;
511
512 let filtered_stats = if min_requests > 1 {
514 let mut filtered = TrafficStats::new();
515 filtered.total_requests = combined_stats.total_requests;
516 for (ip, stats) in combined_stats.clients {
517 if stats.request_count >= min_requests {
518 filtered.clients.insert(ip, stats);
519 }
520 }
521 filtered
522 } else {
523 combined_stats
524 };
525
526 if let Some(output_file) = output {
528 export_data(&filtered_stats, &output_file, format)?;
529 eprintln!("Exported summary data to {}", output_file);
530 }
531
532 if let Some(error_output_file) = error_details_output {
534 export_error_details(&filtered_stats, &error_output_file)?;
535 eprintln!(
536 "Exported detailed error analysis (with entities) to {}",
537 error_output_file
538 );
539 }
540
541 print_summary(&filtered_stats);
543 print_top_clients(&filtered_stats, top_n);
544 print_client_behavior_analysis(&filtered_stats);
545
546 if show_operations {
547 print_operation_breakdown(&filtered_stats, top_n.min(10));
548 }
549
550 if show_errors {
551 print_error_analysis(&filtered_stats, top_n.min(10));
552 }
553
554 if show_details {
555 print_detailed_client_analysis(&filtered_stats, top_n.min(10));
556 }
557
558 if show_temporal {
559 print_temporal_analysis(&filtered_stats, top_n.min(10));
560 }
561
562 Ok(())
563}
564
565fn print_summary(stats: &TrafficStats) {
567 println!("\n{}", "=".repeat(100));
568 println!("Client Traffic Analysis Summary");
569 println!("{}", "=".repeat(100));
570 println!("Total Requests: {}", format_number(stats.total_requests));
571 println!("Unique Clients: {}", format_number(stats.clients.len()));
572 println!(
573 "Avg Requests per Client: {:.2}",
574 stats.total_requests as f64 / stats.clients.len() as f64
575 );
576}
577
578fn print_top_clients(stats: &TrafficStats, top_n: usize) {
580 println!("\n{}", "=".repeat(100));
581 println!("Top {} Clients by Request Volume", top_n);
582 println!("{}", "=".repeat(100));
583 println!(
584 "{:<20} {:>15} {:>15} {:>15} {:>15}",
585 "Client IP", "Requests", "Entities", "Errors", "Error %"
586 );
587 println!("{}", "-".repeat(100));
588
589 let mut clients: Vec<_> = stats.clients.iter().collect();
590 clients.sort_by(|a, b| b.1.request_count.cmp(&a.1.request_count));
591
592 for (ip, client_stats) in clients.iter().take(top_n) {
593 let error_pct = if client_stats.request_count > 0 {
594 (client_stats.error_count as f64 / client_stats.request_count as f64) * 100.0
595 } else {
596 0.0
597 };
598
599 println!(
600 "{:<20} {:>15} {:>15} {:>15} {:>14.2}%",
601 ip,
602 format_number(client_stats.request_count),
603 format_number(client_stats.entities.len()),
604 format_number(client_stats.error_count),
605 error_pct
606 );
607 }
608}
609
610fn print_client_behavior_analysis(stats: &TrafficStats) {
612 println!("\n{}", "=".repeat(100));
613 println!("Client Behavior Analysis");
614 println!("{}", "=".repeat(100));
615
616 let mut automated_clients = Vec::new();
618 let mut interactive_clients = Vec::new();
619
620 for (ip, client_stats) in &stats.clients {
621 let paths_per_request = client_stats.paths.len() as f64 / client_stats.request_count as f64;
624
625 if client_stats.request_count > 1000 || paths_per_request < 0.1 {
626 automated_clients.push((ip, client_stats));
627 } else {
628 interactive_clients.push((ip, client_stats));
629 }
630 }
631
632 println!(
633 "Automated Clients (likely services): {}",
634 automated_clients.len()
635 );
636 println!(
637 "Interactive Clients (likely users): {}",
638 interactive_clients.len()
639 );
640
641 if !automated_clients.is_empty() {
643 println!("\nTop Automated Clients:");
644 println!(
645 "{:<20} {:>15} {:>15}",
646 "Client IP", "Requests", "Unique Paths"
647 );
648 println!("{}", "-".repeat(60));
649
650 automated_clients.sort_by(|a, b| b.1.request_count.cmp(&a.1.request_count));
651 for (ip, stats) in automated_clients.iter().take(10) {
652 println!(
653 "{:<20} {:>15} {:>15}",
654 ip,
655 format_number(stats.request_count),
656 format_number(stats.paths.len())
657 );
658 }
659 }
660}
661
662fn print_operation_breakdown(stats: &TrafficStats, top_n: usize) {
664 println!("\n{}", "=".repeat(100));
665 println!("Operation Type Breakdown - Top {} Clients", top_n);
666 println!("{}", "=".repeat(100));
667
668 let mut clients: Vec<_> = stats.clients.iter().collect();
669 clients.sort_by(|a, b| b.1.request_count.cmp(&a.1.request_count));
670
671 for (ip, client_stats) in clients.iter().take(top_n) {
672 println!(
673 "\nClient: {} (Total: {})",
674 ip,
675 format_number(client_stats.request_count)
676 );
677 println!("{}", "-".repeat(80));
678
679 let mut operations: Vec<_> = client_stats.operations.iter().collect();
680 operations.sort_by(|a, b| b.1.cmp(a.1));
681
682 println!("{:<30} {:>15} {:>15}", "Operation", "Count", "Percentage");
683 println!("{}", "-".repeat(60));
684
685 for (op, count) in operations {
686 let percentage = (*count as f64 / client_stats.request_count as f64) * 100.0;
687 println!(
688 "{:<30} {:>15} {:>14.2}%",
689 op,
690 format_number(*count),
691 percentage
692 );
693 }
694 }
695}
696
697fn print_error_analysis(stats: &TrafficStats, top_n: usize) {
699 println!("\n{}", "=".repeat(100));
700 println!("Error Analysis - Clients with Errors");
701 println!("{}", "=".repeat(100));
702
703 let mut clients_with_errors: Vec<_> = stats
704 .clients
705 .iter()
706 .filter(|(_, client)| client.error_count > 0)
707 .collect();
708
709 clients_with_errors.sort_by(|a, b| b.1.error_count.cmp(&a.1.error_count));
710
711 if clients_with_errors.is_empty() {
712 println!("No errors detected in the analyzed logs.");
713 return;
714 }
715
716 println!(
717 "{:<20} {:>15} {:>15} {:>15}",
718 "Client IP", "Total Requests", "Errors", "Error Rate"
719 );
720 println!("{}", "-".repeat(80));
721
722 for (ip, client_stats) in clients_with_errors.iter().take(top_n) {
723 let error_rate =
724 (client_stats.error_count as f64 / client_stats.request_count as f64) * 100.0;
725 println!(
726 "{:<20} {:>15} {:>15} {:>14.2}%",
727 ip,
728 format_number(client_stats.request_count),
729 format_number(client_stats.error_count),
730 error_rate
731 );
732 }
733
734 println!("\n{}", "=".repeat(100));
736 println!("Error Type Breakdown by Client");
737 println!("{}", "=".repeat(100));
738
739 for (ip, client_stats) in clients_with_errors.iter().take(top_n) {
740 if client_stats.error_types.is_empty() {
741 continue;
742 }
743
744 println!(
745 "\nClient: {} (Total Errors: {})",
746 ip,
747 format_number(client_stats.error_count)
748 );
749 println!("{}", "-".repeat(80));
750
751 let mut error_types: Vec<_> = client_stats.error_types.iter().collect();
752 error_types.sort_by(|a, b| b.1.cmp(a.1));
753
754 println!("{:<50} {:>15} {:>15}", "Error Type", "Count", "Percentage");
755 println!("{}", "-".repeat(80));
756
757 for (error_type, count) in error_types.iter().take(10) {
758 let percentage = (**count as f64 / client_stats.error_count as f64) * 100.0;
759 let truncated = if error_type.len() > 50 {
760 format!("{}...", &error_type[..47])
761 } else {
762 (*error_type).clone()
763 };
764 println!(
765 "{:<50} {:>15} {:>14.2}%",
766 truncated,
767 format_number(**count),
768 percentage
769 );
770 }
771
772 if !client_stats.error_paths.is_empty() {
774 println!("\nTop Paths Generating Errors:");
775 println!("{:<60} {:>15}", "Path", "Error Count");
776 println!("{}", "-".repeat(80));
777
778 let mut error_paths: Vec<_> = client_stats.error_paths.iter().collect();
779 error_paths.sort_by(|a, b| b.1.cmp(a.1));
780
781 for (path, count) in error_paths.iter().take(5) {
782 let truncated_path = if path.len() > 60 {
783 format!("{}...", &path[..57])
784 } else {
785 (*path).clone()
786 };
787 println!("{:<60} {:>15}", truncated_path, format_number(**count));
788 }
789 }
790 }
791
792 println!("\n{}", "=".repeat(100));
794 println!("Overall Error Type Distribution");
795 println!("{}", "=".repeat(100));
796
797 let mut overall_errors: HashMap<String, usize> = HashMap::new();
798 let mut total_errors = 0usize;
799
800 for (_, client_stats) in &clients_with_errors {
801 for (error_type, count) in &client_stats.error_types {
802 *overall_errors.entry(error_type.clone()).or_insert(0) += count;
803 total_errors += count;
804 }
805 }
806
807 let mut sorted_errors: Vec<_> = overall_errors.iter().collect();
808 sorted_errors.sort_by(|a, b| b.1.cmp(a.1));
809
810 println!("{:<50} {:>15} {:>15}", "Error Type", "Count", "Percentage");
811 println!("{}", "-".repeat(80));
812
813 for (error_type, count) in sorted_errors.iter().take(15) {
814 let percentage = (**count as f64 / total_errors as f64) * 100.0;
815 let truncated = if error_type.len() > 50 {
816 format!("{}...", &error_type[..47])
817 } else {
818 (*error_type).clone()
819 };
820 println!(
821 "{:<50} {:>15} {:>14.2}%",
822 truncated,
823 format_number(**count),
824 percentage
825 );
826 }
827}
828
829fn print_detailed_client_analysis(stats: &TrafficStats, top_n: usize) {
831 println!("\n{}", "=".repeat(100));
832 println!("Detailed Client Analysis - Top {} Clients", top_n);
833 println!("{}", "=".repeat(100));
834
835 let mut clients: Vec<_> = stats.clients.iter().collect();
836 clients.sort_by(|a, b| b.1.request_count.cmp(&a.1.request_count));
837
838 for (ip, client_stats) in clients.iter().take(top_n) {
839 println!("\n{}", "=".repeat(100));
840 println!("Client: {}", ip);
841 println!("{}", "=".repeat(100));
842 println!(
843 "Total Requests: {}",
844 format_number(client_stats.request_count)
845 );
846 println!(
847 "Unique Entities: {}",
848 format_number(client_stats.entities.len())
849 );
850 println!("Unique Paths: {}", format_number(client_stats.paths.len()));
851 println!(
852 "Unique Mount Points: {}",
853 format_number(client_stats.mount_points.len())
854 );
855 println!("Error Count: {}", format_number(client_stats.error_count));
856 println!("Classification: {}", client_stats.classify_behavior());
857 println!(
858 "First Seen: {}",
859 client_stats.first_seen.as_deref().unwrap_or("unknown")
860 );
861 println!(
862 "Last Seen: {}",
863 client_stats.last_seen.as_deref().unwrap_or("unknown")
864 );
865
866 println!("\nTop Paths Accessed:");
868 println!("{:<60} {:>15}", "Path", "Count");
869 println!("{}", "-".repeat(80));
870
871 let mut paths: Vec<_> = client_stats.paths.iter().collect();
872 paths.sort_by(|a, b| b.1.cmp(a.1));
873
874 for (path, count) in paths.iter().take(10) {
875 let truncated_path = if path.len() > 60 {
876 format!("{}...", &path[..57])
877 } else {
878 (*path).clone()
879 };
880 println!("{:<60} {:>15}", truncated_path, format_number(**count));
881 }
882
883 println!("\nTop Mount Points:");
885 println!("{:<60} {:>15}", "Mount Point", "Count");
886 println!("{}", "-".repeat(80));
887
888 let mut mount_points: Vec<_> = client_stats.mount_points.iter().collect();
889 mount_points.sort_by(|a, b| b.1.cmp(a.1));
890
891 for (mp, count) in mount_points.iter().take(10) {
892 println!("{:<60} {:>15}", mp, format_number(**count));
893 }
894
895 if !client_stats.entities.is_empty() {
897 println!("\nAssociated Entities:");
898 println!("{:<40} {:<}", "Entity ID", "Display Name");
899 println!("{}", "-".repeat(80));
900
901 for (entity_id, display_name) in client_stats.entities.iter().take(10) {
902 println!("{:<40} {}", entity_id, display_name);
903 }
904 }
905 }
906}
907
908fn print_temporal_analysis(stats: &TrafficStats, top_n: usize) {
910 println!("\n{}", "=".repeat(100));
911 println!("Temporal Analysis - Hourly Request Distribution");
912 println!("{}", "=".repeat(100));
913
914 let mut clients: Vec<_> = stats.clients.iter().collect();
915 clients.sort_by(|a, b| b.1.request_count.cmp(&a.1.request_count));
916
917 for (ip, client_stats) in clients.iter().take(top_n) {
918 println!(
919 "\nClient: {} (Total: {})",
920 ip,
921 format_number(client_stats.request_count)
922 );
923 println!("{}", "-".repeat(80));
924
925 let mut hourly: Vec<_> = client_stats.hourly_distribution.iter().collect();
927 hourly.sort_by_key(|(hour, _)| *hour);
928
929 for (hour, count) in hourly {
931 let percentage = (*count as f64 / client_stats.request_count as f64) * 100.0;
932 let bar_length = (percentage / 2.0) as usize; let bar = "#".repeat(bar_length);
934 println!(
935 "{:02}:00 {:>8} {:>6.2}% {}",
936 hour,
937 format_number(*count),
938 percentage,
939 bar
940 );
941 }
942 }
943}
944
945fn export_data(stats: &TrafficStats, output_file: &str, format: Option<&str>) -> Result<()> {
947 let format = format.unwrap_or("csv");
948
949 let mut exports: Vec<ClientExport> = stats
951 .clients
952 .iter()
953 .map(|(ip, stats)| stats.to_export(ip.clone()))
954 .collect();
955
956 exports.sort_by(|a, b| b.total_requests.cmp(&a.total_requests));
958
959 match format {
960 "csv" => export_csv(&exports, output_file),
961 "json" => export_json(&exports, output_file),
962 _ => Err(anyhow::anyhow!("Unsupported format: {}", format)),
963 }
964}
965
966fn export_csv(data: &[ClientExport], output_file: &str) -> Result<()> {
968 let file = std::fs::File::create(output_file)
969 .context(format!("Failed to create output file: {}", output_file))?;
970 let mut writer = csv::Writer::from_writer(file);
971
972 for record in data {
973 writer.serialize(record)?;
974 }
975
976 writer.flush()?;
977 Ok(())
978}
979
980fn export_json(data: &[ClientExport], output_file: &str) -> Result<()> {
982 let json = serde_json::to_string_pretty(data)?;
983 std::fs::write(output_file, json)
984 .context(format!("Failed to write to output file: {}", output_file))?;
985 Ok(())
986}
987
988#[derive(Debug, Serialize)]
990struct DetailedErrorExport {
991 client_ip: String,
992 entity_id: String,
993 display_name: String,
994 error_type: String,
995 path: String,
996 timestamp: String,
997}
998
999fn export_error_details(stats: &TrafficStats, output_file: &str) -> Result<()> {
1001 let file = std::fs::File::create(output_file)
1002 .context(format!("Failed to create output file: {}", output_file))?;
1003 let mut writer = csv::Writer::from_writer(file);
1004
1005 let mut all_errors = Vec::new();
1007
1008 for (client_ip, client_stats) in &stats.clients {
1009 for error_instance in &client_stats.error_instances {
1010 all_errors.push(DetailedErrorExport {
1011 client_ip: client_ip.clone(),
1012 entity_id: error_instance.entity_id.clone(),
1013 display_name: error_instance.display_name.clone(),
1014 error_type: error_instance.error_type.clone(),
1015 path: error_instance.path.clone(),
1016 timestamp: error_instance.timestamp.clone(),
1017 });
1018 }
1019 }
1020
1021 all_errors.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
1023
1024 for record in all_errors {
1026 writer.serialize(record)?;
1027 }
1028
1029 writer.flush()?;
1030 Ok(())
1031}