Skip to content

Commit

Permalink
Reduce use of Stream trait extensions from the futures_util crate
Browse files Browse the repository at this point in the history
  • Loading branch information
nickelc committed Nov 14, 2023
1 parent 9803ec3 commit ef3100d
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 65 deletions.
16 changes: 7 additions & 9 deletions src/commands/game.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::fmt::Write;

use futures_util::{Stream, StreamExt, TryStreamExt};
use modio::filter::prelude::*;
use modio::types::games::Game;
use modio::types::id::GameId;
use tokio_stream::{Stream, StreamExt};
use twilight_model::application::command::{Command, CommandType};
use twilight_model::application::interaction::application_command::{
CommandData, CommandDataOption, CommandOptionValue,
Expand Down Expand Up @@ -77,14 +77,12 @@ pub async fn games(
}
}
_ => {
let games = games
.try_fold(ContentBuilder::new(4000), |mut buf, game| {
_ = writeln!(&mut buf, "`{}.` {}", game.id, game.name);
async { Ok(buf) }
})
.await?;

update_response_from_content(ctx, interaction, "Games", &games.content).await
let mut buf = ContentBuilder::new(4000);
while let Some(game) = games.try_next().await? {
_ = writeln!(&mut buf, "`{}.` {}", game.id, game.name);
}

update_response_from_content(ctx, interaction, "Games", &buf.content).await
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/commands/mods.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::borrow::Cow;
use std::fmt::Write;

use futures_util::TryStreamExt;
use modio::filter::prelude::*;
use modio::mods::filters::Popular;
use modio::types::games::{ApiAccessOptions, Game};
use modio::types::id::{GameId, ModId};
use modio::types::mods::{Mod, Statistics};
use serde::{Deserialize, Serialize};
use tokio_stream::StreamExt;
use twilight_model::application::command::{Command, CommandType};
use twilight_model::application::interaction::application_command::{
CommandData, CommandDataOption, CommandOptionValue,
Expand Down
73 changes: 39 additions & 34 deletions src/commands/subs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use std::collections::{BTreeMap, HashMap};
use std::fmt::{Display, Write};

use futures_util::stream::FuturesUnordered;
use futures_util::{future, TryStreamExt};
use modio::filter::prelude::*;
use modio::types::games::{ApiAccessOptions, Game};
use modio::types::id;
use modio::types::mods::Mod;
use modio::Modio;
use tokio_stream::StreamExt;
use twilight_model::application::command::{Command, CommandType};
use twilight_model::application::interaction::application_command::{
CommandData, CommandDataOption, CommandOptionValue,
Expand Down Expand Up @@ -502,24 +502,26 @@ async fn mods_muted(
1 => {
let (GameId(game), mods) = excluded.into_iter().next().unwrap();
let filter = Id::_in(mods.into_iter().collect::<Vec<_>>());
ctx.modio
let mut st = ctx
.modio
.game(id::Id::new(game))
.mods()
.search(filter)
.iter()
.await?
.try_fold(ContentBuilder::new(4000), |mut buf, m| {
_ = writeln!(&mut buf, "`{}.` {}", m.id, m.name);
async { Ok(buf) }
})
.await?
.await?;

let mut buf = ContentBuilder::new(4000);
while let Some(mod_) = st.try_next().await? {
_ = writeln!(&mut buf, "`{}.` {}", mod_.id, mod_.name);
}
buf
}
_ => {
excluded
let mut st = excluded
.into_iter()
.map(|(GameId(game), mods)| {
.map(|(GameId(game), mods)| async move {
let filter = Id::_in(mods.into_iter().collect::<Vec<_>>());
future::try_join(
tokio::try_join!(
ctx.modio.game(id::Id::new(game)).get(),
ctx.modio
.game(id::Id::new(game))
Expand All @@ -528,16 +530,17 @@ async fn mods_muted(
.collect(),
)
})
.collect::<FuturesUnordered<_>>()
.try_fold(ContentBuilder::new(4000), |mut buf, (game, mods)| {
_ = writeln!(&mut buf, "**{}**", game.name);
for m in mods {
_ = writeln!(&mut buf, "`{}.` {}", m.id, m.name);
}
_ = writeln!(&mut buf);
async { Ok(buf) }
})
.await?
.collect::<FuturesUnordered<_>>();

let mut buf = ContentBuilder::new(4000);
while let Some((game, mods)) = st.try_next().await? {
_ = writeln!(&mut buf, "**{}**", game.name);
for m in mods {
_ = writeln!(&mut buf, "`{}.` {}", m.id, m.name);
}
_ = writeln!(&mut buf);
}
buf
}
};

Expand Down Expand Up @@ -687,21 +690,23 @@ async fn users_muted(
muted
}
_ => {
excluded
let mut st = excluded
.into_iter()
.map(|(GameId(game), users)| {
future::try_join(ctx.modio.game(id::Id::new(game)).get(), async { Ok(users) })
})
.collect::<FuturesUnordered<_>>()
.try_fold(ContentBuilder::new(4000), |mut buf, (game, users)| {
_ = writeln!(&mut buf, "**{}**", game.name);
for (i, name) in users.iter().enumerate() {
_ = writeln!(&mut buf, "`{}.` {name}", i + 1);
}
_ = writeln!(&mut buf);
async { Ok(buf) }
.map(|(GameId(game), users)| async move {
let game = ctx.modio.game(id::Id::new(game)).get().await?;
Ok::<_, Error>((game, users))
})
.await?
.collect::<FuturesUnordered<_>>();

let mut buf = ContentBuilder::new(4000);
while let Some((game, users)) = st.try_next().await? {
_ = writeln!(&mut buf, "**{}**", game.name);
for (i, name) in users.iter().enumerate() {
_ = writeln!(&mut buf, "`{}.` {name}", i + 1);
}
_ = writeln!(&mut buf);
}
buf
}
};

Expand Down
3 changes: 2 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
use std::path::PathBuf;

use dotenv::dotenv;
use futures_util::{future, StreamExt};
use futures_util::future;
use tokio_stream::StreamExt;
use tracing_subscriber::filter::LevelFilter;
use tracing_subscriber::EnvFilter;

Expand Down
38 changes: 18 additions & 20 deletions src/tasks/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::sync::Arc;
use std::time::Duration;

use dashmap::DashSet;
use futures_util::TryStreamExt;
use modio::filter::prelude::*;
use modio::mods::filters::events::EventType as EventTypeFilter;
use modio::types::games::{ApiAccessOptions, Game};
Expand Down Expand Up @@ -152,17 +151,14 @@ pub fn task(ctx: Context) -> impl Future<Output = ()> {
// - Filter `MODFILE_CHANGED` events for new mods
// - Ungroup the events ordered by event id

let mut events = events
.iter()
.await?
.try_fold(Events::new(), |mut events, e| async move {
events
.entry(e.mod_id)
.or_default()
.push((e.id, e.event_type));
Ok(events)
})
.await?;
let mut st = events.iter().await?;
let mut events = Events::new();
while let Some(event) = st.try_next().await? {
events
.entry(event.mod_id)
.or_default()
.push((event.id, event.event_type));
}

if events.is_empty() {
return Ok(());
Expand All @@ -182,14 +178,16 @@ pub fn task(ctx: Context) -> impl Future<Output = ()> {

// Load the mods for the events
let filter = Id::_in(events.keys().collect::<Vec<_>>());
let events = mods
.search(filter)
.iter()
.await?
.map_ok(|m| events.get(&m.id).map(|evt| (m, evt)))
.try_filter_map(|e| async { Ok(e) })
.try_collect::<Vec<_>>()
.await?;
let mut st = mods.search(filter).iter().await?;
let events = {
let mut evts = Vec::new();
while let Some(Ok(mod_)) = st.next().await {
if let Some(evt) = events.get(&mod_.id) {
evts.push((mod_, evt));
}
}
evts
};

// Ungroup the events ordered by event id
let mut updates = BTreeMap::new();
Expand Down

0 comments on commit ef3100d

Please sign in to comment.