Patching BigQuery connector in 0.7.0 release and upgrading release to 0.7.1 (#556)

* adding README document describing how to use BigQuery connector (#467)

* adding README document describing how to use BigQuery connector

* Fixing BigQuery connector package definition, and updating README.md accordingly

* More changes for BigQuery connector (#490)

* Fixing Dockerfile

* Returning dataset in the form of a dictionary from BigQuery connector

* Adding NULL fields support to BigQuery connector

* python style tweak

* more style tweaks

* Style tweaks, coming from Google account

* Properly setting row_restriction in createReadSessionRequest and updating sample accordingly (#529)

* updating version 0.7.0 -> 0.7.1

* locking TF version to 1.14

* linter fix
vlasenkoalexey authored and yongtang committed Oct 18, 2019
1 parent b322365 commit 18b113d
Showing 8 changed files with 133 additions and 2 deletions.
2 changes: 1 addition & 1 deletion configure.sh

```diff
@@ -18,7 +18,7 @@ rm -f .bazelrc
 if python -c "import tensorflow as tf" &> /dev/null; then
   echo 'using installed tensorflow'
 else
-  pip install tensorflow
+  pip install --upgrade tensorflow==1.14.0
 fi
 python -m pip install grpcio-tools
 python config_helper.py
```
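This pin matches the `tensorflow>=1.14.0,<1.15.0` constraint in setup.py below. A minimal sanity check (an illustrative sketch, not part of the commit) to confirm the environment picked up the pinned version:

```python
# Illustrative check only: verify the locally installed TensorFlow
# matches the 1.14.x pin used by this release.
import tensorflow as tf

assert tf.__version__.startswith("1.14"), (
    "expected TensorFlow 1.14.x, got %s" % tf.__version__)
```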
1 change: 1 addition & 0 deletions dev/Dockerfile

```diff
@@ -44,6 +44,7 @@ RUN /bin/bash -c "source activate tfio-dev && python -m pip install \
     pyarrow==${ARROW_VERSION} \
     pandas \
     fastavro \
+    gast==0.2.2 \
     ${PIP_ADD_PACKAGES} \
     "
```
2 changes: 1 addition & 1 deletion setup.py

```diff
@@ -95,7 +95,7 @@ def has_ext_modules(self):
   """

 package = 'tensorflow>=1.14.0,<1.15.0'
-version = '0.7.0'
+version = '0.7.1'
 project = 'tensorflow-io'
 if '--package-version' in sys.argv:
   print(package)
```
14 changes: 14 additions & 0 deletions tensorflow_io/bigquery/BUILD

```diff
@@ -9,6 +9,20 @@ load(
     "tf_io_copts",
 )

+py_library(
+    name = "bigquery",
+    srcs = [
+        "__init__.py",
+        "python/__init__.py",
+        "python/ops/__init__.py",
+        "python/ops/bigquery_api.py",
+    ],
+    data = [
+        ":python/ops/_bigquery.so",
+    ],
+    srcs_version = "PY2AND3",
+)
+
 KERNEL_FILES = [
     "kernels/bigquery_kernels.cc",
     "kernels/bigquery_dataset_op.cc",
```
89 changes: 89 additions & 0 deletions tensorflow_io/bigquery/README.md
@@ -0,0 +1,89 @@
# Google BigQuery

[BigQuery](https://cloud.google.com/bigquery/) is a serverless, highly-scalable,
and cost-effective cloud data warehouse with an in-memory BI Engine and machine
learning built in.

The BigQuery connector relies on the [BigQuery Storage API](https://cloud.google.com/bigquery/docs/reference/storage/),
which provides fast access to BigQuery managed storage by using an RPC-based
protocol.

## Prerequisites

In order to use the BigQuery connector, you need to make sure that the Google Cloud SDK
is properly configured and that you have the BigQuery Storage API enabled.
Depending on the environment you are using, some prerequisites might already be met.

1. [Select or create a GCP project.](https://pantheon.corp.google.com/projectselector2/home/dashboard)
2. [Install and initialize the Cloud SDK.](https://cloud.google.com/sdk/docs/)
3. [Set up Authentication.](https://cloud.google.com/docs/authentication/#service_accounts)
   If you choose to use [service account](https://cloud.google.com/docs/authentication/production)
   authentication, please make sure that the GOOGLE_APPLICATION_CREDENTIALS
   environment variable is initialized with a path pointing to the JSON file that
   contains your service account key (see the sketch after this list).
4. [Enable BigQuery Storage API.](https://cloud.google.com/bigquery/docs/reference/storage/#enabling_the_api)
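As a quick sanity check before running the connector, a minimal Python snippet (an illustrative sketch; the key path and error message are not part of any official API) can confirm the credentials variable is set:

```python
import os

# GOOGLE_APPLICATION_CREDENTIALS must point at your own
# service account key file for the connector to authenticate.
key_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
if not key_path or not os.path.exists(key_path):
  raise RuntimeError(
      "Set GOOGLE_APPLICATION_CREDENTIALS to a valid service account key file")
```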

## Sample Use

The BigQuery connector mostly follows the [BigQuery Storage API flow](https://cloud.google.com/bigquery/docs/reference/storage/#basic_api_flow),
but hides the complexity associated with decoding serialized data rows into Tensors.

1. Create a `BigQueryClient` client.
2. Use the `BigQueryClient` to create a `BigQueryReadSession` object corresponding
   to a read session. A read session divides the contents of a BigQuery table
   into one or more streams, which can then be used to read data from the
   table.
3. Call `parallel_read_rows` on the `BigQueryReadSession` object to read from multiple
   BigQuery streams in parallel.

The following example illustrates how to read particular columns from a public
BigQuery dataset.

```python
from absl import app
from tensorflow.python.framework import ops
from tensorflow.python.framework import dtypes
from tensorflow_io.bigquery import BigQueryClient
from tensorflow_io.bigquery import BigQueryReadSession

GCP_PROJECT_ID = '<FILL_ME_IN>'
DATASET_GCP_PROJECT_ID = "bigquery-public-data"
DATASET_ID = "samples"
TABLE_ID = "wikipedia"

def main(_):
  ops.enable_eager_execution()
  client = BigQueryClient()
  read_session = client.read_session(
      "projects/" + GCP_PROJECT_ID,
      DATASET_GCP_PROJECT_ID, TABLE_ID, DATASET_ID,
      ["title",
       "id",
       "num_characters",
       "language",
       "timestamp",
       "wp_namespace",
       "contributor_username"],
      [dtypes.string,
       dtypes.int64,
       dtypes.int64,
       dtypes.string,
       dtypes.int64,
       dtypes.int64,
       dtypes.string],
      requested_streams=2,
      row_restriction="num_characters > 1000")
  dataset = read_session.parallel_read_rows()

  row_index = 0
  for row in dataset.prefetch(10):
    print("row %d: %s" % (row_index, row))
    row_index += 1

if __name__ == '__main__':
  app.run(main)
```
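Because this release changes the connector to return each row as a dictionary keyed by column name, the resulting dataset composes with ordinary `tf.data` transformations. A minimal, illustrative sketch (column names are taken from the example above; the label rule is hypothetical, and `dataset` is the object returned by `parallel_read_rows`):

```python
import tensorflow as tf

def to_features(row):
  # Assumes `row` is a dict whose keys match the selected columns above.
  label = tf.cast(row["num_characters"] > 5000, tf.int64)
  features = {"language": row["language"], "wp_namespace": row["wp_namespace"]}
  return features, label

train_dataset = dataset.map(to_features).shuffle(1000).batch(32)
```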

Please refer to the BigQuery connector Python docstrings and to the
[BigQuery Storage API](https://cloud.google.com/bigquery/docs/reference/storage/rpc/)
documentation for more details about each parameter.
2 changes: 2 additions & 0 deletions tensorflow_io/bigquery/kernels/bigquery_kernels.cc

```diff
@@ -146,6 +146,8 @@ class BigQueryReadSessionOp : public OpKernel {
     *createReadSessionRequest.mutable_read_options()
         ->mutable_selected_fields() = {selected_fields_.begin(),
                                        selected_fields_.end()};
+    createReadSessionRequest.mutable_read_options()->set_row_restriction(
+        row_restriction_);
     createReadSessionRequest.set_requested_streams(requested_streams_);
     createReadSessionRequest.set_format(apiv1beta1::DataFormat::AVRO);
     VLOG(3) << "createReadSessionRequest: "
```
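This is the fix behind PR #529: the `row_restriction` argument accepted on the Python side is now copied into the `CreateReadSessionRequest` read options, so the Storage API applies the predicate server-side and streams only matching rows. A hypothetical sketch of a compound restriction, reusing the names from the README example above:

```python
# Hypothetical compound restriction: the predicate is evaluated by the
# Storage API before rows are streamed to the client.
read_session = client.read_session(
    "projects/" + GCP_PROJECT_ID,
    DATASET_GCP_PROJECT_ID, TABLE_ID, DATASET_ID,
    ["title", "num_characters"],
    [dtypes.string, dtypes.int64],
    requested_streams=2,
    row_restriction="num_characters > 1000 AND language = 'en'")
```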
5 changes: 5 additions & 0 deletions tensorflow_io/bigquery/kernels/bigquery_lib.h

```diff
@@ -184,6 +184,9 @@ class BigQueryReaderDatasetIterator : public DatasetIterator<Dataset> {
       case avro::AVRO_ENUM:
         dtype = DT_STRING;
         break;
+      case avro::AVRO_NULL:
+        dtype = output_types[i];
+        break;
       default:
         return errors::InvalidArgument("unsupported data type: ",
                                        field.type());
@@ -250,6 +253,8 @@
         ((*out_tensors)[i]).scalar<string>()() =
             field.value<avro::GenericEnum>().symbol();
         break;
+      case avro::AVRO_NULL:  // Fallthrough;
+        break;
       default:
         return errors::InvalidArgument("unsupported data type: ",
                                        field.type());
```
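With these two additions, a NULL Avro value is accepted rather than rejected: its dtype resolves to the declared output type, and the value branch writes nothing, so the corresponding tensor element keeps its default (for example, 0 for `dtypes.int64` or an empty string for `dtypes.string`). An illustrative sketch, again reusing the README example's names and assuming `contributor_username` contains NULLs in the sample table:

```python
# Illustrative only: with AVRO_NULL support, NULL cells surface as the
# declared dtype's default value (empty string here) instead of failing
# with "unsupported data type".
read_session = client.read_session(
    "projects/" + GCP_PROJECT_ID,
    DATASET_GCP_PROJECT_ID, TABLE_ID, DATASET_ID,
    ["contributor_username"],  # assumed nullable in the sample table
    [dtypes.string],
    requested_streams=2)
for row in read_session.parallel_read_rows().take(5):
  print(row)
```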
20 changes: 20 additions & 0 deletions tensorflow_io/bigquery/python/__init__.py

@@ -0,0 +1,20 @@
```python
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""This module contains the Python API for the Cloud BigQuery integration."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
```