vault_audit_tools/commands/
kv_analyzer.rs1use crate::audit::types::AuditEntry;
36use crate::utils::progress::ProgressBar;
37use anyhow::{Context, Result};
38use std::collections::{HashMap, HashSet};
39use std::fs::File;
40use std::io::{BufRead, BufReader};
41
/// Render `n` in decimal with a `,` between every group of three digits,
/// counting from the right (e.g. `1234567` becomes `"1,234,567"`).
fn format_number(n: usize) -> String {
    let digits = n.to_string();
    let total = digits.len();
    let mut grouped = String::with_capacity(total + total / 3);

    for (idx, ch) in digits.char_indices() {
        // A separator goes before any digit (other than the first) that starts
        // a run whose remaining length is a multiple of three.
        let digits_left = total - idx;
        if idx != 0 && digits_left % 3 == 0 {
            grouped.push(',');
        }
        grouped.push(ch);
    }

    grouped
}
53
/// Usage accumulated for one normalized KV path (the key produced by `normalize_kv_path`).
#[derive(Debug, Default)]
struct KvUsageData {
    /// Distinct `auth.entity_id` values whose requests were attributed to this path.
    entity_ids: HashSet<String>,
    /// Number of counted requests attributed to this path.
    operations_count: usize,
    /// Distinct raw request paths that were normalized into this bucket.
    paths_accessed: HashSet<String>,
}
60
/// Collapse a KV request path into the application-level bucket used as a report key.
///
/// Leading and trailing slashes are ignored. For KV v2 paths (`<mount>/data/...` or
/// `<mount>/metadata/...`) the `data`/`metadata` API segment is dropped, so reads and
/// listings of the same secret land in the same bucket. The result keeps the mount plus
/// at most three further path components, each followed by `/`:
///
/// * `secret/data/app/team/svc/key` -> `secret/app/team/svc/`
/// * `secret/metadata/app`          -> `secret/app/`
/// * `secret/metadata/`             -> `secret/` (mount-root listing)
/// * `kv/app/team/svc/key`          -> `kv/app/team/svc/`
fn normalize_kv_path(path: &str) -> String {
    /// Number of path components kept below the mount point.
    const APP_DEPTH: usize = 3;

    let parts: Vec<&str> = path.trim_matches('/').split('/').collect();

    // Matching on two or more components, rather than three, also folds mount-root
    // v2 paths such as `secret/metadata` into the mount itself. Otherwise they would
    // form a separate `secret/metadata/` bucket.
    let kept: Vec<&str> = match parts.as_slice() {
        [mount, "data" | "metadata", rest @ ..] => std::iter::once(*mount)
            .chain(rest.iter().copied().take(APP_DEPTH))
            .collect(),
        _ => parts.iter().copied().take(APP_DEPTH + 1).collect(),
    };

    let mut normalized = String::with_capacity(path.len() + 1);
    for component in kept {
        normalized.push_str(component);
        normalized.push('/');
    }
    normalized
}
99
100fn load_entity_alias_mapping(alias_export_csv: &str) -> Result<HashMap<String, Vec<String>>> {
101 let mut entity_aliases: HashMap<String, Vec<String>> = HashMap::new();
102
103 let file = match File::open(alias_export_csv) {
104 Ok(f) => f,
105 Err(_) => {
106 eprintln!("[WARN] Entity alias export not found: {}", alias_export_csv);
107 return Ok(entity_aliases);
108 }
109 };
110
111 let mut reader = csv::Reader::from_reader(file);
112
113 for result in reader.records() {
114 let record = result?;
115 if let (Some(entity_id), Some(alias_name)) = (record.get(0), record.get(1)) {
116 entity_aliases
117 .entry(entity_id.to_string())
118 .or_default()
119 .push(alias_name.to_string());
120 }
121 }
122
123 Ok(entity_aliases)
124}
125
126pub fn run(
127 log_files: &[String],
128 kv_prefix: &str,
129 output: Option<&str>,
130 entity_csv: Option<&str>,
131) -> Result<()> {
132 let output_file = output.unwrap_or("kv_usage_by_client.csv");
133
134 let mut kv_usage: HashMap<String, KvUsageData> = HashMap::new();
135 let mut total_lines = 0;
136 let mut parsed_lines = 0;
137
138 for (file_idx, log_file) in log_files.iter().enumerate() {
140 eprintln!(
141 "[{}/{}] Processing: {}",
142 file_idx + 1,
143 log_files.len(),
144 log_file
145 );
146
147 let file_size = std::fs::metadata(log_file).ok().map(|m| m.len() as usize);
149 let mut progress = if let Some(size) = file_size {
150 ProgressBar::new(size, "Processing")
151 } else {
152 ProgressBar::new_spinner("Processing")
153 };
154
155 let file = File::open(log_file)
156 .with_context(|| format!("Failed to open audit log file: {}", log_file))?;
157 let reader = BufReader::new(file);
158
159 let mut file_lines = 0;
160 let mut bytes_read = 0;
161
162 for line in reader.lines() {
163 file_lines += 1;
164 total_lines += 1;
165 let line = line?;
166 bytes_read += line.len() + 1; if file_lines % 10_000 == 0 {
170 if let Some(size) = file_size {
171 progress.update(bytes_read.min(size)); } else {
173 progress.update(file_lines);
174 }
175 }
176
177 let entry: AuditEntry = match serde_json::from_str(&line) {
178 Ok(e) => e,
179 Err(_) => continue,
180 };
181
182 let request = match &entry.request {
184 Some(r) => r,
185 None => continue,
186 };
187
188 let path = match &request.path {
189 Some(p) => p.as_str(),
190 None => continue,
191 };
192
193 if !kv_prefix.is_empty() && !path.starts_with(kv_prefix) {
195 continue;
196 }
197 if kv_prefix.is_empty() && !path.contains("/data/") && !path.contains("/metadata/") {
198 continue;
199 }
200
201 let operation = request.operation.as_deref().unwrap_or("");
203 if operation != "read" && operation != "list" {
204 continue;
205 }
206
207 let entity_id = match entry.auth.as_ref().and_then(|a| a.entity_id.as_deref()) {
208 Some(id) => id,
209 None => continue,
210 };
211
212 parsed_lines += 1;
213
214 let app_path = normalize_kv_path(path);
216
217 let usage = kv_usage.entry(app_path).or_insert_with(|| KvUsageData {
218 entity_ids: HashSet::new(),
219 operations_count: 0,
220 paths_accessed: HashSet::new(),
221 });
222
223 usage.entity_ids.insert(entity_id.to_string());
224 usage.operations_count += 1;
225 usage.paths_accessed.insert(path.to_string());
226 }
227
228 if let Some(size) = file_size {
230 progress.update(size);
231 }
232
233 progress.finish_with_message(&format!(
234 "Processed {} lines from this file",
235 format_number(file_lines)
236 ));
237 }
238
239 eprintln!(
240 "\nTotal: Processed {} lines, parsed {} KV operations",
241 format_number(total_lines),
242 format_number(parsed_lines)
243 );
244
245 if kv_usage.is_empty() {
246 eprintln!("[ERROR] No KV operations found in audit logs.");
247 std::process::exit(1);
248 }
249
250 let entity_aliases = if let Some(alias_file) = entity_csv {
252 load_entity_alias_mapping(alias_file)?
253 } else {
254 HashMap::new()
255 };
256
257 if let Some(parent) = std::path::Path::new(output_file).parent() {
259 std::fs::create_dir_all(parent)?;
260 }
261
262 let file = File::create(output_file).context("Failed to create output file")?;
264 let mut writer = csv::Writer::from_writer(file);
265
266 writer.write_record([
267 "kv_path",
268 "unique_clients",
269 "operations_count",
270 "entity_ids",
271 "alias_names",
272 "sample_paths_accessed",
273 ])?;
274
275 let mut paths: Vec<_> = kv_usage.keys().collect();
276 paths.sort();
277
278 for kv_path in paths {
279 let data = &kv_usage[kv_path];
280
281 let mut entity_ids: Vec<_> = data.entity_ids.iter().cloned().collect();
282 entity_ids.sort();
283
284 let unique_clients = entity_ids.len();
285 let operations = data.operations_count;
286
287 let mut alias_names = Vec::new();
289 for eid in &entity_ids {
290 if let Some(aliases) = entity_aliases.get(eid) {
291 alias_names.extend(aliases.iter().cloned());
292 }
293 }
294
295 let mut sample_paths: Vec<_> = data.paths_accessed.iter().cloned().collect();
297 sample_paths.sort();
298 sample_paths.truncate(5);
299
300 writer.write_record([
301 kv_path,
302 &unique_clients.to_string(),
303 &operations.to_string(),
304 &entity_ids.join(", "),
305 &alias_names.join(", "),
306 &sample_paths.join(", "),
307 ])?;
308 }
309
310 writer.flush()?;
311
312 println!("Done. Output written to: {}", output_file);
313 println!("Summary: {} KV paths analyzed", kv_usage.len());
314
315 Ok(())
316}