Skip to content

Commit

Permalink
TEZ-4548: InputDataInformationEvent to be read from serialized payloa…
Browse files Browse the repository at this point in the history
…d from filesystem
  • Loading branch information
abstractdog committed Mar 30, 2024
1 parent 34bb628 commit a379efe
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public final class InputDataInformationEvent extends Event {
private final int sourceIndex;
private int targetIndex; // TODO Likely to be multiple at a later point.
private final ByteBuffer userPayload;
private String serializedPath;
private final Object userPayloadObject;


private InputDataInformationEvent(int srcIndex, ByteBuffer userPayload) {
this.sourceIndex = srcIndex;
Expand Down Expand Up @@ -79,6 +79,12 @@ public static InputDataInformationEvent createWithObjectPayload(int srcIndex,
return new InputDataInformationEvent(srcIndex, userPayloadDeserialized, null);
}

public static InputDataInformationEvent createWithSerializedPath(int srcIndex, String serializedPath) {
InputDataInformationEvent event = new InputDataInformationEvent(srcIndex, null);
event.serializedPath = serializedPath;
return event;
}

public int getSourceIndex() {
return this.sourceIndex;
}
Expand All @@ -90,11 +96,15 @@ public int getTargetIndex() {
public void setTargetIndex(int target) {
this.targetIndex = target;
}


public String getSerializedPath() {
return serializedPath;
}

public ByteBuffer getUserPayload() {
return userPayload == null ? null : userPayload.asReadOnlyBuffer();
}

public Object getDeserializedUserPayload() {
return this.userPayloadObject;
}
Expand All @@ -103,6 +113,7 @@ public Object getDeserializedUserPayload() {
public String toString() {
return "InputDataInformationEvent [sourceIndex=" + sourceIndex + ", targetIndex="
+ targetIndex + ", serializedUserPayloadExists=" + (userPayload != null)
+ ", deserializedUserPayloadExists=" + (userPayloadObject != null) + "]";
}
+ ", deserializedUserPayloadExists=" + (userPayloadObject != null)
+ ", serializedPath=" + serializedPath + "]";
}
}
1 change: 1 addition & 0 deletions tez-api/src/main/proto/Events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ message RootInputDataInformationEventProto {
optional int32 source_index = 1;
optional int32 target_index = 2;
optional bytes user_payload = 3;
optional bytes serialized_path = 4;
}

message CompositeEventProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@
* limitations under the License.
*/

package org.apache.tez.runtime.api.event;
package org.apache.tez.runtime.api.events;

import java.nio.ByteBuffer;

import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.junit.Assert;
import org.junit.Test;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.api.events;

import java.nio.ByteBuffer;

import org.junit.Assert;
import org.junit.Test;

import com.google.common.base.Charsets;

public class TestInputDataInformationEvent {

@Test
public void testApiPayloadOrPath() {
InputDataInformationEvent eventWithSerializedPayload =
InputDataInformationEvent.createWithSerializedPayload(0, ByteBuffer.wrap("payload1".getBytes()));
// event created by createWithSerializedPayload should contain serialized payload
// but not a path or a deserialized payload
Assert.assertEquals("payload1", Charsets.UTF_8.decode(eventWithSerializedPayload.getUserPayload()).toString());
Assert.assertNull(eventWithSerializedPayload.getSerializedPath());
Assert.assertNull(eventWithSerializedPayload.getDeserializedUserPayload());

InputDataInformationEvent eventWithObjectPayload = InputDataInformationEvent.createWithObjectPayload(0, "payload2");
// event created by eventWithObjectPayload should contain a deserialized payload
// but not a path or serialized payload
Assert.assertEquals("payload2", eventWithObjectPayload.getDeserializedUserPayload());
Assert.assertNull(eventWithObjectPayload.getSerializedPath());
Assert.assertNull(eventWithObjectPayload.getUserPayload());

InputDataInformationEvent eventWithPath = InputDataInformationEvent.createWithSerializedPath(0, "file://hello");
// event created by createWithSerializedPath should contain a path
// but neither serialized nor deserialized payload
Assert.assertEquals("file://hello", eventWithPath.getSerializedPath());
Assert.assertNull(eventWithPath.getUserPayload());
Assert.assertNull(eventWithPath.getDeserializedUserPayload());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,8 @@ public List<Event> call() throws Exception {
List<Event> events = ugi.doAs(new PrivilegedExceptionAction<List<Event>>() {
@Override
public List<Event> run() throws Exception {
LOG.info(
"Starting InputInitializer for Input: " + initializerWrapper.getInput().getName() +
" on vertex " + initializerWrapper.getVertexLogIdentifier());
LOG.info("Starting InputInitializer for Input: {} on vertex {}", initializerWrapper.getInput().getName(),
initializerWrapper.getVertexLogIdentifier());
try {
TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(),
initializerWrapper.vertexId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.tez.mapreduce.hadoop;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -30,19 +31,23 @@
import java.util.Objects;

import com.google.common.base.Function;
import com.google.common.base.Strings;

import org.apache.tez.common.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;

import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -72,6 +77,7 @@
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;

@Public
@Unstable
Expand Down Expand Up @@ -889,4 +895,29 @@ public static int getDagAttemptNumber(Configuration conf) {
return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER);
}

public static MRSplitProto getProto(InputDataInformationEvent initEvent, JobConf jobConf) throws IOException {
return !Strings.isNullOrEmpty(initEvent.getSerializedPath()) ? readProtoFromFs(initEvent, jobConf)
: readProtoFromPayload(initEvent);
}

private static MRSplitProto readProtoFromFs(InputDataInformationEvent initEvent, JobConf jobConf) throws IOException {
String serializedPath = initEvent.getSerializedPath();
Path filePath = new Path(serializedPath);
LOG.info("Reading InputDataInformationEvent from path: {}", filePath);

MRSplitProto splitProto = null;
FileSystem fs = FileSystem.get(filePath.toUri(), jobConf);

try (FSDataInputStream in = fs.open(filePath)) {
splitProto = MRSplitProto.parseFrom(in);
fs.delete(filePath, false);
}
return splitProto;
}

private static MRSplitProto readProtoFromPayload(InputDataInformationEvent initEvent) throws IOException {
ByteBuffer payload = initEvent.getUserPayload();
LOG.info("Reading InputDataInformationEvent from payload, size: {} bytes}", payload.limit());
return MRSplitProto.parseFrom(ByteString.copyFrom(payload));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import com.google.protobuf.ByteString;

import org.apache.tez.runtime.api.ProgressFailedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -72,6 +70,7 @@
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;

import org.apache.tez.common.Preconditions;

import com.google.common.collect.Lists;

/**
Expand Down Expand Up @@ -672,7 +671,7 @@ private void initFromEventInternal(InputDataInformationEvent initEvent) throws I
LOG.debug(getContext().getInputOutputVertexNames() + " initializing RecordReader from event");
}
Objects.requireNonNull(initEvent, "InitEvent must be specified");
MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(initEvent.getUserPayload()));
MRSplitProto splitProto = MRInputHelpers.getProto(initEvent, jobConf);
Object splitObj = null;
long splitLength = -1;
if (useNewApi) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,16 @@
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import com.google.protobuf.ByteString;

public class TestMRInputHelpers {

protected static MiniDFSCluster dfsCluster;
Expand All @@ -56,9 +62,12 @@ public class TestMRInputHelpers {
private static Path oldSplitsDir;
private static Path newSplitsDir;

private static String TEST_ROOT_DIR = "target"
private static final String TEST_ROOT_DIR = "target"
+ Path.SEPARATOR + TestMRHelpers.class.getName() + "-tmpDir";

private static final Path LOCAL_TEST_ROOT_DIR = new Path("target"
+ Path.SEPARATOR + TestMRHelpers.class.getName() + "-localtmpDir");

@BeforeClass
public static void setup() throws IOException {
try {
Expand Down Expand Up @@ -188,6 +197,43 @@ public void testInputSplitLocalResourceCreation() throws Exception {
MRInputHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME));
}

@Test
public void testInputEventSerializedPayload() throws IOException {
MRSplitProto proto = MRSplitProto.newBuilder().setSplitBytes(ByteString.copyFrom("splits".getBytes())).build();

InputDataInformationEvent initEvent =
InputDataInformationEvent.createWithSerializedPayload(0, proto.toByteString().asReadOnlyByteBuffer());
MRSplitProto protoFromEvent = MRInputHelpers.getProto(initEvent, new JobConf(conf));

Assert.assertEquals(proto, protoFromEvent);
}

@Test
public void testInputEventSerializedPath() throws IOException {
MRSplitProto proto = MRSplitProto.newBuilder().setSplitBytes(ByteString.copyFrom("splits".getBytes())).build();

FileSystem localFs = FileSystem.getLocal(conf);
Path splitsDir = localFs.resolvePath(LOCAL_TEST_ROOT_DIR);

Path serializedPath = new Path(splitsDir + Path.SEPARATOR + "splitpayload");

try (FSDataOutputStream out = localFs.create(serializedPath)) {
proto.writeTo(out);
}

// event file is present on fs
Assert.assertTrue(localFs.exists(serializedPath));

InputDataInformationEvent initEvent =
InputDataInformationEvent.createWithSerializedPath(0, serializedPath.toUri().toString());
MRSplitProto protoFromEvent = MRInputHelpers.getProto(initEvent, new JobConf(conf));

Assert.assertEquals(proto, protoFromEvent);

// event file is delete after read
Assert.assertFalse(localFs.exists(serializedPath));
}

private void verifyLocationHints(Path inputSplitsDir,
List<TaskLocationHint> actual) throws Exception {
JobID jobId = new JobID("dummy", 1);
Expand Down Expand Up @@ -232,30 +278,32 @@ private DataSourceDescriptor generateDataSourceDescriptorMapRed(Path inputSplits
@Test(timeout = 5000)
public void testInputSplitLocalResourceCreationWithDifferentFS() throws Exception {
FileSystem localFs = FileSystem.getLocal(conf);
Path LOCAL_TEST_ROOT_DIR = new Path("target"
+ Path.SEPARATOR + TestMRHelpers.class.getName() + "-localtmpDir");
Path splitsDir = localFs.resolvePath(LOCAL_TEST_ROOT_DIR);

try {
localFs.mkdirs(LOCAL_TEST_ROOT_DIR);

Path splitsDir = localFs.resolvePath(LOCAL_TEST_ROOT_DIR);
DataSourceDescriptor dataSource = generateDataSourceDescriptorMapRed(splitsDir);

DataSourceDescriptor dataSource = generateDataSourceDescriptorMapRed(splitsDir);

Map<String, LocalResource> localResources = dataSource.getAdditionalLocalFiles();
Map<String, LocalResource> localResources = dataSource.getAdditionalLocalFiles();

Assert.assertEquals(2, localResources.size());
Assert.assertTrue(localResources.containsKey(
MRInputHelpers.JOB_SPLIT_RESOURCE_NAME));
Assert.assertTrue(localResources.containsKey(
MRInputHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME));
Assert.assertEquals(2, localResources.size());
Assert.assertTrue(localResources.containsKey(
MRInputHelpers.JOB_SPLIT_RESOURCE_NAME));
Assert.assertTrue(localResources.containsKey(
MRInputHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME));

for (LocalResource lr : localResources.values()) {
Assert.assertFalse(lr.getResource().getScheme().contains(remoteFs.getScheme()));
}
} finally {
localFs.delete(LOCAL_TEST_ROOT_DIR, true);
for (LocalResource lr : localResources.values()) {
Assert.assertFalse(lr.getResource().getScheme().contains(remoteFs.getScheme()));
}
}

@Before
public void before() throws IOException {
FileSystem localFs = FileSystem.getLocal(conf);
localFs.mkdirs(LOCAL_TEST_ROOT_DIR);
}

@After
public void after() throws IOException {
FileSystem localFs = FileSystem.getLocal(conf);
localFs.delete(LOCAL_TEST_ROOT_DIR, true);
}
}
Loading

0 comments on commit a379efe

Please sign in to comment.