Skip to content

Commit

Permalink
[Core] Add order type in paimon (#5020)
Browse files Browse the repository at this point in the history
  • Loading branch information
FangYongs authored Feb 11, 2025
1 parent 47cc1a1 commit 6facb71
Show file tree
Hide file tree
Showing 13 changed files with 55 additions and 84 deletions.
33 changes: 33 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -3226,4 +3226,37 @@ public String toString() {
return value;
}
}

/** The order type of table sort. */
public enum OrderType {
ORDER("order"),
ZORDER("zorder"),
HILBERT("hilbert"),
NONE("none");

private final String orderType;

OrderType(String orderType) {
this.orderType = orderType;
}

@Override
public String toString() {
return "order type: " + orderType;
}

public static OrderType of(String orderType) {
if (ORDER.orderType.equalsIgnoreCase(orderType)) {
return ORDER;
} else if (ZORDER.orderType.equalsIgnoreCase(orderType)) {
return ZORDER;
} else if (HILBERT.orderType.equalsIgnoreCase(orderType)) {
return HILBERT;
} else if (NONE.orderType.equalsIgnoreCase(orderType)) {
return NONE;
}

throw new IllegalArgumentException("cannot match type: " + orderType + " for ordering");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.types.VariantType;
import org.apache.paimon.utils.ZOrderByteUtils;

import java.io.Serializable;
import java.nio.ByteBuffer;
Expand All @@ -57,8 +56,8 @@
import java.util.Set;
import java.util.function.BiFunction;

import static org.apache.paimon.utils.ZOrderByteUtils.NULL_BYTES;
import static org.apache.paimon.utils.ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE;
import static org.apache.paimon.sort.zorder.ZOrderByteUtils.NULL_BYTES;
import static org.apache.paimon.sort.zorder.ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE;

/** Z-indexer for responsibility to generate z-index. */
public class ZIndexer implements Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.paimon.utils;
package org.apache.paimon.sort.zorder;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.paimon.utils;
package org.apache.paimon.sort.zorder;

import org.junit.Assert;
import org.junit.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.ZOrderByteUtils;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
package org.apache.paimon.flink.action;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.OrderType;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.SortCompactSinkBuilder;
import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.sorter.TableSorter.OrderType;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@

package org.apache.paimon.flink.sink;

import org.apache.paimon.CoreOptions.OrderType;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink;
import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.sorter.TableSorter.OrderType;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
Expand All @@ -51,14 +51,14 @@
import java.util.List;
import java.util.Map;

import static org.apache.paimon.CoreOptions.OrderType.HILBERT;
import static org.apache.paimon.CoreOptions.OrderType.ORDER;
import static org.apache.paimon.CoreOptions.OrderType.ZORDER;
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
import static org.apache.paimon.flink.FlinkConnectorOptions.MIN_CLUSTERING_SAMPLE_FACTOR;
import static org.apache.paimon.flink.sink.FlinkSink.isStreaming;
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
import static org.apache.paimon.flink.sorter.TableSorter.OrderType.HILBERT;
import static org.apache.paimon.flink.sorter.TableSorter.OrderType.ORDER;
import static org.apache.paimon.flink.sorter.TableSorter.OrderType.ZORDER;
import static org.apache.paimon.table.BucketMode.BUCKET_UNAWARE;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

package org.apache.paimon.flink.sorter;

import org.apache.paimon.CoreOptions.OrderType;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sorter.TableSorter.OrderType;

import java.util.Collections;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.flink.sorter;

import org.apache.paimon.CoreOptions.OrderType;
import org.apache.paimon.flink.action.SortCompactAction;
import org.apache.paimon.table.FileStoreTable;

Expand Down Expand Up @@ -83,34 +84,4 @@ public static TableSorter getSorter(
throw new IllegalArgumentException("cannot match order type: " + sortStrategy);
}
}

/** The order type of table sort. */
public enum OrderType {
ORDER("order"),
ZORDER("zorder"),
HILBERT("hilbert");

private final String orderType;

OrderType(String orderType) {
this.orderType = orderType;
}

@Override
public String toString() {
return "order type: " + orderType;
}

public static OrderType of(String orderType) {
if (ORDER.orderType.equalsIgnoreCase(orderType)) {
return ORDER;
} else if (ZORDER.orderType.equalsIgnoreCase(orderType)) {
return ZORDER;
} else if (HILBERT.orderType.equalsIgnoreCase(orderType)) {
return HILBERT;
}

throw new IllegalArgumentException("cannot match type: " + orderType + " for ordering");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

package org.apache.paimon.flink.sorter;

import org.apache.paimon.CoreOptions.OrderType;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sorter.TableSorter.OrderType;

import org.junit.jupiter.api.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.spark.procedure;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.OrderType;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator;
Expand Down Expand Up @@ -144,7 +145,7 @@ public InternalRow[] call(InternalRow args) {
String partitions = blank(args, 1) ? null : args.getString(1);
// make full compact strategy as default.
String compactStrategy = blank(args, 2) ? FULL : args.getString(2);
String sortType = blank(args, 3) ? TableSorter.OrderType.NONE.name() : args.getString(3);
String sortType = blank(args, 3) ? OrderType.NONE.name() : args.getString(3);
List<String> sortColumns =
blank(args, 4)
? Collections.emptyList()
Expand All @@ -153,11 +154,11 @@ public InternalRow[] call(InternalRow args) {
String options = args.isNullAt(6) ? null : args.getString(6);
Duration partitionIdleTime =
blank(args, 7) ? null : TimeUtils.parseDuration(args.getString(7));
if (TableSorter.OrderType.NONE.name().equals(sortType) && !sortColumns.isEmpty()) {
if (OrderType.NONE.name().equals(sortType) && !sortColumns.isEmpty()) {
throw new IllegalArgumentException(
"order_strategy \"none\" cannot work with order_by columns.");
}
if (partitionIdleTime != null && (!TableSorter.OrderType.NONE.name().equals(sortType))) {
if (partitionIdleTime != null && (!OrderType.NONE.name().equals(sortType))) {
throw new IllegalArgumentException(
"sort compact do not support 'partition_idle_time'.");
}
Expand Down Expand Up @@ -234,7 +235,7 @@ private boolean execute(
@Nullable Expression condition,
@Nullable Duration partitionIdleTime) {
BucketMode bucketMode = table.bucketMode();
TableSorter.OrderType orderType = TableSorter.OrderType.of(sortType);
OrderType orderType = OrderType.of(sortType);
boolean fullCompact = compactStrategy.equalsIgnoreCase(FULL);
Predicate filter =
condition == null
Expand All @@ -245,7 +246,7 @@ private boolean execute(
table.rowType(),
false)
.getOrElse(null);
if (orderType.equals(TableSorter.OrderType.NONE)) {
if (orderType.equals(OrderType.NONE)) {
JavaSparkContext javaSparkContext = new JavaSparkContext(spark().sparkContext());
switch (bucketMode) {
case HASH_FIXED:
Expand Down Expand Up @@ -474,7 +475,7 @@ private Set<BinaryRow> getHistoryPartition(

private void sortCompactUnAwareBucketTable(
FileStoreTable table,
TableSorter.OrderType orderType,
OrderType orderType,
List<String> sortColumns,
DataSourceV2Relation relation,
@Nullable Predicate filter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.paimon.spark.sort;

import org.apache.paimon.utils.ZOrderByteUtils;
import org.apache.paimon.sort.zorder.ZOrderByteUtils;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.expressions.UserDefinedFunction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.spark.sort;

import org.apache.paimon.CoreOptions.OrderType;
import org.apache.paimon.table.FileStoreTable;

import org.apache.spark.sql.Dataset;
Expand Down Expand Up @@ -62,7 +63,7 @@ private void checkColNames() {
public abstract Dataset<Row> sort(Dataset<Row> input);

public static TableSorter getSorter(
FileStoreTable table, TableSorter.OrderType orderType, List<String> orderColumns) {
FileStoreTable table, OrderType orderType, List<String> orderColumns) {
switch (orderType) {
case ORDER:
return new OrderSorter(table, orderColumns);
Expand All @@ -81,37 +82,4 @@ public Dataset<Row> sort(Dataset<Row> input) {
throw new IllegalArgumentException("cannot match order type: " + orderType);
}
}

/** order type for sorting. */
public enum OrderType {
ORDER("order"),
ZORDER("zorder"),
HILBERT("hilbert"),
NONE("none");

private final String orderType;

OrderType(String orderType) {
this.orderType = orderType;
}

@Override
public String toString() {
return "order type: " + orderType;
}

public static OrderType of(String orderType) {
if (ORDER.orderType.equalsIgnoreCase(orderType)) {
return ORDER;
} else if (ZORDER.orderType.equalsIgnoreCase(orderType)) {
return ZORDER;
} else if (HILBERT.orderType.equalsIgnoreCase(orderType)) {
return HILBERT;
} else if (NONE.orderType.equalsIgnoreCase(orderType)) {
return NONE;
}

throw new IllegalArgumentException("cannot match type: " + orderType + " for ordering");
}
}
}

0 comments on commit 6facb71

Please sign in to comment.