From 7cca6fc298b9be8697052bf010b73c8f7df0b53a Mon Sep 17 00:00:00 2001 From: mdecimus Date: Sun, 22 Dec 2024 19:35:22 +0100 Subject: [PATCH] HTTP remote lists and Spam filter improvements --- Cargo.toml | 16 +- crates/common/src/config/inner.rs | 2 - crates/common/src/config/scripts.rs | 12 +- crates/common/src/config/spamfilter.rs | 162 +------------ crates/common/src/core.rs | 6 +- crates/common/src/enterprise/mod.rs | 4 +- crates/common/src/expr/functions/asynch.rs | 14 +- crates/common/src/expr/functions/misc.rs | 13 ++ crates/common/src/expr/functions/mod.rs | 1 + crates/common/src/lib.rs | 44 +--- crates/common/src/listener/acme/resolver.rs | 6 +- crates/common/src/manager/boot.rs | 2 +- crates/common/src/manager/mod.rs | 3 +- crates/common/src/manager/reload.rs | 2 +- crates/common/src/scripts/plugins/lookup.rs | 14 +- crates/jmap/src/api/http.rs | 5 +- crates/main/Cargo.toml | 4 +- crates/smtp/src/inbound/hooks/client.rs | 3 +- crates/smtp/src/outbound/mta_sts/lookup.rs | 2 +- crates/smtp/src/scripts/event_loop.rs | 13 +- crates/spam-filter/src/analysis/bayes.rs | 25 +- crates/spam-filter/src/analysis/dmarc.rs | 76 +----- crates/spam-filter/src/analysis/domain.rs | 2 +- crates/spam-filter/src/analysis/from.rs | 40 +--- crates/spam-filter/src/analysis/headers.rs | 4 +- crates/spam-filter/src/analysis/messageid.rs | 11 +- crates/spam-filter/src/analysis/mod.rs | 25 +- crates/spam-filter/src/analysis/recipient.rs | 27 --- crates/spam-filter/src/analysis/replyto.rs | 42 ---- crates/spam-filter/src/analysis/url.rs | 27 +-- crates/spam-filter/src/modules/mod.rs | 18 +- crates/spam-filter/src/modules/remote_list.rs | 217 ----------------- crates/store/Cargo.toml | 4 +- crates/store/src/backend/composite/mod.rs | 2 +- .../{distributed_blob.rs => sharded_blob.rs} | 10 +- crates/store/src/backend/http/config.rs | 116 +++++++++ crates/store/src/backend/http/lookup.rs | 220 ++++++++++++++++++ crates/store/src/backend/http/mod.rs | 51 ++++ crates/store/src/backend/memory/mod.rs | 15 +- crates/store/src/backend/mod.rs | 5 +- crates/store/src/config.rs | 19 +- crates/store/src/dispatch/blob.rs | 6 +- crates/store/src/dispatch/lookup.rs | 114 +++++++-- crates/store/src/lib.rs | 7 +- crates/trc/src/event/description.rs | 8 +- crates/trc/src/event/level.rs | 14 +- crates/trc/src/ipc/metrics.rs | 4 +- crates/trc/src/lib.rs | 4 +- crates/trc/src/serializers/binary.rs | 8 +- crates/utils/Cargo.toml | 2 +- crates/utils/src/lib.rs | 33 +++ tests/Cargo.toml | 4 +- tests/resources/smtp/antispam/combined.test | 4 +- tests/resources/smtp/antispam/dmarc.test | 18 +- tests/resources/smtp/antispam/from.test | 8 +- tests/resources/smtp/antispam/messageid.test | 2 +- tests/resources/smtp/antispam/recipient.test | 2 +- tests/resources/smtp/antispam/replyto.test | 10 +- tests/src/smtp/inbound/antispam.rs | 19 +- 59 files changed, 743 insertions(+), 808 deletions(-) delete mode 100644 crates/spam-filter/src/modules/remote_list.rs rename crates/store/src/backend/composite/{distributed_blob.rs => sharded_blob.rs} (96%) create mode 100644 crates/store/src/backend/http/config.rs create mode 100644 crates/store/src/backend/http/lookup.rs create mode 100644 crates/store/src/backend/http/mod.rs diff --git a/Cargo.toml b/Cargo.toml index bd71ca415..fba766803 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,14 +2,14 @@ resolver = "2" members = [ "crates/main", - "crates/jmap", - "crates/jmap-proto", - "crates/imap", - "crates/imap-proto", - "crates/smtp", - "crates/managesieve", - "crates/pop3", - "crates/spam-filter", +# "crates/jmap", +# "crates/jmap-proto", +# "crates/imap", +# "crates/imap-proto", +# "crates/smtp", +# "crates/managesieve", +# "crates/pop3", +# "crates/spam-filter", "crates/nlp", "crates/store", "crates/directory", diff --git a/crates/common/src/config/inner.rs b/crates/common/src/config/inner.rs index ac6ad199b..797eb240a 100644 --- a/crates/common/src/config/inner.rs +++ b/crates/common/src/config/inner.rs @@ -104,7 +104,6 @@ impl Data { shard_amount, ), smtp_connectors: TlsConnectors::default(), - remote_lists: Default::default(), asn_geo_data: Default::default(), } } @@ -121,7 +120,6 @@ impl Default for Data { blocked_ips_version: 0.into(), permissions: Default::default(), permissions_version: 0.into(), - remote_lists: Default::default(), jmap_id_gen: Default::default(), queue_id_gen: Default::default(), span_id_gen: Default::default(), diff --git a/crates/common/src/config/scripts.rs b/crates/common/src/config/scripts.rs index 254cd5b91..1a372279f 100644 --- a/crates/common/src/config/scripts.rs +++ b/crates/common/src/config/scripts.rs @@ -4,11 +4,7 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use std::{ - collections::HashSet, - sync::Arc, - time::{Duration, Instant}, -}; +use std::{sync::Arc, time::Duration}; use ahash::AHashMap; use sieve::{compiler::grammar::Capability, Compiler, Runtime, Sieve}; @@ -34,12 +30,6 @@ pub struct Scripting { pub untrusted_scripts: AHashMap>, } -#[derive(Clone)] -pub struct RemoteList { - pub entries: HashSet, - pub expires: Instant, -} - impl Scripting { pub async fn parse(config: &mut Config, stores: &Stores) -> Self { // Parse untrusted compiler diff --git a/crates/common/src/config/spamfilter.rs b/crates/common/src/config/spamfilter.rs index 0799ef334..05164e061 100644 --- a/crates/common/src/config/spamfilter.rs +++ b/crates/common/src/config/spamfilter.rs @@ -11,7 +11,7 @@ use nlp::bayes::BayesClassifier; use tokio::net::lookup_host; use utils::{ config::{utils::ParseValue, Config}, - glob::{GlobMap, GlobSet}, + glob::GlobMap, }; use super::{if_block::IfBlock, tokenizer::TokenMap}; @@ -61,16 +61,8 @@ pub struct DnsBlConfig { #[derive(Debug, Clone, Default)] pub struct SpamFilterLists { - pub dmarc_allow: GlobSet, - pub spf_dkim_allow: GlobSet, - pub freemail_providers: GlobSet, - pub disposable_providers: GlobSet, - pub trusted_domains: GlobSet, - pub url_redirectors: GlobSet, pub file_extensions: GlobMap, pub scores: GlobMap>, - pub spamtraps: GlobSet, - pub remote: Vec, } #[derive(Debug, Clone)] @@ -162,21 +154,6 @@ pub enum Location { Tcp, } -#[derive(Debug, Clone)] -pub struct RemoteListConfig { - pub id: String, - pub url: String, - pub retry: Duration, - pub refresh: Duration, - pub timeout: Duration, - pub max_size: usize, - pub max_entries: usize, - pub max_entry_size: usize, - pub format: RemoteListFormat, - pub scope: Element, - pub tag: String, -} - #[derive(Debug, Clone)] pub struct DnsBlServer { pub id: String, @@ -185,16 +162,6 @@ pub struct DnsBlServer { pub tags: IfBlock, } -#[derive(Debug, Clone)] -pub enum RemoteListFormat { - List, - Csv { - column: u32, - separator: char, - skip_first: bool, - }, -} - impl SpamFilterConfig { pub async fn parse(config: &mut Config) -> Self { SpamFilterConfig { @@ -373,16 +340,8 @@ impl SpamFilterHeaderConfig { impl SpamFilterLists { pub fn parse(config: &mut Config) -> Self { let mut lists = SpamFilterLists { - dmarc_allow: GlobSet::default(), - spf_dkim_allow: GlobSet::default(), - freemail_providers: GlobSet::default(), - disposable_providers: GlobSet::default(), - trusted_domains: GlobSet::default(), - url_redirectors: GlobSet::default(), - spamtraps: GlobSet::default(), file_extensions: GlobMap::default(), scores: GlobMap::default(), - remote: Default::default(), }; // Parse local lists @@ -393,27 +352,6 @@ impl SpamFilterLists { .filter(|(id, key)| !id.is_empty() && !key.is_empty()) { match id { - "dmarc-allow" => { - lists.dmarc_allow.insert(key); - } - "spf-dkim-allow" => { - lists.spf_dkim_allow.insert(key); - } - "freemail-providers" => { - lists.freemail_providers.insert(key); - } - "disposable-providers" => { - lists.disposable_providers.insert(key); - } - "trusted-domains" => { - lists.trusted_domains.insert(key); - } - "url-redirectors" => { - lists.url_redirectors.insert(key); - } - "spam-traps" => { - lists.spamtraps.insert(key); - } "scores" => { let action = match value.to_lowercase().as_str() { "reject" => SpamFilterAction::Reject, @@ -470,104 +408,6 @@ impl SpamFilterLists { config.new_parse_error(key, error); } - // Parse remote lists - for id in config - .sub_keys("spam-filter.remote-list", ".url") - .map(|k| k.to_string()) - .collect::>() - { - let id_ = id.as_str(); - if !config - .property_or_default(("spam-filter.remote-list", id_, "enable"), "true") - .unwrap_or(true) - { - continue; - } - - let format = match config - .value_require(("spam-filter.remote-list", id_, "format")) - .unwrap_or_default() - { - "list" => RemoteListFormat::List, - "csv" => RemoteListFormat::Csv { - column: config - .property_require(("spam-filter.remote-list", id_, "column")) - .unwrap_or(0), - separator: config - .property_or_default::( - ("spam-filter.remote-list", id_, "separator"), - ",", - ) - .unwrap_or_default() - .chars() - .next() - .unwrap_or(','), - skip_first: config - .property_or_default::( - ("spam-filter.remote-list", id_, "skip-first"), - "false", - ) - .unwrap_or(false), - }, - other => { - let message = format!("Invalid format: {other:?}"); - config.new_build_error(("spam-filter.remote-list", id_, "format"), message); - continue; - } - }; - - lists.remote.push(RemoteListConfig { - url: config - .value_require(("spam-filter.remote-list", id_, "url")) - .unwrap_or_default() - .to_string(), - retry: config - .property_or_default::( - ("spam-filter.remote-list", id_, "retry"), - "1h", - ) - .unwrap_or(Duration::from_secs(3600)), - refresh: config - .property_or_default::( - ("spam-filter.remote-list", id_, "refresh"), - "12h", - ) - .unwrap_or(Duration::from_secs(43200)), - timeout: config - .property_or_default::( - ("spam-filter.remote-list", id_, "timeout"), - "30s", - ) - .unwrap_or(Duration::from_secs(30)), - max_size: config - .property_or_default::( - ("spam-filter.remote-list", id_, "limits.size"), - "104857600", - ) - .unwrap_or(104857600), - max_entries: config - .property_or_default::( - ("spam-filter.remote-list", id_, "limits.entries"), - "100000", - ) - .unwrap_or(100000), - max_entry_size: config - .property_or_default::( - ("spam-filter.remote-list", id_, "limits.entry-size"), - "512", - ) - .unwrap_or(512), - format, - scope: config - .property_require(("spam-filter.remote-list", id_, "scope")) - .unwrap_or_default(), - tag: config - .property_require(("spam-filter.remote-list", id_, "tag")) - .unwrap_or_else(|| format!("REMOTE_LIST_{}", id_.to_uppercase())), - id, - }); - } - lists } } diff --git a/crates/common/src/core.rs b/crates/common/src/core.rs index 34c13d019..016084172 100644 --- a/crates/common/src/core.rs +++ b/crates/common/src/core.rs @@ -66,7 +66,11 @@ impl Server { }) } - pub fn get_in_memory_store(&self, name: &str, session_id: u64) -> &InMemoryStore { + pub fn get_in_memory_store(&self, name: &str) -> Option<&InMemoryStore> { + self.core.storage.lookups.get(name) + } + + pub fn get_in_memory_store_or_default(&self, name: &str, session_id: u64) -> &InMemoryStore { self.core.storage.lookups.get(name).unwrap_or_else(|| { if !name.is_empty() { trc::event!( diff --git a/crates/common/src/enterprise/mod.rs b/crates/common/src/enterprise/mod.rs index a5b986f15..315cc1801 100644 --- a/crates/common/src/enterprise/mod.rs +++ b/crates/common/src/enterprise/mod.rs @@ -26,9 +26,9 @@ use llm::AiApiConfig; use mail_parser::DateTime; use store::Store; use trc::{AddContext, EventType, MetricType}; -use utils::config::cron::SimpleCron; +use utils::{config::cron::SimpleCron, HttpLimitResponse}; -use crate::{expr::Expression, manager::webadmin::Resource, Core, HttpLimitResponse, Server}; +use crate::{expr::Expression, manager::webadmin::Resource, Core, Server}; #[derive(Clone)] pub struct Enterprise { diff --git a/crates/common/src/expr/functions/asynch.rs b/crates/common/src/expr/functions/asynch.rs index 768c7b4ef..3cbe64884 100644 --- a/crates/common/src/expr/functions/asynch.rs +++ b/crates/common/src/expr/functions/asynch.rs @@ -43,8 +43,8 @@ impl Server { let store = params.next_as_string(); let key = params.next_as_string(); - self.get_in_memory_store(store.as_ref(), session_id) - .key_get::(key.into_owned().into_bytes()) + self.get_in_memory_store_or_default(store.as_ref(), session_id) + .key_get::(key) .await .map(|value| value.map(|v| v.into_inner()).unwrap_or_default()) .caused_by(trc::location!()) @@ -53,8 +53,8 @@ impl Server { let store = params.next_as_string(); let key = params.next_as_string(); - self.get_in_memory_store(store.as_ref(), session_id) - .key_exists(key.into_owned().into_bytes()) + self.get_in_memory_store_or_default(store.as_ref(), session_id) + .key_exists(key) .await .caused_by(trc::location!()) .map(|v| v.into()) @@ -64,7 +64,7 @@ impl Server { let key = params.next_as_string(); let value = params.next_as_string(); - self.get_in_memory_store(store.as_ref(), session_id) + self.get_in_memory_store_or_default(store.as_ref(), session_id) .key_set(KeyValue::new( key.into_owned().into_bytes(), value.into_owned().into_bytes(), @@ -79,7 +79,7 @@ impl Server { let key = params.next_as_string(); let value = params.next_as_integer(); - self.get_in_memory_store(store.as_ref(), session_id) + self.get_in_memory_store_or_default(store.as_ref(), session_id) .counter_incr(KeyValue::new(key.into_owned(), value)) .await .map(Variable::Integer) @@ -89,7 +89,7 @@ impl Server { let store = params.next_as_string(); let key = params.next_as_string(); - self.get_in_memory_store(store.as_ref(), session_id) + self.get_in_memory_store_or_default(store.as_ref(), session_id) .counter_get(key.into_owned().into_bytes()) .await .map(Variable::Integer) diff --git a/crates/common/src/expr/functions/misc.rs b/crates/common/src/expr/functions/misc.rs index 76960f797..06776c851 100644 --- a/crates/common/src/expr/functions/misc.rs +++ b/crates/common/src/expr/functions/misc.rs @@ -48,3 +48,16 @@ pub(crate) fn fn_ip_reverse_name(v: Vec) -> Variable { .unwrap_or_default() .into() } + +pub(crate) fn fn_if_then(v: Vec) -> Variable { + let mut v = v.into_iter(); + let condition = v.next().unwrap(); + let iff = v.next().unwrap(); + let then = v.next().unwrap(); + + if condition.to_bool() { + iff + } else { + then + } +} diff --git a/crates/common/src/expr/functions/mod.rs b/crates/common/src/expr/functions/mod.rs index e45e92cc8..b547f6e24 100644 --- a/crates/common/src/expr/functions/mod.rs +++ b/crates/common/src/expr/functions/mod.rs @@ -80,6 +80,7 @@ pub(crate) const FUNCTIONS: &[(&str, fn(Vec) -> Variable, u32)] = &[ ("split_n", text::fn_split_n, 3), ("split_words", text::fn_split_words, 1), ("hash", text::fn_hash, 2), + ("if_then", misc::fn_if_then, 3), ]; pub const F_IS_LOCAL_DOMAIN: u32 = 0; diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 36e16942b..10c04dfc4 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -15,18 +15,11 @@ use ahash::{AHashMap, AHashSet, RandomState}; use arc_swap::ArcSwap; use auth::{oauth::config::OAuthConfig, roles::RolePermissions, AccessToken}; use config::{ - imap::ImapConfig, - jmap::settings::JmapConfig, - network::Network, - scripts::{RemoteList, Scripting}, - smtp::SmtpConfig, - spamfilter::SpamFilterConfig, - storage::Storage, - telemetry::Metrics, + imap::ImapConfig, jmap::settings::JmapConfig, network::Network, scripts::Scripting, + smtp::SmtpConfig, spamfilter::SpamFilterConfig, storage::Storage, telemetry::Metrics, }; use dashmap::DashMap; -use futures::StreamExt; use imap_proto::protocol::list::Attribute; use ipc::{DeliveryEvent, HousekeeperEvent, QueueEvent, ReportingEvent, StateEvent}; use listener::{ @@ -35,7 +28,6 @@ use listener::{ use manager::webadmin::{Resource, WebAdminManager}; use parking_lot::{Mutex, RwLock}; -use reqwest::Response; use rustls::sign::CertifiedKey; use tokio::sync::{mpsc, Notify}; use tokio_rustls::TlsConnector; @@ -117,7 +109,6 @@ pub struct Data { pub permissions: ADashMap>, pub permissions_version: AtomicU8, - pub remote_lists: RwLock>, pub asn_geo_data: AsnGeoLookupData, pub jmap_id_gen: SnowflakeIdGenerator, @@ -307,37 +298,6 @@ impl BuildHasher for ThrottleKeyHasherBuilder { } } -pub trait HttpLimitResponse: Sync + Send { - fn bytes_with_limit( - self, - limit: usize, - ) -> impl std::future::Future>>> + Send; -} - -impl HttpLimitResponse for Response { - async fn bytes_with_limit(self, limit: usize) -> reqwest::Result>> { - if self - .content_length() - .map_or(false, |len| len as usize > limit) - { - return Ok(None); - } - - let mut bytes = Vec::with_capacity(std::cmp::min(limit, 1024)); - let mut stream = self.bytes_stream(); - - while let Some(chunk) = stream.next().await { - let chunk = chunk?; - if bytes.len() + chunk.len() > limit { - return Ok(None); - } - bytes.extend_from_slice(&chunk); - } - - Ok(Some(bytes)) - } -} - impl ConcurrencyLimiters { pub fn is_active(&self) -> bool { self.concurrent_requests.is_active() || self.concurrent_uploads.is_active() diff --git a/crates/common/src/listener/acme/resolver.rs b/crates/common/src/listener/acme/resolver.rs index 25d5e2d8b..cc4e74b9a 100644 --- a/crates/common/src/listener/acme/resolver.rs +++ b/crates/common/src/listener/acme/resolver.rs @@ -13,10 +13,10 @@ use rustls::{ ServerConfig, }; use rustls_pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}; -use store::write::Bincode; +use store::{dispatch::lookup::KeyValue, write::Bincode}; use trc::AcmeEvent; -use crate::{listener::acme::directory::SerializedCert, Server}; +use crate::{listener::acme::directory::SerializedCert, Server, KV_ACME}; use super::{directory::ACME_TLS_ALPN_NAME, AcmeProvider, StaticResolver}; @@ -45,7 +45,7 @@ impl Server { pub(crate) async fn build_acme_certificate(&self, domain: &str) -> Option> { match self .in_memory_store() - .key_get::>(format!("acme:{domain}").into_bytes()) + .key_get::>(KeyValue::<()>::build_key(KV_ACME, domain)) .await { Ok(Some(cert)) => { diff --git a/crates/common/src/manager/boot.rs b/crates/common/src/manager/boot.rs index 2d3bd1264..74785670c 100644 --- a/crates/common/src/manager/boot.rs +++ b/crates/common/src/manager/boot.rs @@ -326,7 +326,7 @@ impl BootManager { } // Parse lookup stores - stores.parse_lookups(&mut config).await; + stores.parse_in_memory(&mut config).await; // Parse settings let core = Core::parse(&mut config, stores, manager).await; diff --git a/crates/common/src/manager/mod.rs b/crates/common/src/manager/mod.rs index 76cf2d4bf..90817ddeb 100644 --- a/crates/common/src/manager/mod.rs +++ b/crates/common/src/manager/mod.rs @@ -7,8 +7,9 @@ use std::time::Duration; use hyper::HeaderMap; +use utils::HttpLimitResponse; -use crate::{HttpLimitResponse, USER_AGENT}; +use crate::USER_AGENT; use self::config::ConfigManager; diff --git a/crates/common/src/manager/reload.rs b/crates/common/src/manager/reload.rs index af9d2b031..38c6c15c0 100644 --- a/crates/common/src/manager/reload.rs +++ b/crates/common/src/manager/reload.rs @@ -79,7 +79,7 @@ impl Server { purge_schedules: Default::default(), }; stores.parse_stores(&mut config).await; - stores.parse_lookups(&mut config).await; + stores.parse_in_memory(&mut config).await; // Parse tracers let tracers = Telemetry::parse(&mut config, &stores); diff --git a/crates/common/src/scripts/plugins/lookup.rs b/crates/common/src/scripts/plugins/lookup.rs index 398ec1de6..40c55f849 100644 --- a/crates/common/src/scripts/plugins/lookup.rs +++ b/crates/common/src/scripts/plugins/lookup.rs @@ -41,21 +41,13 @@ pub async fn exec(ctx: PluginContext<'_>) -> trc::Result { Ok(match &ctx.arguments[1] { Variable::Array(items) => { for item in items.iter() { - if !item.is_empty() - && store - .key_exists(item.to_string().into_owned().into_bytes()) - .await? - { + if !item.is_empty() && store.key_exists(item.to_string()).await? { return Ok(true.into()); } } false } - v if !v.is_empty() => { - store - .key_exists(v.to_string().into_owned().into_bytes()) - .await? - } + v if !v.is_empty() => store.key_exists(v.to_string()).await?, _ => false, } .into()) @@ -71,7 +63,7 @@ pub async fn exec_get(ctx: PluginContext<'_>) -> trc::Result { .ctx(trc::Key::Id, ctx.arguments[0].to_string().into_owned()) .details("Unknown store") })? - .key_get::(ctx.arguments[1].to_string().into_owned().into_bytes()) + .key_get::(ctx.arguments[1].to_string()) .await .map(|v| v.map(|v| v.into_inner()).unwrap_or_default()) } diff --git a/crates/jmap/src/api/http.rs b/crates/jmap/src/api/http.rs index 0553dc225..b311ddfc1 100644 --- a/crates/jmap/src/api/http.rs +++ b/crates/jmap/src/api/http.rs @@ -13,7 +13,7 @@ use common::{ ipc::StateEvent, listener::{ServerInstance, SessionData, SessionManager, SessionStream}, manager::webadmin::Resource, - Inner, Server, + Inner, Server, KV_ACME, }; use directory::Permission; use http_body_util::{BodyExt, Full}; @@ -32,6 +32,7 @@ use jmap_proto::{ types::{blob::BlobId, id::Id}, }; use std::future::Future; +use store::dispatch::lookup::KeyValue; use trc::SecurityEvent; use utils::url_params::UrlParams; @@ -247,7 +248,7 @@ impl ParseHttp for Server { .core .storage .lookup - .key_get::(format!("acme:{token}").into_bytes()) + .key_get::(KeyValue::<()>::build_key(KV_ACME, token)) .await? { Some(proof) => Ok(Resource::new("text/plain", proof.into_bytes()) diff --git a/crates/main/Cargo.toml b/crates/main/Cargo.toml index 39b928f37..2ff0c730a 100644 --- a/crates/main/Cargo.toml +++ b/crates/main/Cargo.toml @@ -34,8 +34,8 @@ tokio = { version = "1.23", features = ["full"] } jemallocator = "0.5.0" [features] -default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis", "azure", "enterprise"] -#default = ["rocks", "enterprise"] +#default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis", "azure", "enterprise"] +default = ["rocks", "enterprise"] sqlite = ["store/sqlite"] foundationdb = ["store/foundation", "common/foundation"] postgres = ["store/postgres"] diff --git a/crates/smtp/src/inbound/hooks/client.rs b/crates/smtp/src/inbound/hooks/client.rs index e915bd81a..2e3a9e17c 100644 --- a/crates/smtp/src/inbound/hooks/client.rs +++ b/crates/smtp/src/inbound/hooks/client.rs @@ -4,7 +4,8 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use common::{config::smtp::session::MTAHook, HttpLimitResponse}; +use common::config::smtp::session::MTAHook; +use utils::HttpLimitResponse; use super::{Request, Response}; diff --git a/crates/smtp/src/outbound/mta_sts/lookup.rs b/crates/smtp/src/outbound/mta_sts/lookup.rs index e6ee8f6bf..68cbe76cb 100644 --- a/crates/smtp/src/outbound/mta_sts/lookup.rs +++ b/crates/smtp/src/outbound/mta_sts/lookup.rs @@ -19,7 +19,7 @@ use mail_auth::{common::lru::DnsCache, mta_sts::MtaSts, report::tlsrpt::ResultTy use super::{parse::ParsePolicy, Error}; #[cfg(not(feature = "test_mode"))] -use common::HttpLimitResponse; +use utils::HttpLimitResponse; #[cfg(not(feature = "test_mode"))] const MAX_POLICY_SIZE: usize = 1024 * 1024; diff --git a/crates/smtp/src/scripts/event_loop.rs b/crates/smtp/src/scripts/event_loop.rs index 57e5f2919..e7c0a61b5 100644 --- a/crates/smtp/src/scripts/event_loop.rs +++ b/crates/smtp/src/scripts/event_loop.rs @@ -102,14 +102,11 @@ impl RunScript for Server { if let Some(store) = self.core.storage.lookups.get(&list) { for value in &values { if let Ok(true) = store - .key_exists( - if !matches!(match_as, MatchAs::Lowercase) { - value.clone() - } else { - value.to_lowercase() - } - .into_bytes(), - ) + .key_exists(if !matches!(match_as, MatchAs::Lowercase) { + value.clone() + } else { + value.to_lowercase() + }) .await { input = true.into(); diff --git a/crates/spam-filter/src/analysis/bayes.rs b/crates/spam-filter/src/analysis/bayes.rs index 6d77a2b6c..ed8837b66 100644 --- a/crates/spam-filter/src/analysis/bayes.rs +++ b/crates/spam-filter/src/analysis/bayes.rs @@ -44,16 +44,21 @@ impl SpamFilterAnalyzeBayes for Server { } async fn spam_filter_analyze_spam_trap(&self, ctx: &mut SpamFilterContext<'_>) -> bool { - if ctx - .output - .env_to_addr - .iter() - .any(|addr| self.core.spam.lists.spamtraps.contains(&addr.address)) - { - ctx.result.add_tag("SPAM_TRAP"); - true - } else { - false + if let Some(store) = self.get_in_memory_store("spam-traps") { + for addr in &ctx.output.env_to_addr { + match store.key_exists(addr.address.as_str()).await { + Ok(true) => { + ctx.result.add_tag("SPAM_TRAP"); + return true; + } + Ok(false) => (), + Err(err) => { + trc::error!(err.span_id(ctx.input.span_id).caused_by(trc::location!())); + } + } + } } + + false } } diff --git a/crates/spam-filter/src/analysis/dmarc.rs b/crates/spam-filter/src/analysis/dmarc.rs index eeddba793..0b54b3cb3 100644 --- a/crates/spam-filter/src/analysis/dmarc.rs +++ b/crates/spam-filter/src/analysis/dmarc.rs @@ -7,9 +7,7 @@ use std::future::Future; use common::Server; -use mail_auth::{ - common::verify::VerifySignature, dmarc::Policy, DkimResult, DmarcResult, SpfResult, -}; +use mail_auth::{dmarc::Policy, DkimResult, DmarcResult, SpfResult}; use crate::SpamFilterContext; @@ -78,77 +76,5 @@ impl SpamFilterAnalyzeDmarc for Server { }, ), })); - - for header in ctx.input.message.headers() { - let header_name = header.name(); - if header_name.eq_ignore_ascii_case("DKIM-Signature") { - ctx.result.add_tag("DKIM_SIGNED"); - } else if header_name.eq_ignore_ascii_case("ARC-Seal") { - ctx.result.add_tag("ARC_SIGNED"); - } - } - - if self - .core - .spam - .lists - .dmarc_allow - .contains(&ctx.output.from.email.domain_part.fqdn) - { - if matches!(ctx.input.dmarc_result, Some(DmarcResult::Pass)) { - ctx.result.add_tag("ALLOWLIST_DMARC"); - } else if ctx.input.dmarc_result.is_some() { - ctx.result.add_tag("BLOCKLIST_DMARC"); - } - } else if self - .core - .spam - .lists - .spf_dkim_allow - .contains(&ctx.output.from.email.domain_part.fqdn) - { - let spf = ctx - .input - .spf_mail_from_result - .map(|r| r.result()) - .unwrap_or(SpfResult::None); - let is_dkim_pass = matches!( - ctx.input.arc_result.map(|r| r.result()), - Some(DkimResult::Pass) - ) || ctx.input.dkim_result.iter().any(|r| { - matches!(r.result(), DkimResult::Pass) - && r.signature().map_or(false, |s| { - s.domain().to_lowercase() == ctx.output.from.email.domain_part.fqdn - }) - }); - let is_spf_pass = matches!(spf, SpfResult::Pass); - - if is_dkim_pass && is_spf_pass { - ctx.result.add_tag("ALLOWLIST_SPF_DKIM"); - } else if is_dkim_pass { - ctx.result.add_tag("ALLOWLIST_DKIM"); - if !matches!(spf, SpfResult::TempError) { - ctx.result.add_tag("BLOCKLIST_SPF"); - } - } else if is_spf_pass { - ctx.result.add_tag("ALLOWLIST_SPF"); - if !ctx - .input - .dkim_result - .iter() - .any(|r| matches!(r.result(), DkimResult::TempError(_))) - { - ctx.result.add_tag("BLOCKLIST_DKIM"); - } - } else if !matches!(spf, SpfResult::TempError) - && !ctx - .input - .dkim_result - .iter() - .any(|r| matches!(r.result(), DkimResult::TempError(_))) - { - ctx.result.add_tag("BLOCKLIST_SPF_DKIM"); - } - } } } diff --git a/crates/spam-filter/src/analysis/domain.rs b/crates/spam-filter/src/analysis/domain.rs index 45381fa36..1cb1b9207 100644 --- a/crates/spam-filter/src/analysis/domain.rs +++ b/crates/spam-filter/src/analysis/domain.rs @@ -69,7 +69,7 @@ impl SpamFilterAnalyzeDomain for Server { } // Add EHLO domain - if !ctx.output.ehlo_host.fqdn.is_empty() { + if ctx.output.ehlo_host.sld.is_some() { domains.insert(ElementLocation::new( ctx.output.ehlo_host.fqdn.clone(), Location::Ehlo, diff --git a/crates/spam-filter/src/analysis/from.rs b/crates/spam-filter/src/analysis/from.rs index 7321cfb5d..73e4a9ae7 100644 --- a/crates/spam-filter/src/analysis/from.rs +++ b/crates/spam-filter/src/analysis/from.rs @@ -70,25 +70,7 @@ impl SpamFilterAnalyzeFrom for Server { if from_count > 0 { // Validate address let from_addr_is_valid = from_addr.is_valid(); - if from_addr_is_valid { - if self - .core - .spam - .lists - .freemail_providers - .contains(from_addr.domain_part.sld.as_deref().unwrap_or_default()) - { - ctx.result.add_tag("FREEMAIL_FROM"); - } else if self - .core - .spam - .lists - .disposable_providers - .contains(from_addr.domain_part.sld.as_deref().unwrap_or_default()) - { - ctx.result.add_tag("DISPOSABLE_FROM"); - } - } else { + if !from_addr_is_valid { ctx.result.add_tag("FROM_INVALID"); } @@ -200,26 +182,6 @@ impl SpamFilterAnalyzeFrom for Server { if !env_from_empty { // Validate envelope address if ctx.output.env_from_addr.is_valid() { - if self.core.spam.lists.freemail_providers.contains( - ctx.output - .env_from_addr - .domain_part - .sld - .as_deref() - .unwrap_or_default(), - ) { - ctx.result.add_tag("FREEMAIL_ENVFROM"); - } else if self.core.spam.lists.disposable_providers.contains( - ctx.output - .env_from_addr - .domain_part - .sld - .as_deref() - .unwrap_or_default(), - ) { - ctx.result.add_tag("DISPOSABLE_ENVFROM"); - } - // Mail from no resolve to A or MX if matches!( ( diff --git a/crates/spam-filter/src/analysis/headers.rs b/crates/spam-filter/src/analysis/headers.rs index 7085da786..0fe6449fe 100644 --- a/crates/spam-filter/src/analysis/headers.rs +++ b/crates/spam-filter/src/analysis/headers.rs @@ -33,8 +33,10 @@ impl SpamFilterAnalyzeHeaders for Server { for ch in hdr_name.chars() { if ch.is_ascii_alphanumeric() { tag.push(ch.to_ascii_uppercase()); - } else { + } else if ch == '-' { tag.push('_'); + } else { + tag.push('X'); } } ctx.result.add_tag(tag); diff --git a/crates/spam-filter/src/analysis/messageid.rs b/crates/spam-filter/src/analysis/messageid.rs index 7a8ef78dd..4c9208db3 100644 --- a/crates/spam-filter/src/analysis/messageid.rs +++ b/crates/spam-filter/src/analysis/messageid.rs @@ -59,15 +59,18 @@ impl SpamFilterAnalyzeMid for Server { } // From address present in Message-ID checks - for sender in [&ctx.output.from.email, &ctx.output.env_from_addr] { + for (part, sender) in [ + ("FROM", &ctx.output.from.email), + ("ENVFROM", &ctx.output.env_from_addr), + ] { if !sender.address.is_empty() { if mid.contains(&sender.address) { - ctx.result.add_tag("MID_CONTAINS_FROM"); + ctx.result.add_tag(format!("MID_CONTAINS_{part}")); } else if mid_host.fqdn == sender.domain_part.fqdn { - ctx.result.add_tag("MID_RHS_MATCH_FROM"); + ctx.result.add_tag(format!("MID_RHS_MATCH_{part}")); } else if matches!((&mid_host.sld, &sender.domain_part.sld), (Some(mid_sld), Some(sender_sld)) if mid_sld == sender_sld) { - ctx.result.add_tag("MID_RHS_MATCH_FROMTLD"); + ctx.result.add_tag(format!("MID_RHS_MATCH_{part}TLD")); } } } diff --git a/crates/spam-filter/src/analysis/mod.rs b/crates/spam-filter/src/analysis/mod.rs index 6203faca9..087501589 100644 --- a/crates/spam-filter/src/analysis/mod.rs +++ b/crates/spam-filter/src/analysis/mod.rs @@ -88,6 +88,7 @@ impl SpamFilterResult { } } +#[derive(Debug)] pub struct ElementLocation { pub element: T, pub location: Location, @@ -117,8 +118,14 @@ impl ElementLocation { } pub(crate) async fn is_trusted_domain(server: &Server, domain: &str, span_id: u64) -> bool { - if server.core.spam.lists.trusted_domains.contains(domain) { - return true; + if let Some(store) = server.core.storage.lookups.get("trusted-domains") { + match store.key_exists(domain).await { + Ok(true) => return true, + Ok(false) => (), + Err(err) => { + trc::error!(err.span_id(span_id).caused_by(trc::location!())); + } + } } match server.core.storage.directory.is_local_domain(domain).await { @@ -129,3 +136,17 @@ pub(crate) async fn is_trusted_domain(server: &Server, domain: &str, span_id: u6 } } } + +pub(crate) async fn is_url_redirector(server: &Server, url: &str, span_id: u64) -> bool { + if let Some(store) = server.core.storage.lookups.get("url-redirectors") { + match store.key_exists(url).await { + Ok(result) => result, + Err(err) => { + trc::error!(err.span_id(span_id).caused_by(trc::location!())); + false + } + } + } else { + false + } +} diff --git a/crates/spam-filter/src/analysis/recipient.rs b/crates/spam-filter/src/analysis/recipient.rs index 5a2850731..2cb66b66e 100644 --- a/crates/spam-filter/src/analysis/recipient.rs +++ b/crates/spam-filter/src/analysis/recipient.rs @@ -175,33 +175,6 @@ impl SpamFilterAnalyzeRecipient for Server { ctx.result.add_tag("RCPT_LOCAL_IN_SUBJECT"); } } - - // Check for freemail or disposable domains - if let Some(domain) = rcpt.email.domain_part.sld.as_deref() { - if self.core.spam.lists.freemail_providers.contains(domain) { - if ctx - .output - .recipients_to - .iter() - .any(|r| r.email == rcpt.email) - { - ctx.result.add_tag("FREEMAIL_TO"); - } else { - ctx.result.add_tag("FREEMAIL_CC"); - } - } else if self.core.spam.lists.disposable_providers.contains(domain) { - if ctx - .output - .recipients_to - .iter() - .any(|r| r.email == rcpt.email) - { - ctx.result.add_tag("DISPOSABLE_TO"); - } else { - ctx.result.add_tag("DISPOSABLE_CC"); - } - } - } } if to_dn_count == 0 && to_dn_eq_addr_count == 0 { diff --git a/crates/spam-filter/src/analysis/replyto.rs b/crates/spam-filter/src/analysis/replyto.rs index 045e9cd56..974ff5fce 100644 --- a/crates/spam-filter/src/analysis/replyto.rs +++ b/crates/spam-filter/src/analysis/replyto.rs @@ -102,48 +102,6 @@ impl SpamFilterAnalyzeReplyTo for Server { ctx.result.add_tag("REPLYTO_ADDR_EQ_FROM"); } - let reply_to_sld = reply_to - .email - .domain_part - .sld - .as_deref() - .unwrap_or_default(); - if self - .core - .spam - .lists - .freemail_providers - .contains(reply_to_sld) - { - ctx.result.add_tag("FREEMAIL_REPLYTO"); - let from_domain_sld = ctx - .output - .from - .email - .domain_part - .sld - .as_deref() - .unwrap_or_default(); - if reply_to_sld != from_domain_sld - && self - .core - .spam - .lists - .freemail_providers - .contains(from_domain_sld) - { - ctx.result.add_tag("FREEMAIL_REPLYTO_NEQ_FROM_DOM"); - } - } else if self - .core - .spam - .lists - .disposable_providers - .contains(reply_to_sld) - { - ctx.result.add_tag("DISPOSABLE_REPLYTO"); - } - // Validate unnecessary encoding let reply_to_raw_utf8 = std::str::from_utf8(reply_to_raw).unwrap_or_default(); if reply_to.email.address.is_ascii() diff --git a/crates/spam-filter/src/analysis/url.rs b/crates/spam-filter/src/analysis/url.rs index ede510ce8..7f26f1c90 100644 --- a/crates/spam-filter/src/analysis/url.rs +++ b/crates/spam-filter/src/analysis/url.rs @@ -19,13 +19,12 @@ use unicode_security::MixedScript; use crate::modules::dnsbl::is_dnsbl; use crate::modules::expression::{IpResolver, SpamFilterResolver, StringResolver}; use crate::modules::html::SRC; -use crate::modules::remote_list::is_in_remote_list; use crate::{ modules::html::{HtmlToken, A, HREF}, Hostname, SpamFilterContext, TextPart, }; -use super::{is_trusted_domain, ElementLocation}; +use super::{is_trusted_domain, is_url_redirector, ElementLocation}; pub trait SpamFilterAnalyzeUrl: Sync + Send { fn spam_filter_analyze_url( @@ -197,7 +196,7 @@ impl SpamFilterAnalyzeUrl for Server { } ctx.result.rbl_ip_checks += 1; } - } else if self.core.spam.lists.url_redirectors.contains(host_sld) { + } else if is_url_redirector(self, host_sld, ctx.input.span_id).await { // Check for redirectors ctx.result.add_tag("REDIRECTOR_URL"); @@ -217,12 +216,12 @@ impl SpamFilterAnalyzeUrl for Server { if let Ok(location_parsed) = location.parse::() { let host = Hostname::new(location_parsed.host().unwrap_or_default()); - if self - .core - .spam - .lists - .url_redirectors - .contains(host.sld_or_default()) + if is_url_redirector( + self, + host.sld_or_default(), + ctx.input.span_id, + ) + .await { url_redirect = Cow::Owned(location); redirect_count += 1; @@ -264,7 +263,6 @@ impl SpamFilterAnalyzeUrl for Server { .map(|url_parsed| (el, url_parsed)) }) { let host = &url_parsed.host; - let url = &el.element.url; if host.ip.is_none() { if !host.fqdn.is_ascii() { @@ -309,15 +307,6 @@ impl SpamFilterAnalyzeUrl for Server { ctx.result.add_tag("R_SUSPICIOUS_URL"); } - // Check remote lists - for remote in &self.core.spam.lists.remote { - if matches!(remote.scope, Element::Url) - && is_in_remote_list(self, remote, url.as_ref(), ctx.input.span_id).await - { - ctx.result.add_tag(&remote.tag); - } - } - // Check URL DNSBL if ctx.result.rbl_url_checks < self.core.spam.dnsbl.max_url_checks { for dnsbl in &self.core.spam.dnsbl.servers { diff --git a/crates/spam-filter/src/modules/mod.rs b/crates/spam-filter/src/modules/mod.rs index e7e7d6a74..00bf61b3b 100644 --- a/crates/spam-filter/src/modules/mod.rs +++ b/crates/spam-filter/src/modules/mod.rs @@ -5,28 +5,26 @@ */ use common::Server; -use store::{dispatch::lookup::KeyValue, Deserialize, Value}; +use store::{ + dispatch::lookup::{KeyValue, LookupKey}, + Deserialize, Value, +}; pub mod bayes; pub mod dnsbl; pub mod expression; pub mod html; pub mod pyzor; -pub mod remote_list; pub mod sanitize; pub(crate) async fn key_get> + std::fmt::Debug + 'static>( server: &Server, span_id: u64, - key: impl Into>, + key: impl Into>, ) -> Result, ()> { - server - .in_memory_store() - .key_get(key.into()) - .await - .map_err(|err| { - trc::error!(err.span_id(span_id).caused_by(trc::location!())); - }) + server.in_memory_store().key_get(key).await.map_err(|err| { + trc::error!(err.span_id(span_id).caused_by(trc::location!())); + }) } pub(crate) async fn key_set(server: &Server, span_id: u64, kv: KeyValue>) { diff --git a/crates/spam-filter/src/modules/remote_list.rs b/crates/spam-filter/src/modules/remote_list.rs deleted file mode 100644 index 05c5b9d6a..000000000 --- a/crates/spam-filter/src/modules/remote_list.rs +++ /dev/null @@ -1,217 +0,0 @@ -/* - * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd - * - * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL - */ - -use std::{ - collections::HashSet, - io::{BufRead, BufReader}, - time::Instant, -}; - -use common::{ - config::{ - scripts::RemoteList, - spamfilter::{RemoteListConfig, RemoteListFormat}, - }, - HttpLimitResponse, Server, USER_AGENT, -}; -use mail_auth::flate2; - -#[allow(unused_variables)] -#[allow(unreachable_code)] -pub async fn is_in_remote_list( - server: &Server, - config: &RemoteListConfig, - item: &str, - span_id: u64, -) -> bool { - #[cfg(feature = "test_mode")] - { - return (config.url.contains("open") && item.contains("open")) - || (config.url.contains("tank") && item.contains("tank")); - } - - match is_in_remote_list_(server, config, item, span_id).await { - Ok(result) => result, - Err(err) => { - let mut _lock = server.inner.data.remote_lists.write(); - let list = _lock - .entry(config.id.clone()) - .or_insert_with(|| RemoteList { - entries: HashSet::new(), - expires: Instant::now(), - }); - - if list.expires > Instant::now() { - list.entries.contains(item) - } else { - list.expires = Instant::now() + config.retry; - trc::error!(err.span_id(span_id)); - false - } - } - } -} - -async fn is_in_remote_list_( - server: &Server, - config: &RemoteListConfig, - item: &str, - span_id: u64, -) -> trc::Result { - #[cfg(feature = "test_mode")] - { - if (config.url.contains("open") && item.contains("open")) - || (config.url.contains("tank") && item.contains("tank")) - { - return Ok(true); - } - } - - match server.inner.data.remote_lists.read().get(&config.id) { - Some(remote_list) if remote_list.expires < Instant::now() => { - return Ok(remote_list.entries.contains(item)) - } - _ => {} - } - - let time = Instant::now(); - let response = reqwest::Client::builder() - .timeout(config.timeout) - .user_agent(USER_AGENT) - .build() - .unwrap_or_default() - .get(&config.url) - .send() - .await - .map_err(|err| { - trc::SpamEvent::RemoteListError - .into_err() - .reason(err) - .ctx(trc::Key::Url, config.url.to_string()) - .details("Failed to build request") - })?; - - if response.status().is_success() { - let bytes = response - .bytes_with_limit(config.max_size) - .await - .map_err(|err| { - trc::SpamEvent::RemoteListError - .into_err() - .reason(err) - .ctx(trc::Key::Url, config.url.to_string()) - .ctx(trc::Key::Elapsed, time.elapsed()) - .details("Failed to fetch resource") - })? - .ok_or_else(|| { - trc::SpamEvent::RemoteListError - .into_err() - .ctx(trc::Key::Url, config.url.to_string()) - .ctx(trc::Key::Elapsed, time.elapsed()) - .details("Resource is too large") - })?; - - let reader: Box = if config.url.ends_with(".gz") { - Box::new(flate2::read::GzDecoder::new(&bytes[..])) - } else { - Box::new(&bytes[..]) - }; - - // Lock remote list for writing - let mut _lock = server.inner.data.remote_lists.write(); - let list = _lock - .entry(config.id.to_string()) - .or_insert_with(|| RemoteList { - entries: HashSet::new(), - expires: Instant::now(), - }); - - // Make sure that the list is still expired - if list.expires > Instant::now() { - return Ok(list.entries.contains(item)); - } - - for (pos, line) in BufReader::new(reader).lines().enumerate() { - let line_ = line.map_err(|err| { - trc::SpamEvent::RemoteListError - .into_err() - .reason(err) - .ctx(trc::Key::Url, config.url.to_string()) - .ctx(trc::Key::Elapsed, time.elapsed()) - .details("Failed to read line") - })?; - // Clear list once the first entry has been successfully fetched, decompressed and UTF8-decoded - if pos == 0 { - list.entries.clear(); - } - - match &config.format { - RemoteListFormat::List => { - let line = line_.trim(); - if !line.is_empty() { - list.entries.insert(line.to_string()); - } - } - RemoteListFormat::Csv { - column, - separator, - skip_first, - } if pos > 0 || !*skip_first => { - let mut in_quote = false; - let mut col_num = 0; - let mut entry = String::new(); - - for ch in line_.chars() { - if ch != '"' { - if ch == *separator && !in_quote { - if col_num == *column { - break; - } else { - col_num += 1; - } - } else if col_num == *column { - entry.push(ch); - if entry.len() > config.max_entry_size { - break; - } - } - } else { - in_quote = !in_quote; - } - } - - if !entry.is_empty() { - list.entries.insert(entry); - } - } - _ => (), - } - - if list.entries.len() == config.max_entries { - break; - } - } - - trc::event!( - Spam(trc::SpamEvent::RemoteList), - Url = config.url.to_string(), - Total = list.entries.len(), - Elapsed = time.elapsed(), - SpanId = span_id - ); - - // Update expiration - list.expires = Instant::now() + config.refresh; - Ok(list.entries.contains(item)) - } else { - trc::bail!(trc::SpamEvent::RemoteListError - .into_err() - .ctx(trc::Key::Code, response.status().as_u16()) - .ctx(trc::Key::Url, config.url.to_string()) - .ctx(trc::Key::Elapsed, time.elapsed()) - .details("Failed to fetch remote list")); - } -} diff --git a/crates/store/Cargo.toml b/crates/store/Cargo.toml index 91b87a0f7..2444af732 100644 --- a/crates/store/Cargo.toml +++ b/crates/store/Cargo.toml @@ -15,7 +15,7 @@ rust-s3 = { version = "=0.35.0-alpha.2", default-features = false, features = [" azure_core = { version = "0.21.0", optional = true } azure_storage = { version = "0.21.0", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"], optional = true } azure_storage_blobs = { version = "0.21.0", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"], optional = true } -reqwest = { version = "0.12.0", default-features = false, optional = true } +reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-webpki-roots", "http2", "stream"]} tokio = { version = "1.23", features = ["sync", "fs", "io-util"] } r2d2 = { version = "0.8.10", optional = true } futures = { version = "0.3", optional = true } @@ -60,7 +60,7 @@ postgres = ["tokio-postgres", "deadpool-postgres", "tokio-rustls", "rustls", "ri elastic = ["elasticsearch", "serde_json"] mysql = ["mysql_async", "futures"] s3 = ["rust-s3"] -azure = ["azure_core", "azure_storage", "azure_storage_blobs", "reqwest"] +azure = ["azure_core", "azure_storage", "azure_storage_blobs"] foundation = ["foundationdb", "futures"] fdb-chunked-bm = [] redis = ["dep:redis", "deadpool"] diff --git a/crates/store/src/backend/composite/mod.rs b/crates/store/src/backend/composite/mod.rs index eee8cb745..382432c78 100644 --- a/crates/store/src/backend/composite/mod.rs +++ b/crates/store/src/backend/composite/mod.rs @@ -8,6 +8,6 @@ * */ -pub mod distributed_blob; #[cfg(any(feature = "postgres", feature = "mysql"))] pub mod read_replica; +pub mod sharded_blob; diff --git a/crates/store/src/backend/composite/distributed_blob.rs b/crates/store/src/backend/composite/sharded_blob.rs similarity index 96% rename from crates/store/src/backend/composite/distributed_blob.rs rename to crates/store/src/backend/composite/sharded_blob.rs index 4001a8324..9b4a11ffc 100644 --- a/crates/store/src/backend/composite/distributed_blob.rs +++ b/crates/store/src/backend/composite/sharded_blob.rs @@ -14,11 +14,11 @@ use utils::config::{utils::AsKey, Config}; use crate::{BlobBackend, Store, Stores}; -pub struct DistributedBlob { +pub struct ShardedBlob { pub stores: Vec, } -impl DistributedBlob { +impl ShardedBlob { pub fn open(config: &mut Config, prefix: impl AsKey, stores: &Stores) -> Option { let prefix = prefix.as_key(); let store_ids = config @@ -83,7 +83,7 @@ impl DistributedBlob { BlobBackend::S3(store) => store.get_blob(key, read_range).await, #[cfg(feature = "azure")] BlobBackend::Azure(store) => store.get_blob(key, read_range).await, - BlobBackend::Composite(_) => unimplemented!(), + BlobBackend::Sharded(_) => unimplemented!(), } }) .await @@ -115,7 +115,7 @@ impl DistributedBlob { BlobBackend::S3(store) => store.put_blob(key, data).await, #[cfg(feature = "azure")] BlobBackend::Azure(store) => store.put_blob(key, data).await, - BlobBackend::Composite(_) => unimplemented!(), + BlobBackend::Sharded(_) => unimplemented!(), } }) .await @@ -147,7 +147,7 @@ impl DistributedBlob { BlobBackend::S3(store) => store.delete_blob(key).await, #[cfg(feature = "azure")] BlobBackend::Azure(store) => store.delete_blob(key).await, - BlobBackend::Composite(_) => unimplemented!(), + BlobBackend::Sharded(_) => unimplemented!(), } }) .await diff --git a/crates/store/src/backend/http/config.rs b/crates/store/src/backend/http/config.rs new file mode 100644 index 000000000..7931748e6 --- /dev/null +++ b/crates/store/src/backend/http/config.rs @@ -0,0 +1,116 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use std::{ + collections::hash_map::Entry, + sync::atomic::{AtomicBool, AtomicU64}, + time::Duration, +}; + +use ahash::AHashMap; +use arc_swap::ArcSwap; +use utils::config::Config; + +use crate::{InMemoryStore, Stores}; + +use super::{HttpStore, HttpStoreConfig, HttpStoreFormat}; + +impl Stores { + pub fn parse_http_stores(&mut self, config: &mut Config) { + // Parse remote lists + for id in config + .sub_keys("http-lookup", ".url") + .map(|k| k.to_string()) + .collect::>() + { + let id_ = id.as_str(); + if !config + .property_or_default(("http-lookup", id_, "enable"), "true") + .unwrap_or(true) + { + continue; + } + + let format = match config + .value_require(("http-lookup", id_, "format")) + .unwrap_or_default() + { + "list" => HttpStoreFormat::List, + "csv" => HttpStoreFormat::Csv { + index_key: config + .property_require(("http-lookup", id_, "index.key")) + .unwrap_or(0), + index_value: config.property(("http-lookup", id_, "index.value")), + separator: config + .property_or_default::(("http-lookup", id_, "separator"), ",") + .unwrap_or_default() + .chars() + .next() + .unwrap_or(','), + skip_first: config + .property_or_default::(("http-lookup", id_, "skip-first"), "false") + .unwrap_or(false), + }, + other => { + let message = format!("Invalid format: {other:?}"); + config.new_build_error(("http-lookup", id_, "format"), message); + continue; + } + }; + + let http_config = HttpStoreConfig { + url: config + .value_require(("http-lookup", id_, "url")) + .unwrap_or_default() + .to_string(), + retry: config + .property_or_default::(("http-lookup", id_, "retry"), "1h") + .unwrap_or(Duration::from_secs(3600)) + .as_secs(), + refresh: config + .property_or_default::(("http-lookup", id_, "refresh"), "12h") + .unwrap_or(Duration::from_secs(43200)) + .as_secs(), + timeout: config + .property_or_default::(("http-lookup", id_, "timeout"), "30s") + .unwrap_or(Duration::from_secs(30)), + gzipped: config + .property_or_default::(("http-lookup", id_, "gzipped"), "false") + .unwrap_or_default(), + max_size: config + .property_or_default::(("http-lookup", id_, "limits.size"), "104857600") + .unwrap_or(104857600), + max_entries: config + .property_or_default::(("http-lookup", id_, "limits.entries"), "100000") + .unwrap_or(100000), + max_entry_size: config + .property_or_default::(("http-lookup", id_, "limits.entry-size"), "512") + .unwrap_or(512), + format, + id, + }; + + match self.in_memory_stores.entry(http_config.id.clone()) { + Entry::Vacant(entry) => { + let store = HttpStore { + entries: ArcSwap::from_pointee(AHashMap::new()), + expires: AtomicU64::new(0), + in_flight: AtomicBool::new(false), + config: http_config, + }; + + entry.insert(InMemoryStore::Http(store.into())); + } + Entry::Occupied(e) => { + config.new_build_error( + ("http-lookup", e.key().as_str()), + "An im-memory store with this id already exists", + ); + } + } + } + } +} diff --git a/crates/store/src/backend/http/lookup.rs b/crates/store/src/backend/http/lookup.rs new file mode 100644 index 000000000..c928764b4 --- /dev/null +++ b/crates/store/src/backend/http/lookup.rs @@ -0,0 +1,220 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use std::{ + io::{BufRead, BufReader}, + sync::{atomic::Ordering, Arc}, + time::Instant, +}; + +use ahash::AHashMap; +use rand::seq::SliceRandom; +use utils::HttpLimitResponse; + +use crate::{backend::http::HttpStoreFormat, write::now, Value}; + +use super::HttpStore; + +const BROWSER_USER_AGENTS: [&str; 5] = [ + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/120.0.0.0 Safari/537.36", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) Gecko/20100101 Firefox/120.0", + "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", +]; + +pub(crate) trait HttpStoreGet { + fn get(&self, key: &str) -> Option>; + fn contains(&self, key: &str) -> bool; + fn refresh(&self); +} + +impl HttpStoreGet for Arc { + fn get(&self, key: &str) -> Option> { + self.refresh(); + self.entries.load().get(key).cloned() + } + + fn contains(&self, key: &str) -> bool { + #[cfg(feature = "test_mode")] + { + if key.contains("open") || key.contains("tank") { + return (self.config.url.contains("open") && key.contains("open")) + || (self.config.url.contains("tank") && key.contains("tank")); + } + } + + self.refresh(); + self.entries.load().contains_key(key) + } + + fn refresh(&self) { + if self.expires.load(Ordering::Relaxed) <= now() { + let in_flight = self.in_flight.swap(true, Ordering::Relaxed); + if !in_flight { + let this = self.clone(); + tokio::spawn(async move { + let expires = match this.try_refresh().await { + Ok(list) => { + this.entries.store(list.into()); + this.config.refresh + } + Err(err) => { + trc::error!(err); + this.config.retry + } + }; + + this.expires.store(now() + expires, Ordering::Relaxed); + this.in_flight.store(false, Ordering::Relaxed); + }); + } + } + } +} + +impl HttpStore { + async fn try_refresh(&self) -> trc::Result>> { + let time = Instant::now(); + let agent = BROWSER_USER_AGENTS.choose(&mut rand::thread_rng()).unwrap(); + let response = reqwest::Client::builder() + .timeout(self.config.timeout) + .user_agent(*agent) + .build() + .unwrap_or_default() + .get(&self.config.url) + .send() + .await + .map_err(|err| { + trc::StoreEvent::HttpStoreError + .into_err() + .reason(err) + .ctx(trc::Key::Url, self.config.url.to_string()) + .details("Failed to build request") + })?; + + if !response.status().is_success() { + trc::bail!(trc::StoreEvent::HttpStoreError + .into_err() + .ctx(trc::Key::Code, response.status().as_u16()) + .ctx(trc::Key::Url, self.config.url.to_string()) + .ctx(trc::Key::Elapsed, time.elapsed()) + .details("Failed to fetch HTTP list")); + } + + let bytes = response + .bytes_with_limit(self.config.max_size) + .await + .map_err(|err| { + trc::StoreEvent::HttpStoreError + .into_err() + .reason(err) + .ctx(trc::Key::Url, self.config.url.to_string()) + .ctx(trc::Key::Elapsed, time.elapsed()) + .details("Failed to fetch resource") + })? + .ok_or_else(|| { + trc::StoreEvent::HttpStoreError + .into_err() + .ctx(trc::Key::Url, self.config.url.to_string()) + .ctx(trc::Key::Elapsed, time.elapsed()) + .details("Resource is too large") + })?; + + let reader: Box = if self.config.gzipped { + Box::new(flate2::read::GzDecoder::new(&bytes[..])) + } else { + Box::new(&bytes[..]) + }; + + let mut entries = AHashMap::new(); + for (pos, line) in BufReader::new(reader).lines().enumerate() { + let line_ = line.map_err(|err| { + trc::StoreEvent::HttpStoreError + .into_err() + .reason(err) + .ctx(trc::Key::Url, self.config.url.to_string()) + .ctx(trc::Key::Elapsed, time.elapsed()) + .details("Failed to read line") + })?; + + match &self.config.format { + HttpStoreFormat::List => { + let line = line_.trim(); + if !line.is_empty() { + entries.insert(line.to_string(), Value::Integer(1)); + } + } + HttpStoreFormat::Csv { + index_key, + index_value, + separator, + skip_first, + } if pos > 0 || !*skip_first => { + let mut in_quote = false; + let mut col_num = 0; + let mut last_ch = ' '; + + let mut entry_key: String = String::new(); + let mut entry_value: String = String::new(); + + for ch in line_.chars() { + match ch { + '"' if last_ch != '\\' => { + in_quote = !in_quote; + } + '\\' if last_ch != '\\' => (), + _ => { + if ch == *separator && !in_quote { + if col_num == *index_key && index_value.is_none() { + break; + } else { + col_num += 1; + } + } else if col_num == *index_key { + entry_key.push(ch); + if entry_key.len() > self.config.max_entry_size { + break; + } + } else if index_value.map_or(false, |v| col_num == v) { + entry_value.push(ch); + if entry_value.len() > self.config.max_entry_size { + break; + } + } + } + } + + last_ch = ch; + } + + if !entry_key.is_empty() { + let entry_value = if !entry_value.is_empty() { + Value::Text(entry_value.into()) + } else { + Value::Integer(1) + }; + entries.insert(entry_key, entry_value); + } + } + _ => (), + } + + if entries.len() == self.config.max_entries { + break; + } + } + + trc::event!( + Store(trc::StoreEvent::HttpStoreFetch), + Url = self.config.url.to_string(), + Total = entries.len(), + Elapsed = time.elapsed(), + ); + + Ok(entries) + } +} diff --git a/crates/store/src/backend/http/mod.rs b/crates/store/src/backend/http/mod.rs new file mode 100644 index 000000000..52f989cfa --- /dev/null +++ b/crates/store/src/backend/http/mod.rs @@ -0,0 +1,51 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +pub mod config; +pub mod lookup; + +use std::{ + sync::atomic::{AtomicBool, AtomicU64}, + time::Duration, +}; + +use ahash::AHashMap; +use arc_swap::ArcSwap; + +use crate::Value; + +#[derive(Debug, Clone)] +pub struct HttpStoreConfig { + pub id: String, + pub url: String, + pub retry: u64, + pub refresh: u64, + pub timeout: Duration, + pub gzipped: bool, + pub max_size: usize, + pub max_entries: usize, + pub max_entry_size: usize, + pub format: HttpStoreFormat, +} + +#[derive(Debug, Clone)] +pub enum HttpStoreFormat { + List, + Csv { + index_key: u32, + index_value: Option, + separator: char, + skip_first: bool, + }, +} + +#[derive(Debug)] +pub struct HttpStore { + pub entries: ArcSwap>>, + pub expires: AtomicU64, + pub in_flight: AtomicBool, + pub config: HttpStoreConfig, +} diff --git a/crates/store/src/backend/memory/mod.rs b/crates/store/src/backend/memory/mod.rs index d0453bd84..14edc8750 100644 --- a/crates/store/src/backend/memory/mod.rs +++ b/crates/store/src/backend/memory/mod.rs @@ -4,6 +4,8 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ +use std::collections::hash_map::Entry; + use ahash::AHashMap; use utils::{config::Config, glob::GlobMap}; @@ -80,8 +82,17 @@ impl Stores { } for (id, store) in lookups { - self.in_memory_stores - .insert(id, InMemoryStore::Static(store.into())); + match self.in_memory_stores.entry(id) { + Entry::Vacant(entry) => { + entry.insert(InMemoryStore::Static(store.into())); + } + Entry::Occupied(e) => { + config.new_build_error( + ("lookup", e.key().as_str()), + "An im-memory store with this id already exists", + ); + } + } } } } diff --git a/crates/store/src/backend/mod.rs b/crates/store/src/backend/mod.rs index 843b18072..909021dcf 100644 --- a/crates/store/src/backend/mod.rs +++ b/crates/store/src/backend/mod.rs @@ -4,6 +4,8 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ +#[cfg(feature = "azure")] +pub mod azure; #[cfg(feature = "enterprise")] pub mod composite; #[cfg(feature = "elastic")] @@ -11,6 +13,7 @@ pub mod elastic; #[cfg(feature = "foundation")] pub mod foundationdb; pub mod fs; +pub mod http; pub mod memory; #[cfg(feature = "mysql")] pub mod mysql; @@ -24,8 +27,6 @@ pub mod rocksdb; pub mod s3; #[cfg(feature = "sqlite")] pub mod sqlite; -#[cfg(feature = "azure")] -pub mod azure; pub const MAX_TOKEN_LENGTH: usize = (u8::MAX >> 1) as usize; pub const MAX_TOKEN_MASK: usize = MAX_TOKEN_LENGTH - 1; diff --git a/crates/store/src/config.rs b/crates/store/src/config.rs index 4f2062486..b273467d3 100644 --- a/crates/store/src/config.rs +++ b/crates/store/src/config.rs @@ -42,7 +42,7 @@ use crate::backend::azure::AzureStore; impl Stores { pub async fn parse_all(config: &mut Config) -> Self { let mut stores = Self::parse(config).await; - stores.parse_lookups(config).await; + stores.parse_in_memory(config).await; stores } @@ -263,14 +263,12 @@ impl Stores { self.in_memory_stores.insert(id.to_string(), db.into()); } } - "distributed-blob" => { - if let Some(db) = - crate::backend::composite::distributed_blob::DistributedBlob::open( - config, prefix, self, - ) - { + "sharded-blob" | "distributed-blob" => { + if let Some(db) = crate::backend::composite::sharded_blob::ShardedBlob::open( + config, prefix, self, + ) { let store = BlobStore { - backend: crate::BlobBackend::Composite(db.into()), + backend: crate::BlobBackend::Sharded(db.into()), compression, }; self.blob_stores.insert(id, store); @@ -281,10 +279,13 @@ impl Stores { } } - pub async fn parse_lookups(&mut self, config: &mut Config) { + pub async fn parse_in_memory(&mut self, config: &mut Config) { // Parse memory stores self.parse_static_stores(config); + // Parse http stores + self.parse_http_stores(config); + // Parse purge schedules if let Some(store) = config .value("storage.data") diff --git a/crates/store/src/dispatch/blob.rs b/crates/store/src/dispatch/blob.rs index 965f8b1cb..02a9bc247 100644 --- a/crates/store/src/dispatch/blob.rs +++ b/crates/store/src/dispatch/blob.rs @@ -40,7 +40,7 @@ impl BlobStore { #[cfg(feature = "azure")] BlobBackend::Azure(store) => store.get_blob(key, read_range).await, #[cfg(feature = "enterprise")] - BlobBackend::Composite(store) => store.get_blob(key, read_range).await, + BlobBackend::Sharded(store) => store.get_blob(key, read_range).await, }; trc::event!( @@ -122,7 +122,7 @@ impl BlobStore { #[cfg(feature = "azure")] BlobBackend::Azure(store) => store.put_blob(key, data.as_ref()).await, #[cfg(feature = "enterprise")] - BlobBackend::Composite(store) => store.put_blob(key, data.as_ref()).await, + BlobBackend::Sharded(store) => store.put_blob(key, data.as_ref()).await, } .caused_by(trc::location!()); @@ -160,7 +160,7 @@ impl BlobStore { #[cfg(feature = "azure")] BlobBackend::Azure(store) => store.delete_blob(key).await, #[cfg(feature = "enterprise")] - BlobBackend::Composite(store) => store.delete_blob(key).await, + BlobBackend::Sharded(store) => store.delete_blob(key).await, } .caused_by(trc::location!()); diff --git a/crates/store/src/dispatch/lookup.rs b/crates/store/src/dispatch/lookup.rs index 1a0e46e5d..a39b9d72d 100644 --- a/crates/store/src/dispatch/lookup.rs +++ b/crates/store/src/dispatch/lookup.rs @@ -4,10 +4,12 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ +use std::borrow::Cow; + use trc::AddContext; use utils::config::Rate; -use crate::write::LookupClass; +use crate::{backend::http::lookup::HttpStoreGet, write::LookupClass}; #[allow(unused_imports)] use crate::{ write::{ @@ -42,7 +44,9 @@ impl InMemoryStore { } #[cfg(feature = "redis")] InMemoryStore::Redis(store) => store.key_set(kv.key, kv.value, kv.expires).await, - InMemoryStore::Static(_) => Err(trc::StoreEvent::NotSupported.into_err()), + InMemoryStore::Static(_) | InMemoryStore::Http(_) => { + Err(trc::StoreEvent::NotSupported.into_err()) + } } .caused_by(trc::location!()) } @@ -77,7 +81,9 @@ impl InMemoryStore { } #[cfg(feature = "redis")] InMemoryStore::Redis(store) => store.key_incr(kv.key, kv.value, kv.expires).await, - InMemoryStore::Static(_) => Err(trc::StoreEvent::NotSupported.into_err()), + InMemoryStore::Static(_) | InMemoryStore::Http(_) => { + Err(trc::StoreEvent::NotSupported.into_err()) + } } .caused_by(trc::location!()) } @@ -94,7 +100,9 @@ impl InMemoryStore { } #[cfg(feature = "redis")] InMemoryStore::Redis(store) => store.key_delete(key).await, - InMemoryStore::Static(_) => Err(trc::StoreEvent::NotSupported.into_err()), + InMemoryStore::Static(_) | InMemoryStore::Http(_) => { + Err(trc::StoreEvent::NotSupported.into_err()) + } } .caused_by(trc::location!()) } @@ -111,27 +119,32 @@ impl InMemoryStore { } #[cfg(feature = "redis")] InMemoryStore::Redis(store) => store.key_delete(key).await, - InMemoryStore::Static(_) => Err(trc::StoreEvent::NotSupported.into_err()), + InMemoryStore::Static(_) | InMemoryStore::Http(_) => { + Err(trc::StoreEvent::NotSupported.into_err()) + } } .caused_by(trc::location!()) } pub async fn key_get> + std::fmt::Debug + 'static>( &self, - key: Vec, + key: impl Into>, ) -> trc::Result> { match self { InMemoryStore::Store(store) => store .get_value::>(ValueKey::from(ValueClass::Lookup(LookupClass::Key( - key, + key.into().into_bytes(), )))) .await .map(|value| value.and_then(|v| v.into())), #[cfg(feature = "redis")] - InMemoryStore::Redis(store) => store.key_get(key).await, + InMemoryStore::Redis(store) => store.key_get(key.into().into_bytes()).await, InMemoryStore::Static(store) => Ok(store - .get(std::str::from_utf8(&key).unwrap_or_default()) + .get(key.into().as_str()) .map(|value| T::from(value.clone()))), + InMemoryStore::Http(store) => { + Ok(store.get(key.into().as_str()).map(|value| T::from(value))) + } } .caused_by(trc::location!()) } @@ -147,24 +160,25 @@ impl InMemoryStore { } #[cfg(feature = "redis")] InMemoryStore::Redis(store) => store.counter_get(key).await, - InMemoryStore::Static(_) => Err(trc::StoreEvent::NotSupported.into_err()), + InMemoryStore::Static(_) | InMemoryStore::Http(_) => { + Err(trc::StoreEvent::NotSupported.into_err()) + } } .caused_by(trc::location!()) } - pub async fn key_exists(&self, key: Vec) -> trc::Result { + pub async fn key_exists(&self, key: impl Into>) -> trc::Result { match self { InMemoryStore::Store(store) => store .get_value::>(ValueKey::from(ValueClass::Lookup(LookupClass::Key( - key, + key.into().into_bytes(), )))) .await .map(|value| matches!(value, Some(LookupValue::Value(())))), #[cfg(feature = "redis")] - InMemoryStore::Redis(store) => store.key_exists(key).await, - InMemoryStore::Static(store) => Ok(store - .get(std::str::from_utf8(&key).unwrap_or_default()) - .is_some()), + InMemoryStore::Redis(store) => store.key_exists(key.into().into_bytes()).await, + InMemoryStore::Static(store) => Ok(store.get(key.into().as_str()).is_some()), + InMemoryStore::Http(store) => Ok(store.contains(key.into().as_str())), } .caused_by(trc::location!()) } @@ -294,7 +308,7 @@ impl InMemoryStore { } #[cfg(feature = "redis")] InMemoryStore::Redis(_) => {} - InMemoryStore::Static(_) => {} + InMemoryStore::Static(_) | InMemoryStore::Http(_) => {} } Ok(()) @@ -308,6 +322,72 @@ impl InMemoryStore { } } +pub enum LookupKey<'x> { + String(String), + StringRef(&'x str), + Bytes(Vec), + BytesRef(&'x [u8]), +} + +impl<'x> From<&'x str> for LookupKey<'x> { + fn from(key: &'x str) -> Self { + LookupKey::StringRef(key) + } +} + +impl<'x> From<&'x String> for LookupKey<'x> { + fn from(key: &'x String) -> Self { + LookupKey::StringRef(key.as_str()) + } +} + +impl<'x> From<&'x [u8]> for LookupKey<'x> { + fn from(key: &'x [u8]) -> Self { + LookupKey::BytesRef(key) + } +} + +impl<'x> From> for LookupKey<'x> { + fn from(key: Cow<'x, str>) -> Self { + match key { + Cow::Borrowed(key) => LookupKey::StringRef(key), + Cow::Owned(key) => LookupKey::String(key), + } + } +} + +impl From for LookupKey<'static> { + fn from(key: String) -> Self { + LookupKey::String(key) + } +} + +impl From> for LookupKey<'static> { + fn from(key: Vec) -> Self { + LookupKey::Bytes(key) + } +} + +impl LookupKey<'_> { + pub fn as_str(&self) -> &str { + match self { + LookupKey::String(string) => string, + LookupKey::StringRef(string) => string, + LookupKey::Bytes(bytes) => std::str::from_utf8(bytes).unwrap_or_default(), + LookupKey::BytesRef(bytes) => std::str::from_utf8(bytes).unwrap_or_default(), + } + } + + pub fn into_bytes(self) -> Vec { + match self { + LookupKey::String(string) => string.into_bytes(), + LookupKey::StringRef(string) => string.as_bytes().to_vec(), + LookupKey::Bytes(bytes) => bytes, + LookupKey::BytesRef(bytes) => bytes.to_vec(), + } + } +} + impl KeyValue { pub fn build_key(prefix: u8, key: impl AsRef<[u8]>) -> Vec { let key_ = key.as_ref(); diff --git a/crates/store/src/lib.rs b/crates/store/src/lib.rs index 43df64820..01d598c6b 100644 --- a/crates/store/src/lib.rs +++ b/crates/store/src/lib.rs @@ -15,7 +15,7 @@ pub mod write; pub use ahash; use ahash::AHashMap; -use backend::{fs::FsStore, memory::StaticMemoryStore}; +use backend::{fs::FsStore, http::HttpStore, memory::StaticMemoryStore}; pub use blake3; pub use parking_lot; pub use rand; @@ -214,7 +214,7 @@ pub enum BlobBackend { #[cfg(feature = "azure")] Azure(Arc), #[cfg(feature = "enterprise")] - Composite(Arc), + Sharded(Arc), } #[derive(Clone)] @@ -229,6 +229,7 @@ pub enum InMemoryStore { Store(Store), #[cfg(feature = "redis")] Redis(Arc), + Http(Arc), Static(Arc), } @@ -750,7 +751,7 @@ impl Stores { self.stores .retain(|_, store| !matches!(store, Store::SQLReadReplica(_))); self.blob_stores - .retain(|_, store| !matches!(store.backend, BlobBackend::Composite(_))); + .retain(|_, store| !matches!(store.backend, BlobBackend::Sharded(_))); } } } diff --git a/crates/trc/src/event/description.rs b/crates/trc/src/event/description.rs index 0d06806ad..0e5777b67 100644 --- a/crates/trc/src/event/description.rs +++ b/crates/trc/src/event/description.rs @@ -1009,8 +1009,6 @@ impl SpamEvent { match self { SpamEvent::Pyzor => "Pyzor success", SpamEvent::PyzorError => "Pyzor error", - SpamEvent::RemoteList => "Remote list updated", - SpamEvent::RemoteListError => "Error updating remote list", SpamEvent::Train => "Training spam filter", SpamEvent::TrainBalance => "Spam filter model balance verify", SpamEvent::TrainError => "Error training spam filter", @@ -1030,8 +1028,6 @@ impl SpamEvent { SpamEvent::Classify => "The message is being classified for spam", SpamEvent::ClassifyError => "There is not enough training data for the spam filter", SpamEvent::Pyzor => "Pyzor query successful", - SpamEvent::RemoteList => "The remote list was updated", - SpamEvent::RemoteListError => "An error occurred while updating the remote list", SpamEvent::Dnsbl => "The DNSBL query was successful", SpamEvent::DnsblError => "An error occurred while querying the DNSBL", } @@ -1556,6 +1552,8 @@ impl StoreEvent { StoreEvent::BlobWrite => "Blob write operation", StoreEvent::BlobDelete => "Blob delete operation", StoreEvent::DataIterate => "Data store iteration operation", + StoreEvent::HttpStoreFetch => "HTTP store updated", + StoreEvent::HttpStoreError => "Error updating HTTP store", } } @@ -1591,6 +1589,8 @@ impl StoreEvent { StoreEvent::BlobWrite => "A blob write operation was executed", StoreEvent::BlobDelete => "A blob delete operation was executed", StoreEvent::DataIterate => "A data store iteration operation was executed", + StoreEvent::HttpStoreFetch => "The HTTP store was updated", + StoreEvent::HttpStoreError => "An error occurred while updating the HTTP store", } } } diff --git a/crates/trc/src/event/level.rs b/crates/trc/src/event/level.rs index ba48bf739..ba89bd76e 100644 --- a/crates/trc/src/event/level.rs +++ b/crates/trc/src/event/level.rs @@ -20,7 +20,7 @@ impl EventType { | StoreEvent::SqlQuery | StoreEvent::LdapQuery | StoreEvent::LdapBind => Level::Trace, - StoreEvent::NotFound => Level::Debug, + StoreEvent::NotFound | StoreEvent::HttpStoreFetch => Level::Debug, StoreEvent::AssertValueFailed | StoreEvent::FoundationdbError | StoreEvent::MysqlError @@ -41,7 +41,7 @@ impl EventType { | StoreEvent::NotSupported | StoreEvent::UnexpectedError | StoreEvent::CryptoError => Level::Error, - StoreEvent::BlobMissingMarker => Level::Warn, + StoreEvent::BlobMissingMarker | StoreEvent::HttpStoreError => Level::Warn, }, EventType::Jmap(_) => Level::Debug, EventType::Imap(event) => match event { @@ -335,17 +335,15 @@ impl EventType { | SieveEvent::ActionReject => Level::Debug, }, EventType::Spam(event) => match event { - SpamEvent::PyzorError - | SpamEvent::TrainError - | SpamEvent::DnsblError - | SpamEvent::RemoteListError => Level::Warn, + SpamEvent::PyzorError | SpamEvent::TrainError | SpamEvent::DnsblError => { + Level::Warn + } SpamEvent::Pyzor | SpamEvent::Train | SpamEvent::Classify | SpamEvent::ClassifyError | SpamEvent::TrainBalance - | SpamEvent::Dnsbl - | SpamEvent::RemoteList => Level::Debug, + | SpamEvent::Dnsbl => Level::Debug, }, EventType::Http(event) => match event { HttpEvent::ConnectionStart | HttpEvent::ConnectionEnd => Level::Debug, diff --git a/crates/trc/src/ipc/metrics.rs b/crates/trc/src/ipc/metrics.rs index ca0ad4b30..44890edd5 100644 --- a/crates/trc/src/ipc/metrics.rs +++ b/crates/trc/src/ipc/metrics.rs @@ -456,7 +456,8 @@ impl EventType { | StoreEvent::DataIterate | StoreEvent::BlobRead | StoreEvent::BlobWrite - | StoreEvent::BlobDelete, + | StoreEvent::BlobDelete + | StoreEvent::HttpStoreError, ) => true, EventType::MessageIngest(_) => true, EventType::Jmap( @@ -577,7 +578,6 @@ impl EventType { ) => true, EventType::Spam( SpamEvent::PyzorError - | SpamEvent::RemoteListError | SpamEvent::Train | SpamEvent::TrainError | SpamEvent::Classify diff --git a/crates/trc/src/lib.rs b/crates/trc/src/lib.rs index 763bbab52..7e15b258e 100644 --- a/crates/trc/src/lib.rs +++ b/crates/trc/src/lib.rs @@ -598,8 +598,6 @@ pub enum PushSubscriptionEvent { pub enum SpamEvent { Pyzor, PyzorError, - RemoteList, - RemoteListError, Dnsbl, DnsblError, Train, @@ -838,6 +836,7 @@ pub enum StoreEvent { NotSupported, UnexpectedError, CryptoError, + HttpStoreError, // Warnings BlobMissingMarker, @@ -851,6 +850,7 @@ pub enum StoreEvent { SqlQuery, LdapQuery, LdapBind, + HttpStoreFetch, } #[event_type] diff --git a/crates/trc/src/serializers/binary.rs b/crates/trc/src/serializers/binary.rs index af13ec987..6b43f3726 100644 --- a/crates/trc/src/serializers/binary.rs +++ b/crates/trc/src/serializers/binary.rs @@ -791,8 +791,8 @@ impl EventType { EventType::Smtp(SmtpEvent::VrfyNotFound) => 489, EventType::Spam(SpamEvent::Classify) => 490, EventType::Spam(SpamEvent::ClassifyError) => 491, - EventType::Spam(SpamEvent::RemoteList) => 492, - EventType::Spam(SpamEvent::RemoteListError) => 493, + EventType::Store(StoreEvent::HttpStoreFetch) => 492, + EventType::Store(StoreEvent::HttpStoreError) => 493, EventType::Spam(SpamEvent::PyzorError) => 494, EventType::Spam(SpamEvent::Train) => 495, EventType::Spam(SpamEvent::TrainBalance) => 496, @@ -1391,8 +1391,8 @@ impl EventType { 489 => Some(EventType::Smtp(SmtpEvent::VrfyNotFound)), 490 => Some(EventType::Spam(SpamEvent::Classify)), 491 => Some(EventType::Spam(SpamEvent::ClassifyError)), - 492 => Some(EventType::Spam(SpamEvent::RemoteList)), - 493 => Some(EventType::Spam(SpamEvent::RemoteListError)), + 492 => Some(EventType::Store(StoreEvent::HttpStoreFetch)), + 493 => Some(EventType::Store(StoreEvent::HttpStoreError)), 494 => Some(EventType::Spam(SpamEvent::PyzorError)), 495 => Some(EventType::Spam(SpamEvent::Train)), 496 => Some(EventType::Spam(SpamEvent::TrainBalance)), diff --git a/crates/utils/Cargo.toml b/crates/utils/Cargo.toml index 1b272afde..e28127822 100644 --- a/crates/utils/Cargo.toml +++ b/crates/utils/Cargo.toml @@ -24,7 +24,7 @@ ring = { version = "0.17" } base64 = "0.22" serde_json = "1.0" rcgen = "0.13" -reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-webpki-roots", "http2"]} +reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-webpki-roots", "http2", "stream"]} x509-parser = "0.16.0" pem = "3.0" parking_lot = "0.12" diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index f6da62a3f..6fc740b9d 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -14,6 +14,8 @@ pub mod map; pub mod snowflake; pub mod url_params; +use futures::StreamExt; +use reqwest::Response; use rustls::{ client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier}, ClientConfig, RootCertStore, SignatureScheme, @@ -89,6 +91,37 @@ impl AsMut<[u8]> for BlobHash { } } +pub trait HttpLimitResponse: Sync + Send { + fn bytes_with_limit( + self, + limit: usize, + ) -> impl std::future::Future>>> + Send; +} + +impl HttpLimitResponse for Response { + async fn bytes_with_limit(self, limit: usize) -> reqwest::Result>> { + if self + .content_length() + .map_or(false, |len| len as usize > limit) + { + return Ok(None); + } + + let mut bytes = Vec::with_capacity(std::cmp::min(limit, 1024)); + let mut stream = self.bytes_stream(); + + while let Some(chunk) = stream.next().await { + let chunk = chunk?; + if bytes.len() + chunk.len() > limit { + return Ok(None); + } + bytes.extend_from_slice(&chunk); + } + + Ok(Some(bytes)) + } +} + pub trait UnwrapFailure { fn failed(self, action: &str) -> T; } diff --git a/tests/Cargo.toml b/tests/Cargo.toml index c74fbc628..74cb017f3 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -5,8 +5,8 @@ edition = "2021" resolver = "2" [features] -default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis", "azure", "foundationdb"] -#default = ["rocks"] +#default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis", "azure", "foundationdb"] +default = ["rocks"] sqlite = ["store/sqlite"] foundationdb = ["store/foundation", "common/foundation"] postgres = ["store/postgres"] diff --git a/tests/resources/smtp/antispam/combined.test b/tests/resources/smtp/antispam/combined.test index 899e5da27..87cdae76c 100644 --- a/tests/resources/smtp/antispam/combined.test +++ b/tests/resources/smtp/antispam/combined.test @@ -6,7 +6,7 @@ spf.result none spf_ehlo.result none dmarc.result none remote_ip 195.210.29.48 -expect_header X-Spam-Result: ARC_NA (0.00), DKIM_NA (0.00), DMARC_NA (0.00), FROM_EQ_ENVFROM (0.00), FROM_HAS_DN (0.00), HAS_DATA_URI (0.00), RCPT_COUNT_ONE (0.00), RCVD_COUNT_ZERO (0.00), SPF_NA (0.00), SUBJECT_ENDS_EXCLAIM (0.00), TO_DN_NONE (0.00), TO_MATCH_ENVRCPT_ALL (0.00), ONCE_RECEIVED (0.10), RCVD_NO_TLS_LAST (0.10), MIME_HTML_ONLY (0.20), HELO_NORES_A_OR_MX (0.30), AUTH_NA (1.00), DATE_IN_PAST (1.00), MID_RHS_MATCH_FROM (1.00), RDNS_NONE (1.00), FROMHOST_NORES_A_OR_MX (1.50), HTML_SHORT_LINK_IMG_1 (2.00), PYZOR (3.50) +expect_header X-Spam-Result: ARC_NA (0.00), DKIM_NA (0.00), DMARC_NA (0.00), FROM_EQ_ENVFROM (0.00), FROM_HAS_DN (0.00), HAS_DATA_URI (0.00), MID_RHS_MATCH_ENVFROM (0.00), RCPT_COUNT_ONE (0.00), RCVD_COUNT_ZERO (0.00), SPF_NA (0.00), SUBJECT_ENDS_EXCLAIM (0.00), TO_DN_NONE (0.00), TO_MATCH_ENVRCPT_ALL (0.00), ONCE_RECEIVED (0.10), RCVD_NO_TLS_LAST (0.10), MIME_HTML_ONLY (0.20), HELO_NORES_A_OR_MX (0.30), AUTH_NA (1.00), DATE_IN_PAST (1.00), MID_RHS_MATCH_FROM (1.00), RDNS_NONE (1.00), FROMHOST_NORES_A_OR_MX (1.50), HTML_SHORT_LINK_IMG_1 (2.00), PYZOR (3.50) expect_header X-Spam-Status: Yes, score=11.70 From: Client Services @@ -574,7 +574,7 @@ dmarc.result fail dmarc.policy reject remote_ip 51.89.165.39 tls.version TLS1_2 -expect_header X-Spam-Result: DKIM_ALLOW (-0.20), HAS_LIST_UNSUB (-0.01), ARC_NA (0.00), DKIM_SIGNED (0.00), FROM_EQ_ENVFROM (0.00), FROM_HAS_DN (0.00), HAS_REPLYTO (0.00), RCPT_COUNT_ONE (0.00), RCVD_COUNT_ZERO (0.00), REPLYTO_ADDR_EQ_FROM (0.00), REPLYTO_EQ_FROM (0.00), SPF_SOFTFAIL (0.00), TO_DN_NONE (0.00), TO_MATCH_ENVRCPT_ALL (0.00), ONCE_RECEIVED (0.10), RCVD_NO_TLS_LAST (0.10), HELO_NORES_A_OR_MX (0.30), DATE_IN_PAST (1.00), MID_RHS_MATCH_FROM (1.00), RDNS_NONE (1.00), R_PARTS_DIFFER (1.00), FROMHOST_NORES_A_OR_MX (1.50), DMARC_POLICY_REJECT (2.00), HTML_SHORT_LINK_IMG_1 (2.00), VIOLATED_DIRECT_SPF (3.50) +expect_header X-Spam-Result: DKIM_ALLOW (-0.20), HAS_LIST_UNSUB (-0.01), ARC_NA (0.00), DKIM_SIGNED (0.00), FROM_EQ_ENVFROM (0.00), FROM_HAS_DN (0.00), HAS_REPLYTO (0.00), MID_RHS_MATCH_ENVFROM (0.00), RCPT_COUNT_ONE (0.00), RCVD_COUNT_ZERO (0.00), REPLYTO_ADDR_EQ_FROM (0.00), REPLYTO_EQ_FROM (0.00), SPF_SOFTFAIL (0.00), TO_DN_NONE (0.00), TO_MATCH_ENVRCPT_ALL (0.00), ONCE_RECEIVED (0.10), RCVD_NO_TLS_LAST (0.10), HELO_NORES_A_OR_MX (0.30), DATE_IN_PAST (1.00), MID_RHS_MATCH_FROM (1.00), RDNS_NONE (1.00), R_PARTS_DIFFER (1.00), FROMHOST_NORES_A_OR_MX (1.50), DMARC_POLICY_REJECT (2.00), HTML_SHORT_LINK_IMG_1 (2.00), VIOLATED_DIRECT_SPF (3.50) expect_header X-Spam-Status: Yes, score=13.29 DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; s=sectionalism; d=grupokonecta.net; diff --git a/tests/resources/smtp/antispam/dmarc.test b/tests/resources/smtp/antispam/dmarc.test index 57f0cd6b5..654ce07de 100644 --- a/tests/resources/smtp/antispam/dmarc.test +++ b/tests/resources/smtp/antispam/dmarc.test @@ -1,4 +1,4 @@ -expect DMARC_NA SPF_NA DKIM_NA ARC_NA +expect DMARC_NA SPF_NA DKIM_NA ARC_NA AUTH_NA Subject: test @@ -77,7 +77,7 @@ Test dkim.result pass dkim.domains spf-dkim-allow.org spf.result pass -expect ALLOWLIST_SPF_DKIM DKIM_ALLOW SPF_ALLOW ARC_NA DMARC_NA +expect DKIM_ALLOW SPF_ALLOW ARC_NA DMARC_NA From: user@spf-dkim-allow.org Subject: test @@ -87,7 +87,7 @@ Test dkim.result pass spf.result pass arc.result pass -expect ALLOWLIST_SPF_DKIM DKIM_ALLOW SPF_ALLOW ARC_ALLOW DMARC_NA +expect DKIM_ALLOW SPF_ALLOW ARC_ALLOW DMARC_NA From: user@spf-dkim-allow.org Subject: test @@ -96,7 +96,7 @@ Test spf.result pass dkim.result fail -expect ALLOWLIST_SPF BLOCKLIST_DKIM DKIM_REJECT SPF_ALLOW ARC_NA DMARC_NA +expect DKIM_REJECT SPF_ALLOW ARC_NA DMARC_NA From: user@spf-dkim-allow.org Subject: test @@ -105,7 +105,7 @@ Test spf.result pass dkim.result temperror -expect ALLOWLIST_SPF DKIM_TEMPFAIL SPF_ALLOW ARC_NA DMARC_NA +expect DKIM_TEMPFAIL SPF_ALLOW ARC_NA DMARC_NA From: user@spf-dkim-allow.org Subject: test @@ -115,7 +115,7 @@ Test dkim.result pass dkim.domains spf-dkim-allow.org spf.result fail -expect BLOCKLIST_SPF ALLOWLIST_DKIM DKIM_ALLOW SPF_FAIL ARC_NA DMARC_NA +expect DKIM_ALLOW SPF_FAIL ARC_NA DMARC_NA From: user@spf-dkim-allow.org Subject: test @@ -125,7 +125,7 @@ Test dkim.result pass dkim.domains spf-dkim-allow.org spf.result temperror -expect ALLOWLIST_DKIM DKIM_ALLOW SPF_DNSFAIL ARC_NA DMARC_NA +expect DKIM_ALLOW SPF_DNSFAIL ARC_NA DMARC_NA From: user@spf-dkim-allow.org Subject: test @@ -134,7 +134,7 @@ Test dkim.result fail spf.result fail -expect BLOCKLIST_SPF_DKIM DKIM_REJECT SPF_FAIL ARC_NA DMARC_NA +expect DKIM_REJECT SPF_FAIL ARC_NA DMARC_NA From: user@spf-dkim-allow.org Subject: test @@ -143,7 +143,7 @@ Test dkim.result temperror spf.result temperror -expect DKIM_TEMPFAIL SPF_DNSFAIL ARC_NA DMARC_NA +expect DKIM_TEMPFAIL SPF_DNSFAIL ARC_NA DMARC_NA AUTH_NA_OR_FAIL From: user@spf-dkim-allow.org Subject: test diff --git a/tests/resources/smtp/antispam/from.test b/tests/resources/smtp/antispam/from.test index e63bb5229..39bf73929 100644 --- a/tests/resources/smtp/antispam/from.test +++ b/tests/resources/smtp/antispam/from.test @@ -158,15 +158,15 @@ Reply-to: Test -envelope_from hello@gmail.com -expect FREEMAIL_ENVFROM FREEMAIL_FROM FROM_EQ_ENVFROM FROM_NO_DN +envelope_from hello@custom.disposable.org +expect FREEMAIL_FROM DISPOSABLE_ENV_FROM FROM_NEQ_ENVFROM FROM_NO_DN FORGED_SENDER From: hello@gmail.com Test -envelope_from hello@custom.disposable.org -expect DISPOSABLE_ENVFROM DISPOSABLE_FROM FROM_EQ_ENVFROM FROM_NO_DN +envelope_from hello@gmail.com +expect DISPOSABLE_FROM FREEMAIL_ENV_FROM FROM_NEQ_ENVFROM FROM_NO_DN FORGED_SENDER From: hello@custom.disposable.org diff --git a/tests/resources/smtp/antispam/messageid.test b/tests/resources/smtp/antispam/messageid.test index 037347b78..deff84603 100644 --- a/tests/resources/smtp/antispam/messageid.test +++ b/tests/resources/smtp/antispam/messageid.test @@ -81,7 +81,7 @@ Message-ID: <1234@host.domain.co.uk> Test envelope_from hello@domain.co.uk -expect MID_RHS_MATCH_FROMTLD +expect MID_RHS_MATCH_ENVFROMTLD Message-ID: <1234@host.domain.co.uk> diff --git a/tests/resources/smtp/antispam/recipient.test b/tests/resources/smtp/antispam/recipient.test index e7d43b407..7d84faae9 100644 --- a/tests/resources/smtp/antispam/recipient.test +++ b/tests/resources/smtp/antispam/recipient.test @@ -117,7 +117,7 @@ Cc: otheruser@guerrillamail.com Test envelope_from test@test.com -expect FREEMAIL_CC DISPOSABLE_TO DISPOSABLE_CC RCPT_COUNT_THREE TO_DN_NONE +expect FREEMAIL_CC DISPOSABLE_TO DISPOSABLE_BCC RCPT_COUNT_THREE TO_DN_NONE To: otheruser@guerrillamail.com Cc: user@gmail.com diff --git a/tests/resources/smtp/antispam/replyto.test b/tests/resources/smtp/antispam/replyto.test index 79100316e..4fbf81bd8 100644 --- a/tests/resources/smtp/antispam/replyto.test +++ b/tests/resources/smtp/antispam/replyto.test @@ -90,21 +90,21 @@ Reply-to: "Mr. Hello" Test -expect FREEMAIL_REPLYTO REPLYTO_EQ_FROM HAS_REPLYTO +expect FREEMAIL_REPLY_TO FREEMAIL_FROM REPLYTO_DOM_EQ_FROM_DOM HAS_REPLYTO From: hello@gmail.com -Reply-to: hello@gmail.com +Reply-to: bye@gmail.com Test -expect DISPOSABLE_REPLYTO REPLYTO_EQ_FROM HAS_REPLYTO +expect DISPOSABLE_REPLY_TO DISPOSABLE_FROM REPLYTO_DOM_EQ_FROM_DOM HAS_REPLYTO From: hello@custom.disposable.org -Reply-to: hello@custom.disposable.org +Reply-to: bye@custom.disposable.org Test -expect FREEMAIL_REPLYTO_NEQ_FROM_DOM FREEMAIL_REPLYTO REPLYTO_DOM_NEQ_FROM_DOM SPOOF_REPLYTO HAS_REPLYTO +expect FREEMAIL_REPLY_TO_NEQ_FROM_DOM FREEMAIL_REPLY_TO FREEMAIL_FROM REPLYTO_DOM_NEQ_FROM_DOM SPOOF_REPLYTO HAS_REPLYTO From: hello@gmail.com Reply-to: hello@yahoomail.com diff --git a/tests/src/smtp/inbound/antispam.rs b/tests/src/smtp/inbound/antispam.rs index 52d783e43..006572047 100644 --- a/tests/src/smtp/inbound/antispam.rs +++ b/tests/src/smtp/inbound/antispam.rs @@ -106,19 +106,19 @@ model = "gpt-dummy" allow-invalid-certs = true [spam-filter.list] -"freemail-providers" = {"gmail.com", "googlemail.com", "yahoomail.com", "*freemail.org"} -"disposable-providers" = {"guerrillamail.com", "*disposable.org"} -"url-redirectors" = {"bit.ly", "redirect.io", "redirect.me", "redirect.org", "redirect.com", "redirect.net", "t.ly", "tinyurl.com"} -"dmarc-allow" = {"dmarc-allow.org"} -"spf-dkim-allow" = {"spf-dkim-allow.org"} -"spam-traps" = {"spamtrap@*"} -"trusted-domains" = {"stalw.art"} "file-extensions" = { "html" = "text/html|BAD", "pdf" = "application/pdf|NZ", "txt" = "text/plain|message/disposition-notification|text/rfc822-headers", "zip" = "AR", "js" = "BAD|NZ", "hta" = "BAD|NZ" } +[lookup] +"url-redirectors" = {"bit.ly", "redirect.io", "redirect.me", "redirect.org", "redirect.com", "redirect.net", "t.ly", "tinyurl.com"} +"known-dmarc-domains" = {"dmarc-allow.org"} +"spam-traps" = {"spamtrap@*"} +"trusted-domains" = {"stalw.art"} +"freemail-providers" = {"gmail.com", "googlemail.com", "yahoomail.com", "*freemail.org"} +"disposable-providers" = {"guerrillamail.com", "*disposable.org"} "#; #[tokio::test(flavor = "multi_thread")] @@ -526,10 +526,12 @@ async fn antispam() { } "from" => { server.spam_filter_analyze_from(&mut spam_ctx).await; + server.spam_filter_analyze_domain(&mut spam_ctx).await; server.spam_filter_analyze_rules(&mut spam_ctx).await; } "replyto" => { server.spam_filter_analyze_reply_to(&mut spam_ctx).await; + server.spam_filter_analyze_domain(&mut spam_ctx).await; server.spam_filter_analyze_rules(&mut spam_ctx).await; } "recipient" => { @@ -553,6 +555,9 @@ async fn antispam() { } "dmarc" => { server.spam_filter_analyze_dmarc(&mut spam_ctx).await; + server.spam_filter_analyze_headers(&mut spam_ctx).await; + server.spam_filter_analyze_rules(&mut spam_ctx).await; + spam_ctx.result.tags.retain(|t| !t.starts_with("X_HDR_")); } "ip" => { server.spam_filter_analyze_ip(&mut spam_ctx).await;