1use crate::audit::types::AuditEntry;
40use crate::utils::format::format_number;
41use crate::utils::processor::{ProcessingMode, ProcessorBuilder};
42use crate::utils::time::parse_timestamp;
43use anyhow::Result;
44use chrono::DateTime;
45use chrono::Utc;
46use std::collections::{HashMap, HashSet};
47
48#[derive(Debug, Clone)]
50struct PathStats {
51 operations: usize,
52 entities: HashSet<String>,
53 operations_by_type: HashMap<String, usize>,
54 timestamps: Vec<DateTime<Utc>>,
55 entity_operations: HashMap<String, usize>,
56}
57
58impl PathStats {
59 fn new() -> Self {
60 Self {
61 operations: 0,
62 entities: HashSet::with_capacity(50), operations_by_type: HashMap::with_capacity(10), timestamps: Vec::with_capacity(1000), entity_operations: HashMap::with_capacity(50), }
67 }
68
69 fn merge(&mut self, other: Self) {
70 self.operations += other.operations;
71 self.entities.extend(other.entities);
72 for (op, count) in other.operations_by_type {
73 *self.operations_by_type.entry(op).or_insert(0) += count;
74 }
75 self.timestamps.extend(other.timestamps);
76 for (entity, count) in other.entity_operations {
77 *self.entity_operations.entry(entity).or_insert(0) += count;
78 }
79 }
80}
81
82#[derive(Debug, Clone)]
83struct HotspotsState {
84 path_stats: HashMap<String, PathStats>,
85 total_operations: usize,
86}
87
88impl HotspotsState {
89 fn new() -> Self {
90 Self {
91 path_stats: HashMap::with_capacity(5000),
92 total_operations: 0,
93 }
94 }
95
96 fn merge(mut self, other: Self) -> Self {
97 self.total_operations += other.total_operations;
98 for (path, other_stats) in other.path_stats {
99 self.path_stats
100 .entry(path)
101 .and_modify(|stats| stats.merge(other_stats.clone()))
102 .or_insert(other_stats);
103 }
104 self
105 }
106}
107
108pub fn run(log_files: &[String], top: usize) -> Result<()> {
109 let processor = ProcessorBuilder::new()
110 .mode(ProcessingMode::Auto)
111 .progress_label("Processing".to_string())
112 .build();
113
114 let (result, stats) = processor.process_files_streaming(
115 log_files,
116 |entry: &AuditEntry, state: &mut HotspotsState| {
117 let path = match &entry.request {
118 Some(r) => match &r.path {
119 Some(p) => p.as_str(),
120 None => return,
121 },
122 None => return,
123 };
124
125 let operation = match &entry.request {
126 Some(r) => match &r.operation {
127 Some(o) => o.as_str(),
128 None => return,
129 },
130 None => return,
131 };
132
133 state.total_operations += 1;
134
135 let entity_id = entry
136 .auth
137 .as_ref()
138 .and_then(|a| a.entity_id.as_deref())
139 .unwrap_or("no-entity");
140
141 let ts = parse_timestamp(&entry.time).ok();
143
144 let path_stats = state
146 .path_stats
147 .entry(path.to_string())
148 .or_insert_with(PathStats::new);
149 path_stats.operations += 1;
150 path_stats.entities.insert(entity_id.to_string());
151 *path_stats
152 .operations_by_type
153 .entry(operation.to_string())
154 .or_insert(0) += 1;
155 *path_stats
156 .entity_operations
157 .entry(entity_id.to_string())
158 .or_insert(0) += 1;
159 if let Some(t) = ts {
160 path_stats.timestamps.push(t);
161 }
162 },
163 HotspotsState::merge,
164 HotspotsState::new(),
165 )?;
166
167 let total_lines = stats.total_lines;
168 let total_operations = result.total_operations;
169 let path_stats = result.path_stats;
170
171 eprintln!(
172 "\nTotal: Processed {} lines, {} operations",
173 format_number(total_lines),
174 format_number(total_operations)
175 );
176
177 let mut sorted_paths: Vec<_> = path_stats.iter().collect();
179 sorted_paths.sort_by(|a, b| b.1.operations.cmp(&a.1.operations));
180
181 println!("\n{}", "=".repeat(120));
183 println!("TOP {} PATH HOT SPOTS ANALYSIS", top);
184 println!("{}", "=".repeat(120));
185
186 println!(
187 "\n{:<5} {:<60} {:<12} {:<10} {:<10} {:<10}",
188 "#", "Path", "Ops", "Entities", "Top Op", "%"
189 );
190 println!("{}", "-".repeat(120));
191
192 for (i, (path, data)) in sorted_paths.iter().take(top).enumerate() {
193 let ops = data.operations;
194 let entity_count = data.entities.len();
195 let percentage = (ops as f64 / total_operations as f64) * 100.0;
196
197 let top_op = data
198 .operations_by_type
199 .iter()
200 .max_by_key(|x| x.1)
201 .map_or("N/A", |x| x.0.as_str());
202
203 let display_path = if path.len() <= 58 {
204 (*path).clone()
205 } else {
206 format!("{}...", &path[..55])
207 };
208
209 println!(
210 "{:<5} {:<60} {:<12} {:<10} {:<10} {:<10.2}%",
211 i + 1,
212 display_path,
213 format_number(ops),
214 format_number(entity_count),
215 top_op,
216 percentage
217 );
218 }
219
220 println!("\n\nDETAILED ANALYSIS OF TOP {} PATHS", top.min(20));
222 println!("{}", "=".repeat(120));
223
224 for (i, (path, data)) in sorted_paths.iter().take(top.min(20)).enumerate() {
225 println!("\n{}. PATH: {}", i + 1, path);
226 println!("{}", "-".repeat(120));
227
228 let ops = data.operations;
229 let entity_count = data.entities.len();
230 let percentage = (ops as f64 / total_operations as f64) * 100.0;
231
232 println!(
233 " Total Operations: {} ({:.2}% of all traffic)",
234 format_number(ops),
235 percentage
236 );
237 println!(" Unique Entities: {}", format_number(entity_count));
238
239 if data.timestamps.len() >= 2 {
241 let mut sorted_ts = data.timestamps.clone();
242 sorted_ts.sort();
243 let time_span = (sorted_ts
244 .last()
245 .unwrap()
246 .signed_duration_since(*sorted_ts.first().unwrap()))
247 .num_seconds() as f64
248 / 3600.0;
249 if time_span > 0.0 {
250 let ops_per_hour = ops as f64 / time_span;
251 println!(
252 " Access Rate: {:.1} operations/hour ({:.2}/minute)",
253 ops_per_hour,
254 ops_per_hour / 60.0
255 );
256 }
257 }
258
259 println!(" Operations by type:");
261 let mut ops_by_type: Vec<_> = data.operations_by_type.iter().collect();
262 ops_by_type.sort_by(|a, b| b.1.cmp(a.1));
263 for (op, count) in ops_by_type.iter().take(5) {
264 let op_pct = (**count as f64 / ops as f64) * 100.0;
265 println!(
266 " - {}: {} ({:.1}%)",
267 op,
268 format_number(**count),
269 op_pct
270 );
271 }
272
273 let mut top_entities: Vec<_> = data.entity_operations.iter().collect();
275 top_entities.sort_by(|a, b| b.1.cmp(a.1));
276 if !top_entities.is_empty() {
277 println!(" Top {} entities:", top_entities.len().min(5));
278 for (entity_id, entity_ops) in top_entities.iter().take(5) {
279 let entity_pct = (**entity_ops as f64 / ops as f64) * 100.0;
280 let entity_display = if entity_id.len() <= 40 {
281 (*entity_id).clone()
282 } else {
283 format!("{}...", &entity_id[..37])
284 };
285 println!(
286 " - {}: {} ops ({:.1}%)",
287 entity_display,
288 format_number(**entity_ops),
289 entity_pct
290 );
291 }
292 }
293
294 print!(" Category: ");
296 let mut recommendations = Vec::new();
297
298 if path.contains("token/lookup") {
299 println!("TOKEN LOOKUP");
300 recommendations
301 .push("Implement client-side token TTL tracking to eliminate polling".to_string());
302 recommendations.push(format!(
303 "Potential reduction: 80-90% ({} operations)",
304 format_number((ops as f64 * 0.85) as usize)
305 ));
306 } else if path.to_lowercase().contains("airflow") {
307 println!("AIRFLOW SECRET");
308 recommendations
309 .push("Deploy Vault agent with template rendering for Airflow".to_string());
310 recommendations.push("Configure connection caching in Airflow".to_string());
311 recommendations.push(format!(
312 "Potential reduction: 95% ({} operations)",
313 format_number((ops as f64 * 0.95) as usize)
314 ));
315 } else if path.contains("approle/login") {
316 println!("APPROLE AUTHENTICATION");
317 if entity_count == 1 {
318 recommendations.push(format!(
319 "⚠️ CRITICAL: Single entity making all {} login requests",
320 format_number(ops)
321 ));
322 recommendations
323 .push("Review token TTL configuration - may be too short".to_string());
324 recommendations.push("Consider SecretID caching if appropriate".to_string());
325 }
326 } else if path.to_lowercase().contains("openshift")
327 || path.to_lowercase().contains("kubernetes")
328 {
329 println!("KUBERNETES/OPENSHIFT AUTH");
330 recommendations.push("Review pod authentication token TTLs".to_string());
331 recommendations.push("Consider increasing default token lifetime".to_string());
332 recommendations.push("Implement token renewal strategy in applications".to_string());
333 } else if path.to_lowercase().contains("github") && path.contains("login") {
334 println!("GITHUB AUTHENTICATION");
335 recommendations.push("Review GitHub auth token TTLs".to_string());
336 if entity_count == 1 {
337 recommendations.push(format!(
338 "⚠️ Single entity ({}) - investigate why",
339 entity_count
340 ));
341 }
342 } else if path.contains("data/") || path.contains("metadata/") {
343 println!("KV SECRET ENGINE");
344 if entity_count <= 3 && ops > 10000 {
345 recommendations.push(format!(
346 "⚠️ HIGH-FREQUENCY ACCESS: {} operations from only {} entities",
347 format_number(ops),
348 entity_count
349 ));
350 recommendations.push("Implement caching layer or Vault agent".to_string());
351 recommendations.push("Review if secret needs this frequency of access".to_string());
352 } else {
353 recommendations
354 .push("Consider Vault agent for high-frequency consumers".to_string());
355 }
356 } else {
357 println!("OTHER");
358 if ops > 5000 {
359 recommendations.push(format!(
360 "High-volume path ({} operations) - review necessity",
361 format_number(ops)
362 ));
363 }
364 }
365
366 if let Some((_, top_entity_ops)) = top_entities.first() {
368 let top_entity_pct = (**top_entity_ops as f64 / ops as f64) * 100.0;
369 if top_entity_pct > 50.0 && !recommendations.iter().any(|r| r.contains("CRITICAL")) {
370 recommendations.push(format!(
371 "⚠️ Entity concentration: Single entity responsible for {:.1}% of access",
372 top_entity_pct
373 ));
374 }
375 }
376
377 if !recommendations.is_empty() {
378 println!(" Recommendations:");
379 for rec in recommendations {
380 println!(" • {}", rec);
381 }
382 }
383 }
384
385 println!("\n\nSUMMARY BY PATH CATEGORY");
387 println!("{}", "=".repeat(120));
388
389 let mut categories: HashMap<&str, usize> = HashMap::with_capacity(10); categories.insert("Token Operations", 0);
391 categories.insert("KV Secret Access", 0);
392 categories.insert("Authentication", 0);
393 categories.insert("Airflow Secrets", 0);
394 categories.insert("System/Admin", 0);
395 categories.insert("Other", 0);
396
397 for (path, stats) in &path_stats {
398 let ops = stats.operations;
399 if path.contains("token/") {
400 *categories.get_mut("Token Operations").unwrap() += ops;
401 } else if path.contains("/data/") || path.contains("/metadata/") {
402 if path.to_lowercase().contains("airflow") {
403 *categories.get_mut("Airflow Secrets").unwrap() += ops;
404 } else {
405 *categories.get_mut("KV Secret Access").unwrap() += ops;
406 }
407 } else if path.contains("/login") || path.contains("/auth/") {
408 *categories.get_mut("Authentication").unwrap() += ops;
409 } else if path.contains("sys/") {
410 *categories.get_mut("System/Admin").unwrap() += ops;
411 } else {
412 *categories.get_mut("Other").unwrap() += ops;
413 }
414 }
415
416 println!(
417 "{:<30} {:<15} {:<15}",
418 "Category", "Operations", "% of Total"
419 );
420 println!("{}", "-".repeat(120));
421
422 let mut sorted_categories: Vec<_> = categories.iter().collect();
423 sorted_categories.sort_by(|a, b| b.1.cmp(a.1));
424
425 for (category, ops) in sorted_categories {
426 let percentage = (*ops as f64 / total_operations as f64) * 100.0;
427 println!(
428 "{:<30} {:<15} {:<15.2}%",
429 category,
430 format_number(*ops),
431 percentage
432 );
433 }
434
435 println!("\n{}", "=".repeat(120));
436
437 println!("\nTOP OPTIMIZATION OPPORTUNITIES (by impact)");
439 println!("{}", "=".repeat(120));
440
441 struct Opportunity {
442 name: String,
443 current_ops: usize,
444 potential_reduction: usize,
445 effort: String,
446 priority: u8,
447 }
448
449 let mut opportunities = Vec::new();
450
451 let token_lookup_ops: usize = path_stats
453 .iter()
454 .filter(|(path, _)| path.contains("token/lookup"))
455 .map(|(_, stats)| stats.operations)
456 .sum();
457
458 if token_lookup_ops > 10000 {
459 opportunities.push(Opportunity {
460 name: "Eliminate Token Lookup Polling".to_string(),
461 current_ops: token_lookup_ops,
462 potential_reduction: (token_lookup_ops as f64 * 0.85) as usize,
463 effort: "Medium".to_string(),
464 priority: 1,
465 });
466 }
467
468 let airflow_ops: usize = path_stats
470 .iter()
471 .filter(|(path, _)| path.to_lowercase().contains("airflow"))
472 .map(|(_, stats)| stats.operations)
473 .sum();
474
475 if airflow_ops > 10000 {
476 opportunities.push(Opportunity {
477 name: "Deploy Vault Agent for Airflow".to_string(),
478 current_ops: airflow_ops,
479 potential_reduction: (airflow_ops as f64 * 0.95) as usize,
480 effort: "Medium".to_string(),
481 priority: 2,
482 });
483 }
484
485 let high_freq_ops: usize = path_stats
487 .iter()
488 .filter(|(_, stats)| stats.operations > 5000 && stats.operations < 100_000)
489 .map(|(_, stats)| stats.operations)
490 .sum();
491
492 let high_freq_count = path_stats
493 .iter()
494 .filter(|(_, stats)| stats.operations > 5000 && stats.operations < 100_000)
495 .count();
496
497 if high_freq_ops > 10000 {
498 opportunities.push(Opportunity {
499 name: format!("Cache High-Frequency Paths ({} paths)", high_freq_count),
500 current_ops: high_freq_ops,
501 potential_reduction: (high_freq_ops as f64 * 0.70) as usize,
502 effort: "Low-Medium".to_string(),
503 priority: 3,
504 });
505 }
506
507 opportunities.sort_by_key(|o| o.priority);
508
509 println!(
510 "\n{:<10} {:<50} {:<15} {:<15} {:<15}",
511 "Priority", "Opportunity", "Current Ops", "Savings", "Effort"
512 );
513 println!("{}", "-".repeat(120));
514
515 let mut total_current_ops = 0;
516 let mut total_savings = 0;
517
518 for opp in &opportunities {
519 println!(
520 "{:<10} {:<50} {:<15} {:<15} {:<15}",
521 opp.priority,
522 opp.name,
523 format_number(opp.current_ops),
524 format_number(opp.potential_reduction),
525 opp.effort
526 );
527 total_current_ops += opp.current_ops;
528 total_savings += opp.potential_reduction;
529 }
530
531 println!("{}", "-".repeat(120));
532 println!(
533 "{:<10} {:<50} {:<15} {:<15}",
534 "TOTAL POTENTIAL SAVINGS",
535 "",
536 format_number(total_current_ops),
537 format_number(total_savings)
538 );
539
540 let projected_reduction = (total_savings as f64 / total_operations as f64) * 100.0;
541 println!(
542 "\nProjected reduction: {:.1}% of all Vault operations",
543 projected_reduction
544 );
545 println!("{}", "=".repeat(120));
546
547 Ok(())
548}