use crate::audit::types::AuditEntry;
use crate::utils::format::format_number;
use crate::utils::parallel::process_files_parallel;
use crate::utils::progress::ProgressBar;
use crate::utils::reader::open_file;
use anyhow::Result;
use std::collections::{HashMap, HashSet};
use std::io::{BufRead, BufReader};

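/// Aggregate statistics for a single request path: total operation count,
/// a per-operation-type breakdown, and the set of distinct entity IDs seen.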
#[derive(Debug)]
struct PathData {
    count: usize,
    operations: HashMap<String, usize>,
    entities: HashSet<String>,
}

impl PathData {
    fn new() -> Self {
        Self {
            count: 0,
            operations: HashMap::with_capacity(10),
            entities: HashSet::with_capacity(50),
        }
    }
}

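/// Aggregates extracted from one audit log file. Results from multiple files
/// are merged by `combine_results` (parallel path) or inline by `run_sequential`.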
#[derive(Debug)]
struct FileAnalysisResult {
    path_operations: HashMap<String, PathData>,
    operation_types: HashMap<String, usize>,
    path_prefixes: HashMap<String, usize>,
    entity_paths: HashMap<String, HashMap<String, usize>>,
    entity_names: HashMap<String, String>,
}

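/// Shared progress state for parallel runs: a global processed-line counter
/// and the progress bar it drives. Installed once via `init_parallel_progress`;
/// workers silently skip progress updates if it was never set.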
static PARALLEL_PROGRESS: std::sync::OnceLock<(
    std::sync::Arc<std::sync::atomic::AtomicUsize>,
    std::sync::Arc<std::sync::Mutex<crate::utils::progress::ProgressBar>>,
)> = std::sync::OnceLock::new();

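/// Installs the shared progress state used by the parallel streaming path.
/// Only the first call takes effect, since `OnceLock::set` succeeds once.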
pub fn init_parallel_progress(
    processed: std::sync::Arc<std::sync::atomic::AtomicUsize>,
    progress: std::sync::Arc<std::sync::Mutex<crate::utils::progress::ProgressBar>>,
) {
    let _ = PARALLEL_PROGRESS.set((processed, progress));
}

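/// Streams a single audit log file line by line and aggregates per-path,
/// per-operation, and per-entity counts without buffering parsed entries.
/// Blank and unparseable lines are skipped; shared progress (if installed)
/// is updated in batches of 2000 lines.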
fn process_file_entries_streaming(file_path: &str) -> Result<FileAnalysisResult> {
    let mut path_operations: HashMap<String, PathData> = HashMap::with_capacity(5000);
    let mut operation_types: HashMap<String, usize> = HashMap::with_capacity(20);
    let mut path_prefixes: HashMap<String, usize> = HashMap::with_capacity(100);
    let mut entity_paths: HashMap<String, HashMap<String, usize>> = HashMap::with_capacity(2000);
    let mut entity_names: HashMap<String, String> = HashMap::with_capacity(2000);

    let file = open_file(file_path)?;
    let reader = BufReader::new(file);

    let mut lines_processed = 0;

    for line_result in reader.lines() {
        let line = line_result?;
        lines_processed += 1;

        // Update shared progress in batches of 2000 lines to limit lock contention.
        if lines_processed % 2000 == 0 {
            if let Some((processed_lines, progress)) = PARALLEL_PROGRESS.get() {
                processed_lines.fetch_add(2000, std::sync::atomic::Ordering::Relaxed);
                if let Ok(mut p) = progress.lock() {
                    p.update(processed_lines.load(std::sync::atomic::Ordering::Relaxed));
                }
            }
        }

        if line.trim().is_empty() {
            continue;
        }

        // Skip lines that do not parse as audit entries rather than failing.
        let entry: AuditEntry = match serde_json::from_str(&line) {
            Ok(entry) => entry,
            Err(_) => continue,
        };

        let Some(request) = &entry.request else {
            continue;
        };

        let path = match &request.path {
            Some(p) => p.as_str(),
            None => continue,
        };

        let operation = match &request.operation {
            Some(o) => o.as_str(),
            None => continue,
        };

        let entity_id = entry
            .auth
            .as_ref()
            .and_then(|a| a.entity_id.as_deref())
            .unwrap_or("no-entity");

        let display_name = entry
            .auth
            .as_ref()
            .and_then(|a| a.display_name.as_deref())
            .unwrap_or("N/A");

        if path.is_empty() || operation.is_empty() {
            continue;
        }

        // Per-path totals, per-operation counts, and distinct entities.
        let path_data = path_operations
            .entry(path.to_string())
            .or_insert_with(PathData::new);
        path_data.count += 1;
        *path_data
            .operations
            .entry(operation.to_string())
            .or_insert(0) += 1;
        path_data.entities.insert(entity_id.to_string());

        *operation_types.entry(operation.to_string()).or_insert(0) += 1;

        // Bucket by the first two path components (e.g. "secret/data").
        let parts: Vec<&str> = path.trim_matches('/').split('/').collect();
        let prefix = if parts.len() >= 2 {
            format!("{}/{}", parts[0], parts[1])
        } else if !parts.is_empty() {
            parts[0].to_string()
        } else {
            "root".to_string()
        };
        *path_prefixes.entry(prefix).or_insert(0) += 1;

        let entity_map = entity_paths.entry(entity_id.to_string()).or_default();
        *entity_map.entry(path.to_string()).or_insert(0) += 1;
        entity_names
            .entry(entity_id.to_string())
            .or_insert_with(|| display_name.to_string());
    }

    // Flush the tail that did not reach a full 2000-line batch.
    let remaining = lines_processed % 2000;
    if remaining > 0 {
        if let Some((processed_lines, progress)) = PARALLEL_PROGRESS.get() {
            processed_lines.fetch_add(remaining, std::sync::atomic::Ordering::Relaxed);
            if let Ok(mut p) = progress.lock() {
                p.update(processed_lines.load(std::sync::atomic::Ordering::Relaxed));
            }
        }
    }

    Ok(FileAnalysisResult {
        path_operations,
        operation_types,
        path_prefixes,
        entity_paths,
        entity_names,
    })
}

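/// In-memory counterpart of `process_file_entries_streaming`: aggregates a
/// batch of already-parsed entries. Used by the sequential path, which parses
/// lines itself while tracking byte-level progress.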
fn process_file_entries(_file_path: &str, entries: Vec<AuditEntry>) -> FileAnalysisResult {
    let mut path_operations: HashMap<String, PathData> = HashMap::with_capacity(5000);
    let mut operation_types: HashMap<String, usize> = HashMap::with_capacity(20);
    let mut path_prefixes: HashMap<String, usize> = HashMap::with_capacity(100);
    let mut entity_paths: HashMap<String, HashMap<String, usize>> = HashMap::with_capacity(2000);
    let mut entity_names: HashMap<String, String> = HashMap::with_capacity(2000);

    for entry in entries {
        // Same filtering and aggregation as the streaming variant.
        let Some(request) = &entry.request else {
            continue;
        };

        let path = match &request.path {
            Some(p) => p.as_str(),
            None => continue,
        };

        let operation = match &request.operation {
            Some(o) => o.as_str(),
            None => continue,
        };

        let entity_id = entry
            .auth
            .as_ref()
            .and_then(|a| a.entity_id.as_deref())
            .unwrap_or("no-entity");

        let display_name = entry
            .auth
            .as_ref()
            .and_then(|a| a.display_name.as_deref())
            .unwrap_or("N/A");

        if path.is_empty() || operation.is_empty() {
            continue;
        }

        let path_data = path_operations
            .entry(path.to_string())
            .or_insert_with(PathData::new);
        path_data.count += 1;
        *path_data
            .operations
            .entry(operation.to_string())
            .or_insert(0) += 1;
        path_data.entities.insert(entity_id.to_string());

        *operation_types.entry(operation.to_string()).or_insert(0) += 1;

        let parts: Vec<&str> = path.trim_matches('/').split('/').collect();
        let prefix = if parts.len() >= 2 {
            format!("{}/{}", parts[0], parts[1])
        } else if !parts.is_empty() {
            parts[0].to_string()
        } else {
            "root".to_string()
        };
        *path_prefixes.entry(prefix).or_insert(0) += 1;

        let entity_map = entity_paths.entry(entity_id.to_string()).or_default();
        *entity_map.entry(path.to_string()).or_insert(0) += 1;
        entity_names
            .entry(entity_id.to_string())
            .or_insert_with(|| display_name.to_string());
    }

    FileAnalysisResult {
        path_operations,
        operation_types,
        path_prefixes,
        entity_paths,
        entity_names,
    }
}

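/// Merges per-file results into a single aggregate: counts are summed,
/// entity sets are unioned, and the first display name seen for an entity wins.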
fn combine_results(
    results: Vec<crate::utils::parallel::FileProcessResult<FileAnalysisResult>>,
) -> FileAnalysisResult {
    let mut combined = FileAnalysisResult {
        path_operations: HashMap::with_capacity(5000),
        operation_types: HashMap::with_capacity(20),
        path_prefixes: HashMap::with_capacity(100),
        entity_paths: HashMap::with_capacity(2000),
        entity_names: HashMap::with_capacity(2000),
    };

    for file_result in results {
        let result = file_result.data;

        for (path, path_data) in result.path_operations {
            if let Some(existing) = combined.path_operations.get_mut(&path) {
                existing.count += path_data.count;
                for (op, count) in path_data.operations {
                    *existing.operations.entry(op).or_insert(0) += count;
                }
                existing.entities.extend(path_data.entities);
            } else {
                combined.path_operations.insert(path, path_data);
            }
        }

        for (op, count) in result.operation_types {
            *combined.operation_types.entry(op).or_insert(0) += count;
        }

        for (prefix, count) in result.path_prefixes {
            *combined.path_prefixes.entry(prefix).or_insert(0) += count;
        }

        for (entity_id, paths) in result.entity_paths {
            let entity_map = combined.entity_paths.entry(entity_id).or_default();
            for (path, count) in paths {
                *entity_map.entry(path).or_insert(0) += count;
            }
        }

        for (entity_id, name) in result.entity_names {
            combined.entity_names.entry(entity_id).or_insert(name);
        }
    }

    combined
}

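/// Processes the given log files one at a time with a per-file progress bar.
/// Returns the merged aggregates together with the total number of lines read.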
fn run_sequential(log_files: &[String]) -> Result<(FileAnalysisResult, usize)> {
    let mut combined_result = FileAnalysisResult {
        path_operations: HashMap::with_capacity(5000),
        operation_types: HashMap::with_capacity(20),
        path_prefixes: HashMap::with_capacity(100),
        entity_paths: HashMap::with_capacity(2000),
        entity_names: HashMap::with_capacity(2000),
    };
    let mut total_lines = 0;

    for (file_idx, log_file) in log_files.iter().enumerate() {
        eprintln!(
            "[{}/{}] Processing: {}",
            file_idx + 1,
            log_files.len(),
            log_file
        );

        // Byte-based progress when the file size is known, otherwise a spinner.
        let file_size = std::fs::metadata(log_file).ok().map(|m| m.len() as usize);
        let mut progress = if let Some(size) = file_size {
            ProgressBar::new(size, "Processing")
        } else {
            ProgressBar::new_spinner("Processing")
        };

        let file = open_file(log_file)?;
        let reader = BufReader::new(file);

        let mut file_lines = 0;
        let mut bytes_read = 0;
        let mut entries = Vec::new();

        for line in reader.lines() {
            file_lines += 1;
            total_lines += 1;
            let line = line?;
            // +1 approximates the newline stripped by lines().
            bytes_read += line.len() + 1;

            if file_lines % 10_000 == 0 {
                if let Some(size) = file_size {
                    progress.update(bytes_read.min(size));
                } else {
                    progress.update(file_lines);
                }
            }

            if let Ok(entry) = serde_json::from_str::<AuditEntry>(&line) {
                entries.push(entry);
            }
        }

        if let Some(size) = file_size {
            progress.update(size);
        }

        progress.finish_with_message(&format!(
            "Processed {} lines from this file",
            format_number(file_lines)
        ));

        let file_result = process_file_entries(log_file, entries);

        // Merge this file's aggregates into the running totals.
        for (path, path_data) in file_result.path_operations {
            if let Some(existing) = combined_result.path_operations.get_mut(&path) {
                existing.count += path_data.count;
                for (op, count) in path_data.operations {
                    *existing.operations.entry(op).or_insert(0) += count;
                }
                existing.entities.extend(path_data.entities);
            } else {
                combined_result.path_operations.insert(path, path_data);
            }
        }

        for (op, count) in file_result.operation_types {
            *combined_result.operation_types.entry(op).or_insert(0) += count;
        }

        for (prefix, count) in file_result.path_prefixes {
            *combined_result.path_prefixes.entry(prefix).or_insert(0) += count;
        }

        for (entity_id, paths) in file_result.entity_paths {
            let entity_map = combined_result.entity_paths.entry(entity_id).or_default();
            for (path, count) in paths {
                *entity_map.entry(path).or_insert(0) += count;
            }
        }

        for (entity_id, name) in file_result.entity_names {
            combined_result
                .entity_names
                .entry(entity_id)
                .or_insert(name);
        }
    }

    Ok((combined_result, total_lines))
}

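/// Entry point for the analysis: aggregates the audit logs (sequentially for
/// a single file or when requested, otherwise in parallel) and prints the
/// five report sections.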
pub fn run(
    log_files: &[String],
    top: usize,
    min_operations: usize,
    sequential: bool,
) -> Result<()> {
    // With one input file there is nothing to parallelize, so fall back to
    // the sequential path.
    let (combined_result, total_lines) = if sequential || log_files.len() == 1 {
        eprintln!("Processing {} files sequentially...", log_files.len());
        run_sequential(log_files)?
    } else {
        process_files_parallel(log_files, process_file_entries_streaming, combine_results)?
    };

    eprintln!("\nTotal: Processed {} lines", format_number(total_lines));

    let path_operations = combined_result.path_operations;
    let operation_types = combined_result.operation_types;
    let path_prefixes = combined_result.path_prefixes;
    let entity_paths = combined_result.entity_paths;
    let entity_names = combined_result.entity_names;

    let total_operations: usize = operation_types.values().sum();

    println!("\n{}", "=".repeat(100));
    println!("High-Volume Vault Operations Analysis");
    println!("{}", "=".repeat(100));

    println!("\n1. Operation Types (Overall)");
    println!("{}", "-".repeat(100));
    println!("{:<20} {:>15} {:>12}", "Operation", "Count", "Percentage");
    println!("{}", "-".repeat(100));

    let mut sorted_ops: Vec<_> = operation_types.iter().collect();
    sorted_ops.sort_by(|a, b| b.1.cmp(a.1));

    for (op, count) in sorted_ops {
        let pct = if total_operations > 0 {
            (*count as f64 / total_operations as f64) * 100.0
        } else {
            0.0
        };
        println!("{:<20} {:>15} {:>11.2}%", op, format_number(*count), pct);
    }

    println!("{}", "-".repeat(100));
    println!(
        "{:<20} {:>15} {:>11.2}%",
        "TOTAL",
        format_number(total_operations),
        100.0
    );

    println!("\n2. Top Path Prefixes (First 2 components)");
    println!("{}", "-".repeat(100));
    println!(
        "{:<40} {:>15} {:>12}",
        "Path Prefix", "Operations", "Percentage"
    );
    println!("{}", "-".repeat(100));

    let mut sorted_prefixes: Vec<_> = path_prefixes.iter().collect();
    sorted_prefixes.sort_by(|a, b| b.1.cmp(a.1));

    for (prefix, count) in sorted_prefixes.iter().take(top) {
        let pct = if total_operations > 0 {
            (**count as f64 / total_operations as f64) * 100.0
        } else {
            0.0
        };
        println!(
            "{:<40} {:>15} {:>11.2}%",
            prefix,
            format_number(**count),
            pct
        );
    }

    println!("\n3. Top {} Individual Paths (Highest Volume)", top);
    println!("{}", "-".repeat(100));
    println!(
        "{:<60} {:>10} {:>10} {:>15}",
        "Path", "Ops", "Entities", "Top Op"
    );
    println!("{}", "-".repeat(100));

    let mut sorted_paths: Vec<_> = path_operations.iter().collect();
    sorted_paths.sort_by(|a, b| b.1.count.cmp(&a.1.count));

    for (path, data) in sorted_paths.iter().take(top) {
        // Paths are sorted descending, so once one falls below the threshold
        // the rest will too.
        if data.count < min_operations {
            break;
        }
        let top_op = data
            .operations
            .iter()
            .max_by_key(|x| x.1)
            .map_or("N/A", |x| x.0.as_str());
        // Truncate to 57 chars plus "..." so the result fits the 60-char
        // column. Byte slicing assumes ASCII paths, as Vault paths usually are.
        let path_display = if path.len() > 60 {
            format!("{}...", &path[..57])
        } else {
            (*path).to_string()
        };
        println!(
            "{:<60} {:>10} {:>10} {:>15}",
            path_display,
            format_number(data.count),
            format_number(data.entities.len()),
            top_op
        );
    }

    println!("\n4. Top {} Entities by Total Operations", top);
    println!("{}", "-".repeat(100));
    println!(
        "{:<50} {:<38} {:>10}",
        "Display Name", "Entity ID", "Total Ops"
    );
    println!("{}", "-".repeat(100));

    let mut entity_totals: HashMap<String, usize> = HashMap::with_capacity(entity_paths.len());
    for (entity_id, paths) in &entity_paths {
        let total: usize = paths.values().sum();
        entity_totals.insert(entity_id.clone(), total);
    }

    let mut sorted_entities: Vec<_> = entity_totals.iter().collect();
    sorted_entities.sort_by(|a, b| b.1.cmp(a.1));

    for (entity_id, total) in sorted_entities.iter().take(top) {
        let name = entity_names
            .get(*entity_id)
            .map_or("N/A", std::string::String::as_str);
        let name_display = if name.len() > 48 { &name[..48] } else { name };
        // Vault entity IDs are normally 36-char UUIDs, so this rarely truncates.
        let entity_short = if entity_id.len() > 36 {
            &entity_id[..36]
        } else {
            entity_id
        };
        println!(
            "{:<50} {:<38} {:>10}",
            name_display,
            entity_short,
            format_number(**total)
        );
    }

    println!("\n5. Potential System Stress Points");
    println!("{}", "-".repeat(100));

    // An (entity, path) pair where a single entity alone meets the
    // min_operations threshold on one path.
    #[derive(Debug)]
    struct StressPoint {
        path: String,
        entity_name: String,
        operations: usize,
    }

    let mut stress_points = Vec::new();

    for (path, data) in &path_operations {
        if data.count >= min_operations {
            for entity_id in &data.entities {
                if let Some(entity_ops_map) = entity_paths.get(entity_id) {
                    if let Some(&entity_ops) = entity_ops_map.get(path) {
                        if entity_ops >= min_operations {
                            stress_points.push(StressPoint {
                                path: path.clone(),
                                entity_name: entity_names
                                    .get(entity_id)
                                    .cloned()
                                    .unwrap_or_else(|| "N/A".to_string()),
                                operations: entity_ops,
                            });
                        }
                    }
                }
            }
        }
    }

    stress_points.sort_by(|a, b| b.operations.cmp(&a.operations));

    println!("{:<40} {:<40} {:>10}", "Entity", "Path", "Ops");
    println!("{}", "-".repeat(100));

    for sp in stress_points.iter().take(top) {
        let entity_display = if sp.entity_name.len() > 38 {
            &sp.entity_name[..38]
        } else {
            &sp.entity_name
        };
        let path_display = if sp.path.len() > 38 {
            &sp.path[..38]
        } else {
            &sp.path
        };
        println!(
            "{:<40} {:<40} {:>10}",
            entity_display,
            path_display,
            format_number(sp.operations)
        );
    }

    println!("{}", "=".repeat(100));
    println!("\nTotal Lines Processed: {}", format_number(total_lines));
    println!("Total Operations: {}", format_number(total_operations));
    println!("{}", "=".repeat(100));

    Ok(())
}