|
36 | 36 | import org.apache.beam.sdk.schemas.Schema; |
37 | 37 | import org.apache.beam.sdk.testing.PAssert; |
38 | 38 | import org.apache.beam.sdk.testing.TestPipeline; |
| 39 | +import org.apache.beam.sdk.testing.TestPipelineOptions; |
39 | 40 | import org.apache.beam.sdk.transforms.Create; |
40 | 41 | import org.apache.beam.sdk.transforms.MapElements; |
41 | 42 | import org.apache.beam.sdk.transforms.PeriodicImpulse; |
@@ -98,6 +99,10 @@ public void testBatchFileLoadsWriteRead() { |
98 | 99 | String.format("%s:%s.%s", PROJECT, BIG_QUERY_DATASET_ID, testName.getMethodName()); |
99 | 100 | Map<String, Object> config = ImmutableMap.of("table", table); |
100 | 101 |
|
| 102 | + // file loads requires a GCS temp location |
| 103 | + String tempLocation = writePipeline.getOptions().as(TestPipelineOptions.class).getTempRoot(); |
| 104 | + writePipeline.getOptions().setTempLocation(tempLocation); |
| 105 | + |
101 | 106 | // batch write |
102 | 107 | PCollectionRowTuple.of("input", getInput(writePipeline, false)) |
103 | 108 | .apply(Managed.write(Managed.BIGQUERY).withConfig(config)); |
@@ -146,6 +151,12 @@ public void testDynamicDestinations(boolean streaming) throws IOException, Inter |
146 | 151 | Map<String, Object> config = |
147 | 152 | ImmutableMap.of("table", destinationTemplate, "drop", Collections.singletonList("dest")); |
148 | 153 |
|
| 154 | + if (!streaming) { |
| 155 | + // file loads requires a GCS temp location |
| 156 | + String tempLocation = writePipeline.getOptions().as(TestPipelineOptions.class).getTempRoot(); |
| 157 | + writePipeline.getOptions().setTempLocation(tempLocation); |
| 158 | + } |
| 159 | + |
149 | 160 | // write |
150 | 161 | PCollectionRowTuple.of("input", getInput(writePipeline, streaming)) |
151 | 162 | .apply(Managed.write(Managed.BIGQUERY).withConfig(config)); |
|
0 commit comments