Skip to content

Commit

Permalink
Merge pull request #28 from isankadn/dev
Browse files Browse the repository at this point in the history
addressing xAPI username issue, getting only userid
  • Loading branch information
isankadn authored Apr 5, 2024
2 parents 487ef8b + eb4abb8 commit 44c2203
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 20 deletions.
69 changes: 59 additions & 10 deletions historical_data/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,10 @@ async fn process_tenant_historical_data(
let filter_clone = filter.clone();
let total_docs = mongo_collection.count_documents(filter_clone, None).await?;

info!(
"Total documents in {}: {}",
tenant_config.mongo_collection, total_docs
);
// info!(
// "Total documents in {}: {}",
// tenant_config.mongo_collection, total_docs
// );

let batch_size = app_state.config.batch_size;
let num_batches = (total_docs as f64 / batch_size as f64).ceil() as u64;
Expand Down Expand Up @@ -174,12 +174,60 @@ async fn process_tenant_historical_data(
.and_then(|acc| acc.as_document_mut())
{
if let Some(name) = account.get_mut("name") {
let anonymized_name = anonymize_data(
name,
&app_state.config.encryption_salt,
&tenant_config.name,
);
*name = bson::Bson::String(anonymized_name);
if let bson::Bson::String(name_str) = name {
let anonymized_name = if name_str.contains(':') {
let parts: Vec<&str> =
name_str.split(':').collect();
info!(
"{}",
&bson::Bson::String(parts[1].to_string())
);
if parts.len() == 2 {
anonymize_data(
&bson::Bson::String(parts[1].to_string()),
&app_state.config.encryption_salt,
&tenant_config.name,
)
} else {
anonymize_data(
&bson::Bson::String(name_str.to_string()),
&app_state.config.encryption_salt,
&tenant_config.name,
)
}
} else if name_str.contains('@') {
let parts: Vec<&str> =
name_str.split('@').collect();
info!(
"{}",
&bson::Bson::String(parts[0].to_string())
);
if parts.len() == 2 {
anonymize_data(
&bson::Bson::String(parts[0].to_string()),
&app_state.config.encryption_salt,
&tenant_config.name,
)
} else {
anonymize_data(
&bson::Bson::String(name_str.to_string()),
&app_state.config.encryption_salt,
&tenant_config.name,
)
}
} else {
info!(
"{}",
&bson::Bson::String(name_str.to_string())
);
anonymize_data(
&bson::Bson::String(name_str.to_string()),
&app_state.config.encryption_salt,
&tenant_config.name,
)
};
*name = bson::Bson::String(anonymized_name);
}
}
}
}
Expand Down Expand Up @@ -559,6 +607,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
}
_ = shutdown_rx => {
println!("Program finished, shutting down gracefully..");
info!("Program finished, shutting down gracefully...");
}
run_result = run_handle => {
Expand Down
10 changes: 10 additions & 0 deletions pg_init.psql
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,14 @@ CREATE TABLE resume_token (
token BYTEA UNIQUE,
tenant_name VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);


CREATE TABLE failed_batches (
ID SERIAL PRIMARY KEY,
tenant_name TEXT,
clickhouse_db TEXT,
clickhouse_table TEXT,
failed_batch TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
68 changes: 58 additions & 10 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,63 @@ async fn process_tenant_records(
.and_then(|acc| acc.as_document_mut())
{
if let Some(name) = account.get_mut("name") {
let anonymized_name = anonymize_data(
name,
&app_state.config.encryption_salt,
&tenant_config.name,
);
*name = Bson::String(anonymized_name);
info!("<<-- Modified statement: {:?}", statement);
} else {
warn!("Missing 'name' field in 'actor.account'");
if let bson::Bson::String(name_str) = name {
let anonymized_name = if name_str.contains(':') {
let parts: Vec<&str> =
name_str.split(':').collect();
info!(
"{}",
&bson::Bson::String(parts[1].to_string())
);
if parts.len() == 2 {
anonymize_data(
&bson::Bson::String(parts[1].to_string()),
&app_state.config.encryption_salt,
&tenant_config.name,
)
} else {
anonymize_data(
&bson::Bson::String(name_str.to_string()),
&app_state.config.encryption_salt,
&tenant_config.name,
)
}
} else if name_str.contains('@') {
let parts: Vec<&str> =
name_str.split('@').collect();
info!(
"{}",
&bson::Bson::String(parts[0].to_string())
);
if parts.len() == 2 {
anonymize_data(
&bson::Bson::String(parts[0].to_string()),
&app_state.config.encryption_salt,
&tenant_config.name,
)
} else {
anonymize_data(
&bson::Bson::String(name_str.to_string()),
&app_state.config.encryption_salt,
&tenant_config.name,
)
}
} else {
info!(
"{}",
&bson::Bson::String(name_str.to_string())
);
anonymize_data(
&bson::Bson::String(name_str.to_string()),
&app_state.config.encryption_salt,
&tenant_config.name,
)
};
*name = bson::Bson::String(anonymized_name);
// info!("<<-- Modified statement: {:?}", statement);
} else {
warn!("Missing 'name' field in 'actor.account'");
}
}
} else {
warn!("Missing 'account' field in 'actor'");
Expand All @@ -162,7 +210,7 @@ async fn process_tenant_records(
continue;
}
};
info!("Inserting statement into ClickHouse: {}", statement_str);
// info!("Inserting statement into ClickHouse: {}", statement_str);

insert_into_clickhouse(
&ch_pool,
Expand Down

0 comments on commit 44c2203

Please sign in to comment.