From a18c82f9e20c7243e114cf0802464a2c077db7ca Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Fri, 15 Nov 2024 23:57:41 +0000 Subject: [PATCH 1/5] Fix md5 return_type to only return Utf8 as per current code impl. --- datafusion/functions/src/crypto/md5.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/functions/src/crypto/md5.rs b/datafusion/functions/src/crypto/md5.rs index 0f18fd47b4cf0..0e8ff1cd31928 100644 --- a/datafusion/functions/src/crypto/md5.rs +++ b/datafusion/functions/src/crypto/md5.rs @@ -64,11 +64,11 @@ impl ScalarUDFImpl for Md5Func { fn return_type(&self, arg_types: &[DataType]) -> Result { use DataType::*; Ok(match &arg_types[0] { - LargeUtf8 | LargeBinary => LargeUtf8, + LargeUtf8 | LargeBinary => Utf8, Utf8View | Utf8 | Binary => Utf8, Null => Null, Dictionary(_, t) => match **t { - LargeUtf8 | LargeBinary => LargeUtf8, + LargeUtf8 | LargeBinary => Utf8, Utf8 | Binary => Utf8, Null => Null, _ => { From 758e400ef9925ef4ec86d96303c0ee792fd51b50 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Sat, 28 Dec 2024 22:25:38 +0000 Subject: [PATCH 2/5] Add support for sqlite test files to sqllogictest --- .gitmodules | 4 + datafusion-testing | 1 + datafusion/sqllogictest/Cargo.toml | 9 +- datafusion/sqllogictest/README.md | 44 +- datafusion/sqllogictest/bin/sqllogictests.rs | 522 +++++++++++++++--- .../engines/datafusion_engine/normalize.rs | 4 + .../src/engines/datafusion_engine/runner.rs | 65 ++- .../src/engines/postgres_engine/mod.rs | 63 ++- 8 files changed, 613 insertions(+), 99 deletions(-) create mode 100755 datafusion-testing diff --git a/.gitmodules b/.gitmodules index ec5d6208b8ddb..037accdbe4241 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,3 +4,7 @@ [submodule "testing"] path = testing url = https://github.com/apache/arrow-testing +[submodule "datafusion-testing"] + path = datafusion-testing + url = https://github.com/apache/datafusion-testing.git + branch = main diff --git a/datafusion-testing b/datafusion-testing new file mode 100755 index 0000000000000..8ec2a3ebcfea8 --- /dev/null +++ b/datafusion-testing @@ -0,0 +1 @@ +e2e320c9477a6d8ab09662eae255887733c0e304 \ No newline at end of file diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index 7ceabd87855f5..100b925dae41f 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -45,15 +45,19 @@ datafusion-common = { workspace = true, default-features = true } datafusion-common-runtime = { workspace = true, default-features = true } futures = { workspace = true } half = { workspace = true, default-features = true } +indicatif = "0.17" itertools = { workspace = true } log = { workspace = true } object_store = { workspace = true } +once_cell = { version = "1.20", optional = true } postgres-protocol = { version = "0.6.7", optional = true } postgres-types = { version = "0.2.8", features = ["derive", "with-chrono-0_4"], optional = true } rust_decimal = { version = "1.36.0", features = ["tokio-pg"] } sqllogictest = "0.24.0" sqlparser = { workspace = true } tempfile = { workspace = true } +testcontainers = { version = "0.23", features = ["default"], optional = true } +testcontainers-modules = { version = "0.11", features = ["postgres"], optional = true } thiserror = "2.0.0" tokio = { workspace = true } tokio-postgres = { version = "0.7.12", optional = true } @@ -63,9 +67,12 @@ avro = ["datafusion/avro"] postgres = [ "bytes", "chrono", - "tokio-postgres", + "once_cell", "postgres-types", "postgres-protocol", + "testcontainers", + "testcontainers-modules", + "tokio-postgres", ] [dev-dependencies] diff --git a/datafusion/sqllogictest/README.md b/datafusion/sqllogictest/README.md index 885e92fee270f..124735c89d874 100644 --- a/datafusion/sqllogictest/README.md +++ b/datafusion/sqllogictest/README.md @@ -28,7 +28,8 @@ This crate is a submodule of DataFusion that contains an implementation of [sqll ## Overview This crate uses [sqllogictest-rs](https://github.com/risinglightdb/sqllogictest-rs) to parse and run `.slt` files in the -[`test_files`](test_files) directory of this crate. +[`test_files`](test_files) directory of this crate or the [`data/sqlite`](sqlite) +directory of the datafusion-testing crate. ## Testing setup @@ -160,7 +161,7 @@ cargo test --test sqllogictests -- information Test files that start with prefix `pg_compat_` verify compatibility with Postgres by running the same script files both with DataFusion and with Postgres -In order to run the sqllogictests running against a previously running Postgres instance, do: +In order to have the sqllogictest run against an existing running Postgres instance, do: ```shell PG_COMPAT=true PG_URI="postgresql://postgres@127.0.0.1/postgres" cargo test --features=postgres --test sqllogictests @@ -172,7 +173,7 @@ The environment variables: 2. `PG_URI` contains a `libpq` style connection string, whose format is described in [the docs](https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html#url) -One way to create a suitable a posgres container in docker is to use +One way to create a suitable a postgres container in docker is to use the [Official Image](https://hub.docker.com/_/postgres) with a command such as the following. Note the collation **must** be set to `C` otherwise `ORDER BY` will not match DataFusion and the tests will diff. @@ -185,6 +186,15 @@ docker run \ postgres ``` +If you do not want to create a new postgres database and you have docker +installed you can skip providing a PG_URI env variable and the sqllogictest +runner will automatically create a temporary postgres docker container. +For example: + +```shell +PG_COMPAT=true cargo test --features=postgres --test sqllogictests +``` + ## Running Tests: `tpch` Test files in `tpch` directory runs against the `TPCH` data set (SF = @@ -205,6 +215,34 @@ Then you need to add `INCLUDE_TPCH=true` to run tpch tests: INCLUDE_TPCH=true cargo test --test sqllogictests ``` +## Running Tests: `sqlite` + +Test files in `data/sqlite` directory of the datafusion-testing crate were +sourced from the sqlite test suite and have been cleansed and updated to +run within DataFusion's sqllogictest runner. + +To run the sqlite tests you need to increase the rust stack size and add +`INCLUDE_SQLITE=true` to run the sqlite tests: + +```shell +export RUST_MIN_STACK=30485760; +INCLUDE_SQLITE=true cargo test --test sqllogictests +``` + +Note that there are well over 5 million queries in these tests and running the +sqlite tests will take a long time. You may wish to run them in release-nonlto mode: + +```shell +INCLUDE_SQLITE=true cargo test --profile release-nonlto --test sqllogictests +``` + +The sqlite tests can also be run with the postgres runner to verify compatibility: + +```shell +export RUST_MIN_STACK=30485760; +PG_COMPAT=true INCLUDE_SQLITE=true cargo test --features=postgres --test sqllogictests +``` + ## Updating tests: Completion Mode In test script completion mode, `sqllogictests` reads a prototype script and runs the statements and queries against the diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs index 066cc8ee98244..498539c1674a1 100644 --- a/datafusion/sqllogictest/bin/sqllogictests.rs +++ b/datafusion/sqllogictest/bin/sqllogictests.rs @@ -16,57 +16,129 @@ // under the License. use clap::Parser; +use datafusion_common::instant::Instant; use datafusion_common::utils::get_available_parallelism; +use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError, Result}; +use datafusion_common_runtime::SpawnedTask; use datafusion_sqllogictest::{DataFusion, TestContext}; use futures::stream::StreamExt; +use indicatif::{ + HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle, +}; use itertools::Itertools; -use log::info; -use sqllogictest::{strict_column_validator, Normalizer}; +use log::Level::{Info, Warn}; +use log::{info, log_enabled, warn}; +#[cfg(feature = "postgres")] +use once_cell::sync::Lazy; +use sqllogictest::{ + parse_file, strict_column_validator, AsyncDB, Condition, Normalizer, Record, + Validator, +}; +#[cfg(feature = "postgres")] +use std::env::set_var; use std::ffi::OsStr; use std::fs; +#[cfg(feature = "postgres")] +use std::future::Future; use std::path::{Path, PathBuf}; - -use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError, Result}; -use datafusion_common_runtime::SpawnedTask; +#[cfg(feature = "postgres")] +use std::{env, thread}; +#[cfg(feature = "postgres")] +use testcontainers::core::IntoContainerPort; +#[cfg(feature = "postgres")] +use testcontainers::runners::AsyncRunner; +#[cfg(feature = "postgres")] +use testcontainers::ImageExt; +#[cfg(feature = "postgres")] +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +#[cfg(feature = "postgres")] +use tokio::sync::{mpsc, Mutex}; +#[cfg(feature = "postgres")] +use ContainerCommands::{FetchHost, FetchPort}; const TEST_DIRECTORY: &str = "test_files/"; +const DATAFUSION_TESTING_TEST_DIRECTORY: &str = "../../datafusion-testing/data/"; const PG_COMPAT_FILE_PREFIX: &str = "pg_compat_"; +const SQLITE_PREFIX: &str = "sqlite"; pub fn main() -> Result<()> { tokio::runtime::Builder::new_multi_thread() .enable_all() - .build() - .unwrap() + .build()? .block_on(run_tests()) } +// Trailing whitespace from lines in SLT will typically be removed, but do not fail if it is not +// If particular test wants to cover trailing whitespace on a value, +// it should project additional non-whitespace column on the right. #[allow(clippy::ptr_arg)] -fn normalizer(s: &String) -> String { - // Trailing whitespace from lines in SLT will typically be removed, but do not fail if it is not - // If particular test wants to cover trailing whitespace on a value, - // it should project additional non-whitespace column on the right. - s.trim_end().to_owned() +fn value_normalizer(s: &String) -> String { + s.trim_end().to_string() } -fn value_validator( +fn sqlite_value_validator( normalizer: Normalizer, actual: &[Vec], expected: &[String], ) -> bool { - let expected = expected.iter().map(normalizer).collect::>(); - let actual = actual + let normalized_expected = expected.iter().map(normalizer).collect::>(); + let normalized_actual = actual + .iter() + .map(|strs| strs.iter().map(normalizer).join(" ")) + .collect_vec(); + + if log_enabled!(Info) && normalized_actual != normalized_expected { + info!("sqlite validation failed. actual vs expected:"); + for i in 0..normalized_actual.len() { + info!("[{i}] {}", normalized_actual[i]); + info!( + "[{i}] {}", + if normalized_expected.len() >= i { + &normalized_expected[i] + } else { + "No more results" + } + ); + } + } + + normalized_actual == normalized_expected +} + +fn df_value_validator( + normalizer: Normalizer, + actual: &[Vec], + expected: &[String], +) -> bool { + let normalized_expected = expected.iter().map(normalizer).collect::>(); + let normalized_actual = actual .iter() .map(|strs| strs.iter().join(" ")) - // Editors do not preserve trailing whitespace, so expected may or may not lack it included - .map(|str| normalizer(&str)) - .collect::>(); - actual == expected + .map(|str| str.trim_end().to_string()) + .collect_vec(); + + if log_enabled!(Warn) && normalized_actual != normalized_expected { + warn!("df validation failed. actual vs expected:"); + for i in 0..normalized_actual.len() { + warn!("[{i}] {}", normalized_actual[i]); + warn!( + "[{i}] {}", + if normalized_expected.len() >= i { + &normalized_expected[i] + } else { + "No more results" + } + ); + } + } + + normalized_actual == normalized_expected } /// Sets up an empty directory at test_files/scratch/ /// creating it if needed and clearing any file contents if it exists /// This allows tests for inserting to external tables or copy to -/// to persist data to disk and have consistent state when running +/// persist data to disk and have consistent state when running /// a new test fn setup_scratch_dir(name: &Path) -> Result<()> { // go from copy.slt --> copy @@ -97,23 +169,89 @@ async fn run_tests() -> Result<()> { } options.warn_on_ignored(); + #[cfg(feature = "postgres")] + let start_pg_database = options.postgres_runner && !is_pg_uri_set(); + #[cfg(feature = "postgres")] + if start_pg_database { + info!("Starting postgres db ..."); + + thread::spawn(|| { + execute_blocking(start_postgres( + &POSTGRES_IN, + &POSTGRES_HOST, + &POSTGRES_PORT, + &POSTGRES_STOPPED, + )) + }); + + POSTGRES_IN.tx.send(FetchHost).unwrap(); + let db_host = POSTGRES_HOST.rx.lock().await.recv().await.unwrap(); + + POSTGRES_IN.tx.send(FetchPort).unwrap(); + let db_port = POSTGRES_PORT.rx.lock().await.recv().await.unwrap(); + + let pg_uri = format!("postgresql://postgres:postgres@{db_host}:{db_port}/test"); + info!("Postgres uri is {pg_uri}"); + + set_var("PG_URI", pg_uri); + } + // Run all tests in parallel, reporting failures at the end // // Doing so is safe because each slt file runs with its own // `SessionContext` and should not have side effects (like // modifying shared state like `/tmp/`) + let m = MultiProgress::with_draw_target(ProgressDrawTarget::stderr_with_hz(1)); + let m_style = ProgressStyle::with_template( + "[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}", + ) + .unwrap() + .progress_chars("##-"); + + let start = Instant::now(); + let errors: Vec<_> = futures::stream::iter(read_test_files(&options)?) .map(|test_file| { + let validator = if options.include_sqlite + && test_file.relative_path.starts_with(SQLITE_PREFIX) + { + sqlite_value_validator + } else { + df_value_validator + }; + + let m_clone = m.clone(); + let m_style_clone = m_style.clone(); + SpawnedTask::spawn(async move { - let file_path = test_file.relative_path.clone(); - let start = datafusion::common::instant::Instant::now(); match (options.postgres_runner, options.complete) { - (false, false) => run_test_file(test_file).await?, - (false, true) => run_complete_file(test_file).await?, - (true, false) => run_test_file_with_postgres(test_file).await?, - (true, true) => run_complete_file_with_postgres(test_file).await?, + (false, false) => { + run_test_file(test_file, validator, m_clone, m_style_clone) + .await? + } + (false, true) => { + run_complete_file(test_file, validator, m_clone, m_style_clone) + .await? + } + (true, false) => { + run_test_file_with_postgres( + test_file, + validator, + m_clone, + m_style_clone, + ) + .await? + } + (true, true) => { + run_complete_file_with_postgres( + test_file, + validator, + m_clone, + m_style_clone, + ) + .await? + } } - println!("Executed {:?}. Took {:?}", file_path, start.elapsed()); Ok(()) as Result<()> }) .join() @@ -136,6 +274,15 @@ async fn run_tests() -> Result<()> { .collect() .await; + m.println(format!("Completed in {}", HumanDuration(start.elapsed())))?; + + #[cfg(feature = "postgres")] + if start_pg_database { + println!("Stopping postgres db ..."); + POSTGRES_IN.tx.send(ContainerCommands::Stop).unwrap_or(()); + POSTGRES_STOPPED.rx.lock().await.recv().await; + } + // report on any errors if !errors.is_empty() { for e in &errors { @@ -147,60 +294,148 @@ async fn run_tests() -> Result<()> { } } -async fn run_test_file(test_file: TestFile) -> Result<()> { +#[cfg(feature = "postgres")] +fn is_pg_uri_set() -> bool { + match env::var("PG_URI") { + Ok(_) => true, + Err(_) => false, + } +} + +async fn run_test_file( + test_file: TestFile, + validator: Validator, + mp: MultiProgress, + mp_style: ProgressStyle, +) -> Result<()> { let TestFile { path, relative_path, } = test_file; - info!("Running with DataFusion runner: {}", path.display()); let Some(test_ctx) = TestContext::try_new_for_test_file(&relative_path).await else { info!("Skipping: {}", path.display()); return Ok(()); }; setup_scratch_dir(&relative_path)?; + + let count: u64 = get_record_count(&path, "Datafusion".to_string()); + let pb = mp.add(ProgressBar::new(count)); + + pb.set_style(mp_style); + pb.set_message(format!("{:?}", &relative_path)); + let mut runner = sqllogictest::Runner::new(|| async { Ok(DataFusion::new( test_ctx.session_ctx().clone(), relative_path.clone(), + pb.clone(), )) }); + runner.add_label("Datafusion"); runner.with_column_validator(strict_column_validator); - runner.with_normalizer(normalizer); - runner.with_validator(value_validator); - runner + runner.with_normalizer(value_normalizer); + runner.with_validator(validator); + + let res = runner .run_file_async(path) .await - .map_err(|e| DataFusionError::External(Box::new(e))) + .map_err(|e| DataFusionError::External(Box::new(e))); + + pb.finish_and_clear(); + + res +} + +fn get_record_count(path: &PathBuf, label: String) -> u64 { + let records: Vec::ColumnType>> = + parse_file(path).unwrap(); + let mut count: u64 = 0; + + records.iter().for_each(|rec| match rec { + Record::Query { conditions, .. } => { + if conditions.is_empty() + || !conditions.contains(&Condition::SkipIf { + label: label.clone(), + }) + || conditions.contains(&Condition::OnlyIf { + label: label.clone(), + }) + { + count += 1; + } + } + Record::Statement { conditions, .. } => { + if conditions.is_empty() + || !conditions.contains(&Condition::SkipIf { + label: label.clone(), + }) + || conditions.contains(&Condition::OnlyIf { + label: label.clone(), + }) + { + count += 1; + } + } + _ => {} + }); + + count } #[cfg(feature = "postgres")] -async fn run_test_file_with_postgres(test_file: TestFile) -> Result<()> { +async fn run_test_file_with_postgres( + test_file: TestFile, + validator: Validator, + mp: MultiProgress, + mp_style: ProgressStyle, +) -> Result<()> { use datafusion_sqllogictest::Postgres; let TestFile { path, relative_path, } = test_file; - info!("Running with Postgres runner: {}", path.display()); setup_scratch_dir(&relative_path)?; - let mut runner = - sqllogictest::Runner::new(|| Postgres::connect(relative_path.clone())); + + let count: u64 = get_record_count(&path, "postgresql".to_string()); + let pb = mp.add(ProgressBar::new(count)); + + pb.set_style(mp_style); + pb.set_message(format!("{:?}", &relative_path)); + + let mut runner = sqllogictest::Runner::new(|| { + Postgres::connect(relative_path.clone(), pb.clone()) + }); + runner.add_label("postgres"); runner.with_column_validator(strict_column_validator); - runner.with_normalizer(normalizer); - runner.with_validator(value_validator); + runner.with_normalizer(value_normalizer); + runner.with_validator(validator); runner .run_file_async(path) .await .map_err(|e| DataFusionError::External(Box::new(e)))?; + + pb.finish_and_clear(); + Ok(()) } #[cfg(not(feature = "postgres"))] -async fn run_test_file_with_postgres(_test_file: TestFile) -> Result<()> { +async fn run_test_file_with_postgres( + _test_file: TestFile, + _validator: Validator, + _mp: MultiProgress, + _mp_style: ProgressStyle, +) -> Result<()> { use datafusion_common::plan_err; plan_err!("Can not run with postgres as postgres feature is not enabled") } -async fn run_complete_file(test_file: TestFile) -> Result<()> { +async fn run_complete_file( + test_file: TestFile, + validator: Validator, + mp: MultiProgress, + mp_style: ProgressStyle, +) -> Result<()> { let TestFile { path, relative_path, @@ -213,30 +448,48 @@ async fn run_complete_file(test_file: TestFile) -> Result<()> { return Ok(()); }; setup_scratch_dir(&relative_path)?; + + let count: u64 = get_record_count(&path, "Datafusion".to_string()); + let pb = mp.add(ProgressBar::new(count)); + + pb.set_style(mp_style); + pb.set_message(format!("{:?}", &relative_path)); + let mut runner = sqllogictest::Runner::new(|| async { Ok(DataFusion::new( test_ctx.session_ctx().clone(), relative_path.clone(), + pb.clone(), )) }); + let col_separator = " "; - runner + let res = runner .update_test_file( path, col_separator, - value_validator, - normalizer, + validator, + value_normalizer, strict_column_validator, ) .await // Can't use e directly because it isn't marked Send, so turn it into a string. .map_err(|e| { DataFusionError::Execution(format!("Error completing {relative_path:?}: {e}")) - }) + }); + + pb.finish_and_clear(); + + res } #[cfg(feature = "postgres")] -async fn run_complete_file_with_postgres(test_file: TestFile) -> Result<()> { +async fn run_complete_file_with_postgres( + test_file: TestFile, + validator: Validator, + mp: MultiProgress, + mp_style: ProgressStyle, +) -> Result<()> { use datafusion_sqllogictest::Postgres; let TestFile { path, @@ -247,26 +500,48 @@ async fn run_complete_file_with_postgres(test_file: TestFile) -> Result<()> { path.display() ); setup_scratch_dir(&relative_path)?; - let mut runner = - sqllogictest::Runner::new(|| Postgres::connect(relative_path.clone())); + + let count: u64 = get_record_count(&path, "postgresql".to_string()); + let pb = mp.add(ProgressBar::new(count)); + + pb.set_style(mp_style); + pb.set_message(format!("{:?}", &relative_path)); + + let mut runner = sqllogictest::Runner::new(|| { + Postgres::connect(relative_path.clone(), pb.clone()) + }); + runner.add_label("postgres"); + runner.with_column_validator(strict_column_validator); + runner.with_normalizer(value_normalizer); + runner.with_validator(validator); + let col_separator = " "; - runner + let res = runner .update_test_file( path, col_separator, - value_validator, - normalizer, + validator, + value_normalizer, strict_column_validator, ) .await // Can't use e directly because it isn't marked Send, so turn it into a string. .map_err(|e| { DataFusionError::Execution(format!("Error completing {relative_path:?}: {e}")) - }) + }); + + pb.finish_and_clear(); + + res } #[cfg(not(feature = "postgres"))] -async fn run_complete_file_with_postgres(_test_file: TestFile) -> Result<()> { +async fn run_complete_file_with_postgres( + _test_file: TestFile, + _validator: Validator, + _mp: MultiProgress, + _mp_style: ProgressStyle, +) -> Result<()> { use datafusion_common::plan_err; plan_err!("Can not run with postgres as postgres feature is not enabled") } @@ -282,11 +557,14 @@ struct TestFile { impl TestFile { fn new(path: PathBuf) -> Self { - let relative_path = PathBuf::from( - path.to_string_lossy() - .strip_prefix(TEST_DIRECTORY) - .unwrap_or(""), - ); + let p = path.to_string_lossy(); + let relative_path = PathBuf::from(if p.starts_with(TEST_DIRECTORY) { + p.strip_prefix(TEST_DIRECTORY).unwrap() + } else if p.starts_with(DATAFUSION_TESTING_TEST_DIRECTORY) { + p.strip_prefix(DATAFUSION_TESTING_TEST_DIRECTORY).unwrap() + } else { + "" + }); Self { path, @@ -298,6 +576,14 @@ impl TestFile { self.path.extension() == Some(OsStr::new("slt")) } + fn check_sqlite(&self, options: &Options) -> bool { + if !self.relative_path.starts_with(SQLITE_PREFIX) { + return true; + } + + options.include_sqlite + } + fn check_tpch(&self, options: &Options) -> bool { if !self.relative_path.starts_with("tpch") { return true; @@ -310,15 +596,29 @@ impl TestFile { fn read_test_files<'a>( options: &'a Options, ) -> Result + 'a>> { - Ok(Box::new( - read_dir_recursive(TEST_DIRECTORY)? + let mut paths = read_dir_recursive(TEST_DIRECTORY)? + .into_iter() + .map(TestFile::new) + .filter(|f| options.check_test_file(&f.relative_path)) + .filter(|f| f.is_slt_file()) + .filter(|f| f.check_tpch(options)) + .filter(|f| f.check_sqlite(options)) + .filter(|f| options.check_pg_compat_file(f.path.as_path())) + .collect::>(); + if options.include_sqlite { + let mut sqlite_paths = read_dir_recursive(DATAFUSION_TESTING_TEST_DIRECTORY)? .into_iter() .map(TestFile::new) .filter(|f| options.check_test_file(&f.relative_path)) .filter(|f| f.is_slt_file()) - .filter(|f| f.check_tpch(options)) - .filter(|f| options.check_pg_compat_file(f.path.as_path())), - )) + .filter(|f| f.check_sqlite(options)) + .filter(|f| options.check_pg_compat_file(f.path.as_path())) + .collect::>(); + + paths.append(&mut sqlite_paths) + } + + Ok(Box::new(paths.into_iter())) } fn read_dir_recursive>(path: P) -> Result> { @@ -350,7 +650,7 @@ fn read_dir_recursive_impl(dst: &mut Vec, path: &Path) -> Result<()> { /// Parsed command line options /// -/// This structure attempts to mimic the command line options of the built in rust test runner +/// This structure attempts to mimic the command line options of the built-in rust test runner /// accepted by IDEs such as CLion that pass arguments /// /// See for more details @@ -367,6 +667,9 @@ struct Options { )] postgres_runner: bool, + #[clap(long, env = "INCLUDE_SQLITE", help = "Include sqlite files")] + include_sqlite: bool, + #[clap(long, env = "INCLUDE_TPCH", help = "Include tpch files")] include_tpch: bool, @@ -431,10 +734,13 @@ impl Options { .any(|filter| relative_path.to_string_lossy().contains(filter)) } - /// Postgres runner executes only tests in files with specific names + /// Postgres runner executes only tests in files with specific names or in + /// specific folders fn check_pg_compat_file(&self, path: &Path) -> bool { let file_name = path.file_name().unwrap().to_str().unwrap().to_string(); - !self.postgres_runner || file_name.starts_with(PG_COMPAT_FILE_PREFIX) + !self.postgres_runner + || file_name.starts_with(PG_COMPAT_FILE_PREFIX) + || (self.include_sqlite && path.to_string_lossy().contains(SQLITE_PREFIX)) } /// Logs warning messages to stdout if any ignored options are passed @@ -452,3 +758,87 @@ impl Options { } } } + +#[cfg(feature = "postgres")] +pub async fn start_postgres( + in_channel: &Channel, + host_channel: &Channel, + port_channel: &Channel, + stopped_channel: &Channel<()>, +) { + info!("Starting postgres test container with user postgres/postgres and db test"); + + let container = testcontainers_modules::postgres::Postgres::default() + .with_user("postgres") + .with_password("postgres") + .with_db_name("test") + .with_mapped_port(16432, 5432.tcp()) + .with_tag("17-alpine") + .start() + .await + .unwrap(); + // uncomment this if you are running docker in docker + // let host = "host.docker.internal".to_string(); + let host = container.get_host().await.unwrap().to_string(); + let port = container.get_host_port_ipv4(5432).await.unwrap(); + + let mut rx = in_channel.rx.lock().await; + while let Some(command) = rx.recv().await { + match command { + FetchHost => host_channel.tx.send(host.clone()).unwrap(), + FetchPort => port_channel.tx.send(port).unwrap(), + ContainerCommands::Stop => { + container.stop().await.unwrap(); + stopped_channel.tx.send(()).unwrap(); + rx.close(); + } + } + } +} + +#[cfg(feature = "postgres")] +#[derive(Debug)] +pub enum ContainerCommands { + FetchHost, + FetchPort, + Stop, +} + +#[cfg(feature = "postgres")] +pub struct Channel { + pub tx: UnboundedSender, + pub rx: Mutex>, +} + +#[cfg(feature = "postgres")] +pub fn channel() -> Channel { + let (tx, rx) = mpsc::unbounded_channel(); + Channel { + tx, + rx: Mutex::new(rx), + } +} + +#[cfg(feature = "postgres")] +pub fn execute_blocking(f: F) { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(f); +} + +#[cfg(feature = "postgres")] +pub struct HostPort { + pub host: String, + pub port: u16, +} + +#[cfg(feature = "postgres")] +static POSTGRES_IN: Lazy> = Lazy::new(channel); +#[cfg(feature = "postgres")] +static POSTGRES_HOST: Lazy> = Lazy::new(channel); +#[cfg(feature = "postgres")] +static POSTGRES_PORT: Lazy> = Lazy::new(channel); +#[cfg(feature = "postgres")] +static POSTGRES_STOPPED: Lazy> = Lazy::new(channel); diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs index ced497de22a75..58400280072c1 100644 --- a/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs +++ b/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs @@ -239,6 +239,10 @@ pub fn cell_to_string(col: &ArrayRef, row: usize) -> Result { let key = dict.normalized_keys()[row]; Ok(cell_to_string(dict.values(), key)?) } + // only added because of a bug in v 1.0.4 (is) of lexical-write-integer + DataType::Int64 => { + Ok(format!("{}", get_row_value!(array::Int64Array, col, row))) + } _ => { let f = ArrayFormatter::try_new(col.as_ref(), &DEFAULT_FORMAT_OPTIONS); Ok(f.unwrap().value(row).to_string()) diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs index 5c24b49cfe868..e696058484a9c 100644 --- a/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs +++ b/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs @@ -18,26 +18,49 @@ use std::sync::Arc; use std::{path::PathBuf, time::Duration}; +use super::{error::Result, normalize, DFSqlLogicTestError}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; use datafusion::physical_plan::common::collect; use datafusion::physical_plan::execute_stream; use datafusion::prelude::SessionContext; -use log::info; +use indicatif::ProgressBar; +use log::Level::{Debug, Info}; +use log::{debug, log_enabled, warn}; use sqllogictest::DBOutput; - -use super::{error::Result, normalize, DFSqlLogicTestError}; +use tokio::time::Instant; use crate::engines::output::{DFColumnType, DFOutput}; pub struct DataFusion { ctx: SessionContext, relative_path: PathBuf, + pb: ProgressBar, } impl DataFusion { - pub fn new(ctx: SessionContext, relative_path: PathBuf) -> Self { - Self { ctx, relative_path } + pub fn new(ctx: SessionContext, relative_path: PathBuf, pb: ProgressBar) -> Self { + Self { + ctx, + relative_path, + pb, + } + } + + fn update_slow_count(&self) { + let msg = self.pb.message(); + let split: Vec<&str> = msg.split(" ").collect(); + let mut current_count = 0; + + if split.len() > 2 { + // third match will be current slow count + current_count = split[2].parse::().unwrap(); + } + + current_count += 1; + + self.pb + .set_message(format!("{} - {} took > 500 ms", split[0], current_count)); } } @@ -47,12 +70,32 @@ impl sqllogictest::AsyncDB for DataFusion { type ColumnType = DFColumnType; async fn run(&mut self, sql: &str) -> Result { - info!( - "[{}] Running query: \"{}\"", - self.relative_path.display(), - sql - ); - run_query(&self.ctx, sql).await + if log_enabled!(Debug) { + debug!( + "[{}] Running query: \"{}\"", + self.relative_path.display(), + sql + ); + } + + let start = Instant::now(); + let result = run_query(&self.ctx, sql).await; + let duration = start.elapsed(); + + if duration.gt(&Duration::from_millis(500)) { + self.update_slow_count(); + } + + self.pb.inc(1); + + if log_enabled!(Info) && duration.gt(&Duration::from_secs(2)) { + warn!( + "[{}] Running query took more than 2 sec ({duration:?}): \"{sql}\"", + self.relative_path.display() + ); + } + + result } /// Engine name of current database. diff --git a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs index a490488cd764f..1439695d62c6b 100644 --- a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs +++ b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs @@ -15,22 +15,24 @@ // specific language governing permissions and limitations // under the License. -/// Postgres engine implementation for sqllogictest. -use std::path::{Path, PathBuf}; -use std::str::FromStr; - use async_trait::async_trait; use bytes::Bytes; use futures::{SinkExt, StreamExt}; -use log::debug; +use log::{debug, info}; use sqllogictest::DBOutput; +/// Postgres engine implementation for sqllogictest. +use std::path::{Path, PathBuf}; +use std::str::FromStr; +use std::time::Duration; use tokio::task::JoinHandle; use super::conversion::*; use crate::engines::output::{DFColumnType, DFOutput}; use chrono::{NaiveDate, NaiveDateTime, NaiveTime}; +use indicatif::ProgressBar; use postgres_types::Type; use rust_decimal::Decimal; +use tokio::time::Instant; use tokio_postgres::{Column, Row}; use types::PgRegtype; @@ -55,6 +57,7 @@ pub struct Postgres { join_handle: JoinHandle<()>, /// Relative test file path relative_path: PathBuf, + pb: ProgressBar, } impl Postgres { @@ -71,11 +74,11 @@ impl Postgres { /// ``` /// /// See https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html#url for format - pub async fn connect(relative_path: PathBuf) -> Result { + pub async fn connect(relative_path: PathBuf, pb: ProgressBar) -> Result { let uri = std::env::var("PG_URI").map_or(PG_URI.to_string(), std::convert::identity); - debug!("Using postgres connection string: {uri}"); + info!("Using postgres connection string: {uri}"); let config = tokio_postgres::Config::from_str(&uri)?; @@ -113,6 +116,7 @@ impl Postgres { client, join_handle, relative_path, + pb, }) } @@ -181,6 +185,22 @@ impl Postgres { tx.commit().await?; Ok(DBOutput::StatementComplete(0)) } + + fn update_slow_count(&self) { + let msg = self.pb.message(); + let split: Vec<&str> = msg.split(" ").collect(); + let mut current_count = 0; + + if split.len() > 2 { + // second match will be current slow count + current_count += split[2].parse::().unwrap(); + } + + current_count += 1; + + self.pb + .set_message(format!("{} - {} took > 500 ms", split[0], current_count)); + } } /// remove single quotes from the start and end of the string @@ -194,16 +214,13 @@ fn no_quotes(t: &str) -> &str { /// return a schema name fn schema_name(relative_path: &Path) -> String { relative_path - .file_name() - .map(|name| { - name.to_string_lossy() - .chars() - .filter(|ch| ch.is_ascii_alphabetic()) - .collect::() - .trim_start_matches("pg_") - .to_string() - }) - .unwrap_or_else(|| "default_schema".to_string()) + .to_string_lossy() + .to_string() + .chars() + .filter(|ch| ch.is_ascii_alphanumeric()) + .collect::() + .trim_start_matches("pg_") + .to_string() } impl Drop for Postgres { @@ -221,7 +238,7 @@ impl sqllogictest::AsyncDB for Postgres { &mut self, sql: &str, ) -> Result, Self::Error> { - println!( + debug!( "[{}] Running query: \"{}\"", self.relative_path.display(), sql @@ -242,14 +259,24 @@ impl sqllogictest::AsyncDB for Postgres { }; if lower_sql.starts_with("copy") { + self.pb.inc(1); return self.run_copy_command(sql).await; } if !is_query_sql { self.client.execute(sql, &[]).await?; + self.pb.inc(1); return Ok(DBOutput::StatementComplete(0)); } + let start = Instant::now(); let rows = self.client.query(sql, &[]).await?; + let duration = start.elapsed(); + + if duration.gt(&Duration::from_millis(500)) { + self.update_slow_count(); + } + + self.pb.inc(1); let types: Vec = if rows.is_empty() { self.client From 95ab1a684347ccd67513ee30fc96c45336f4fe25 Mon Sep 17 00:00:00 2001 From: Ian Lai <108986288+Chen-Yuan-Lai@users.noreply.github.com> Date: Sat, 28 Dec 2024 22:16:03 +0800 Subject: [PATCH 3/5] doc-gen: migrate scalar functions (string) documentation 3/4 (#13926) Co-authored-by: Cheng-Yuan-Lai --- datafusion/functions/src/string/repeat.rs | 49 ++++++++----------- datafusion/functions/src/string/replace.rs | 49 +++++++++---------- datafusion/functions/src/string/split_part.rs | 45 +++++++---------- .../functions/src/string/starts_with.rs | 46 +++++++---------- datafusion/functions/src/string/to_hex.rs | 45 +++++++---------- datafusion/functions/src/string/uuid.rs | 39 ++++++--------- .../functions/src/unicode/find_in_set.rs | 46 ++++++++--------- datafusion/functions/src/unicode/reverse.rs | 44 +++++++---------- 8 files changed, 152 insertions(+), 211 deletions(-) diff --git a/datafusion/functions/src/string/repeat.rs b/datafusion/functions/src/string/repeat.rs index 044b3549243ba..d5ebf902c1105 100644 --- a/datafusion/functions/src/string/repeat.rs +++ b/datafusion/functions/src/string/repeat.rs @@ -16,7 +16,7 @@ // under the License. use std::any::Any; -use std::sync::{Arc, OnceLock}; +use std::sync::Arc; use crate::strings::StringArrayType; use crate::utils::{make_scalar_function, utf8_to_str_type}; @@ -29,11 +29,29 @@ use arrow::datatypes::DataType::{LargeUtf8, Utf8, Utf8View}; use datafusion_common::cast::as_int64_array; use datafusion_common::types::{logical_int64, logical_string}; use datafusion_common::{exec_err, Result}; -use datafusion_expr::scalar_doc_sections::DOC_SECTION_STRING; use datafusion_expr::{ColumnarValue, Documentation, Volatility}; use datafusion_expr::{ScalarUDFImpl, Signature}; use datafusion_expr_common::signature::TypeSignatureClass; +use datafusion_macros::user_doc; +#[user_doc( + doc_section(label = "String Functions"), + description = "Returns a string with an input string repeated a specified number.", + syntax_example = "repeat(str, n)", + sql_example = r#"```sql +> select repeat('data', 3); ++-------------------------------+ +| repeat(Utf8("data"),Int64(3)) | ++-------------------------------+ +| datadatadata | ++-------------------------------+ +```"#, + standard_argument(name = "str", prefix = "String"), + argument( + name = "n", + description = "Number of times to repeat the input string." + ) +)] #[derive(Debug)] pub struct RepeatFunc { signature: Signature, @@ -85,35 +103,10 @@ impl ScalarUDFImpl for RepeatFunc { } fn documentation(&self) -> Option<&Documentation> { - Some(get_repeat_doc()) + self.doc() } } -static DOCUMENTATION: OnceLock = OnceLock::new(); - -fn get_repeat_doc() -> &'static Documentation { - DOCUMENTATION.get_or_init(|| { - Documentation::builder( - DOC_SECTION_STRING, - "Returns a string with an input string repeated a specified number.", - "repeat(str, n)", - ) - .with_sql_example( - r#"```sql -> select repeat('data', 3); -+-------------------------------+ -| repeat(Utf8("data"),Int64(3)) | -+-------------------------------+ -| datadatadata | -+-------------------------------+ -```"#, - ) - .with_standard_argument("str", Some("String")) - .with_argument("n", "Number of times to repeat the input string.") - .build() - }) -} - /// Repeats string the specified number of times. /// repeat('Pg', 4) = 'PgPgPgPg' fn repeat(args: &[ArrayRef]) -> Result { diff --git a/datafusion/functions/src/string/replace.rs b/datafusion/functions/src/string/replace.rs index 9b71d3871ea84..9b6afc5469941 100644 --- a/datafusion/functions/src/string/replace.rs +++ b/datafusion/functions/src/string/replace.rs @@ -16,7 +16,7 @@ // under the License. use std::any::Any; -use std::sync::{Arc, OnceLock}; +use std::sync::Arc; use arrow::array::{ArrayRef, GenericStringArray, OffsetSizeTrait, StringArray}; use arrow::datatypes::DataType; @@ -24,10 +24,28 @@ use arrow::datatypes::DataType; use crate::utils::{make_scalar_function, utf8_to_str_type}; use datafusion_common::cast::{as_generic_string_array, as_string_view_array}; use datafusion_common::{exec_err, Result}; -use datafusion_expr::scalar_doc_sections::DOC_SECTION_STRING; use datafusion_expr::{ColumnarValue, Documentation, Volatility}; use datafusion_expr::{ScalarUDFImpl, Signature}; - +use datafusion_macros::user_doc; +#[user_doc( + doc_section(label = "String Functions"), + description = "Replaces all occurrences of a specified substring in a string with a new substring.", + syntax_example = "replace(str, substr, replacement)", + sql_example = r#"```sql +> select replace('ABabbaBA', 'ab', 'cd'); ++-------------------------------------------------+ +| replace(Utf8("ABabbaBA"),Utf8("ab"),Utf8("cd")) | ++-------------------------------------------------+ +| ABcdbaBA | ++-------------------------------------------------+ +```"#, + standard_argument(name = "str", prefix = "String"), + standard_argument( + name = "substr", + prefix = "Substring expression to replace in the input string. Substring" + ), + standard_argument(name = "replacement", prefix = "Replacement substring") +)] #[derive(Debug)] pub struct ReplaceFunc { signature: Signature, @@ -80,33 +98,10 @@ impl ScalarUDFImpl for ReplaceFunc { } fn documentation(&self) -> Option<&Documentation> { - Some(get_replace_doc()) + self.doc() } } -static DOCUMENTATION: OnceLock = OnceLock::new(); - -fn get_replace_doc() -> &'static Documentation { - DOCUMENTATION.get_or_init(|| { - Documentation::builder( - DOC_SECTION_STRING, - "Replaces all occurrences of a specified substring in a string with a new substring.", - "replace(str, substr, replacement)") - .with_sql_example(r#"```sql -> select replace('ABabbaBA', 'ab', 'cd'); -+-------------------------------------------------+ -| replace(Utf8("ABabbaBA"),Utf8("ab"),Utf8("cd")) | -+-------------------------------------------------+ -| ABcdbaBA | -+-------------------------------------------------+ -```"#) - .with_standard_argument("str", Some("String")) - .with_standard_argument("substr", Some("Substring expression to replace in the input string. Substring")) - .with_standard_argument("replacement", Some("Replacement substring")) - .build() - }) -} - fn replace_view(args: &[ArrayRef]) -> Result { let string_array = as_string_view_array(&args[0])?; let from_array = as_string_view_array(&args[1])?; diff --git a/datafusion/functions/src/string/split_part.rs b/datafusion/functions/src/string/split_part.rs index 40bdd3ad01b21..9a6ee726698b1 100644 --- a/datafusion/functions/src/string/split_part.rs +++ b/datafusion/functions/src/string/split_part.rs @@ -25,12 +25,28 @@ use arrow::datatypes::DataType; use datafusion_common::cast::as_int64_array; use datafusion_common::ScalarValue; use datafusion_common::{exec_err, DataFusionError, Result}; -use datafusion_expr::scalar_doc_sections::DOC_SECTION_STRING; use datafusion_expr::{ColumnarValue, Documentation, TypeSignature, Volatility}; use datafusion_expr::{ScalarUDFImpl, Signature}; +use datafusion_macros::user_doc; use std::any::Any; -use std::sync::{Arc, OnceLock}; +use std::sync::Arc; +#[user_doc( + doc_section(label = "String Functions"), + description = "Splits a string based on a specified delimiter and returns the substring in the specified position.", + syntax_example = "split_part(str, delimiter, pos)", + sql_example = r#"```sql +> select split_part('1.2.3.4.5', '.', 3); ++--------------------------------------------------+ +| split_part(Utf8("1.2.3.4.5"),Utf8("."),Int64(3)) | ++--------------------------------------------------+ +| 3 | ++--------------------------------------------------+ +```"#, + standard_argument(name = "str", prefix = "String"), + argument(name = "delimiter", description = "String or character to split on."), + argument(name = "pos", description = "Position of the part to return.") +)] #[derive(Debug)] pub struct SplitPartFunc { signature: Signature, @@ -182,33 +198,10 @@ impl ScalarUDFImpl for SplitPartFunc { } fn documentation(&self) -> Option<&Documentation> { - Some(get_split_part_doc()) + self.doc() } } -static DOCUMENTATION: OnceLock = OnceLock::new(); - -fn get_split_part_doc() -> &'static Documentation { - DOCUMENTATION.get_or_init(|| { - Documentation::builder( - DOC_SECTION_STRING, - "Splits a string based on a specified delimiter and returns the substring in the specified position.", - "split_part(str, delimiter, pos)") - .with_sql_example(r#"```sql -> select split_part('1.2.3.4.5', '.', 3); -+--------------------------------------------------+ -| split_part(Utf8("1.2.3.4.5"),Utf8("."),Int64(3)) | -+--------------------------------------------------+ -| 3 | -+--------------------------------------------------+ -```"#) - .with_standard_argument("str", Some("String")) - .with_argument("delimiter", "String or character to split on.") - .with_argument("pos", "Position of the part to return.") - .build() - }) -} - /// impl pub fn split_part_impl<'a, StringArrType, DelimiterArrType, StringArrayLen>( string_array: StringArrType, diff --git a/datafusion/functions/src/string/starts_with.rs b/datafusion/functions/src/string/starts_with.rs index 7354fda09584f..229982a9616a5 100644 --- a/datafusion/functions/src/string/starts_with.rs +++ b/datafusion/functions/src/string/starts_with.rs @@ -16,16 +16,16 @@ // under the License. use std::any::Any; -use std::sync::{Arc, OnceLock}; +use std::sync::Arc; use arrow::array::ArrayRef; use arrow::datatypes::DataType; use crate::utils::make_scalar_function; use datafusion_common::{internal_err, Result}; -use datafusion_expr::scalar_doc_sections::DOC_SECTION_STRING; use datafusion_expr::{ColumnarValue, Documentation}; use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; +use datafusion_macros::user_doc; /// Returns true if string starts with prefix. /// starts_with('alphabet', 'alph') = 't' @@ -34,6 +34,21 @@ pub fn starts_with(args: &[ArrayRef]) -> Result { Ok(Arc::new(result) as ArrayRef) } +#[user_doc( + doc_section(label = "String Functions"), + description = "Tests if a string starts with a substring.", + syntax_example = "starts_with(str, substr)", + sql_example = r#"```sql +> select starts_with('datafusion','data'); ++----------------------------------------------+ +| starts_with(Utf8("datafusion"),Utf8("data")) | ++----------------------------------------------+ +| true | ++----------------------------------------------+ +```"#, + standard_argument(name = "str", prefix = "String"), + argument(name = "substr", description = "Substring to test for.") +)] #[derive(Debug)] pub struct StartsWithFunc { signature: Signature, @@ -84,35 +99,10 @@ impl ScalarUDFImpl for StartsWithFunc { } fn documentation(&self) -> Option<&Documentation> { - Some(get_starts_with_doc()) + self.doc() } } -static DOCUMENTATION: OnceLock = OnceLock::new(); - -fn get_starts_with_doc() -> &'static Documentation { - DOCUMENTATION.get_or_init(|| { - Documentation::builder( - DOC_SECTION_STRING, - "Tests if a string starts with a substring.", - "starts_with(str, substr)", - ) - .with_sql_example( - r#"```sql -> select starts_with('datafusion','data'); -+----------------------------------------------+ -| starts_with(Utf8("datafusion"),Utf8("data")) | -+----------------------------------------------+ -| true | -+----------------------------------------------+ -```"#, - ) - .with_standard_argument("str", Some("String")) - .with_argument("substr", "Substring to test for.") - .build() - }) -} - #[cfg(test)] mod tests { use crate::utils::test::test_function; diff --git a/datafusion/functions/src/string/to_hex.rs b/datafusion/functions/src/string/to_hex.rs index 04907af14ade9..64654ef6ef106 100644 --- a/datafusion/functions/src/string/to_hex.rs +++ b/datafusion/functions/src/string/to_hex.rs @@ -16,7 +16,7 @@ // under the License. use std::any::Any; -use std::sync::{Arc, OnceLock}; +use std::sync::Arc; use arrow::array::{ArrayRef, GenericStringArray, OffsetSizeTrait}; use arrow::datatypes::{ @@ -27,9 +27,10 @@ use crate::utils::make_scalar_function; use datafusion_common::cast::as_primitive_array; use datafusion_common::Result; use datafusion_common::{exec_err, plan_err}; -use datafusion_expr::scalar_doc_sections::DOC_SECTION_STRING; + use datafusion_expr::{ColumnarValue, Documentation}; use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; +use datafusion_macros::user_doc; /// Converts the number to its equivalent hexadecimal representation. /// to_hex(2147483647) = '7fffffff' @@ -59,6 +60,20 @@ where Ok(Arc::new(result) as ArrayRef) } +#[user_doc( + doc_section(label = "String Functions"), + description = "Converts an integer to a hexadecimal string.", + syntax_example = "to_hex(int)", + sql_example = r#"```sql +> select to_hex(12345689); ++-------------------------+ +| to_hex(Int64(12345689)) | ++-------------------------+ +| bc6159 | ++-------------------------+ +```"#, + standard_argument(name = "int", prefix = "Integer") +)] #[derive(Debug)] pub struct ToHexFunc { signature: Signature, @@ -116,34 +131,10 @@ impl ScalarUDFImpl for ToHexFunc { } fn documentation(&self) -> Option<&Documentation> { - Some(get_to_hex_doc()) + self.doc() } } -static DOCUMENTATION: OnceLock = OnceLock::new(); - -fn get_to_hex_doc() -> &'static Documentation { - DOCUMENTATION.get_or_init(|| { - Documentation::builder( - DOC_SECTION_STRING, - "Converts an integer to a hexadecimal string.", - "to_hex(int)", - ) - .with_sql_example( - r#"```sql -> select to_hex(12345689); -+-------------------------+ -| to_hex(Int64(12345689)) | -+-------------------------+ -| bc6159 | -+-------------------------+ -```"#, - ) - .with_standard_argument("int", Some("Integer")) - .build() - }) -} - #[cfg(test)] mod tests { use arrow::array::{Int32Array, StringArray}; diff --git a/datafusion/functions/src/string/uuid.rs b/datafusion/functions/src/string/uuid.rs index 6048a70bd8c57..f6d6a941068d6 100644 --- a/datafusion/functions/src/string/uuid.rs +++ b/datafusion/functions/src/string/uuid.rs @@ -16,7 +16,7 @@ // under the License. use std::any::Any; -use std::sync::{Arc, OnceLock}; +use std::sync::Arc; use arrow::array::GenericStringArray; use arrow::datatypes::DataType; @@ -24,10 +24,23 @@ use arrow::datatypes::DataType::Utf8; use uuid::Uuid; use datafusion_common::{internal_err, Result}; -use datafusion_expr::scalar_doc_sections::DOC_SECTION_STRING; use datafusion_expr::{ColumnarValue, Documentation, Volatility}; use datafusion_expr::{ScalarUDFImpl, Signature}; +use datafusion_macros::user_doc; +#[user_doc( + doc_section(label = "String Functions"), + description = "Returns [`UUID v4`](https://en.wikipedia.org/wiki/Universally_unique_identifier#Version_4_(random)) string value which is unique per row.", + syntax_example = "uuid()", + sql_example = r#"```sql +> select uuid(); ++--------------------------------------+ +| uuid() | ++--------------------------------------+ +| 6ec17ef8-1934-41cc-8d59-d0c8f9eea1f0 | ++--------------------------------------+ +```"# +)] #[derive(Debug)] pub struct UuidFunc { signature: Signature, @@ -80,26 +93,6 @@ impl ScalarUDFImpl for UuidFunc { } fn documentation(&self) -> Option<&Documentation> { - Some(get_uuid_doc()) + self.doc() } } - -static DOCUMENTATION: OnceLock = OnceLock::new(); - -fn get_uuid_doc() -> &'static Documentation { - DOCUMENTATION.get_or_init(|| { - Documentation::builder( - DOC_SECTION_STRING, - "Returns [`UUID v4`](https://en.wikipedia.org/wiki/Universally_unique_identifier#Version_4_(random)) string value which is unique per row.", - "uuid()") - .with_sql_example(r#"```sql -> select uuid(); -+--------------------------------------+ -| uuid() | -+--------------------------------------+ -| 6ec17ef8-1934-41cc-8d59-d0c8f9eea1f0 | -+--------------------------------------+ -```"#) - .build() - }) -} diff --git a/datafusion/functions/src/unicode/find_in_set.rs b/datafusion/functions/src/unicode/find_in_set.rs index 38efb408c1d3a..c4d9b51f60327 100644 --- a/datafusion/functions/src/unicode/find_in_set.rs +++ b/datafusion/functions/src/unicode/find_in_set.rs @@ -16,7 +16,7 @@ // under the License. use std::any::Any; -use std::sync::{Arc, OnceLock}; +use std::sync::Arc; use arrow::array::{ ArrayAccessor, ArrayIter, ArrayRef, ArrowPrimitiveType, AsArray, OffsetSizeTrait, @@ -26,12 +26,30 @@ use arrow::datatypes::{ArrowNativeType, DataType, Int32Type, Int64Type}; use crate::utils::{make_scalar_function, utf8_to_int_type}; use datafusion_common::{exec_err, Result}; -use datafusion_expr::scalar_doc_sections::DOC_SECTION_STRING; use datafusion_expr::TypeSignature::Exact; use datafusion_expr::{ ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, }; +use datafusion_macros::user_doc; +#[user_doc( + doc_section(label = "String Functions"), + description = "Returns a value in the range of 1 to N if the string str is in the string list strlist consisting of N substrings.", + syntax_example = "find_in_set(str, strlist)", + sql_example = r#"```sql +> select find_in_set('b', 'a,b,c,d'); ++----------------------------------------+ +| find_in_set(Utf8("b"),Utf8("a,b,c,d")) | ++----------------------------------------+ +| 2 | ++----------------------------------------+ +```"#, + argument(name = "str", description = "String expression to find in strlist."), + argument( + name = "strlist", + description = "A string list is a string composed of substrings separated by , characters." + ) +)] #[derive(Debug)] pub struct FindInSetFunc { signature: Signature, @@ -85,32 +103,10 @@ impl ScalarUDFImpl for FindInSetFunc { } fn documentation(&self) -> Option<&Documentation> { - Some(get_find_in_set_doc()) + self.doc() } } -static DOCUMENTATION: OnceLock = OnceLock::new(); - -fn get_find_in_set_doc() -> &'static Documentation { - DOCUMENTATION.get_or_init(|| { - Documentation::builder( - DOC_SECTION_STRING, - "Returns a value in the range of 1 to N if the string str is in the string list strlist consisting of N substrings.", - "find_in_set(str, strlist)") - .with_sql_example(r#"```sql -> select find_in_set('b', 'a,b,c,d'); -+----------------------------------------+ -| find_in_set(Utf8("b"),Utf8("a,b,c,d")) | -+----------------------------------------+ -| 2 | -+----------------------------------------+ -```"#) - .with_argument("str", "String expression to find in strlist.") - .with_argument("strlist", "A string list is a string composed of substrings separated by , characters.") - .build() - }) -} - ///Returns a value in the range of 1 to N if the string str is in the string list strlist consisting of N substrings ///A string list is a string composed of substrings separated by , characters. fn find_in_set(args: &[ArrayRef]) -> Result { diff --git a/datafusion/functions/src/unicode/reverse.rs b/datafusion/functions/src/unicode/reverse.rs index 8e3cf8845f981..5ad347ed96c08 100644 --- a/datafusion/functions/src/unicode/reverse.rs +++ b/datafusion/functions/src/unicode/reverse.rs @@ -16,7 +16,7 @@ // under the License. use std::any::Any; -use std::sync::{Arc, OnceLock}; +use std::sync::Arc; use crate::utils::{make_scalar_function, utf8_to_str_type}; use arrow::array::{ @@ -25,12 +25,26 @@ use arrow::array::{ }; use arrow::datatypes::DataType; use datafusion_common::{exec_err, Result}; -use datafusion_expr::scalar_doc_sections::DOC_SECTION_STRING; use datafusion_expr::{ ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, }; +use datafusion_macros::user_doc; use DataType::{LargeUtf8, Utf8, Utf8View}; +#[user_doc( + doc_section(label = "String Functions"), + description = "Reverses the character order of a string.", + syntax_example = "reverse(str)", + sql_example = r#"```sql +> select reverse('datafusion'); ++-----------------------------+ +| reverse(Utf8("datafusion")) | ++-----------------------------+ +| noisufatad | ++-----------------------------+ +```"#, + standard_argument(name = "str", prefix = "String") +)] #[derive(Debug)] pub struct ReverseFunc { signature: Signature, @@ -87,34 +101,10 @@ impl ScalarUDFImpl for ReverseFunc { } fn documentation(&self) -> Option<&Documentation> { - Some(get_reverse_doc()) + self.doc() } } -static DOCUMENTATION: OnceLock = OnceLock::new(); - -fn get_reverse_doc() -> &'static Documentation { - DOCUMENTATION.get_or_init(|| { - Documentation::builder( - DOC_SECTION_STRING, - "Reverses the character order of a string.", - "reverse(str)", - ) - .with_sql_example( - r#"```sql -> select reverse('datafusion'); -+-----------------------------+ -| reverse(Utf8("datafusion")) | -+-----------------------------+ -| noisufatad | -+-----------------------------+ -```"#, - ) - .with_standard_argument("str", Some("String")) - .build() - }) -} - /// Reverses the order of the characters in the string. /// reverse('abcde') = 'edcba' /// The implementation uses UTF-8 code points as characters From b21b5a63c732b8d7a95b2a5ba2e7c3dfb971c1b1 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sat, 28 Dec 2024 09:37:36 -0500 Subject: [PATCH 4/5] Update sqllogictest requirement from 0.24.0 to 0.25.0 (#13917) * Update sqllogictest requirement from 0.24.0 to 0.25.0 Updates the requirements on [sqllogictest](https://github.com/risinglightdb/sqllogictest-rs) to permit the latest version. - [Release notes](https://github.com/risinglightdb/sqllogictest-rs/releases) - [Changelog](https://github.com/risinglightdb/sqllogictest-rs/blob/main/CHANGELOG.md) - [Commits](https://github.com/risinglightdb/sqllogictest-rs/compare/v0.24.0...v0.25.0) --- updated-dependencies: - dependency-name: sqllogictest dependency-type: direct:production ... Signed-off-by: dependabot[bot] * Remove labels --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: jonahgao --- datafusion/sqllogictest/Cargo.toml | 2 +- datafusion/sqllogictest/test_files/group_by.slt | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index 100b925dae41f..88afdfceb4126 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -53,7 +53,7 @@ once_cell = { version = "1.20", optional = true } postgres-protocol = { version = "0.6.7", optional = true } postgres-types = { version = "0.2.8", features = ["derive", "with-chrono-0_4"], optional = true } rust_decimal = { version = "1.36.0", features = ["tokio-pg"] } -sqllogictest = "0.24.0" +sqllogictest = "0.25.0" sqlparser = { workspace = true } tempfile = { workspace = true } testcontainers = { version = "0.23", features = ["default"], optional = true } diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index df7e21c2da447..bb375ad0cdff2 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -80,7 +80,7 @@ SELECT col1 * cor0.col1 * 56 AS col1 FROM tab2 AS cor0 GROUP BY cor0.col1 208376 94136 -query I rowsort label-4 +query I rowsort SELECT ALL + tab2.col1 / tab2.col1 FROM tab2 GROUP BY col1 ---- 1 @@ -442,7 +442,7 @@ SELECT DISTINCT * FROM tab2 AS cor0 GROUP BY cor0.col0, col1, cor0.col2 91 59 79 92 41 58 -query I rowsort label-58 +query I rowsort SELECT 9 / + cor0.col0 AS col1 FROM tab0 AS cor0 GROUP BY cor0.col0, cor0.col2 ---- 0 @@ -470,7 +470,7 @@ SELECT ( col0 ) FROM tab1 AS cor0 GROUP BY cor0.col0 28 82 -query I rowsort label-62 +query I rowsort SELECT ALL 59 / 26 FROM tab2 AS cor0 GROUP BY cor0.col0 ---- 2 @@ -558,7 +558,7 @@ SELECT DISTINCT ( - 31 ) col1 FROM tab1 GROUP BY tab1.col0 ---- -31 -query I rowsort label-75 +query I rowsort SELECT + + cor0.col0 / - cor0.col0 FROM tab1, tab0 AS cor0 GROUP BY cor0.col0 ---- -1 @@ -767,7 +767,7 @@ SELECT cor0.col2 AS col2 FROM tab0 AS cor0 GROUP BY cor0.col2 38 79 -query I rowsort label-106 +query I rowsort SELECT - 53 / cor0.col0 col0 FROM tab1 cor0 GROUP BY cor0.col0 ---- -1 @@ -1326,7 +1326,7 @@ SELECT + ( col0 ) * col0 AS col2 FROM tab2 AS cor0 GROUP BY cor0.col0 8281 8464 -query I rowsort label-188 +query I rowsort SELECT - 21 - + 57 / cor0.col0 FROM tab0 AS cor0 GROUP BY cor0.col0 ---- -21 @@ -1429,7 +1429,7 @@ SELECT cor0.col2 FROM tab2 AS cor0 GROUP BY cor0.col2 79 87 -query I rowsort label-203 +query I rowsort SELECT - cor0.col2 + CAST ( 80 AS INTEGER ) FROM tab1 AS cor0 GROUP BY col2 ---- 35 @@ -1482,7 +1482,7 @@ SELECT DISTINCT + 45 col0 FROM tab1 AS cor0 GROUP BY col0 ---- 45 -query I rowsort label-211 +query I rowsort SELECT ALL CAST ( NULL AS INTEGER ) FROM tab2 AS cor0 GROUP BY col1 ---- NULL From d089a53caa2a09dfe46cdb8f5ebc869bd6566a7b Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 28 Dec 2024 09:44:07 -0500 Subject: [PATCH 5/5] Consolidate Examples: memtable.rs and parquet_multiple_files.rs (#13913) --- datafusion-examples/README.md | 3 +- datafusion-examples/examples/memtable.rs | 74 --------------- ...uet_sql_multiple_files.rs => sql_query.rs} | 94 ++++++++++++++++--- 3 files changed, 83 insertions(+), 88 deletions(-) delete mode 100644 datafusion-examples/examples/memtable.rs rename datafusion-examples/examples/{parquet_sql_multiple_files.rs => sql_query.rs} (54%) diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index b06148ce267f1..3ec008a6026d8 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -64,10 +64,8 @@ cargo run --example dataframe - [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients - [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros - [`make_date.rs`](examples/make_date.rs): Examples of using the make_date function -- [`memtable.rs`](examples/memtable.rs): Create an query data in memory using SQL and `RecordBatch`es - [`optimizer_rule.rs`](examples/optimizer_rule.rs): Use a custom OptimizerRule to replace certain predicates - [`parquet_index.rs`](examples/parquet_index.rs): Create an secondary index over several parquet files and use it to speed up queries -- [`parquet_sql_multiple_files.rs`](examples/parquet_sql_multiple_files.rs): Build and run a query plan from a SQL statement against multiple local Parquet files - [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution - [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr`. - [`plan_to_sql.rs`](examples/plan_to_sql.rs): Generate SQL from DataFusion `Expr` and `LogicalPlan` @@ -83,6 +81,7 @@ cargo run --example dataframe - [`sql_analysis.rs`](examples/sql_analysis.rs): Analyse SQL queries with DataFusion structures - [`sql_frontend.rs`](examples/sql_frontend.rs): Create LogicalPlans (only) from sql strings - [`sql_dialect.rs`](examples/sql_dialect.rs): Example of implementing a custom SQL dialect on top of `DFParser` +- [`sql_query.rs`](examples/memtable.rs): Query data using SQL (in memory `RecordBatch`es, local Parquet files)q - [`to_char.rs`](examples/to_char.rs): Examples of using the to_char function - [`to_timestamp.rs`](examples/to_timestamp.rs): Examples of using to_timestamp functions diff --git a/datafusion-examples/examples/memtable.rs b/datafusion-examples/examples/memtable.rs deleted file mode 100644 index bb0b720eff79a..0000000000000 --- a/datafusion-examples/examples/memtable.rs +++ /dev/null @@ -1,74 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use datafusion::arrow::array::{UInt64Array, UInt8Array}; -use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion::arrow::record_batch::RecordBatch; -use datafusion::datasource::MemTable; -use datafusion::error::Result; -use datafusion::prelude::SessionContext; -use std::sync::Arc; -use std::time::Duration; -use tokio::time::timeout; - -/// This example demonstrates executing a simple query against a [`MemTable`] -#[tokio::main] -async fn main() -> Result<()> { - let mem_table = create_memtable()?; - - // create local execution context - let ctx = SessionContext::new(); - - // Register the in-memory table containing the data - ctx.register_table("users", Arc::new(mem_table))?; - - let dataframe = ctx.sql("SELECT * FROM users;").await?; - - timeout(Duration::from_secs(10), async move { - let result = dataframe.collect().await.unwrap(); - let record_batch = result.first().unwrap(); - - assert_eq!(1, record_batch.column(0).len()); - dbg!(record_batch.columns()); - }) - .await - .unwrap(); - - Ok(()) -} - -fn create_memtable() -> Result { - MemTable::try_new(get_schema(), vec![vec![create_record_batch()?]]) -} - -fn create_record_batch() -> Result { - let id_array = UInt8Array::from(vec![1]); - let account_array = UInt64Array::from(vec![9000]); - - Ok(RecordBatch::try_new( - get_schema(), - vec![Arc::new(id_array), Arc::new(account_array)], - ) - .unwrap()) -} - -fn get_schema() -> SchemaRef { - SchemaRef::new(Schema::new(vec![ - Field::new("id", DataType::UInt8, false), - Field::new("bank_account", DataType::UInt64, true), - ])) -} diff --git a/datafusion-examples/examples/parquet_sql_multiple_files.rs b/datafusion-examples/examples/sql_query.rs similarity index 54% rename from datafusion-examples/examples/parquet_sql_multiple_files.rs rename to datafusion-examples/examples/sql_query.rs index b0d3922a32789..f6d3936568cc6 100644 --- a/datafusion-examples/examples/parquet_sql_multiple_files.rs +++ b/datafusion-examples/examples/sql_query.rs @@ -15,21 +15,90 @@ // specific language governing permissions and limitations // under the License. +use datafusion::arrow::array::{UInt64Array, UInt8Array}; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::datasource::file_format::parquet::ParquetFormat; +use datafusion::datasource::listing::ListingOptions; +use datafusion::datasource::MemTable; +use datafusion::error::{DataFusionError, Result}; +use datafusion::prelude::SessionContext; +use datafusion_common::exec_datafusion_err; +use object_store::local::LocalFileSystem; use std::path::Path; use std::sync::Arc; +use std::time::Duration; +use tokio::time::timeout; + +/// Examples of various ways to execute queries using SQL +/// +/// [`query_memtable`]: a simple query against a [`MemTable`] +/// [`query_parquet`]: a simple query against a directory with multiple Parquet files +/// +#[tokio::main] +async fn main() -> Result<()> { + query_memtable().await?; + query_parquet().await?; + Ok(()) +} -use datafusion::datasource::file_format::parquet::ParquetFormat; -use datafusion::datasource::listing::ListingOptions; -use datafusion::prelude::*; +/// Run a simple query against a [`MemTable`] +pub async fn query_memtable() -> Result<()> { + let mem_table = create_memtable()?; -use object_store::local::LocalFileSystem; + // create local execution context + let ctx = SessionContext::new(); -/// This example demonstrates executing a simple query against an Arrow data source (a directory -/// with multiple Parquet files) and fetching results. The query is run twice, once showing -/// how to used `register_listing_table` with an absolute path, and once registering an -/// ObjectStore to use a relative path. -#[tokio::main] -async fn main() -> Result<(), Box> { + // Register the in-memory table containing the data + ctx.register_table("users", Arc::new(mem_table))?; + + let dataframe = ctx.sql("SELECT * FROM users;").await?; + + timeout(Duration::from_secs(10), async move { + let result = dataframe.collect().await.unwrap(); + let record_batch = result.first().unwrap(); + + assert_eq!(1, record_batch.column(0).len()); + dbg!(record_batch.columns()); + }) + .await + .unwrap(); + + Ok(()) +} + +fn create_memtable() -> Result { + MemTable::try_new(get_schema(), vec![vec![create_record_batch()?]]) +} + +fn create_record_batch() -> Result { + let id_array = UInt8Array::from(vec![1]); + let account_array = UInt64Array::from(vec![9000]); + + Ok(RecordBatch::try_new( + get_schema(), + vec![Arc::new(id_array), Arc::new(account_array)], + ) + .unwrap()) +} + +fn get_schema() -> SchemaRef { + SchemaRef::new(Schema::new(vec![ + Field::new("id", DataType::UInt8, false), + Field::new("bank_account", DataType::UInt64, true), + ])) +} + +/// The simplest way to query parquet files is to use the +/// [`SessionContext::read_parquet`] API +/// +/// For more control, you can use the lower level [`ListingOptions`] and +/// [`ListingTable`] APIS +/// +/// This example shows how to use relative and absolute paths. +/// +/// [`ListingTable`]: datafusion::datasource::listing::ListingTable +async fn query_parquet() -> Result<()> { // create local execution context let ctx = SessionContext::new(); @@ -73,13 +142,14 @@ async fn main() -> Result<(), Box> { let test_data_path = Path::new(&test_data); let test_data_path_parent = test_data_path .parent() - .ok_or("test_data path needs a parent")?; + .ok_or(exec_datafusion_err!("test_data path needs a parent"))?; std::env::set_current_dir(test_data_path_parent)?; let local_fs = Arc::new(LocalFileSystem::default()); - let u = url::Url::parse("file://./")?; + let u = url::Url::parse("file://./") + .map_err(|e| DataFusionError::External(Box::new(e)))?; ctx.register_object_store(&u, local_fs); // Register a listing table - this will use all files in the directory as data sources