vault_audit_tools/commands/
kv_analyzer.rs1use crate::audit::types::AuditEntry;
36use crate::utils::progress::ProgressBar;
37use crate::utils::reader::open_file;
38use anyhow::{Context, Result};
39use std::collections::{HashMap, HashSet};
40use std::fs::File;
41use std::io::{BufRead, BufReader};
42
/// Render `n` in decimal with a `,` between every group of three digits,
/// counted from the right (e.g. `1234567` becomes `"1,234,567"`).
fn format_number(n: usize) -> String {
    let digits = n.to_string();
    let total = digits.len();
    let mut grouped = String::with_capacity(total + total / 3);
    for (pos, digit) in digits.char_indices() {
        // A separator goes before any digit that starts a full group of three
        // remaining digits, except the very first one.
        let remaining = total - pos;
        if pos != 0 && remaining % 3 == 0 {
            grouped.push(',');
        }
        grouped.push(digit);
    }
    grouped
}
54
/// Aggregated access statistics for one normalized KV path.
#[derive(Debug, Default)]
struct KvUsageData {
    /// Distinct `auth.entity_id` values that issued a counted request under this path.
    entity_ids: HashSet<String>,
    /// Total number of counted (`read` / `list`) requests grouped under this path.
    operations_count: usize,
    /// Distinct raw request paths that were normalized into this entry.
    paths_accessed: HashSet<String>,
}
61
/// Collapse a raw Vault request path into the key used to group KV usage.
///
/// Leading and trailing `/` are ignored. The key is the mount plus at most
/// three further segments, always terminated by `/`. When the second segment
/// is `data` or `metadata` and at least one segment follows it, that segment is
/// dropped, so `secret/data/app/x` and `secret/metadata/app/x` both group under
/// `secret/app/x/`.
///
/// Examples: `secret/data/app/team/svc/key` -> `secret/app/team/svc/`,
/// `sys/policy/a/b/c` -> `sys/policy/a/b/`. An empty (or all-slash) path
/// yields `/`.
fn normalize_kv_path(path: &str) -> String {
    // Mount + up to three path segments.
    const MAX_SEGMENTS: usize = 4;

    // `split` always yields at least one item, so `segments` is never empty.
    let segments: Vec<&str> = path.trim_matches('/').split('/').collect();

    let is_kv_v2 = segments.len() >= 3 && matches!(segments[1], "data" | "metadata");

    let kept: Vec<&str> = if is_kv_v2 {
        // Skip the `data` / `metadata` marker right after the mount.
        std::iter::once(segments[0])
            .chain(segments[2..].iter().copied())
            .take(MAX_SEGMENTS)
            .collect()
    } else {
        segments.into_iter().take(MAX_SEGMENTS).collect()
    };

    let mut key = kept.join("/");
    key.push('/');
    key
}
100
101fn load_entity_alias_mapping(alias_export_csv: &str) -> Result<HashMap<String, Vec<String>>> {
102 let mut entity_aliases: HashMap<String, Vec<String>> = HashMap::new();
103
104 let file = match File::open(alias_export_csv) {
105 Ok(f) => f,
106 Err(_) => {
107 eprintln!("[WARN] Entity alias export not found: {}", alias_export_csv);
108 return Ok(entity_aliases);
109 }
110 };
111
112 let mut reader = csv::Reader::from_reader(file);
113
114 for result in reader.records() {
115 let record = result?;
116 if let (Some(entity_id), Some(alias_name)) = (record.get(0), record.get(1)) {
117 entity_aliases
118 .entry(entity_id.to_string())
119 .or_default()
120 .push(alias_name.to_string());
121 }
122 }
123
124 Ok(entity_aliases)
125}
126
127pub fn run(
128 log_files: &[String],
129 kv_prefix: &str,
130 output: Option<&str>,
131 entity_csv: Option<&str>,
132) -> Result<()> {
133 let output_file = output.unwrap_or("kv_usage_by_client.csv");
134
135 let mut kv_usage: HashMap<String, KvUsageData> = HashMap::new();
136 let mut total_lines = 0;
137 let mut parsed_lines = 0;
138
139 for (file_idx, log_file) in log_files.iter().enumerate() {
141 eprintln!(
142 "[{}/{}] Processing: {}",
143 file_idx + 1,
144 log_files.len(),
145 log_file
146 );
147
148 let file_size = std::fs::metadata(log_file).ok().map(|m| m.len() as usize);
150 let mut progress = if let Some(size) = file_size {
151 ProgressBar::new(size, "Processing")
152 } else {
153 ProgressBar::new_spinner("Processing")
154 };
155
156 let file = open_file(log_file)
157 .with_context(|| format!("Failed to open audit log file: {}", log_file))?;
158 let reader = BufReader::new(file);
159
160 let mut file_lines = 0;
161 let mut bytes_read = 0;
162
163 for line in reader.lines() {
164 file_lines += 1;
165 total_lines += 1;
166 let line = line?;
167 bytes_read += line.len() + 1; if file_lines % 10_000 == 0 {
171 if let Some(size) = file_size {
172 progress.update(bytes_read.min(size)); } else {
174 progress.update(file_lines);
175 }
176 }
177
178 let entry: AuditEntry = match serde_json::from_str(&line) {
179 Ok(e) => e,
180 Err(_) => continue,
181 };
182
183 let request = match &entry.request {
185 Some(r) => r,
186 None => continue,
187 };
188
189 let path = match &request.path {
190 Some(p) => p.as_str(),
191 None => continue,
192 };
193
194 if !kv_prefix.is_empty() && !path.starts_with(kv_prefix) {
196 continue;
197 }
198 if kv_prefix.is_empty() && !path.contains("/data/") && !path.contains("/metadata/") {
199 continue;
200 }
201
202 let operation = request.operation.as_deref().unwrap_or("");
204 if operation != "read" && operation != "list" {
205 continue;
206 }
207
208 let entity_id = match entry.auth.as_ref().and_then(|a| a.entity_id.as_deref()) {
209 Some(id) => id,
210 None => continue,
211 };
212
213 parsed_lines += 1;
214
215 let app_path = normalize_kv_path(path);
217
218 let usage = kv_usage.entry(app_path).or_insert_with(|| KvUsageData {
219 entity_ids: HashSet::new(),
220 operations_count: 0,
221 paths_accessed: HashSet::new(),
222 });
223
224 usage.entity_ids.insert(entity_id.to_string());
225 usage.operations_count += 1;
226 usage.paths_accessed.insert(path.to_string());
227 }
228
229 if let Some(size) = file_size {
231 progress.update(size);
232 }
233
234 progress.finish_with_message(&format!(
235 "Processed {} lines from this file",
236 format_number(file_lines)
237 ));
238 }
239
240 eprintln!(
241 "\nTotal: Processed {} lines, parsed {} KV operations",
242 format_number(total_lines),
243 format_number(parsed_lines)
244 );
245
246 if kv_usage.is_empty() {
247 eprintln!("[ERROR] No KV operations found in audit logs.");
248 std::process::exit(1);
249 }
250
251 let entity_aliases = if let Some(alias_file) = entity_csv {
253 load_entity_alias_mapping(alias_file)?
254 } else {
255 HashMap::new()
256 };
257
258 if let Some(parent) = std::path::Path::new(output_file).parent() {
260 std::fs::create_dir_all(parent)?;
261 }
262
263 let file = File::create(output_file).context("Failed to create output file")?;
265 let mut writer = csv::Writer::from_writer(file);
266
267 writer.write_record([
268 "kv_path",
269 "unique_clients",
270 "operations_count",
271 "entity_ids",
272 "alias_names",
273 "sample_paths_accessed",
274 ])?;
275
276 let mut paths: Vec<_> = kv_usage.keys().collect();
277 paths.sort();
278
279 for kv_path in paths {
280 let data = &kv_usage[kv_path];
281
282 let mut entity_ids: Vec<_> = data.entity_ids.iter().cloned().collect();
283 entity_ids.sort();
284
285 let unique_clients = entity_ids.len();
286 let operations = data.operations_count;
287
288 let mut alias_names = Vec::new();
290 for eid in &entity_ids {
291 if let Some(aliases) = entity_aliases.get(eid) {
292 alias_names.extend(aliases.iter().cloned());
293 }
294 }
295
296 let mut sample_paths: Vec<_> = data.paths_accessed.iter().cloned().collect();
298 sample_paths.sort();
299 sample_paths.truncate(5);
300
301 writer.write_record([
302 kv_path,
303 &unique_clients.to_string(),
304 &operations.to_string(),
305 &entity_ids.join(", "),
306 &alias_names.join(", "),
307 &sample_paths.join(", "),
308 ])?;
309 }
310
311 writer.flush()?;
312
313 println!("Done. Output written to: {}", output_file);
314 println!("Summary: {} KV paths analyzed", kv_usage.len());
315
316 Ok(())
317}