Skip to content

Commit

Permalink
Merge pull request #52 from travisbrown/topic/wayback-rs
Browse files Browse the repository at this point in the history
Begin integrating wayback-rs
  • Loading branch information
travisbrown authored Mar 21, 2022
2 parents 56bf955 + 88d708d commit 38bfc69
Show file tree
Hide file tree
Showing 34 changed files with 553 additions and 2,581 deletions.
384 changes: 332 additions & 52 deletions Cargo.lock

Large diffs are not rendered by default.

21 changes: 1 addition & 20 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ tokio-test = "0.4"
toml = "0.5"
tryhard = "0.4"
url = "2.2"
wayback-rs = "0.2.0"

[features]
bundled-sqlite3 = ["libsqlite3-sys/bundled"]
Expand All @@ -67,11 +68,6 @@ name = "twcc"
test = false
bench = false

[[bin]]
name = "twdl"
test = false
bench = false

[[bin]]
name = "twsearch"
test = false
Expand All @@ -82,11 +78,6 @@ name = "twshoot"
test = false
bench = false

[[bin]]
name = "wbdl"
test = false
bench = false

[[bin]]
name = "wbparse"
test = false
Expand All @@ -106,13 +97,3 @@ bench = false
name = "wbstore"
test = false
bench = false

[[bin]]
name = "wbtweets"
test = false
bench = false

[[bin]]
name = "wbvalidate"
test = false
bench = false
55 changes: 0 additions & 55 deletions schemas/store.sql

This file was deleted.

18 changes: 0 additions & 18 deletions schemas/wb-tweet.sql

This file was deleted.

62 changes: 62 additions & 0 deletions src/bin/cdxdl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use chrono::Utc;
use clap::Parser;
use futures::TryStreamExt;
use futures_locks::Mutex;
use std::fs::File;
use wayback_rs::cdx::IndexClient;

// Page-size argument handed to `IndexClient::stream_search` in `main`.
const PAGE_SIZE: usize = 150000;

/// Entry point for `cdxdl`: streams CDX index search results for `--query` into a CSV file.
///
/// The output path is taken from `--output`; when it is absent the file is named
/// `<timestamp>.csv`, using the current UTC timestamp.
///
/// # Errors
///
/// Returns an [`Error`] when logging initialization fails, the output file cannot be
/// created, the search stream yields an error, or a record cannot be written or flushed.
#[tokio::main]
async fn main() -> Result<(), Error> {
    let opts: Opts = Opts::parse();
    let _ = cancel_culture::cli::init_logging(opts.verbose)?;
    let client = IndexClient::default();

    let output_path = opts
        .output
        .unwrap_or_else(|| format!("{}.csv", Utc::now().timestamp()));
    // Each per-item future needs mutable access to the writer, so it is shared via a mutex.
    let output = Mutex::new(csv::WriterBuilder::new().from_writer(File::create(output_path)?));

    client
        .stream_search(&opts.query, PAGE_SIZE)
        .map_err(Error::from)
        .try_for_each(|item| {
            let output = output.clone();
            async move {
                let mut output = output.lock().await;
                output.write_record(item.to_record())?;

                Ok(())
            }
        })
        .await?;

    // Flush explicitly: dropping a `csv::Writer` also flushes, but it discards any I/O
    // error, which would let a truncated output file be reported as success.
    output.lock().await.flush()?;

    Ok(())
}

#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("I/O error: {0:?}")]
Io(#[from] std::io::Error),
#[error("CDX error: {0:?}")]
IndexClient(#[from] wayback_rs::cdx::Error),
#[error("CSV writing error: {0:?}")]
Csv(#[from] csv::Error),
#[error("Log initialization error: {0:?}")]
LogInitialization(#[from] log::SetLoggerError),
}

// Command-line options for `cdxdl`, parsed with clap's derive API.
//
// NOTE: the `///` comments on the fields below are consumed by clap as the help text for
// each flag, so they are user-facing strings; maintainer notes here use plain `//`.
#[derive(Parser)]
#[clap(name = "cdxdl", version, author)]
struct Opts {
// Counted from the number of times the flag occurs (`parse(from_occurrences)`); the
// count is passed to `cancel_culture::cli::init_logging` in `main`.
/// Level of verbosity
#[clap(short, long, parse(from_occurrences))]
verbose: i32,
// Required; passed as the query to `IndexClient::stream_search`.
/// Query URL
#[clap(short, long)]
query: String,
// Optional; when omitted, `main` names the file after the current UTC timestamp.
/// Output file (defaults to <timestamp>.csv)
#[clap(short, long)]
output: Option<String>,
}
14 changes: 10 additions & 4 deletions src/bin/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use csv::ReaderBuilder;
use itertools::Itertools;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use wayback_rs::Item;

type Void = Result<(), Box<dyn std::error::Error>>;

Expand Down Expand Up @@ -44,11 +45,16 @@ async fn main() -> Void {
if row.len() > 2 && hashes.contains(&row[2]) {
Some((
row[2].to_string(),
wbm::Item::parse_optional(
Item::parse_optional_record(
row.get(0),
row.get(1),
row.get(2),
row.get(3),
if row.len() == 5 {
Some("0")
} else {
row.get(4)
},
if row.len() == 5 {
row.get(4)
} else {
Expand All @@ -61,7 +67,7 @@ async fn main() -> Void {
None
}
})
.collect::<HashMap<String, wbm::Item>>();
.collect::<HashMap<String, Item>>();

log::info!("{} items found", by_digest.len());

Expand Down Expand Up @@ -95,7 +101,7 @@ async fn main() -> Void {
}
}

items.sort_by_key(|(_, item)| item.archived);
items.sort_by_key(|(_, item)| item.archived_at);
items.reverse();

println!(
Expand All @@ -109,7 +115,7 @@ async fn main() -> Void {
println!(
"* Archived as @{} on [{}]({})",
tweet.user_screen_name,
item.archived.format("%e %B %Y"),
item.archived_at.format("%e %B %Y"),
item.wayback_url(false)
);
}
Expand Down
32 changes: 19 additions & 13 deletions src/bin/twcc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use cancel_culture::{
cli,
reports::deleted_tweets::DeletedTweetReport,
twitter::{extract_status_id, Client, Error, Result},
wayback,
wbm,
};
use clap::Parser;
use egg_mode::{tweet::Tweet, user::TwitterUser};
Expand All @@ -13,6 +13,8 @@ use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::Read;

const CDX_PAGE_LIMIT: usize = 150000;

#[tokio::main]
async fn main() -> Result<()> {
let opts: Opts = Opts::parse();
Expand Down Expand Up @@ -359,15 +361,19 @@ async fn main() -> Result<()> {
ref cdx,
ref screen_name,
} => {
let wayback_client = wayback::cdx::Client::new();
let index_client = wayback_rs::cdx::IndexClient::default();
let downloader = wayback_rs::Downloader::new()?;
let mut items = match cdx {
Some(cdx_path) => {
let cdx_file = File::open(cdx_path).map_err(Error::CdxJsonError)?;
wayback::cdx::Client::load_json(cdx_file)?
wayback_rs::cdx::IndexClient::load_json(cdx_file)?
}
None => {
let url = format!("twitter.com/{}/status/*", screen_name);
wayback_client.search(&url).await?
index_client
.stream_search(&url, CDX_PAGE_LIMIT)
.try_collect::<Vec<_>>()
.await?
}
};

Expand All @@ -376,7 +382,7 @@ async fn main() -> Result<()> {
let results = items.into_iter().group_by(|item| item.url.clone());

let store = match store {
Some(dir) => Some(wayback::Store::load(dir)?),
Some(dir) => Some(wbm::store::Store::load(dir)?),
None => None,
};

Expand All @@ -389,8 +395,8 @@ async fn main() -> Result<()> {
.into_iter()
.filter(|item| item.status.is_none() || item.status == Some(200))
.collect::<Vec<_>>();
let last = valid.iter().map(|item| item.archived).max();
let first = valid.into_iter().min_by_key(|item| item.archived);
let last = valid.iter().map(|item| item.archived_at).max();
let first = valid.into_iter().min_by_key(|item| item.archived_at);

first.zip(last).map(|(f, l)| (id, l, f))
})
Expand All @@ -402,12 +408,12 @@ async fn main() -> Result<()> {

let selected = candidates.into_iter().take(limit.unwrap_or(usize::MAX));

let mut by_id: HashMap<u64, wayback::Item> = HashMap::new();
let mut by_id: HashMap<u64, wayback_rs::Item> = HashMap::new();

for (id, _, current) in selected {
match by_id.get(&id) {
Some(latest) => {
if latest.archived < current.archived {
if latest.archived_at < current.archived_at {
by_id.insert(id, current);
}
}
Expand All @@ -428,7 +434,7 @@ async fn main() -> Result<()> {

use cancel_culture::browser::twitter::parser::BrowserTweet;

let mut report_items = HashMap::<u64, (BrowserTweet, wayback::Item)>::new();
let mut report_items = HashMap::<u64, (BrowserTweet, wayback_rs::Item)>::new();

if let Some(s) = store.as_ref() {
let mut items = Vec::with_capacity(by_id.len());
Expand All @@ -441,7 +447,7 @@ async fn main() -> Result<()> {
}

log::info!("Saving {} items to store", items.len());
wayback_client.save_all(s, &items, true, 4).await?;
s.save_all(&downloader, &items, true, 4).await?;
}

for (id, _) in deleted {
Expand All @@ -454,7 +460,7 @@ async fn main() -> Result<()> {
Some(v) => v,
None => {
log::info!("Downloading {}", item.url);
let bytes = wayback_client.download(item, true).await?;
let bytes = downloader.download_item(item).await?;
match String::from_utf8_lossy(&bytes) {
Cow::Borrowed(value) => value.to_string(),
Cow::Owned(value_with_replacements) => {
Expand Down Expand Up @@ -567,7 +573,7 @@ fn print_user_report(users: &[TwitterUser]) {
}

fn escape_tweet_text(text: &str) -> String {
text.replace(r"\'", "'").replace("\n", " ")
text.replace(r"\'", "'").replace('\n', " ")
}

#[derive(Parser)]
Expand Down
4 changes: 2 additions & 2 deletions src/bin/twcli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ async fn main() -> Void {
id,
names
.iter()
.map(|name| name.replace("|", "\\|"))
.map(|name| name.replace('|', "\\|"))
.collect::<Vec<_>>()
.join(", ")
);
Expand All @@ -131,7 +131,7 @@ async fn main() -> Void {
result
.names
.iter()
.map(|name| name.replace(";", "\\;"))
.map(|name| name.replace(';', "\\;"))
.collect::<Vec<_>>()
.join(";"),
];
Expand Down
Loading

0 comments on commit 38bfc69

Please sign in to comment.