diff --git a/Cargo.lock b/Cargo.lock index d96e303..9c188ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,28 @@ dependencies = [ "libc", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atoi" version = "2.0.0" @@ -242,8 +264,11 @@ dependencies = [ name = "data-exporter" version = "0.1.0" dependencies = [ + "async-stream", "axum", + "bytes", "chrono", + "futures", "serde", "serde_json", "sqlx", @@ -360,6 +385,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.31" @@ -404,6 +444,17 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.31" @@ -422,8 +473,10 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", diff --git a/Cargo.toml b/Cargo.toml index 1d4c6ef..a49efbb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,8 +4,11 @@ version = "0.1.0" edition = "2024" [dependencies] +async-stream = "0.3" axum = "0.8.7" +bytes = "1.9" chrono = { version = "0.4.42", features = ["serde"] } +futures = "0.3" serde = "1.0.228" serde_json = "1.0" sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "postgres", "macros", "uuid", "chrono"] } diff --git a/src/handler.rs b/src/handler.rs index be14218..c7415b1 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -1,6 +1,8 @@ -use crate::models::{AppState, DataEntry, ExportData, ExportRequest, Project}; +use crate::models::{AppState, DataEntry, ExportRequest, Project}; +use bytes::Bytes; use axum::{ + body::Body, extract::{Path, State}, http::{ StatusCode, @@ -8,6 +10,8 @@ use axum::{ }, response::{IntoResponse, Response}, }; +use futures::stream::StreamExt; +use sqlx::Row; pub async fn export( State(state): State, @@ -51,28 +55,49 @@ pub async fn export( })? .ok_or(StatusCode::NOT_FOUND)?; - let data_entries = sqlx::query_as::<_, DataEntry>( - "SELECT data, created_at FROM data_entries WHERE project_id = $1 ORDER BY created_at DESC" - ) - .bind(export_request.project_id) - .fetch_all(&state.pool) - .await - .map_err(|e| { - println!("Error while fetching data entries: {:?}", e); + let project_json = serde_json::to_string(&project).map_err(|e| { + println!("Error while serializing project: {:?}", e); StatusCode::INTERNAL_SERVER_ERROR })?; - let export_data = ExportData { - project, - data_entries, - }; + let filename = format!("project-{}-export.json", export_request.project_id); + let project_id = export_request.project_id; - let json_string = serde_json::to_string_pretty(&export_data).map_err(|e| { - println!("Error while serializing export data: {:?}", e); - StatusCode::INTERNAL_SERVER_ERROR - })?; + let stream = async_stream::stream! { + yield Ok::<_, std::io::Error>(Bytes::from(format!("{{\"project\":{},\"data_entries\":[", project_json))); - let filename = format!("project-{}-export.json", export_request.project_id); + let mut row_stream = sqlx::query("SELECT data, created_at FROM data_entries WHERE project_id = $1 ORDER BY created_at DESC") + .bind(project_id) + .fetch(&state.pool); + + let mut first = true; + while let Some(row) = row_stream.next().await { + match row { + Ok(row) => { + let data_entry = DataEntry { + data: row.try_get("data").ok(), + created_at: row.get("created_at"), + }; + + if let Ok(entry_json) = serde_json::to_string(&data_entry) { + if !first { + yield Ok(Bytes::from(",")); + } + first = false; + yield Ok(Bytes::from(entry_json)); + } + } + Err(e) => { + println!("Error while streaming data entry: {:?}", e); + break; + } + } + } + + yield Ok(Bytes::from("]}")); + }; + + let body = Body::from_stream(stream); Ok(( StatusCode::OK, @@ -83,7 +108,7 @@ pub async fn export( &format!("attachment; filename=\"{}\"", filename), ), ], - json_string, + body, ) .into_response()) } diff --git a/src/models.rs b/src/models.rs index b706298..7d66475 100644 --- a/src/models.rs +++ b/src/models.rs @@ -32,9 +32,3 @@ pub struct DataEntry { pub data: Option, pub created_at: NaiveDateTime, } - -#[derive(Debug, Serialize)] -pub struct ExportData { - pub project: Project, - pub data_entries: Vec, -}