Skip to content

feat(send_queue): report progress for media uploads #5008

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: main
Choose a base branch
from

Conversation

Johennes
Copy link
Contributor

@Johennes Johennes commented May 6, 2025

This makes it possible to listen to the media upload progress when using the send queue. The progress is communicated via EventSendState::NotSentYet.

I haven't yet fixed / enhanced the tests or added a changelog because I'm not 100% sure about this approach and would like to get some preliminary feedback first.

  • Public API changes documented in changelogs (optional)

@Johennes Johennes marked this pull request as ready for review May 26, 2025 18:40
@Johennes Johennes requested a review from a team as a code owner May 26, 2025 18:40
@Johennes Johennes requested review from Hywan and removed request for a team May 26, 2025 18:40
Copy link
Member

@Hywan Hywan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A really nice PR! I've a bit of feedback, but otherwise we are on the right direction.

We (the Matrix Rust team) were discussing the fact the timeline item is going to be recreated for each transmission progress update. Maybe we want a bit of throttle when broadcasting the transmission progress so that we reduce the amount of data sent to the apps. People at Element will start playing with it inside Element X, let's see how it goes.

} {
let mut subscriber = progress.subscribe();
let our_updates = updates.clone();
spawn(async move {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who controls this task? If the upload is cancelled, this task should be cancelled too right? Or maybe the subscriber.next().await may return a None at some point? If that's the case, please test that behaviour specifically, and comment that out, otherwise handle this 🙂.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, indeed next() will yield None once the observable (progress) and its clones are dropped. This happens once the final RoomSendQueueUpdate::MediaUpload is emited further down after the request has completed.

I've added a comment about this but could also add an explicit abort if that would be better.

Copy link
Member

@bnjbvr bnjbvr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, and nice that we could in theory have this! I've got a few questions about the approach below. We'd like to try it out before validating the overall approach, because of the remark related to memory usage across the FFI, but other than that, it would be a great addition!

Comment on lines 100 to 101
/// Is the media a thumbnail?
is_thumbnail: bool,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity: do you think there's lot of value in exposing this? In which cases would a consumer of the API care that an upload progress is related to a thumbnail or not? I'd imagine the interesting information is "what's the overall percentage of progress for the upload", and that this kind of details doesn't bring much value? (Also that would avoid the data migration 😈)

is_thumbnail,
progress,
} => {
self.update_event_send_state(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At first sight, I was slightly worried about this approach. This will clone the item every time there's a new progress update, which will cause lots of unnecessary overhead at the FFI layer (especially on Android where the JVM may finalize the items much later after they've been dropped on the Rust side, causing spurious spikes in memory usage). Have you tried it out on a low to medium range Android device? How does it feel, in terms of latency/speed?

Overall, I imagined another API where we could have a side-channel progress stream, exposed independently of the RoomSendQueueUpdates. This way, we could channel those progress stream updates directly to the FFI consumers, in a new delegate — and not have to clone the timeline item each time.

But! Turns out we've discussed this in our team meeting, and we're curious to evaluate the performance of the current approach, so…

TLDR: no changes required here, but would be great if you had some performance feedback for this approach!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point about the memory usage. I have actually only used this on iOS so far and don't even have a functional Android setup yet in the project I'm prototyping this. Overall, I'm finding it slightly odd to shove this into send queue updates, too, to be honest. It just seemed like the most straightforward way at the time. But I wouldn't necessarily be opposed to trying out a different approach if you already have concerns.

let mut req =
room.client().media().upload(&mime, data, Some(request_config));
if let Some(watcher) = progress_watcher {
req = req.with_send_progress_observable(watcher);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, this will emit the following updates, if I'm sending a file with a thumbnail:

  • first progress updates for the thumbnail, going from 0 to 100%
  • then progress updates for the file itself, going from 0 to 100%

Is that right? I think observers would likely be more observed in the overall progress from 0 to 100%, so it would be nice if we could aggregate those into a single stream, for observers (especially for galleries which may contain many medias and thumbnails).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this goes together with your other comment. I actually cannot think of a reason why a client would be interested in displaying the thumbnail and file upload progress separately. So aggregating them would probably be best.

I think a difficulty with this is determining the overall upload size ahead of time. The way the code is currently structured, we only receive the size of the file upload once the file has been encrypted and actually started uploading. To aggregate it, we would have to know the size already when starting the thumbnail upload, however.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could update UploadEncryptedFile so that it can either take a reader for the plain data (like today) or a vector of the encrypted data? Then we could encrypt both thumbnail and media ahead of time, to compute the overall upload size, store it on the send queue requests and use it inside RoomSendQueue::sending_task to aggregate the progress per thumbnail / media pair. That would mingle the concerns a little of course.

The only other option I can think of is to do some heuristics and translate the bytes sent for the encrypted attachments into bytes of the unencrypted attachments. That would mean the SDK wouldn't report the actually uploaded bytes though which doesn't feel great either.

🤔

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, I thought about these two alternatives as well :)

  • The first one would work for a thumbnail and a media, but could lead to device memory exhaustion (and getting OOM killed) if you want the same behavior for a gallery with many items.
  • The second one could work nicely with a rule of 3: fake_number_bytes_sent = number_encrypted_bytes_sent * total_bytes / total_encrypted_bytes. As you're pointing it, it wouldn't be the "perfect" information when forwarded to end users, and it would be wrong if the end user decided to show the live bandwidth usage (MB per second).

An alternative would be to restrict ourselves to "relative" progress (i.e. get a percentage, not raw values). This would make it impossible to display the live bandwidth usage, but might be sufficient to display an inline spinner that fills up over time. That could be a nice 80/20 solution, what do you think @Johennes @Hywan?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, yes good point that about the memory limitations when encrypting everything upfront.

I think the percentage should probably suffice in most cases. Maybe we could just map the internal TransmissionProgress to a new struct that contains only the current percentage then.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've implemented this by introducing

pub struct RelativeTransmissionProgress {
    /// The completion percentage as a number between 0 and 1.
    pub percentage: f32,
}

which worked relatively ok apart from having to add three further fields on the request kinds to track the different file sizes.

I've realized too late that a floating point percentage might introduce subtle precision issues. Maybe it's better to track the progress as a pair of current / total integers, possibly with an fn percentage() -> f32?

The other shortcoming is that it's not (easily) possible anymore to track just the thumbnail upload progress (or completion). That is probably not a huge use case though.

Copy link
Contributor Author

@Johennes Johennes Jun 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've switched to

/// Progress of an operation in abstract units.
pub struct AbstractProgress {
    /// How many units were already transferred.
    pub current: usize,
    /// How many units there are in total.
    pub total: usize,
}

now which feels safer.

One thing that's still slightly odd is the file member on RoomSendQueueUpdate::MediaUpload which gives the final MediaSource of only the file but not the thumbnail at the end of the upload. This is in contrast to the progress member which provides the joint progress across file and thumbnail.

It might be better to provide both MediaSources in RoomSendQueueUpdate::MediaUpload?

Alternatively, and possibly a silly idea: What if we reported the individual upload progress (in exact units) and the combined upload progress (in proportional units) via two different RoomSendQueueUpdate variants? The variant for the combined progress wouldn't need any MediaSources then because consumers could get them from the other variant if needed.

(PS: I haven't put any time into fixing the tests yet because I find an agreement on the general approach first.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bnjbvr do you have any thoughts on how to proceed here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops sorry missed that message, I'll take another look next week!

@jmartinesp
Copy link
Contributor

jmartinesp commented May 28, 2025

I just tested this on EXA and it seems like there's a memory peak when receiving the upload progress:

Grabacion.de.pantalla.2025-05-28.a.las.9.54.21.mov

Keep in mind though that this may also be caused by the upload process itself.


I tried measuring the allocations now and I got this:

image

EventSendProgress$MediaUpload is the binding version of the SDK record, MediaUploadProgress is what we map it to. I think we have 2x the SDK items because we don't map them in the pinned events timeline, so they're discarded.

The range in the image shows an spike in general allocations, specially in the Java Heap. It's a shame we can't filter out items for the graph and just get the values for these allocations, we have no easy way of knowing whether these allocations come mainly from the EventSendProgress objects and their mapping process or there's something else happening at the same time.

@Hywan
Copy link
Member

Hywan commented May 28, 2025

@jmartinesp Thanks! Can you measure if the memory peak maps to the size of the media, or is it something else?

@jmartinesp
Copy link
Contributor

image

I'd say it looks quite similar without these changes, the image above being a bit more stretched.

@Johennes
Copy link
Contributor Author

@jmartinesp thanks a bunch for taking the time to test the impact of this on Android! ❤️

bnjbvr pushed a commit that referenced this pull request Jun 4, 2025
Addendum to #5125 to
allow sending galleries from the FFI crate. This is the final PR for
galleries (apart from possible enhancements to report the upload status
in #5008).

This is somewhat lengthy again, apologies. Most of the changes are just
wrappers and type mapping, however. So I was hoping that it's relatively
straightforward to review in bulk. Happy to try and elaborate on the
changes or break them up into smaller commits if that helps, however.

---------

Signed-off-by: Johannes Marbach <[email protected]>
@bnjbvr bnjbvr self-requested a review June 12, 2025 16:09
@@ -379,6 +379,10 @@ pub struct SentMediaInfo {
/// thumbnail media source.
pub file: MediaSource,

/// The number of bytes uploaded for the file.
#[serde(default)]
pub bytes: usize,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it the size of the media, or is it the number of bytes sent? It means: if the upload has been cancelled, what will be the value? Are we going to have this type at all?

If this is the total size of the media, couldn't it be part of MediaSource (in the file field)?

@@ -643,6 +643,7 @@ impl RoomSendQueue {
let _ = updates.send(RoomSendQueueUpdate::UploadedMedia {
related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
file: media_info.file,
bytes: media_info.bytes,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see why you need bytes in SentMediaInfo now.

Copy link
Member

@bnjbvr bnjbvr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could avoid storing the sizes in persistent storage; after all, we're only interested in the aggregate of the remaining files to upload, so keeping this information in memory ought to be sufficient (plus, all the previously uploaded medias should still be in the event cache store until the media event is sent, at the end of the process). Let's try to make it more stateless instead.

This PR is a bit hard to review; if you introduce renamings, please keep the commit history tidy (no merge commits, only rebase/fixup! commits) and put renamings into individual commits. Also, would be sweet to group changes to the same layer (send queue vs timeline vs FFI) in commits, so as to give me a "direction" where to start from, and where to look at, in the end. You don't have to do it for this very specific PR, but for what it's worth, this is the kind of empathetic changes that usually helps (thus speeds up) reviews :)

I'd like to give this more thought and another look. Also, please add at least one end-to-end test so as to validate the approach :)

Comment on lines +115 to +127
#[serde(default)]
file_size: usize,

/// If this is a media file upload and a related thumbnail was
/// previously, uploaded, the number of bytes in the unencrypted
/// thumbnail.
#[serde(default)]
thumbnail_file_size: Option<usize>,

/// If this is a thumbnail upload, the number of bytes in the
/// unencrypted media file to be uploaded subsequently.
#[serde(default)]
media_file_size: Option<usize>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm failing to see why any of these should be persisted: after all, this information can be inferred from the combination of the previous sent media infos (that we already have at our disposal), and using the cache store, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, that sounds promising but I'm struggling a little to implement it. 🙈

The *file_size fields are all evaluated in RoomSendQueue::sending_task. I can get the size of the current file there using QueuedRequestKind::MediaUpload::cache_key, you're right. So that obsoletes file_size.

For thumbnail_file_size, I can access the MediaSource of a previously uploaded thumbnail from QueuedRequestKind::MediaUpload::thumbnail_source. This contains the final value after the upload. However, at this point the thumbnail is still stored under its local key in the cache. The local key is derived from the transaction ID and only swapped out in the handle_dependent_finish_upload method when processing the final DependentQueuedRequestKind::FinishUpload request. I'm not sure how to get at that transaction ID from here without adding it in QueuedRequestKind::MediaUpload?

For media_file_size, I have a similar problem but here I need the transaction ID of the dependent request to upload the file. I don't have direct access to that in RoomSendQueue::sending_task either. I could maybe try to load it from the store by generalizing load_dependent_queued_requests into something that gives me the child request of a specific parent request?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For thumbnail_file_size

I'm having a hard time to follow, so please bear with me :) It looks to me that the thumbnail_source field should include the uploaded (so the final non-local MXC) here, or many comments are wrong in the code. Is that not the case? I would assume that, since thumbnails are uploaded before the accompanying media, and the QueuedRequest is pushed after the thumbnail upload completed: the source would include a non-local URL. And it should've been renamed in the media cache as well, so you could get the media from there?

For media_file_size,

Sounds like you could load all the dependent queued requests and do the filtering outside the database. Or add a new event cache store trait method to do this kind of filtering.


Alternatively: since the upload progress is mostly concerned about what we do in the current user session, there might be a simpler alternative that doesn't involve the store at all: keep an in-memory mapping of (media|thumbnail) -> size. It's initially filled with some entries when creating the request, and these entries are removed when the final request (media | gallery upload) is complete.

I think that's correct too, in the case of interruptions (e.g. the process exits improperly). The next time we'll open the app, we'll be only interested about progress of the things we have not uploaded yet, so while the total may not include the total size for all the media/thumbnails we've already included, it should still be interesting to the user, as it shows the current progress.

Comment on lines +262 to +276
/// The number of bytes in the unencrypted file (or thumbnail) to be
/// uploaded by this request.
#[serde(default)]
file_size: usize,

/// If this is a media file upload and a related thumbnail was
/// previously, uploaded, the number of bytes in the unencrypted
/// thumbnail.
#[serde(default)]
thumbnail_file_size: Option<usize>,

/// If this is a thumbnail upload, the number of bytes in the
/// unencrypted media file to be uploaded subsequently.
#[serde(default)]
media_file_size: Option<usize>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

@@ -231,6 +231,15 @@ pub struct TransmissionProgress {
pub total: usize,
}

/// Progress of an operation in abstract units.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain in the doc comment why the units are abstract / what does it mean?

Comment on lines +626 to +629
// Watch and communicate the progress on a detached background task. Once
// the progress observable is dropped, next() will return None and the
// task will end.
spawn(async move {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it correctly terminate when the upload is cancelled? (needs a test!)


// Prepare to watch and communicate the request progress.
let progress = if let Some(related_to) = related_txn_id.as_ref() {
let progress: SharedObservable<TransmissionProgress> = Default::default();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we ought to create the progress observer if no one's going to observe the upload. Let's make it configurable one way or another via a parameter to send_attachement (could be a new field in AttachmentConfig, maybe?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What might be slightly confusing is that the SendAttachment future holds the AttachmentConfig with the flag to report progress when using the send queue. It also has a subscribe_to_send_progress method for subscribing to progress updates when not using the config, however. Maybe we could instead add the flag as a parameter on use_send_queue?

pub fn use_send_queue(self, report_progress: bool) -> Self {
    Self { use_send_queue: true, report_send_queue_progress: report_progress, ..self }
}

In either case, I will also have to add it on QueuedRequestKind::MediaUpload and DependentQueuedRequestKind::UploadFileOrThumbnail then to make it available here. I assume that is ok because I cannot think of another way?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Ah yeah, eventually we'd get rid of subscribe_to_send_progress entirely, as well as the use_send_queue method, so the timeline/FFI will only use the send queue.)

I think it's nice to keep it as a separate field in AttachmentConfig. Sorry I got confused for a moment :)

In either case, I will also have to add it on QueuedRequestKind::MediaUpload and DependentQueuedRequestKind::UploadFileOrThumbnail then to make it available here. I assume that is ok because I cannot think of another way?

We could also have a global setting, for all the RoomSendQueues, that controls whether we want to observe upload progress or not. I'd imagine this would cover most use cases: either you want all the progress updates for medias, or you want none of them. I can't imagine a case where one would be interested in some progress updates, but not the other ones.

This way, it could be stored directly as an AtomicBool in SendQueueData, and that would be read (1) before creating the task, (2) before sending any media upload progress update (maybe this point is overkill, though?).

What do you think?

@@ -592,9 +663,16 @@ impl RoomSendQueue {
}

SentRequestKey::Media(media_info) => {
let _ = updates.send(RoomSendQueueUpdate::UploadedMedia {
let _ = updates.send(RoomSendQueueUpdate::MediaUpload {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you really want to rename a data structure, please do it in a separate commit, otherwise the review becomes much harder than it ought to be :/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, it looks like I forgot all the good practices while iterating on this PR. 🙈

Will try to be more diligent in future.

// the original one. Use the rule of 3 to proportionally map the
// progress into units of the original file size.
let mut current =
(progress.current as f32 / progress.total as f32 * file_size as f32).round() as usize;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use f64, we're not really constrained here, and we shouldn't lose precision for no good reasons.

@@ -2163,7 +2345,7 @@ pub struct SendHandle {
///
/// If this is a media upload, this is the "main" transaction id, i.e. the
/// one used to send the event, and that will be seen by observers.
transaction_id: OwnedTransactionId,
pub transaction_id: OwnedTransactionId,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, you likely shouldn't have to do this, keeping the field private was a conscious choice :) Why do you need this?

@Johennes Johennes force-pushed the johannes/send-queue-media-progress branch from c1a2f65 to 857dcd2 Compare June 24, 2025 14:30
@bnjbvr bnjbvr self-requested a review June 25, 2025 08:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants