From edbc555bd2c74f9a2f43850d43b5e3fc33c1524d Mon Sep 17 00:00:00 2001 From: HZY <48040205+Alibaba-HZY@users.noreply.github.com> Date: Mon, 18 Mar 2024 10:08:48 +0800 Subject: [PATCH] [hive] Fix hive writer in tez-mr (#2954) --- .../apache/paimon/hive/HiveWriteITCase.java | 21 +++ .../hive/mapred/PaimonOutputCommitter.java | 15 +- .../hive/mapred/PaimonOutputFormat.java | 3 +- .../apache/paimon/hive/mapred/TezUtil.java | 136 ++++++++++++++++++ 4 files changed, 169 insertions(+), 6 deletions(-) create mode 100644 paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TezUtil.java diff --git a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java index 4751b075ac30..7513621a855b 100644 --- a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java +++ b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java @@ -170,6 +170,27 @@ public void testInsert() throws Exception { assertThat(select).isEqualTo(Arrays.asList("1\t2\t3\tHello", "4\t5\t6\tFine")); } + @Test + public void testHiveCreateAndHiveWrite() throws Exception { + List emptyData = Collections.emptyList(); + + hiveShell.execute( + "CREATE TABLE paimon_table (\n" + + " `a` STRING comment '',\n" + + " `b` STRING comment '',\n" + + " `c` STRING comment ''\n" + + ") \n" + + "STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'\n" + + "TBLPROPERTIES (\n" + + " 'primary-key' = 'a',\n" + + " 'bucket' = '1',\n" + + " 'bucket_key' = 'a'\n" + + ");"); + hiveShell.execute("insert into paimon_table values (2,3,'Hello'),(5,6,'Fine')"); + List select = hiveShell.executeQuery("select * from paimon_table"); + assertThat(select).containsExactly("2\t3\tHello", "5\t6\tFine"); + } + @Test public void testInsertTimestampAndDate() throws Exception { List emptyData = Collections.emptyList(); diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java index 122c3f766d05..94bc4a675ae8 100644 --- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java +++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java @@ -71,8 +71,9 @@ public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOE } @Override - public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException { - + public void commitTask(TaskAttemptContext originalContext) throws IOException { + TaskAttemptContext taskAttemptContext = + TezUtil.enrichContextWithAttemptWrapper(originalContext); TaskAttemptID attemptID = taskAttemptContext.getTaskAttemptID(); JobConf jobConf = taskAttemptContext.getJobConf(); FileStoreTable table = createFileStoreTable(jobConf); @@ -117,7 +118,9 @@ public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException } @Override - public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException { + public void abortTask(TaskAttemptContext originalContext) throws IOException { + TaskAttemptContext taskAttemptContext = + TezUtil.enrichContextWithAttemptWrapper(originalContext); Map writers = PaimonRecordWriter.removeWriters(taskAttemptContext.getTaskAttemptID()); @@ -130,7 +133,8 @@ public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException } @Override - public void commitJob(JobContext jobContext) throws IOException { + public void commitJob(JobContext originalContext) throws IOException { + JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext); JobConf jobConf = jobContext.getJobConf(); long startTime = System.currentTimeMillis(); @@ -161,7 +165,8 @@ public void commitJob(JobContext jobContext) throws IOException { } @Override - public void abortJob(JobContext jobContext, int status) throws IOException { + public void abortJob(JobContext originalContext, int status) throws IOException { + JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext); FileStoreTable table = createFileStoreTable(jobContext.getJobConf()); if (table != null) { diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java index ccfd7bcdfcd6..ef1ee687ecb0 100644 --- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java +++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java @@ -73,7 +73,8 @@ public FileSinkOperator.RecordWriter getHiveRecordWriter( } private static PaimonRecordWriter writer(JobConf jobConf) { - TaskAttemptID taskAttemptID = TaskAttemptID.forName(jobConf.get(TASK_ATTEMPT_ID_KEY)); + TaskAttemptID taskAttemptID = TezUtil.taskAttemptWrapper(jobConf); + FileStoreTable table = createFileStoreTable(jobConf); // force write-only = true Map newOptions = diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TezUtil.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TezUtil.java new file mode 100644 index 000000000000..d4d7c7cd9c71 --- /dev/null +++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TezUtil.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.hive.mapred; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapred.TaskAttemptContext; +import org.apache.hadoop.mapred.TaskAttemptContextImpl; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapreduce.JobID; + +import java.util.Objects; + +/** + * Utility class to enrich the JobContext and TaskAttemptContext with the vertex id. copied form + * iceberg. + */ +public class TezUtil { + + private static final String TASK_ATTEMPT_ID_KEY = "mapred.task.id"; + // TezProcessor (Hive) propagates the vertex id under this key - available during Task commit + // phase + private static final String TEZ_VERTEX_ID_HIVE = "hive.tez.vertex.index"; + // MROutputCommitter (Tez) propagates the vertex id under this key - available during DAG/Vertex + // commit phase + private static final String TEZ_VERTEX_ID_DAG = "mapreduce.task.vertex.id"; + + /** + * If the Tez vertex id is present in config, creates a new jobContext by appending the Tez + * vertex id to the jobID. For the rationale behind this enrichment, please refer to point #1 in + * the docs of {@link TaskAttemptWrapper}. + * + * @param jobContext original jobContext to be enriched + * @return enriched jobContext + */ + public static JobContext enrichContextWithVertexId(JobContext jobContext) { + String vertexId = jobContext.getJobConf().get(TEZ_VERTEX_ID_DAG); + if (vertexId != null) { + JobID jobID = getJobIDWithVertexAppended(jobContext.getJobID(), vertexId); + return new JobContextImpl(jobContext.getJobConf(), jobID, jobContext.getProgressible()); + } else { + return jobContext; + } + } + + /** + * Creates a new taskAttemptContext by replacing the taskAttemptID with a wrapped object. For + * the rationale behind this enrichment, please refer to point #2 in the docs of {@link + * TaskAttemptWrapper}. + * + * @param taskAttemptContext original taskAttemptContext to be enriched + * @return enriched taskAttemptContext + */ + public static TaskAttemptContext enrichContextWithAttemptWrapper( + TaskAttemptContext taskAttemptContext) { + TaskAttemptID wrapped = TezUtil.taskAttemptWrapper(taskAttemptContext.getTaskAttemptID()); + return new TaskAttemptContextImpl(taskAttemptContext.getJobConf(), wrapped); + } + + public static TaskAttemptID taskAttemptWrapper(TaskAttemptID attemptID) { + return new TaskAttemptWrapper(attemptID, ""); + } + + public static TaskAttemptID taskAttemptWrapper(JobConf jc) { + return new TaskAttemptWrapper( + TaskAttemptID.forName(jc.get(TASK_ATTEMPT_ID_KEY)), jc.get(TEZ_VERTEX_ID_HIVE)); + } + + private static JobID getJobIDWithVertexAppended(JobID jobID, String vertexId) { + if (vertexId != null && !vertexId.isEmpty()) { + return new JobID(jobID.getJtIdentifier() + vertexId, jobID.getId()); + } else { + return jobID; + } + } + + private TezUtil() {} + + /** + * Subclasses {@link TaskAttemptID}. It has two main purposes: 1. Provide a way to append an + * optional vertex id to the Job ID. This is needed because there is a discrepancy between how + * the attempt ID is constructed in the {@link org.apache.tez.mapreduce.output.MROutput} (with + * vertex ID appended to the end of the Job ID) and how it's available in the mapper (without + * vertex ID) which creates and caches the HiveIcebergRecordWriter object. 2. Redefine the + * equals/hashcode provided by TaskAttemptID so that task type (map or reduce) does not count, + * and therefore the mapper and reducer threads can use the same attempt ID-based key to + * retrieve the cached HiveIcebergRecordWriter object. + */ + private static class TaskAttemptWrapper extends TaskAttemptID { + + TaskAttemptWrapper(TaskAttemptID attemptID, String vertexId) { + super( + getJobIDWithVertexAppended(attemptID.getJobID(), vertexId).getJtIdentifier(), + attemptID.getJobID().getId(), + attemptID.getTaskType(), + attemptID.getTaskID().getId(), + attemptID.getId()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TaskAttemptWrapper that = (TaskAttemptWrapper) o; + return getId() == that.getId() + && getTaskID().getId() == that.getTaskID().getId() + && Objects.equals(getJobID(), that.getJobID()); + } + + @Override + public int hashCode() { + return Objects.hash(getId(), getTaskID().getId(), getJobID()); + } + } +}