diff --git a/.github/workflows/stale.yml b/.github/workflows/stale.yml index 2aba1085b8329..ec7f54ec24dbc 100644 --- a/.github/workflows/stale.yml +++ b/.github/workflows/stale.yml @@ -27,7 +27,7 @@ jobs: issues: write pull-requests: write steps: - - uses: actions/stale@997185467fa4f803885201cee163a9f38240193d # v10.1.1 + - uses: actions/stale@b5d41d4e1d5dceea10e7104786b73624c18a190f # v10.2.0 with: stale-pr-message: "Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days." days-before-pr-stale: 60 diff --git a/Cargo.lock b/Cargo.lock index c552835a2cb6f..d9661c23fffe1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -160,7 +160,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -171,7 +171,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -601,9 +601,9 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] name = "aws-config" -version = "1.8.13" +version = "1.8.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c456581cb3c77fafcc8c67204a70680d40b61112d6da78c77bd31d945b65f1b5" +checksum = "8a8fc176d53d6fe85017f230405e3255cedb4a02221cb55ed6d76dccbbb099b2" dependencies = [ "aws-credential-types", "aws-runtime", @@ -631,9 +631,9 @@ dependencies = [ [[package]] name = "aws-credential-types" -version = "1.2.11" +version = "1.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cd362783681b15d136480ad555a099e82ecd8e2d10a841e14dfd0078d67fee3" +checksum = "e26bbf46abc608f2dc61fd6cb3b7b0665497cc259a21520151ed98f8b37d2c79" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -665,9 +665,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.6.0" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c635c2dc792cb4a11ce1a4f392a925340d1bdf499289b5ec1ec6810954eb43f5" +checksum = "b0f92058d22a46adf53ec57a6a96f34447daf02bff52e8fb956c66bcd5c6ac12" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -678,6 +678,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", + "bytes-utils", "fastrand", "http 1.4.0", "http-body 1.0.1", @@ -689,9 +690,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.93.0" +version = "1.94.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9dcb38bb33fc0a11f1ffc3e3e85669e0a11a37690b86f77e75306d8f369146a0" +checksum = "699da1961a289b23842d88fe2984c6ff68735fdf9bdcbc69ceaeb2491c9bf434" dependencies = [ "aws-credential-types", "aws-runtime", @@ -713,9 +714,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.95.0" +version = "1.96.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ada8ffbea7bd1be1f53df1dadb0f8fdb04badb13185b3321b929d1ee3caad09" +checksum = "e3e3a4cb3b124833eafea9afd1a6cc5f8ddf3efefffc6651ef76a03cbc6b4981" dependencies = [ "aws-credential-types", "aws-runtime", @@ -737,9 +738,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.97.0" +version = "1.98.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6443ccadc777095d5ed13e21f5c364878c9f5bad4e35187a6cdbd863b0afcad" +checksum = "89c4f19655ab0856375e169865c91264de965bd74c407c7f1e403184b1049409" dependencies = [ "aws-credential-types", "aws-runtime", @@ -762,9 +763,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.3.8" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efa49f3c607b92daae0c078d48a4571f599f966dce3caee5f1ea55c4d9073f99" +checksum = "68f6ae9b71597dc5fd115d52849d7a5556ad9265885ad3492ea8d73b93bbc46e" dependencies = [ "aws-credential-types", "aws-smithy-http", @@ -784,9 +785,9 @@ dependencies = [ [[package]] name = "aws-smithy-async" -version = "1.2.11" +version = "1.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52eec3db979d18cb807fc1070961cc51d87d069abe9ab57917769687368a8c6c" +checksum = "5cc50d0f63e714784b84223abd7abbc8577de8c35d699e0edd19f0a88a08ae13" dependencies = [ "futures-util", "pin-project-lite", @@ -795,9 +796,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.63.3" +version = "0.63.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630e67f2a31094ffa51b210ae030855cb8f3b7ee1329bdd8d085aaf61e8b97fc" +checksum = "d619373d490ad70966994801bc126846afaa0d1ee920697a031f0cf63f2568e7" dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", @@ -816,9 +817,9 @@ dependencies = [ [[package]] name = "aws-smithy-http-client" -version = "1.1.9" +version = "1.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12fb0abf49ff0cab20fd31ac1215ed7ce0ea92286ba09e2854b42ba5cabe7525" +checksum = "00ccbb08c10f6bcf912f398188e42ee2eab5f1767ce215a02a73bc5df1bbdd95" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -840,27 +841,27 @@ dependencies = [ [[package]] name = "aws-smithy-json" -version = "0.62.3" +version = "0.62.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cb96aa208d62ee94104645f7b2ecaf77bf27edf161590b6224bfbac2832f979" +checksum = "27b3a779093e18cad88bbae08dc4261e1d95018c4c5b9356a52bcae7c0b6e9bb" dependencies = [ "aws-smithy-types", ] [[package]] name = "aws-smithy-observability" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0a46543fbc94621080b3cf553eb4cbbdc41dd9780a30c4756400f0139440a1d" +checksum = "4d3f39d5bb871aaf461d59144557f16d5927a5248a983a40654d9cf3b9ba183b" dependencies = [ "aws-smithy-runtime-api", ] [[package]] name = "aws-smithy-query" -version = "0.60.13" +version = "0.60.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0cebbddb6f3a5bd81553643e9c7daf3cc3dc5b0b5f398ac668630e8a84e6fff0" +checksum = "05f76a580e3d8f8961e5d48763214025a2af65c2fa4cd1fb7f270a0e107a71b0" dependencies = [ "aws-smithy-types", "urlencoding", @@ -868,9 +869,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.10.0" +version = "1.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3df87c14f0127a0d77eb261c3bc45d5b4833e2a1f63583ebfb728e4852134ee" +checksum = "22ccf7f6eba8b2dcf8ce9b74806c6c185659c311665c4bf8d6e71ebd454db6bf" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -893,9 +894,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.11.3" +version = "1.11.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49952c52f7eebb72ce2a754d3866cc0f87b97d2a46146b79f80f3a93fb2b3716" +checksum = "b4af6e5def28be846479bbeac55aa4603d6f7986fc5da4601ba324dd5d377516" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -910,9 +911,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.4.3" +version = "1.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b3a26048eeab0ddeba4b4f9d51654c79af8c3b32357dc5f336cee85ab331c33" +checksum = "8ca2734c16913a45343b37313605d84e7d8b34a4611598ce1d25b35860a2bed3" dependencies = [ "base64-simd", "bytes", @@ -933,18 +934,18 @@ dependencies = [ [[package]] name = "aws-smithy-xml" -version = "0.60.13" +version = "0.60.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11b2f670422ff42bf7065031e72b45bc52a3508bd089f743ea90731ca2b6ea57" +checksum = "b53543b4b86ed43f051644f704a98c7291b3618b67adf057ee77a366fa52fcaa" dependencies = [ "xmlparser", ] [[package]] name = "aws-types" -version = "1.3.11" +version = "1.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d980627d2dd7bfc32a3c025685a033eeab8d365cc840c631ef59d1b8f428164" +checksum = "6c50f3cdf47caa8d01f2be4a6663ea02418e892f9bbfd82c7b9a3a37eaccdd3a" dependencies = [ "aws-credential-types", "aws-smithy-async", @@ -1324,9 +1325,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.57" +version = "4.5.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6899ea499e3fb9305a65d5ebf6e3d2248c5fab291f300ad0a704fbe142eae31a" +checksum = "c5caf74d17c3aec5495110c34cc3f78644bfa89af6c8993ed4de2790e49b6499" dependencies = [ "clap_builder", "clap_derive", @@ -1334,9 +1335,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.57" +version = "4.5.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b12c8b680195a62a8364d16b8447b01b6c2c8f9aaf68bee653be34d4245e238" +checksum = "370daa45065b80218950227371916a1633217ae42b2715b2287b606dcd618e24" dependencies = [ "anstream", "anstyle", @@ -1358,9 +1359,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.6" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" +checksum = "3a822ea5bc7590f9d40f1ba12c0dc3c2760f3482c6984db1573ad11031420831" [[package]] name = "clipboard-win" @@ -2203,6 +2204,7 @@ dependencies = [ "datafusion-functions-window", "datafusion-physical-expr", "datafusion-physical-expr-common", + "datafusion-physical-optimizer", "datafusion-physical-plan", "datafusion-proto", "datafusion-proto-common", @@ -2746,7 +2748,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -2856,9 +2858,9 @@ dependencies = [ [[package]] name = "env_filter" -version = "0.1.4" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bf3c259d255ca70051b30e2e95b5446cdb8949ac4cd22c0d7fd634d89f568e2" +checksum = "7a1c3cc8e57274ec99de65301228b537f1e4eedc1b8e0f9411c6caac8ae7308f" dependencies = [ "log", "regex", @@ -2866,9 +2868,9 @@ dependencies = [ [[package]] name = "env_logger" -version = "0.11.8" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13c863f0904021b108aa8b2f55046443e6b1ebde8fd4a15c399893aae4fa069f" +checksum = "b2daee4ea451f429a58296525ddf28b45a3b64f1acf6587e2067437bb11e218d" dependencies = [ "anstream", "anstyle", @@ -2890,7 +2892,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -3223,6 +3225,19 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "getrandom" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "139ef39800118c7683f2fd3c98c1b23c09ae076556b435f8e9064ae108aaeeec" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasip2", + "wasip3", +] + [[package]] name = "glob" version = "0.3.3" @@ -3620,6 +3635,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "id-arena" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" + [[package]] name = "ident_case" version = "1.0.1" @@ -3672,9 +3693,9 @@ dependencies = [ [[package]] name = "indicatif" -version = "0.18.3" +version = "0.18.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9375e112e4b463ec1b1c6c011953545c65a30164fbab5b581df32b3abf0dcb88" +checksum = "25470f23803092da7d239834776d653104d551bc4d7eacaf31e6837854b8e9eb" dependencies = [ "console 0.16.2", "portable-atomic", @@ -3812,6 +3833,12 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + [[package]] name = "lexical-core" version = "1.0.6" @@ -3893,9 +3920,9 @@ dependencies = [ [[package]] name = "liblzma" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73c36d08cad03a3fbe2c4e7bb3a9e84c57e4ee4135ed0b065cade3d98480c648" +checksum = "b6033b77c21d1f56deeae8014eb9fbe7bdf1765185a6c508b5ca82eeaed7f899" dependencies = [ "liblzma-sys", ] @@ -4124,7 +4151,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -4238,9 +4265,9 @@ dependencies = [ [[package]] name = "object_store" -version = "0.12.4" +version = "0.12.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c1be0c6c22ec0817cdc77d3842f721a17fd30ab6965001415b5402a74e6b740" +checksum = "fbfbfff40aeccab00ec8a910b57ca8ecf4319b335c542f2edcd19dd25a1e2a00" dependencies = [ "async-trait", "base64 0.22.1", @@ -4743,7 +4770,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7" dependencies = [ "heck", - "itertools 0.14.0", + "itertools 0.13.0", "log", "multimap", "petgraph", @@ -4762,7 +4789,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools 0.13.0", "proc-macro2", "quote", "syn 2.0.114", @@ -5231,7 +5258,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -5700,9 +5727,9 @@ dependencies = [ [[package]] name = "sqllogictest" -version = "0.29.0" +version = "0.29.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dffbf03091090a9330529c3926313be0a0570f036edfd490b11db39eea4b7118" +checksum = "d03b2262a244037b0b510edbd25a8e6c9fb8d73ee0237fc6cc95a54c16f94a82" dependencies = [ "async-trait", "educe", @@ -5908,9 +5935,9 @@ dependencies = [ [[package]] name = "sysinfo" -version = "0.38.1" +version = "0.38.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5792d209c2eac902426c0c4a166c9f72147db453af548cf9bf3242644c4d4fe3" +checksum = "1efc19935b4b66baa6f654ac7924c192f55b175c00a7ab72410fc24284dacda8" dependencies = [ "libc", "memchr", @@ -5930,7 +5957,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -6219,9 +6246,9 @@ dependencies = [ [[package]] name = "tonic" -version = "0.14.3" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a286e33f82f8a1ee2df63f4fa35c0becf4a85a0cb03091a15fd7bf0b402dc94a" +checksum = "7f32a6f80051a4111560201420c7885d0082ba9efe2ab61875c587bb6b18b9a0" dependencies = [ "async-trait", "axum", @@ -6500,6 +6527,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "unit-prefix" version = "0.5.2" @@ -6585,11 +6618,11 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.20.0" +version = "1.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee48d38b119b0cd71fe4141b30f5ba9c7c5d9f4e7a3a8b4a674e4b6ef789976f" +checksum = "b672338555252d43fd2240c714dc444b8c6fb0a5c5335e65a07bba7742735ddb" dependencies = [ - "getrandom 0.3.4", + "getrandom 0.4.1", "js-sys", "serde_core", "wasm-bindgen", @@ -6653,7 +6686,16 @@ version = "1.0.1+wasi-0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" dependencies = [ - "wit-bindgen", + "wit-bindgen 0.46.0", +] + +[[package]] +name = "wasip3" +version = "0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen 0.51.0", ] [[package]] @@ -6763,6 +6805,28 @@ version = "0.2.108" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8145dd1593bf0fb137dbfa85b8be79ec560a447298955877804640e40c2d6ea" +[[package]] +name = "wasm-encoder" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" +dependencies = [ + "leb128fmt", + "wasmparser", +] + +[[package]] +name = "wasm-metadata" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" +dependencies = [ + "anyhow", + "indexmap 2.13.0", + "wasm-encoder", + "wasmparser", +] + [[package]] name = "wasm-streams" version = "0.4.2" @@ -6776,6 +6840,18 @@ dependencies = [ "web-sys", ] +[[package]] +name = "wasmparser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" +dependencies = [ + "bitflags", + "hashbrown 0.15.5", + "indexmap 2.13.0", + "semver", +] + [[package]] name = "web-sys" version = "0.3.85" @@ -6838,7 +6914,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -7137,6 +7213,94 @@ version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" +[[package]] +name = "wit-bindgen" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +dependencies = [ + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen-core" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" +dependencies = [ + "anyhow", + "heck", + "wit-parser", +] + +[[package]] +name = "wit-bindgen-rust" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" +dependencies = [ + "anyhow", + "heck", + "indexmap 2.13.0", + "prettyplease", + "syn 2.0.114", + "wasm-metadata", + "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn 2.0.114", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" +dependencies = [ + "anyhow", + "bitflags", + "indexmap 2.13.0", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder", + "wasm-metadata", + "wasmparser", + "wit-parser", +] + +[[package]] +name = "wit-parser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" +dependencies = [ + "anyhow", + "id-arena", + "indexmap 2.13.0", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser", +] + [[package]] name = "writeable" version = "0.6.2" diff --git a/Cargo.toml b/Cargo.toml index f19d0fbfa0eb4..2186632113511 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -161,11 +161,11 @@ hex = { version = "0.4.3" } indexmap = "2.13.0" insta = { version = "1.46.3", features = ["glob", "filters"] } itertools = "0.14" -liblzma = { version = "0.4.4", features = ["static"] } +liblzma = { version = "0.4.6", features = ["static"] } log = "^0.4" memchr = "2.8.0" num-traits = { version = "0.2" } -object_store = { version = "0.12.4", default-features = false } +object_store = { version = "0.12.5", default-features = false } parking_lot = "0.12" parquet = { version = "57.3.0", default-features = false, features = [ "arrow", @@ -192,7 +192,7 @@ tokio = { version = "1.48", features = ["macros", "rt", "sync"] } tokio-stream = "0.1" tokio-util = "0.7" url = "2.5.7" -uuid = "1.20" +uuid = "1.21" zstd = { version = "0.13", default-features = false } [workspace.lints.clippy] diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 877ea2eb8d2f1..a07be54948e86 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -40,7 +40,7 @@ mimalloc_extended = ["libmimalloc-sys/extended"] [dependencies] arrow = { workspace = true } -clap = { version = "4.5.57", features = ["derive"] } +clap = { version = "4.5.59", features = ["derive"] } datafusion = { workspace = true, default-features = true } datafusion-common = { workspace = true, default-features = true } env_logger = { workspace = true } diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index df6dc13fd9b1b..c58b9d9061863 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -37,10 +37,10 @@ backtrace = ["datafusion/backtrace"] [dependencies] arrow = { workspace = true } async-trait = { workspace = true } -aws-config = "1.8.13" +aws-config = "1.8.14" aws-credential-types = "1.2.7" chrono = { workspace = true } -clap = { version = "4.5.57", features = ["cargo", "derive"] } +clap = { version = "4.5.59", features = ["cargo", "derive"] } datafusion = { workspace = true, features = [ "avro", "compression", diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index f31d4d52ce88b..c9b4e974c8994 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -486,7 +486,7 @@ pub trait TableProviderFactory: Debug + Sync + Send { } /// A trait for table function implementations -pub trait TableFunctionImpl: Debug + Sync + Send { +pub trait TableFunctionImpl: Debug + Sync + Send + Any { /// Create a table provider fn call(&self, args: &[Expr]) -> Result>; } diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index dad12c1c6bc91..3b9507b1acf1a 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1256,7 +1256,7 @@ impl<'a> TryInto> for &'a FormatOptions } /// A key value pair, with a corresponding description -#[derive(Debug, Hash, PartialEq, Eq)] +#[derive(Debug, Clone, Hash, PartialEq, Eq)] pub struct ConfigEntry { /// A unique string to identify this config value pub key: String, @@ -1352,6 +1352,8 @@ impl ConfigField for ConfigOptions { } } +pub const DATAFUSION_FFI_CONFIG_NAMESPACE: &str = "datafusion_ffi"; + impl ConfigOptions { /// Creates a new [`ConfigOptions`] with default values pub fn new() -> Self { @@ -1366,12 +1368,12 @@ impl ConfigOptions { /// Set a configuration option pub fn set(&mut self, key: &str, value: &str) -> Result<()> { - let Some((prefix, key)) = key.split_once('.') else { + let Some((mut prefix, mut inner_key)) = key.split_once('.') else { return _config_err!("could not find config namespace for key \"{key}\""); }; if prefix == "datafusion" { - if key == "optimizer.enable_dynamic_filter_pushdown" { + if inner_key == "optimizer.enable_dynamic_filter_pushdown" { let bool_value = value.parse::().map_err(|e| { DataFusionError::Configuration(format!( "Failed to parse '{value}' as bool: {e}", @@ -1386,13 +1388,23 @@ impl ConfigOptions { } return Ok(()); } - return ConfigField::set(self, key, value); + return ConfigField::set(self, inner_key, value); + } + + if !self.extensions.0.contains_key(prefix) + && self + .extensions + .0 + .contains_key(DATAFUSION_FFI_CONFIG_NAMESPACE) + { + inner_key = key; + prefix = DATAFUSION_FFI_CONFIG_NAMESPACE; } let Some(e) = self.extensions.0.get_mut(prefix) else { return _config_err!("Could not find config namespace \"{prefix}\""); }; - e.0.set(key, value) + e.0.set(inner_key, value) } /// Create new [`ConfigOptions`], taking values from environment variables @@ -2157,7 +2169,7 @@ impl TableOptions { /// /// A result indicating success or failure in setting the configuration option. pub fn set(&mut self, key: &str, value: &str) -> Result<()> { - let Some((prefix, _)) = key.split_once('.') else { + let Some((mut prefix, _)) = key.split_once('.') else { return _config_err!("could not find config namespace for key \"{key}\""); }; @@ -2169,6 +2181,15 @@ impl TableOptions { return Ok(()); } + if !self.extensions.0.contains_key(prefix) + && self + .extensions + .0 + .contains_key(DATAFUSION_FFI_CONFIG_NAMESPACE) + { + prefix = DATAFUSION_FFI_CONFIG_NAMESPACE; + } + let Some(e) = self.extensions.0.get_mut(prefix) else { return _config_err!("Could not find config namespace \"{prefix}\""); }; diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 3a97562c40e97..3d0a76a182697 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -182,7 +182,7 @@ recursive = { workspace = true } regex = { workspace = true } rstest = { workspace = true } serde_json = { workspace = true } -sysinfo = "0.38.1" +sysinfo = "0.38.2" test-utils = { path = "../../test-utils" } tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot", "fs"] } diff --git a/datafusion/execution/src/memory_pool/mod.rs b/datafusion/execution/src/memory_pool/mod.rs index b23eede2a054e..30ad658d0d390 100644 --- a/datafusion/execution/src/memory_pool/mod.rs +++ b/datafusion/execution/src/memory_pool/mod.rs @@ -368,9 +368,9 @@ impl MemoryReservation { /// Frees all bytes from this reservation back to the underlying /// pool, returning the number of bytes freed. pub fn free(&self) -> usize { - let size = self.size.load(atomic::Ordering::Relaxed); + let size = self.size.swap(0, atomic::Ordering::Relaxed); if size != 0 { - self.shrink(size) + self.registration.pool.shrink(self, size); } size } @@ -381,26 +381,37 @@ impl MemoryReservation { /// /// Panics if `capacity` exceeds [`Self::size`] pub fn shrink(&self, capacity: usize) { - self.size.fetch_sub(capacity, atomic::Ordering::Relaxed); + self.size + .fetch_update( + atomic::Ordering::Relaxed, + atomic::Ordering::Relaxed, + |prev| prev.checked_sub(capacity), + ) + .expect("capacity exceeds reservation size"); self.registration.pool.shrink(self, capacity); } /// Tries to free `capacity` bytes from this reservation - /// if `capacity` does not exceed [`Self::size`] - /// Returns new reservation size - /// or error if shrinking capacity is more than allocated size + /// if `capacity` does not exceed [`Self::size`]. + /// Returns new reservation size, + /// or error if shrinking capacity is more than allocated size. pub fn try_shrink(&self, capacity: usize) -> Result { - let updated = self.size.fetch_update( - atomic::Ordering::Relaxed, - atomic::Ordering::Relaxed, - |prev| prev.checked_sub(capacity), - ); - updated.map_err(|_| { - let prev = self.size.load(atomic::Ordering::Relaxed); - internal_datafusion_err!( - "Cannot free the capacity {capacity} out of allocated size {prev}" + let prev = self + .size + .fetch_update( + atomic::Ordering::Relaxed, + atomic::Ordering::Relaxed, + |prev| prev.checked_sub(capacity), ) - }) + .map_err(|_| { + let prev = self.size.load(atomic::Ordering::Relaxed); + internal_datafusion_err!( + "Cannot free the capacity {capacity} out of allocated size {prev}" + ) + })?; + + self.registration.pool.shrink(self, capacity); + Ok(prev - capacity) } /// Sets the size of this reservation to `capacity` @@ -580,4 +591,37 @@ mod tests { assert_eq!(r2.size(), 25); assert_eq!(pool.reserved(), 28); } + + #[test] + fn test_try_shrink() { + let pool = Arc::new(GreedyMemoryPool::new(100)) as _; + let r1 = MemoryConsumer::new("r1").register(&pool); + + r1.try_grow(50).unwrap(); + assert_eq!(r1.size(), 50); + assert_eq!(pool.reserved(), 50); + + // Successful shrink returns new size and frees pool memory + let new_size = r1.try_shrink(30).unwrap(); + assert_eq!(new_size, 20); + assert_eq!(r1.size(), 20); + assert_eq!(pool.reserved(), 20); + + // Freed pool memory is now available to other consumers + let r2 = MemoryConsumer::new("r2").register(&pool); + r2.try_grow(80).unwrap(); + assert_eq!(pool.reserved(), 100); + + // Shrinking more than allocated fails without changing state + let err = r1.try_shrink(25); + assert!(err.is_err()); + assert_eq!(r1.size(), 20); + assert_eq!(pool.reserved(), 100); + + // Shrink to exactly zero + let new_size = r1.try_shrink(20).unwrap(); + assert_eq!(new_size, 0); + assert_eq!(r1.size(), 0); + assert_eq!(pool.reserved(), 80); + } } diff --git a/datafusion/expr-common/src/accumulator.rs b/datafusion/expr-common/src/accumulator.rs index 3acf110a0bfc7..59fb6a595206a 100644 --- a/datafusion/expr-common/src/accumulator.rs +++ b/datafusion/expr-common/src/accumulator.rs @@ -48,7 +48,7 @@ use std::fmt::Debug; /// [`evaluate`]: Self::evaluate /// [`merge_batch`]: Self::merge_batch /// [window function]: https://en.wikipedia.org/wiki/Window_function_(SQL) -pub trait Accumulator: Send + Sync + Debug { +pub trait Accumulator: Send + Sync + Debug + std::any::Any { /// Updates the accumulator's state from its input. /// /// `values` contains the arguments to this aggregate function. diff --git a/datafusion/expr-common/src/groups_accumulator.rs b/datafusion/expr-common/src/groups_accumulator.rs index 08c9f01f13c40..9053f7a8eab9f 100644 --- a/datafusion/expr-common/src/groups_accumulator.rs +++ b/datafusion/expr-common/src/groups_accumulator.rs @@ -108,7 +108,7 @@ impl EmitTo { /// /// [`Accumulator`]: crate::accumulator::Accumulator /// [Aggregating Millions of Groups Fast blog]: https://arrow.apache.org/blog/2023/08/05/datafusion_fast_grouping/ -pub trait GroupsAccumulator: Send { +pub trait GroupsAccumulator: Send + std::any::Any { /// Updates the accumulator's state from its arguments, encoded as /// a vector of [`ArrayRef`]s. /// diff --git a/datafusion/expr/src/partition_evaluator.rs b/datafusion/expr/src/partition_evaluator.rs index 0671f31f6d154..5a4e20e5ac9ac 100644 --- a/datafusion/expr/src/partition_evaluator.rs +++ b/datafusion/expr/src/partition_evaluator.rs @@ -90,7 +90,7 @@ use crate::window_state::WindowAggState; /// For more background, please also see the [User defined Window Functions in DataFusion blog] /// /// [User defined Window Functions in DataFusion blog]: https://datafusion.apache.org/blog/2025/04/19/user-defined-window-functions -pub trait PartitionEvaluator: Debug + Send { +pub trait PartitionEvaluator: Debug + Send + std::any::Any { /// When the window frame has a fixed beginning (e.g UNBOUNDED /// PRECEDING), some functions such as FIRST_VALUE, LAST_VALUE and /// NTH_VALUE do not need the (unbounded) input once they have diff --git a/datafusion/ffi/Cargo.toml b/datafusion/ffi/Cargo.toml index 28e1b2ee5681f..21264b59e05a3 100644 --- a/datafusion/ffi/Cargo.toml +++ b/datafusion/ffi/Cargo.toml @@ -61,6 +61,7 @@ datafusion-functions-table = { workspace = true, optional = true } datafusion-functions-window = { workspace = true, optional = true } datafusion-physical-expr = { workspace = true } datafusion-physical-expr-common = { workspace = true } +datafusion-physical-optimizer = { workspace = true } datafusion-physical-plan = { workspace = true } datafusion-proto = { workspace = true } datafusion-proto-common = { workspace = true } diff --git a/datafusion/ffi/src/catalog_provider.rs b/datafusion/ffi/src/catalog_provider.rs index 61e26f1663532..ff588a89a71b3 100644 --- a/datafusion/ffi/src/catalog_provider.rs +++ b/datafusion/ffi/src/catalog_provider.rs @@ -250,6 +250,11 @@ impl FFI_CatalogProvider { runtime: Option, logical_codec: FFI_LogicalExtensionCodec, ) -> Self { + if let Some(provider) = provider.as_any().downcast_ref::() + { + return provider.0.clone(); + } + let private_data = Box::new(ProviderPrivateData { provider, runtime }); Self { diff --git a/datafusion/ffi/src/catalog_provider_list.rs b/datafusion/ffi/src/catalog_provider_list.rs index 40f8be3871bb9..65574a7ac33de 100644 --- a/datafusion/ffi/src/catalog_provider_list.rs +++ b/datafusion/ffi/src/catalog_provider_list.rs @@ -212,6 +212,13 @@ impl FFI_CatalogProviderList { runtime: Option, logical_codec: FFI_LogicalExtensionCodec, ) -> Self { + if let Some(provider) = provider + .as_any() + .downcast_ref::() + { + return provider.0.clone(); + } + let private_data = Box::new(ProviderPrivateData { provider, runtime }); Self { diff --git a/datafusion/ffi/src/config/extension_options.rs b/datafusion/ffi/src/config/extension_options.rs new file mode 100644 index 0000000000000..48fd4e710921a --- /dev/null +++ b/datafusion/ffi/src/config/extension_options.rs @@ -0,0 +1,288 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::collections::HashMap; +use std::ffi::c_void; + +use abi_stable::StableAbi; +use abi_stable::std_types::{RResult, RStr, RString, RVec, Tuple2}; +use datafusion_common::config::{ConfigEntry, ConfigExtension, ExtensionOptions}; +use datafusion_common::{Result, exec_err}; + +use crate::df_result; + +/// A stable struct for sharing [`ExtensionOptions`] across FFI boundaries. +/// +/// Unlike other FFI structs in this crate, we do not construct a foreign +/// variant of this object. This is due to the typical method for interacting +/// with extension options is by creating a local struct of your concrete type. +/// To support this methodology use the `to_extension` method instead. +/// +/// When using [`FFI_ExtensionOptions`] with multiple extensions, all extension +/// values are stored on a single [`FFI_ExtensionOptions`] object. The keys +/// are stored with the full path prefix to avoid overwriting values when using +/// multiple extensions. +#[repr(C)] +#[derive(Debug, StableAbi)] +pub struct FFI_ExtensionOptions { + /// Return a deep clone of this [`ExtensionOptions`] + pub cloned: unsafe extern "C" fn(&Self) -> FFI_ExtensionOptions, + + /// Set the given `key`, `value` pair + pub set: + unsafe extern "C" fn(&mut Self, key: RStr, value: RStr) -> RResult<(), RString>, + + /// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`] + pub entries: unsafe extern "C" fn(&Self) -> RVec>, + + /// Release the memory of the private data when it is no longer being used. + pub release: unsafe extern "C" fn(&mut Self), + + /// Internal data. This is only to be accessed by the provider of the options. + pub private_data: *mut c_void, +} + +unsafe impl Send for FFI_ExtensionOptions {} +unsafe impl Sync for FFI_ExtensionOptions {} + +pub struct ExtensionOptionsPrivateData { + pub options: HashMap, +} + +impl FFI_ExtensionOptions { + #[inline] + fn inner_mut(&mut self) -> &mut HashMap { + let private_data = self.private_data as *mut ExtensionOptionsPrivateData; + unsafe { &mut (*private_data).options } + } + + #[inline] + fn inner(&self) -> &HashMap { + let private_data = self.private_data as *const ExtensionOptionsPrivateData; + unsafe { &(*private_data).options } + } +} + +unsafe extern "C" fn cloned_fn_wrapper( + options: &FFI_ExtensionOptions, +) -> FFI_ExtensionOptions { + options + .inner() + .iter() + .map(|(k, v)| (k.to_owned(), v.to_owned())) + .collect::>() + .into() +} + +unsafe extern "C" fn set_fn_wrapper( + options: &mut FFI_ExtensionOptions, + key: RStr, + value: RStr, +) -> RResult<(), RString> { + let _ = options.inner_mut().insert(key.into(), value.into()); + RResult::ROk(()) +} + +unsafe extern "C" fn entries_fn_wrapper( + options: &FFI_ExtensionOptions, +) -> RVec> { + options + .inner() + .iter() + .map(|(key, value)| (key.to_owned().into(), value.to_owned().into()).into()) + .collect() +} + +unsafe extern "C" fn release_fn_wrapper(options: &mut FFI_ExtensionOptions) { + unsafe { + debug_assert!(!options.private_data.is_null()); + let private_data = + Box::from_raw(options.private_data as *mut ExtensionOptionsPrivateData); + drop(private_data); + options.private_data = std::ptr::null_mut(); + } +} + +impl Default for FFI_ExtensionOptions { + fn default() -> Self { + HashMap::new().into() + } +} + +impl From> for FFI_ExtensionOptions { + fn from(options: HashMap) -> Self { + let private_data = ExtensionOptionsPrivateData { options }; + + Self { + cloned: cloned_fn_wrapper, + set: set_fn_wrapper, + entries: entries_fn_wrapper, + release: release_fn_wrapper, + private_data: Box::into_raw(Box::new(private_data)) as *mut c_void, + } + } +} + +impl Drop for FFI_ExtensionOptions { + fn drop(&mut self) { + unsafe { (self.release)(self) } + } +} + +impl Clone for FFI_ExtensionOptions { + fn clone(&self) -> Self { + unsafe { (self.cloned)(self) } + } +} + +impl ConfigExtension for FFI_ExtensionOptions { + const PREFIX: &'static str = + datafusion_common::config::DATAFUSION_FFI_CONFIG_NAMESPACE; +} + +impl ExtensionOptions for FFI_ExtensionOptions { + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + fn cloned(&self) -> Box { + let ffi_options = unsafe { (self.cloned)(self) }; + Box::new(ffi_options) + } + + fn set(&mut self, key: &str, value: &str) -> Result<()> { + if key.split_once('.').is_none() { + return exec_err!("Unable to set FFI config value without namespace set"); + }; + + df_result!(unsafe { (self.set)(self, key.into(), value.into()) }) + } + + fn entries(&self) -> Vec { + unsafe { + (self.entries)(self) + .into_iter() + .map(|entry_tuple| ConfigEntry { + key: entry_tuple.0.into(), + value: Some(entry_tuple.1.into()), + description: "ffi_config_options", + }) + .collect() + } + } +} + +impl FFI_ExtensionOptions { + /// Add all of the values in a concrete configuration extension to the + /// FFI variant. This is safe to call on either side of the FFI + /// boundary. + pub fn add_config(&mut self, config: &C) -> Result<()> { + for entry in config.entries() { + if let Some(value) = entry.value { + let key = format!("{}.{}", C::PREFIX, entry.key); + self.set(key.as_str(), value.as_str())?; + } + } + + Ok(()) + } + + /// Merge another `FFI_ExtensionOptions` configurations into this one. + /// This is safe to call on either side of the FFI boundary. + pub fn merge(&mut self, other: &FFI_ExtensionOptions) -> Result<()> { + for entry in other.entries() { + if let Some(value) = entry.value { + self.set(entry.key.as_str(), value.as_str())?; + } + } + Ok(()) + } + + /// Create a concrete extension type from the FFI variant. + /// This is safe to call on either side of the FFI boundary. + pub fn to_extension(&self) -> Result { + let mut result = C::default(); + + unsafe { + for entry in (self.entries)(self) { + let key = entry.0.as_str(); + let value = entry.1.as_str(); + + if let Some((prefix, inner_key)) = key.split_once('.') + && prefix == C::PREFIX + { + result.set(inner_key, value)?; + } + } + } + + Ok(result) + } +} + +#[cfg(test)] +mod tests { + use datafusion_common::config::{ConfigExtension, ConfigOptions}; + use datafusion_common::extensions_options; + + use crate::config::extension_options::FFI_ExtensionOptions; + + // Define a new configuration struct using the `extensions_options` macro + extensions_options! { + /// My own config options. + pub struct MyConfig { + /// Should "foo" be replaced by "bar"? + pub foo_to_bar: bool, default = true + + /// How many "baz" should be created? + pub baz_count: usize, default = 1337 + } + } + + impl ConfigExtension for MyConfig { + const PREFIX: &'static str = "my_config"; + } + + #[test] + fn round_trip_ffi_extension_options() { + // set up config struct and register extension + let mut config = ConfigOptions::default(); + let mut ffi_options = FFI_ExtensionOptions::default(); + ffi_options.add_config(&MyConfig::default()).unwrap(); + + config.extensions.insert(ffi_options); + + // overwrite config default + config.set("my_config.baz_count", "42").unwrap(); + + // check config state + let returned_ffi_config = + config.extensions.get::().unwrap(); + let my_config: MyConfig = returned_ffi_config.to_extension().unwrap(); + + // check default value + assert!(my_config.foo_to_bar); + + // check overwritten value + assert_eq!(my_config.baz_count, 42); + } +} diff --git a/datafusion/ffi/src/config/mod.rs b/datafusion/ffi/src/config/mod.rs new file mode 100644 index 0000000000000..850a4dc337336 --- /dev/null +++ b/datafusion/ffi/src/config/mod.rs @@ -0,0 +1,169 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod extension_options; + +use abi_stable::StableAbi; +use abi_stable::std_types::{RHashMap, RString}; +use datafusion_common::config::{ + ConfigExtension, ConfigOptions, ExtensionOptions, TableOptions, +}; +use datafusion_common::{DataFusionError, Result}; + +use crate::config::extension_options::FFI_ExtensionOptions; + +/// A stable struct for sharing [`ConfigOptions`] across FFI boundaries. +/// +/// Accessing FFI extension options require a slightly different pattern +/// than local extensions. The trait [`ExtensionOptionsFFIProvider`] can +/// be used to simplify accessing FFI extensions. +#[repr(C)] +#[derive(Debug, Clone, StableAbi)] +pub struct FFI_ConfigOptions { + base_options: RHashMap, + + extensions: FFI_ExtensionOptions, +} + +impl From<&ConfigOptions> for FFI_ConfigOptions { + fn from(options: &ConfigOptions) -> Self { + let base_options: RHashMap = options + .entries() + .into_iter() + .filter_map(|entry| entry.value.map(|value| (entry.key, value))) + .map(|(key, value)| (key.into(), value.into())) + .collect(); + + let mut extensions = FFI_ExtensionOptions::default(); + for (extension_name, extension) in options.extensions.iter() { + for entry in extension.entries().iter() { + if let Some(value) = entry.value.as_ref() { + extensions + .set(format!("{extension_name}.{}", entry.key).as_str(), value) + .expect("FFI_ExtensionOptions set should always return Ok"); + } + } + } + + Self { + base_options, + extensions, + } + } +} + +impl TryFrom for ConfigOptions { + type Error = DataFusionError; + fn try_from(ffi_options: FFI_ConfigOptions) -> Result { + let mut options = ConfigOptions::default(); + options.extensions.insert(ffi_options.extensions); + + for kv_tuple in ffi_options.base_options.iter() { + options.set(kv_tuple.0.as_str(), kv_tuple.1.as_str())?; + } + + Ok(options) + } +} + +pub trait ExtensionOptionsFFIProvider { + /// Extract a [`ConfigExtension`]. This method should attempt to first extract + /// the extension from the local options when possible. Should that fail, it + /// should attempt to extract the FFI options and then convert them to the + /// desired [`ConfigExtension`]. + fn local_or_ffi_extension(&self) -> Option; +} + +impl ExtensionOptionsFFIProvider for ConfigOptions { + fn local_or_ffi_extension(&self) -> Option { + self.extensions + .get::() + .map(|v| v.to_owned()) + .or_else(|| { + self.extensions + .get::() + .and_then(|ffi_ext| ffi_ext.to_extension().ok()) + }) + } +} + +impl ExtensionOptionsFFIProvider for TableOptions { + fn local_or_ffi_extension(&self) -> Option { + self.extensions + .get::() + .map(|v| v.to_owned()) + .or_else(|| { + self.extensions + .get::() + .and_then(|ffi_ext| ffi_ext.to_extension().ok()) + }) + } +} + +/// A stable struct for sharing [`TableOptions`] across FFI boundaries. +/// +/// Accessing FFI extension options require a slightly different pattern +/// than local extensions. The trait [`ExtensionOptionsFFIProvider`] can +/// be used to simplify accessing FFI extensions. +#[repr(C)] +#[derive(Debug, Clone, StableAbi)] +pub struct FFI_TableOptions { + base_options: RHashMap, + + extensions: FFI_ExtensionOptions, +} + +impl From<&TableOptions> for FFI_TableOptions { + fn from(options: &TableOptions) -> Self { + let base_options: RHashMap = options + .entries() + .into_iter() + .filter_map(|entry| entry.value.map(|value| (entry.key, value))) + .map(|(key, value)| (key.into(), value.into())) + .collect(); + + let mut extensions = FFI_ExtensionOptions::default(); + for (extension_name, extension) in options.extensions.iter() { + for entry in extension.entries().iter() { + if let Some(value) = entry.value.as_ref() { + extensions + .set(format!("{extension_name}.{}", entry.key).as_str(), value) + .expect("FFI_ExtensionOptions set should always return Ok"); + } + } + } + + Self { + base_options, + extensions, + } + } +} + +impl TryFrom for TableOptions { + type Error = DataFusionError; + fn try_from(ffi_options: FFI_TableOptions) -> Result { + let mut options = TableOptions::default(); + options.extensions.insert(ffi_options.extensions); + + for kv_tuple in ffi_options.base_options.iter() { + options.set(kv_tuple.0.as_str(), kv_tuple.1.as_str())?; + } + + Ok(options) + } +} diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs index 94e1d03d0832c..920c0ae1449c4 100644 --- a/datafusion/ffi/src/execution_plan.rs +++ b/datafusion/ffi/src/execution_plan.rs @@ -20,7 +20,8 @@ use std::pin::Pin; use std::sync::Arc; use abi_stable::StableAbi; -use abi_stable::std_types::{RString, RVec}; +use abi_stable::std_types::{ROption, RResult, RString, RVec}; +use datafusion_common::config::ConfigOptions; use datafusion_common::{DataFusionError, Result}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_plan::{ @@ -28,11 +29,12 @@ use datafusion_physical_plan::{ }; use tokio::runtime::Handle; +use crate::config::FFI_ConfigOptions; use crate::execution::FFI_TaskContext; use crate::plan_properties::FFI_PlanProperties; use crate::record_batch_stream::FFI_RecordBatchStream; use crate::util::FFIResult; -use crate::{df_result, rresult}; +use crate::{df_result, rresult, rresult_return}; /// A stable struct for sharing a [`ExecutionPlan`] across FFI boundaries. #[repr(C)] @@ -44,6 +46,9 @@ pub struct FFI_ExecutionPlan { /// Return a vector of children plans pub children: unsafe extern "C" fn(plan: &Self) -> RVec, + pub with_new_children: + unsafe extern "C" fn(plan: &Self, children: RVec) -> FFIResult, + /// Return the plan name. pub name: unsafe extern "C" fn(plan: &Self) -> RString, @@ -55,6 +60,12 @@ pub struct FFI_ExecutionPlan { context: FFI_TaskContext, ) -> FFIResult, + pub repartitioned: unsafe extern "C" fn( + plan: &Self, + target_partitions: usize, + config: FFI_ConfigOptions, + ) -> FFIResult>, + /// Used to create a clone on the provider of the execution plan. This should /// only need to be called by the receiver of the plan. pub clone: unsafe extern "C" fn(plan: &Self) -> Self, @@ -85,6 +96,11 @@ impl FFI_ExecutionPlan { let private_data = self.private_data as *const ExecutionPlanPrivateData; unsafe { &(*private_data).plan } } + + fn runtime(&self) -> Option { + let private_data = self.private_data as *const ExecutionPlanPrivateData; + unsafe { (*private_data).runtime.clone() } + } } unsafe extern "C" fn properties_fn_wrapper( @@ -96,19 +112,34 @@ unsafe extern "C" fn properties_fn_wrapper( unsafe extern "C" fn children_fn_wrapper( plan: &FFI_ExecutionPlan, ) -> RVec { - unsafe { - let private_data = plan.private_data as *const ExecutionPlanPrivateData; - let plan = &(*private_data).plan; - let runtime = &(*private_data).runtime; + let runtime = plan.runtime(); + let plan = plan.inner(); - let children: Vec<_> = plan - .children() - .into_iter() - .map(|child| FFI_ExecutionPlan::new(Arc::clone(child), runtime.clone())) - .collect(); + let children: Vec<_> = plan + .children() + .into_iter() + .map(|child| FFI_ExecutionPlan::new(Arc::clone(child), runtime.clone())) + .collect(); - children.into() - } + children.into() +} + +unsafe extern "C" fn with_new_children_fn_wrapper( + plan: &FFI_ExecutionPlan, + children: RVec, +) -> FFIResult { + let runtime = plan.runtime(); + let plan = Arc::clone(plan.inner()); + let children = rresult_return!( + children + .iter() + .map(>::try_from) + .collect::>>() + ); + + let new_plan = rresult_return!(plan.with_new_children(children)); + + RResult::ROk(FFI_ExecutionPlan::new(new_plan, runtime)) } unsafe extern "C" fn execute_fn_wrapper( @@ -116,17 +147,34 @@ unsafe extern "C" fn execute_fn_wrapper( partition: usize, context: FFI_TaskContext, ) -> FFIResult { - unsafe { - let ctx = context.into(); - let private_data = plan.private_data as *const ExecutionPlanPrivateData; - let plan = &(*private_data).plan; - let runtime = (*private_data).runtime.clone(); - - rresult!( - plan.execute(partition, ctx) - .map(|rbs| FFI_RecordBatchStream::new(rbs, runtime)) - ) - } + let ctx = context.into(); + let runtime = plan.runtime(); + let plan = plan.inner(); + + let _guard = runtime.as_ref().map(|rt| rt.enter()); + + rresult!( + plan.execute(partition, ctx) + .map(|rbs| FFI_RecordBatchStream::new(rbs, runtime)) + ) +} + +unsafe extern "C" fn repartitioned_fn_wrapper( + plan: &FFI_ExecutionPlan, + target_partitions: usize, + config: FFI_ConfigOptions, +) -> FFIResult> { + let maybe_config: Result = config.try_into(); + let config = rresult_return!(maybe_config); + let runtime = plan.runtime(); + let plan = plan.inner(); + + rresult!( + plan.repartitioned(target_partitions, &config) + .map(|maybe_plan| maybe_plan + .map(|plan| FFI_ExecutionPlan::new(plan, runtime)) + .into()) + ) } unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> RString { @@ -144,12 +192,10 @@ unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) { } unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_ExecutionPlan { - unsafe { - let private_data = plan.private_data as *const ExecutionPlanPrivateData; - let plan_data = &(*private_data); + let runtime = plan.runtime(); + let plan = plan.inner(); - FFI_ExecutionPlan::new(Arc::clone(&plan_data.plan), plan_data.runtime.clone()) - } + FFI_ExecutionPlan::new(Arc::clone(plan), runtime) } impl Clone for FFI_ExecutionPlan { @@ -158,16 +204,75 @@ impl Clone for FFI_ExecutionPlan { } } +/// Helper function to recursively identify any children that do not +/// have a runtime set but should because they are local to this same +/// library. This does imply a restriction that all execution plans +/// in this chain that are within the same library use the same runtime. +fn pass_runtime_to_children( + plan: &Arc, + runtime: &Handle, +) -> Result>> { + println!("checking plan {:?}", plan.name()); + let mut updated_children = false; + let plan_is_foreign = plan.as_any().is::(); + + let children = plan + .children() + .into_iter() + .map(|child| { + let child = match pass_runtime_to_children(child, runtime)? { + Some(child) => { + updated_children = true; + child + } + None => Arc::clone(child), + }; + + // If the parent is foreign and the child is local to this library, then when + // we called `children()` above we will get something other than a + // `ForeignExecutionPlan`. In this case wrap the plan in a `ForeignExecutionPlan` + // because when we call `with_new_children` below it will extract the + // FFI plan that does contain the runtime. + if plan_is_foreign && !child.as_any().is::() { + updated_children = true; + let ffi_child = FFI_ExecutionPlan::new(child, Some(runtime.clone())); + let foreign_child = ForeignExecutionPlan::try_from(ffi_child); + foreign_child.map(|c| Arc::new(c) as Arc) + } else { + Ok(child) + } + }) + .collect::>>()?; + if updated_children { + Arc::clone(plan).with_new_children(children).map(Some) + } else { + Ok(None) + } +} + impl FFI_ExecutionPlan { /// This function is called on the provider's side. - pub fn new(plan: Arc, runtime: Option) -> Self { - let private_data = Box::new(ExecutionPlanPrivateData { plan, runtime }); + pub fn new(mut plan: Arc, runtime: Option) -> Self { + // Note to developers: `pass_runtime_to_children` relies on the logic here to + // get the underlying FFI plan during calls to `new_with_children`. + if let Some(plan) = plan.as_any().downcast_ref::() { + return plan.plan.clone(); + } + if let Some(rt) = &runtime + && let Ok(Some(p)) = pass_runtime_to_children(&plan, rt) + { + plan = p; + } + + let private_data = Box::new(ExecutionPlanPrivateData { plan, runtime }); Self { properties: properties_fn_wrapper, children: children_fn_wrapper, + with_new_children: with_new_children_fn_wrapper, name: name_fn_wrapper, execute: execute_fn_wrapper, + repartitioned: repartitioned_fn_wrapper, clone: clone_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, @@ -227,28 +332,34 @@ impl TryFrom<&FFI_ExecutionPlan> for Arc { fn try_from(plan: &FFI_ExecutionPlan) -> Result { if (plan.library_marker_id)() == crate::get_library_marker_id() { - return Ok(Arc::clone(plan.inner())); + Ok(Arc::clone(plan.inner())) + } else { + let plan = ForeignExecutionPlan::try_from(plan.clone())?; + Ok(Arc::new(plan)) } + } +} +impl TryFrom for ForeignExecutionPlan { + type Error = DataFusionError; + fn try_from(plan: FFI_ExecutionPlan) -> Result { unsafe { - let name = (plan.name)(plan).into(); + let name = (plan.name)(&plan).into(); - let properties: PlanProperties = (plan.properties)(plan).try_into()?; + let properties: PlanProperties = (plan.properties)(&plan).try_into()?; - let children_rvec = (plan.children)(plan); + let children_rvec = (plan.children)(&plan); let children = children_rvec .iter() .map(>::try_from) .collect::>>()?; - let plan = ForeignExecutionPlan { + Ok(ForeignExecutionPlan { name, - plan: plan.clone(), + plan, properties, children, - }; - - Ok(Arc::new(plan)) + }) } } } @@ -274,12 +385,16 @@ impl ExecutionPlan for ForeignExecutionPlan { self: Arc, children: Vec>, ) -> Result> { - Ok(Arc::new(ForeignExecutionPlan { - plan: self.plan.clone(), - name: self.name.clone(), - children, - properties: self.properties.clone(), - })) + unsafe { + let children = children + .into_iter() + .map(|child| FFI_ExecutionPlan::new(child, None)) + .collect::>(); + let new_plan = + df_result!((self.plan.with_new_children)(&self.plan, children))?; + + (&new_plan).try_into() + } } fn execute( @@ -293,13 +408,28 @@ impl ExecutionPlan for ForeignExecutionPlan { .map(|stream| Pin::new(Box::new(stream)) as SendableRecordBatchStream) } } + + fn repartitioned( + &self, + target_partitions: usize, + config: &ConfigOptions, + ) -> Result>> { + let config = config.into(); + let maybe_plan: Option = df_result!(unsafe { + (self.plan.repartitioned)(&self.plan, target_partitions, config) + })? + .into(); + + maybe_plan + .map(|plan| >::try_from(&plan)) + .transpose() + } } -#[cfg(test)] -pub(crate) mod tests { - use arrow::datatypes::{DataType, Field, Schema}; - use datafusion::physical_plan::Partitioning; - use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +#[cfg(any(test, feature = "integration-tests"))] +pub mod tests { + use datafusion_physical_plan::Partitioning; + use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; use super::*; @@ -313,7 +443,7 @@ pub(crate) mod tests { pub fn new(schema: arrow::datatypes::SchemaRef) -> Self { Self { props: PlanProperties::new( - datafusion::physical_expr::EquivalenceProperties::new(schema), + datafusion_physical_expr::EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(3), EmissionType::Incremental, Boundedness::Bounded, @@ -371,8 +501,9 @@ pub(crate) mod tests { #[test] fn test_round_trip_ffi_execution_plan() -> Result<()> { - let schema = - Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)])); + let schema = Arc::new(arrow::datatypes::Schema::new(vec![ + arrow::datatypes::Field::new("a", arrow::datatypes::DataType::Float32, false), + ])); let original_plan = Arc::new(EmptyExec::new(schema)); let original_name = original_plan.name().to_string(); @@ -384,7 +515,7 @@ pub(crate) mod tests { assert_eq!(original_name, foreign_plan.name()); - let display = datafusion::physical_plan::display::DisplayableExecutionPlan::new( + let display = datafusion_physical_plan::display::DisplayableExecutionPlan::new( foreign_plan.as_ref(), ); @@ -399,8 +530,9 @@ pub(crate) mod tests { #[test] fn test_ffi_execution_plan_children() -> Result<()> { - let schema = - Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)])); + let schema = Arc::new(arrow::datatypes::Schema::new(vec![ + arrow::datatypes::Field::new("a", arrow::datatypes::DataType::Float32, false), + ])); // Version 1: Adding child to the foreign plan let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema))); @@ -438,8 +570,9 @@ pub(crate) mod tests { #[test] fn test_ffi_execution_plan_local_bypass() { - let schema = - Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)])); + let schema = Arc::new(arrow::datatypes::Schema::new(vec![ + arrow::datatypes::Field::new("a", arrow::datatypes::DataType::Float32, false), + ])); let plan = Arc::new(EmptyExec::new(schema)); diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs index 2ca9b8f6f495a..bcd2c2c4ae827 100644 --- a/datafusion/ffi/src/lib.rs +++ b/datafusion/ffi/src/lib.rs @@ -28,11 +28,13 @@ pub mod arrow_wrappers; pub mod catalog_provider; pub mod catalog_provider_list; +pub mod config; pub mod execution; pub mod execution_plan; pub mod expr; pub mod insert_op; pub mod physical_expr; +pub mod physical_optimizer; pub mod plan_properties; pub mod proto; pub mod record_batch_stream; diff --git a/datafusion/ffi/src/physical_expr/mod.rs b/datafusion/ffi/src/physical_expr/mod.rs index d268dd613f987..189a1e478217e 100644 --- a/datafusion/ffi/src/physical_expr/mod.rs +++ b/datafusion/ffi/src/physical_expr/mod.rs @@ -448,6 +448,10 @@ impl Drop for FFI_PhysicalExpr { impl From> for FFI_PhysicalExpr { /// Creates a new [`FFI_PhysicalExpr`]. fn from(expr: Arc) -> Self { + if let Some(expr) = expr.as_any().downcast_ref::() { + return expr.expr.clone(); + } + let private_data = Box::new(PhysicalExprPrivateData { expr }); Self { diff --git a/datafusion/ffi/src/physical_optimizer.rs b/datafusion/ffi/src/physical_optimizer.rs new file mode 100644 index 0000000000000..bb59957b1f900 --- /dev/null +++ b/datafusion/ffi/src/physical_optimizer.rs @@ -0,0 +1,221 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::ffi::c_void; +use std::sync::Arc; + +use abi_stable::StableAbi; +use abi_stable::std_types::{RResult, RStr}; +use async_trait::async_trait; +use datafusion_common::config::ConfigOptions; +use datafusion_common::error::Result; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::ExecutionPlan; +use tokio::runtime::Handle; + +use crate::config::FFI_ConfigOptions; +use crate::execution_plan::FFI_ExecutionPlan; +use crate::util::FFIResult; +use crate::{df_result, rresult_return}; + +/// A stable struct for sharing [`PhysicalOptimizerRule`] across FFI boundaries. +#[repr(C)] +#[derive(Debug, StableAbi)] +pub struct FFI_PhysicalOptimizerRule { + pub optimize: unsafe extern "C" fn( + &Self, + plan: &FFI_ExecutionPlan, + config: FFI_ConfigOptions, + ) -> FFIResult, + + pub name: unsafe extern "C" fn(&Self) -> RStr, + + pub schema_check: unsafe extern "C" fn(&Self) -> bool, + + /// Used to create a clone on the provider of the execution plan. This should + /// only need to be called by the receiver of the plan. + pub clone: unsafe extern "C" fn(plan: &Self) -> Self, + + /// Release the memory of the private data when it is no longer being used. + pub release: unsafe extern "C" fn(arg: &mut Self), + + /// Return the major DataFusion version number of this provider. + pub version: unsafe extern "C" fn() -> u64, + + /// Internal data. This is only to be accessed by the provider of the plan. + /// A [`ForeignPhysicalOptimizerRule`] should never attempt to access this data. + pub private_data: *mut c_void, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. + pub library_marker_id: extern "C" fn() -> usize, +} + +unsafe impl Send for FFI_PhysicalOptimizerRule {} +unsafe impl Sync for FFI_PhysicalOptimizerRule {} + +struct RulePrivateData { + rule: Arc, + runtime: Option, +} + +impl FFI_PhysicalOptimizerRule { + fn inner(&self) -> &Arc { + let private_data = self.private_data as *const RulePrivateData; + unsafe { &(*private_data).rule } + } + + fn runtime(&self) -> Option { + let private_data = self.private_data as *const RulePrivateData; + unsafe { (*private_data).runtime.clone() } + } +} + +unsafe extern "C" fn optimize_fn_wrapper( + rule: &FFI_PhysicalOptimizerRule, + plan: &FFI_ExecutionPlan, + config: FFI_ConfigOptions, +) -> FFIResult { + let runtime = rule.runtime(); + let rule = rule.inner(); + let plan: Arc = rresult_return!(plan.try_into()); + let config = rresult_return!(ConfigOptions::try_from(config)); + let optimized_plan = rresult_return!(rule.optimize(plan, &config)); + + RResult::ROk(FFI_ExecutionPlan::new(optimized_plan, runtime)) +} + +unsafe extern "C" fn name_fn_wrapper(rule: &FFI_PhysicalOptimizerRule) -> RStr<'_> { + let rule = rule.inner(); + rule.name().into() +} + +unsafe extern "C" fn schema_check_fn_wrapper(rule: &FFI_PhysicalOptimizerRule) -> bool { + rule.inner().schema_check() +} + +unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_PhysicalOptimizerRule) { + let private_data = + unsafe { Box::from_raw(provider.private_data as *mut RulePrivateData) }; + drop(private_data); +} + +unsafe extern "C" fn clone_fn_wrapper( + rule: &FFI_PhysicalOptimizerRule, +) -> FFI_PhysicalOptimizerRule { + let runtime = rule.runtime(); + let rule = Arc::clone(rule.inner()); + + let private_data = + Box::into_raw(Box::new(RulePrivateData { rule, runtime })) as *mut c_void; + + FFI_PhysicalOptimizerRule { + optimize: optimize_fn_wrapper, + name: name_fn_wrapper, + schema_check: schema_check_fn_wrapper, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + version: super::version, + private_data, + library_marker_id: crate::get_library_marker_id, + } +} + +impl Drop for FFI_PhysicalOptimizerRule { + fn drop(&mut self) { + unsafe { (self.release)(self) } + } +} + +impl FFI_PhysicalOptimizerRule { + /// Creates a new [`FFI_PhysicalOptimizerRule`]. + pub fn new( + rule: Arc, + runtime: Option, + ) -> Self { + if let Some(rule) = (Arc::clone(&rule) as Arc) + .downcast_ref::() + { + return rule.0.clone(); + } + + let private_data = Box::new(RulePrivateData { rule, runtime }); + let private_data = Box::into_raw(private_data) as *mut c_void; + + Self { + optimize: optimize_fn_wrapper, + name: name_fn_wrapper, + schema_check: schema_check_fn_wrapper, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + version: super::version, + private_data, + library_marker_id: crate::get_library_marker_id, + } + } +} + +/// This wrapper struct exists on the receiver side of the FFI interface, so it has +/// no guarantees about being able to access the data in `private_data`. Any functions +/// defined on this struct must only use the stable functions provided in +/// FFI_PhysicalOptimizerRule to interact with the foreign table provider. +#[derive(Debug)] +pub struct ForeignPhysicalOptimizerRule(pub FFI_PhysicalOptimizerRule); + +unsafe impl Send for ForeignPhysicalOptimizerRule {} +unsafe impl Sync for ForeignPhysicalOptimizerRule {} + +impl From<&FFI_PhysicalOptimizerRule> for Arc { + fn from(provider: &FFI_PhysicalOptimizerRule) -> Self { + if (provider.library_marker_id)() == crate::get_library_marker_id() { + return Arc::clone(provider.inner()); + } + + Arc::new(ForeignPhysicalOptimizerRule(provider.clone())) + as Arc + } +} + +impl Clone for FFI_PhysicalOptimizerRule { + fn clone(&self) -> Self { + unsafe { (self.clone)(self) } + } +} + +#[async_trait] +impl PhysicalOptimizerRule for ForeignPhysicalOptimizerRule { + fn optimize( + &self, + plan: Arc, + config: &ConfigOptions, + ) -> Result> { + let config_options: FFI_ConfigOptions = config.into(); + let plan = FFI_ExecutionPlan::new(plan, None); + + let optimized_plan = + unsafe { df_result!((self.0.optimize)(&self.0, &plan, config_options))? }; + (&optimized_plan).try_into() + } + + fn name(&self) -> &str { + unsafe { (self.0.name)(&self.0).as_str() } + } + + fn schema_check(&self) -> bool { + unsafe { (self.0.schema_check)(&self.0) } + } +} diff --git a/datafusion/ffi/src/proto/logical_extension_codec.rs b/datafusion/ffi/src/proto/logical_extension_codec.rs index 3781a40539ed1..2beeead7039c0 100644 --- a/datafusion/ffi/src/proto/logical_extension_codec.rs +++ b/datafusion/ffi/src/proto/logical_extension_codec.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::any::Any; use std::ffi::c_void; use std::sync::Arc; @@ -296,6 +297,12 @@ impl FFI_LogicalExtensionCodec { runtime: Option, task_ctx_provider: impl Into, ) -> Self { + if let Some(codec) = (Arc::clone(&codec) as Arc) + .downcast_ref::() + { + return codec.0.clone(); + } + let task_ctx_provider = task_ctx_provider.into(); let private_data = Box::new(LogicalExtensionCodecPrivateData { codec, runtime }); diff --git a/datafusion/ffi/src/proto/physical_extension_codec.rs b/datafusion/ffi/src/proto/physical_extension_codec.rs index 0577e72366478..8f0d45affc914 100644 --- a/datafusion/ffi/src/proto/physical_extension_codec.rs +++ b/datafusion/ffi/src/proto/physical_extension_codec.rs @@ -15,9 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::ffi::c_void; -use std::sync::Arc; - use abi_stable::StableAbi; use abi_stable::std_types::{RResult, RSlice, RStr, RVec}; use datafusion_common::error::Result; @@ -27,6 +24,9 @@ use datafusion_expr::{ }; use datafusion_physical_plan::ExecutionPlan; use datafusion_proto::physical_plan::PhysicalExtensionCodec; +use std::any::Any; +use std::ffi::c_void; +use std::sync::Arc; use tokio::runtime::Handle; use crate::execution::FFI_TaskContextProvider; @@ -111,14 +111,14 @@ unsafe impl Send for FFI_PhysicalExtensionCodec {} unsafe impl Sync for FFI_PhysicalExtensionCodec {} struct PhysicalExtensionCodecPrivateData { - provider: Arc, + codec: Arc, runtime: Option, } impl FFI_PhysicalExtensionCodec { fn inner(&self) -> &Arc { let private_data = self.private_data as *const PhysicalExtensionCodecPrivateData; - unsafe { &(*private_data).provider } + unsafe { &(*private_data).codec } } fn runtime(&self) -> &Option { @@ -132,6 +132,7 @@ unsafe extern "C" fn try_decode_fn_wrapper( buf: RSlice, inputs: RVec, ) -> FFIResult { + let runtime = codec.runtime().clone(); let task_ctx: Arc = rresult_return!((&codec.task_ctx_provider).try_into()); let codec = codec.inner(); @@ -144,7 +145,7 @@ unsafe extern "C" fn try_decode_fn_wrapper( let plan = rresult_return!(codec.try_decode(buf.as_ref(), &inputs, task_ctx.as_ref())); - RResult::ROk(FFI_ExecutionPlan::new(plan, None)) + RResult::ROk(FFI_ExecutionPlan::new(plan, runtime)) } unsafe extern "C" fn try_encode_fn_wrapper( @@ -240,11 +241,10 @@ unsafe extern "C" fn try_encode_udwf_fn_wrapper( RResult::ROk(bytes.into()) } -unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_PhysicalExtensionCodec) { +unsafe extern "C" fn release_fn_wrapper(codec: &mut FFI_PhysicalExtensionCodec) { unsafe { - let private_data = Box::from_raw( - provider.private_data as *mut PhysicalExtensionCodecPrivateData, - ); + let private_data = + Box::from_raw(codec.private_data as *mut PhysicalExtensionCodecPrivateData); drop(private_data); } } @@ -267,13 +267,18 @@ impl Drop for FFI_PhysicalExtensionCodec { impl FFI_PhysicalExtensionCodec { /// Creates a new [`FFI_PhysicalExtensionCodec`]. pub fn new( - provider: Arc, + codec: Arc, runtime: Option, task_ctx_provider: impl Into, ) -> Self { + if let Some(codec) = (Arc::clone(&codec) as Arc) + .downcast_ref::() + { + return codec.0.clone(); + } + let task_ctx_provider = task_ctx_provider.into(); - let private_data = - Box::new(PhysicalExtensionCodecPrivateData { provider, runtime }); + let private_data = Box::new(PhysicalExtensionCodecPrivateData { codec, runtime }); Self { try_decode: try_decode_fn_wrapper, @@ -306,11 +311,11 @@ unsafe impl Send for ForeignPhysicalExtensionCodec {} unsafe impl Sync for ForeignPhysicalExtensionCodec {} impl From<&FFI_PhysicalExtensionCodec> for Arc { - fn from(provider: &FFI_PhysicalExtensionCodec) -> Self { - if (provider.library_marker_id)() == crate::get_library_marker_id() { - Arc::clone(provider.inner()) + fn from(codec: &FFI_PhysicalExtensionCodec) -> Self { + if (codec.library_marker_id)() == crate::get_library_marker_id() { + Arc::clone(codec.inner()) } else { - Arc::new(ForeignPhysicalExtensionCodec(provider.clone())) + Arc::new(ForeignPhysicalExtensionCodec(codec.clone())) } } } diff --git a/datafusion/ffi/src/schema_provider.rs b/datafusion/ffi/src/schema_provider.rs index b8e44b134f87b..5d1348e2328f7 100644 --- a/datafusion/ffi/src/schema_provider.rs +++ b/datafusion/ffi/src/schema_provider.rs @@ -259,6 +259,11 @@ impl FFI_SchemaProvider { runtime: Option, logical_codec: FFI_LogicalExtensionCodec, ) -> Self { + if let Some(provider) = provider.as_any().downcast_ref::() + { + return provider.0.clone(); + } + let owner_name = provider.owner_name().map(|s| s.into()).into(); let private_data = Box::new(ProviderPrivateData { provider, runtime }); diff --git a/datafusion/ffi/src/session/config.rs b/datafusion/ffi/src/session/config.rs index eb9c4e2c6986a..63f0f20ecc7d5 100644 --- a/datafusion/ffi/src/session/config.rs +++ b/datafusion/ffi/src/session/config.rs @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; use std::ffi::c_void; +use crate::config::FFI_ConfigOptions; use abi_stable::StableAbi; -use abi_stable::std_types::{RHashMap, RString}; +use datafusion_common::config::ConfigOptions; use datafusion_common::error::{DataFusionError, Result}; use datafusion_execution::config::SessionConfig; @@ -37,9 +37,8 @@ use datafusion_execution::config::SessionConfig; #[repr(C)] #[derive(Debug, StableAbi)] pub struct FFI_SessionConfig { - /// Return a hash map from key to value of the config options represented - /// by string values. - pub config_options: unsafe extern "C" fn(config: &Self) -> RHashMap, + /// FFI stable configuration options. + pub config_options: FFI_ConfigOptions, /// Used to create a clone on the provider of the execution plan. This should /// only need to be called by the receiver of the plan. @@ -67,21 +66,6 @@ impl FFI_SessionConfig { } } -unsafe extern "C" fn config_options_fn_wrapper( - config: &FFI_SessionConfig, -) -> RHashMap { - let config_options = config.inner().options(); - - let mut options = RHashMap::default(); - for config_entry in config_options.entries() { - if let Some(value) = config_entry.value { - options.insert(config_entry.key.into(), value.into()); - } - } - - options -} - unsafe extern "C" fn release_fn_wrapper(config: &mut FFI_SessionConfig) { unsafe { debug_assert!(!config.private_data.is_null()); @@ -100,7 +84,7 @@ unsafe extern "C" fn clone_fn_wrapper(config: &FFI_SessionConfig) -> FFI_Session let private_data = Box::new(SessionConfigPrivateData { config: old_config }); FFI_SessionConfig { - config_options: config_options_fn_wrapper, + config_options: config.config_options.clone(), private_data: Box::into_raw(private_data) as *mut c_void, clone: clone_fn_wrapper, release: release_fn_wrapper, @@ -119,8 +103,10 @@ impl From<&SessionConfig> for FFI_SessionConfig { config: session.clone(), }); + let config_options = FFI_ConfigOptions::from(session.options().as_ref()); + Self { - config_options: config_options_fn_wrapper, + config_options, private_data: Box::into_raw(private_data) as *mut c_void, clone: clone_fn_wrapper, release: release_fn_wrapper, @@ -149,14 +135,9 @@ impl TryFrom<&FFI_SessionConfig> for SessionConfig { return Ok(config.inner().clone()); } - let config_options = unsafe { (config.config_options)(config) }; - - let mut options_map = HashMap::new(); - config_options.iter().for_each(|kv_pair| { - options_map.insert(kv_pair.0.to_string(), kv_pair.1.to_string()); - }); + let config_options = ConfigOptions::try_from(config.config_options.clone())?; - SessionConfig::from_string_hash_map(&options_map) + Ok(SessionConfig::from(config_options)) } } diff --git a/datafusion/ffi/src/session/mod.rs b/datafusion/ffi/src/session/mod.rs index aa910abb9149a..7ca90a048bb71 100644 --- a/datafusion/ffi/src/session/mod.rs +++ b/datafusion/ffi/src/session/mod.rs @@ -321,6 +321,10 @@ impl FFI_SessionRef { runtime: Option, logical_codec: FFI_LogicalExtensionCodec, ) -> Self { + if let Some(session) = session.as_any().downcast_ref::() { + return session.session.clone(); + } + let private_data = Box::new(SessionPrivateData { session, runtime }); Self { diff --git a/datafusion/ffi/src/table_provider.rs b/datafusion/ffi/src/table_provider.rs index df8b648026d3e..c6940a67de62a 100644 --- a/datafusion/ffi/src/table_provider.rs +++ b/datafusion/ffi/src/table_provider.rs @@ -390,6 +390,9 @@ impl FFI_TableProvider { runtime: Option, logical_codec: FFI_LogicalExtensionCodec, ) -> Self { + if let Some(provider) = provider.as_any().downcast_ref::() { + return provider.0.clone(); + } let private_data = Box::new(ProviderPrivateData { provider, runtime }); Self { diff --git a/datafusion/ffi/src/tests/config.rs b/datafusion/ffi/src/tests/config.rs new file mode 100644 index 0000000000000..46fc9756203e3 --- /dev/null +++ b/datafusion/ffi/src/tests/config.rs @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion_common::config::ConfigExtension; +use datafusion_common::extensions_options; + +use crate::config::extension_options::FFI_ExtensionOptions; + +extensions_options! { + pub struct ExternalConfig { + /// Should "foo" be replaced by "bar"? + pub is_enabled: bool, default = true + + /// Some value to be extracted + pub base_number: usize, default = 1000 + } +} + +impl PartialEq for ExternalConfig { + fn eq(&self, other: &Self) -> bool { + self.base_number == other.base_number && self.is_enabled == other.is_enabled + } +} +impl Eq for ExternalConfig {} + +impl ConfigExtension for ExternalConfig { + const PREFIX: &'static str = "external_config"; +} + +pub(crate) extern "C" fn create_extension_options() -> FFI_ExtensionOptions { + let mut extensions = FFI_ExtensionOptions::default(); + extensions + .add_config(&ExternalConfig::default()) + .expect("add_config should be infallible for ExternalConfig"); + + extensions +} diff --git a/datafusion/ffi/src/tests/mod.rs b/datafusion/ffi/src/tests/mod.rs index 9bcd7e0031083..cb827acbfb430 100644 --- a/datafusion/ffi/src/tests/mod.rs +++ b/datafusion/ffi/src/tests/mod.rs @@ -38,6 +38,9 @@ use super::table_provider::FFI_TableProvider; use super::udf::FFI_ScalarUDF; use crate::catalog_provider::FFI_CatalogProvider; use crate::catalog_provider_list::FFI_CatalogProviderList; +use crate::config::extension_options::FFI_ExtensionOptions; +use crate::execution_plan::FFI_ExecutionPlan; +use crate::execution_plan::tests::EmptyExec; use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec; use crate::tests::catalog::create_catalog_provider_list; use crate::udaf::FFI_AggregateUDF; @@ -46,6 +49,7 @@ use crate::udwf::FFI_WindowUDF; mod async_provider; pub mod catalog; +pub mod config; mod sync_provider; mod udf_udaf_udwf; pub mod utils; @@ -87,6 +91,11 @@ pub struct ForeignLibraryModule { pub create_rank_udwf: extern "C" fn() -> FFI_WindowUDF, + /// Create extension options, for either ConfigOptions or TableOptions + pub create_extension_options: extern "C" fn() -> FFI_ExtensionOptions, + + pub create_empty_exec: extern "C" fn() -> FFI_ExecutionPlan, + pub version: extern "C" fn() -> u64, } @@ -128,6 +137,13 @@ extern "C" fn construct_table_provider( } } +pub(crate) extern "C" fn create_empty_exec() -> FFI_ExecutionPlan { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)])); + + let plan = Arc::new(EmptyExec::new(schema)); + FFI_ExecutionPlan::new(plan, None) +} + #[export_root_module] /// This defines the entry point for using the module. pub fn get_foreign_library_module() -> ForeignLibraryModuleRef { @@ -141,6 +157,8 @@ pub fn get_foreign_library_module() -> ForeignLibraryModuleRef { create_sum_udaf: create_ffi_sum_func, create_stddev_udaf: create_ffi_stddev_func, create_rank_udwf: create_ffi_rank_func, + create_extension_options: config::create_extension_options, + create_empty_exec, version: super::version, } .leak_into_prefix() diff --git a/datafusion/ffi/src/udaf/accumulator.rs b/datafusion/ffi/src/udaf/accumulator.rs index 6d2b86a3f2e62..125b28598b433 100644 --- a/datafusion/ffi/src/udaf/accumulator.rs +++ b/datafusion/ffi/src/udaf/accumulator.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::any::Any; use std::ffi::c_void; use std::ops::Deref; use std::ptr::null_mut; @@ -204,6 +205,13 @@ unsafe extern "C" fn release_fn_wrapper(accumulator: &mut FFI_Accumulator) { impl From> for FFI_Accumulator { fn from(accumulator: Box) -> Self { + if (accumulator.as_ref() as &dyn Any).is::() { + let accumulator = (accumulator as Box) + .downcast::() + .expect("already checked type"); + return accumulator.accumulator; + } + let supports_retract_batch = accumulator.supports_retract_batch(); let private_data = AccumulatorPrivateData { accumulator }; diff --git a/datafusion/ffi/src/udaf/groups_accumulator.rs b/datafusion/ffi/src/udaf/groups_accumulator.rs index fc4ce4b8ba9d0..0dc8edbfe5a85 100644 --- a/datafusion/ffi/src/udaf/groups_accumulator.rs +++ b/datafusion/ffi/src/udaf/groups_accumulator.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::any::Any; use std::ffi::c_void; use std::ops::Deref; use std::ptr::null_mut; @@ -245,6 +246,13 @@ unsafe extern "C" fn release_fn_wrapper(accumulator: &mut FFI_GroupsAccumulator) impl From> for FFI_GroupsAccumulator { fn from(accumulator: Box) -> Self { + if (accumulator.as_ref() as &dyn Any).is::() { + let accumulator = (accumulator as Box) + .downcast::() + .expect("already checked type"); + return accumulator.accumulator; + } + let supports_convert_to_state = accumulator.supports_convert_to_state(); let private_data = GroupsAccumulatorPrivateData { accumulator }; diff --git a/datafusion/ffi/src/udaf/mod.rs b/datafusion/ffi/src/udaf/mod.rs index 22cbe8cff0fe6..8e791b28b1ad6 100644 --- a/datafusion/ffi/src/udaf/mod.rs +++ b/datafusion/ffi/src/udaf/mod.rs @@ -371,6 +371,10 @@ impl Clone for FFI_AggregateUDF { impl From> for FFI_AggregateUDF { fn from(udaf: Arc) -> Self { + if let Some(udaf) = udaf.inner().as_any().downcast_ref::() { + return udaf.udaf.clone(); + } + let name = udaf.name().into(); let aliases = udaf.aliases().iter().map(|a| a.to_owned().into()).collect(); let is_nullable = udaf.is_nullable(); diff --git a/datafusion/ffi/src/udf/mod.rs b/datafusion/ffi/src/udf/mod.rs index 94be5f38eab0b..b1566b0817dd7 100644 --- a/datafusion/ffi/src/udf/mod.rs +++ b/datafusion/ffi/src/udf/mod.rs @@ -232,6 +232,10 @@ impl Clone for FFI_ScalarUDF { impl From> for FFI_ScalarUDF { fn from(udf: Arc) -> Self { + if let Some(udf) = udf.inner().as_any().downcast_ref::() { + return udf.udf.clone(); + } + let name = udf.name().into(); let aliases = udf.aliases().iter().map(|a| a.to_owned().into()).collect(); let volatility = udf.signature().volatility.into(); diff --git a/datafusion/ffi/src/udtf.rs b/datafusion/ffi/src/udtf.rs index 6024ec755de58..35c13c1169c72 100644 --- a/datafusion/ffi/src/udtf.rs +++ b/datafusion/ffi/src/udtf.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::any::Any; use std::ffi::c_void; use std::sync::Arc; @@ -166,6 +167,12 @@ impl FFI_TableFunction { runtime: Option, logical_codec: FFI_LogicalExtensionCodec, ) -> Self { + if let Some(udtf) = + (Arc::clone(&udtf) as Arc).downcast_ref::() + { + return udtf.0.clone(); + } + let private_data = Box::new(TableFunctionPrivateData { udtf, runtime }); Self { diff --git a/datafusion/ffi/src/udwf/mod.rs b/datafusion/ffi/src/udwf/mod.rs index dbac00fd43020..2e4bd0a294fd0 100644 --- a/datafusion/ffi/src/udwf/mod.rs +++ b/datafusion/ffi/src/udwf/mod.rs @@ -222,6 +222,10 @@ impl Clone for FFI_WindowUDF { impl From> for FFI_WindowUDF { fn from(udf: Arc) -> Self { + if let Some(udwf) = udf.inner().as_any().downcast_ref::() { + return udwf.udf.clone(); + } + let name = udf.name().into(); let aliases = udf.aliases().iter().map(|a| a.to_owned().into()).collect(); let volatility = udf.signature().volatility.into(); diff --git a/datafusion/ffi/src/udwf/partition_evaluator.rs b/datafusion/ffi/src/udwf/partition_evaluator.rs index 8df02511aa4b3..6820c6e335dd6 100644 --- a/datafusion/ffi/src/udwf/partition_evaluator.rs +++ b/datafusion/ffi/src/udwf/partition_evaluator.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::any::Any; use std::ffi::c_void; use std::ops::Range; @@ -205,6 +206,13 @@ unsafe extern "C" fn release_fn_wrapper(evaluator: &mut FFI_PartitionEvaluator) impl From> for FFI_PartitionEvaluator { fn from(evaluator: Box) -> Self { + if (evaluator.as_ref() as &dyn Any).is::() { + let evaluator = (evaluator as Box) + .downcast::() + .expect("already checked type"); + return evaluator.evaluator; + } + let is_causal = evaluator.is_causal(); let supports_bounded_execution = evaluator.supports_bounded_execution(); let include_rank = evaluator.include_rank(); diff --git a/datafusion/ffi/tests/ffi_config.rs b/datafusion/ffi/tests/ffi_config.rs new file mode 100644 index 0000000000000..ca0a3e31e8de6 --- /dev/null +++ b/datafusion/ffi/tests/ffi_config.rs @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Add an additional module here for convenience to scope this to only +/// when the feature integration-tests is built +#[cfg(feature = "integration-tests")] +mod tests { + use datafusion::error::{DataFusionError, Result}; + use datafusion_common::ScalarValue; + use datafusion_common::config::{ConfigOptions, TableOptions}; + use datafusion_execution::config::SessionConfig; + use datafusion_ffi::config::ExtensionOptionsFFIProvider; + use datafusion_ffi::tests::config::ExternalConfig; + use datafusion_ffi::tests::utils::get_module; + + #[test] + fn test_ffi_config_options_extension() -> Result<()> { + let module = get_module()?; + + let extension_options = + module + .create_extension_options() + .ok_or(DataFusionError::NotImplemented( + "External test library failed to implement create_extension_options" + .to_string(), + ))?(); + + let mut config = ConfigOptions::new(); + config.extensions.insert(extension_options); + + // Verify default values are as expected + let returned_config: ExternalConfig = config + .local_or_ffi_extension() + .expect("should have external config extension"); + assert_eq!(returned_config, ExternalConfig::default()); + + config.set("external_config.is_enabled", "false")?; + let returned_config: ExternalConfig = config + .local_or_ffi_extension() + .expect("should have external config extension"); + assert!(!returned_config.is_enabled); + + Ok(()) + } + + #[test] + fn test_ffi_table_options_extension() -> Result<()> { + let module = get_module()?; + + let extension_options = + module + .create_extension_options() + .ok_or(DataFusionError::NotImplemented( + "External test library failed to implement create_extension_options" + .to_string(), + ))?(); + + let mut table_options = TableOptions::new(); + table_options.extensions.insert(extension_options); + + // Verify default values are as expected + let returned_options: ExternalConfig = table_options + .local_or_ffi_extension() + .expect("should have external config extension"); + + assert_eq!(returned_options, ExternalConfig::default()); + + table_options.set("external_config.is_enabled", "false")?; + let returned_options: ExternalConfig = table_options + .local_or_ffi_extension() + .expect("should have external config extension"); + assert!(!returned_options.is_enabled); + + Ok(()) + } + + #[test] + fn test_ffi_session_config_options_extension() -> Result<()> { + let module = get_module()?; + + let extension_options = + module + .create_extension_options() + .ok_or(DataFusionError::NotImplemented( + "External test library failed to implement create_extension_options" + .to_string(), + ))?(); + + let mut config = SessionConfig::new().with_option_extension(extension_options); + + // Verify default values are as expected + let returned_config: ExternalConfig = config + .options() + .local_or_ffi_extension() + .expect("should have external config extension"); + assert_eq!(returned_config, ExternalConfig::default()); + + config = config.set( + "external_config.is_enabled", + &ScalarValue::Boolean(Some(false)), + ); + let returned_config: ExternalConfig = config + .options() + .local_or_ffi_extension() + .expect("should have external config extension"); + assert!(!returned_config.is_enabled); + + Ok(()) + } +} diff --git a/datafusion/ffi/tests/ffi_execution_plan.rs b/datafusion/ffi/tests/ffi_execution_plan.rs new file mode 100644 index 0000000000000..053dc1766c4ca --- /dev/null +++ b/datafusion/ffi/tests/ffi_execution_plan.rs @@ -0,0 +1,91 @@ +#[cfg(feature = "integration-tests")] +mod tests { + use arrow::datatypes::Field; + use arrow::datatypes::Schema; + use arrow_schema::DataType; + use datafusion_common::DataFusionError; + use datafusion_ffi::execution_plan::FFI_ExecutionPlan; + use datafusion_ffi::execution_plan::ForeignExecutionPlan; + use datafusion_ffi::execution_plan::{ExecutionPlanPrivateData, tests::EmptyExec}; + use datafusion_ffi::tests::utils::get_module; + use datafusion_physical_plan::ExecutionPlan; + use std::sync::Arc; + + #[test] + fn test_ffi_execution_plan_new_sets_runtimes_on_children() + -> Result<(), DataFusionError> { + // We want to test the case where we have two libraries. + // Library A will have a foreign plan from Library B, called child_plan. + // Library A will add a plan called grandchild_plan under child_plan + // Library A will create a plan called parent_plan, that has child_plan + // under it. So we should have: + // parent_plan (local) -> child_plan (foreign) -> grandchild_plan (local) + // Then we want to turn parent_plan into a FFI plan. + // Verify that grandchild_plan also gets the same runtime as parent_plan. + + let module = get_module()?; + + fn generate_local_plan() -> Arc { + let schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)])); + + Arc::new(EmptyExec::new(schema)) + } + + let child_plan = + module + .create_empty_exec() + .ok_or(DataFusionError::NotImplemented( + "External module failed to implement create_empty_exec".to_string(), + ))?(); + let child_plan: Arc = (&child_plan) + .try_into() + .expect("should be able create plan"); + assert!(child_plan.as_any().is::()); + + let grandchild_plan = generate_local_plan(); + + let child_plan = child_plan.with_new_children(vec![grandchild_plan])?; + + unsafe { + // Originally the runtime is not set. We go through the unsafe casting + // of data here because the `inner()` function is private and this is + // only an integration test so we do not want to expose it. + let ffi_child = FFI_ExecutionPlan::new(Arc::clone(&child_plan), None); + let ffi_grandchild = + (ffi_child.children)(&ffi_child).into_iter().next().unwrap(); + + let grandchild_private_data = + ffi_grandchild.private_data as *const ExecutionPlanPrivateData; + assert!((*grandchild_private_data).runtime.is_none()); + } + + let parent_plan = generate_local_plan().with_new_children(vec![child_plan])?; + + // Adding the grandchild beneath this FFI plan should get the runtime passed down. + let runtime = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + let ffi_parent = + FFI_ExecutionPlan::new(parent_plan, Some(runtime.handle().clone())); + + unsafe { + let ffi_child = (ffi_parent.children)(&ffi_parent) + .into_iter() + .next() + .unwrap(); + let ffi_grandchild = + (ffi_child.children)(&ffi_child).into_iter().next().unwrap(); + assert_eq!( + (ffi_grandchild.library_marker_id)(), + (ffi_parent.library_marker_id)() + ); + + let grandchild_private_data = + ffi_grandchild.private_data as *const ExecutionPlanPrivateData; + assert!((*grandchild_private_data).runtime.is_some()); + } + + Ok(()) + } +} diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs index f716b48f0cccc..d1d8924a2c3e8 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs @@ -37,7 +37,7 @@ use super::accumulate::NullState; #[derive(Debug)] pub struct BooleanGroupsAccumulator where - F: Fn(bool, bool) -> bool + Send + Sync, + F: Fn(bool, bool) -> bool + Send + Sync + 'static, { /// values per group values: BooleanBufferBuilder, @@ -55,7 +55,7 @@ where impl BooleanGroupsAccumulator where - F: Fn(bool, bool) -> bool + Send + Sync, + F: Fn(bool, bool) -> bool + Send + Sync + 'static, { pub fn new(bool_fn: F, identity: bool) -> Self { Self { @@ -69,7 +69,7 @@ where impl GroupsAccumulator for BooleanGroupsAccumulator where - F: Fn(bool, bool) -> bool + Send + Sync, + F: Fn(bool, bool) -> bool + Send + Sync + 'static, { fn update_batch( &mut self, diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs index acf875b686139..a81b89e1e46f1 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs @@ -41,7 +41,7 @@ use super::accumulate::NullState; pub struct PrimitiveGroupsAccumulator where T: ArrowPrimitiveType + Send, - F: Fn(&mut T::Native, T::Native) + Send + Sync, + F: Fn(&mut T::Native, T::Native) + Send + Sync + 'static, { /// values per group, stored as the native type values: Vec, @@ -62,7 +62,7 @@ where impl PrimitiveGroupsAccumulator where T: ArrowPrimitiveType + Send, - F: Fn(&mut T::Native, T::Native) + Send + Sync, + F: Fn(&mut T::Native, T::Native) + Send + Sync + 'static, { pub fn new(data_type: &DataType, prim_fn: F) -> Self { Self { @@ -84,7 +84,7 @@ where impl GroupsAccumulator for PrimitiveGroupsAccumulator where T: ArrowPrimitiveType + Send, - F: Fn(&mut T::Native, T::Native) + Send + Sync, + F: Fn(&mut T::Native, T::Native) + Send + Sync + 'static, { fn update_batch( &mut self, diff --git a/datafusion/functions-aggregate/src/average.rs b/datafusion/functions-aggregate/src/average.rs index 543116db1ddb6..1ddb549ae87d5 100644 --- a/datafusion/functions-aggregate/src/average.rs +++ b/datafusion/functions-aggregate/src/average.rs @@ -754,7 +754,7 @@ impl Accumulator for DurationAvgAccumulator { struct AvgGroupsAccumulator where T: ArrowNumericType + Send, - F: Fn(T::Native, u64) -> Result + Send, + F: Fn(T::Native, u64) -> Result + Send + 'static, { /// The type of the internal sum sum_data_type: DataType, @@ -778,7 +778,7 @@ where impl AvgGroupsAccumulator where T: ArrowNumericType + Send, - F: Fn(T::Native, u64) -> Result + Send, + F: Fn(T::Native, u64) -> Result + Send + 'static, { pub fn new(sum_data_type: &DataType, return_data_type: &DataType, avg_fn: F) -> Self { debug!( @@ -800,7 +800,7 @@ where impl GroupsAccumulator for AvgGroupsAccumulator where T: ArrowNumericType + Send, - F: Fn(T::Native, u64) -> Result + Send, + F: Fn(T::Native, u64) -> Result + Send + 'static, { fn update_batch( &mut self, diff --git a/datafusion/functions-nested/benches/array_set_ops.rs b/datafusion/functions-nested/benches/array_set_ops.rs index 237c2d1474c00..e3146921d7fe1 100644 --- a/datafusion/functions-nested/benches/array_set_ops.rs +++ b/datafusion/functions-nested/benches/array_set_ops.rs @@ -23,7 +23,7 @@ use criterion::{ }; use datafusion_common::config::ConfigOptions; use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; -use datafusion_functions_nested::set_ops::{ArrayIntersect, ArrayUnion}; +use datafusion_functions_nested::set_ops::{ArrayDistinct, ArrayIntersect, ArrayUnion}; use rand::SeedableRng; use rand::prelude::SliceRandom; use rand::rngs::StdRng; @@ -38,6 +38,7 @@ const SEED: u64 = 42; fn criterion_benchmark(c: &mut Criterion) { bench_array_union(c); bench_array_intersect(c); + bench_array_distinct(c); } fn invoke_udf(udf: &impl ScalarUDFImpl, array1: &ArrayRef, array2: &ArrayRef) { @@ -97,6 +98,48 @@ fn bench_array_intersect(c: &mut Criterion) { group.finish(); } +fn bench_array_distinct(c: &mut Criterion) { + let mut group = c.benchmark_group("array_distinct"); + let udf = ArrayDistinct::new(); + + for (duplicate_label, duplicate_ratio) in + &[("high_duplicate", 0.8), ("low_duplicate", 0.2)] + { + for &array_size in ARRAY_SIZES { + let array = + create_array_with_duplicates(NUM_ROWS, array_size, *duplicate_ratio); + group.bench_with_input( + BenchmarkId::new(*duplicate_label, array_size), + &array_size, + |b, _| { + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: vec![ColumnarValue::Array(array.clone())], + arg_fields: vec![ + Field::new("arr", array.data_type().clone(), false) + .into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new( + "result", + array.data_type().clone(), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + } + + group.finish(); +} + fn create_arrays_with_overlap( num_rows: usize, array_size: usize, @@ -164,5 +207,53 @@ fn create_arrays_with_overlap( (array1, array2) } +fn create_array_with_duplicates( + num_rows: usize, + array_size: usize, + duplicate_ratio: f64, +) -> ArrayRef { + assert!((0.0..=1.0).contains(&duplicate_ratio)); + let unique_count = ((array_size as f64) * (1.0 - duplicate_ratio)).round() as usize; + let duplicate_count = array_size - unique_count; + + let mut rng = StdRng::seed_from_u64(SEED); + let mut values = Vec::with_capacity(num_rows * array_size); + + for row in 0..num_rows { + let base = (row as i64) * (array_size as i64) * 2; + + // Add unique values first + for i in 0..unique_count { + values.push(base + i as i64); + } + + // Fill the rest with duplicates randomly picked from the unique values + let mut unique_indices: Vec = + (0..unique_count).map(|i| base + i as i64).collect(); + unique_indices.shuffle(&mut rng); + + for i in 0..duplicate_count { + values.push(unique_indices[i % unique_count]); + } + } + + let values = Int64Array::from(values); + let field = Arc::new(Field::new("item", DataType::Int64, true)); + + let offsets = (0..=num_rows) + .map(|i| (i * array_size) as i32) + .collect::>(); + + Arc::new( + ListArray::try_new( + field, + OffsetBuffer::new(offsets.into()), + Arc::new(values), + None, + ) + .unwrap(), + ) +} + criterion_group!(benches, criterion_benchmark); criterion_main!(benches); diff --git a/datafusion/functions-nested/src/set_ops.rs b/datafusion/functions-nested/src/set_ops.rs index 370599611feef..2348b3c530c53 100644 --- a/datafusion/functions-nested/src/set_ops.rs +++ b/datafusion/functions-nested/src/set_ops.rs @@ -22,7 +22,6 @@ use arrow::array::{ Array, ArrayRef, GenericListArray, OffsetSizeTrait, new_empty_array, new_null_array, }; use arrow::buffer::{NullBuffer, OffsetBuffer}; -use arrow::compute; use arrow::datatypes::DataType::{LargeList, List, Null}; use arrow::datatypes::{DataType, Field, FieldRef}; use arrow::row::{RowConverter, SortField}; @@ -35,7 +34,6 @@ use datafusion_expr::{ ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, }; use datafusion_macros::user_doc; -use itertools::Itertools; use std::any::Any; use std::collections::HashSet; use std::fmt::{Display, Formatter}; @@ -264,7 +262,7 @@ impl ScalarUDFImpl for ArrayIntersect { ) )] #[derive(Debug, PartialEq, Eq, Hash)] -pub(super) struct ArrayDistinct { +pub struct ArrayDistinct { signature: Signature, aliases: Vec, } @@ -278,6 +276,12 @@ impl ArrayDistinct { } } +impl Default for ArrayDistinct { + fn default() -> Self { + Self::new() + } +} + impl ScalarUDFImpl for ArrayDistinct { fn as_any(&self) -> &dyn Any { self @@ -527,42 +531,52 @@ fn general_array_distinct( if array.is_empty() { return Ok(Arc::new(array.clone()) as ArrayRef); } + let value_offsets = array.value_offsets(); let dt = array.value_type(); - let mut offsets = Vec::with_capacity(array.len()); + let mut offsets = Vec::with_capacity(array.len() + 1); offsets.push(OffsetSize::usize_as(0)); - let mut new_arrays = Vec::with_capacity(array.len()); - let converter = RowConverter::new(vec![SortField::new(dt)])?; - // distinct for each list in ListArray - for arr in array.iter() { - let last_offset: OffsetSize = offsets.last().copied().unwrap(); - let Some(arr) = arr else { - // Add same offset for null + + // Convert all values to row format in a single batch for performance + let converter = RowConverter::new(vec![SortField::new(dt.clone())])?; + let rows = converter.convert_columns(&[Arc::clone(array.values())])?; + let mut final_rows = Vec::with_capacity(rows.num_rows()); + let mut seen = HashSet::new(); + for i in 0..array.len() { + let last_offset = *offsets.last().unwrap(); + + // Null list entries produce no output; just carry forward the offset. + if array.is_null(i) { offsets.push(last_offset); continue; - }; - let values = converter.convert_columns(&[arr])?; - // sort elements in list and remove duplicates - let rows = values.iter().sorted().dedup().collect::>(); - offsets.push(last_offset + OffsetSize::usize_as(rows.len())); - let arrays = converter.convert_rows(rows)?; - let array = match arrays.first() { - Some(array) => Arc::clone(array), - None => { - return internal_err!("array_distinct: failed to get array from rows"); + } + + let start = value_offsets[i].as_usize(); + let end = value_offsets[i + 1].as_usize(); + seen.clear(); + seen.reserve(end - start); + + // Walk the sub-array and keep only the first occurrence of each value. + for idx in start..end { + let row = rows.row(idx); + if seen.insert(row) { + final_rows.push(row); } - }; - new_arrays.push(array); - } - if new_arrays.is_empty() { - return Ok(Arc::new(array.clone()) as ArrayRef); + } + offsets.push(last_offset + OffsetSize::usize_as(seen.len())); } - let offsets = OffsetBuffer::new(offsets.into()); - let new_arrays_ref = new_arrays.iter().map(|v| v.as_ref()).collect::>(); - let values = compute::concat(&new_arrays_ref)?; + + // Convert all collected distinct rows back + let final_values = if final_rows.is_empty() { + new_empty_array(&dt) + } else { + let arrays = converter.convert_rows(final_rows)?; + Arc::clone(&arrays[0]) + }; + Ok(Arc::new(GenericListArray::::try_new( Arc::clone(field), - offsets, - values, + OffsetBuffer::new(offsets.into()), + final_values, // Keep the list nulls array.nulls().cloned(), )?)) diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs index ff71c9ec64385..4b415ed180e4a 100644 --- a/datafusion/physical-optimizer/src/optimizer.rs +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -48,7 +48,7 @@ use datafusion_physical_plan::ExecutionPlan; /// `PhysicalOptimizerRule`s. /// /// [`SessionState::add_physical_optimizer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.add_physical_optimizer_rule -pub trait PhysicalOptimizerRule: Debug { +pub trait PhysicalOptimizerRule: Debug + std::any::Any { /// Rewrite `plan` to an optimized form fn optimize( &self, diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 218c2e4e47d04..6ff3fc88d4d42 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -104,7 +104,7 @@ pub trait AsLogicalPlan: Debug + Send + Sync + Clone { Self: Sized; } -pub trait LogicalExtensionCodec: Debug + Send + Sync { +pub trait LogicalExtensionCodec: Debug + Send + Sync + std::any::Any { fn try_decode( &self, buf: &[u8], diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index bfba715b91249..c3ac3e6f7f97b 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::any::Any; use std::cell::RefCell; use std::collections::HashMap; use std::fmt::Debug; @@ -3636,7 +3637,7 @@ pub trait AsExecutionPlan: Debug + Send + Sync + Clone { Self: Sized; } -pub trait PhysicalExtensionCodec: Debug + Send + Sync { +pub trait PhysicalExtensionCodec: Debug + Send + Sync + Any { fn try_decode( &self, buf: &[u8], diff --git a/datafusion/spark/src/function/aggregate/avg.rs b/datafusion/spark/src/function/aggregate/avg.rs index bbcda9b0f8c7f..9ad712713d26b 100644 --- a/datafusion/spark/src/function/aggregate/avg.rs +++ b/datafusion/spark/src/function/aggregate/avg.rs @@ -213,7 +213,7 @@ impl Accumulator for AvgAccumulator { struct AvgGroupsAccumulator where T: ArrowNumericType + Send, - F: Fn(T::Native, i64) -> Result + Send, + F: Fn(T::Native, i64) -> Result + Send + 'static, { /// The type of the returned average return_data_type: DataType, @@ -231,7 +231,7 @@ where impl AvgGroupsAccumulator where T: ArrowNumericType + Send, - F: Fn(T::Native, i64) -> Result + Send, + F: Fn(T::Native, i64) -> Result + Send + 'static, { pub fn new(return_data_type: &DataType, avg_fn: F) -> Self { Self { @@ -246,7 +246,7 @@ where impl GroupsAccumulator for AvgGroupsAccumulator where T: ArrowNumericType + Send, - F: Fn(T::Native, i64) -> Result + Send, + F: Fn(T::Native, i64) -> Result + Send + 'static, { fn update_batch( &mut self, diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index 7614295975e5e..e610739a0312e 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -45,7 +45,7 @@ async-trait = { workspace = true } bigdecimal = { workspace = true } bytes = { workspace = true, optional = true } chrono = { workspace = true, optional = true } -clap = { version = "4.5.57", features = ["derive", "env"] } +clap = { version = "4.5.59", features = ["derive", "env"] } datafusion = { workspace = true, default-features = true, features = ["avro"] } datafusion-spark = { workspace = true, features = ["core"] } datafusion-substrait = { workspace = true, default-features = true } @@ -58,7 +58,7 @@ object_store = { workspace = true } postgres-types = { version = "0.2.12", features = ["derive", "with-chrono-0_4"], optional = true } # When updating the following dependency verify that sqlite test file regeneration works correctly # by running the regenerate_sqlite_files.sh script. -sqllogictest = "0.29.0" +sqllogictest = "0.29.1" sqlparser = { workspace = true } tempfile = { workspace = true } testcontainers-modules = { workspace = true, features = ["postgres"], optional = true } diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index a41cbecf77055..f675763120718 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -6832,7 +6832,7 @@ from array_distinct_table_2D; ---- [[1, 2], [3, 4], [5, 6]] [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]] -[NULL, [5, 6]] +[[5, 6], NULL] query ? select array_distinct(column1) @@ -6864,7 +6864,7 @@ from array_distinct_table_2D_fixed; ---- [[1, 2], [3, 4], [5, 6]] [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]] -[NULL, [5, 6]] +[[5, 6], NULL] query ??? select array_intersect(column1, column2), diff --git a/docs/requirements.txt b/docs/requirements.txt index 432557a0f68e2..a37f08e1253f8 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -19,6 +19,6 @@ sphinx==9.1.0 sphinx-reredirects==1.1.0 pydata-sphinx-theme==0.16.1 myst-parser==5.0.0 -maturin==1.11.5 +maturin==1.12.2 jinja2==3.1.6 setuptools==82.0.0