-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Google Drive uploads + unified Storage layer #1784
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 33 commits
25a5e50
57b5395
b0b2594
b1ab930
a35d7ad
0def2ca
d5e694b
a53eead
41da9a9
fb119f5
ba5eec6
f304b24
6863e85
021fdc1
b7f75d1
966bb17
276b432
65fac13
9097eb3
e6a5c3f
af94f6c
2259a56
9983a6b
4fff0d1
fda9fb5
070aa22
03c1849
53f6228
3c7b2b0
f5017b2
56ea266
7c2c3a3
f437de3
a571e8c
fec866e
81597da
036fe0e
a57b9e5
bcfd28e
a23d814
f63032f
7c7a852
e24de0d
0ccddbe
5b5552c
3883b29
c4975da
65b3d87
eae3b1a
e6b2e51
4532342
46abf6b
9cb4c0e
dafb054
c522e40
71ca82e
8fdecfc
00efb5f
5414acc
8aec215
438d6d3
ff3dc16
8c6c99e
c9529f9
9b39856
1eeaf13
4c4a030
af9ad47
d8bb36a
bc21c71
a139aea
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -60,6 +60,41 @@ const NETWORK_RECOVERY_TIMEOUT: Duration = Duration::from_secs(5 * 60); | |||||
| const CONNECTIVITY_PROBE_INITIAL_DELAY: Duration = Duration::from_secs(2); | ||||||
| const CONNECTIVITY_PROBE_MAX_DELAY: Duration = Duration::from_secs(30); | ||||||
|
|
||||||
/// Returns true only for Google Drive resumable-upload endpoints.
///
/// Parses the URL into host and path instead of substring-matching the
/// whole string, so a non-Google host whose *path* happens to contain
/// "googleapis.com/upload/drive/" is not mis-detected.
fn is_google_drive_resumable_url(url: &str) -> bool {
    // Strip the scheme; anything without one is not a usable upload URL.
    let Some((_, rest)) = url.split_once("://") else {
        return false;
    };
    // Split the authority component from the path.
    let Some((authority, path)) = rest.split_once('/') else {
        return false;
    };
    // Drop userinfo and port so only the hostname is compared.
    let host = authority.rsplit_once('@').map_or(authority, |(_, h)| h);
    let host = host.split_once(':').map_or(host, |(h, _)| h);
    let host = host.to_ascii_lowercase();

    // Require the Google APIs host (or a subdomain of it) AND the Drive
    // upload path prefix, matching what the original substring implied.
    (host == "googleapis.com" || host.ends_with(".googleapis.com"))
        && path.starts_with("upload/drive/")
}
|
|
||||||
| fn with_drive_content_range( | ||||||
| request: reqwest::RequestBuilder, | ||||||
| url: &str, | ||||||
| offset: u64, | ||||||
| size: u64, | ||||||
| total_size: u64, | ||||||
| ) -> reqwest::RequestBuilder { | ||||||
| if !is_google_drive_resumable_url(url) || size == 0 { | ||||||
| return request; | ||||||
| } | ||||||
|
|
||||||
| let end = offset + size - 1; | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor edge case:
Suggested change
|
||||||
| request.header( | ||||||
| "Content-Range", | ||||||
| format!("bytes {offset}-{end}/{total_size}"), | ||||||
| ) | ||||||
| } | ||||||
|
|
||||||
| fn is_upload_response_accepted( | ||||||
| url: &str, | ||||||
| status: StatusCode, | ||||||
| offset: u64, | ||||||
| size: u64, | ||||||
| total_size: u64, | ||||||
| ) -> bool { | ||||||
| status.is_success() | ||||||
| || (is_google_drive_resumable_url(url) | ||||||
| && status == StatusCode::PERMANENT_REDIRECT | ||||||
| && offset.saturating_add(size) < total_size) | ||||||
| } | ||||||
|
|
||||||
| #[instrument(skip(app, channel, file_path, screenshot_path))] | ||||||
| pub async fn upload_video( | ||||||
| app: &AppHandle, | ||||||
|
|
@@ -656,6 +691,12 @@ struct SegmentUploadManifest { | |||||
| is_complete: bool, | ||||||
| } | ||||||
|
|
||||||
| impl SegmentUploadManifest { | ||||||
| fn has_video_content(&self) -> bool { | ||||||
| self.video_init_uploaded && !self.video_segments.is_empty() | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| struct PresignedUrlCache { | ||||||
| urls: tokio::sync::Mutex<HashMap<String, String>>, | ||||||
| } | ||||||
|
|
@@ -1394,6 +1435,23 @@ impl SegmentUploader { | |||||
| .lock() | ||||||
| .unwrap_or_else(|e| e.into_inner()) | ||||||
| .to_complete_manifest(); | ||||||
| if !final_manifest.has_video_content() { | ||||||
| let error = format!("Segment upload completed without video segments for {video_id}"); | ||||||
| error!(video_id, "Segment upload completed without video segments"); | ||||||
|
|
||||||
| if let Ok(mut meta) = RecordingMeta::load_for_project(&recording_dir) { | ||||||
| meta.upload = Some(UploadMeta::Failed { | ||||||
| error: error.clone(), | ||||||
| }); | ||||||
| if let Err(err) = meta.save_for_project() { | ||||||
| warn!("Failed to save failed segment upload metadata: {err}"); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| emit_upload_complete(&app, &video_id); | ||||||
|
|
||||||
| return Err(error.into()); | ||||||
| } | ||||||
| Self::upload_manifest(&app, &video_id, &final_manifest).await?; | ||||||
|
|
||||||
| { | ||||||
|
|
@@ -1644,6 +1702,11 @@ fn multipart_uploader( | |||||
|
|
||||||
| stream::once(async move { | ||||||
| let use_md5_hashes = app.is_server_url_custom().await; | ||||||
| let max_concurrent_uploads = if is_google_drive_resumable_url(&upload_id) { | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks like it’s checking the wrong string: |
||||||
| 1 | ||||||
| } else { | ||||||
| MAX_CONCURRENT_UPLOADS | ||||||
| }; | ||||||
| let first_chunk_presigned_url = Arc::new(Mutex::new(None::<(String, Instant)>)); | ||||||
|
|
||||||
| stream::unfold( | ||||||
|
|
@@ -1747,11 +1810,18 @@ fn multipart_uploader( | |||||
| })? | ||||||
| .clone(); | ||||||
|
|
||||||
| let mut req = client | ||||||
| let req = client | ||||||
| .put(&presigned_url) | ||||||
| .header("Content-Length", size) | ||||||
| .timeout(Duration::from_secs(5 * 60)) | ||||||
| .body(chunk); | ||||||
| let mut req = with_drive_content_range( | ||||||
| req, | ||||||
| &presigned_url, | ||||||
| offset, | ||||||
| size as u64, | ||||||
| total_size, | ||||||
| ); | ||||||
|
|
||||||
| if let Some(md5_sum) = &md5_sum { | ||||||
| req = req.header("Content-MD5", md5_sum); | ||||||
|
|
@@ -1784,11 +1854,18 @@ fn multipart_uploader( | |||||
| md5_sum.as_deref(), | ||||||
| ) | ||||||
| .await?; | ||||||
| let mut retry_req = client | ||||||
| let retry_req = client | ||||||
| .put(&retry_url) | ||||||
| .header("Content-Length", size) | ||||||
| .timeout(Duration::from_secs(5 * 60)) | ||||||
| .body(chunk_for_retry); | ||||||
| let mut retry_req = with_drive_content_range( | ||||||
| retry_req, | ||||||
| &retry_url, | ||||||
| offset, | ||||||
| size as u64, | ||||||
| total_size, | ||||||
| ); | ||||||
| if let Some(md5_sum) = &md5_sum { | ||||||
| retry_req = | ||||||
| retry_req.header("Content-MD5", md5_sum); | ||||||
|
|
@@ -1821,7 +1898,13 @@ fn multipart_uploader( | |||||
| .and_then(|etag| etag.to_str().ok()) | ||||||
| .map(|v| v.trim_matches('"').to_string()); | ||||||
|
|
||||||
| match !resp.status().is_success() { | ||||||
| match !is_upload_response_accepted( | ||||||
| &presigned_url, | ||||||
| resp.status(), | ||||||
| offset, | ||||||
| size as u64, | ||||||
| total_size, | ||||||
| ) { | ||||||
| true => Err(format!( | ||||||
| "uploader/part/{part_number}/error: {}", | ||||||
| resp.text().await.unwrap_or_default() | ||||||
|
|
@@ -1831,12 +1914,21 @@ fn multipart_uploader( | |||||
|
|
||||||
| trace!("Completed upload of part {part_number}"); | ||||||
|
|
||||||
| Ok::<_, AuthedApiError>(UploadedPart { | ||||||
| etag: etag.ok_or_else(|| { | ||||||
| format!( | ||||||
| "uploader/part/{part_number}/error: ETag header not found" | ||||||
| let etag = match etag { | ||||||
| Some(etag) => etag, | ||||||
| None if is_google_drive_resumable_url(&presigned_url) => { | ||||||
| format!("drive-{part_number}") | ||||||
| } | ||||||
| None => { | ||||||
| return Err(format!( | ||||||
| "uploader/part/{part_number}/missing_etag" | ||||||
| ) | ||||||
| })?, | ||||||
| .into()); | ||||||
| } | ||||||
| }; | ||||||
|
|
||||||
| Ok::<_, AuthedApiError>(UploadedPart { | ||||||
| etag, | ||||||
| part_number, | ||||||
| size, | ||||||
| total_size, | ||||||
|
|
@@ -1874,7 +1966,7 @@ fn multipart_uploader( | |||||
| } | ||||||
| }, | ||||||
| ) | ||||||
| .buffered(MAX_CONCURRENT_UPLOADS) | ||||||
| .buffered(max_concurrent_uploads) | ||||||
| .filter_map(|item| async { item }) | ||||||
| .boxed() | ||||||
| }) | ||||||
|
|
@@ -1948,11 +2040,18 @@ async fn retry_failed_chunks( | |||||
| .map_err(|err| format!("retry/part/{}/client: {err:?}", failed.part_number))? | ||||||
| .clone(); | ||||||
|
|
||||||
| let mut req = client | ||||||
| let req = client | ||||||
| .put(&presigned_url) | ||||||
| .header("Content-Length", size) | ||||||
| .timeout(Duration::from_secs(5 * 60)) | ||||||
| .body(chunk); | ||||||
| let mut req = with_drive_content_range( | ||||||
| req, | ||||||
| &presigned_url, | ||||||
| failed.offset, | ||||||
| size as u64, | ||||||
| failed.total_size, | ||||||
| ); | ||||||
|
|
||||||
| if let Some(md5_sum) = &md5_sum { | ||||||
| req = req.header("Content-MD5", md5_sum); | ||||||
|
|
@@ -1990,11 +2089,18 @@ async fn retry_failed_chunks( | |||||
| md5_sum.as_deref(), | ||||||
| ) | ||||||
| .await?; | ||||||
| let mut retry_req = client | ||||||
| let retry_req = client | ||||||
| .put(&retry_url) | ||||||
| .header("Content-Length", size) | ||||||
| .timeout(Duration::from_secs(5 * 60)) | ||||||
| .body(chunk_for_retry); | ||||||
| let mut retry_req = with_drive_content_range( | ||||||
| retry_req, | ||||||
| &retry_url, | ||||||
| failed.offset, | ||||||
| size as u64, | ||||||
| failed.total_size, | ||||||
| ); | ||||||
| if let Some(md5_sum) = &md5_sum { | ||||||
| retry_req = retry_req.header("Content-MD5", md5_sum); | ||||||
| } | ||||||
|
|
@@ -2025,7 +2131,13 @@ async fn retry_failed_chunks( | |||||
| .and_then(|etag| etag.to_str().ok()) | ||||||
| .map(|v| v.trim_matches('"').to_string()); | ||||||
|
|
||||||
| if !resp.status().is_success() { | ||||||
| if !is_upload_response_accepted( | ||||||
| &presigned_url, | ||||||
| resp.status(), | ||||||
| failed.offset, | ||||||
| size as u64, | ||||||
| failed.total_size, | ||||||
| ) { | ||||||
| return Err(format!( | ||||||
| "retry/part/{}/error: {}", | ||||||
| failed.part_number, | ||||||
|
|
@@ -2039,13 +2151,18 @@ async fn retry_failed_chunks( | |||||
| "Successfully retried chunk upload" | ||||||
| ); | ||||||
|
|
||||||
| let etag = match etag { | ||||||
| Some(etag) => etag, | ||||||
| None if is_google_drive_resumable_url(&presigned_url) => { | ||||||
| format!("drive-{}", failed.part_number) | ||||||
| } | ||||||
| None => { | ||||||
| return Err(format!("retry/part/{}/missing_etag", failed.part_number).into()); | ||||||
| } | ||||||
| }; | ||||||
|
|
||||||
| retry_parts.push(UploadedPart { | ||||||
| etag: etag.ok_or_else(|| { | ||||||
| format!( | ||||||
| "retry/part/{}/error: ETag header not found", | ||||||
| failed.part_number | ||||||
| ) | ||||||
| })?, | ||||||
| etag, | ||||||
| part_number: failed.part_number, | ||||||
| size, | ||||||
| total_size: failed.total_size, | ||||||
|
|
@@ -2112,13 +2229,15 @@ pub async fn singlepart_uploader( | |||||
| ) -> Result<(), AuthedApiError> { | ||||||
| let presigned_url = api::upload_signed(&app, request).await?; | ||||||
|
|
||||||
| let resp = app | ||||||
| let request = app | ||||||
| .state::<RetryableHttpClient>() | ||||||
| .as_ref() | ||||||
| .map_err(|err| format!("singlepart_uploader/client: {err:?}"))? | ||||||
| .put(&presigned_url) | ||||||
| .header("Content-Length", total_size) | ||||||
| .body(reqwest::Body::wrap_stream(stream)) | ||||||
| .body(reqwest::Body::wrap_stream(stream)); | ||||||
|
|
||||||
| let resp = with_drive_content_range(request, &presigned_url, 0, total_size, total_size) | ||||||
| .send() | ||||||
| .await | ||||||
| .map_err(|err| format!("singlepart_uploader/error: {err:?}"))?; | ||||||
|
|
@@ -2759,6 +2878,18 @@ mod tests { | |||||
| let complete = state.to_complete_manifest(); | ||||||
| assert!(complete.is_complete); | ||||||
| assert_eq!(complete.video_segments.len(), 2); | ||||||
| assert!(complete.has_video_content()); | ||||||
| } | ||||||
|
|
||||||
| #[tokio::test] | ||||||
| async fn upload_state_without_video_segments_has_no_video_content() { | ||||||
| let mut state = SegmentUploadState::new(); | ||||||
| state.video_init_uploaded = true; | ||||||
| state.audio_init_uploaded = true; | ||||||
|
|
||||||
| let complete = state.to_complete_manifest(); | ||||||
| assert!(complete.is_complete); | ||||||
| assert!(!complete.has_video_content()); | ||||||
| } | ||||||
|
|
||||||
| #[tokio::test] | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| import { Button } from "@cap/ui-solid"; | ||
| import { useNavigate } from "@solidjs/router"; | ||
| import IconLucideArrowLeft from "~icons/lucide/arrow-left"; | ||
|
|
||
| export function IntegrationConfigHeader(props: { title: string }) { | ||
| const navigate = useNavigate(); | ||
|
|
||
| return ( | ||
| <div class="flex shrink-0 justify-between items-center pb-3"> | ||
| <Button | ||
| variant="gray" | ||
| size="sm" | ||
| class="gap-1.5" | ||
| onClick={() => navigate("/settings/integrations")} | ||
| > | ||
| <IconLucideArrowLeft class="size-3.5" /> | ||
| Back | ||
| </Button> | ||
| <p class="text-sm font-medium text-gray-12">{props.title}</p> | ||
| </div> | ||
| ); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The `contains` check here can mis-detect non-Google hosts that happen to include the substring in the URL's path. Parsing the URL and checking the host and path separately keeps this strict and avoids false positives.