vault_audit_tools/commands/
preprocess_entities.rs1use crate::audit::types::AuditEntry;
53use crate::utils::progress::ProgressBar;
54use crate::utils::reader::open_file;
55use anyhow::{Context, Result};
56use serde::{Deserialize, Serialize};
57use std::collections::HashMap;
58use std::fs::File;
59use std::io::{BufRead, BufReader, Write};
60
61#[derive(Debug, Serialize, Deserialize)]
63pub struct EntityMapping {
64 pub display_name: String,
65 pub mount_path: String,
66 pub mount_accessor: String,
67 #[serde(skip_serializing_if = "Option::is_none")]
68 pub username: Option<String>,
69 pub login_count: usize,
70 pub first_seen: String,
71 pub last_seen: String,
72}
73
74pub fn build_entity_map(log_files: &[String]) -> Result<HashMap<String, EntityMapping>> {
77 let mut entity_map: HashMap<String, EntityMapping> = HashMap::new();
78 let mut login_events = 0;
79 let mut lines_processed = 0;
80
81 for (file_idx, log_file) in log_files.iter().enumerate() {
83 eprintln!(
84 "[{}/{}] Processing: {}",
85 file_idx + 1,
86 log_files.len(),
87 log_file
88 );
89
90 let file_size = std::fs::metadata(log_file).ok().map(|m| m.len() as usize);
92
93 let file = open_file(log_file)
94 .with_context(|| format!("Failed to open audit log file: {}", log_file))?;
95 let reader = BufReader::new(file);
96
97 let mut progress = if let Some(size) = file_size {
98 ProgressBar::new(size, "Processing")
99 } else {
100 ProgressBar::new_spinner("Processing")
101 };
102 let mut bytes_read = 0;
103 let mut file_lines = 0;
104
105 for line in reader.lines() {
106 file_lines += 1;
107 lines_processed += 1;
108 let line = line?;
109 bytes_read += line.len() + 1; if file_lines % 10_000 == 0 {
113 if let Some(size) = file_size {
114 progress.update(bytes_read.min(size)); } else {
116 progress.update(file_lines);
117 }
118 }
119 let entry: AuditEntry = match serde_json::from_str(&line) {
120 Ok(e) => e,
121 Err(_) => continue,
122 };
123
124 let request = match &entry.request {
126 Some(r) => r,
127 None => continue,
128 };
129
130 let path = match &request.path {
131 Some(p) => p,
132 None => continue,
133 };
134
135 if !path.starts_with("auth/") {
136 continue;
137 }
138
139 if !path.contains("/login") {
140 continue;
141 }
142
143 let auth = match &entry.auth {
145 Some(a) => a,
146 None => continue,
147 };
148
149 let entity_id = match &auth.entity_id {
151 Some(id) if !id.is_empty() => id.clone(),
152 _ => continue,
153 };
154
155 let display_name = match &auth.display_name {
156 Some(name) if !name.is_empty() => name.clone(),
157 _ => continue,
158 };
159
160 login_events += 1;
161
162 let mount_path = path
164 .trim_end_matches("/login")
165 .trim_end_matches(&format!("/{}", display_name))
166 .to_string();
167
168 let mount_accessor = auth.accessor.clone().unwrap_or_default();
169 let username = auth
170 .metadata
171 .as_ref()
172 .and_then(|m| m.get("username"))
173 .and_then(|v| v.as_str())
174 .map(|s| s.to_string());
175
176 entity_map
178 .entry(entity_id)
179 .and_modify(|mapping| {
180 mapping.login_count += 1;
181 mapping.last_seen = entry.time.clone();
182 if entry.time > mapping.last_seen {
184 mapping.display_name = display_name.clone();
185 }
186 })
187 .or_insert_with(|| EntityMapping {
188 display_name,
189 mount_path,
190 mount_accessor,
191 username,
192 login_count: 1,
193 first_seen: entry.time.clone(),
194 last_seen: entry.time.clone(),
195 });
196 }
197
198 if let Some(size) = file_size {
200 progress.update(size);
201 } else {
202 progress.update(file_lines);
203 }
204
205 progress.finish_with_message(&format!("Processed {} lines from this file", file_lines));
206 }
207
208 eprintln!(
209 "\nTotal: Processed {} lines, found {} login events, tracked {} entities\n",
210 lines_processed,
211 login_events,
212 entity_map.len()
213 );
214
215 Ok(entity_map)
216}
217
218pub fn run(log_files: &[String], output: &str, format: &str) -> Result<()> {
219 eprintln!("Preprocessing audit logs...");
220 eprintln!("Extracting entity → display_name mappings from login events...\n");
221
222 let entity_map = build_entity_map(log_files)?;
223
224 eprintln!("\nWriting entity mappings to: {}", output);
226
227 match format.to_lowercase().as_str() {
228 "json" => {
229 let output_file = File::create(output)
230 .with_context(|| format!("Failed to create output file: {}", output))?;
231 let mut writer = std::io::BufWriter::new(output_file);
232
233 let json = serde_json::to_string_pretty(&entity_map)
235 .context("Failed to serialize entity mappings")?;
236 writer.write_all(json.as_bytes())?;
237 writer.flush()?;
238
239 eprintln!("JSON entity mapping file created successfully!\n");
240 }
241 "csv" => {
242 let output_file = File::create(output)
243 .with_context(|| format!("Failed to create output file: {}", output))?;
244 let mut csv_writer = csv::Writer::from_writer(output_file);
245
246 csv_writer.write_record([
248 "entity_id",
249 "display_name",
250 "mount_path",
251 "mount_accessor",
252 "username",
253 "login_count",
254 "first_seen",
255 "last_seen",
256 ])?;
257
258 for (entity_id, mapping) in &entity_map {
260 csv_writer.write_record([
261 entity_id,
262 &mapping.display_name,
263 &mapping.mount_path,
264 &mapping.mount_accessor,
265 mapping.username.as_deref().unwrap_or(""),
266 &mapping.login_count.to_string(),
267 &mapping.first_seen,
268 &mapping.last_seen,
269 ])?;
270 }
271
272 csv_writer.flush()?;
273 eprintln!("✓ CSV entity mapping file created successfully!\n");
274 }
275 _ => {
276 anyhow::bail!("Invalid format '{}'. Use 'csv' or 'json'", format);
277 }
278 }
279
280 eprintln!("Usage with client-activity command:");
281 eprintln!(
282 " vault-audit client-activity --start <START> --end <END> --entity-map {}",
283 output
284 );
285
286 Ok(())
287}