Skip to content

Commit

Permalink
TEZ-4548: InputDataInformationEvent to be read from serialized payloa…
Browse files Browse the repository at this point in the history
…d from filesystem (#341) (Laszlo Bodor reviewed by Ayush Saxena)
  • Loading branch information
abstractdog authored Apr 2, 2024
1 parent 34bb628 commit f080031
Show file tree
Hide file tree
Showing 12 changed files with 217 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@ public interface InputInitializerContext {
* @return Resource
*/
Resource getVertexTaskResource();


/**
 * Get the vertex id as integer that belongs to this input.
 *
 * @return the integer id of the vertex this input belongs to
 */
int getVertexId();

/**
* Get the total resource allocated to this vertex. If the DAG is running in
* a busy cluster then it may have no resources available dedicated to it. The
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public final class InputDataInformationEvent extends Event {
private final int sourceIndex;
private int targetIndex; // TODO Likely to be multiple at a later point.
private final ByteBuffer userPayload;
private String serializedPath;
private final Object userPayloadObject;


private InputDataInformationEvent(int srcIndex, ByteBuffer userPayload) {
this.sourceIndex = srcIndex;
Expand Down Expand Up @@ -79,6 +79,12 @@ public static InputDataInformationEvent createWithObjectPayload(int srcIndex,
return new InputDataInformationEvent(srcIndex, userPayloadDeserialized, null);
}

/**
 * Creates an event that refers to its data through a path instead of carrying
 * a payload: the event is constructed with a null serialized payload and the
 * given path is then stored on it.
 *
 * @param srcIndex the source index handed to the event constructor
 * @param serializedPath the path later returned by {@link #getSerializedPath()}
 * @return the newly created event
 */
public static InputDataInformationEvent createWithSerializedPath(int srcIndex, String serializedPath) {
  InputDataInformationEvent pathEvent = new InputDataInformationEvent(srcIndex, null);
  pathEvent.serializedPath = serializedPath;
  return pathEvent;
}

/**
 * @return the source index this event was created with
 */
public int getSourceIndex() {
  return sourceIndex;
}
Expand All @@ -90,19 +96,29 @@ public int getTargetIndex() {
/**
 * Sets the target index of this event.
 *
 * @param target the new target index
 */
public void setTargetIndex(int target) {
  targetIndex = target;
}


/**
 * @return the serialized path stored on this event, or null if none was set
 */
public String getSerializedPath() {
  return this.serializedPath;
}

/**
 * @return a read-only view of the serialized user payload, or null when the
 *         event holds no serialized payload
 */
public ByteBuffer getUserPayload() {
  if (userPayload == null) {
    return null;
  }
  // Hand out a read-only view so callers cannot modify the stored buffer's content.
  return userPayload.asReadOnlyBuffer();
}

/**
 * @return the deserialized user payload object, or null if the event holds none
 */
public Object getDeserializedUserPayload() {
  return userPayloadObject;
}

@Override
public String toString() {
return "InputDataInformationEvent [sourceIndex=" + sourceIndex + ", targetIndex="
+ targetIndex + ", serializedUserPayloadExists=" + (userPayload != null)
+ ", deserializedUserPayloadExists=" + (userPayloadObject != null) + "]";
}
StringBuilder sb = new StringBuilder();
sb.append("InputDataInformationEvent [sourceIndex=").append(sourceIndex)
.append(", targetIndex=").append(targetIndex)
.append(", serializedUserPayloadExists=").append(userPayload != null)
.append(", deserializedUserPayloadExists=").append(userPayloadObject != null);
if (serializedPath != null) {
sb.append(", serializedPath=").append(serializedPath);
}
sb.append("]");
return sb.toString();
}
}
1 change: 1 addition & 0 deletions tez-api/src/main/proto/Events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ message RootInputDataInformationEventProto {
optional int32 source_index = 1;
optional int32 target_index = 2;
optional bytes user_payload = 3;
optional bytes serialized_path = 4;
}

message CompositeEventProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@
* limitations under the License.
*/

package org.apache.tez.runtime.api.event;
package org.apache.tez.runtime.api.events;

import java.nio.ByteBuffer;

import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.junit.Assert;
import org.junit.Test;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.api.events;

import java.nio.ByteBuffer;

import org.junit.Assert;
import org.junit.Test;

import com.google.common.base.Charsets;

/**
 * Verifies that each factory method of InputDataInformationEvent populates
 * exactly one of: serialized payload, deserialized payload, serialized path.
 */
public class TestInputDataInformationEvent {

  @Test
  public void testApiPayloadOrPath() {
    // Encode with the same charset that is used for decoding below, so the test
    // does not depend on the platform default charset.
    InputDataInformationEvent eventWithSerializedPayload =
        InputDataInformationEvent.createWithSerializedPayload(0, ByteBuffer.wrap("payload1".getBytes(Charsets.UTF_8)));
    // event created by createWithSerializedPayload should contain serialized payload
    // but not a path or a deserialized payload
    Assert.assertEquals("payload1", Charsets.UTF_8.decode(eventWithSerializedPayload.getUserPayload()).toString());
    Assert.assertNull(eventWithSerializedPayload.getSerializedPath());
    Assert.assertNull(eventWithSerializedPayload.getDeserializedUserPayload());

    InputDataInformationEvent eventWithObjectPayload = InputDataInformationEvent.createWithObjectPayload(0, "payload2");
    // event created by createWithObjectPayload should contain a deserialized payload
    // but not a path or serialized payload
    Assert.assertEquals("payload2", eventWithObjectPayload.getDeserializedUserPayload());
    Assert.assertNull(eventWithObjectPayload.getSerializedPath());
    Assert.assertNull(eventWithObjectPayload.getUserPayload());

    InputDataInformationEvent eventWithPath = InputDataInformationEvent.createWithSerializedPath(0, "file://hello");
    // event created by createWithSerializedPath should contain a path
    // but neither serialized nor deserialized payload
    Assert.assertEquals("file://hello", eventWithPath.getSerializedPath());
    Assert.assertNull(eventWithPath.getUserPayload());
    Assert.assertNull(eventWithPath.getDeserializedUserPayload());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,8 @@ public List<Event> call() throws Exception {
List<Event> events = ugi.doAs(new PrivilegedExceptionAction<List<Event>>() {
@Override
public List<Event> run() throws Exception {
LOG.info(
"Starting InputInitializer for Input: " + initializerWrapper.getInput().getName() +
" on vertex " + initializerWrapper.getVertexLogIdentifier());
LOG.info("Starting InputInitializer for Input: {} on vertex {}", initializerWrapper.getInput().getName(),
initializerWrapper.getVertexLogIdentifier());
try {
TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(),
initializerWrapper.vertexId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ public Resource getVertexTaskResource() {
return vertex.getTaskResource();
}

@Override
public int getVertexId() {
// Unwrap the integer id from the id object of the vertex held by this context.
return vertex.getVertexId().getId();
}

@Override
public Resource getTotalAvailableResource() {
return appContext.getTaskScheduler().getTotalResources(vertex.getTaskSchedulerIdentifier());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.tez.mapreduce.hadoop;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -30,19 +31,23 @@
import java.util.Objects;

import com.google.common.base.Function;
import com.google.common.base.Strings;

import org.apache.tez.common.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;

import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -72,6 +77,7 @@
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;

@Public
@Unstable
Expand Down Expand Up @@ -889,4 +895,29 @@ public static int getDagAttemptNumber(Configuration conf) {
return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER);
}

/**
 * Obtains the split proto for an event, choosing the source by whether the
 * event carries a serialized path: with a non-empty path the proto is read
 * from the filesystem, otherwise it is parsed from the event's user payload.
 *
 * @param initEvent the event describing the split
 * @param jobConf configuration used to resolve the filesystem of the path
 * @return the parsed split proto
 * @throws IOException if reading or parsing the proto fails
 */
public static MRSplitProto getProto(InputDataInformationEvent initEvent, JobConf jobConf) throws IOException {
  boolean hasSerializedPath = !Strings.isNullOrEmpty(initEvent.getSerializedPath());
  if (hasSerializedPath) {
    return readProtoFromFs(initEvent, jobConf);
  }
  return readProtoFromPayload(initEvent);
}

/**
 * Reads the split proto from the file referenced by the event's serialized
 * path and removes that file once it has been parsed successfully.
 *
 * @param initEvent the event whose serialized path points to the proto file
 * @param jobConf configuration used to resolve the filesystem of the path
 * @return the split proto parsed from the file
 * @throws IOException if the file cannot be opened, parsed or deleted
 */
private static MRSplitProto readProtoFromFs(InputDataInformationEvent initEvent, JobConf jobConf) throws IOException {
  Path filePath = new Path(initEvent.getSerializedPath());
  LOG.info("Reading InputDataInformationEvent from path: {}", filePath);

  FileSystem fs = filePath.getFileSystem(jobConf);
  MRSplitProto splitProto;
  try (FSDataInputStream in = fs.open(filePath)) {
    splitProto = MRSplitProto.parseFrom(in);
  }

  // Delete only after the stream has been closed: removing a file that is still
  // open is not reliable on every filesystem. A parse failure above propagates
  // before this point, so the file is kept in that case, as before.
  if (!fs.delete(filePath, false)) {
    LOG.warn("Could not delete serialized InputDataInformationEvent file: {}", filePath);
  }
  return splitProto;
}

/**
 * Parses the split proto from the serialized user payload carried by the event.
 *
 * @param initEvent the event holding the serialized payload
 * @return the split proto parsed from the payload
 * @throws IOException if the payload cannot be parsed
 * @throws NullPointerException if the event has no serialized user payload
 */
private static MRSplitProto readProtoFromPayload(InputDataInformationEvent initEvent) throws IOException {
  // Fail with a descriptive message instead of an anonymous NPE from inside
  // ByteString.copyFrom when the event carries no serialized payload.
  ByteBuffer payload = Objects.requireNonNull(initEvent.getUserPayload(),
      "Serialized user payload is missing from InputDataInformationEvent");
  LOG.info("Reading InputDataInformationEvent from payload: {}", payload);
  return MRSplitProto.parseFrom(ByteString.copyFrom(payload));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import com.google.protobuf.ByteString;

import org.apache.tez.runtime.api.ProgressFailedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -72,6 +70,7 @@
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;

import org.apache.tez.common.Preconditions;

import com.google.common.collect.Lists;

/**
Expand Down Expand Up @@ -672,7 +671,7 @@ private void initFromEventInternal(InputDataInformationEvent initEvent) throws I
LOG.debug(getContext().getInputOutputVertexNames() + " initializing RecordReader from event");
}
Objects.requireNonNull(initEvent, "InitEvent must be specified");
MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(initEvent.getUserPayload()));
MRSplitProto splitProto = MRInputHelpers.getProto(initEvent, jobConf);
Object splitObj = null;
long splitLength = -1;
if (useNewApi) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ public Resource getVertexTaskResource() {
return Resource.newInstance(1024, 1);
}

@Override
public int getVertexId() {
// Always reports vertex id 0. NOTE(review): presumably a fixed stub value, like the
// constant resources returned by the neighbouring methods — confirm.
return 0;
}

@Override
public Resource getTotalAvailableResource() {
return Resource.newInstance(10240, 10);
Expand Down
Loading

0 comments on commit f080031

Please sign in to comment.