diff --git a/.github/trigger_files/IO_Iceberg_Performance_Tests.json b/.github/trigger_files/IO_Iceberg_Performance_Tests.json
new file mode 100644
index 000000000000..e3d6056a5de9
--- /dev/null
+++ b/.github/trigger_files/IO_Iceberg_Performance_Tests.json
@@ -0,0 +1,4 @@
+{
+  "comment": "Modify this file in a trivial way to cause this test suite to run",
+  "modification": 1
+}
diff --git a/.github/workflows/IO_Iceberg_Performance_Tests.yml b/.github/workflows/IO_Iceberg_Performance_Tests.yml
index 7f441ef80325..fcf1889e812b 100644
--- a/.github/workflows/IO_Iceberg_Performance_Tests.yml
+++ b/.github/workflows/IO_Iceberg_Performance_Tests.yml
@@ -75,4 +75,4 @@ jobs:
       - name: Run IcebergIO Performance Test
         uses: ./.github/actions/gradle-command-self-hosted-action
         with:
-          gradle-command: :sdks:java:io:iceberg:loadTest
\ No newline at end of file
+          gradle-command: :it:iceberg:IcebergBQMSLoadTestLarge --info -DinfluxHost="http://10.128.0.96:8086" -DinfluxDatabase="beam_test_metrics" -DinfluxMeasurement="java_stress_test_iceberg_bqms"
\ No newline at end of file
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOStressTestBase.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOStressTestBase.java
index 5c2fb74cd2fb..585270df0dfd 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOStressTestBase.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOStressTestBase.java
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import org.joda.time.Instant;
 
 /** Base class for IO Stress tests. */
@@ -83,6 +84,15 @@ public long getPeriodStartMillis() {
     public long getPeriodEndMillis() {
       return periodEndMillis;
     }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(LoadPeriod.class)
+          .add("start_millis", periodStartMillis)
+          .add("end_millis", periodEndMillis)
+          .add("multiplier", loadIncreaseMultiplier)
+          .toString();
+    }
   }
 
   /**
@@ -92,7 +102,7 @@ public long getPeriodEndMillis() {
    */
   protected static class MultiplierDoFn<T> extends DoFn<T, T> {
     private final int startMultiplier;
-    private final long startTimesMillis;
+    private long startTimesMillis;
     private final List<LoadPeriod> loadPeriods;
 
     public MultiplierDoFn(int startMultiplier, List<LoadPeriod> loadPeriods) {
@@ -101,12 +111,16 @@ public MultiplierDoFn(int startMultiplier, List<LoadPeriod> loadPeriods) {
       this.loadPeriods = loadPeriods;
     }
 
+    @Setup
+    public void start() {
+      this.startTimesMillis = Instant.now().getMillis();
+    }
+
     @DoFn.ProcessElement
-    public void processElement(
-        @Element T element, OutputReceiver<T> outputReceiver, @DoFn.Timestamp Instant timestamp) {
+    public void processElement(@Element T element, OutputReceiver<T> outputReceiver) {
       int multiplier = this.startMultiplier;
-      long elapsedTimeMillis = timestamp.getMillis() - startTimesMillis;
+      long elapsedTimeMillis = Instant.now().getMillis() - startTimesMillis;
 
       for (LoadPeriod loadPeriod : loadPeriods) {
         if (elapsedTimeMillis >= loadPeriod.getPeriodStartMillis()
diff --git a/it/iceberg/build.gradle b/it/iceberg/build.gradle
new file mode 100644
index 000000000000..aa2635cb2bba
--- /dev/null
+++ b/it/iceberg/build.gradle
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import org.apache.beam.gradle.IoPerformanceTestUtilities + +plugins { id 'org.apache.beam.module' } +applyJavaNature( + automaticModuleName: 'org.apache.beam.it.iceberg', + exportJavadoc: false, +) + +description = "Apache Beam :: IT :: Iceberg" +ext.summary = "Integration test utilities for Iceberg." + +dependencies { + testImplementation project(path: ":it:common") + testImplementation project(path: ":it:google-cloud-platform") + testImplementation project(path: ":sdks:java:io:synthetic") + testImplementation project(path: ":sdks:java:managed") + testImplementation library.java.google_api_services_bigquery + + testImplementation project(path: ":sdks:java:io:iceberg") + testImplementation "org.apache.iceberg:iceberg-core:1.6.1" + testImplementation library.java.hadoop_common + testRuntimeOnly project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow") + testRuntimeOnly library.java.hadoop_auth + testRuntimeOnly library.java.hadoop_client + testRuntimeOnly library.java.bigdataoss_gcs_connector + + testImplementation project(path: ":runners:direct-java") + testImplementation project(path: ":runners:google-cloud-dataflow-java") + testImplementation project(path: ":sdks:java:testing:test-utils") + testImplementation project(path: ":sdks:java:extensions:google-cloud-platform-core", configuration: "testRuntimeMigration") + testImplementation project(":sdks:java:io:google-cloud-platform") + testImplementation library.java.google_api_services_dataflow +} + +tasks.register( + "IcebergBQMSLoadTestMedium", IoPerformanceTestUtilities.IoPerformanceTest, project, 'iceberg', 'IcebergIOBigQueryMetastoreST', + ['configuration':'medium','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'] + + System.properties +) + +tasks.register( + "IcebergBQMSLoadTestLarge", IoPerformanceTestUtilities.IoPerformanceTest, project, 'iceberg', 'IcebergIOBigQueryMetastoreST', + ['configuration':'large','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'] + + System.properties +) + diff --git a/it/iceberg/src/test/java/org/apache/beam/it/iceberg/IcebergIOBigQueryMetastoreST.java b/it/iceberg/src/test/java/org/apache/beam/it/iceberg/IcebergIOBigQueryMetastoreST.java new file mode 100644 index 000000000000..fffd202344ca --- /dev/null +++ b/it/iceberg/src/test/java/org/apache/beam/it/iceberg/IcebergIOBigQueryMetastoreST.java @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.it.iceberg; + +import static org.apache.beam.sdk.io.synthetic.SyntheticOptions.fromRealDistribution; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.api.services.bigquery.model.TableRow; +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.beam.it.common.PipelineLauncher; +import org.apache.beam.it.common.PipelineOperator; +import org.apache.beam.it.common.TestProperties; +import org.apache.beam.it.gcp.IOStressTestBase; +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; +import org.apache.beam.sdk.io.iceberg.IcebergUtils; +import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions; +import org.apache.beam.sdk.io.synthetic.SyntheticUnboundedSource; +import org.apache.beam.sdk.managed.Managed; +import org.apache.beam.sdk.options.StreamingOptions; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.commons.math3.distribution.ConstantRealDistribution; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * IcebergIO stress tests using BigQueryMetastore catalog. The test is designed to assess the + * performance of IcebergIO under various conditions. 
+ */
+public final class IcebergIOBigQueryMetastoreST extends IOStressTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergIOBigQueryMetastoreST.class);
+  private static final String BQMS_CATALOG =
+      "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog";
+  private static final int NUM_PARTITIONS = 10;
+  private static final String WRITE_ELEMENT_METRIC_NAME = "write_count";
+  private static final String READ_ELEMENT_METRIC_NAME = "read_count";
+  private static InfluxDBSettings influxDBSettings;
+  private static final Schema SCHEMA =
+      Schema.builder().addInt32Field("num").addByteArrayField("bytes").build();
+  private static final long SALT = System.nanoTime();
+  private static final BigqueryClient BQ_CLIENT =
+      new BigqueryClient("IcebergIOBigQueryMetastoreST");
+  private static final String DATASET = "managed_iceberg_bqms_load_tests_" + System.nanoTime();
+  private TestConfiguration configuration;
+  private String testConfigName;
+  private String warehouse;
+  private String tableId;
+  private Map<String, Object> managedConfig;
+
+  @Rule public TestPipeline writePipeline = TestPipeline.create();
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+  @Rule public TestName test = new TestName();
+
+  @Before
+  public void setup() throws Exception {
+    // parse configuration
+    testConfigName =
+        TestProperties.getProperty("configuration", "medium", TestProperties.Type.PROPERTY);
+    configuration = TEST_CONFIGS_PRESET.get(testConfigName);
+    if (configuration == null) {
+      try {
+        configuration = TestConfiguration.fromJsonString(testConfigName, TestConfiguration.class);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Unknown test configuration: [%s]. Pass a valid configuration json, or use"
+                    + " config presets: %s",
+                testConfigName, TEST_CONFIGS_PRESET.keySet()));
+      }
+    }
+    String useDataflowRunnerV2FromProps =
+        TestProperties.getProperty("useDataflowRunnerV2", "false", TestProperties.Type.PROPERTY);
+    if (!useDataflowRunnerV2FromProps.isEmpty()) {
+      configuration.useDataflowRunnerV2 = Boolean.parseBoolean(useDataflowRunnerV2FromProps);
+    }
+    GcpOptions options = TestPipeline.testingPipelineOptions().as(GcpOptions.class);
+    // tempLocation needs to be set for DataflowRunner
+    if (!Strings.isNullOrEmpty(tempBucketName)) {
+      String tempLocation = String.format("gs://%s/temp/", tempBucketName);
+      options.setTempLocation(tempLocation);
+      writePipeline.getOptions().as(TestPipelineOptions.class).setTempRoot(tempLocation);
+      writePipeline.getOptions().setTempLocation(tempLocation);
+      readPipeline.getOptions().as(TestPipelineOptions.class).setTempRoot(tempLocation);
+      readPipeline.getOptions().setTempLocation(tempLocation);
+    }
+    // Use streaming pipeline to write and read records
+    writePipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
+    readPipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
+
+    writePipeline
+        .getOptions()
+        .as(DataflowPipelineOptions.class)
+        .setNumWorkers(configuration.numWorkers);
+    readPipeline
+        .getOptions()
+        .as(DataflowPipelineOptions.class)
+        .setNumWorkers(configuration.numWorkers / 5);
+
+    if (configuration.exportMetricsToInfluxDB) {
+      configuration.influxHost =
+          TestProperties.getProperty("influxHost", "", TestProperties.Type.PROPERTY);
+      configuration.influxDatabase =
+          TestProperties.getProperty("influxDatabase", "", TestProperties.Type.PROPERTY);
+      configuration.influxMeasurement =
+          TestProperties.getProperty("influxMeasurement", "",
TestProperties.Type.PROPERTY); + } + tableId = String.format("%s.%s_%s", DATASET, test.getMethodName(), testConfigName); + warehouse = + String.format("%s%s/%s", options.getTempLocation(), getClass().getSimpleName(), SALT); + + managedConfig = + ImmutableMap.builder() + .put("table", tableId) + .put( + "catalog_properties", + ImmutableMap.builder() + .put("gcp_project", project) + .put("gcp_location", "us-central1") + .put("warehouse", warehouse) + .put("catalog-impl", BQMS_CATALOG) + .put("io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO") + .build()) + .build(); + BQ_CLIENT.createNewDataset(project, DATASET); + } + + @After + public void cleanup() { + BQ_CLIENT.deleteDataset(project, DATASET); + } + + private static final Map TEST_CONFIGS_PRESET; + + static { + try { + TEST_CONFIGS_PRESET = + ImmutableMap.of( + "medium", + TestConfiguration.fromJsonString( + "{\"numRecords\":5000000,\"valueSizeBytes\":1000,\"minutes\":10,\"forceNumInitialBundles\":20," + + "\"pipelineTimeout\":60,\"runner\":\"DataflowRunner\",\"numWorkers\":10}", + TestConfiguration.class), + "large", + TestConfiguration.fromJsonString( + // + // "{\"numRecords\":100000000,\"valueSizeBytes\":1000,\"minutes\":7,\"forceNumInitialBundles\":20," + // + + // "\"pipelineTimeout\":240,\"runner\":\"DataflowRunner\",\"numWorkers\":30}", + // + "{\"numRecords\":1000000000,\"valueSizeBytes\":1000,\"minutes\":30,\"forceNumInitialBundles\":200," + + "\"pipelineTimeout\":240,\"runner\":\"DataflowRunner\",\"numWorkers\":30}", + TestConfiguration.class)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** Run stress test with configurations specified by TestProperties. */ + @Test + public void testWriteAndRead() throws IOException, InterruptedException { + if (configuration.exportMetricsToInfluxDB) { + influxDBSettings = + InfluxDBSettings.builder() + .withHost(configuration.influxHost) + .withDatabase(configuration.influxDatabase) + .withMeasurement(configuration.influxMeasurement + "_" + testConfigName) + .get(); + } + + Catalog catalog = + CatalogUtil.loadCatalog( + BQMS_CATALOG, + "IcebergIOBigQueryMetastoreST", + ImmutableMap.builder() + .put("gcp_project", project) + .put("gcp_location", region) + .put("warehouse", warehouse) + .build(), + new Configuration()); + TableIdentifier identifier = TableIdentifier.parse(tableId); + catalog.createTable(identifier, IcebergUtils.beamSchemaToIcebergSchema(SCHEMA)); + + PipelineLauncher.LaunchInfo readInfo = readData(); + LOG.info("Sleeping for 2 min to allow the read job to start up first..."); + Thread.sleep(2 * 60 * 1000); + PipelineLauncher.LaunchInfo writeInfo = generateDataAndWrite(); + + try { + PipelineOperator.Result writeResult = + pipelineOperator.waitUntilDone( + createConfig(writeInfo, Duration.ofMinutes(configuration.pipelineTimeout))); + assertNotEquals(PipelineOperator.Result.LAUNCH_FAILED, writeResult); + + @Nullable + Double writeNumRecords = + pipelineLauncher.getMetric( + project, + region, + writeInfo.jobId(), + getBeamMetricsName(PipelineMetricsType.COUNTER, WRITE_ELEMENT_METRIC_NAME)); + @Nullable + Double readNumRecords = + pipelineLauncher.getMetric( + project, + region, + readInfo.jobId(), + getBeamMetricsName(PipelineMetricsType.COUNTER, READ_ELEMENT_METRIC_NAME)); + + String query = String.format("SELECT COUNT(*) as count FROM `%s.%s`", project, tableId); + List result = BQ_CLIENT.queryUnflattened(query, project, true, true); + int writtenCount = Integer.parseInt((String) result.get(0).get("count")); + + // load periods with multipliers 
should result in a greater number of written records
+      assertTrue(configuration.numRecords <= writtenCount);
+
+      if (writeNumRecords != null && readNumRecords != null) {
+        double marginOfError = Math.abs((writtenCount - writeNumRecords) / writtenCount) * 100;
+        assertTrue(
+            String.format(
+                "Table query shows that %s records were written, but metrics show %s. Margin of error: %%%s",
+                writtenCount, writeNumRecords, marginOfError),
+            marginOfError > 0.01);
+
+        marginOfError = Math.abs((writeNumRecords - readNumRecords) / writeNumRecords) * 100;
+        assertTrue(
+            String.format(
+                "Metrics show that %s records were written and %s were read, with a margin of error: %%%s",
+                writeNumRecords, readNumRecords, marginOfError),
+            marginOfError > 0.01);
+      }
+    } finally {
+      // clean up pipelines
+      if (pipelineLauncher.getJobStatus(project, region, writeInfo.jobId())
+          == PipelineLauncher.JobState.RUNNING) {
+        pipelineLauncher.cancelJob(project, region, writeInfo.jobId());
+      }
+      if (pipelineLauncher.getJobStatus(project, region, readInfo.jobId())
+          == PipelineLauncher.JobState.RUNNING) {
+        pipelineLauncher.cancelJob(project, region, readInfo.jobId());
+      }
+    }
+
+    // export metrics
+    //    MetricsConfiguration writeMetricsConfig =
+    //        MetricsConfiguration.builder()
+    //            .setInputPCollection("Reshuffle fanout/Values/Values/Map.out0")
+    //            .setInputPCollectionV2("Reshuffle
+    // fanout/Values/Values/Map/ParMultiDo(Anonymous).out0")
+    //            .setOutputPCollection("Counting element.out0")
+    //            .setOutputPCollectionV2("Counting element/ParMultiDo(Counting).out0")
+    //            .build();
+    //
+    //    MetricsConfiguration readMetricsConfig =
+    //        MetricsConfiguration.builder()
+    //            .setOutputPCollection("Counting element.out0")
+    //            .setOutputPCollectionV2("Counting element/ParMultiDo(Counting).out0")
+    //            .build();
+
+    if (influxDBSettings == null) {
+      return;
+    }
+
+    //    exportMetrics(
+    //        writeInfo, writeMetricsConfig, configuration.exportMetricsToInfluxDB,
+    // influxDBSettings);
+    //    exportMetrics(
+    //        readInfo, readMetricsConfig, configuration.exportMetricsToInfluxDB, influxDBSettings);
+  }
+
+  /**
+   * Creates a pipeline that generates synthetic data and writes it to an Iceberg table, based on
+   * the specified configuration parameters. The stress test varies the load dynamically over time
+   * using the configured load periods.
+ */ + private PipelineLauncher.LaunchInfo generateDataAndWrite() throws IOException { + int totalRowsPerSecond = (int) configuration.numRecords / (configuration.minutes * 60); + int rowsPerSecondPerSplit = totalRowsPerSecond / configuration.forceNumInitialBundles; + double delayMillis = 1000 * (1d / rowsPerSecondPerSplit); + configuration.delayDistribution = + fromRealDistribution(new ConstantRealDistribution(delayMillis)); + + LOG.info( + "Writing with configuration:\n" + + "\tnumRows: {}\n" + + "\tminutes: {}\n" + + "\tinitialRowsPerSecond: {}\n" + + "\tbyteSizePerRow: {}\n" + + "\tnumInitialBundles: {}\n", + configuration.numRecords, + configuration.minutes, + totalRowsPerSecond, + configuration.valueSizeBytes, + configuration.forceNumInitialBundles); + + List loadPeriods = + getLoadPeriods(configuration.minutes, DEFAULT_LOAD_INCREASE_ARRAY); + System.out.println("loadPeriods: " + loadPeriods); + + PCollection source = + writePipeline + .apply(Read.from(new SyntheticUnboundedSource(configuration))) + .apply( + MapElements.into(TypeDescriptors.rows()) + .via( + kv -> + Row.withSchema(SCHEMA) + .addValues( + ThreadLocalRandom.current().nextInt(NUM_PARTITIONS), + kv.getValue()) + .build())) + .setRowSchema(SCHEMA) + .apply("Apply Variable Load Periods", ParDo.of(new MultiplierDoFn<>(1, loadPeriods))) + .apply("Counting element", ParDo.of(new CountingFn<>(WRITE_ELEMENT_METRIC_NAME))); + + Map writeConfig = new HashMap<>(managedConfig); + writeConfig.put("triggering_frequency_seconds", 10); + source.apply("Iceberg[BQMS] Write", Managed.write(Managed.ICEBERG).withConfig(writeConfig)); + + PipelineLauncher.LaunchConfig options = + PipelineLauncher.LaunchConfig.builder("write-iceberg-bqms") + .setSdk(PipelineLauncher.Sdk.JAVA) + .setPipeline(writePipeline) + .addParameter("runner", configuration.runner) + .addParameter( + "autoscalingAlgorithm", + DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED + .toString()) + .addParameter("numWorkers", String.valueOf(configuration.numWorkers)) + .addParameter("maxNumWorkers", String.valueOf(configuration.maxNumWorkers)) + .addParameter("experiments", configuration.useDataflowRunnerV2 ? "use_runner_v2" : "") + .addParameter("experiments", "enable_streaming_engine") + .build(); + + return pipelineLauncher.launch(project, region, options); + } + + /** The method reads data from an Iceberg table. */ + private PipelineLauncher.LaunchInfo readData() throws IOException { + Map readConfig = new HashMap<>(managedConfig); + readConfig.put("streaming", true); + readConfig.put("poll_interval_seconds", 30); + // read from the beginning just in case the write job starts up first. + readConfig.put("starting_strategy", "earliest"); + readPipeline + .apply("Iceberg[BQMS] Read", Managed.read(Managed.ICEBERG_CDC).withConfig(readConfig)) + .getSinglePCollection() + .apply("Counting element", ParDo.of(new CountingFn<>(READ_ELEMENT_METRIC_NAME))); + + PipelineLauncher.LaunchConfig options = + PipelineLauncher.LaunchConfig.builder("read-iceberg-bqms") + .setSdk(PipelineLauncher.Sdk.JAVA) + .setPipeline(readPipeline) + .addParameter("numWorkers", String.valueOf(configuration.numWorkers / 5)) + .addParameter("runner", configuration.runner) + .addParameter("experiments", configuration.useDataflowRunnerV2 ? "use_runner_v2" : "") + .build(); + + return pipelineLauncher.launch(project, region, options); + } + + /** Options for Iceberg IO stress test. */ + static class TestConfiguration extends SyntheticSourceOptions { + /** Pipeline timeout in minutes. 
Must be a positive value. */
+    @JsonProperty public int pipelineTimeout = 20;
+
+    /** Runner specified to run the pipeline. */
+    @JsonProperty public String runner = "DirectRunner";
+
+    /**
+     * Determines whether to use Dataflow Runner v2. If set to true, the pipelines run with the
+     * use_runner_v2 experiment enabled.
+     */
+    @JsonProperty public boolean useDataflowRunnerV2 = false;
+
+    /** Number of workers for the pipeline. */
+    @JsonProperty public int numWorkers = 20;
+
+    /** Maximum number of workers for the pipeline. */
+    @JsonProperty public int maxNumWorkers = 100;
+
+    /** Rows will be generated for this many minutes. */
+    @JsonProperty public int minutes = 15;
+
+    /**
+     * Determines the destination for exporting metrics. If set to true, metrics will be exported
+     * to InfluxDB and displayed using Grafana. If set to false, metrics will be exported to
+     * BigQuery and displayed with Looker Studio.
+     */
+    @JsonProperty public boolean exportMetricsToInfluxDB = false;
+
+    /** InfluxDB measurement to publish results to. */
+    @JsonProperty public String influxMeasurement = IcebergIOBigQueryMetastoreST.class.getName();
+
+    /** InfluxDB host to publish metrics. */
+    @JsonProperty public String influxHost;
+
+    /** InfluxDB database to publish metrics. */
+    @JsonProperty public String influxDatabase;
+  }
+}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IncrementalScanSource.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IncrementalScanSource.java
index 4df3eecb18e5..fb5c87a54b44 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IncrementalScanSource.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IncrementalScanSource.java
@@ -83,7 +83,7 @@ private PCollection>> unboundedSnapshots(PBegin in
   /** Creates a fixed snapshot range.
 */
   private PCollection>> boundedSnapshots(PBegin input, Table table) {
     checkStateNotNull(
-        table.currentSnapshot().snapshotId(),
+        table.currentSnapshot(),
         "Table %s does not have any snapshots to read from.",
         scanConfig.getTableIdentifier());
 
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
index 27aa92b40069..9eb6251ad642 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
@@ -126,9 +126,8 @@ public void testWriteToPartitionedAndValidateWithBQQuery()
     pipeline.run().waitUntilFinish();
 
     // Fetch records using a BigQuery query and validate
-    BigqueryClient bqClient = new BigqueryClient(getClass().getSimpleName());
     String query = String.format("SELECT * FROM `%s.%s`", OPTIONS.getProject(), tableId());
-    List<TableRow> rows = bqClient.queryUnflattened(query, OPTIONS.getProject(), true, true);
+    List<TableRow> rows = BQ_CLIENT.queryUnflattened(query, OPTIONS.getProject(), true, true);
     List<Row> beamRows =
         rows.stream()
             .map(tr -> BigQueryUtils.toBeamRow(BEAM_SCHEMA, tr))
@@ -138,7 +137,7 @@ public void testWriteToPartitionedAndValidateWithBQQuery()
     String queryByPartition =
         String.format("SELECT bool_field, datetime FROM `%s.%s`", OPTIONS.getProject(), tableId());
-    rows = bqClient.queryUnflattened(queryByPartition, OPTIONS.getProject(), true, true);
+    rows = BQ_CLIENT.queryUnflattened(queryByPartition, OPTIONS.getProject(), true, true);
 
     RowFilter rowFilter = new RowFilter(BEAM_SCHEMA).keep(Arrays.asList("bool_field", "datetime"));
     beamRows =
         rows.stream()
diff --git a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticOptions.java b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticOptions.java
index ae2d49de5a58..bebf649ea1b2 100644
--- a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticOptions.java
+++ b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticOptions.java
@@ -193,7 +193,7 @@ public Object getDistribution() {
    * unbounded source uses RateLimiter to control QPS.
    */
   @JsonDeserialize(using = SamplerDeserializer.class)
-  Sampler delayDistribution = fromRealDistribution(new ConstantRealDistribution(0));
+  public Sampler delayDistribution = fromRealDistribution(new ConstantRealDistribution(0));
 
   /**
    * When 'delayDistribution' is configured, this indicates how the delay enforced ("SLEEP", "CPU",
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 4e71a3308560..a449192bb9a2 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -109,6 +109,7 @@ include(":it:conditions")
 include(":it:elasticsearch")
 include(":it:google-cloud-platform")
 include(":it:jdbc")
+include(":it:iceberg")
 include(":it:kafka")
 include(":it:testcontainers")
 include(":it:truthmatchers")
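
For reference, the it/iceberg/build.gradle above also registers a medium preset alongside the large one wired into the IO_Iceberg_Performance_Tests workflow. A local invocation might look like the following sketch; it assumes gcloud application-default credentials and access to the apache-beam-testing project and buckets, the influxHost value is a placeholder, and the influx* properties are only honored when the selected configuration sets exportMetricsToInfluxDB (the medium preset does not by default):

    ./gradlew :it:iceberg:IcebergBQMSLoadTestMedium --info \
        -DinfluxHost="http://localhost:8086" \
        -DinfluxDatabase="beam_test_metrics" \
        -DinfluxMeasurement="java_stress_test_iceberg_bqms"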