101101import java .util .Collection ;
102102import java .util .Collections ;
103103import java .util .HashMap ;
104- import java .util .Iterator ;
105104import java .util .List ;
106105import java .util .Map ;
106+ import java .util .concurrent .TimeUnit ;
107107import java .util .logging .Logger ;
108108import org .apache .avro .Conversions ;
109109import org .apache .avro .LogicalTypes ;
118118import org .junit .Test ;
119119
120120/** Integration tests for BigQuery Storage API. */
121- public class ITBigQueryStorageTest {
122- private static final Logger LOG = Logger .getLogger (ITBigQueryStorageTest .class .getName ());
121+ public class ITBigQueryStorageReadClientTest {
122+ private static final Logger LOG =
123+ Logger .getLogger (ITBigQueryStorageReadClientTest .class .getName ());
123124 private static final String DATASET = RemoteBigQueryHelper .generateDatasetName ();
124125 private static final String DESCRIPTION = "BigQuery Storage Java client test dataset" ;
125126
@@ -503,7 +504,7 @@ public static void beforeClass() throws IOException {
503504 LOG .info (
504505 String .format (
505506 "%s tests running with parent project: %s" ,
506- ITBigQueryStorageTest .class .getSimpleName (), parentProjectId ));
507+ ITBigQueryStorageReadClientTest .class .getSimpleName (), parentProjectId ));
507508
508509 RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper .create ();
509510 bigquery = bigqueryHelper .getOptions ().getService ();
@@ -516,9 +517,10 @@ public static void beforeClass() throws IOException {
516517 }
517518
518519 @ AfterClass
519- public static void afterClass () {
520+ public static void afterClass () throws InterruptedException {
520521 if (client != null ) {
521522 client .close ();
523+ client .awaitTermination (10 , TimeUnit .SECONDS );
522524 }
523525
524526 if (bigquery != null ) {
@@ -530,7 +532,7 @@ public static void afterClass() {
530532 @ Test
531533 public void testSimpleReadAvro () {
532534 String table =
533- BigQueryResource .FormatTableResource (
535+ BigQueryResource .formatTableResource (
534536 /* projectId= */ "bigquery-public-data" ,
535537 /* datasetId= */ "samples" ,
536538 /* tableId= */ "shakespeare" );
@@ -566,7 +568,7 @@ public void testSimpleReadAvro() {
566568 @ Test
567569 public void testSimpleReadArrow () {
568570 String table =
569- BigQueryResource .FormatTableResource (
571+ BigQueryResource .formatTableResource (
570572 /* projectId= */ "bigquery-public-data" ,
571573 /* datasetId= */ "samples" ,
572574 /* tableId= */ "shakespeare" );
@@ -630,7 +632,7 @@ public void testRangeTypeSimple() throws InterruptedException {
630632 bigquery .query (createTable );
631633
632634 String table =
633- BigQueryResource .FormatTableResource (
635+ BigQueryResource .formatTableResource (
634636 /* projectId= */ ServiceOptions .getDefaultProjectId (),
635637 /* datasetId= */ DATASET ,
636638 /* tableId= */ tableId .getTable ());
@@ -739,7 +741,7 @@ public void testRangeTypeWrite()
739741 }
740742
741743 String table =
742- BigQueryResource .FormatTableResource (
744+ BigQueryResource .formatTableResource (
743745 /* projectId= */ projectName ,
744746 /* datasetId= */ DATASET ,
745747 /* tableId= */ tableId .getTable ());
@@ -800,7 +802,7 @@ public void testRangeTypeWrite()
800802 @ Test
801803 public void testSimpleReadAndResume () {
802804 String table =
803- BigQueryResource .FormatTableResource (
805+ BigQueryResource .formatTableResource (
804806 /* projectId= */ "bigquery-public-data" ,
805807 /* datasetId= */ "samples" ,
806808 /* tableId= */ "shakespeare" );
@@ -845,7 +847,7 @@ public void testSimpleReadAndResume() {
845847 @ Test
846848 public void testFilter () throws IOException {
847849 String table =
848- BigQueryResource .FormatTableResource (
850+ BigQueryResource .formatTableResource (
849851 /* projectId= */ "bigquery-public-data" ,
850852 /* datasetId= */ "samples" ,
851853 /* tableId= */ "shakespeare" );
@@ -887,15 +889,13 @@ public void testFilter() throws IOException {
887889 rowCount += response .getRowCount ();
888890 reader .processRows (
889891 response .getAvroRows (),
890- new AvroRowConsumer () {
891- @ Override
892- public void accept (GenericData .Record record ) {
893- Long wordCount = (Long ) record .get ("word_count" );
894- assertWithMessage ("Row not matching expectations: %s" , record .toString ())
895- .that (wordCount )
896- .isGreaterThan (100L );
897- }
898- });
892+ (AvroRowConsumer )
893+ record -> {
894+ Long wordCount = (Long ) record .get ("word_count" );
895+ assertWithMessage ("Row not matching expectations: %s" , record .toString ())
896+ .that (wordCount )
897+ .isGreaterThan (100L );
898+ });
899899 }
900900
901901 assertEquals (1_333 , rowCount );
@@ -904,7 +904,7 @@ public void accept(GenericData.Record record) {
904904 @ Test
905905 public void testColumnSelection () throws IOException {
906906 String table =
907- BigQueryResource .FormatTableResource (
907+ BigQueryResource .formatTableResource (
908908 /* projectId= */ "bigquery-public-data" ,
909909 /* datasetId= */ "samples" ,
910910 /* tableId= */ "shakespeare" );
@@ -964,19 +964,17 @@ public void testColumnSelection() throws IOException {
964964 rowCount += response .getRowCount ();
965965 reader .processRows (
966966 response .getAvroRows (),
967- new AvroRowConsumer () {
968- @ Override
969- public void accept (GenericData .Record record ) {
970- String rowAssertMessage =
971- String .format ("Row not matching expectations: %s" , record .toString ());
972-
973- Long wordCount = (Long ) record .get ("word_count" );
974- assertWithMessage (rowAssertMessage ).that (wordCount ).isGreaterThan (100L );
975-
976- Utf8 word = (Utf8 ) record .get ("word" );
977- assertWithMessage (rowAssertMessage ).that (word .length ()).isGreaterThan (0 );
978- }
979- });
967+ (AvroRowConsumer )
968+ record -> {
969+ String rowAssertMessage =
970+ String .format ("Row not matching expectations: %s" , record .toString ());
971+
972+ Long wordCount = (Long ) record .get ("word_count" );
973+ assertWithMessage (rowAssertMessage ).that (wordCount ).isGreaterThan (100L );
974+
975+ Utf8 word = (Utf8 ) record .get ("word" );
976+ assertWithMessage (rowAssertMessage ).that (word .length ()).isGreaterThan (0 );
977+ });
980978 }
981979
982980 assertEquals (1_333 , rowCount );
@@ -995,8 +993,6 @@ public void testReadAtSnapshot() throws InterruptedException, IOException {
995993 TableId testTableId = TableId .of (/* dataset= */ DATASET , /* table= */ "test_read_snapshot" );
996994 bigquery .create (TableInfo .of (testTableId , StandardTableDefinition .of (tableSchema )));
997995
998- testTableId .toString ();
999-
1000996 Job firstJob =
1001997 RunQueryAppendJobAndExpectSuccess (
1002998 /* destinationTableId= */ testTableId , /* query= */ "SELECT 1 AS col" );
@@ -1006,7 +1002,7 @@ public void testReadAtSnapshot() throws InterruptedException, IOException {
10061002 /* destinationTableId= */ testTableId , /* query= */ "SELECT 2 AS col" );
10071003
10081004 String table =
1009- BigQueryResource .FormatTableResource (
1005+ BigQueryResource .formatTableResource (
10101006 /* projectId= */ projectName ,
10111007 /* datasetId= */ DATASET ,
10121008 /* tableId= */ testTableId .getTable ());
@@ -1022,7 +1018,7 @@ public void accept(GenericData.Record record) {
10221018 rowsAfterFirstSnapshot .add ((Long ) record .get ("col" ));
10231019 }
10241020 });
1025- assertEquals (Arrays . asList (1L ), rowsAfterFirstSnapshot );
1021+ assertEquals (Collections . singletonList (1L ), rowsAfterFirstSnapshot );
10261022
10271023 final List <Long > rowsAfterSecondSnapshot = new ArrayList <>();
10281024 ProcessRowsAtSnapshot (
@@ -1060,7 +1056,7 @@ public void testColumnPartitionedTableByDateField() throws InterruptedException,
10601056 RunQueryJobAndExpectSuccess (QueryJobConfiguration .newBuilder (createTableStatement ).build ());
10611057
10621058 String table =
1063- BigQueryResource .FormatTableResource (
1059+ BigQueryResource .formatTableResource (
10641060 /* projectId= */ projectName ,
10651061 /* datasetId= */ DATASET ,
10661062 /* tableId= */ partitionedTableName );
@@ -1108,7 +1104,7 @@ public void testIngestionTimePartitionedTable() throws InterruptedException, IOE
11081104 /* query= */ "SELECT 2 AS num_field" );
11091105
11101106 String table =
1111- BigQueryResource .FormatTableResource (
1107+ BigQueryResource .formatTableResource (
11121108 /* projectId= */ projectName ,
11131109 /* datasetId= */ testTableId .getDataset (),
11141110 /* tableId= */ testTableId .getTable ());
@@ -1151,7 +1147,7 @@ public void testBasicSqlTypes() throws InterruptedException, IOException {
11511147 RunQueryJobAndExpectSuccess (QueryJobConfiguration .newBuilder (createTableStatement ).build ());
11521148
11531149 String table =
1154- BigQueryResource .FormatTableResource (
1150+ BigQueryResource .formatTableResource (
11551151 /* projectId= */ projectName , /* datasetId= */ DATASET , /* tableId= */ tableName );
11561152
11571153 List <GenericData .Record > rows = ReadAllRows (/* table= */ table , /* filter= */ null );
@@ -1248,7 +1244,7 @@ public void testDateAndTimeSqlTypes() throws InterruptedException, IOException {
12481244 RunQueryJobAndExpectSuccess (QueryJobConfiguration .newBuilder (createTableStatement ).build ());
12491245
12501246 String table =
1251- BigQueryResource .FormatTableResource (
1247+ BigQueryResource .formatTableResource (
12521248 /* projectId= */ projectName , /* datasetId= */ DATASET , /* tableId= */ tableName );
12531249
12541250 List <GenericData .Record > rows = ReadAllRows (/* table= */ table , /* filter= */ null );
@@ -1343,7 +1339,7 @@ public void testGeographySqlType() throws InterruptedException, IOException {
13431339 RunQueryJobAndExpectSuccess (QueryJobConfiguration .newBuilder (createTableStatement ).build ());
13441340
13451341 String table =
1346- BigQueryResource .FormatTableResource (
1342+ BigQueryResource .formatTableResource (
13471343 /* projectId= */ projectName , /* datasetId= */ DATASET , /* tableId= */ tableName );
13481344
13491345 List <GenericData .Record > rows = ReadAllRows (/* table= */ table , /* filter= */ null );
@@ -1386,7 +1382,7 @@ public void testStructAndArraySqlTypes() throws InterruptedException, IOExceptio
13861382 RunQueryJobAndExpectSuccess (QueryJobConfiguration .newBuilder (createTableStatement ).build ());
13871383
13881384 String table =
1389- BigQueryResource .FormatTableResource (
1385+ BigQueryResource .formatTableResource (
13901386 /* projectId= */ projectName , /* datasetId= */ DATASET , /* tableId= */ tableName );
13911387
13921388 List <GenericData .Record > rows = ReadAllRows (/* table= */ table , /* filter= */ null );
@@ -1453,7 +1449,7 @@ public void testSimpleReadWithBackgroundExecutorProvider() throws IOException {
14531449 client .getStub ().getStubSettings ().getBackgroundExecutorProvider ())
14541450 .getExecutorThreadCount ());
14551451 String table =
1456- BigQueryResource .FormatTableResource (
1452+ BigQueryResource .formatTableResource (
14571453 /* projectId= */ "bigquery-public-data" ,
14581454 /* datasetId= */ "samples" ,
14591455 /* tableId= */ "shakespeare" );
@@ -1497,7 +1493,7 @@ public void testUniverseDomainWithInvalidUniverseDomain() throws IOException {
14971493 BigQueryReadClient localClient = BigQueryReadClient .create (bigQueryReadSettings );
14981494
14991495 String table =
1500- BigQueryResource .FormatTableResource (
1496+ BigQueryResource .formatTableResource (
15011497 /* projectId= */ "bigquery-public-data" ,
15021498 /* datasetId= */ "samples" ,
15031499 /* tableId= */ "shakespeare" );
@@ -1532,7 +1528,7 @@ public void testInvalidUniverseDomainWithMismatchCredentials() throws IOExceptio
15321528 BigQueryReadClient localClient = BigQueryReadClient .create (bigQueryReadSettings );
15331529
15341530 String table =
1535- BigQueryResource .FormatTableResource (
1531+ BigQueryResource .formatTableResource (
15361532 /* projectId= */ "bigquery-public-data" ,
15371533 /* datasetId= */ "samples" ,
15381534 /* tableId= */ "shakespeare" );
@@ -1564,7 +1560,7 @@ public void testUniverseDomainWithMatchingDomain() throws IOException {
15641560 BigQueryReadClient localClient = BigQueryReadClient .create (bigQueryReadSettings );
15651561
15661562 String table =
1567- BigQueryResource .FormatTableResource (
1563+ BigQueryResource .formatTableResource (
15681564 /* projectId= */ "bigquery-public-data" ,
15691565 /* datasetId= */ "samples" ,
15701566 /* tableId= */ "shakespeare" );
@@ -1608,7 +1604,7 @@ public void testSimpleReadWithOtelTracing() throws IOException {
16081604 BigQueryReadClient otelClient = BigQueryReadClient .create (otelSettings );
16091605
16101606 String table =
1611- BigQueryResource .FormatTableResource (
1607+ BigQueryResource .formatTableResource (
16121608 /* projectId= */ "bigquery-public-data" ,
16131609 /* datasetId= */ "samples" ,
16141610 /* tableId= */ "shakespeare" );
@@ -1667,7 +1663,7 @@ public void testUniverseDomain() throws IOException {
16671663 BigQueryReadClient localClient = BigQueryReadClient .create (bigQueryReadSettings );
16681664
16691665 String table =
1670- BigQueryResource .FormatTableResource (
1666+ BigQueryResource .formatTableResource (
16711667 /* projectId= */ "google-tpc-testing-environment:cloudsdk-test-project" ,
16721668 /* datasetId= */ "tpc_demo_dataset" ,
16731669 /* tableId= */ "new_table" );
@@ -1709,10 +1705,8 @@ private long ReadStreamToOffset(ReadStream readStream, long rowOffset) {
17091705
17101706 long rowCount = 0 ;
17111707 ServerStream <ReadRowsResponse > serverStream = client .readRowsCallable ().call (readRowsRequest );
1712- Iterator <ReadRowsResponse > responseIterator = serverStream .iterator ();
17131708
1714- while (responseIterator .hasNext ()) {
1715- ReadRowsResponse response = responseIterator .next ();
1709+ for (ReadRowsResponse response : serverStream ) {
17161710 rowCount += response .getRowCount ();
17171711 if (rowCount >= rowOffset ) {
17181712 return rowOffset ;
@@ -1853,8 +1847,7 @@ private Job RunQueryJobAndExpectSuccess(QueryJobConfiguration configuration)
18531847 }
18541848
18551849 static ServiceAccountCredentials loadCredentials (String credentialFile ) {
1856- try {
1857- InputStream keyStream = new ByteArrayInputStream (credentialFile .getBytes ());
1850+ try (InputStream keyStream = new ByteArrayInputStream (credentialFile .getBytes ())) {
18581851 return ServiceAccountCredentials .fromStream (keyStream );
18591852 } catch (IOException e ) {
18601853 fail ("Couldn't create fake JSON credentials." );
0 commit comments