Commit 0947aca

Patching 0.8.0 release (#630)

* Make sure the BigQuery reader returns meaningful defaults for null values (#626)
* Linter fixes
* Use the balanced sharding strategy in the BigQuery read session (#601). We use sharding to provide a way of shuffling the read order; with liquid sharding, a single stream can read all the data, which would not be shuffled.
* Change after-success.sh to push binaries to Dropbox for branch R0.81
* Bump version 0.8.0 -> 0.8.1
1 parent 8d5bfba commit 0947aca
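
Before the per-file diffs, it may help to see the read path this patch touches from the user's side. The sketch below is modeled on the test file at the bottom of this page; the tensorflow_io.bigquery import path and the read_session argument order follow those tests, so treat the exact names as illustrative rather than canonical.

from tensorflow.python.framework import dtypes
from tensorflow_io.bigquery import BigQueryClient  # import path assumed from tensorflow-io 0.8.x

client = BigQueryClient()
# Selected fields and output dtypes line up positionally. After this
# patch, nulls in nullable columns come back as type defaults
# (False, 0, 0.0, "") instead of being unusable.
read_session = client.read_session(
    "projects/test_parent",   # parent resource
    "test_project_id",        # GCP project
    "test_table",             # table
    "test_dataset",           # dataset
    ["string", "boolean", "int", "long", "float", "double"],
    [dtypes.string, dtypes.bool, dtypes.int32, dtypes.int64,
     dtypes.float32, dtypes.float64],
    requested_streams=2)      # read sessions now use BALANCED sharding
dataset = read_session.parallel_read_rows()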

File tree

5 files changed, +130 −31 lines


.travis/after-success.sh

Lines changed: 1 addition & 1 deletion

@@ -14,7 +14,7 @@
 # limitations under the License.
 # ==============================================================================

-if [[ ( ${TRAVIS_BRANCH} == "master" ) && ( ${TRAVIS_EVENT_TYPE} != "pull_request" ) ]]; then
+if [[ (( ${TRAVIS_BRANCH} == "master" ) || ( ${TRAVIS_BRANCH} == "R0.81" )) && ( ${TRAVIS_EVENT_TYPE} != "pull_request" ) ]]; then

     twine upload wheelhouse/tensorflow_io_nightly-*.whl

tensorflow_io/bigquery/kernels/bigquery_kernels.cc

Lines changed: 1 addition & 0 deletions

@@ -149,6 +149,7 @@ class BigQueryReadSessionOp : public OpKernel {
     createReadSessionRequest.mutable_read_options()->set_row_restriction(
         row_restriction_);
     createReadSessionRequest.set_requested_streams(requested_streams_);
+    createReadSessionRequest.set_sharding_strategy(apiv1beta1::ShardingStrategy::BALANCED);
     createReadSessionRequest.set_format(apiv1beta1::DataFormat::AVRO);
     VLOG(3) << "createReadSessionRequest: "
             << createReadSessionRequest.DebugString();
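
The one-line addition above implements the balanced-sharding bullet from the commit message. Under the previous (liquid) strategy the server is free to let a single stream read every row in table order, so requesting multiple streams gives no shuffling; BALANCED pre-partitions rows evenly across the requested streams, so interleaving streams perturbs the read order. A toy, BigQuery-free Python sketch of that difference (illustrative numbers only):

rows = list(range(10))  # stand-in for rows in table order

# Liquid sharding: one stream may drain every row, so the observed
# read order can degenerate to plain table order.
liquid_order = list(rows)

# Balanced sharding: rows are split evenly across streams up front;
# reading the streams interleaved no longer matches table order.
stream_a, stream_b = rows[:5], rows[5:]
balanced_order = [r for pair in zip(stream_a, stream_b) for r in pair]

print(liquid_order)    # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
print(balanced_order)  # [0, 5, 1, 6, 2, 7, 3, 8, 4, 9]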

tensorflow_io/bigquery/kernels/bigquery_lib.h

Lines changed: 24 additions & 0 deletions

@@ -228,6 +228,30 @@ class BigQueryReaderDatasetIterator : public DatasetIterator<Dataset> {
             field.value<avro::GenericEnum>().symbol();
         break;
       case avro::AVRO_NULL:
+        switch(output_types[i]) {
+          case DT_BOOL:
+            ((*out_tensors)[i]).scalar<bool>()() = false;
+            break;
+          case DT_INT32:
+            ((*out_tensors)[i]).scalar<int32>()() = 0;
+            break;
+          case DT_INT64:
+            ((*out_tensors)[i]).scalar<int64>()() = 0l;
+            break;
+          case DT_FLOAT:
+            ((*out_tensors)[i]).scalar<float>()() = 0.0f;
+            break;
+          case DT_DOUBLE:
+            ((*out_tensors)[i]).scalar<double>()() = 0.0;
+            break;
+          case DT_STRING:
+            ((*out_tensors)[i]).scalar<string>()() = "";
+            break;
+          default:
+            return errors::InvalidArgument(
+                "unsupported data type against AVRO_NULL: ",
+                output_types[i]);
+        }
         break;
       default:
         return errors::InvalidArgument("unsupported data type: ",
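
Seen from Python, the new AVRO_NULL branch means a null Avro value materializes as the zero value of the requested dtype instead of leaving the tensor unset. A small sketch of the mapping the switch implements (it matches the DEFAULT_VALUES dictionary added to the test file below; the keys are TensorFlow dtype enum names):

# Defaults substituted when an Avro field is null, keyed by requested dtype.
AVRO_NULL_DEFAULTS = {
    "DT_BOOL": False,
    "DT_INT32": 0,
    "DT_INT64": 0,
    "DT_FLOAT": 0.0,
    "DT_DOUBLE": 0.0,
    "DT_STRING": "",
}
# Any other dtype is rejected with InvalidArgument
# ("unsupported data type against AVRO_NULL: ...").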

tensorflow_io/core/python/ops/io_info.py

Lines changed: 1 addition & 1 deletion

@@ -18,4 +18,4 @@
 from __future__ import print_function

 package = 'tensorflow>=1.15.0,<1.16.0'
-version = '0.8.0'
+version = '0.8.1'

tests/test_bigquery_eager.py

Lines changed: 103 additions & 29 deletions

@@ -58,7 +58,7 @@ def __init__(self, avro_schema, avro_serialized_rows_per_stream,
         self, self._grpc_server)
     port = self._grpc_server.add_insecure_port("localhost:0")
     self._endpoint = "localhost:" + str(port)
-    print ("started server on :" + self._endpoint)
+    print("started a fake server on :" + self._endpoint)

   def start(self):
     self._grpc_server.start()

@@ -74,7 +74,7 @@ def _build_stream_name(self, stream_index):
         self._table_id + "/" + str(stream_index)

   def CreateReadSession(self, request, context):
-    print ("called CreateReadSession on server")
+    print("called CreateReadSession on a fake server")
     self._project_id = request.table_reference.project_id
     self._table_id = request.table_reference.table_id
     self._dataset_id = request.table_reference.dataset_id

@@ -89,10 +89,10 @@ def CreateReadSession(self, request, context):
     return response

   def ReadRows(self, request, context):
-    print ("called ReadRows on server")
+    print("called ReadRows on a fake server")
     response = storage_pb2.ReadRowsResponse()
     stream_index = self._streams.index(request.read_position.stream.name)
-    if stream_index >= 0 and stream_index < len(
+    if 0 <= stream_index < len(
         self._avro_serialized_rows_per_stream):
       response.avro_rows.serialized_binary_rows = \
           self._avro_serialized_rows_per_stream[stream_index]

@@ -104,53 +104,107 @@ def ReadRows(self, request, context):
 class BigqueryOpsTest(test.TestCase):
   """Tests for BigQuery adapter."""

-  GCP_PROJECT_ID = "bigquery-public-data"
-  DATASET_ID = "usa_names"
-  TABLE_ID = "usa_1910_current"
-  PARENT = "projects/some_parent"
+  GCP_PROJECT_ID = "test_project_id"
+  DATASET_ID = "test_dataset"
+  TABLE_ID = "test_table"
+  PARENT = "projects/test_parent"

   AVRO_SCHEMA = """
   {
     "type": "record",
     "name": "__root__",
     "fields": [
       {
-        "name": "state",
+        "name": "string",
         "type": [
           "null",
           "string"
         ],
-        "doc": "2-digit state code"
+        "doc": "nullable string"
       },
       {
-        "name": "name",
+        "name": "boolean",
         "type": [
           "null",
-          "string"
+          "boolean"
+        ],
+        "doc": "nullable boolean"
+      },
+      {
+        "name": "int",
+        "type": [
+          "null",
+          "int"
         ],
-        "doc": "Given name of a person at birth"
+        "doc": "nullable int"
       },
       {
-        "name": "number",
+        "name": "long",
         "type": [
           "null",
           "long"
         ],
-        "doc": "Number of occurrences of the name"
+        "doc": "nullable long"
+      },
+      {
+        "name": "float",
+        "type": [
+          "null",
+          "float"
+        ],
+        "doc": "nullable float"
+      },
+      {
+        "name": "double",
+        "type": [
+          "null",
+          "double"
+        ],
+        "doc": "nullable double"
       }
     ]
   }"""

-  STREAM_1_ROWS = [{
-      "state": "wa",
-      "name": "Andrew",
-      "number": 1
-  }, {
-      "state": "wa",
-      "name": "Eva",
-      "number": 2
-  }]
-  STREAM_2_ROWS = [{"state": "ny", "name": "Emma", "number": 10}]
+  STREAM_1_ROWS = [
+      {
+          "string": "string1",
+          "boolean": True,
+          "int": 10,
+          "long": 100,
+          "float": 1000.0,
+          "double": 10000.0
+      },
+      {
+          "string": "string2",
+          "boolean": False,
+          "int": 12,
+          "long": 102,
+          "float": 1002.0,
+          "double": 10002.0
+      }
+  ]
+  STREAM_2_ROWS = [
+      {
+          "string": "string2",
+          "boolean": True,
+          "int": 20,
+          "long": 200,
+          "float": 2000.0,
+          "double": 20000.0
+      },
+      {
+          # Empty record, all values are null
+      }
+  ]
+
+  DEFAULT_VALUES = {
+      'boolean': False,
+      'double': 0.0,
+      'float': 0.0,
+      'int': 0,
+      'long': 0,
+      'string': ''
+  }

   @staticmethod
   def _serialize_to_avro(rows, schema):

@@ -233,8 +287,16 @@ def test_read_rows(self):
         self.PARENT,
         self.GCP_PROJECT_ID,
         self.TABLE_ID,
-        self.DATASET_ID, ["state", "name", "number"],
-        [dtypes.string, dtypes.string, dtypes.int64],
+        self.DATASET_ID,
+        ["string", "boolean", "int", "long", "float", "double"],
+        [
+            dtypes.string,
+            dtypes.bool,
+            dtypes.int32,
+            dtypes.int64,
+            dtypes.float32,
+            dtypes.float64
+        ],
         requested_streams=2)

     streams_list = read_session.get_streams()

@@ -252,6 +314,8 @@ def test_read_rows(self):
     itr2 = iter(dataset2)
     self.assertEqual(self.STREAM_2_ROWS[0],
                      self._normalize_dictionary(itr2.get_next()))
+    self.assertEqual(self.DEFAULT_VALUES,
+                     self._normalize_dictionary(itr2.get_next()))
     with self.assertRaises(errors.OutOfRangeError):
       itr2.get_next()

@@ -262,8 +326,16 @@ def test_parallel_read_rows(self):
         self.PARENT,
         self.GCP_PROJECT_ID,
         self.TABLE_ID,
-        self.DATASET_ID, ["state", "name", "number"],
-        [dtypes.string, dtypes.string, dtypes.int64],
+        self.DATASET_ID,
+        ["string", "boolean", "int", "long", "float", "double"],
+        [
+            dtypes.string,
+            dtypes.bool,
+            dtypes.int32,
+            dtypes.int64,
+            dtypes.float32,
+            dtypes.float64
+        ],
         requested_streams=2)

     dataset = read_session.parallel_read_rows()

@@ -274,6 +346,8 @@ def test_parallel_read_rows(self):
                      self._normalize_dictionary(itr.get_next()))
     self.assertEqual(self.STREAM_1_ROWS[1],
                      self._normalize_dictionary(itr.get_next()))
+    self.assertEqual(self.DEFAULT_VALUES,
+                     self._normalize_dictionary(itr.get_next()))

 if __name__ == "__main__":
   test.main()
