Skip to content

Commit

Permalink
[hive] Fix hive writer in tez-mr (apache#2954)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alibaba-HZY authored and zhu3pang committed Mar 29, 2024
1 parent 55304dc commit edbc555
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,27 @@ public void testInsert() throws Exception {
assertThat(select).isEqualTo(Arrays.asList("1\t2\t3\tHello", "4\t5\t6\tFine"));
}

/**
 * Verifies that a Paimon table created purely through Hive DDL (via the Paimon storage handler)
 * can be written with a Hive {@code INSERT} and read back with a Hive {@code SELECT}.
 */
@Test
public void testHiveCreateAndHiveWrite() throws Exception {
    // NOTE(review): Paimon's documented option name is 'bucket-key' (hyphen); confirm that the
    // 'bucket_key' property below is intentional and not silently ignored.
    hiveShell.execute(
            "CREATE TABLE paimon_table (\n"
                    + " `a` STRING comment '',\n"
                    + " `b` STRING comment '',\n"
                    + " `c` STRING comment ''\n"
                    + ") \n"
                    + "STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'\n"
                    + "TBLPROPERTIES (\n"
                    + " 'primary-key' = 'a',\n"
                    + " 'bucket' = '1',\n"
                    + " 'bucket_key' = 'a'\n"
                    + ");");

    // Write through Hive, then read back through Hive.
    hiveShell.execute("insert into paimon_table values (2,3,'Hello'),(5,6,'Fine')");
    List<String> select = hiveShell.executeQuery("select * from paimon_table");

    assertThat(select).containsExactly("2\t3\tHello", "5\t6\tFine");
}

@Test
public void testInsertTimestampAndDate() throws Exception {
List<InternalRow> emptyData = Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOE
}

@Override
public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {

public void commitTask(TaskAttemptContext originalContext) throws IOException {
TaskAttemptContext taskAttemptContext =
TezUtil.enrichContextWithAttemptWrapper(originalContext);
TaskAttemptID attemptID = taskAttemptContext.getTaskAttemptID();
JobConf jobConf = taskAttemptContext.getJobConf();
FileStoreTable table = createFileStoreTable(jobConf);
Expand Down Expand Up @@ -117,7 +118,9 @@ public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException
}

@Override
public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
public void abortTask(TaskAttemptContext originalContext) throws IOException {
TaskAttemptContext taskAttemptContext =
TezUtil.enrichContextWithAttemptWrapper(originalContext);
Map<String, PaimonRecordWriter> writers =
PaimonRecordWriter.removeWriters(taskAttemptContext.getTaskAttemptID());

Expand All @@ -130,7 +133,8 @@ public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException
}

@Override
public void commitJob(JobContext jobContext) throws IOException {
public void commitJob(JobContext originalContext) throws IOException {
JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
JobConf jobConf = jobContext.getJobConf();

long startTime = System.currentTimeMillis();
Expand Down Expand Up @@ -161,7 +165,8 @@ public void commitJob(JobContext jobContext) throws IOException {
}

@Override
public void abortJob(JobContext jobContext, int status) throws IOException {
public void abortJob(JobContext originalContext, int status) throws IOException {
JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
FileStoreTable table = createFileStoreTable(jobContext.getJobConf());
if (table != null) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public FileSinkOperator.RecordWriter getHiveRecordWriter(
}

private static PaimonRecordWriter writer(JobConf jobConf) {
TaskAttemptID taskAttemptID = TaskAttemptID.forName(jobConf.get(TASK_ATTEMPT_ID_KEY));
TaskAttemptID taskAttemptID = TezUtil.taskAttemptWrapper(jobConf);

FileStoreTable table = createFileStoreTable(jobConf);
// force write-only = true
Map<String, String> newOptions =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.hive.mapred;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.JobContextImpl;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.JobID;

import java.util.Objects;

/**
 * Utility class to enrich the JobContext and TaskAttemptContext with the vertex id. Copied from
 * Iceberg.
 */
public final class TezUtil {

    // Job conf key under which the task attempt id is looked up by taskAttemptWrapper(JobConf)
    private static final String TASK_ATTEMPT_ID_KEY = "mapred.task.id";
    // TezProcessor (Hive) propagates the vertex id under this key - available during Task commit
    // phase
    private static final String TEZ_VERTEX_ID_HIVE = "hive.tez.vertex.index";
    // MROutputCommitter (Tez) propagates the vertex id under this key - available during DAG/Vertex
    // commit phase
    private static final String TEZ_VERTEX_ID_DAG = "mapreduce.task.vertex.id";

    private TezUtil() {}

    /**
     * If the Tez vertex id is present in config, creates a new jobContext by appending the Tez
     * vertex id to the jobID. For the rationale behind this enrichment, please refer to point #1 in
     * the docs of {@link TaskAttemptWrapper}.
     *
     * @param jobContext original jobContext to be enriched
     * @return enriched jobContext, or the original instance when no vertex id is configured
     */
    public static JobContext enrichContextWithVertexId(JobContext jobContext) {
        String vertexId = jobContext.getJobConf().get(TEZ_VERTEX_ID_DAG);
        if (vertexId == null) {
            return jobContext;
        }
        JobID jobID = getJobIDWithVertexAppended(jobContext.getJobID(), vertexId);
        return new JobContextImpl(jobContext.getJobConf(), jobID, jobContext.getProgressible());
    }

    /**
     * Creates a new taskAttemptContext by replacing the taskAttemptID with a wrapped object. For
     * the rationale behind this enrichment, please refer to point #2 in the docs of {@link
     * TaskAttemptWrapper}.
     *
     * @param taskAttemptContext original taskAttemptContext to be enriched
     * @return enriched taskAttemptContext
     */
    public static TaskAttemptContext enrichContextWithAttemptWrapper(
            TaskAttemptContext taskAttemptContext) {
        TaskAttemptID wrapped = taskAttemptWrapper(taskAttemptContext.getTaskAttemptID());
        return new TaskAttemptContextImpl(taskAttemptContext.getJobConf(), wrapped);
    }

    /**
     * Wraps the given attempt id without appending any vertex id to its job id.
     *
     * @param attemptID attempt id to wrap
     * @return a {@link TaskAttemptWrapper} carrying the same job, task and attempt numbers
     */
    public static TaskAttemptID taskAttemptWrapper(TaskAttemptID attemptID) {
        return new TaskAttemptWrapper(attemptID, "");
    }

    /**
     * Builds a wrapped attempt id from the job conf: the attempt id is parsed from {@code
     * mapred.task.id} and the value of {@code hive.tez.vertex.index}, when present and non-empty,
     * is appended to the job identifier.
     *
     * @param jc job conf to read the attempt id and the optional vertex id from
     * @return a {@link TaskAttemptWrapper} for the configured attempt
     * @throws NullPointerException if the job conf does not carry a task attempt id
     */
    public static TaskAttemptID taskAttemptWrapper(JobConf jc) {
        // TaskAttemptID.forName yields null for a null input; fail here with a descriptive
        // message instead of a bare NPE from inside the wrapper's constructor.
        TaskAttemptID attemptID =
                Objects.requireNonNull(
                        TaskAttemptID.forName(jc.get(TASK_ATTEMPT_ID_KEY)),
                        "Task attempt id is not set in the job conf under key '"
                                + TASK_ATTEMPT_ID_KEY
                                + "'");
        return new TaskAttemptWrapper(attemptID, jc.get(TEZ_VERTEX_ID_HIVE));
    }

    /**
     * Returns a job id whose identifier has {@code vertexId} appended, or the given job id
     * unchanged when {@code vertexId} is null or empty.
     */
    private static JobID getJobIDWithVertexAppended(JobID jobID, String vertexId) {
        if (vertexId == null || vertexId.isEmpty()) {
            return jobID;
        }
        return new JobID(jobID.getJtIdentifier() + vertexId, jobID.getId());
    }

    /**
     * Subclasses {@link TaskAttemptID}. It has two main purposes: 1. Provide a way to append an
     * optional vertex id to the Job ID. This is needed because there is a discrepancy between how
     * the attempt ID is constructed in the {@link org.apache.tez.mapreduce.output.MROutput} (with
     * vertex ID appended to the end of the Job ID) and how it's available in the mapper (without
     * vertex ID) which creates and caches the {@code PaimonRecordWriter} object. 2. Redefine the
     * equals/hashcode provided by TaskAttemptID so that task type (map or reduce) does not count,
     * and therefore the mapper and reducer threads can use the same attempt ID-based key to
     * retrieve the cached {@code PaimonRecordWriter} object.
     */
    private static class TaskAttemptWrapper extends TaskAttemptID {

        TaskAttemptWrapper(TaskAttemptID attemptID, String vertexId) {
            super(
                    getJobIDWithVertexAppended(attemptID.getJobID(), vertexId).getJtIdentifier(),
                    attemptID.getJobID().getId(),
                    attemptID.getTaskType(),
                    attemptID.getTaskID().getId(),
                    attemptID.getId());
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            // Task type is deliberately left out of the comparison (see class docs, point #2).
            TaskAttemptWrapper that = (TaskAttemptWrapper) o;
            return getId() == that.getId()
                    && getTaskID().getId() == that.getTaskID().getId()
                    && Objects.equals(getJobID(), that.getJobID());
        }

        @Override
        public int hashCode() {
            // Must hash exactly the fields compared in equals, i.e. without the task type.
            return Objects.hash(getId(), getTaskID().getId(), getJobID());
        }
    }
}

0 comments on commit edbc555

Please sign in to comment.