vault_audit_tools/commands/
kv_analyzer.rs1use crate::audit::types::AuditEntry;
53use crate::utils::progress::ProgressBar;
54use crate::utils::reader::open_file;
55use anyhow::{Context, Result};
56use std::collections::{HashMap, HashSet};
57use std::fs::File;
58use std::io::{BufRead, BufReader};
59
/// Renders `n` in decimal with `,` inserted between every group of three
/// digits, counted from the right (e.g. `1234567` -> `"1,234,567"`).
fn format_number(n: usize) -> String {
    let digits = n.to_string();
    let len = digits.len();
    let mut out = String::with_capacity(len + len / 3);

    for (idx, ch) in digits.chars().enumerate() {
        // A separator goes before a digit when the number of digits still to
        // come (including this one) is a positive multiple of three.
        let remaining = len - idx;
        if idx != 0 && remaining % 3 == 0 {
            out.push(',');
        }
        out.push(ch);
    }

    out
}
71
/// Accumulated usage for one normalized KV path group.
#[derive(Debug, Default)]
struct KvUsageData {
    /// Distinct entity IDs that issued a qualifying request in this group.
    entity_ids: HashSet<String>,
    /// Number of qualifying requests counted for this group.
    operations_count: usize,
    /// Distinct raw request paths seen for this group.
    paths_accessed: HashSet<String>,
}
78
/// Collapses an audit-log request path into an application-level grouping key.
///
/// Leading and trailing `/` are trimmed and the path is split on `/`. If there
/// are at least three segments and the second one is `data` or `metadata` (the
/// KV v2 API layout), that second segment is dropped. The key is the first four
/// remaining segments (mount plus up to three path components) joined with `/`
/// and terminated by a trailing `/`.
///
/// Examples:
/// - `secret/data/app/team/svc/key` -> `secret/app/team/svc/`
/// - `/secret/metadata/app/`        -> `secret/app/`
/// - `kv/app/creds`                 -> `kv/app/creds/`
/// - `""`                           -> `/`
fn normalize_kv_path(path: &str) -> String {
    // Mount plus up to three path components.
    const MAX_SEGMENTS: usize = 4;

    let parts: Vec<&str> = path.trim_matches('/').split('/').collect();

    let is_kv_v2 = parts.len() >= 3 && matches!(parts[1], "data" | "metadata");
    let kept: Vec<&str> = if is_kv_v2 {
        // Drop the `data`/`metadata` API segment but keep the mount.
        std::iter::once(parts[0])
            .chain(parts[2..].iter().copied())
            .take(MAX_SEGMENTS)
            .collect()
    } else {
        parts.into_iter().take(MAX_SEGMENTS).collect()
    };

    // `split` always yields at least one item, so `kept` is never empty.
    let mut key = kept.join("/");
    key.push('/');
    key
}
117
118fn load_entity_alias_mapping(alias_export_csv: &str) -> Result<HashMap<String, Vec<String>>> {
119 let mut entity_aliases: HashMap<String, Vec<String>> = HashMap::new();
120
121 let file = match File::open(alias_export_csv) {
122 Ok(f) => f,
123 Err(_) => {
124 eprintln!("[WARN] Entity alias export not found: {}", alias_export_csv);
125 return Ok(entity_aliases);
126 }
127 };
128
129 let mut reader = csv::Reader::from_reader(file);
130
131 for result in reader.records() {
132 let record = result?;
133 if let (Some(entity_id), Some(alias_name)) = (record.get(0), record.get(1)) {
134 entity_aliases
135 .entry(entity_id.to_string())
136 .or_default()
137 .push(alias_name.to_string());
138 }
139 }
140
141 Ok(entity_aliases)
142}
143
144pub fn run(
145 log_files: &[String],
146 kv_prefix: &str,
147 output: Option<&str>,
148 entity_csv: Option<&str>,
149) -> Result<()> {
150 let output_file = output.unwrap_or("kv_usage_by_client.csv");
151
152 let mut kv_usage: HashMap<String, KvUsageData> = HashMap::new();
153 let mut total_lines = 0;
154 let mut parsed_lines = 0;
155
156 for (file_idx, log_file) in log_files.iter().enumerate() {
158 eprintln!(
159 "[{}/{}] Processing: {}",
160 file_idx + 1,
161 log_files.len(),
162 log_file
163 );
164
165 let file_size = std::fs::metadata(log_file).ok().map(|m| m.len() as usize);
167 let mut progress = if let Some(size) = file_size {
168 ProgressBar::new(size, "Processing")
169 } else {
170 ProgressBar::new_spinner("Processing")
171 };
172
173 let file = open_file(log_file)
174 .with_context(|| format!("Failed to open audit log file: {}", log_file))?;
175 let reader = BufReader::new(file);
176
177 let mut file_lines = 0;
178 let mut bytes_read = 0;
179
180 for line in reader.lines() {
181 file_lines += 1;
182 total_lines += 1;
183 let line = line?;
184 bytes_read += line.len() + 1; if file_lines % 10_000 == 0 {
188 if let Some(size) = file_size {
189 progress.update(bytes_read.min(size)); } else {
191 progress.update(file_lines);
192 }
193 }
194
195 let entry: AuditEntry = match serde_json::from_str(&line) {
196 Ok(e) => e,
197 Err(_) => continue,
198 };
199
200 let request = match &entry.request {
202 Some(r) => r,
203 None => continue,
204 };
205
206 let path = match &request.path {
207 Some(p) => p.as_str(),
208 None => continue,
209 };
210
211 if !kv_prefix.is_empty() && !path.starts_with(kv_prefix) {
213 continue;
214 }
215 if kv_prefix.is_empty() && !path.contains("/data/") && !path.contains("/metadata/") {
216 continue;
217 }
218
219 let operation = request.operation.as_deref().unwrap_or("");
221 if operation != "read" && operation != "list" {
222 continue;
223 }
224
225 let entity_id = match entry.auth.as_ref().and_then(|a| a.entity_id.as_deref()) {
226 Some(id) => id,
227 None => continue,
228 };
229
230 parsed_lines += 1;
231
232 let app_path = normalize_kv_path(path);
234
235 let usage = kv_usage.entry(app_path).or_insert_with(|| KvUsageData {
236 entity_ids: HashSet::new(),
237 operations_count: 0,
238 paths_accessed: HashSet::new(),
239 });
240
241 usage.entity_ids.insert(entity_id.to_string());
242 usage.operations_count += 1;
243 usage.paths_accessed.insert(path.to_string());
244 }
245
246 if let Some(size) = file_size {
248 progress.update(size);
249 }
250
251 progress.finish_with_message(&format!(
252 "Processed {} lines from this file",
253 format_number(file_lines)
254 ));
255 }
256
257 eprintln!(
258 "\nTotal: Processed {} lines, parsed {} KV operations",
259 format_number(total_lines),
260 format_number(parsed_lines)
261 );
262
263 if kv_usage.is_empty() {
264 eprintln!("[ERROR] No KV operations found in audit logs.");
265 std::process::exit(1);
266 }
267
268 let entity_aliases = if let Some(alias_file) = entity_csv {
270 load_entity_alias_mapping(alias_file)?
271 } else {
272 HashMap::new()
273 };
274
275 if let Some(parent) = std::path::Path::new(output_file).parent() {
277 std::fs::create_dir_all(parent)?;
278 }
279
280 let file = File::create(output_file).context("Failed to create output file")?;
282 let mut writer = csv::Writer::from_writer(file);
283
284 writer.write_record([
285 "kv_path",
286 "unique_clients",
287 "operations_count",
288 "entity_ids",
289 "alias_names",
290 "sample_paths_accessed",
291 ])?;
292
293 let mut paths: Vec<_> = kv_usage.keys().collect();
294 paths.sort();
295
296 for kv_path in paths {
297 let data = &kv_usage[kv_path];
298
299 let mut entity_ids: Vec<_> = data.entity_ids.iter().cloned().collect();
300 entity_ids.sort();
301
302 let unique_clients = entity_ids.len();
303 let operations = data.operations_count;
304
305 let mut alias_names = Vec::new();
307 for eid in &entity_ids {
308 if let Some(aliases) = entity_aliases.get(eid) {
309 alias_names.extend(aliases.iter().cloned());
310 }
311 }
312
313 let mut sample_paths: Vec<_> = data.paths_accessed.iter().cloned().collect();
315 sample_paths.sort();
316 sample_paths.truncate(5);
317
318 writer.write_record([
319 kv_path,
320 &unique_clients.to_string(),
321 &operations.to_string(),
322 &entity_ids.join(", "),
323 &alias_names.join(", "),
324 &sample_paths.join(", "),
325 ])?;
326 }
327
328 writer.flush()?;
329
330 println!("Done. Output written to: {}", output_file);
331 println!("Summary: {} KV paths analyzed", kv_usage.len());
332
333 Ok(())
334}