[flink] Introduce bypass append compact coordinator and worker (#3936)
JingsongLi authored Aug 13, 2024
1 parent aa3bfb1 commit fa7623e
Showing 13 changed files with 546 additions and 100 deletions.
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink;

import org.apache.paimon.Snapshot;

import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.Collections;
import java.util.List;

import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
import static org.assertj.core.api.Assertions.assertThat;

/** Test case for the append-only table in unaware-bucket mode. */
public class UnawareBucketAppendOnlyTableITCase extends CatalogITCaseBase {

    @Override
    protected List<String> ddl() {
        return Collections.singletonList(
                "CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) WITH ('bucket' = '-1')");
    }

    @Test
    public void testCompactionInStreamingMode() throws Exception {
        batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '2')");
        batchSql("ALTER TABLE append_table SET ('compaction.early-max.file-num' = '4')");
        batchSql("ALTER TABLE append_table SET ('continuous.discovery-interval' = '1 s')");

        sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(500));
        sEnv.executeSql(
                "CREATE TEMPORARY TABLE Orders_in (\n"
                        + "    f0 INT,\n"
                        + "    f1 STRING\n"
                        + ") WITH (\n"
                        + "    'connector' = 'datagen',\n"
                        + "    'rows-per-second' = '1',\n"
                        + "    'number-of-rows' = '10'\n"
                        + ")");

        assertStreamingHasCompact("INSERT INTO append_table SELECT * FROM Orders_in", 60000);
        // give the bounded datagen source time to finish
        Thread.sleep(5000);

        List<Row> rows = batchSql("SELECT * FROM append_table");
        assertThat(rows.size()).isEqualTo(10);
    }

    private void assertStreamingHasCompact(String sql, long timeout) throws Exception {
        long start = System.currentTimeMillis();
        long currentId = 1;
        sEnv.executeSql(sql);
        Snapshot snapshot;
        while (true) {
            snapshot = findSnapshot("append_table", currentId);
            if (snapshot != null) {
                if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
                    break;
                }
                currentId++;
            }
            long now = System.currentTimeMillis();
            if (now - start > timeout) {
                throw new RuntimeException(
                        "Timed out waiting for the streaming job to produce a COMPACT snapshot.");
            }
            Thread.sleep(1000);
        }
    }
}
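The polling loop above relies on a findSnapshot helper inherited from the test base. A minimal sketch of what such a helper could look like, assuming the base class can hand back the underlying FileStoreTable (the paimonTable accessor and the exact CatalogITCaseBase contract are assumptions for illustration, not the actual implementation):

    // Sketch only: resolve a snapshot by id, or return null so the caller keeps polling.
    private Snapshot findSnapshot(String tableName, long snapshotId) throws Exception {
        org.apache.paimon.table.FileStoreTable table = paimonTable(tableName); // assumed accessor
        org.apache.paimon.utils.SnapshotManager snapshotManager = table.snapshotManager();
        return snapshotManager.snapshotExists(snapshotId)
                ? snapshotManager.snapshot(snapshotId)
                : null;
    }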
@@ -0,0 +1,28 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
rootLogger.level = OFF
rootLogger.appenderRef.test.ref = TestLogger

appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
51 changes: 51 additions & 0 deletions paimon-flink/paimon-flink-1.19/pom.xml
@@ -42,6 +42,12 @@ under the License.
            <groupId>org.apache.paimon</groupId>
            <artifactId>paimon-flink-common</artifactId>
            <version>${project.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>*</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
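(Note: the wildcard exclusion above appears to drop every transitive dependency of paimon-flink-common, so this module re-declares the Flink artifacts it needs, such as flink-table-api-java-bridge below, with explicit scopes.)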

        <dependency>
@@ -55,6 +61,51 @@ under the License.
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- test dependencies -->

        <dependency>
            <groupId>org.apache.paimon</groupId>
            <artifactId>paimon-flink-common</artifactId>
            <version>${project.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-compress</artifactId>
            <version>1.21</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
@@ -20,7 +20,6 @@

 import org.apache.paimon.append.AppendOnlyCompactionTask;
 import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.sink.UnawareBucketCompactionSink;
 import org.apache.paimon.flink.source.BucketUnawareCompactSource;
 import org.apache.paimon.options.Options;
@@ -71,27 +70,17 @@ public void withPartitionPredicate(Predicate predicate) {

     public void build() {
         // build source from UnawareSourceFunction
-        DataStreamSource<AppendOnlyCompactionTask> source = buildSource(false);
+        DataStreamSource<AppendOnlyCompactionTask> source = buildSource();

         // from source, construct the full flink job
         sinkFromSource(source);
     }

-    public DataStream<Committable> fetchUncommitted(String commitUser) {
-        DataStreamSource<AppendOnlyCompactionTask> source = buildSource(true);
-
-        // rebalance input to default or assigned parallelism
-        DataStream<AppendOnlyCompactionTask> rebalanced = rebalanceInput(source);
-
-        return new UnawareBucketCompactionSink(table)
-                .doWrite(rebalanced, commitUser, rebalanced.getParallelism());
-    }
-
-    private DataStreamSource<AppendOnlyCompactionTask> buildSource(boolean emitMaxWatermark) {
+    private DataStreamSource<AppendOnlyCompactionTask> buildSource() {
         long scanInterval = table.coreOptions().continuousDiscoveryInterval().toMillis();
         BucketUnawareCompactSource source =
                 new BucketUnawareCompactSource(
-                        table, isContinuous, scanInterval, partitionPredicate, emitMaxWatermark);
+                        table, isContinuous, scanInterval, partitionPredicate);

         return BucketUnawareCompactSource.buildSource(env, source, isContinuous, tableIdentifier);
     }
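Note: the removed fetchUncommitted path built a standalone compaction source and returned its uncommitted committables to the caller. In the bypass design it appears to be superseded by the AppendBypassCompactWorkerOperator added below, which receives committables and compaction tasks multiplexed on a single input.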
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.sink;

import org.apache.paimon.append.AppendOnlyCompactionTask;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Either;

/** An {@link AppendCompactWorkerOperator} that passes {@link Committable} inputs through. */
public class AppendBypassCompactWorkerOperator
        extends AppendCompactWorkerOperator<Either<Committable, AppendOnlyCompactionTask>> {

    public AppendBypassCompactWorkerOperator(FileStoreTable table, String commitUser) {
        super(table, commitUser);
    }

    @Override
    public void open() throws Exception {
        super.open();
    }

    @Override
    public void processElement(StreamRecord<Either<Committable, AppendOnlyCompactionTask>> element)
            throws Exception {
        if (element.getValue().isLeft()) {
            // committables from the write path bypass compaction and are forwarded as-is
            output.collect(new StreamRecord<>(element.getValue().left()));
        } else {
            // compaction tasks are executed by the shared unaware-bucket compactor
            unawareBucketCompactor.processElement(element.getValue().right());
        }
    }
}
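For context, a hedged sketch of how this operator might be wired up: committables from the write path are mapped to Left values, compaction tasks to Right values, and the union is fed through the operator. The stream variables (committables, compactionTasks), table, commitUser, and the operator name are illustrative assumptions, not the exact topology code from this commit.

    // Sketch only: multiplex committables and compaction tasks onto one input.
    TypeInformation<Either<Committable, AppendOnlyCompactionTask>> eitherType =
            TypeInformation.of(new TypeHint<Either<Committable, AppendOnlyCompactionTask>>() {});

    DataStream<Either<Committable, AppendOnlyCompactionTask>> merged =
            committables
                    .<Either<Committable, AppendOnlyCompactionTask>>map(Either::Left)
                    .returns(eitherType)
                    .union(
                            compactionTasks
                                    .<Either<Committable, AppendOnlyCompactionTask>>map(Either::Right)
                                    .returns(eitherType));

    // Left committables pass straight through; Right tasks are compacted and the
    // resulting committables are emitted downstream.
    DataStream<Committable> output =
            merged.transform(
                    "Compact Worker", // operator name is an assumption
                    TypeInformation.of(Committable.class),
                    new AppendBypassCompactWorkerOperator(table, commitUser));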