Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,12 @@ impl DataFrame {
pub fn select_columns(self, columns: &[&str]) -> Result<DataFrame> {
let fields = columns
.iter()
.map(|name| {
.flat_map(|name| {
self.plan
.schema()
.qualified_field_with_unqualified_name(name)
.qualified_fields_with_unqualified_name(name)
})
.collect::<Result<Vec<_>>>()?;
.collect::<Vec<_>>();
let expr: Vec<Expr> = fields
.into_iter()
.map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field))))
Expand Down Expand Up @@ -428,13 +428,12 @@ impl DataFrame {
pub fn drop_columns(self, columns: &[&str]) -> Result<DataFrame> {
let fields_to_drop = columns
.iter()
.map(|name| {
.flat_map(|name| {
self.plan
.schema()
.qualified_field_with_unqualified_name(name)
.qualified_fields_with_unqualified_name(name)
})
.filter(|r| r.is_ok())
.collect::<Result<Vec<_>>>()?;
.collect::<Vec<_>>();
let expr: Vec<Expr> = self
.plan
.schema()
Expand Down
97 changes: 97 additions & 0 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,55 @@ async fn select_with_periods() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn select_columns_duplicated_names_from_different_qualifiers() -> Result<()> {
let t1 = test_table_with_name("t1")
.await?
.select_columns(&["c1"])?
.limit(0, Some(3))?;
let t2 = test_table_with_name("t2")
.await?
.select_columns(&["c1"])?
.limit(3, Some(3))?;
let t3 = test_table_with_name("t3")
.await?
.select_columns(&["c1"])?
.limit(6, Some(3))?;

let join_res = t1
.join(t2, JoinType::Left, &["t1.c1"], &["t2.c1"], None)?
.join(t3, JoinType::Left, &["t1.c1"], &["t3.c1"], None)?;
assert_snapshot!(
batches_to_sort_string(&join_res.clone().collect().await.unwrap()),
@r"
+----+----+----+
| c1 | c1 | c1 |
+----+----+----+
| b | b | |
| b | b | |
| c | | |
| d | | d |
+----+----+----+
"
);

let select_res = join_res.select_columns(&["c1"])?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line returned SchemaError without the code changes.

    Error: SchemaError(AmbiguousReference { field: Column { relation: None, name: "c1" } }, Some(""))

assert_snapshot!(
batches_to_sort_string(&select_res.clone().collect().await.unwrap()),
@r"
+----+----+----+
| c1 | c1 | c1 |
+----+----+----+
| b | b | |
| b | b | |
| c | | |
| d | | d |
+----+----+----+
"
);
Ok(())
}

#[tokio::test]
async fn drop_columns() -> Result<()> {
// build plan using Table API
Expand Down Expand Up @@ -543,6 +592,54 @@ async fn drop_with_periods() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn drop_columns_duplicated_names_from_different_qualifiers() -> Result<()> {
let t1 = test_table_with_name("t1")
.await?
.select_columns(&["c1"])?
.limit(0, Some(3))?;
let t2 = test_table_with_name("t2")
.await?
.select_columns(&["c1"])?
.limit(3, Some(3))?;
let t3 = test_table_with_name("t3")
.await?
.select_columns(&["c1"])?
.limit(6, Some(3))?;

let join_res = t1
.join(t2, JoinType::LeftMark, &["c1"], &["c1"], None)?
.join(t3, JoinType::LeftMark, &["c1"], &["c1"], None)?;
assert_snapshot!(
batches_to_sort_string(&join_res.clone().collect().await.unwrap()),
@r"
+----+-------+-------+
| c1 | mark | mark |
+----+-------+-------+
| b | true | false |
| c | false | false |
| d | false | true |
+----+-------+-------+
"
);

let drop_res = join_res.drop_columns(&["mark"])?;
assert_snapshot!(
batches_to_sort_string(&drop_res.clone().collect().await.unwrap()),
@r"
+----+
| c1 |
+----+
| b |
| c |
| d |
+----+
"
);

Ok(())
}
Comment on lines +626 to +640
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This failed without the code changes

    running 1 test
    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ Snapshot Summary ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
    Snapshot: drop_columns_duplicated_names_from_different_qualifiers-2
    Source: datafusion/core/tests/dataframe/mod.rs:627
    ────────────────────────────────────────────────────────────────────────────────
    Expression: batches_to_sort_string(&drop_res.clone().collect().await.unwrap())
    ────────────────────────────────────────────────────────────────────────────────
    -old snapshot
    +new results
    ────────────┬───────────────────────────────────────────────────────────────────
        1       │-+----+
        2       │-| c1 |
        3       │-+----+
        4       │-| b  |
        5       │-| c  |
        6       │-| d  |
        7       │-+----+
              1 │++----+-------+-------+
              2 │+| c1 | mark  | mark  |
              3 │++----+-------+-------+
              4 │+| b  | true  | false |
              5 │+| c  | false | false |
              6 │+| d  | false | true  |
              7 │++----+-------+-------+
    ────────────┴───────────────────────────────────────────────────────────────────
    To update snapshots run `cargo insta review`
    Stopped on the first failure. Run `cargo insta test` to run all snapshots.
    test dataframe::drop_columns_duplicated_names_from_different_qualifiers ... FAILED

    failures:

    failures:
        dataframe::drop_columns_duplicated_names_from_different_qualifiers

    test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 769 filtered out; finished in 0.17s


#[tokio::test]
async fn aggregate() -> Result<()> {
// build plan using DataFrame API
Expand Down
Loading