1use crate::audit::types::AuditEntry;
68use crate::utils::format::format_number;
69use crate::utils::processor::{ProcessingMode, ProcessorBuilder};
70use crate::utils::time::parse_timestamp;
71use anyhow::{Context, Result};
72use std::collections::HashMap;
73use std::fs::File;
74use std::io::Write;
75
/// Value returned by `process_logs`: the per-entity token operation counters
/// (keyed by entity ID), the per-entity accessor records (keyed by entity ID),
/// and the total line count taken from the processor statistics.
type ProcessLogsResult = (
    HashMap<String, TokenOps>,
    HashMap<String, EntityAccessors>,
    usize,
);
82
/// Token operation counters and identity hints collected for one entity.
#[derive(Debug, Default, Clone)]
struct TokenOps {
    /// Requests to `auth/token/lookup-self`.
    lookup_self: usize,
    /// Requests to `auth/token/renew-self`.
    renew_self: usize,
    /// Requests to `auth/token/revoke-self`.
    revoke_self: usize,
    /// Requests to `auth/token/create`.
    create: usize,
    /// Requests classified as logins.
    login: usize,
    /// Any other counted token request.
    other: usize,
    /// First display name observed for the entity, if any.
    display_name: Option<String>,
    /// First `username` metadata value observed for the entity, if any.
    username: Option<String>,
    /// Earliest timestamp recorded for the entity (raw log string).
    first_seen: Option<String>,
    /// Latest timestamp recorded for the entity (raw log string).
    last_seen: Option<String>,
}

impl TokenOps {
    /// Returns the sum of all operation counters.
    const fn total(&self) -> usize {
        self.lookup_self
            + self.renew_self
            + self.revoke_self
            + self.create
            + self.login
            + self.other
    }

    /// Widens the `[first_seen, last_seen]` window so that it includes `timestamp`.
    ///
    /// The window is tracked as a true minimum/maximum, so the result does not
    /// depend on the order in which entries are fed in. Timestamps are compared
    /// as strings, the same way `TokenAnalysisState::merge` compares them.
    /// NOTE(review): this assumes the log timestamps share one format and
    /// timezone so that lexical order matches chronological order — confirm.
    fn update_timestamps(&mut self, timestamp: &str) {
        let is_earliest = self
            .first_seen
            .as_deref()
            .map_or(true, |first| timestamp < first);
        if is_earliest {
            self.first_seen = Some(timestamp.to_string());
        }

        let is_latest = self
            .last_seen
            .as_deref()
            .map_or(true, |last| timestamp > last);
        if is_latest {
            self.last_seen = Some(timestamp.to_string());
        }
    }
}
115
/// Usage record for a single token accessor.
#[derive(Debug, Default, Clone)]
struct AccessorData {
    /// Number of counted audit entries that carried this accessor.
    operations: usize,
    /// Timestamp string of the first entry recorded for this accessor.
    /// An empty string is treated as "not set" when states are merged.
    first_seen: String,
    /// Timestamp string of the last entry recorded for this accessor.
    /// An empty string is treated as "not set" when states are merged.
    last_seen: String,
}
123
/// All token accessors observed for one entity.
#[derive(Debug, Default, Clone)]
struct EntityAccessors {
    /// Per-accessor usage records, keyed by the accessor string.
    accessors: HashMap<String, AccessorData>,
    /// First display name observed for the entity, if any.
    display_name: Option<String>,
}
130
131#[derive(Debug, Default, Clone)]
133struct TokenAnalysisState {
134 token_ops: HashMap<String, TokenOps>,
135 accessor_data: HashMap<String, EntityAccessors>,
136}
137
138impl TokenAnalysisState {
139 fn new() -> Self {
140 Self {
141 token_ops: HashMap::with_capacity(2000),
142 accessor_data: HashMap::with_capacity(2000),
143 }
144 }
145
146 fn merge(mut self, other: Self) -> Self {
147 for (entity_id, other_ops) in other.token_ops {
149 let ops = self.token_ops.entry(entity_id).or_default();
150 ops.lookup_self += other_ops.lookup_self;
151 ops.renew_self += other_ops.renew_self;
152 ops.revoke_self += other_ops.revoke_self;
153 ops.create += other_ops.create;
154 ops.login += other_ops.login;
155 ops.other += other_ops.other;
156
157 if ops.display_name.is_none() {
159 ops.display_name = other_ops.display_name;
160 }
161 if ops.username.is_none() {
162 ops.username = other_ops.username;
163 }
164
165 if ops.first_seen.is_none()
167 || (other_ops.first_seen.is_some() && ops.first_seen > other_ops.first_seen)
168 {
169 ops.first_seen = other_ops.first_seen;
170 }
171 if ops.last_seen.is_none()
172 || (other_ops.last_seen.is_some() && ops.last_seen < other_ops.last_seen)
173 {
174 ops.last_seen = other_ops.last_seen;
175 }
176 }
177
178 for (entity_id, other_entity) in other.accessor_data {
180 let entity = self.accessor_data.entry(entity_id).or_default();
181
182 for (accessor, other_data) in other_entity.accessors {
184 let data = entity.accessors.entry(accessor).or_default();
185 data.operations += other_data.operations;
186
187 if data.first_seen.is_empty() || data.first_seen > other_data.first_seen {
189 data.first_seen = other_data.first_seen;
190 }
191 if data.last_seen.is_empty() || data.last_seen < other_data.last_seen {
192 data.last_seen = other_data.last_seen;
193 }
194 }
195
196 if entity.display_name.is_none() {
198 entity.display_name = other_entity.display_name;
199 }
200 }
201
202 self
203 }
204}
205
206fn calculate_time_span_hours(first_seen: &str, last_seen: &str) -> Result<f64> {
207 let first = parse_timestamp(first_seen)
208 .with_context(|| format!("Failed to parse first timestamp: {}", first_seen))?;
209 let last = parse_timestamp(last_seen)
210 .with_context(|| format!("Failed to parse last timestamp: {}", last_seen))?;
211
212 let duration = last.signed_duration_since(first);
213 Ok(duration.num_seconds() as f64 / 3600.0)
214}
215
216fn process_logs(
218 log_files: &[String],
219 operation_filter: Option<&[String]>,
220) -> Result<ProcessLogsResult> {
221 let processor = ProcessorBuilder::new()
222 .progress_label("Analyzing tokens")
223 .mode(ProcessingMode::Auto)
224 .build();
225
226 let operation_filter = operation_filter.map(<[std::string::String]>::to_vec);
227
228 let (result, stats) = processor.process_files_streaming(
229 log_files,
230 move |entry: &AuditEntry, state: &mut TokenAnalysisState| {
231 let Some(request) = &entry.request else {
233 return;
234 };
235
236 let Some(auth) = &entry.auth else { return };
237
238 let entity_id = match &auth.entity_id {
239 Some(id) if !id.is_empty() => id.clone(),
240 _ => return,
241 };
242
243 let path = request.path.as_deref().unwrap_or("");
245 let operation = request.operation.as_deref().unwrap_or("");
246
247 let op_type = if path == "auth/token/lookup-self" {
248 "lookup"
249 } else if path == "auth/token/renew-self" {
250 "renew"
251 } else if path == "auth/token/revoke-self" {
252 "revoke"
253 } else if path == "auth/token/create" {
254 "create"
255 } else if path.starts_with("auth/") && operation == "update" {
256 "login"
257 } else if path.starts_with("auth/token/") {
258 "other"
259 } else {
260 return; };
262
263 if let Some(ref filters) = operation_filter {
265 if !filters.iter().any(|f| op_type.contains(f.as_str())) {
266 return;
267 }
268 }
269
270 let ops = state.token_ops.entry(entity_id.clone()).or_default();
272 match op_type {
273 "lookup" => ops.lookup_self += 1,
274 "renew" => ops.renew_self += 1,
275 "revoke" => ops.revoke_self += 1,
276 "create" => ops.create += 1,
277 "login" => ops.login += 1,
278 _ => ops.other += 1,
279 }
280
281 if ops.display_name.is_none() {
282 ops.display_name.clone_from(&auth.display_name);
283 }
284 if ops.username.is_none() {
285 ops.username = auth.metadata.as_ref().and_then(|m| {
286 m.get("username")
287 .and_then(|v| v.as_str())
288 .map(std::string::ToString::to_string)
289 });
290 }
291 ops.update_timestamps(&entry.time);
292
293 if let Some(accessor) = &auth.accessor {
295 let entity_acc = state.accessor_data.entry(entity_id).or_default();
296 if entity_acc.display_name.is_none() {
297 entity_acc.display_name.clone_from(&auth.display_name);
298 }
299
300 let acc_data = entity_acc
301 .accessors
302 .entry(accessor.clone())
303 .or_insert_with(|| AccessorData {
304 operations: 0,
305 first_seen: entry.time.clone(),
306 last_seen: entry.time.clone(),
307 });
308 acc_data.operations += 1;
309 acc_data.last_seen.clone_from(&entry.time);
310 }
311 },
312 TokenAnalysisState::merge,
313 TokenAnalysisState::new(),
314 )?;
315
316 stats.report();
317 Ok((result.token_ops, result.accessor_data, stats.total_lines))
318}
319
320fn display_summary(token_ops: &HashMap<String, TokenOps>, total_lines: usize) {
322 let mut ops_vec: Vec<_> = token_ops.iter().collect();
323 ops_vec.sort_by(|a, b| b.1.total().cmp(&a.1.total()));
324
325 let total_ops: usize = ops_vec.iter().map(|(_, ops)| ops.total()).sum();
327 let total_lookup: usize = ops_vec.iter().map(|(_, ops)| ops.lookup_self).sum();
328 let total_renew: usize = ops_vec.iter().map(|(_, ops)| ops.renew_self).sum();
329 let total_revoke: usize = ops_vec.iter().map(|(_, ops)| ops.revoke_self).sum();
330 let total_create: usize = ops_vec.iter().map(|(_, ops)| ops.create).sum();
331 let total_login: usize = ops_vec.iter().map(|(_, ops)| ops.login).sum();
332 let total_other: usize = ops_vec.iter().map(|(_, ops)| ops.other).sum();
333
334 println!("Total: Processed {} lines\n", format_number(total_lines));
335 println!("{}", "=".repeat(150));
336 println!(
337 "{:<30} {:<25} {:>10} {:>10} {:>10} {:>10} {:>10} {:>10} {:>10}",
338 "Display Name",
339 "Username",
340 "Total",
341 "Lookup",
342 "Renew",
343 "Revoke",
344 "Create",
345 "Login",
346 "Other"
347 );
348 println!("{}", "=".repeat(150));
349
350 for (_, ops) in ops_vec.iter().take(50) {
352 let display = ops.display_name.as_deref().unwrap_or("");
353 let username = ops.username.as_deref().unwrap_or("");
354
355 println!(
356 "{:<30} {:<25} {:>10} {:>10} {:>10} {:>10} {:>10} {:>10} {:>10}",
357 if display.len() > 30 {
358 &display[..30]
359 } else {
360 display
361 },
362 if username.len() > 25 {
363 &username[..25]
364 } else {
365 username
366 },
367 format_number(ops.total()),
368 format_number(ops.lookup_self),
369 format_number(ops.renew_self),
370 format_number(ops.revoke_self),
371 format_number(ops.create),
372 format_number(ops.login),
373 format_number(ops.other)
374 );
375 }
376
377 println!("{}", "=".repeat(150));
378 println!(
379 "TOTAL (top 50) {:>10}",
380 format_number(total_ops)
381 );
382 println!(
383 "TOTAL ENTITIES {:>10}",
384 format_number(token_ops.len())
385 );
386 println!("{}", "=".repeat(150));
387 println!();
388 println!("Operation Type Breakdown:");
389 println!("{}", "-".repeat(60));
390 println!(
391 "Lookup (lookup-self): {:>12} ({:>5.1}%)",
392 format_number(total_lookup),
393 (total_lookup as f64 / total_ops as f64) * 100.0
394 );
395 println!(
396 "Renew (renew-self): {:>12} ({:>5.1}%)",
397 format_number(total_renew),
398 (total_renew as f64 / total_ops as f64) * 100.0
399 );
400 println!(
401 "Revoke (revoke-self): {:>12} ({:>5.1}%)",
402 format_number(total_revoke),
403 (total_revoke as f64 / total_ops as f64) * 100.0
404 );
405 println!(
406 "Create (child token): {:>12} ({:>5.1}%)",
407 format_number(total_create),
408 (total_create as f64 / total_ops as f64) * 100.0
409 );
410 println!(
411 "Login (auth token): {:>12} ({:>5.1}%)",
412 format_number(total_login),
413 (total_login as f64 / total_ops as f64) * 100.0
414 );
415 println!(
416 "Other: {:>12} ({:>5.1}%)",
417 format_number(total_other),
418 (total_other as f64 / total_ops as f64) * 100.0
419 );
420 println!("{}", "-".repeat(60));
421 println!("TOTAL: {:>16}", format_number(total_ops));
422}
423
424fn display_abuse(token_ops: &HashMap<String, TokenOps>, threshold: usize) {
426 let mut abusers: Vec<_> = token_ops
427 .iter()
428 .filter(|(_, ops)| ops.lookup_self >= threshold)
429 .collect();
430
431 abusers.sort_by(|a, b| b.1.lookup_self.cmp(&a.1.lookup_self));
432
433 if abusers.is_empty() {
434 println!(
435 "\n No entities found exceeding threshold of {} lookup operations",
436 format_number(threshold)
437 );
438 return;
439 }
440
441 println!(
442 "\n Found {} entities exceeding {} lookup operations:\n",
443 abusers.len(),
444 format_number(threshold)
445 );
446
447 println!(
448 "{:<50} {:>12} {:>20} {:>12}",
449 "Entity", "Lookups", "Time Span", "Rate/Hour"
450 );
451 println!("{}", "=".repeat(106));
452
453 for (entity_id, ops) in abusers {
454 let display = ops
455 .display_name
456 .as_deref()
457 .or(ops.username.as_deref())
458 .unwrap_or(entity_id);
459
460 let time_span = if let (Some(first), Some(last)) = (&ops.first_seen, &ops.last_seen) {
461 calculate_time_span_hours(first, last).unwrap_or_else(|err| {
462 eprintln!(
463 "Warning: Failed to calculate time span for entity {}: {}",
464 entity_id, err
465 );
466 0.0
467 })
468 } else {
469 0.0
470 };
471
472 let rate = if time_span > 0.0 {
473 ops.lookup_self as f64 / time_span
474 } else {
475 0.0
476 };
477
478 println!(
479 "{:<50} {:>12} {:>17.1}h {:>12.1}",
480 if display.len() > 50 {
481 format!("{}...", &display[..47])
482 } else {
483 display.to_string()
484 },
485 format_number(ops.lookup_self),
486 time_span,
487 rate
488 );
489 }
490}
491
492fn export_csv(
494 accessor_data: &HashMap<String, EntityAccessors>,
495 output: &str,
496 min_operations: usize,
497) -> Result<()> {
498 let mut file = File::create(output)
499 .with_context(|| format!("Failed to create output file: {}", output))?;
500
501 writeln!(
502 file,
503 "entity_id,display_name,accessor,operations,first_seen,last_seen,duration_hours"
504 )?;
505
506 let mut rows: Vec<_> = accessor_data
507 .iter()
508 .flat_map(|(entity_id, entity_data)| {
509 entity_data
510 .accessors
511 .iter()
512 .map(move |(accessor, data)| (entity_id, &entity_data.display_name, accessor, data))
513 })
514 .filter(|(_, _, _, data)| data.operations >= min_operations)
515 .collect();
516
517 rows.sort_by(|a, b| b.3.operations.cmp(&a.3.operations));
518
519 for (entity_id, display_name, accessor, data) in rows {
520 let duration =
521 calculate_time_span_hours(&data.first_seen, &data.last_seen).unwrap_or_else(|err| {
522 eprintln!(
523 "Warning: Failed to calculate duration for accessor {}: {}",
524 accessor, err
525 );
526 0.0
527 });
528 let display = display_name.as_deref().unwrap_or(entity_id);
529
530 writeln!(
531 file,
532 "{},{},{},{},{},{},{:.2}",
533 entity_id,
534 display,
535 accessor,
536 data.operations,
537 data.first_seen,
538 data.last_seen,
539 duration
540 )?;
541 }
542
543 Ok(())
544}
545
546pub fn run(
548 log_files: &[String],
549 abuse_threshold: Option<usize>,
550 operation_filter: Option<&[String]>,
551 export_path: Option<&str>,
552 min_operations: usize,
553) -> Result<()> {
554 eprintln!("Token Analysis");
555 eprintln!(" Files: {}", log_files.len());
556 if let Some(filters) = operation_filter {
557 eprintln!(" Filter: {}", filters.join(", "));
558 }
559 if let Some(threshold) = abuse_threshold {
560 eprintln!(" Abuse threshold: {}", format_number(threshold));
561 }
562 if let Some(output) = export_path {
563 eprintln!(" Export: {}", output);
564 }
565 eprintln!();
566
567 let (token_ops, accessor_data, total_lines) = process_logs(log_files, operation_filter)?;
568
569 eprintln!("\n Processed {} total lines", format_number(total_lines));
570 eprintln!(
571 " {} unique entities with token operations",
572 format_number(token_ops.len())
573 );
574
575 if let Some(threshold) = abuse_threshold {
577 display_abuse(&token_ops, threshold);
578 } else {
579 display_summary(&token_ops, total_lines);
580 }
581
582 if let Some(output) = export_path {
584 export_csv(&accessor_data, output, min_operations)?;
585 eprintln!("\n Exported data to: {}", output);
586 }
587
588 Ok(())
589}