vault_audit_tools/commands/
kv_analyzer.rs1use crate::audit::types::AuditEntry;
53use crate::utils::format::format_number;
54use crate::utils::processor::{ProcessingMode, ProcessorBuilder};
55use anyhow::{Context, Result};
56use std::collections::{HashMap, HashSet};
57use std::fs::File;
58
/// Aggregated access statistics for one normalized KV path.
#[derive(Debug, Clone)]
struct KvUsageData {
    /// Distinct entity IDs that accessed this path.
    entity_ids: HashSet<String>,
    /// Number of operations recorded against this path.
    operations_count: usize,
    /// Distinct raw request paths that were folded into this entry.
    paths_accessed: HashSet<String>,
}

impl KvUsageData {
    /// Creates an empty record with no entities, no operations and no paths.
    fn new() -> Self {
        KvUsageData {
            operations_count: 0,
            entity_ids: HashSet::new(),
            paths_accessed: HashSet::new(),
        }
    }

    /// Folds `other` into `self`.
    ///
    /// Entity IDs and paths are unioned; operation counts are summed.
    fn merge(&mut self, other: Self) {
        let KvUsageData {
            entity_ids,
            operations_count,
            paths_accessed,
        } = other;

        self.operations_count += operations_count;
        self.entity_ids.extend(entity_ids);
        self.paths_accessed.extend(paths_accessed);
    }
}
82
83#[derive(Debug, Clone)]
84struct KvAnalyzerState {
85 kv_usage: HashMap<String, KvUsageData>,
86 kv_prefix: String,
87 parsed_lines: usize,
88}
89
90impl KvAnalyzerState {
91 fn new(kv_prefix: String) -> Self {
92 Self {
93 kv_usage: HashMap::with_capacity(10000),
94 kv_prefix,
95 parsed_lines: 0,
96 }
97 }
98
99 fn merge(mut self, other: Self) -> Self {
100 self.parsed_lines += other.parsed_lines;
101 for (path, other_data) in other.kv_usage {
102 self.kv_usage
103 .entry(path)
104 .and_modify(|data| data.merge(other_data.clone()))
105 .or_insert(other_data);
106 }
107 self
108 }
109}
110
/// Collapses a raw KV request path to at most four leading components plus a trailing `/`.
///
/// Leading and trailing slashes are ignored before splitting. If there are at least three
/// components and the second is `data` or `metadata` (the KV v2 API layout), that marker is
/// dropped. The mount is kept, followed by up to three components after the marker, so
/// `secret/data/app/x` and `secret/metadata/app/x` land on the same key as `secret/app/x`.
/// Any other path keeps its first four components verbatim. An empty input yields `"/"`.
///
/// Examples:
/// - `secret/data/team/app/db/password` -> `secret/team/app/db/`
/// - `kv/team/app` -> `kv/team/app/`
fn normalize_kv_path(path: &str) -> String {
    let segments: Vec<&str> = path.trim_matches('/').split('/').collect();

    let is_versioned = segments.len() >= 3 && matches!(segments[1], "data" | "metadata");

    let kept: Vec<&str> = if is_versioned {
        // Mount, then skip the `data`/`metadata` marker and keep up to three more parts.
        std::iter::once(segments[0])
            .chain(segments[2..].iter().copied().take(3))
            .collect()
    } else {
        segments.into_iter().take(4).collect()
    };

    let mut normalized = kept.join("/");
    normalized.push('/');
    normalized
}
149
150fn load_entity_alias_mapping(alias_export_csv: &str) -> Result<HashMap<String, Vec<String>>> {
151 let mut entity_aliases: HashMap<String, Vec<String>> = HashMap::with_capacity(2000); let Ok(file) = File::open(alias_export_csv) else {
154 eprintln!("[WARN] Entity alias export not found: {}", alias_export_csv);
155 return Ok(entity_aliases);
156 };
157
158 let mut reader = csv::Reader::from_reader(file);
159
160 for result in reader.records() {
161 let record = result?;
162 if let (Some(entity_id), Some(alias_name)) = (record.get(0), record.get(1)) {
163 entity_aliases
164 .entry(entity_id.to_string())
165 .or_default()
166 .push(alias_name.to_string());
167 }
168 }
169
170 Ok(entity_aliases)
171}
172
173pub fn run(
174 log_files: &[String],
175 kv_prefix: &str,
176 output: Option<&str>,
177 entity_csv: Option<&str>,
178) -> Result<()> {
179 let output_file = output.unwrap_or("kv_usage_by_client.csv");
180 let kv_prefix_owned = kv_prefix.to_string();
181
182 let processor = ProcessorBuilder::new()
183 .mode(ProcessingMode::Auto)
184 .progress_label("Processing".to_string())
185 .build();
186
187 let (result, stats) = processor.process_files_streaming(
188 log_files,
189 |entry: &AuditEntry, state: &mut KvAnalyzerState| {
190 let Some(request) = &entry.request else {
192 return;
193 };
194
195 let path = match &request.path {
196 Some(p) => p.as_str(),
197 None => return,
198 };
199
200 if !state.kv_prefix.is_empty() && !path.starts_with(&state.kv_prefix) {
202 return;
203 }
204 if state.kv_prefix.is_empty()
205 && !path.contains("/data/")
206 && !path.contains("/metadata/")
207 {
208 return;
209 }
210
211 let operation = request.operation.as_deref().unwrap_or("");
213 if operation != "read" && operation != "list" {
214 return;
215 }
216
217 let Some(entity_id) = entry.auth.as_ref().and_then(|a| a.entity_id.as_deref()) else {
218 return;
219 };
220
221 state.parsed_lines += 1;
222
223 let app_path = normalize_kv_path(path);
225
226 let usage = state
227 .kv_usage
228 .entry(app_path)
229 .or_insert_with(KvUsageData::new);
230
231 usage.entity_ids.insert(entity_id.to_string());
232 usage.operations_count += 1;
233 usage.paths_accessed.insert(path.to_string());
234 },
235 KvAnalyzerState::merge,
236 KvAnalyzerState::new(kv_prefix_owned),
237 )?;
238
239 let total_lines = stats.total_lines;
240 let parsed_lines = result.parsed_lines;
241 let kv_usage = result.kv_usage;
242
243 eprintln!(
244 "\nTotal: Processed {} lines, parsed {} KV operations",
245 format_number(total_lines),
246 format_number(parsed_lines)
247 );
248
249 if kv_usage.is_empty() {
250 eprintln!("[ERROR] No KV operations found in audit logs.");
251 std::process::exit(1);
252 }
253
254 let entity_aliases = if let Some(alias_file) = entity_csv {
256 load_entity_alias_mapping(alias_file)?
257 } else {
258 HashMap::with_capacity(0) };
260
261 if let Some(parent) = std::path::Path::new(output_file).parent() {
263 std::fs::create_dir_all(parent)?;
264 }
265
266 let file = File::create(output_file).context("Failed to create output file")?;
268 let mut writer = csv::Writer::from_writer(file);
269
270 writer.write_record([
271 "kv_path",
272 "unique_clients",
273 "operations_count",
274 "entity_ids",
275 "alias_names",
276 "sample_paths_accessed",
277 ])?;
278
279 let mut paths: Vec<_> = kv_usage.keys().collect();
280 paths.sort();
281
282 for kv_path in paths {
283 let data = &kv_usage[kv_path];
284
285 let mut entity_ids: Vec<_> = data.entity_ids.iter().cloned().collect();
286 entity_ids.sort();
287
288 let unique_clients = entity_ids.len();
289 let operations = data.operations_count;
290
291 let mut alias_names = Vec::new();
293 for eid in &entity_ids {
294 if let Some(aliases) = entity_aliases.get(eid) {
295 alias_names.extend(aliases.iter().cloned());
296 }
297 }
298
299 let mut sample_paths: Vec<_> = data.paths_accessed.iter().cloned().collect();
301 sample_paths.sort();
302 sample_paths.truncate(5);
303
304 writer.write_record([
305 kv_path,
306 &unique_clients.to_string(),
307 &operations.to_string(),
308 &entity_ids.join(", "),
309 &alias_names.join(", "),
310 &sample_paths.join(", "),
311 ])?;
312 }
313
314 writer.flush()?;
315
316 println!("Done. Output written to: {}", output_file);
317 println!("Summary: {} KV paths analyzed", kv_usage.len());
318
319 Ok(())
320}