//! High-volume Vault operations analysis: streams audit log files,
//! aggregates per-path, per-operation, and per-entity counts, and prints
//! ranked report tables.

use crate::audit::types::AuditEntry;
use crate::utils::format::format_number;
use crate::utils::parallel::process_files_parallel;
use anyhow::Result;
use std::collections::{HashMap, HashSet};

/// Per-path aggregate: total hit count, per-operation counts, and the set of
/// distinct entity IDs seen on the path.
#[derive(Debug)]
struct PathData {
    count: usize,
    operations: HashMap<String, usize>,
    entities: HashSet<String>,
}

impl PathData {
    fn new() -> Self {
        Self {
            count: 0,
            operations: HashMap::with_capacity(10),
            entities: HashSet::with_capacity(50),
        }
    }
}

/// Aggregates produced by scanning a single audit log file.
#[derive(Debug)]
struct FileAnalysisResult {
    path_operations: HashMap<String, PathData>,
    operation_types: HashMap<String, usize>,
    path_prefixes: HashMap<String, usize>,
    entity_paths: HashMap<String, HashMap<String, usize>>,
    entity_names: HashMap<String, String>,
}

/// Progress state shared across parallel workers: a global processed-line
/// counter and the progress bar it drives.
static PARALLEL_PROGRESS: std::sync::OnceLock<(
    std::sync::Arc<std::sync::atomic::AtomicUsize>,
    std::sync::Arc<std::sync::Mutex<crate::utils::progress::ProgressBar>>,
)> = std::sync::OnceLock::new();

/// Registers the shared counter and progress bar used by parallel workers.
/// `OnceLock::set` fails once initialized, so repeat calls are ignored.
pub fn init_parallel_progress(
    processed: std::sync::Arc<std::sync::atomic::AtomicUsize>,
    progress: std::sync::Arc<std::sync::Mutex<crate::utils::progress::ProgressBar>>,
) {
    let _ = PARALLEL_PROGRESS.set((processed, progress));
}

/// Convenience wrapper used by parallel workers; progress is reported through
/// the global PARALLEL_PROGRESS state.
fn process_file_entries_streaming(
    file_path: &str,
    namespace_filter: Option<&str>,
) -> Result<FileAnalysisResult> {
    process_file_entries_streaming_with_progress(file_path, namespace_filter, None)
}

/// Streams one audit log file line by line, aggregating per-path,
/// per-operation, and per-entity counts. Progress goes to `local_progress`
/// when supplied (sequential mode), otherwise to PARALLEL_PROGRESS.
fn process_file_entries_streaming_with_progress(
    file_path: &str,
    namespace_filter: Option<&str>,
    local_progress: Option<&(
        std::sync::Arc<std::sync::atomic::AtomicUsize>,
        std::sync::Arc<std::sync::Mutex<crate::utils::progress::ProgressBar>>,
    )>,
) -> Result<FileAnalysisResult> {
    use crate::utils::reader::open_file;
    use std::io::{BufRead, BufReader};

    let mut path_operations: HashMap<String, PathData> = HashMap::with_capacity(5000);
    let mut operation_types: HashMap<String, usize> = HashMap::with_capacity(20);
    let mut path_prefixes: HashMap<String, usize> = HashMap::with_capacity(100);
    let mut entity_paths: HashMap<String, HashMap<String, usize>> = HashMap::with_capacity(2000);
    let mut entity_names: HashMap<String, String> = HashMap::with_capacity(2000);

    let file = open_file(file_path)?;
    let reader = BufReader::new(file);

    let mut lines_processed = 0;

    for line_result in reader.lines() {
        let line = line_result?;
        lines_processed += 1;

        // Update progress every 2000 lines: a sequential caller passes a
        // per-file counter and bar, while parallel workers fall back to the
        // shared PARALLEL_PROGRESS state.
        if lines_processed % 2000 == 0 {
            if let Some((processed, progress)) = &local_progress {
                processed.store(lines_processed, std::sync::atomic::Ordering::Relaxed);
                if let Ok(p) = progress.lock() {
                    p.update(lines_processed);
                }
            } else if let Some((processed_lines, progress)) = PARALLEL_PROGRESS.get() {
                processed_lines.fetch_add(2000, std::sync::atomic::Ordering::Relaxed);
                if let Ok(p) = progress.lock() {
                    p.update(processed_lines.load(std::sync::atomic::Ordering::Relaxed));
                }
            }
        }

        if line.trim().is_empty() {
            continue;
        }

        // Skip lines that fail to parse as audit entries.
        let entry: AuditEntry = match serde_json::from_str(&line) {
            Ok(entry) => entry,
            Err(_) => continue,
        };

        if let Some(filter_ns) = namespace_filter {
            if entry.namespace_id() != Some(filter_ns) {
                continue;
            }
        }

        // Entries without a request, path, or operation carry no signal for
        // this report; skip them.
        let Some(request) = &entry.request else {
            continue;
        };

        let path = match &request.path {
            Some(p) => p.as_str(),
            None => continue,
        };

        let operation = match &request.operation {
            Some(o) => o.as_str(),
            None => continue,
        };

        let entity_id = entry
            .auth
            .as_ref()
            .and_then(|a| a.entity_id.as_deref())
            .unwrap_or("no-entity");

        let display_name = entry
            .auth
            .as_ref()
            .and_then(|a| a.display_name.as_deref())
            .unwrap_or("N/A");

        if path.is_empty() || operation.is_empty() {
            continue;
        }

        // Per-path aggregation: totals, per-operation counts, distinct entities.
        let path_data = path_operations
            .entry(path.to_string())
            .or_insert_with(PathData::new);
        path_data.count += 1;
        *path_data
            .operations
            .entry(operation.to_string())
            .or_insert(0) += 1;
        path_data.entities.insert(entity_id.to_string());

        *operation_types.entry(operation.to_string()).or_insert(0) += 1;

        // Bucket by the first two path components (e.g. "secret/data").
        // split('/') always yields at least one element, so the fallback
        // check must be on parts[0] being empty, not on the vec being empty.
        let parts: Vec<&str> = path.trim_matches('/').split('/').collect();
        let prefix = if parts.len() >= 2 {
            format!("{}/{}", parts[0], parts[1])
        } else if !parts[0].is_empty() {
            parts[0].to_string()
        } else {
            "root".to_string()
        };
        *path_prefixes.entry(prefix).or_insert(0) += 1;

        // Per-entity path counts and first-seen display name.
        let entity_map = entity_paths.entry(entity_id.to_string()).or_default();
        *entity_map.entry(path.to_string()).or_insert(0) += 1;
        entity_names
            .entry(entity_id.to_string())
            .or_insert_with(|| display_name.to_string());
    }

    // Flush the progress remainder so the final count is exact.
    let remaining = lines_processed % 2000;
    if remaining > 0 {
        if let Some((processed, progress)) = &local_progress {
            processed.store(lines_processed, std::sync::atomic::Ordering::Relaxed);
            if let Ok(p) = progress.lock() {
                p.update(lines_processed);
            }
        } else if let Some((processed_lines, progress)) = PARALLEL_PROGRESS.get() {
            processed_lines.fetch_add(remaining, std::sync::atomic::Ordering::Relaxed);
            if let Ok(p) = progress.lock() {
                p.update(processed_lines.load(std::sync::atomic::Ordering::Relaxed));
            }
        }
    }

    Ok(FileAnalysisResult {
        path_operations,
        operation_types,
        path_prefixes,
        entity_paths,
        entity_names,
    })
}

/// Merges per-file results from parallel workers into one aggregate.
fn combine_results(
    results: Vec<crate::utils::parallel::FileProcessResult<FileAnalysisResult>>,
) -> FileAnalysisResult {
    let mut combined = FileAnalysisResult {
        path_operations: HashMap::with_capacity(5000),
        operation_types: HashMap::with_capacity(20),
        path_prefixes: HashMap::with_capacity(100),
        entity_paths: HashMap::with_capacity(2000),
        entity_names: HashMap::with_capacity(2000),
    };

    for file_result in results {
        let result = file_result.data;

        for (path, path_data) in result.path_operations {
            if let Some(existing) = combined.path_operations.get_mut(&path) {
                existing.count += path_data.count;
                for (op, count) in path_data.operations {
                    *existing.operations.entry(op).or_insert(0) += count;
                }
                existing.entities.extend(path_data.entities);
            } else {
                combined.path_operations.insert(path, path_data);
            }
        }

        for (op, count) in result.operation_types {
            *combined.operation_types.entry(op).or_insert(0) += count;
        }

        for (prefix, count) in result.path_prefixes {
            *combined.path_prefixes.entry(prefix).or_insert(0) += count;
        }

        for (entity_id, paths) in result.entity_paths {
            let entity_map = combined.entity_paths.entry(entity_id).or_default();
            for (path, count) in paths {
                *entity_map.entry(path).or_insert(0) += count;
            }
        }

        // Keep the first display name seen for each entity.
        for (entity_id, name) in result.entity_names {
            combined.entity_names.entry(entity_id).or_insert(name);
        }
    }

    combined
}

/// Processes files one at a time with a per-file progress bar; returns the
/// combined aggregates and the total number of lines read.
fn run_sequential(
    log_files: &[String],
    namespace_filter: Option<&str>,
) -> Result<(FileAnalysisResult, usize)> {
    let mut combined_result = FileAnalysisResult {
        path_operations: HashMap::with_capacity(5000),
        operation_types: HashMap::with_capacity(20),
        path_prefixes: HashMap::with_capacity(100),
        entity_paths: HashMap::with_capacity(2000),
        entity_names: HashMap::with_capacity(2000),
    };
    let mut total_lines = 0;

    for (file_idx, log_file) in log_files.iter().enumerate() {
        eprintln!(
            "[{}/{}] Processing: {}",
            file_idx + 1,
            log_files.len(),
            log_file
        );

        // Pre-scan so the progress bar knows the file's total line count.
        eprintln!("Scanning file to determine total lines...");
        let file_lines = crate::utils::parallel::count_file_lines(log_file)?;

        let progress = crate::utils::progress::ProgressBar::new(file_lines, "Processing");
        let progress = std::sync::Arc::new(std::sync::Mutex::new(progress));
        let processed = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));

        let local_progress_tuple = (processed.clone(), progress.clone());
        let file_result = process_file_entries_streaming_with_progress(
            log_file,
            namespace_filter,
            Some(&local_progress_tuple),
        )?;

        total_lines += file_lines;

        if let Ok(p) = progress.lock() {
            p.update(file_lines);
            p.finish_with_message(&format!(
                "Processed {} lines from this file",
                format_number(file_lines)
            ));
        }

        // Merge this file's aggregates into the running totals (same logic
        // as combine_results, applied incrementally).
        for (path, path_data) in file_result.path_operations {
            if let Some(existing) = combined_result.path_operations.get_mut(&path) {
                existing.count += path_data.count;
                for (op, count) in path_data.operations {
                    *existing.operations.entry(op).or_insert(0) += count;
                }
                existing.entities.extend(path_data.entities);
            } else {
                combined_result.path_operations.insert(path, path_data);
            }
        }

        for (op, count) in file_result.operation_types {
            *combined_result.operation_types.entry(op).or_insert(0) += count;
        }

        for (prefix, count) in file_result.path_prefixes {
            *combined_result.path_prefixes.entry(prefix).or_insert(0) += count;
        }

        for (entity_id, paths) in file_result.entity_paths {
            let entity_map = combined_result.entity_paths.entry(entity_id).or_default();
            for (path, count) in paths {
                *entity_map.entry(path).or_insert(0) += count;
            }
        }

        for (entity_id, name) in file_result.entity_names {
            combined_result
                .entity_names
                .entry(entity_id)
                .or_insert(name);
        }
    }

    Ok((combined_result, total_lines))
}
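
/// Truncates a string to at most `max` bytes without splitting a multi-byte
/// UTF-8 character. Plain byte slicing (`&s[..max]`) panics when `max` is not
/// a character boundary, e.g. on non-ASCII display names or paths.
fn truncate_utf8(s: &str, max: usize) -> &str {
    if s.len() <= max {
        return s;
    }
    // Walk back to the nearest boundary; index 0 is always a boundary.
    let mut end = max;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    &s[..end]
}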

/// Entry point for the high-volume-paths report. `top` limits each table and
/// `min_operations` filters low-traffic paths. Runs sequentially when
/// requested or when only one file is given; otherwise fans out across files.
pub fn run(
    log_files: &[String],
    top: usize,
    min_operations: usize,
    namespace_filter: Option<&str>,
    sequential: bool,
) -> Result<()> {
    let (combined_result, total_lines) = if sequential || log_files.len() == 1 {
        eprintln!("Processing {} files sequentially...", log_files.len());
        run_sequential(log_files, namespace_filter)?
    } else {
        let processor =
            |file_path: &str| process_file_entries_streaming(file_path, namespace_filter);
        process_files_parallel(log_files, processor, combine_results)?
    };

    eprintln!("\nTotal: Processed {} lines", format_number(total_lines));

    let path_operations = combined_result.path_operations;
    let operation_types = combined_result.operation_types;
    let path_prefixes = combined_result.path_prefixes;
    let entity_paths = combined_result.entity_paths;
    let entity_names = combined_result.entity_names;

    let total_operations: usize = operation_types.values().sum();

    println!("\n{}", "=".repeat(100));
    println!("High-Volume Vault Operations Analysis");
    println!("{}", "=".repeat(100));

    // Section 1: overall operation-type breakdown.
    println!("\n1. Operation Types (Overall)");
    println!("{}", "-".repeat(100));
    println!("{:<20} {:>15} {:>12}", "Operation", "Count", "Percentage");
    println!("{}", "-".repeat(100));

    let mut sorted_ops: Vec<_> = operation_types.iter().collect();
    sorted_ops.sort_by(|a, b| b.1.cmp(a.1));

    for (op, count) in sorted_ops {
        let pct = if total_operations > 0 {
            (*count as f64 / total_operations as f64) * 100.0
        } else {
            0.0
        };
        println!("{:<20} {:>15} {:>11.2}%", op, format_number(*count), pct);
    }

    println!("{}", "-".repeat(100));
    println!(
        "{:<20} {:>15} {:>11.2}%",
        "TOTAL",
        format_number(total_operations),
        100.0
    );

    // Section 2: traffic grouped by two-component path prefix.
    println!("\n2. Top Path Prefixes (First 2 components)");
    println!("{}", "-".repeat(100));
    println!(
        "{:<40} {:>15} {:>12}",
        "Path Prefix", "Operations", "Percentage"
    );
    println!("{}", "-".repeat(100));

    let mut sorted_prefixes: Vec<_> = path_prefixes.iter().collect();
    sorted_prefixes.sort_by(|a, b| b.1.cmp(a.1));

    for (prefix, count) in sorted_prefixes.iter().take(top) {
        let pct = if total_operations > 0 {
            (**count as f64 / total_operations as f64) * 100.0
        } else {
            0.0
        };
        println!(
            "{:<40} {:>15} {:>11.2}%",
            prefix,
            format_number(**count),
            pct
        );
    }

    // Section 3: the busiest individual paths, with their dominant operation.
    println!("\n3. Top {} Individual Paths (Highest Volume)", top);
    println!("{}", "-".repeat(100));
    println!(
        "{:<60} {:>10} {:>10} {:>15}",
        "Path", "Ops", "Entities", "Top Op"
    );
    println!("{}", "-".repeat(100));

    let mut sorted_paths: Vec<_> = path_operations.iter().collect();
    sorted_paths.sort_by(|a, b| b.1.count.cmp(&a.1.count));

    for (path, data) in sorted_paths.iter().take(top) {
        // Paths are sorted by count, so everything after the first miss is
        // below the threshold as well.
        if data.count < min_operations {
            break;
        }
        let top_op = data
            .operations
            .iter()
            .max_by_key(|x| x.1)
            .map_or("N/A", |x| x.0.as_str());
        let path_display = if path.len() > 60 {
            format!("{}...", truncate_utf8(path, 58))
        } else {
            (*path).clone()
        };
        println!(
            "{:<60} {:>10} {:>10} {:>15}",
            path_display,
            format_number(data.count),
            format_number(data.entities.len()),
            top_op
        );
    }

    // Section 4: entities ranked by their total operation count.
    println!("\n4. Top {} Entities by Total Operations", top);
    println!("{}", "-".repeat(100));
    println!(
        "{:<50} {:<38} {:>10}",
        "Display Name", "Entity ID", "Total Ops"
    );
    println!("{}", "-".repeat(100));

    let mut entity_totals: HashMap<String, usize> = HashMap::with_capacity(entity_paths.len());
    for (entity_id, paths) in &entity_paths {
        let total: usize = paths.values().sum();
        entity_totals.insert(entity_id.clone(), total);
    }

    let mut sorted_entities: Vec<_> = entity_totals.iter().collect();
    sorted_entities.sort_by(|a, b| b.1.cmp(a.1));

    for (entity_id, total) in sorted_entities.iter().take(top) {
        let name = entity_names.get(*entity_id).map_or("N/A", String::as_str);
        let name_display = truncate_utf8(name, 48);
        let entity_short = truncate_utf8(entity_id, 36);
        println!(
            "{:<50} {:<38} {:>10}",
            name_display,
            entity_short,
            format_number(**total)
        );
    }

    // Section 5: entity/path pairs whose individual volume alone crosses the
    // min_operations threshold.
    println!("\n5. Potential System Stress Points");
    println!("{}", "-".repeat(100));

    #[derive(Debug)]
    struct StressPoint {
        path: String,
        entity_name: String,
        operations: usize,
    }

    let mut stress_points = Vec::new();

    for (path, data) in &path_operations {
        if data.count >= min_operations {
            for entity_id in &data.entities {
                if let Some(entity_ops_map) = entity_paths.get(entity_id) {
                    if let Some(&entity_ops) = entity_ops_map.get(path) {
                        if entity_ops >= min_operations {
                            stress_points.push(StressPoint {
                                path: path.clone(),
                                entity_name: entity_names
                                    .get(entity_id)
                                    .cloned()
                                    .unwrap_or_else(|| "N/A".to_string()),
                                operations: entity_ops,
                            });
                        }
                    }
                }
            }
        }
    }

    stress_points.sort_by(|a, b| b.operations.cmp(&a.operations));

    println!("{:<40} {:<40} {:>10}", "Entity", "Path", "Ops");
    println!("{}", "-".repeat(100));

    for sp in stress_points.iter().take(top) {
        let entity_display = truncate_utf8(&sp.entity_name, 38);
        let path_display = truncate_utf8(&sp.path, 38);
        println!(
            "{:<40} {:<40} {:>10}",
            entity_display,
            path_display,
            format_number(sp.operations)
        );
    }

    println!("{}", "=".repeat(100));
    println!("\nTotal Lines Processed: {}", format_number(total_lines));
    println!("Total Operations: {}", format_number(total_operations));
    println!("{}", "=".repeat(100));

    Ok(())
}
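
#[cfg(test)]
mod tests {
    use super::truncate_utf8;

    // Minimal checks for the truncate_utf8 helper: pass-through for short
    // strings, and byte-boundary safety on multi-byte UTF-8 input.
    #[test]
    fn truncate_utf8_respects_char_boundaries() {
        assert_eq!(truncate_utf8("hello", 10), "hello");
        assert_eq!(truncate_utf8("hello", 3), "hel");
        // 'é' is two bytes in UTF-8; cutting at byte 1 must back up to 0.
        assert_eq!(truncate_utf8("é", 1), "");
        // "aé" is three bytes; cutting at 2 lands mid-'é', so only "a" stays.
        assert_eq!(truncate_utf8("aé", 2), "a");
    }
}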