Add support for HCatalog and Pig (#114)
jphalip authored Mar 8, 2024
1 parent 57dffdc commit 64af1b0
Showing 49 changed files with 1,282 additions and 674 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -10,6 +10,8 @@ buildNumber.properties
.mvn/timing.properties
.mvn/wrapper/maven-wrapper.jar
.vscode/
derby.log
metastore_db

# Integration tests artifacts:
hive-exec*.jar
1 change: 1 addition & 0 deletions CHANGES.md
@@ -2,6 +2,7 @@

## Next

* Added support for Hive 2.X.
* Fixed case sensitivity bug with column names. This particularly affected pseudo columns like
`_PARTITIONTIME` and `_PARTITIONDATE` in time-ingestion partitioned BigQuery tables.
* **Backward-incompatible change:** The type of the `_PARTITIONTIME` pseudo-column in
2 changes: 1 addition & 1 deletion README-template.md
@@ -274,7 +274,7 @@ You can set the following Hive/Hadoop configuration properties in your environme
| `bq.temp.gcs.path` | | GCS location for storing temporary Avro files when using the `indirect` write method |
| `bq.write.method` | `direct` | Indicates how to write data to BigQuery. Possible values: `direct` (to directly write to the BigQuery storage API), `indirect` (to stage temporary Avro files to GCS before loading into BigQuery). |
| `bq.work.dir.parent.path` | `${hadoop.tmp.dir}` | Parent path on HDFS where each job creates its temporary work directory |
| `bq.work.dir.name.prefix` | `bq-hive-` | Prefix used for naming the jobs' temporary directories. |
| `bq.work.dir.name.prefix` | `hive-bq-` | Prefix used for naming the jobs' temporary directories. |
| `materializationProject` | | Project used to temporarily materialize data when reading views. Defaults to the same project as the read view. |
| `materializationDataset` | | Dataset used to temporarily materialize data when reading views. Defaults to the same dataset as the read view. |
| `maxParallelism` | | Maximum initial number of read streams |
56 changes: 55 additions & 1 deletion README.md
@@ -18,6 +18,7 @@ software versions:
* Hive 2.3.6, 2.3.9, 3.1.2, and 3.1.3.
* Hadoop 2.10.2, 3.2.3, and 3.3.3.
* Tez 0.9.2 on Hadoop 2, and Tez 0.10.1 on Hadoop 3.
* Pig 0.17.0.

## Installation

@@ -276,7 +277,7 @@ You can set the following Hive/Hadoop configuration properties in your environme
| `bq.temp.gcs.path` | | GCS location for storing temporary Avro files when using the `indirect` write method |
| `bq.write.method` | `direct` | Indicates how to write data to BigQuery. Possible values: `direct` (to directly write to the BigQuery storage API), `indirect` (to stage temporary Avro files to GCS before loading into BigQuery). |
| `bq.work.dir.parent.path` | `${hadoop.tmp.dir}` | Parent path on HDFS where each job creates its temporary work directory |
| `bq.work.dir.name.prefix` | `bq-hive-` | Prefix used for naming the jobs' temporary directories. |
| `bq.work.dir.name.prefix` | `hive-bq-` | Prefix used for naming the jobs' temporary directories. |
| `materializationProject` | | Project used to temporarily materialize data when reading views. Defaults to the same project as the read view. |
| `materializationDataset` | | Dataset used to temporarily materialize data when reading views. Defaults to the same dataset as the read view. |
| `maxParallelism` | | Maximum initial number of read streams |
@@ -474,6 +475,59 @@ session creation time (i.e. when the `SELECT` query is initiated).
Note that this consistency model currently only applies to the table data, not its metadata.
## Spark SQL integration
Dataproc uses a patched version of Spark that automatically detects a table that has the `bq.table`
table property, in which case Spark will use the [`Spark-BigQuery Connector`](https://github.com/GoogleCloudDataproc/spark-bigquery-connector)
to access the table's data. This means that on Dataproc you actually do not need to use the
Hive-BigQuery Connector for Spark SQL.
### Code samples
Java example:
```java
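import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Run locally with Hive support enabled so that Spark can resolve the table from the metastore.
SparkConf sparkConf = new SparkConf().setMaster("local");
SparkSession spark =
    SparkSession.builder()
        .appName("example")
        .config(sparkConf)
        .enableHiveSupport()
        .getOrCreate();
Dataset<Row> ds = spark.sql("SELECT * FROM mytable");
// collectAsList() returns a typed List<Row> and avoids the cast that collect() needs from Java.
List<Row> rows = ds.collectAsList();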
```
Python example:
```python
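from pyspark.sql import SparkSession

# Build a local session with Hive support so that the table can be resolved from the metastore.
spark = SparkSession.builder \
    .appName("example") \
    .config("spark.master", "local") \
    .enableHiveSupport() \
    .getOrCreate()
df = spark.sql("SELECT * FROM mytable")
rows = df.collect()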
```
## Apache Pig integration
The connector supports Apache Pig via HCatalog.
Here's an example reading from a BigQuery table and writing to another, assuming that
`my-database.my-table` and `my-database.my-other-table` have been registered as BigQuery tables:
```pig
some_data = LOAD 'my-database.my-table' USING org.apache.hive.hcatalog.pig.HCatLoader();
STORE some_data INTO 'my-database.my-other-table' USING org.apache.hive.hcatalog.pig.HCatStorer();
```
Notes:
* Pig only supports `datetime` types with millisecond precision, so you may lose precision if your
Hive or BigQuery values carry a sub-millisecond (for example, nanosecond) fraction. Learn more in
the HCatalog documentation on [data type mappings](https://cwiki.apache.org/confluence/display/hive/hcatalog+loadstore#HCatalogLoadStore-DataTypeMappings).
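To get a feel for the precision loss mentioned in the note above, here is a minimal sketch that uses
only `java.time`. It does not call Pig, HCatalog, or the connector. The class and method names are
hypothetical, and it assumes that the sub-millisecond fraction is simply dropped (truncation); the
actual rounding behavior is whatever Pig and HCatalog implement.
```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical illustration class; not part of the connector.
public class MillisPrecisionSketch {

  // Drops everything finer than a millisecond (assumed truncation, not rounding).
  static LocalDateTime truncateToMillis(LocalDateTime value) {
    return value.truncatedTo(ChronoUnit.MILLIS);
  }

  public static void main(String[] args) {
    // A value with a nanosecond fraction, as Hive could hold it.
    LocalDateTime original = LocalDateTime.of(2024, 3, 8, 12, 30, 45, 123_456_789);
    LocalDateTime truncated = truncateToMillis(original);
    System.out.println(original);  // prints 2024-03-08T12:30:45.123456789
    System.out.println(truncated); // prints 2024-03-08T12:30:45.123
  }
}
```
If the sub-millisecond digits matter for your use case, one option is to carry them in a separate
column or as a string before passing the data through Pig.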
## BigLake integration
[BigLake](https://cloud.google.com/biglake) allows you to store your data in open formats
18 changes: 9 additions & 9 deletions cloudbuild/cloudbuild.yaml
@@ -1,10 +1,10 @@
steps:
# 1. Create a Docker image containing hadoop-connectors repo
# 0. Create a Docker image containing hadoop-connectors repo
- name: 'gcr.io/cloud-builders/docker'
id: 'docker-build'
args: ['build', '--tag=gcr.io/$PROJECT_ID/dataproc-hive-bigquery-connector-presubmit', '-f', 'cloudbuild/Dockerfile', '.']

# 2. Build the connector and download dependencies without running tests.
# 1. Build the connector and download dependencies without running tests.
- name: 'gcr.io/$PROJECT_ID/dataproc-hive-bigquery-connector-presubmit'
id: 'check'
waitFor: ['docker-build']
@@ -13,7 +13,7 @@ steps:
env:
- 'CODECOV_TOKEN=${_CODECOV_TOKEN}'

# 3. Build the connector and download dependencies without running tests.
# 2. Build the connector and download dependencies without running tests.
- name: 'gcr.io/$PROJECT_ID/dataproc-hive-bigquery-connector-presubmit'
id: 'build'
waitFor: ['check']
@@ -22,7 +22,7 @@ steps:
env:
- 'CODECOV_TOKEN=${_CODECOV_TOKEN}'

# 4. Run unit tests for Hive 2
# 3. Run unit tests for Hive 2
- name: 'gcr.io/$PROJECT_ID/dataproc-hive-bigquery-connector-presubmit'
id: 'unit-tests-hive2'
waitFor: ['build']
@@ -31,7 +31,7 @@ steps:
env:
- 'CODECOV_TOKEN=${_CODECOV_TOKEN}'

# 5. Run unit tests for Hive 3
# 4. Run unit tests for Hive 3
- name: 'gcr.io/$PROJECT_ID/dataproc-hive-bigquery-connector-presubmit'
id: 'unit-tests-hive3'
waitFor: ['build']
@@ -40,7 +40,7 @@ steps:
env:
- 'CODECOV_TOKEN=${_CODECOV_TOKEN}'

# 6. Run integration tests for Hive 2
# 5. Run integration tests for Hive 2
- name: 'gcr.io/$PROJECT_ID/dataproc-hive-bigquery-connector-presubmit'
id: 'integration-tests-hive2'
waitFor: ['unit-tests-hive2']
@@ -49,7 +49,7 @@ steps:
env:
- 'CODECOV_TOKEN=${_CODECOV_TOKEN}'

# 7. Run integration tests for Hive 3
# 6. Run integration tests for Hive 3
- name: 'gcr.io/$PROJECT_ID/dataproc-hive-bigquery-connector-presubmit'
id: 'integration-tests-hive3'
waitFor: ['unit-tests-hive3']
@@ -58,8 +58,8 @@ steps:
env:
- 'CODECOV_TOKEN=${_CODECOV_TOKEN}'

# Tests should take under 90 mins
timeout: 5400s
# Tests should take under 120 mins
timeout: 7200s

options:
machineType: 'N1_HIGHCPU_32'
11 changes: 6 additions & 5 deletions cloudbuild/presubmit.sh
@@ -26,6 +26,7 @@ readonly ACTION=$1

readonly HIVE2_PROFILE="hive2-generic"
readonly HIVE3_PROFILE="hive3-generic"
readonly HIVE3_SHADED_DEPS="shaded-deps-hive3.1.2-hadoop2.10.2"
readonly MVN="./mvnw -B -e -Dmaven.repo.local=/workspace/.repository"

export TEST_BUCKET=dataproc-integ-tests
@@ -37,16 +38,16 @@ cd /workspace
case "$ACTION" in
# Java code style check
check)
./mvnw spotless:check -P"${HIVE2_PROFILE}" && ./mvnw spotless:check -P"${HIVE3_PROFILE}"
$MVN spotless:check -P"${HIVE2_PROFILE}" && $MVN spotless:check -P"${HIVE3_PROFILE}"
exit
;;

# Download maven and all the dependencies
# Build the Maven packages and dependencies
build)
# Install all modules for Hive 2, including parent modules
# Install all modules for Hive 2
$MVN install -DskipTests -P"${HIVE2_PROFILE}"
# Install the shaded deps for Hive 3 (all the other shaded & parent modules have already been installed with the previous command)
$MVN install -DskipTests -P"${HIVE3_PROFILE}" -pl shaded-deps-${HIVE3_PROFILE}
# Install the shaded dependencies for Hive 3 (all the other shaded & parent modules have already been installed with the previous command)
$MVN install -DskipTests -P"${HIVE3_PROFILE}" -pl ${HIVE3_SHADED_DEPS}
exit
;;

23 changes: 15 additions & 8 deletions hive-2-bigquery-connector/pom.xml
@@ -24,6 +24,7 @@
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>gcs-connector</artifactId>
<classifier>shaded</classifier>
<scope>provided</scope>
</dependency>

<!-- **************** Test dependencies **************** -->
@@ -36,14 +37,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>shaded-acceptance-tests-dependencies</artifactId>
<version>${project.version}</version>
<classifier>shaded</classifier>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.github.hiverunner</groupId>
<artifactId>hiverunner</artifactId>
@@ -53,6 +46,20 @@
</dependencies>

<profiles>
<profile>
<!-- Currently the same as "hive2.3.9-hadoop2.10.2" but could be changed later -->
<!-- Use this profile if you don't care about specific minor versions of Hive 2.X -->
<id>hive2-generic</id>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>shaded-deps-hive2.3.9-hadoop2.10.2</artifactId>
<version>${project.version}</version>
<classifier>shaded</classifier>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>hive2.3.6-hadoop2.7.0</id>
<properties>
@@ -17,6 +17,7 @@

import com.google.cloud.hive.bigquery.connector.config.HiveBigQueryConfig;
import com.google.cloud.hive.bigquery.connector.utils.DatetimeUtils;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.*;
import org.apache.arrow.vector.DateDayVector;
@@ -36,27 +37,27 @@ public class Hive2Compat extends HiveCompat {
public Object convertHiveTimeUnitToBq(
ObjectInspector objectInspector, Object hiveValue, String writeMethod) {
if (objectInspector instanceof TimestampObjectInspector) {
TimestampWritable writable;
if (hiveValue instanceof LazyTimestamp) {
writable = ((LazyTimestamp) hiveValue).getWritableObject();
Timestamp timestamp;
if (hiveValue instanceof Timestamp) {
timestamp = (Timestamp) hiveValue;
} else if (hiveValue instanceof LazyTimestamp) {
timestamp = ((LazyTimestamp) hiveValue).getWritableObject().getTimestamp();
} else {
writable = (TimestampWritable) hiveValue;
timestamp = ((TimestampWritable) hiveValue).getTimestamp();
}
Timestamp timestamp = writable.getTimestamp();
if (writeMethod.equals(HiveBigQueryConfig.WRITE_METHOD_INDIRECT)) {
return DatetimeUtils.getEpochMicrosFromHiveTimestamp(timestamp);
} else {
return DatetimeUtils.getEncodedProtoLongFromHiveTimestamp(timestamp);
}
return DatetimeUtils.getEncodedProtoLongFromHiveTimestamp(timestamp);
}
if (objectInspector instanceof DateObjectInspector) {
DateWritable writable;
if (hiveValue instanceof Date) {
return (int) ((Date) hiveValue).toLocalDate().toEpochDay();
}
if (hiveValue instanceof LazyDate) {
writable = ((LazyDate) hiveValue).getWritableObject();
} else {
writable = (DateWritable) hiveValue;
return (int) ((LazyDate) hiveValue).getWritableObject().get().toLocalDate().toEpochDay();
}
return new Integer(writable.getDays());
return ((DateWritable) hiveValue).getDays();
}

return null;
@@ -0,0 +1,22 @@
/*
* Copyright 2023 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.hive.bigquery.connector.integration;

public class PigIntegrationTests extends PigIntegrationTestsBase {

// Tests are from the super-class

}
39 changes: 29 additions & 10 deletions hive-3-bigquery-connector/pom.xml
@@ -24,6 +24,7 @@
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>gcs-connector</artifactId>
<classifier>shaded</classifier>
<scope>provided</scope>
</dependency>

<!-- **************** Test dependencies **************** -->
@@ -36,14 +37,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>shaded-acceptance-tests-dependencies</artifactId>
<version>${project.version}</version>
<classifier>shaded</classifier>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.github.hiverunner</groupId>
<artifactId>hiverunner</artifactId>
@@ -52,8 +45,21 @@

</dependencies>


<profiles>
<profile>
<!-- Currently the same as "hive3.1.2-hadoop2.10.2" but could be changed later -->
<!-- Use this profile if you don't care about specific minor versions of Hive 3.X -->
<id>hive3-generic</id>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>shaded-deps-hive3.1.2-hadoop2.10.2</artifactId>
<version>${project.version}</version>
<classifier>shaded</classifier>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>hive3.1.2-hadoop2.10.2</id>
<dependencies>
@@ -76,6 +82,13 @@
<classifier>shaded</classifier>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>shaded-acceptance-tests-dependencies</artifactId>
<version>${project.version}</version>
<classifier>shaded</classifier>
<scope>test</scope>
</dependency>
</dependencies>
</profile>
<profile>
@@ -88,11 +101,17 @@
<classifier>shaded</classifier>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>shaded-acceptance-tests-dependencies</artifactId>
<version>${project.version}</version>
<classifier>shaded</classifier>
<scope>test</scope>
</dependency>
</dependencies>
</profile>
</profiles>


<build>
<plugins>
<plugin>