6060 */
6161@ SuppressWarnings ({"nullness" })
6262public class UpdateSchemaDestination <DestinationT >
63- extends DoFn <
64- Iterable <KV <DestinationT , WriteTables .Result >>,
65- Iterable <KV <TableDestination , WriteTables .Result >>> {
63+ extends DoFn <
64+ Iterable <KV <DestinationT , WriteTables .Result >>,
65+ Iterable <KV <TableDestination , WriteTables .Result >>> {
6666
6767 private static final Logger LOG = LoggerFactory .getLogger (UpdateSchemaDestination .class );
6868 private final BigQueryServices bqServices ;
@@ -83,9 +83,9 @@ private static class PendingJobData {
8383 final BoundedWindow window ;
8484
8585 public PendingJobData (
86- BigQueryHelpers .PendingJob retryJob ,
87- TableDestination tableDestination ,
88- BoundedWindow window ) {
86+ BigQueryHelpers .PendingJob retryJob ,
87+ TableDestination tableDestination ,
88+ BoundedWindow window ) {
8989 this .retryJob = retryJob ;
9090 this .tableDestination = tableDestination ;
9191 this .window = window ;
@@ -95,15 +95,15 @@ public PendingJobData(
9595 private final Map <DestinationT , PendingJobData > pendingJobs = Maps .newHashMap ();
9696
9797 public UpdateSchemaDestination (
98- BigQueryServices bqServices ,
99- PCollectionView <String > zeroLoadJobIdPrefixView ,
100- @ Nullable ValueProvider <String > loadJobProjectId ,
101- BigQueryIO .Write .WriteDisposition writeDisposition ,
102- BigQueryIO .Write .CreateDisposition createDisposition ,
103- int maxRetryJobs ,
104- @ Nullable String kmsKey ,
105- Set <BigQueryIO .Write .SchemaUpdateOption > schemaUpdateOptions ,
106- DynamicDestinations <?, DestinationT > dynamicDestinations ) {
98+ BigQueryServices bqServices ,
99+ PCollectionView <String > zeroLoadJobIdPrefixView ,
100+ @ Nullable ValueProvider <String > loadJobProjectId ,
101+ BigQueryIO .Write .WriteDisposition writeDisposition ,
102+ BigQueryIO .Write .CreateDisposition createDisposition ,
103+ int maxRetryJobs ,
104+ @ Nullable String kmsKey ,
105+ Set <BigQueryIO .Write .SchemaUpdateOption > schemaUpdateOptions ,
106+ DynamicDestinations <?, DestinationT > dynamicDestinations ) {
107107 this .loadJobProjectId = loadJobProjectId ;
108108 this .zeroLoadJobIdPrefixView = zeroLoadJobIdPrefixView ;
109109 this .bqServices = bqServices ;
@@ -123,18 +123,18 @@ public void startBundle(StartBundleContext c) {
123123 TableDestination getTableWithDefaultProject (DestinationT destination ) {
124124 if (dynamicDestinations .getPipelineOptions () == null ) {
125125 throw new IllegalStateException (
126- "Unexpected null pipeline option for DynamicDestination object. "
127- + "Need to call setSideInputAccessorFromProcessContext(context) before use it." );
126+ "Unexpected null pipeline option for DynamicDestination object. "
127+ + "Need to call setSideInputAccessorFromProcessContext(context) before use it." );
128128 }
129129 BigQueryOptions options = dynamicDestinations .getPipelineOptions ().as (BigQueryOptions .class );
130130 TableDestination tableDestination = dynamicDestinations .getTable (destination );
131131 TableReference tableReference = tableDestination .getTableReference ();
132132
133133 if (Strings .isNullOrEmpty (tableReference .getProjectId ())) {
134134 tableReference .setProjectId (
135- options .getBigQueryProject () == null
136- ? options .getProject ()
137- : options .getBigQueryProject ());
135+ options .getBigQueryProject () == null
136+ ? options .getProject ()
137+ : options .getBigQueryProject ());
138138 tableDestination = tableDestination .withTableReference (tableReference );
139139 }
140140
@@ -143,10 +143,10 @@ TableDestination getTableWithDefaultProject(DestinationT destination) {
143143
144144 @ ProcessElement
145145 public void processElement (
146- @ Element Iterable <KV <DestinationT , WriteTables .Result >> element ,
147- ProcessContext context ,
148- BoundedWindow window )
149- throws IOException {
146+ @ Element Iterable <KV <DestinationT , WriteTables .Result >> element ,
147+ ProcessContext context ,
148+ BoundedWindow window )
149+ throws IOException {
150150 dynamicDestinations .setSideInputAccessorFromProcessContext (context );
151151 List <KV <TableDestination , WriteTables .Result >> outputs = Lists .newArrayList ();
152152 for (KV <DestinationT , WriteTables .Result > entry : element ) {
@@ -160,33 +160,33 @@ public void processElement(
160160 TableSchema schema = dynamicDestinations .getSchema (destination );
161161 TableReference tableReference = tableDestination .getTableReference ();
162162 String jobIdPrefix =
163- BigQueryResourceNaming .createJobIdWithDestination (
164- context .sideInput (zeroLoadJobIdPrefixView ),
165- tableDestination ,
166- 1 ,
167- context .pane ().getIndex ());
163+ BigQueryResourceNaming .createJobIdWithDestination (
164+ context .sideInput (zeroLoadJobIdPrefixView ),
165+ tableDestination ,
166+ 1 ,
167+ context .pane ().getIndex ());
168168 BigQueryHelpers .PendingJob updateSchemaDestinationJob =
169- startZeroLoadJob (
170- getJobService (context .getPipelineOptions ().as (BigQueryOptions .class )),
171- getDatasetService (context .getPipelineOptions ().as (BigQueryOptions .class )),
172- jobIdPrefix ,
173- tableReference ,
174- tableDestination .getTimePartitioning (),
175- tableDestination .getClustering (),
176- schema ,
177- writeDisposition ,
178- createDisposition ,
179- schemaUpdateOptions );
169+ startZeroLoadJob (
170+ getJobService (context .getPipelineOptions ().as (BigQueryOptions .class )),
171+ getDatasetService (context .getPipelineOptions ().as (BigQueryOptions .class )),
172+ jobIdPrefix ,
173+ tableReference ,
174+ tableDestination .getTimePartitioning (),
175+ tableDestination .getClustering (),
176+ schema ,
177+ writeDisposition ,
178+ createDisposition ,
179+ schemaUpdateOptions );
180180 if (updateSchemaDestinationJob != null ) {
181181 pendingJobs .put (
182- destination , new PendingJobData (updateSchemaDestinationJob , tableDestination , window ));
182+ destination , new PendingJobData (updateSchemaDestinationJob , tableDestination , window ));
183183 }
184184 }
185185 if (!pendingJobs .isEmpty ()) {
186186 LOG .info (
187- "Added {} pending jobs to update the schema for each destination before copying {} temp tables." ,
188- pendingJobs .size (),
189- outputs .size ());
187+ "Added {} pending jobs to update the schema for each destination before copying {} temp tables." ,
188+ pendingJobs .size (),
189+ outputs .size ());
190190 }
191191 context .output (outputs );
192192 }
@@ -210,61 +210,61 @@ public void onTeardown() {
210210 @ FinishBundle
211211 public void finishBundle (FinishBundleContext context ) throws Exception {
212212 DatasetService datasetService =
213- getDatasetService (context .getPipelineOptions ().as (BigQueryOptions .class ));
213+ getDatasetService (context .getPipelineOptions ().as (BigQueryOptions .class ));
214214 BigQueryHelpers .PendingJobManager jobManager = new BigQueryHelpers .PendingJobManager ();
215215 for (final PendingJobData pendingJobData : pendingJobs .values ()) {
216216 jobManager =
217- jobManager .addPendingJob (
218- pendingJobData .retryJob ,
219- j -> {
220- try {
221- if (pendingJobData .tableDestination .getTableDescription () != null ) {
222- TableReference ref = pendingJobData .tableDestination .getTableReference ();
223- datasetService .patchTableDescription (
224- ref .clone ()
225- .setTableId (BigQueryHelpers .stripPartitionDecorator (ref .getTableId ())),
226- pendingJobData .tableDestination .getTableDescription ());
227- }
228- } catch (IOException | InterruptedException e ) {
229- return e ;
230- }
231- return null ;
232- });
217+ jobManager .addPendingJob (
218+ pendingJobData .retryJob ,
219+ j -> {
220+ try {
221+ if (pendingJobData .tableDestination .getTableDescription () != null ) {
222+ TableReference ref = pendingJobData .tableDestination .getTableReference ();
223+ datasetService .patchTableDescription (
224+ ref .clone ()
225+ .setTableId (BigQueryHelpers .stripPartitionDecorator (ref .getTableId ())),
226+ pendingJobData .tableDestination .getTableDescription ());
227+ }
228+ } catch (IOException | InterruptedException e ) {
229+ return e ;
230+ }
231+ return null ;
232+ });
233233 }
234234 jobManager .waitForDone ();
235235 }
236236
237237 private BigQueryHelpers .PendingJob startZeroLoadJob (
238- BigQueryServices .JobService jobService ,
239- DatasetService datasetService ,
240- String jobIdPrefix ,
241- TableReference tableReference ,
242- TimePartitioning timePartitioning ,
243- Clustering clustering ,
244- @ Nullable TableSchema schema ,
245- BigQueryIO .Write .WriteDisposition writeDisposition ,
246- BigQueryIO .Write .CreateDisposition createDisposition ,
247- Set <BigQueryIO .Write .SchemaUpdateOption > schemaUpdateOptions ) {
238+ BigQueryServices .JobService jobService ,
239+ DatasetService datasetService ,
240+ String jobIdPrefix ,
241+ TableReference tableReference ,
242+ TimePartitioning timePartitioning ,
243+ Clustering clustering ,
244+ @ Nullable TableSchema schema ,
245+ BigQueryIO .Write .WriteDisposition writeDisposition ,
246+ BigQueryIO .Write .CreateDisposition createDisposition ,
247+ Set <BigQueryIO .Write .SchemaUpdateOption > schemaUpdateOptions ) {
248248 JobConfigurationLoad loadConfig =
249- new JobConfigurationLoad ()
250- .setDestinationTable (tableReference )
251- .setSchema (schema )
252- .setWriteDisposition (writeDisposition .name ())
253- .setCreateDisposition (createDisposition .name ())
254- .setSourceFormat ("NEWLINE_DELIMITED_JSON" );
249+ new JobConfigurationLoad ()
250+ .setDestinationTable (tableReference )
251+ .setSchema (schema )
252+ .setWriteDisposition (writeDisposition .name ())
253+ .setCreateDisposition (createDisposition .name ())
254+ .setSourceFormat ("NEWLINE_DELIMITED_JSON" );
255255 if (schemaUpdateOptions != null ) {
256256 List <String > options =
257- schemaUpdateOptions .stream ()
258- .map (BigQueryIO .Write .SchemaUpdateOption ::name )
259- .collect (Collectors .toList ());
257+ schemaUpdateOptions .stream ()
258+ .map (BigQueryIO .Write .SchemaUpdateOption ::name )
259+ .collect (Collectors .toList ());
260260 loadConfig .setSchemaUpdateOptions (options );
261261 }
262262 if (!loadConfig
263- .getWriteDisposition ()
264- .equals (BigQueryIO .Write .WriteDisposition .WRITE_TRUNCATE .toString ())
265- && !loadConfig
266- .getWriteDisposition ()
267- .equals (BigQueryIO .Write .WriteDisposition .WRITE_APPEND .toString ())) {
263+ .getWriteDisposition ()
264+ .equals (BigQueryIO .Write .WriteDisposition .WRITE_TRUNCATE .toString ())
265+ && !loadConfig
266+ .getWriteDisposition ()
267+ .equals (BigQueryIO .Write .WriteDisposition .WRITE_APPEND .toString ())) {
268268 return null ;
269269 }
270270 final Table destinationTable ;
@@ -281,9 +281,9 @@ private BigQueryHelpers.PendingJob startZeroLoadJob(
281281 // or when destination schema is null (the write will set the schema)
282282 // or when provided schema is null (e.g. when using CREATE_NEVER disposition)
283283 if (destinationTable .getSchema () == null
284- || destinationTable .getSchema ().isEmpty ()
285- || destinationTable .getSchema ().equals (schema )
286- || schema == null ) {
284+ || destinationTable .getSchema ().isEmpty ()
285+ || destinationTable .getSchema ().equals (schema )
286+ || schema == null ) {
287287 return null ;
288288 }
289289 if (timePartitioning != null ) {
@@ -296,67 +296,67 @@ private BigQueryHelpers.PendingJob startZeroLoadJob(
296296
297297 if (kmsKey != null ) {
298298 loadConfig .setDestinationEncryptionConfiguration (
299- new EncryptionConfiguration ().setKmsKeyName (kmsKey ));
299+ new EncryptionConfiguration ().setKmsKeyName (kmsKey ));
300300 }
301301 String projectId =
302- loadJobProjectId == null || loadJobProjectId .get () == null
303- ? tableReference .getProjectId ()
304- : loadJobProjectId .get ();
302+ loadJobProjectId == null || loadJobProjectId .get () == null
303+ ? tableReference .getProjectId ()
304+ : loadJobProjectId .get ();
305305 String bqLocation =
306- BigQueryHelpers .getDatasetLocation (
307- datasetService , tableReference .getProjectId (), tableReference .getDatasetId ());
306+ BigQueryHelpers .getDatasetLocation (
307+ datasetService , tableReference .getProjectId (), tableReference .getDatasetId ());
308308
309309 BigQueryHelpers .PendingJob retryJob =
310- new BigQueryHelpers .PendingJob (
311- // Function to load the data.
312- jobId -> {
313- JobReference jobRef =
314- new JobReference ()
315- .setProjectId (projectId )
316- .setJobId (jobId .getJobId ())
317- .setLocation (bqLocation );
318- LOG .info (
319- "Loading zero rows using job {}, job id {} iteration {}" ,
320- tableReference ,
321- jobRef ,
322- jobId .getRetryIndex ());
323- try {
324- jobService .startLoadJob (
325- jobRef , loadConfig , new ByteArrayContent ("text/plain" , new byte [0 ]));
326- } catch (IOException | InterruptedException e ) {
327- LOG .warn ("Schema update load job {} failed with {}" , jobRef , e .toString ());
328- throw new RuntimeException (e );
329- }
330- return null ;
331- },
332- // Function to poll the result of a load job.
333- jobId -> {
334- JobReference jobRef =
335- new JobReference ()
336- .setProjectId (projectId )
337- .setJobId (jobId .getJobId ())
338- .setLocation (bqLocation );
339- try {
340- return jobService .pollJob (jobRef , BatchLoads .LOAD_JOB_POLL_MAX_RETRIES );
341- } catch (InterruptedException e ) {
342- throw new RuntimeException (e );
343- }
344- },
345- // Function to lookup a job.
346- jobId -> {
347- JobReference jobRef =
348- new JobReference ()
349- .setProjectId (projectId )
350- .setJobId (jobId .getJobId ())
351- .setLocation (bqLocation );
352- try {
353- return jobService .getJob (jobRef );
354- } catch (InterruptedException | IOException e ) {
355- throw new RuntimeException (e );
356- }
357- },
358- maxRetryJobs ,
359- jobIdPrefix );
310+ new BigQueryHelpers .PendingJob (
311+ // Function to load the data.
312+ jobId -> {
313+ JobReference jobRef =
314+ new JobReference ()
315+ .setProjectId (projectId )
316+ .setJobId (jobId .getJobId ())
317+ .setLocation (bqLocation );
318+ LOG .info (
319+ "Loading zero rows using job {}, job id {} iteration {}" ,
320+ tableReference ,
321+ jobRef ,
322+ jobId .getRetryIndex ());
323+ try {
324+ jobService .startLoadJob (
325+ jobRef , loadConfig , new ByteArrayContent ("text/plain" , new byte [0 ]));
326+ } catch (IOException | InterruptedException e ) {
327+ LOG .warn ("Schema update load job {} failed with {}" , jobRef , e .toString ());
328+ throw new RuntimeException (e );
329+ }
330+ return null ;
331+ },
332+ // Function to poll the result of a load job.
333+ jobId -> {
334+ JobReference jobRef =
335+ new JobReference ()
336+ .setProjectId (projectId )
337+ .setJobId (jobId .getJobId ())
338+ .setLocation (bqLocation );
339+ try {
340+ return jobService .pollJob (jobRef , BatchLoads .LOAD_JOB_POLL_MAX_RETRIES );
341+ } catch (InterruptedException e ) {
342+ throw new RuntimeException (e );
343+ }
344+ },
345+ // Function to lookup a job.
346+ jobId -> {
347+ JobReference jobRef =
348+ new JobReference ()
349+ .setProjectId (projectId )
350+ .setJobId (jobId .getJobId ())
351+ .setLocation (bqLocation );
352+ try {
353+ return jobService .getJob (jobRef );
354+ } catch (InterruptedException | IOException e ) {
355+ throw new RuntimeException (e );
356+ }
357+ },
358+ maxRetryJobs ,
359+ jobIdPrefix );
360360 return retryJob ;
361361 }
362362
0 commit comments