Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Add order type in paimon #5020

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -3226,4 +3226,35 @@ public String toString() {
return value;
}
}

/** The order type of table sort. */
public enum OrderType {
ORDER("order"),
ZORDER("zorder"),
HILBERT("hilbert"),
NONE("none");

private final String orderType;

OrderType(String orderType) {
this.orderType = orderType;
}

@Override
public String toString() {
return "order type: " + orderType;
}

public static OrderType of(String orderType) {
if (ORDER.orderType.equalsIgnoreCase(orderType)) {
return ORDER;
} else if (ZORDER.orderType.equalsIgnoreCase(orderType)) {
return ZORDER;
} else if (HILBERT.orderType.equalsIgnoreCase(orderType)) {
return HILBERT;
}

throw new IllegalArgumentException("cannot match type: " + orderType + " for ordering");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
package org.apache.paimon.flink.action;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.OrderType;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.SortCompactSinkBuilder;
import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.sorter.TableSorter.OrderType;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@

package org.apache.paimon.flink.sink;

import org.apache.paimon.CoreOptions.OrderType;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink;
import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.sorter.TableSorter.OrderType;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
Expand All @@ -51,14 +51,14 @@
import java.util.List;
import java.util.Map;

import static org.apache.paimon.CoreOptions.OrderType.HILBERT;
import static org.apache.paimon.CoreOptions.OrderType.ORDER;
import static org.apache.paimon.CoreOptions.OrderType.ZORDER;
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
import static org.apache.paimon.flink.FlinkConnectorOptions.MIN_CLUSTERING_SAMPLE_FACTOR;
import static org.apache.paimon.flink.sink.FlinkSink.isStreaming;
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
import static org.apache.paimon.flink.sorter.TableSorter.OrderType.HILBERT;
import static org.apache.paimon.flink.sorter.TableSorter.OrderType.ORDER;
import static org.apache.paimon.flink.sorter.TableSorter.OrderType.ZORDER;
import static org.apache.paimon.table.BucketMode.BUCKET_UNAWARE;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

package org.apache.paimon.flink.sorter;

import org.apache.paimon.CoreOptions.OrderType;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sorter.TableSorter.OrderType;

import java.util.Collections;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.flink.sorter;

import org.apache.paimon.CoreOptions.OrderType;
import org.apache.paimon.flink.action.SortCompactAction;
import org.apache.paimon.table.FileStoreTable;

Expand Down Expand Up @@ -83,34 +84,4 @@ public static TableSorter getSorter(
throw new IllegalArgumentException("cannot match order type: " + sortStrategy);
}
}

/** The order type of table sort. */
public enum OrderType {
ORDER("order"),
ZORDER("zorder"),
HILBERT("hilbert");

private final String orderType;

OrderType(String orderType) {
this.orderType = orderType;
}

@Override
public String toString() {
return "order type: " + orderType;
}

public static OrderType of(String orderType) {
if (ORDER.orderType.equalsIgnoreCase(orderType)) {
return ORDER;
} else if (ZORDER.orderType.equalsIgnoreCase(orderType)) {
return ZORDER;
} else if (HILBERT.orderType.equalsIgnoreCase(orderType)) {
return HILBERT;
}

throw new IllegalArgumentException("cannot match type: " + orderType + " for ordering");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

package org.apache.paimon.flink.sorter;

import org.apache.paimon.CoreOptions.OrderType;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sorter.TableSorter.OrderType;

import org.junit.jupiter.api.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.spark.procedure;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.OrderType;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator;
Expand Down Expand Up @@ -144,7 +145,7 @@ public InternalRow[] call(InternalRow args) {
String partitions = blank(args, 1) ? null : args.getString(1);
// make full compact strategy as default.
String compactStrategy = blank(args, 2) ? FULL : args.getString(2);
String sortType = blank(args, 3) ? TableSorter.OrderType.NONE.name() : args.getString(3);
String sortType = blank(args, 3) ? OrderType.NONE.name() : args.getString(3);
List<String> sortColumns =
blank(args, 4)
? Collections.emptyList()
Expand All @@ -153,11 +154,11 @@ public InternalRow[] call(InternalRow args) {
String options = args.isNullAt(6) ? null : args.getString(6);
Duration partitionIdleTime =
blank(args, 7) ? null : TimeUtils.parseDuration(args.getString(7));
if (TableSorter.OrderType.NONE.name().equals(sortType) && !sortColumns.isEmpty()) {
if (OrderType.NONE.name().equals(sortType) && !sortColumns.isEmpty()) {
throw new IllegalArgumentException(
"order_strategy \"none\" cannot work with order_by columns.");
}
if (partitionIdleTime != null && (!TableSorter.OrderType.NONE.name().equals(sortType))) {
if (partitionIdleTime != null && (!OrderType.NONE.name().equals(sortType))) {
throw new IllegalArgumentException(
"sort compact do not support 'partition_idle_time'.");
}
Expand Down Expand Up @@ -234,7 +235,7 @@ private boolean execute(
@Nullable Expression condition,
@Nullable Duration partitionIdleTime) {
BucketMode bucketMode = table.bucketMode();
TableSorter.OrderType orderType = TableSorter.OrderType.of(sortType);
OrderType orderType = OrderType.of(sortType);
boolean fullCompact = compactStrategy.equalsIgnoreCase(FULL);
Predicate filter =
condition == null
Expand All @@ -245,7 +246,7 @@ private boolean execute(
table.rowType(),
false)
.getOrElse(null);
if (orderType.equals(TableSorter.OrderType.NONE)) {
if (orderType.equals(OrderType.NONE)) {
JavaSparkContext javaSparkContext = new JavaSparkContext(spark().sparkContext());
switch (bucketMode) {
case HASH_FIXED:
Expand Down Expand Up @@ -474,7 +475,7 @@ private Set<BinaryRow> getHistoryPartition(

private void sortCompactUnAwareBucketTable(
FileStoreTable table,
TableSorter.OrderType orderType,
OrderType orderType,
List<String> sortColumns,
DataSourceV2Relation relation,
@Nullable Predicate filter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.spark.sort;

import org.apache.paimon.CoreOptions.OrderType;
import org.apache.paimon.table.FileStoreTable;

import org.apache.spark.sql.Dataset;
Expand Down Expand Up @@ -62,7 +63,7 @@ private void checkColNames() {
public abstract Dataset<Row> sort(Dataset<Row> input);

public static TableSorter getSorter(
FileStoreTable table, TableSorter.OrderType orderType, List<String> orderColumns) {
FileStoreTable table, OrderType orderType, List<String> orderColumns) {
switch (orderType) {
case ORDER:
return new OrderSorter(table, orderColumns);
Expand All @@ -81,37 +82,4 @@ public Dataset<Row> sort(Dataset<Row> input) {
throw new IllegalArgumentException("cannot match order type: " + orderType);
}
}

/** order type for sorting. */
public enum OrderType {
ORDER("order"),
ZORDER("zorder"),
HILBERT("hilbert"),
NONE("none");

private final String orderType;

OrderType(String orderType) {
this.orderType = orderType;
}

@Override
public String toString() {
return "order type: " + orderType;
}

public static OrderType of(String orderType) {
if (ORDER.orderType.equalsIgnoreCase(orderType)) {
return ORDER;
} else if (ZORDER.orderType.equalsIgnoreCase(orderType)) {
return ZORDER;
} else if (HILBERT.orderType.equalsIgnoreCase(orderType)) {
return HILBERT;
} else if (NONE.orderType.equalsIgnoreCase(orderType)) {
return NONE;
}

throw new IllegalArgumentException("cannot match type: " + orderType + " for ordering");
}
}
}
Loading