Skip to content

Commit

Permalink
feat(hudi): update hudi to latest version (0.10.0) (#463)
Browse files Browse the repository at this point in the history
* feat(hudi): update hudi to latest version (0.10.0)

* feat(hoodie): update to 0.10.0
  • Loading branch information
amirhalatzi authored Dec 26, 2021
1 parent 6b00ee7 commit 05fc952
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ env:
- SPARK2_VERSION=2.4.6
- SPARK_VERSION=3.2.0
- HIVE_VERSION=2.3.7
- HUDI_VERSION=0.5.3
- HUDI_VERSION=0.10.0
- TARGET_CACHE=$HOME/target-cache/${TRAVIS_COMMIT}
- LC_ALL=en_US.UTF-8
- LANG=en_US.UTF-8
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ Metorikku supports reading/writing with [Apache Hudi](https://github.com/apache/
Hudi is a very exciting project that basically allows upserts and deletes directly on top of partitioned parquet data.

In order to use Hudi with Metorikku you need to add to your classpath (via ```--jars``` or if running locally with ```-cp```)
an external JAR from here: https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark-bundle_2.12/0.5.3/hudi-spark-bundle_2.12-0.5.3.jar
an external JAR from here: https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark-bundle_2.12/0.10.0/hudi-spark-bundle_2.12-0.10.0.jar

To run Hudi jobs you also have to make sure you have the following spark configuration (pass with ```--conf``` or ```-D```):
```properties
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ libraryDependencies ++= Seq(
"com.redislabs" %% "spark-redis" % "2.5.0" % "provided",
"org.apache.kafka" %% "kafka" % "2.2.0" % "provided",
"za.co.absa" %% "abris" % "3.2.1" % "provided" excludeAll(excludeAvro, excludeSpark),
"org.apache.hudi" %% "hudi-spark-bundle" % "0.5.3" % "provided",
"org.apache.hudi" %% "hudi-spark-bundle" % "0.10.0" % "provided",
"org.apache.parquet" % "parquet-avro" % parquetVersion.value % "provided",
"com.amazon.deequ" % "deequ" % deequVersion.value excludeAll(excludeSpark, excludeScalanlp),
"org.apache.avro" % "avro" % "1.8.2" % "provided",
Expand Down
6 changes: 3 additions & 3 deletions docker/hive/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ ENV MYSQL_CONNECTOR_VERSION=5.1.47
ADD https://repo1.maven.org/maven2/mysql/mysql-connector-java/$MYSQL_CONNECTOR_VERSION/mysql-connector-java-$MYSQL_CONNECTOR_VERSION.jar \
$HIVE_HOME/lib/mysql-connector-java-$MYSQL_CONNECTOR_VERSION.jar

ARG HUDI_VERSION=0.5.3
ADD https://repo1.maven.org/maven2/org/apache/hudi/hudi-hive-bundle/$HUDI_VERSION/hudi-hive-bundle-$HUDI_VERSION.jar \
$HIVE_HOME/lib/hudi-hive-bundle-$HUDI_VERSION.jar
ARG HUDI_VERSION=0.10.0
ADD https://repo1.maven.org/maven2/org/apache/hudi/hudi-hive-sync-bundle/$HUDI_VERSION/hudi-hive-sync-bundle-$HUDI_VERSION.jar \
$HIVE_HOME/lib/hudi-hive-sync-bundle-$HUDI_VERSION.jar
ADD https://repo1.maven.org/maven2/org/apache/hudi/hudi-hadoop-mr-bundle/$HUDI_VERSION/hudi-hadoop-mr-bundle-$HUDI_VERSION.jar \
$HIVE_HOME/lib/hudi-hadoop-mr-bundle-$HUDI_VERSION.jar

Expand Down
6 changes: 3 additions & 3 deletions docker/spark/custom-hadoop/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ RUN wget -q https://archive.apache.org/dist/hive/hive-$HIVE_VERSION/apache-hive-
&& rm apache-hive-$HIVE_VERSION-bin.tar.gz

#Hudi for hive
ENV HUDI_VERSION=0.5.3
RUN wget -q https://repo1.maven.org/maven2/org/apache/hudi/hudi-hive-bundle/$HUDI_VERSION/hudi-hive-bundle-$HUDI_VERSION.jar \
&& mv hudi-hive-bundle-$HUDI_VERSION.jar $HIVE_HOME/lib
ENV HUDI_VERSION=0.10.0
RUN wget -q https://repo1.maven.org/maven2/org/apache/hudi/hudi-hive-sync-bundle/$HUDI_VERSION/hudi-hive-sync-bundle-$HUDI_VERSION.jar \
&& mv hudi-hive-sync-bundle-$HUDI_VERSION.jar $HIVE_HOME/lib
RUN wget -q https://repo1.maven.org/maven2/org/apache/hudi/hudi-hadoop-mr-bundle/$HUDI_VERSION/hudi-hadoop-mr-bundle-$HUDI_VERSION.jar \
&& mv hudi-hadoop-mr-bundle-$HUDI_VERSION.jar $HIVE_HOME/lib
RUN wget -q https://repo1.maven.org/maven2/com/tdunning/json/1.8/json-1.8.jar -P ${SPARK_HOME}/jars/
Expand Down
29 changes: 15 additions & 14 deletions src/main/scala/com/yotpo/metorikku/utils/HudiUtils.scala
Original file line number Diff line number Diff line change
@@ -1,34 +1,35 @@
package com.yotpo.metorikku.utils

import org.apache.hudi.avro.model.HoodieCompactionPlan
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieInstant.State
import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTimeline}
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.util.CompactionUtils
import org.apache.hudi.common.util.collection.ImmutablePair
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.TableNotFoundException
import org.apache.hudi.table.HoodieTable
import org.apache.log4j.LogManager
import org.apache.log4j.{LogManager, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaSparkContext

object HudiUtils {
val log = LogManager.getLogger(this.getClass)
val log: Logger = LogManager.getLogger(this.getClass)

def deletePendingCompactions(sparkContext: SparkContext, basePath: String): Unit = {
try {
val jsc = JavaSparkContext.fromSparkContext(sparkContext)
val hudiMetaclient = new HoodieTableMetaClient(sparkContext.hadoopConfiguration, basePath)
val writerConfig = HoodieWriteConfig.newBuilder().withPath(basePath).build()
val hudiTable = HoodieTable.getHoodieTable(hudiMetaclient, writerConfig, jsc)
val pendingCompactionPlans = CompactionUtils.getAllPendingCompactionPlans(hudiMetaclient)
val activeTimeline = hudiTable.getActiveTimeline()

val hudiMetaClient = HoodieTableMetaClient
.builder
.setConf(sparkContext.hadoopConfiguration)
.setBasePath(basePath)
.build()

val pendingCompactionPlans = CompactionUtils.getAllPendingCompactionPlans(hudiMetaClient)
val activeTimeline = hudiMetaClient.getActiveTimeline

pendingCompactionPlans.toArray().foreach({ pendingCompactionPlan => {
val inflightInstant = pendingCompactionPlan.asInstanceOf[ImmutablePair[HoodieInstant, HoodieCompactionPlan]].getLeft
log.info(s"Deleting pending inflight compaction: ${inflightInstant.getFileName}")
activeTimeline.deleteInflight(inflightInstant)
val compactionRequestedInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, inflightInstant.getTimestamp);
val compactionRequestedInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, inflightInstant.getTimestamp)
log.info(s"Deleting pending compaction requested: ${compactionRequestedInstant.getFileName}")
activeTimeline.deleteCompactionRequested(compactionRequestedInstant)
}
Expand Down

0 comments on commit 05fc952

Please sign in to comment.