
Commit 0db7a29

Merge pull request #6 from faststats-dev/refactor/use-s3-streams
chore: stream file into s3 instead of putting it and some other optimizations
2 parents 87f1c31 + 5c94fac commit 0db7a29
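
The headline change is in src/handler.rs: instead of buffering the whole export in memory and issuing a put-style upload, the handler now pipes serialized JSON through an in-memory duplex into rust-s3's put_object_stream. Below is a minimal sketch of that pattern; the function name stream_demo, the example payload, and the pre-configured bucket are illustrative placeholders, not code from this commit.

use tokio::io::AsyncWriteExt;

// Sketch of the duplex-pipe upload pattern used by stream_export_to_s3 in this PR.
// `bucket` is assumed to be an already-configured s3::Bucket (see src/main.rs);
// the key and payload are made-up placeholders.
async fn stream_demo(bucket: s3::Bucket, key: &str) -> std::io::Result<()> {
    // 128 KiB in-memory pipe: the producer fills the write half, the uploader drains the read half.
    let (mut writer, mut reader) = tokio::io::duplex(128 * 1024);
    let key = key.to_string();

    // Producer task: serialize rows incrementally instead of building one big buffer.
    let producer = tokio::spawn(async move {
        for i in 0..3u32 {
            writer.write_all(format!("{{\"row\":{i}}}\n").as_bytes()).await?;
        }
        writer.shutdown().await // signals EOF to the reader half
    });

    // Uploader task: rust-s3 consumes the reader half as a streaming upload.
    let uploader = tokio::spawn(async move {
        bucket
            .put_object_stream(&mut reader, &key)
            .await
            .map_err(std::io::Error::other)
    });

    producer.await.map_err(std::io::Error::other)??;
    uploader.await.map_err(std::io::Error::other)??;
    Ok(())
}

Because both halves run concurrently, memory stays bounded by the pipe size rather than by the export size, which is the point of the refactor.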

File tree

7 files changed: +186 -118 lines


Cargo.lock

Lines changed: 0 additions & 24 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 4 additions & 6 deletions
@@ -1,18 +1,16 @@
 [package]
 name = "data-exporter"
 version = "0.1.0"
-edition = "2024"
+edition = "2021"
 
 [dependencies]
-async-stream = "0.3"
 axum = "0.8.7"
-bytes = "1.9"
 chrono = { version = "0.4.42", features = ["serde"] }
+dotenvy = "0.15"
 futures = "0.3"
+rust-s3 = "0.37.1"
 serde = "1.0.228"
 serde_json = "1.0"
 sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "postgres", "macros", "uuid", "chrono"] }
-uuid = { version = "1.11.0", features = ["serde"] }
 tokio = { version = "1.48.0", features = ["macros", "rt-multi-thread"] }
-rust-s3 = "0.37.1"
-dotenvy = "0.15"
+uuid = { version = "1.11.0", features = ["serde"] }

purge.ts

Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
+import { S3Client } from "bun";
+
+const client = new S3Client({
+  accessKeyId: process.env.S3_ACCESS_KEY_ID,
+  secretAccessKey: process.env.S3_SECRET_ACCESS_KEY,
+  bucket: process.env.S3_BUCKET,
+  region: process.env.S3_REGION,
+  endpoint: process.env.S3_ENDPOINT,
+});
+
+async function purgeBucket() {
+  console.log(`Starting purge for bucket: ${process.env.S3_BUCKET}...`);
+  let totalDeleted = 0;
+  let isTruncated = true;
+  let startAfter: string | undefined = undefined;
+
+  try {
+    while (isTruncated) {
+      // List objects in the bucket
+      const response = await client.list({
+        maxKeys: 1000,
+        startAfter,
+      });
+
+      const objects = response.contents || [];
+
+      if (objects.length === 0) {
+        break;
+      }
+
+      // Perform deletions in parallel for the current batch
+      await Promise.all(
+        objects.map((obj) => {
+          console.log(`Deleting: ${obj.key}`);
+          return client.delete(obj.key);
+        })
+      );
+
+      totalDeleted += objects.length;
+      isTruncated = response.isTruncated;
+      if (isTruncated && objects.length > 0) {
+        startAfter = objects[objects.length - 1].key;
+      }
+    }
+
+    console.log(`\nSuccessfully purged ${totalDeleted} objects.`);
+  } catch (error) {
+    console.error("Error purging bucket:", error);
+    process.exit(1);
+  }
+}
+
+purgeBucket();

src/handler.rs

Lines changed: 118 additions & 59 deletions
@@ -1,14 +1,11 @@
 use crate::models::{AppState, DataEntry, ExportRequest, Project};
-use crate::s3_helpers;
-use bytes::Bytes;
-
 use axum::{
     extract::{Path, State},
-    http::{StatusCode, header::LOCATION},
+    http::{header::LOCATION, StatusCode},
     response::{IntoResponse, Response},
 };
 use futures::stream::StreamExt;
-use sqlx::Row;
+use tokio::io::AsyncWriteExt;
 
 pub async fn export(
     State(state): State<AppState>,
@@ -42,82 +39,144 @@ pub async fn export(
 
     let s3_key = format!("exports/{}.json", token);
 
-    let file_exists = s3_helpers::check_file_exists(&state.s3_bucket, &s3_key)
+    let file_exists = state.s3_bucket.object_exists(&s3_key).await.map_err(|e| {
+        println!("Error checking S3 file existence: {:?}", e);
+        StatusCode::INTERNAL_SERVER_ERROR
+    })?;
+
+    if !file_exists {
+        stream_export_to_s3(&state, export_request.project_id, &s3_key).await?;
+    }
+
+    let presigned_url = state
+        .s3_bucket
+        .presign_get(&s3_key, 300, None)
         .await
         .map_err(|e| {
-            println!("Error checking S3 file existence: {:?}", e);
+            println!("Error generating presigned URL: {:?}", e);
            StatusCode::INTERNAL_SERVER_ERROR
        })?;
 
-    if !file_exists {
-        let export_data = generate_export_data(&state, export_request.project_id).await?;
+    Ok((StatusCode::FOUND, [(LOCATION, presigned_url.as_str())]).into_response())
+}
 
-        s3_helpers::upload_file(&state.s3_bucket, &s3_key, export_data)
-            .await
-            .map_err(|e| {
-                println!("Error uploading to S3: {:?}", e);
-                StatusCode::INTERNAL_SERVER_ERROR
-            })?;
+fn indent_json(json: &[u8], spaces: usize) -> Vec<u8> {
+    let mut result = Vec::with_capacity(json.len() + (json.len() >> 3));
+
+    for &byte in json {
+        result.push(byte);
+        if byte == b'\n' {
+            result.extend(std::iter::repeat_n(b' ', spaces));
+        }
+    }
+
+    while result.last() == Some(&b' ') {
+        result.pop();
    }
 
-    let presigned_url = s3_helpers::generate_presigned_url(&state.s3_bucket, &s3_key, 300)
+    result
+}
+
+async fn stream_export_to_s3(
+    state: &AppState,
+    project_id: sqlx::types::Uuid,
+    s3_key: &str,
+) -> Result<(), StatusCode> {
+    let (writer, mut reader) = tokio::io::duplex(128 * 1024);
+
+    let pool = state.pool.clone();
+    let s3_bucket = state.s3_bucket.clone();
+    let s3_key_owned = s3_key.to_string();
+
+    let writer_handle = tokio::spawn(async move {
+        let project = sqlx::query_as::<_, Project>(
+            "SELECT id, name, token, slug, private, template_id, created_at, owner_id FROM project WHERE id = $1"
+        )
+        .bind(project_id)
+        .fetch_optional(&pool)
+        .await
+        .map_err(std::io::Error::other)?
+        .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::NotFound, "Project not found"))?;
+
+        write_export_json(writer, project, &pool, project_id).await
+    });
+
+    let upload_handle = tokio::spawn(async move {
+        s3_bucket
+            .put_object_stream(&mut reader, &s3_key_owned)
+            .await
+            .map_err(std::io::Error::other)
+    });
+
+    writer_handle
        .await
        .map_err(|e| {
-            println!("Error generating presigned URL: {:?}", e);
+            println!("Writer task failed: {:?}", e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .map_err(|e| {
+            println!("Error writing JSON: {:?}", e);
            StatusCode::INTERNAL_SERVER_ERROR
        })?;
 
-    Ok((StatusCode::FOUND, [(LOCATION, presigned_url.as_str())]).into_response())
+    upload_handle
+        .await
+        .map_err(|e| {
+            println!("Upload task failed: {:?}", e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .map_err(|e| {
+            println!("Error uploading to S3: {:?}", e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    Ok(())
 }
 
-async fn generate_export_data(
-    state: &AppState,
+async fn write_export_json<W>(
+    mut writer: W,
+    project: Project,
+    pool: &sqlx::PgPool,
    project_id: sqlx::types::Uuid,
-) -> Result<Bytes, StatusCode> {
-    let project = sqlx::query_as::<_, Project>(
-        "SELECT id, name, token, slug, private, template_id, created_at, owner_id FROM project WHERE id = $1"
-    )
-    .bind(project_id)
-    .fetch_optional(&state.pool)
-    .await
-    .map_err(|e| {
-        println!("Error while fetching project: {:?}", e);
-        StatusCode::INTERNAL_SERVER_ERROR
-    })?
-    .ok_or(StatusCode::NOT_FOUND)?;
+) -> Result<(), std::io::Error>
+where
+    W: tokio::io::AsyncWrite + Unpin,
+{
+    let mut buffer = Vec::with_capacity(131072);
+
+    writer.write_all(b"{\n  \"project\": ").await?;
+
+    serde_json::to_writer_pretty(&mut buffer, &project).map_err(std::io::Error::other)?;
 
-    let mut data_entries = Vec::new();
-    let mut row_stream = sqlx::query(
+    let indented = indent_json(&buffer, 2);
+    writer.write_all(&indented).await?;
+
+    writer.write_all(b",\n  \"data_entries\": [\n").await?;
+
+    let mut entries_stream = sqlx::query_as::<_, DataEntry>(
        "SELECT data, created_at FROM data_entries WHERE project_id = $1 ORDER BY created_at DESC",
    )
    .bind(project_id)
-    .fetch(&state.pool);
-
-    while let Some(row) = row_stream.next().await {
-        match row {
-            Ok(row) => {
-                let data_entry = DataEntry {
-                    data: row.try_get("data").ok(),
-                    created_at: row.get("created_at"),
-                };
-                data_entries.push(data_entry);
-            }
-            Err(e) => {
-                println!("Error while streaming data entry: {:?}", e);
-                return Err(StatusCode::INTERNAL_SERVER_ERROR);
-            }
+    .fetch(pool);
+
+    let mut first = true;
+    while let Some(entry_result) = entries_stream.next().await {
+        let entry = entry_result.map_err(std::io::Error::other)?;
+
+        if !first {
+            writer.write_all(b",\n").await?;
        }
-    }
+        first = false;
 
-    let export_data = serde_json::json!({
-        "project": project,
-        "data_entries": data_entries
-    });
+        writer.write_all(b"    ").await?;
 
-    let json_string = serde_json::to_string_pretty(&export_data).map_err(|e| {
-        println!("Error while serializing export data: {:?}", e);
-        StatusCode::INTERNAL_SERVER_ERROR
-    })?;
+        buffer.clear();
+        serde_json::to_writer_pretty(&mut buffer, &entry).map_err(std::io::Error::other)?;
+
+        let indented = indent_json(&buffer, 4);
+        writer.write_all(&indented).await?;
+    }
 
-    Ok(Bytes::from(json_string))
+    writer.write_all(b"\n  ]\n}\n").await?;
+    writer.shutdown().await
 }
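
A note on the new indent_json helper above: serde_json::to_writer_pretty always anchors its output at column 0, so before nesting a pretty-printed value under the hand-written "project" key or inside the "data_entries" array, every line after the first has to be shifted right. A tiny standalone illustration of that effect (the key and value here are made up, not taken from this repo):

// Illustration only: why pretty-printed JSON needs re-indenting before nesting.
fn main() {
    // Pretty-printed JSON is anchored at column 0:
    // {
    //   "id": 1
    // }
    let pretty = serde_json::to_string_pretty(&serde_json::json!({ "id": 1 })).unwrap();

    // Inserting two spaces after every newline (what indent_json(&buf, 2) does at
    // the byte level) lets the blob nest cleanly under an outer, hand-written key.
    let shifted = pretty.replace('\n', "\n  ");
    println!("{{\n  \"project\": {}\n}}", shifted);
    // {
    //   "project": {
    //     "id": 1
    //   }
    // }
}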

src/main.rs

Lines changed: 10 additions & 8 deletions
@@ -1,21 +1,22 @@
-use axum::{Extension, Router, http::StatusCode, routing::get};
-use s3::{Bucket, Region, creds::Credentials};
+use axum::{http::StatusCode, routing::get, Extension, Router};
+use s3::{creds::Credentials, Bucket, Region};
 use sqlx::postgres::PgPoolOptions;
 mod handler;
 mod models;
 mod rate_limit;
-mod s3_helpers;
 
 #[tokio::main]
 async fn main() {
-    #[cfg(debug_assertions)]
-    dotenvy::dotenv().ok();
+    let _ = dotenvy::dotenv();
 
     let database_url = std::env::var("DATABASE_URL")
        .expect("DATABASE_URL must be set in .env file or environment variables");
 
    let pool = PgPoolOptions::new()
-        .max_connections(10)
+        .max_connections(5)
+        .acquire_timeout(std::time::Duration::from_secs(8))
+        .idle_timeout(std::time::Duration::from_secs(60))
+        .max_lifetime(std::time::Duration::from_secs(600))
        .connect(&database_url)
        .await
        .expect("Failed to connect to database");
@@ -45,8 +46,9 @@ async fn main() {
        .expect("Invalid S3_REGION")
    };
 
-    let s3_bucket =
-        *Bucket::new(&bucket_name, region, credentials).expect("Failed to create S3 bucket");
+    let s3_bucket = *Bucket::new(&bucket_name, region, credentials)
+        .expect("Failed to create S3 bucket")
+        .with_path_style();
 
    let state = models::AppState { pool, s3_bucket };
 
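For context on .with_path_style() in the hunk above: it makes rust-s3 address objects as endpoint/bucket/key rather than bucket.endpoint/key, which self-hosted S3-compatible backends (MinIO and similar) typically require. A hedged sketch of a bucket wired to a custom endpoint, assuming rust-s3 0.37's Region::Custom and Credentials::new constructors; the endpoint, bucket name, and keys below are placeholders, not this repo's configuration:

use s3::{creds::Credentials, Bucket, Region};

// Sketch only: placeholder endpoint, bucket name, and credentials.
fn make_bucket() -> Box<Bucket> {
    let region = Region::Custom {
        region: "us-east-1".to_owned(),
        endpoint: "http://localhost:9000".to_owned(), // e.g. a local MinIO
    };
    let credentials = Credentials::new(Some("access-key"), Some("secret-key"), None, None, None)
        .expect("invalid credentials");

    Bucket::new("exports-bucket", region, credentials)
        .expect("Failed to create S3 bucket")
        // Path style => http://localhost:9000/exports-bucket/<key>;
        // virtual-hosted style would resolve exports-bucket.localhost instead.
        .with_path_style()
}
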
src/models.rs

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 use chrono::NaiveDateTime;
 use s3::Bucket;
 use serde::Serialize;
-use sqlx::{PgPool, types::Uuid};
+use sqlx::{types::Uuid, PgPool};
 
 #[derive(Clone)]
 pub struct AppState {
