From a379efe545b5e4d678aa794c4629d51c497ff817 Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Tue, 19 Mar 2024 14:00:20 +0100 Subject: [PATCH] TEZ-4548: InputDataInformationEvent to be read from serialized payload from filesystem --- .../api/events/InputDataInformationEvent.java | 21 +++-- tez-api/src/main/proto/Events.proto | 1 + .../TestCompositeDataMovementEvent.java | 4 +- .../events/TestInputDataInformationEvent.java | 53 +++++++++++ .../app/dag/RootInputInitializerManager.java | 5 +- .../tez/mapreduce/hadoop/MRInputHelpers.java | 31 +++++++ .../apache/tez/mapreduce/input/MRInput.java | 5 +- .../mapreduce/hadoop/TestMRInputHelpers.java | 88 ++++++++++++++----- .../apache/tez/common/ProtoConverters.java | 20 +++-- 9 files changed, 189 insertions(+), 39 deletions(-) rename tez-api/src/test/java/org/apache/tez/runtime/api/{event => events}/TestCompositeDataMovementEvent.java (90%) create mode 100644 tez-api/src/test/java/org/apache/tez/runtime/api/events/TestInputDataInformationEvent.java diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java index a62a34154f..b142bf9800 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java @@ -49,8 +49,8 @@ public final class InputDataInformationEvent extends Event { private final int sourceIndex; private int targetIndex; // TODO Likely to be multiple at a later point. private final ByteBuffer userPayload; + private String serializedPath; private final Object userPayloadObject; - private InputDataInformationEvent(int srcIndex, ByteBuffer userPayload) { this.sourceIndex = srcIndex; @@ -79,6 +79,12 @@ public static InputDataInformationEvent createWithObjectPayload(int srcIndex, return new InputDataInformationEvent(srcIndex, userPayloadDeserialized, null); } + public static InputDataInformationEvent createWithSerializedPath(int srcIndex, String serializedPath) { + InputDataInformationEvent event = new InputDataInformationEvent(srcIndex, null); + event.serializedPath = serializedPath; + return event; + } + public int getSourceIndex() { return this.sourceIndex; } @@ -90,11 +96,15 @@ public int getTargetIndex() { public void setTargetIndex(int target) { this.targetIndex = target; } - + + public String getSerializedPath() { + return serializedPath; + } + public ByteBuffer getUserPayload() { return userPayload == null ? null : userPayload.asReadOnlyBuffer(); } - + public Object getDeserializedUserPayload() { return this.userPayloadObject; } @@ -103,6 +113,7 @@ public Object getDeserializedUserPayload() { public String toString() { return "InputDataInformationEvent [sourceIndex=" + sourceIndex + ", targetIndex=" + targetIndex + ", serializedUserPayloadExists=" + (userPayload != null) - + ", deserializedUserPayloadExists=" + (userPayloadObject != null) + "]"; - } + + ", deserializedUserPayloadExists=" + (userPayloadObject != null) + + ", serializedPath=" + serializedPath + "]"; + } } diff --git a/tez-api/src/main/proto/Events.proto b/tez-api/src/main/proto/Events.proto index 9949b0bc8c..05896ac62e 100644 --- a/tez-api/src/main/proto/Events.proto +++ b/tez-api/src/main/proto/Events.proto @@ -58,6 +58,7 @@ message RootInputDataInformationEventProto { optional int32 source_index = 1; optional int32 target_index = 2; optional bytes user_payload = 3; + optional bytes serialized_path = 4; } message CompositeEventProto { diff --git a/tez-api/src/test/java/org/apache/tez/runtime/api/event/TestCompositeDataMovementEvent.java b/tez-api/src/test/java/org/apache/tez/runtime/api/events/TestCompositeDataMovementEvent.java similarity index 90% rename from tez-api/src/test/java/org/apache/tez/runtime/api/event/TestCompositeDataMovementEvent.java rename to tez-api/src/test/java/org/apache/tez/runtime/api/events/TestCompositeDataMovementEvent.java index 7dce6991a4..98c238a1dd 100644 --- a/tez-api/src/test/java/org/apache/tez/runtime/api/event/TestCompositeDataMovementEvent.java +++ b/tez-api/src/test/java/org/apache/tez/runtime/api/events/TestCompositeDataMovementEvent.java @@ -16,12 +16,10 @@ * limitations under the License. */ -package org.apache.tez.runtime.api.event; +package org.apache.tez.runtime.api.events; import java.nio.ByteBuffer; -import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; -import org.apache.tez.runtime.api.events.DataMovementEvent; import org.junit.Assert; import org.junit.Test; diff --git a/tez-api/src/test/java/org/apache/tez/runtime/api/events/TestInputDataInformationEvent.java b/tez-api/src/test/java/org/apache/tez/runtime/api/events/TestInputDataInformationEvent.java new file mode 100644 index 0000000000..6e002e26c4 --- /dev/null +++ b/tez-api/src/test/java/org/apache/tez/runtime/api/events/TestInputDataInformationEvent.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.runtime.api.events; + +import java.nio.ByteBuffer; + +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.base.Charsets; + +public class TestInputDataInformationEvent { + + @Test + public void testApiPayloadOrPath() { + InputDataInformationEvent eventWithSerializedPayload = + InputDataInformationEvent.createWithSerializedPayload(0, ByteBuffer.wrap("payload1".getBytes())); + // event created by createWithSerializedPayload should contain serialized payload + // but not a path or a deserialized payload + Assert.assertEquals("payload1", Charsets.UTF_8.decode(eventWithSerializedPayload.getUserPayload()).toString()); + Assert.assertNull(eventWithSerializedPayload.getSerializedPath()); + Assert.assertNull(eventWithSerializedPayload.getDeserializedUserPayload()); + + InputDataInformationEvent eventWithObjectPayload = InputDataInformationEvent.createWithObjectPayload(0, "payload2"); + // event created by eventWithObjectPayload should contain a deserialized payload + // but not a path or serialized payload + Assert.assertEquals("payload2", eventWithObjectPayload.getDeserializedUserPayload()); + Assert.assertNull(eventWithObjectPayload.getSerializedPath()); + Assert.assertNull(eventWithObjectPayload.getUserPayload()); + + InputDataInformationEvent eventWithPath = InputDataInformationEvent.createWithSerializedPath(0, "file://hello"); + // event created by createWithSerializedPath should contain a path + // but neither serialized nor deserialized payload + Assert.assertEquals("file://hello", eventWithPath.getSerializedPath()); + Assert.assertNull(eventWithPath.getUserPayload()); + Assert.assertNull(eventWithPath.getDeserializedUserPayload()); + } +} diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java index cfbdb19e30..f3e94993ca 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java @@ -272,9 +272,8 @@ public List call() throws Exception { List events = ugi.doAs(new PrivilegedExceptionAction>() { @Override public List run() throws Exception { - LOG.info( - "Starting InputInitializer for Input: " + initializerWrapper.getInput().getName() + - " on vertex " + initializerWrapper.getVertexLogIdentifier()); + LOG.info("Starting InputInitializer for Input: {} on vertex {}", initializerWrapper.getInput().getName(), + initializerWrapper.getVertexLogIdentifier()); try { TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(), initializerWrapper.vertexId); diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java index 26bba4d002..c6da1362a5 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java @@ -19,6 +19,7 @@ package org.apache.tez.mapreduce.hadoop; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -30,12 +31,15 @@ import java.util.Objects; import com.google.common.base.Function; +import com.google.common.base.Strings; + import org.apache.tez.common.Preconditions; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import org.apache.tez.runtime.api.InputContext; +import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -43,6 +47,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -72,6 +77,7 @@ import org.apache.tez.mapreduce.input.MRInput; import org.apache.tez.mapreduce.input.MRInputLegacy; import org.apache.tez.mapreduce.protos.MRRuntimeProtos; +import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto; @Public @Unstable @@ -889,4 +895,29 @@ public static int getDagAttemptNumber(Configuration conf) { return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER); } + public static MRSplitProto getProto(InputDataInformationEvent initEvent, JobConf jobConf) throws IOException { + return !Strings.isNullOrEmpty(initEvent.getSerializedPath()) ? readProtoFromFs(initEvent, jobConf) + : readProtoFromPayload(initEvent); + } + + private static MRSplitProto readProtoFromFs(InputDataInformationEvent initEvent, JobConf jobConf) throws IOException { + String serializedPath = initEvent.getSerializedPath(); + Path filePath = new Path(serializedPath); + LOG.info("Reading InputDataInformationEvent from path: {}", filePath); + + MRSplitProto splitProto = null; + FileSystem fs = FileSystem.get(filePath.toUri(), jobConf); + + try (FSDataInputStream in = fs.open(filePath)) { + splitProto = MRSplitProto.parseFrom(in); + fs.delete(filePath, false); + } + return splitProto; + } + + private static MRSplitProto readProtoFromPayload(InputDataInformationEvent initEvent) throws IOException { + ByteBuffer payload = initEvent.getUserPayload(); + LOG.info("Reading InputDataInformationEvent from payload, size: {} bytes}", payload.limit()); + return MRSplitProto.parseFrom(ByteString.copyFrom(payload)); + } } diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java index ee907f5d74..8c3d5d5dbb 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java @@ -27,8 +27,6 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import com.google.protobuf.ByteString; - import org.apache.tez.runtime.api.ProgressFailedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +70,7 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.common.Preconditions; + import com.google.common.collect.Lists; /** @@ -672,7 +671,7 @@ private void initFromEventInternal(InputDataInformationEvent initEvent) throws I LOG.debug(getContext().getInputOutputVertexNames() + " initializing RecordReader from event"); } Objects.requireNonNull(initEvent, "InitEvent must be specified"); - MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(initEvent.getUserPayload())); + MRSplitProto splitProto = MRInputHelpers.getProto(initEvent, jobConf); Object splitObj = null; long splitLength = -1; if (useNewApi) { diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java index 11b1271a8a..0d3b1c5335 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java @@ -42,10 +42,16 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.tez.dag.api.DataSourceDescriptor; import org.apache.tez.dag.api.TaskLocationHint; +import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto; +import org.apache.tez.runtime.api.events.InputDataInformationEvent; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import com.google.protobuf.ByteString; + public class TestMRInputHelpers { protected static MiniDFSCluster dfsCluster; @@ -56,9 +62,12 @@ public class TestMRInputHelpers { private static Path oldSplitsDir; private static Path newSplitsDir; - private static String TEST_ROOT_DIR = "target" + private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR + TestMRHelpers.class.getName() + "-tmpDir"; + private static final Path LOCAL_TEST_ROOT_DIR = new Path("target" + + Path.SEPARATOR + TestMRHelpers.class.getName() + "-localtmpDir"); + @BeforeClass public static void setup() throws IOException { try { @@ -188,6 +197,43 @@ public void testInputSplitLocalResourceCreation() throws Exception { MRInputHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME)); } + @Test + public void testInputEventSerializedPayload() throws IOException { + MRSplitProto proto = MRSplitProto.newBuilder().setSplitBytes(ByteString.copyFrom("splits".getBytes())).build(); + + InputDataInformationEvent initEvent = + InputDataInformationEvent.createWithSerializedPayload(0, proto.toByteString().asReadOnlyByteBuffer()); + MRSplitProto protoFromEvent = MRInputHelpers.getProto(initEvent, new JobConf(conf)); + + Assert.assertEquals(proto, protoFromEvent); + } + + @Test + public void testInputEventSerializedPath() throws IOException { + MRSplitProto proto = MRSplitProto.newBuilder().setSplitBytes(ByteString.copyFrom("splits".getBytes())).build(); + + FileSystem localFs = FileSystem.getLocal(conf); + Path splitsDir = localFs.resolvePath(LOCAL_TEST_ROOT_DIR); + + Path serializedPath = new Path(splitsDir + Path.SEPARATOR + "splitpayload"); + + try (FSDataOutputStream out = localFs.create(serializedPath)) { + proto.writeTo(out); + } + + // event file is present on fs + Assert.assertTrue(localFs.exists(serializedPath)); + + InputDataInformationEvent initEvent = + InputDataInformationEvent.createWithSerializedPath(0, serializedPath.toUri().toString()); + MRSplitProto protoFromEvent = MRInputHelpers.getProto(initEvent, new JobConf(conf)); + + Assert.assertEquals(proto, protoFromEvent); + + // event file is delete after read + Assert.assertFalse(localFs.exists(serializedPath)); + } + private void verifyLocationHints(Path inputSplitsDir, List actual) throws Exception { JobID jobId = new JobID("dummy", 1); @@ -232,30 +278,32 @@ private DataSourceDescriptor generateDataSourceDescriptorMapRed(Path inputSplits @Test(timeout = 5000) public void testInputSplitLocalResourceCreationWithDifferentFS() throws Exception { FileSystem localFs = FileSystem.getLocal(conf); - Path LOCAL_TEST_ROOT_DIR = new Path("target" - + Path.SEPARATOR + TestMRHelpers.class.getName() + "-localtmpDir"); + Path splitsDir = localFs.resolvePath(LOCAL_TEST_ROOT_DIR); - try { - localFs.mkdirs(LOCAL_TEST_ROOT_DIR); - - Path splitsDir = localFs.resolvePath(LOCAL_TEST_ROOT_DIR); + DataSourceDescriptor dataSource = generateDataSourceDescriptorMapRed(splitsDir); - DataSourceDescriptor dataSource = generateDataSourceDescriptorMapRed(splitsDir); - - Map localResources = dataSource.getAdditionalLocalFiles(); + Map localResources = dataSource.getAdditionalLocalFiles(); - Assert.assertEquals(2, localResources.size()); - Assert.assertTrue(localResources.containsKey( - MRInputHelpers.JOB_SPLIT_RESOURCE_NAME)); - Assert.assertTrue(localResources.containsKey( - MRInputHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME)); + Assert.assertEquals(2, localResources.size()); + Assert.assertTrue(localResources.containsKey( + MRInputHelpers.JOB_SPLIT_RESOURCE_NAME)); + Assert.assertTrue(localResources.containsKey( + MRInputHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME)); - for (LocalResource lr : localResources.values()) { - Assert.assertFalse(lr.getResource().getScheme().contains(remoteFs.getScheme())); - } - } finally { - localFs.delete(LOCAL_TEST_ROOT_DIR, true); + for (LocalResource lr : localResources.values()) { + Assert.assertFalse(lr.getResource().getScheme().contains(remoteFs.getScheme())); } } + @Before + public void before() throws IOException { + FileSystem localFs = FileSystem.getLocal(conf); + localFs.mkdirs(LOCAL_TEST_ROOT_DIR); + } + + @After + public void after() throws IOException { + FileSystem localFs = FileSystem.getLocal(conf); + localFs.delete(LOCAL_TEST_ROOT_DIR, true); + } } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java index 2c9ad86018..eb8e37aeb9 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java @@ -18,8 +18,11 @@ package org.apache.tez.common; +import com.google.common.base.Charsets; import com.google.protobuf.ByteString; +import java.nio.ByteBuffer; + import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.api.events.CustomProcessorEvent; import org.apache.tez.runtime.api.events.DataMovementEvent; @@ -135,15 +138,22 @@ public static VertexManagerEvent convertVertexManagerEventFromProto( if (event.getUserPayload() != null) { builder.setUserPayload(ByteString.copyFrom(event.getUserPayload())); } + if (event.getSerializedPath() != null) { + builder.setSerializedPath(ByteString.copyFrom(event.getSerializedPath().getBytes(Charsets.UTF_8))); + } return builder.build(); } - public static InputDataInformationEvent - convertRootInputDataInformationEventFromProto( + public static InputDataInformationEvent convertRootInputDataInformationEventFromProto( EventProtos.RootInputDataInformationEventProto proto) { - InputDataInformationEvent diEvent = InputDataInformationEvent.createWithSerializedPayload( - proto.getSourceIndex(), - proto.hasUserPayload() ? proto.getUserPayload().asReadOnlyByteBuffer() : null); + ByteBuffer payload = proto.hasUserPayload() ? proto.getUserPayload().asReadOnlyByteBuffer() : null; + InputDataInformationEvent diEvent = null; + if (!proto.getSerializedPath().isEmpty()) { + diEvent = InputDataInformationEvent.createWithSerializedPath(proto.getSourceIndex(), + proto.getSerializedPath().toStringUtf8()); + } else { + diEvent = InputDataInformationEvent.createWithSerializedPayload(proto.getSourceIndex(), payload); + } diEvent.setTargetIndex(proto.getTargetIndex()); return diEvent; }