diff --git a/.cargo/audit.toml b/.cargo/audit.toml index 29192d014..a46052f3b 100644 --- a/.cargo/audit.toml +++ b/.cargo/audit.toml @@ -21,10 +21,6 @@ ignore = [ # Marvin Attack: potential key recovery through timing sidechannels # Issues: https://github.com/apache/iceberg-rust/issues/221 "RUSTSEC-2023-0071", - # `instant` is unmaintained - # - # Introduced by datafusion, will be fixed in datafusion 44. - "RUSTSEC-2024-0384", # `derivative` is unmaintained; consider using an alternative # # Introduced by hive_metastore, tracked at https://github.com/cloudwego/pilota/issues/293 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c56a25cf8..873924c29 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -46,11 +46,12 @@ jobs: - name: Check License Header uses: apache/skywalking-eyes/header@v0.6.0 - - name: Install cargo-sort - run: make install-cargo-sort - - name: Install taplo-cli - run: make install-taplo-cli + uses: taiki-e/install-action@v2 + with: + tool: taplo-cli@0.9.3 + - name: Check toml format + run: make check-toml - name: Cargo format run: make check-fmt @@ -61,11 +62,19 @@ jobs: - name: Cargo clippy run: make check-clippy + - name: Install cargo-sort + uses: taiki-e/install-action@v2 + with: + tool: cargo-sort@1.0.9 - name: Cargo sort - run: make cargo-sort + run: cargo sort -c -w + - name: Install cargo-machete + uses: taiki-e/install-action@v2 + with: + tool: cargo-machete - name: Cargo Machete - run: make cargo-machete + run: cargo machete build: runs-on: ${{ matrix.os }} diff --git a/.github/workflows/ci_typos.yml b/.github/workflows/ci_typos.yml index b0f0349eb..593f015fa 100644 --- a/.github/workflows/ci_typos.yml +++ b/.github/workflows/ci_typos.yml @@ -42,4 +42,4 @@ jobs: steps: - uses: actions/checkout@v4 - name: Check typos - uses: crate-ci/typos@v1.28.4 + uses: crate-ci/typos@v1.29.4 diff --git a/Cargo.lock b/Cargo.lock index 42388f85c..ca6d792af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -416,7 +416,7 @@ dependencies = [ "memchr", "num", "regex", - "regex-syntax", + "regex-syntax 0.8.5", ] [[package]] @@ -480,10 +480,9 @@ version = "0.4.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df895a515f70646414f4b45c0b79082783b80552b373a68283012928df56f522" dependencies = [ - "bzip2", + "bzip2 0.4.4", "flate2", "futures-core", - "futures-io", "memchr", "pin-project-lite", "tokio", @@ -596,9 +595,9 @@ checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" [[package]] name = "async-trait" -version = "0.1.83" +version = "0.1.84" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" +checksum = "1b1244b10dcd56c92219da4e14caa97e312079e185f04ba3eea25061561dc0a0" dependencies = [ "proc-macro2", "quote", @@ -628,9 +627,9 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] name = "aws-config" -version = "1.5.11" +version = "1.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5d1c2c88936a73c699225d0bc00684a534166b0cebc2659c3cdf08de8edc64c" +checksum = "c03a50b30228d3af8865ce83376b4e99e1ffa34728220fe2860e4df0bb5278d6" dependencies = [ "aws-credential-types", "aws-runtime", @@ -670,9 +669,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.5.2" +version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44f6f1124d6e19ab6daf7f2e615644305dc6cb2d706892a8a8c0b98db35de020" +checksum = "b16d1aa50accc11a4b4d5c50f7fb81cc0cf60328259c587d0e6b0f11385bde46" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -695,9 +694,9 @@ dependencies = [ [[package]] name = "aws-sdk-glue" -version = "1.74.0" +version = "1.76.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcf70e5fdbed7934eff5a4990b0d85d3c02a88c6ae79b4c20b900bcf8c6890aa" +checksum = "9c25c89d6efe63a398cb727b79c285e06184c432985a0d221df0f23d7d10f1f9" dependencies = [ "aws-credential-types", "aws-runtime", @@ -717,9 +716,9 @@ dependencies = [ [[package]] name = "aws-sdk-s3tables" -version = "1.2.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2111e5117b6e6bbe8c89ddca58e5c1339accc74a47757ab1e39db4f26999a426" +checksum = "b031430981550707ed53b591da47ebe8385fddc5cd99cc2851dc5d0eb3cbdb03" dependencies = [ "aws-credential-types", "aws-runtime", @@ -739,9 +738,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.51.0" +version = "1.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74995133da38f109a0eb8e8c886f9e80c713b6e9f2e6e5a6a1ba4450ce2ffc46" +checksum = "1605dc0bf9f0a4b05b451441a17fcb0bda229db384f23bf5cead3adbab0664ac" dependencies = [ "aws-credential-types", "aws-runtime", @@ -761,9 +760,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.52.0" +version = "1.54.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7062a779685cbf3b2401eb36151e2c6589fd5f3569b8a6bc2d199e5aaa1d059" +checksum = "59f3f73466ff24f6ad109095e0f3f2c830bfb4cd6c8b12f744c8e61ebf4d3ba1" dependencies = [ "aws-credential-types", "aws-runtime", @@ -783,9 +782,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.52.0" +version = "1.54.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "299dae7b1dc0ee50434453fa5a229dc4b22bd3ee50409ff16becf1f7346e0193" +checksum = "861d324ef69247c6f3c6823755f408a68877ffb1a9afaff6dd8b0057c760de60" dependencies = [ "aws-credential-types", "aws-runtime", @@ -1241,6 +1240,16 @@ dependencies = [ "libc", ] +[[package]] +name = "bzip2" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bafdbf26611df8c14810e268ddceda071c297570a5fb360ceddf617fe417ef58" +dependencies = [ + "bzip2-sys", + "libc", +] + [[package]] name = "bzip2-sys" version = "0.1.11+1.0.8" @@ -1365,7 +1374,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c" dependencies = [ "lazy_static", - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] @@ -1634,11 +1643,10 @@ dependencies = [ [[package]] name = "datafusion" -version = "43.0.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbba0799cf6913b456ed07a94f0f3b6e12c62a5d88b10809e2284a0f2b915c05" +checksum = "014fc8c384ecacedaabb3bc8359c2a6c6e9d8f7bea65be3434eccacfc37f52d9" dependencies = [ - "ahash 0.8.11", "arrow", "arrow-array", "arrow-ipc", @@ -1646,7 +1654,7 @@ dependencies = [ "async-compression", "async-trait", "bytes", - "bzip2", + "bzip2 0.5.0", "chrono", "dashmap", "datafusion-catalog", @@ -1657,6 +1665,7 @@ dependencies = [ "datafusion-functions", "datafusion-functions-aggregate", "datafusion-functions-nested", + "datafusion-functions-table", "datafusion-functions-window", "datafusion-optimizer", "datafusion-physical-expr", @@ -1667,18 +1676,13 @@ dependencies = [ "flate2", "futures", "glob", - "half", - "hashbrown 0.14.5", - "indexmap 2.7.0", "itertools", "log", - "num_cpus", "object_store", "parking_lot", "parquet", - "paste", - "pin-project-lite", "rand", + "regex", "sqlparser", "tempfile", "tokio", @@ -1691,9 +1695,9 @@ dependencies = [ [[package]] name = "datafusion-catalog" -version = "43.0.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7493c5c2d40eec435b13d92e5703554f4efc7059451fcb8d3a79580ff0e45560" +checksum = "ee60d33e210ef96070377ae667ece7caa0e959c8387496773d4a1a72f1a5012e" dependencies = [ "arrow-schema", "async-trait", @@ -1706,52 +1710,56 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "43.0.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24953049ebbd6f8964f91f60aa3514e121b5e81e068e33b60e77815ab369b25c" +checksum = "0b42b7d720fe21ed9cca2ebb635f3f13a12cfab786b41e0fba184fb2e620525b" dependencies = [ "ahash 0.8.11", "arrow", "arrow-array", "arrow-buffer", "arrow-schema", - "chrono", "half", "hashbrown 0.14.5", "indexmap 2.7.0", - "instant", "libc", - "num_cpus", + "log", "object_store", "parquet", "paste", + "recursive", "sqlparser", "tokio", + "web-time", ] [[package]] name = "datafusion-common-runtime" -version = "43.0.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f06df4ef76872e11c924d3c814fd2a8dd09905ed2e2195f71c857d78abd19685" +checksum = "72fbf14d4079f7ce5306393084fe5057dddfdc2113577e0049310afa12e94281" dependencies = [ "log", "tokio", ] +[[package]] +name = "datafusion-doc" +version = "44.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c278dbd64860ed0bb5240fc1f4cb6aeea437153910aea69bcf7d5a8d6d0454f3" + [[package]] name = "datafusion-execution" -version = "43.0.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bbdcb628d690f3ce5fea7de81642b514486d58ff9779a51f180a69a4eadb361" +checksum = "e22cb02af47e756468b3cbfee7a83e3d4f2278d452deb4b033ba933c75169486" dependencies = [ "arrow", - "chrono", "dashmap", "datafusion-common", "datafusion-expr", "futures", - "hashbrown 0.14.5", "log", "object_store", "parking_lot", @@ -1762,45 +1770,41 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "43.0.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8036495980e3131f706b7d33ab00b4492d73dc714e3cb74d11b50f9602a73246" +checksum = "62298eadb1d15b525df1315e61a71519ffc563d41d5c3b2a30fda2d70f77b93c" dependencies = [ - "ahash 0.8.11", "arrow", - "arrow-array", - "arrow-buffer", "chrono", "datafusion-common", + "datafusion-doc", "datafusion-expr-common", "datafusion-functions-aggregate-common", "datafusion-functions-window-common", "datafusion-physical-expr-common", "indexmap 2.7.0", "paste", + "recursive", "serde_json", "sqlparser", - "strum", - "strum_macros", ] [[package]] name = "datafusion-expr-common" -version = "43.0.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4da0f3cb4669f9523b403d6b5a0ec85023e0ab3bf0183afd1517475b3e64fdd2" +checksum = "dda7f73c5fc349251cd3dcb05773c5bf55d2505a698ef9d38dfc712161ea2f55" dependencies = [ "arrow", "datafusion-common", "itertools", - "paste", ] [[package]] name = "datafusion-functions" -version = "43.0.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f52c4012648b34853e40a2c6bcaa8772f837831019b68aca384fb38436dba162" +checksum = "fd197f3b2975424d3a4898ea46651be855a46721a56727515dbd5c9e2fb597da" dependencies = [ "arrow", "arrow-buffer", @@ -1809,8 +1813,11 @@ dependencies = [ "blake3", "chrono", "datafusion-common", + "datafusion-doc", "datafusion-execution", "datafusion-expr", + "datafusion-expr-common", + "datafusion-macros", "hashbrown 0.14.5", "hex", "itertools", @@ -1825,44 +1832,44 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" -version = "43.0.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5b8bb624597ba28ed7446df4a9bd7c7a7bde7c578b6b527da3f47371d5f6741" +checksum = "aabbe48fba18f9981b134124381bee9e46f93518b8ad2f9721ee296cef5affb9" dependencies = [ "ahash 0.8.11", "arrow", "arrow-schema", "datafusion-common", + "datafusion-doc", "datafusion-execution", "datafusion-expr", "datafusion-functions-aggregate-common", + "datafusion-macros", "datafusion-physical-expr", "datafusion-physical-expr-common", "half", - "indexmap 2.7.0", "log", "paste", ] [[package]] name = "datafusion-functions-aggregate-common" -version = "43.0.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fb06208fc470bc8cf1ce2d9a1159d42db591f2c7264a8c1776b53ad8f675143" +checksum = "d7a3fefed9c8c11268d446d924baca8cabf52fe32f73fdaa20854bac6473590c" dependencies = [ "ahash 0.8.11", "arrow", "datafusion-common", "datafusion-expr-common", "datafusion-physical-expr-common", - "rand", ] [[package]] name = "datafusion-functions-nested" -version = "43.0.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fca25bbb87323716d05e54114666e942172ccca23c5a507e9c7851db6e965317" +checksum = "6360f27464fab857bec698af39b2ae331dc07c8bf008fb4de387a19cdc6815a5" dependencies = [ "arrow", "arrow-array", @@ -1878,18 +1885,35 @@ dependencies = [ "itertools", "log", "paste", - "rand", +] + +[[package]] +name = "datafusion-functions-table" +version = "44.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c35c070eb705c12795dab399c3809f4dfbc290678c624d3989490ca9b8449c1" +dependencies = [ + "arrow", + "async-trait", + "datafusion-catalog", + "datafusion-common", + "datafusion-expr", + "datafusion-physical-plan", + "parking_lot", + "paste", ] [[package]] name = "datafusion-functions-window" -version = "43.0.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ae23356c634e54c59f7c51acb7a5b9f6240ffb2cf997049a1a24a8a88598dbe" +checksum = "52229bca26b590b140900752226c829f15fc1a99840e1ca3ce1a9534690b82a8" dependencies = [ "datafusion-common", + "datafusion-doc", "datafusion-expr", "datafusion-functions-window-common", + "datafusion-macros", "datafusion-physical-expr", "datafusion-physical-expr-common", "log", @@ -1898,48 +1922,54 @@ dependencies = [ [[package]] name = "datafusion-functions-window-common" -version = "43.0.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4b3d6ff7794acea026de36007077a06b18b89e4f9c3fea7f2215f9f7dd9059b" +checksum = "367befc303b64a668a10ae6988a064a9289e1999e71a7f8e526b6e14d6bdd9d6" dependencies = [ "datafusion-common", "datafusion-physical-expr-common", ] +[[package]] +name = "datafusion-macros" +version = "44.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5de3c8f386ea991696553afe241a326ecbc3c98a12c562867e4be754d3a060c" +dependencies = [ + "quote", + "syn 2.0.92", +] + [[package]] name = "datafusion-optimizer" -version = "43.0.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bec6241eb80c595fa0e1a8a6b69686b5cf3bd5fdacb8319582a0943b0bd788aa" +checksum = "53b520413906f755910422b016fb73884ae6e9e1b376de4f9584b6c0e031da75" dependencies = [ "arrow", - "async-trait", "chrono", "datafusion-common", "datafusion-expr", "datafusion-physical-expr", - "hashbrown 0.14.5", "indexmap 2.7.0", "itertools", "log", - "paste", - "regex-syntax", + "recursive", + "regex", + "regex-syntax 0.8.5", ] [[package]] name = "datafusion-physical-expr" -version = "43.0.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3370357b8fc75ec38577700644e5d1b0bc78f38babab99c0b8bd26bafb3e4335" +checksum = "acd6ddc378f6ad19af95ccd6790dec8f8e1264bc4c70e99ddc1830c1a1c78ccd" dependencies = [ "ahash 0.8.11", "arrow", "arrow-array", "arrow-buffer", - "arrow-ord", "arrow-schema", - "arrow-string", - "chrono", "datafusion-common", "datafusion-expr", "datafusion-expr-common", @@ -1956,39 +1986,40 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" -version = "43.0.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8b7734d94bf2fa6f6e570935b0ddddd8421179ce200065be97874e13d46a47b" +checksum = "06e6c05458eccd74b4c77ed6a1fe63d52434240711de7f6960034794dad1caf5" dependencies = [ "ahash 0.8.11", "arrow", "datafusion-common", "datafusion-expr-common", "hashbrown 0.14.5", - "rand", + "itertools", ] [[package]] name = "datafusion-physical-optimizer" -version = "43.0.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7eee8c479522df21d7b395640dff88c5ed05361852dce6544d7c98e9dbcebffe" +checksum = "9dc3a82190f49c37d377f31317e07ab5d7588b837adadba8ac367baad5dc2351" dependencies = [ "arrow", - "arrow-schema", "datafusion-common", "datafusion-execution", "datafusion-expr-common", "datafusion-physical-expr", "datafusion-physical-plan", "itertools", + "log", + "recursive", ] [[package]] name = "datafusion-physical-plan" -version = "43.0.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17e1fc2e2c239d14e8556f2622b19a726bf6bc6962cc00c71fc52626274bee24" +checksum = "6a6608bc9844b4ddb5ed4e687d173e6c88700b1d0482f43894617d18a1fe75da" dependencies = [ "ahash 0.8.11", "arrow", @@ -2002,7 +2033,6 @@ dependencies = [ "datafusion-common-runtime", "datafusion-execution", "datafusion-expr", - "datafusion-functions-aggregate-common", "datafusion-functions-window-common", "datafusion-physical-expr", "datafusion-physical-expr-common", @@ -2012,29 +2042,28 @@ dependencies = [ "indexmap 2.7.0", "itertools", "log", - "once_cell", "parking_lot", "pin-project-lite", - "rand", "tokio", ] [[package]] name = "datafusion-sql" -version = "43.0.0" +version = "44.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63e3a4ed41dbee20a5d947a59ca035c225d67dc9cbe869c10f66dcdf25e7ce51" +checksum = "6a884061c79b33d0c8e84a6f4f4be8bdc12c0f53f5af28ddf5d6d95ac0b15fdc" dependencies = [ "arrow", "arrow-array", "arrow-schema", + "bigdecimal", "datafusion-common", "datafusion-expr", "indexmap 2.7.0", "log", + "recursive", "regex", "sqlparser", - "strum", ] [[package]] @@ -2441,6 +2470,19 @@ dependencies = [ "slab", ] +[[package]] +name = "generator" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc6bd114ceda131d3b1d665eba35788690ad37f5916457286b32ab6fd3c438dd" +dependencies = [ + "cfg-if", + "libc", + "log", + "rustversion", + "windows", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -2485,8 +2527,8 @@ dependencies = [ "aho-corasick", "bstr", "log", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.9", + "regex-syntax 0.8.5", ] [[package]] @@ -2601,12 +2643,6 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" -[[package]] -name = "hermit-abi" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" - [[package]] name = "hermit-abi" version = "0.4.0" @@ -2850,7 +2886,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows-core", + "windows-core 0.52.0", ] [[package]] @@ -3228,7 +3264,7 @@ dependencies = [ "globset", "log", "memchr", - "regex-automata", + "regex-automata 0.4.9", "same-file", "walkdir", "winapi-util", @@ -3266,18 +3302,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "instant" -version = "0.1.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" -dependencies = [ - "cfg-if", - "js-sys", - "wasm-bindgen", - "web-sys", -] - [[package]] name = "integer-encoding" version = "3.0.4" @@ -3532,6 +3556,19 @@ dependencies = [ "value-bag", ] +[[package]] +name = "loom" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" +dependencies = [ + "cfg-if", + "generator", + "scoped-tls", + "tracing", + "tracing-subscriber", +] + [[package]] name = "lz4_flex" version = "0.11.3" @@ -3552,6 +3589,15 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "md-5" version = "0.10.6" @@ -3648,25 +3694,23 @@ dependencies = [ [[package]] name = "moka" -version = "0.12.8" +version = "0.12.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32cf62eb4dd975d2dde76432fb1075c49e3ee2331cf36f1f8fd4b66550d32b6f" +checksum = "a9321642ca94a4282428e6ea4af8cc2ca4eac48ac7a6a4ea8f33f76d0ce70926" dependencies = [ "async-lock", - "async-trait", "crossbeam-channel", "crossbeam-epoch", "crossbeam-utils", "event-listener 5.3.1", "futures-util", - "once_cell", + "loom", "parking_lot", - "quanta", + "portable-atomic", "rustc_version", "smallvec", "tagptr", "thiserror 1.0.69", - "triomphe", "uuid", ] @@ -3748,6 +3792,16 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num" version = "0.4.3" @@ -3846,16 +3900,6 @@ dependencies = [ "libm", ] -[[package]] -name = "num_cpus" -version = "1.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" -dependencies = [ - "hermit-abi 0.3.9", - "libc", -] - [[package]] name = "num_enum" version = "0.7.3" @@ -3984,6 +4028,12 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking" version = "2.2.1" @@ -4310,7 +4360,7 @@ checksum = "a604568c3202727d1507653cb121dbd627a58684eb09a820fd746bee38b4442f" dependencies = [ "cfg-if", "concurrent-queue", - "hermit-abi 0.4.0", + "hermit-abi", "pin-project-lite", "rustix", "tracing", @@ -4323,6 +4373,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "325a6d2ac5dee293c3b2612d4993b98aec1dff096b0a2dae70ed7d95784a05da" +[[package]] +name = "portable-atomic" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "280dc24453071f1b63954171985a0b0d30058d287960968b9b2aca264c8d4ee6" + [[package]] name = "powerfmt" version = "0.2.0" @@ -4366,6 +4422,15 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "psm" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "200b9ff220857e53e184257720a14553b2f4aa02577d2ed9842d45d4b9654810" +dependencies = [ + "cc", +] + [[package]] name = "ptr_meta" version = "0.1.4" @@ -4412,21 +4477,6 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a651516ddc9168ebd67b24afd085a718be02f8858fe406591b013d101ce2f40" -[[package]] -name = "quanta" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "773ce68d0bb9bc7ef20be3536ffe94e223e1f365bd374108b2659fac0c65cfe6" -dependencies = [ - "crossbeam-utils", - "libc", - "once_cell", - "raw-cpuid", - "wasi", - "web-sys", - "winapi", -] - [[package]] name = "quick-xml" version = "0.35.0" @@ -4556,12 +4606,23 @@ dependencies = [ ] [[package]] -name = "raw-cpuid" -version = "11.2.0" +name = "recursive" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ab240315c661615f2ee9f0f2cd32d5a7343a84d5ebcccb99d46e6637565e7b0" +checksum = "0786a43debb760f491b1bc0269fe5e84155353c67482b9e60d0cfb596054b43e" dependencies = [ - "bitflags 2.6.0", + "recursive-proc-macro-impl", + "stacker", +] + +[[package]] +name = "recursive-proc-macro-impl" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76009fbe0614077fc1a2ce255e3a1881a2e3a3527097d5dc6d8212c585e7e38b" +dependencies = [ + "quote", + "syn 2.0.92", ] [[package]] @@ -4601,8 +4662,17 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.9", + "regex-syntax 0.8.5", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -4613,7 +4683,7 @@ checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.5", ] [[package]] @@ -4622,6 +4692,12 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.5" @@ -4675,9 +4751,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.12.11" +version = "0.12.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fe060fe50f524be480214aba758c71f99f90ee8c83c5a36b5e9e1d568eb4eb3" +checksum = "43e734407157c3c2034e0258f5e4473ddb361b1e85f95a66690d67264d7cd1da" dependencies = [ "base64 0.22.1", "bytes", @@ -5007,6 +5083,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -5106,9 +5188,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.134" +version = "1.0.135" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d00f4175c42ee48b15416f6193a959ba3a0d67fc699a0db9ad12df9f83991c7d" +checksum = "2b0d7ba2887406110130a978386c4e1befb98c674b4fba677954e4db976630d9" dependencies = [ "itoa", "memchr", @@ -5191,6 +5273,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -5387,9 +5478,9 @@ dependencies = [ [[package]] name = "sqlparser" -version = "0.51.0" +version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fe11944a61da0da3f592e19a45ebe5ab92dc14a779907ff1f08fbb797bfefc7" +checksum = "05a528114c392209b3264855ad491fcce534b94a38771b0a0b97a79379275ce8" dependencies = [ "log", "sqlparser_derive", @@ -5397,9 +5488,9 @@ dependencies = [ [[package]] name = "sqlparser_derive" -version = "0.2.2" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" +checksum = "da5fc6819faabb412da764b99d3b713bb55083c11e7e0c00144d386cd6a1939c" dependencies = [ "proc-macro2", "quote", @@ -5606,6 +5697,19 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "stacker" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "799c883d55abdb5e98af1a7b3f23b9b6de8ecada0ecac058672d7635eb48ca7b" +dependencies = [ + "cc", + "cfg-if", + "libc", + "psm", + "windows-sys 0.59.0", +] + [[package]] name = "static_assertions" version = "1.1.0" @@ -5634,9 +5738,6 @@ name = "strum" version = "0.26.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" -dependencies = [ - "strum_macros", -] [[package]] name = "strum_macros" @@ -5713,12 +5814,13 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] name = "tempfile" -version = "3.14.0" +version = "3.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" +checksum = "9a8a559c81686f576e8cd0290cd2a24a2a9ad80c98b3478856500fcbd7acd704" dependencies = [ "cfg-if", "fastrand", + "getrandom", "once_cell", "rustix", "windows-sys 0.59.0", @@ -5786,6 +5888,16 @@ dependencies = [ "syn 2.0.92", ] +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "thrift" version = "0.17.0" @@ -5864,9 +5976,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.42.0" +version = "1.43.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cec9b21b0450273377fc97bd4c33a8acffc8c996c987a7c5b319a0083707551" +checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" dependencies = [ "backtrace", "bytes", @@ -5882,9 +5994,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" dependencies = [ "proc-macro2", "quote", @@ -6009,19 +6121,43 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" dependencies = [ "once_cell", + "valuable", ] [[package]] -name = "trim-in-place" -version = "0.1.7" +name = "tracing-log" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] [[package]] -name = "triomphe" -version = "0.1.11" +name = "tracing-subscriber" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3" +checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", +] + +[[package]] +name = "trim-in-place" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc" [[package]] name = "try-lock" @@ -6237,6 +6373,12 @@ dependencies = [ "serde", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "value-bag" version = "1.10.0" @@ -6489,7 +6631,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] @@ -6498,6 +6640,16 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd04d41d93c4992d421894c18c8b43496aa748dd4c081bac0dc93eb0489272b6" +dependencies = [ + "windows-core 0.58.0", + "windows-targets 0.52.6", +] + [[package]] name = "windows-core" version = "0.52.0" @@ -6507,6 +6659,41 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-core" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba6d44ec8c2591c134257ce647b7ea6b20335bf6379a27dac5f1641fcf59f99" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-result", + "windows-strings", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-implement" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bbd5b46c938e506ecbce286b6628a02171d56153ba733b6c741fc627ec9579b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.92", +] + +[[package]] +name = "windows-interface" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053c4c462dc91d3b1504c6fe5a726dd15e216ba718e84a0e46a88fbe5ded3515" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.92", +] + [[package]] name = "windows-registry" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 5b1dca422..d627af0bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,8 +51,8 @@ arrow-string = { version = "53" } async-stream = "0.3.5" async-trait = "0.1" async-std = "1.12" -aws-config = "1.1.8" -aws-sdk-glue = "1.21" +aws-config = "1.5.13" +aws-sdk-glue = "1.76" bimap = "0.6" bitvec = "1.0.1" bytes = "1.5" @@ -91,7 +91,7 @@ serde_derive = "1" serde_json = "1" serde_repr = "0.1.16" serde_with = "3.4" -tempfile = "3.8" +tempfile = "3.15" tokio = { version = "1", default-features = false } typed-builder = "0.20" url = "2" diff --git a/Makefile b/Makefile index 4ecc9bd88..fc8a52e5f 100644 --- a/Makefile +++ b/Makefile @@ -39,7 +39,7 @@ cargo-machete: install-cargo-machete cargo machete install-taplo-cli: - cargo install taplo-cli@0.9.0 + cargo install taplo-cli@0.9.3 fix-toml: install-taplo-cli taplo fmt diff --git a/crates/catalog/s3tables/Cargo.toml b/crates/catalog/s3tables/Cargo.toml index 772b328f3..9ea66a0f9 100644 --- a/crates/catalog/s3tables/Cargo.toml +++ b/crates/catalog/s3tables/Cargo.toml @@ -32,7 +32,7 @@ keywords = ["iceberg", "sql", "catalog"] anyhow = { workspace = true } async-trait = { workspace = true } aws-config = { workspace = true } -aws-sdk-s3tables = "1.2.0" +aws-sdk-s3tables = "1.3.0" iceberg = { workspace = true } serde_json = { workspace = true } uuid = { workspace = true, features = ["v4"] } diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 7f323722f..626ca15ef 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -61,7 +61,7 @@ derive_builder = { workspace = true } fnv = { workspace = true } futures = { workspace = true } itertools = { workspace = true } -moka = { version = "0.12.8", features = ["future"] } +moka = { version = "0.12.10", features = ["future"] } murmur3 = { workspace = true } num-bigint = { workspace = true } once_cell = { workspace = true } diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index 91dfe85e9..142426f75 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -29,6 +29,7 @@ use arrow_array::{ }; use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit}; use bitvec::macros::internal::funty::Fundamental; +use num_bigint::BigInt; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use parquet::file::statistics::Statistics; use rust_decimal::prelude::ToPrimitive; @@ -119,8 +120,10 @@ fn visit_type(r#type: &DataType, visitor: &mut V) -> Resu DataType::Boolean | DataType::Utf8 | DataType::LargeUtf8 + | DataType::Utf8View | DataType::Binary | DataType::LargeBinary + | DataType::BinaryView | DataType::FixedSizeBinary(_) ) => { @@ -402,7 +405,9 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter { { Ok(Type::Primitive(PrimitiveType::TimestamptzNs)) } - DataType::Binary | DataType::LargeBinary => Ok(Type::Primitive(PrimitiveType::Binary)), + DataType::Binary | DataType::LargeBinary | DataType::BinaryView => { + Ok(Type::Primitive(PrimitiveType::Binary)) + } DataType::FixedSizeBinary(width) => { Ok(Type::Primitive(PrimitiveType::Fixed(*width as u64))) } @@ -739,9 +744,15 @@ macro_rules! get_parquet_stat_as_datum { let Some(bytes) = stats.[<$limit_type _bytes_opt>]() else { return Ok(None); }; + let unscaled_value = BigInt::from_signed_bytes_be(bytes); Some(Datum::new( primitive_type.clone(), - PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)), + PrimitiveLiteral::Int128(unscaled_value.to_i128().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Can't convert bytes to i128: {:?}", bytes), + ) + })?), )) } ( diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/inspect/manifests.rs similarity index 54% rename from crates/iceberg/src/metadata_scan.rs rename to crates/iceberg/src/inspect/manifests.rs index 16604d781..ab63d2f6e 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/inspect/manifests.rs @@ -15,126 +15,32 @@ // specific language governing permissions and limitations // under the License. -//! Metadata table api. - use std::sync::Arc; use arrow_array::builder::{ - BooleanBuilder, ListBuilder, MapBuilder, PrimitiveBuilder, StringBuilder, StructBuilder, + BooleanBuilder, ListBuilder, PrimitiveBuilder, StringBuilder, StructBuilder, }; -use arrow_array::types::{Int32Type, Int64Type, Int8Type, TimestampMillisecondType}; +use arrow_array::types::{Int32Type, Int64Type, Int8Type}; use arrow_array::RecordBatch; -use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit}; +use arrow_schema::{DataType, Field, Fields, Schema}; +use futures::{stream, StreamExt}; +use crate::scan::ArrowRecordBatchStream; use crate::table::Table; use crate::Result; -/// Metadata table is used to inspect a table's history, snapshots, and other metadata as a table. -/// -/// References: -/// - -/// - -/// - -#[derive(Debug)] -pub struct MetadataTable(Table); - -impl MetadataTable { - /// Creates a new metadata scan. - pub(super) fn new(table: Table) -> Self { - Self(table) - } - - /// Get the snapshots table. - pub fn snapshots(&self) -> SnapshotsTable { - SnapshotsTable { table: &self.0 } - } - - /// Get the manifests table. - pub fn manifests(&self) -> ManifestsTable { - ManifestsTable { table: &self.0 } - } -} - -/// Snapshots table. -pub struct SnapshotsTable<'a> { - table: &'a Table, -} - -impl<'a> SnapshotsTable<'a> { - /// Returns the schema of the snapshots table. - pub fn schema(&self) -> Schema { - Schema::new(vec![ - Field::new( - "committed_at", - DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())), - false, - ), - Field::new("snapshot_id", DataType::Int64, false), - Field::new("parent_id", DataType::Int64, true), - Field::new("operation", DataType::Utf8, false), - Field::new("manifest_list", DataType::Utf8, false), - Field::new( - "summary", - DataType::Map( - Arc::new(Field::new( - "entries", - DataType::Struct( - vec![ - Field::new("keys", DataType::Utf8, false), - Field::new("values", DataType::Utf8, true), - ] - .into(), - ), - false, - )), - false, - ), - false, - ), - ]) - } - - /// Scans the snapshots table. - pub fn scan(&self) -> Result { - let mut committed_at = - PrimitiveBuilder::::new().with_timezone("+00:00"); - let mut snapshot_id = PrimitiveBuilder::::new(); - let mut parent_id = PrimitiveBuilder::::new(); - let mut operation = StringBuilder::new(); - let mut manifest_list = StringBuilder::new(); - let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new()); - - for snapshot in self.table.metadata().snapshots() { - committed_at.append_value(snapshot.timestamp_ms()); - snapshot_id.append_value(snapshot.snapshot_id()); - parent_id.append_option(snapshot.parent_snapshot_id()); - manifest_list.append_value(snapshot.manifest_list()); - operation.append_value(snapshot.summary().operation.as_str()); - for (key, value) in &snapshot.summary().additional_properties { - summary.keys().append_value(key); - summary.values().append_value(value); - } - summary.append(true)?; - } - - Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![ - Arc::new(committed_at.finish()), - Arc::new(snapshot_id.finish()), - Arc::new(parent_id.finish()), - Arc::new(operation.finish()), - Arc::new(manifest_list.finish()), - Arc::new(summary.finish()), - ])?) - } -} - /// Manifests table. pub struct ManifestsTable<'a> { table: &'a Table, } impl<'a> ManifestsTable<'a> { - fn partition_summary_fields(&self) -> Vec { + /// Create a new Manifests table instance. + pub fn new(table: &'a Table) -> Self { + Self { table } + } + + fn partition_summary_fields() -> Vec { vec![ Field::new("contains_null", DataType::Boolean, false), Field::new("contains_nan", DataType::Boolean, true), @@ -161,7 +67,7 @@ impl<'a> ManifestsTable<'a> { "partition_summaries", DataType::List(Arc::new(Field::new_struct( "item", - self.partition_summary_fields(), + Self::partition_summary_fields(), false, ))), false, @@ -170,7 +76,7 @@ impl<'a> ManifestsTable<'a> { } /// Scans the manifests table. - pub async fn scan(&self) -> Result { + pub async fn scan(&self) -> Result { let mut content = PrimitiveBuilder::::new(); let mut path = StringBuilder::new(); let mut length = PrimitiveBuilder::::new(); @@ -183,12 +89,12 @@ impl<'a> ManifestsTable<'a> { let mut existing_delete_files_count = PrimitiveBuilder::::new(); let mut deleted_delete_files_count = PrimitiveBuilder::::new(); let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields( - Fields::from(self.partition_summary_fields()), + Fields::from(Self::partition_summary_fields()), 0, )) .with_field(Arc::new(Field::new_struct( "item", - self.partition_summary_fields(), + Self::partition_summary_fields(), false, ))); @@ -238,7 +144,7 @@ impl<'a> ManifestsTable<'a> { } } - Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![ + let batch = RecordBatch::try_new(Arc::new(self.schema()), vec![ Arc::new(content.finish()), Arc::new(path.finish()), Arc::new(length.finish()), @@ -251,151 +157,28 @@ impl<'a> ManifestsTable<'a> { Arc::new(existing_delete_files_count.finish()), Arc::new(deleted_delete_files_count.finish()), Arc::new(partition_summaries.finish()), - ])?) + ])?; + + Ok(stream::iter(vec![Ok(batch)]).boxed()) } } #[cfg(test)] mod tests { - use expect_test::{expect, Expect}; - use itertools::Itertools; + use expect_test::expect; - use super::*; + use crate::inspect::metadata_table::tests::check_record_batches; use crate::scan::tests::TableTestFixture; - /// Snapshot testing to check the resulting record batch. - /// - /// - `expected_schema/data`: put `expect![[""]]` as a placeholder, - /// and then run test with `UPDATE_EXPECT=1 cargo test` to automatically update the result, - /// or use rust-analyzer (see [video](https://github.com/rust-analyzer/expect-test)). - /// Check the doc of [`expect_test`] for more details. - /// - `ignore_check_columns`: Some columns are not stable, so we can skip them. - /// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column. - fn check_record_batch( - record_batch: RecordBatch, - expected_schema: Expect, - expected_data: Expect, - ignore_check_columns: &[&str], - sort_column: Option<&str>, - ) { - let mut columns = record_batch.columns().to_vec(); - if let Some(sort_column) = sort_column { - let column = record_batch.column_by_name(sort_column).unwrap(); - let indices = arrow_ord::sort::sort_to_indices(column, None, None).unwrap(); - columns = columns - .iter() - .map(|column| arrow_select::take::take(column.as_ref(), &indices, None).unwrap()) - .collect_vec(); - } - - expected_schema.assert_eq(&format!( - "{}", - record_batch.schema().fields().iter().format(",\n") - )); - expected_data.assert_eq(&format!( - "{}", - record_batch - .schema() - .fields() - .iter() - .zip_eq(columns) - .map(|(field, column)| { - if ignore_check_columns.contains(&field.name().as_str()) { - format!("{}: (skipped)", field.name()) - } else { - format!("{}: {:?}", field.name(), column) - } - }) - .format(",\n") - )); - } - - #[test] - fn test_snapshots_table() { - let table = TableTestFixture::new().table; - let record_batch = table.metadata_table().snapshots().scan().unwrap(); - check_record_batch( - record_batch, - expect![[r#" - Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "operation", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "manifest_list", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "summary", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]], - expect![[r#" - committed_at: PrimitiveArray - [ - 2018-01-04T21:22:35.770+00:00, - 2019-04-12T20:29:15.770+00:00, - ], - snapshot_id: PrimitiveArray - [ - 3051729675574597004, - 3055729675574597004, - ], - parent_id: PrimitiveArray - [ - null, - 3051729675574597004, - ], - operation: StringArray - [ - "append", - "append", - ], - manifest_list: (skipped), - summary: MapArray - [ - StructArray - -- validity: - [ - ] - [ - -- child 0: "keys" (Utf8) - StringArray - [ - ] - -- child 1: "values" (Utf8) - StringArray - [ - ] - ], - StructArray - -- validity: - [ - ] - [ - -- child 0: "keys" (Utf8) - StringArray - [ - ] - -- child 1: "values" (Utf8) - StringArray - [ - ] - ], - ]"#]], - &["manifest_list"], - Some("committed_at"), - ); - } - #[tokio::test] async fn test_manifests_table() { let mut fixture = TableTestFixture::new(); fixture.setup_manifest_files().await; - let record_batch = fixture - .table - .metadata_table() - .manifests() - .scan() - .await - .unwrap(); + let batch_stream = fixture.table.inspect().manifests().scan().await.unwrap(); - check_record_batch( - record_batch, + check_record_batches( + batch_stream, expect![[r#" Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, @@ -480,6 +263,6 @@ mod tests { ]"#]], &["path", "length"], Some("path"), - ); + ).await; } } diff --git a/crates/iceberg/src/inspect/metadata_table.rs b/crates/iceberg/src/inspect/metadata_table.rs new file mode 100644 index 000000000..75dbc7472 --- /dev/null +++ b/crates/iceberg/src/inspect/metadata_table.rs @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{ManifestsTable, SnapshotsTable}; +use crate::table::Table; + +/// Metadata table is used to inspect a table's history, snapshots, and other metadata as a table. +/// +/// References: +/// - +/// - +/// - +#[derive(Debug)] +pub struct MetadataTable<'a>(&'a Table); + +impl<'a> MetadataTable<'a> { + /// Creates a new metadata scan. + pub fn new(table: &'a Table) -> Self { + Self(table) + } + + /// Get the snapshots table. + pub fn snapshots(&self) -> SnapshotsTable { + SnapshotsTable::new(self.0) + } + + /// Get the manifests table. + pub fn manifests(&self) -> ManifestsTable { + ManifestsTable::new(self.0) + } +} + +#[cfg(test)] +pub mod tests { + use expect_test::Expect; + use futures::TryStreamExt; + use itertools::Itertools; + + use crate::scan::ArrowRecordBatchStream; + + /// Snapshot testing to check the resulting record batch. + /// + /// - `expected_schema/data`: put `expect![[""]]` as a placeholder, + /// and then run test with `UPDATE_EXPECT=1 cargo test` to automatically update the result, + /// or use rust-analyzer (see [video](https://github.com/rust-analyzer/expect-test)). + /// Check the doc of [`expect_test`] for more details. + /// - `ignore_check_columns`: Some columns are not stable, so we can skip them. + /// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column. + pub async fn check_record_batches( + batch_stream: ArrowRecordBatchStream, + expected_schema: Expect, + expected_data: Expect, + ignore_check_columns: &[&str], + sort_column: Option<&str>, + ) { + let record_batches = batch_stream.try_collect::>().await.unwrap(); + assert!(!record_batches.is_empty(), "Empty record batches"); + + // Combine record batches using the first batch's schema + let first_batch = record_batches.first().unwrap(); + let record_batch = + arrow_select::concat::concat_batches(&first_batch.schema(), &record_batches).unwrap(); + + let mut columns = record_batch.columns().to_vec(); + if let Some(sort_column) = sort_column { + let column = record_batch.column_by_name(sort_column).unwrap(); + let indices = arrow_ord::sort::sort_to_indices(column, None, None).unwrap(); + columns = columns + .iter() + .map(|column| arrow_select::take::take(column.as_ref(), &indices, None).unwrap()) + .collect_vec(); + } + + expected_schema.assert_eq(&format!( + "{}", + record_batch.schema().fields().iter().format(",\n") + )); + expected_data.assert_eq(&format!( + "{}", + record_batch + .schema() + .fields() + .iter() + .zip_eq(columns) + .map(|(field, column)| { + if ignore_check_columns.contains(&field.name().as_str()) { + format!("{}: (skipped)", field.name()) + } else { + format!("{}: {:?}", field.name(), column) + } + }) + .format(",\n") + )); + } +} diff --git a/crates/iceberg/src/inspect/mod.rs b/crates/iceberg/src/inspect/mod.rs new file mode 100644 index 000000000..b64420ea1 --- /dev/null +++ b/crates/iceberg/src/inspect/mod.rs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Metadata table APIs. + +mod manifests; +mod metadata_table; +mod snapshots; + +pub use manifests::ManifestsTable; +pub use metadata_table::*; +pub use snapshots::SnapshotsTable; diff --git a/crates/iceberg/src/inspect/snapshots.rs b/crates/iceberg/src/inspect/snapshots.rs new file mode 100644 index 000000000..1ee89963d --- /dev/null +++ b/crates/iceberg/src/inspect/snapshots.rs @@ -0,0 +1,189 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder}; +use arrow_array::types::{Int64Type, TimestampMillisecondType}; +use arrow_array::RecordBatch; +use arrow_schema::{DataType, Field, Schema, TimeUnit}; +use futures::{stream, StreamExt}; + +use crate::scan::ArrowRecordBatchStream; +use crate::table::Table; +use crate::Result; + +/// Snapshots table. +pub struct SnapshotsTable<'a> { + table: &'a Table, +} + +impl<'a> SnapshotsTable<'a> { + /// Create a new Snapshots table instance. + pub fn new(table: &'a Table) -> Self { + Self { table } + } + + /// Returns the schema of the snapshots table. + pub fn schema(&self) -> Schema { + Schema::new(vec![ + Field::new( + "committed_at", + DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())), + false, + ), + Field::new("snapshot_id", DataType::Int64, false), + Field::new("parent_id", DataType::Int64, true), + Field::new("operation", DataType::Utf8, false), + Field::new("manifest_list", DataType::Utf8, false), + Field::new( + "summary", + DataType::Map( + Arc::new(Field::new( + "entries", + DataType::Struct( + vec![ + Field::new("keys", DataType::Utf8, false), + Field::new("values", DataType::Utf8, true), + ] + .into(), + ), + false, + )), + false, + ), + false, + ), + ]) + } + + /// Scans the snapshots table. + pub async fn scan(&self) -> Result { + let mut committed_at = + PrimitiveBuilder::::new().with_timezone("+00:00"); + let mut snapshot_id = PrimitiveBuilder::::new(); + let mut parent_id = PrimitiveBuilder::::new(); + let mut operation = StringBuilder::new(); + let mut manifest_list = StringBuilder::new(); + let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new()); + + for snapshot in self.table.metadata().snapshots() { + committed_at.append_value(snapshot.timestamp_ms()); + snapshot_id.append_value(snapshot.snapshot_id()); + parent_id.append_option(snapshot.parent_snapshot_id()); + manifest_list.append_value(snapshot.manifest_list()); + operation.append_value(snapshot.summary().operation.as_str()); + for (key, value) in &snapshot.summary().additional_properties { + summary.keys().append_value(key); + summary.values().append_value(value); + } + summary.append(true)?; + } + + let batch = RecordBatch::try_new(Arc::new(self.schema()), vec![ + Arc::new(committed_at.finish()), + Arc::new(snapshot_id.finish()), + Arc::new(parent_id.finish()), + Arc::new(operation.finish()), + Arc::new(manifest_list.finish()), + Arc::new(summary.finish()), + ])?; + + Ok(stream::iter(vec![Ok(batch)]).boxed()) + } +} + +#[cfg(test)] +mod tests { + use expect_test::expect; + + use crate::inspect::metadata_table::tests::check_record_batches; + use crate::scan::tests::TableTestFixture; + + #[tokio::test] + async fn test_snapshots_table() { + let table = TableTestFixture::new().table; + + let batch_stream = table.inspect().snapshots().scan().await.unwrap(); + + check_record_batches( + batch_stream, + expect![[r#" + Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "operation", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "manifest_list", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "summary", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]], + expect![[r#" + committed_at: PrimitiveArray + [ + 2018-01-04T21:22:35.770+00:00, + 2019-04-12T20:29:15.770+00:00, + ], + snapshot_id: PrimitiveArray + [ + 3051729675574597004, + 3055729675574597004, + ], + parent_id: PrimitiveArray + [ + null, + 3051729675574597004, + ], + operation: StringArray + [ + "append", + "append", + ], + manifest_list: (skipped), + summary: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "keys" (Utf8) + StringArray + [ + ] + -- child 1: "values" (Utf8) + StringArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "keys" (Utf8) + StringArray + [ + ] + -- child 1: "values" (Utf8) + StringArray + [ + ] + ], + ]"#]], + &["manifest_list"], + Some("committed_at"), + ).await; + } +} diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 1946f35f3..fe5a52999 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -73,7 +73,7 @@ mod avro; pub mod io; pub mod spec; -pub mod metadata_scan; +pub mod inspect; pub mod scan; pub mod expr; diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs index 649b6b2c4..709c4cdae 100644 --- a/crates/iceberg/src/spec/schema.rs +++ b/crates/iceberg/src/spec/schema.rs @@ -228,6 +228,11 @@ impl SchemaBuilder { results } + /// According to [the spec](https://iceberg.apache.org/spec/#identifier-fields), the identifier fields + /// must meet the following requirements: + /// - Float, double, and optional fields cannot be used as identifier fields. + /// - Identifier fields may be nested in structs but cannot be nested within maps or lists. + /// - A nested field cannot be used as an identifier field if it is nested in an optional struct, to avoid null values in identifiers. fn validate_identifier_ids( r#struct: &StructType, id_to_field: &HashMap, @@ -563,6 +568,9 @@ pub fn index_parents(r#struct: &StructType) -> Result> { type T = (); fn before_struct_field(&mut self, field: &NestedFieldRef) -> Result<()> { + if let Some(parent) = self.parents.last().copied() { + self.result.insert(field.id, parent); + } self.parents.push(field.id); Ok(()) } @@ -573,6 +581,9 @@ pub fn index_parents(r#struct: &StructType) -> Result> { } fn before_list_element(&mut self, field: &NestedFieldRef) -> Result<()> { + if let Some(parent) = self.parents.last().copied() { + self.result.insert(field.id, parent); + } self.parents.push(field.id); Ok(()) } @@ -583,6 +594,9 @@ pub fn index_parents(r#struct: &StructType) -> Result> { } fn before_map_key(&mut self, field: &NestedFieldRef) -> Result<()> { + if let Some(parent) = self.parents.last().copied() { + self.result.insert(field.id, parent); + } self.parents.push(field.id); Ok(()) } @@ -593,6 +607,9 @@ pub fn index_parents(r#struct: &StructType) -> Result> { } fn before_map_value(&mut self, field: &NestedFieldRef) -> Result<()> { + if let Some(parent) = self.parents.last().copied() { + self.result.insert(field.id, parent); + } self.parents.push(field.id); Ok(()) } @@ -606,10 +623,7 @@ pub fn index_parents(r#struct: &StructType) -> Result> { Ok(()) } - fn field(&mut self, field: &NestedFieldRef, _value: Self::T) -> Result { - if let Some(parent) = self.parents.last().copied() { - self.result.insert(field.id, parent); - } + fn field(&mut self, _field: &NestedFieldRef, _value: Self::T) -> Result { Ok(()) } @@ -1261,7 +1275,7 @@ mod tests { use crate::spec::schema::Schema; use crate::spec::schema::_serde::{SchemaEnum, SchemaV1, SchemaV2}; use crate::spec::values::Map as MapValue; - use crate::spec::{prune_columns, Datum, Literal}; + use crate::spec::{index_parents, prune_columns, Datum, Literal}; fn check_schema_serde(json: &str, expected_type: Schema, _expected_enum: SchemaEnum) { let desered_type: Schema = serde_json::from_str(json).unwrap(); @@ -2627,4 +2641,143 @@ table { assert_eq!(schema, reassigned_schema); assert_eq!(schema.highest_field_id(), 0); } + + #[test] + fn test_index_parent() { + let schema = table_schema_nested(); + let result = index_parents(&schema.r#struct).unwrap(); + assert_eq!(result.get(&5).unwrap(), &4); + assert_eq!(result.get(&7).unwrap(), &6); + assert_eq!(result.get(&8).unwrap(), &6); + assert_eq!(result.get(&9).unwrap(), &8); + assert_eq!(result.get(&10).unwrap(), &8); + assert_eq!(result.get(&12).unwrap(), &11); + assert_eq!(result.get(&13).unwrap(), &12); + assert_eq!(result.get(&14).unwrap(), &12); + assert_eq!(result.get(&16).unwrap(), &15); + assert_eq!(result.get(&17).unwrap(), &15); + } + + #[test] + fn test_identifier_field_ids() { + // field in map + assert!(Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![2]) + .with_fields(vec![NestedField::required( + 1, + "Map", + Type::Map(MapType::new( + NestedField::map_key_element(2, Type::Primitive(PrimitiveType::String)).into(), + NestedField::map_value_element( + 3, + Type::Primitive(PrimitiveType::Boolean), + true, + ) + .into(), + )), + ) + .into()]) + .build() + .is_err()); + assert!(Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![3]) + .with_fields(vec![NestedField::required( + 1, + "Map", + Type::Map(MapType::new( + NestedField::map_key_element(2, Type::Primitive(PrimitiveType::String)).into(), + NestedField::map_value_element( + 3, + Type::Primitive(PrimitiveType::Boolean), + true, + ) + .into(), + )), + ) + .into()]) + .build() + .is_err()); + + // field in list + assert!(Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![2]) + .with_fields(vec![NestedField::required( + 1, + "List", + Type::List(ListType::new( + NestedField::list_element(2, Type::Primitive(PrimitiveType::String), true) + .into(), + )), + ) + .into()]) + .build() + .is_err()); + + // field in optional struct + assert!(Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![2]) + .with_fields(vec![NestedField::optional( + 1, + "Struct", + Type::Struct(StructType::new(vec![ + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::optional(3, "age", Type::Primitive(PrimitiveType::Int)).into(), + ])), + ) + .into()]) + .build() + .is_err()); + + // float and double + assert!(Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![1]) + .with_fields(vec![NestedField::required( + 1, + "Float", + Type::Primitive(PrimitiveType::Float), + ) + .into()]) + .build() + .is_err()); + assert!(Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![1]) + .with_fields(vec![NestedField::required( + 1, + "Double", + Type::Primitive(PrimitiveType::Double), + ) + .into()]) + .build() + .is_err()); + + // optional field + assert!(Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![1]) + .with_fields(vec![NestedField::required( + 1, + "Required", + Type::Primitive(PrimitiveType::String), + ) + .into()]) + .build() + .is_ok()); + assert!(Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![1]) + .with_fields(vec![NestedField::optional( + 1, + "Optional", + Type::Primitive(PrimitiveType::String), + ) + .into()]) + .build() + .is_err()); + } } diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index fa5304855..ebee670f4 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -20,9 +20,9 @@ use std::sync::Arc; use crate::arrow::ArrowReaderBuilder; +use crate::inspect::MetadataTable; use crate::io::object_cache::ObjectCache; use crate::io::FileIO; -use crate::metadata_scan::MetadataTable; use crate::scan::TableScanBuilder; use crate::spec::{TableMetadata, TableMetadataRef}; use crate::{Error, ErrorKind, Result, TableIdent}; @@ -203,7 +203,7 @@ impl Table { /// Creates a metadata table which provides table-like APIs for inspecting metadata. /// See [`MetadataTable`] for more details. - pub fn metadata_table(self) -> MetadataTable { + pub fn inspect(&self) -> MetadataTable<'_> { MetadataTable::new(self) } diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 596228f7c..5561b1913 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -478,15 +478,18 @@ mod tests { use anyhow::Result; use arrow_array::types::Int64Type; use arrow_array::{ - Array, ArrayRef, BooleanArray, Int32Array, Int64Array, ListArray, RecordBatch, StructArray, + Array, ArrayRef, BooleanArray, Decimal128Array, Int32Array, Int64Array, ListArray, + RecordBatch, StructArray, }; use arrow_schema::{DataType, SchemaRef as ArrowSchemaRef}; use arrow_select::concat::concat_batches; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + use rust_decimal::Decimal; use tempfile::TempDir; use uuid::Uuid; use super::*; + use crate::arrow::schema_to_arrow_schema; use crate::io::FileIOBuilder; use crate::spec::{PrimitiveLiteral, Struct, *}; use crate::writer::file_writer::location_generator::test::MockLocationGenerator; @@ -1169,4 +1172,245 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_decimal_bound() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let loccation_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + // test 1.1 and 2.2 + let schema = Arc::new( + Schema::builder() + .with_fields(vec![NestedField::optional( + 0, + "decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 28, + scale: 10, + }), + ) + .into()]) + .build() + .unwrap(), + ); + let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap()); + let mut pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + schema.clone(), + file_io.clone(), + loccation_gen.clone(), + file_name_gen.clone(), + ) + .build() + .await?; + let col0 = Arc::new( + Decimal128Array::from(vec![Some(22000000000), Some(11000000000)]) + .with_data_type(DataType::Decimal128(28, 10)), + ) as ArrayRef; + let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap(); + pw.write(&to_write).await?; + let res = pw.close().await?; + assert_eq!(res.len(), 1); + let data_file = res + .into_iter() + .next() + .unwrap() + .content(crate::spec::DataContentType::Data) + .partition(Struct::empty()) + .build() + .unwrap(); + assert_eq!( + data_file.upper_bounds().get(&0), + Some(Datum::decimal_with_precision(Decimal::new(22000000000_i64, 10), 28).unwrap()) + .as_ref() + ); + assert_eq!( + data_file.lower_bounds().get(&0), + Some(Datum::decimal_with_precision(Decimal::new(11000000000_i64, 10), 28).unwrap()) + .as_ref() + ); + + // test -1.1 and -2.2 + let schema = Arc::new( + Schema::builder() + .with_fields(vec![NestedField::optional( + 0, + "decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 28, + scale: 10, + }), + ) + .into()]) + .build() + .unwrap(), + ); + let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap()); + let mut pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + schema.clone(), + file_io.clone(), + loccation_gen.clone(), + file_name_gen.clone(), + ) + .build() + .await?; + let col0 = Arc::new( + Decimal128Array::from(vec![Some(-22000000000), Some(-11000000000)]) + .with_data_type(DataType::Decimal128(28, 10)), + ) as ArrayRef; + let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap(); + pw.write(&to_write).await?; + let res = pw.close().await?; + assert_eq!(res.len(), 1); + let data_file = res + .into_iter() + .next() + .unwrap() + .content(crate::spec::DataContentType::Data) + .partition(Struct::empty()) + .build() + .unwrap(); + assert_eq!( + data_file.upper_bounds().get(&0), + Some(Datum::decimal_with_precision(Decimal::new(-11000000000_i64, 10), 28).unwrap()) + .as_ref() + ); + assert_eq!( + data_file.lower_bounds().get(&0), + Some(Datum::decimal_with_precision(Decimal::new(-22000000000_i64, 10), 28).unwrap()) + .as_ref() + ); + + // test max and min of rust_decimal + let decimal_max = Decimal::MAX; + let decimal_min = Decimal::MIN; + assert_eq!(decimal_max.scale(), decimal_min.scale()); + let schema = Arc::new( + Schema::builder() + .with_fields(vec![NestedField::optional( + 0, + "decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 38, + scale: decimal_max.scale(), + }), + ) + .into()]) + .build() + .unwrap(), + ); + let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap()); + let mut pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + schema, + file_io.clone(), + loccation_gen, + file_name_gen, + ) + .build() + .await?; + let col0 = Arc::new( + Decimal128Array::from(vec![ + Some(decimal_max.mantissa()), + Some(decimal_min.mantissa()), + ]) + .with_data_type(DataType::Decimal128(38, 0)), + ) as ArrayRef; + let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap(); + pw.write(&to_write).await?; + let res = pw.close().await?; + assert_eq!(res.len(), 1); + let data_file = res + .into_iter() + .next() + .unwrap() + .content(crate::spec::DataContentType::Data) + .partition(Struct::empty()) + .build() + .unwrap(); + assert_eq!( + data_file.upper_bounds().get(&0), + Some(Datum::decimal(decimal_max).unwrap()).as_ref() + ); + assert_eq!( + data_file.lower_bounds().get(&0), + Some(Datum::decimal(decimal_min).unwrap()).as_ref() + ); + + // test max and min for scale 38 + // # TODO + // Readd this case after resolve https://github.com/apache/iceberg-rust/issues/669 + // let schema = Arc::new( + // Schema::builder() + // .with_fields(vec![NestedField::optional( + // 0, + // "decimal", + // Type::Primitive(PrimitiveType::Decimal { + // precision: 38, + // scale: 0, + // }), + // ) + // .into()]) + // .build() + // .unwrap(), + // ); + // let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap()); + // let mut pw = ParquetWriterBuilder::new( + // WriterProperties::builder().build(), + // schema, + // file_io.clone(), + // loccation_gen, + // file_name_gen, + // ) + // .build() + // .await?; + // let col0 = Arc::new( + // Decimal128Array::from(vec![ + // Some(99999999999999999999999999999999999999_i128), + // Some(-99999999999999999999999999999999999999_i128), + // ]) + // .with_data_type(DataType::Decimal128(38, 0)), + // ) as ArrayRef; + // let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap(); + // pw.write(&to_write).await?; + // let res = pw.close().await?; + // assert_eq!(res.len(), 1); + // let data_file = res + // .into_iter() + // .next() + // .unwrap() + // .content(crate::spec::DataContentType::Data) + // .partition(Struct::empty()) + // .build() + // .unwrap(); + // assert_eq!( + // data_file.upper_bounds().get(&0), + // Some(Datum::new( + // PrimitiveType::Decimal { + // precision: 38, + // scale: 0 + // }, + // PrimitiveLiteral::Int128(99999999999999999999999999999999999999_i128) + // )) + // .as_ref() + // ); + // assert_eq!( + // data_file.lower_bounds().get(&0), + // Some(Datum::new( + // PrimitiveType::Decimal { + // precision: 38, + // scale: 0 + // }, + // PrimitiveLiteral::Int128(-99999999999999999999999999999999999999_i128) + // )) + // .as_ref() + // ); + + Ok(()) + } } diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml index d3930cfda..81a94d839 100644 --- a/crates/integrations/datafusion/Cargo.toml +++ b/crates/integrations/datafusion/Cargo.toml @@ -31,7 +31,7 @@ keywords = ["iceberg", "integrations", "datafusion"] [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } -datafusion = { version = "43" } +datafusion = { version = "44" } futures = { workspace = true } iceberg = { workspace = true } tokio = { workspace = true } diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index eaf2f94e5..f33437eec 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -25,10 +25,9 @@ use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; use datafusion::error::Result as DFResult; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; -use datafusion::physical_plan::{ - DisplayAs, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties, -}; +use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties}; use datafusion::prelude::Expr; use futures::{Stream, TryStreamExt}; use iceberg::expr::Predicate; @@ -88,7 +87,8 @@ impl IcebergTableScan { PlanProperties::new( EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(1), - ExecutionMode::Bounded, + EmissionType::Incremental, + Boundedness::Bounded, ) } } diff --git a/docs/contributing/orbstack.md b/docs/contributing/orbstack.md index 29eb09dc5..cb00849d6 100644 --- a/docs/contributing/orbstack.md +++ b/docs/contributing/orbstack.md @@ -34,6 +34,6 @@ ``` { - "registry-mirrors": ["https://registry.docker.ir", "https://docker.iranserver.com"] + "registry-mirrors": [""] } ``` \ No newline at end of file