Skip to content

Commit

Permalink
TEZ-2119: Counter for launched containers (#301) (Laszlo Bodor review…
Browse files Browse the repository at this point in the history
…ed by Jonathan Eagles and Ayush Saxena)
  • Loading branch information
abstractdog authored Aug 24, 2023
1 parent 4b97acb commit b643f9b
Show file tree
Hide file tree
Showing 20 changed files with 491 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,51 @@ public enum DAGCounter {
AM_CPU_MILLISECONDS,
/** Wall clock time taken by all the tasks. */
WALL_CLOCK_MILLIS,
AM_GC_TIME_MILLIS
AM_GC_TIME_MILLIS,

/*
* Type: # of containers
* Both allocated and launched containers before DAG start.
* This is incremented only once when the DAG starts and it's calculated
* by querying all the held containers from TaskSchedulers.
*/
INITIAL_HELD_CONTAINERS,

/*
* Type: # of containers
* All containers that have been seen/used in this DAG by task allocation.
* This counter can be calculated at the end of DAG by simply counting the distinct
* ContainerIds that have been seen in TaskSchedulerManager.taskAllocated callbacks.
*/
TOTAL_CONTAINERS_USED,

/*
* Type: # of events
* Number of container allocations during a DAG. This is incremented every time
* the containerAllocated callback is called in the TaskSchedulerContext.
* This counter doesn't account for initially held (launched, allocated) containers.
*/
TOTAL_CONTAINER_ALLOCATION_COUNT,

/*
* Type: # of events
* Number of container launches during a DAG. This is incremented every time
* the containerLaunched callback is called in the ContainerLauncherContext.
* This counter doesn't account for initially held (launched, allocated) containers.
*/
TOTAL_CONTAINER_LAUNCH_COUNT,

/*
* Type: # of events
* Number of container releases during a DAG. This is incremented every time
* the containerBeingReleased callback is called in the TaskSchedulerContext.
*/
TOTAL_CONTAINER_RELEASE_COUNT,

/*
* Type: # of events
* Number of container reuses during a DAG. This is incremented every time
* the containerReused callback is called in the TaskSchedulerContext.
*/
TOTAL_CONTAINER_REUSE_COUNT
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package org.apache.tez.serviceplugins.api;

import java.util.List;

import javax.annotation.Nullable;

import org.apache.hadoop.classification.InterfaceAudience;
Expand Down Expand Up @@ -263,4 +265,19 @@ public abstract boolean deallocateTask(Object task, boolean taskSucceeded,
*/
public abstract void dagComplete() throws ServicePluginException;

/**
* Get the number of held containers.
*/
public int getHeldContainersCount() {
return 0;
}

/**
* Callback to be used in the event of a container allocation.
*/
protected void onContainersAllocated(List<Container> containers) {
for (Container container : containers) {
getContext().containerAllocated(container);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,20 @@ void taskAllocated(Object task,
Object appCookie,
Container container);

/**
* Indicate to the framework that a container is being allocated.
*
* @param container the actual container
*/
void containerAllocated(Container container);

/**
* Indicate to the framework that a container is being reused:
* there is a task assigned to an already used container.
*
* @param container the actual container
*/
void containerReused(Container container);

/**
* Indicate to the framework that a container has completed. This is typically used by sources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
Expand Down Expand Up @@ -65,13 +66,13 @@ public ContainerLauncherContextImpl(AppContext appContext, ContainerLauncherMana

@Override
public void containerLaunched(ContainerId containerId) {
context.getCurrentDAG().incrementDagCounter(DAGCounter.TOTAL_CONTAINER_LAUNCH_COUNT, 1);
context.getEventHandler().handle(
new AMContainerEventLaunched(containerId));
ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent(
containerId, context.getClock().getTime(), context.getApplicationAttemptId());
context.getHistoryHandler().handle(new DAGHistoryEvent(
null, lEvt));

}

@Override
Expand Down
29 changes: 13 additions & 16 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
import org.apache.tez.common.TezConverterUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.VersionInfo;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.Limits;
import org.apache.tez.common.security.ACLManager;
import org.apache.tez.common.security.JobTokenIdentifier;
Expand Down Expand Up @@ -774,8 +775,9 @@ protected synchronized void handle(DAGAppMasterEvent event) {
String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
System.err.println(timeStamp + " Completed Dag: " + finishEvt.getDAGId());
System.out.println(timeStamp + " Completed Dag: " + finishEvt.getDAGId());
// Stop vertex services if any
stopVertexServices(currentDAG);

currentDAG.onFinish();

if (!isSession) {
LOG.info("Not a session, AM will unregister as DAG has completed");
this.taskSchedulerManager.setShouldUnregisterFlag();
Expand Down Expand Up @@ -1903,7 +1905,7 @@ void stopServices() {
Exception firstException = null;
// stop in reverse order of start
if (currentDAG != null) {
stopVertexServices(currentDAG);
currentDAG.onFinish();
}
List<Service> serviceList = new ArrayList<Service>(services.size());
for (ServiceWithDependency sd : services.values()) {
Expand Down Expand Up @@ -2093,7 +2095,7 @@ public void serviceStart() throws Exception {
dagEventDispatcher.handle(recoverDAGEvent);
// If we reach here, then we have recoverable DAG and we need to
// reinitialize the vertex services including speculators.
startVertexServices(currentDAG);
currentDAG.onStart();
this.state = DAGAppMasterState.RUNNING;
}
} else {
Expand Down Expand Up @@ -2563,21 +2565,15 @@ public Void run() throws Exception {
throw new TezUncheckedException(e);
}

countHeldContainers(newDAG);
startDAGExecution(newDAG, lrDiff);
// set state after curDag is set
this.state = DAGAppMasterState.RUNNING;
}

private void startVertexServices(DAG dag) {
for (Vertex v : dag.getVertices().values()) {
v.startServices();
}
}

void stopVertexServices(DAG dag) {
for (Vertex v: dag.getVertices().values()) {
v.stopServices();
}
private void countHeldContainers(DAG newDAG) {
newDAG.setDagCounter(DAGCounter.INITIAL_HELD_CONTAINERS,
taskSchedulerManager.getHeldContainersCount());
}

private void startDAGExecution(DAG dag, final Map<String, LocalResource> additionalAmResources)
Expand Down Expand Up @@ -2613,8 +2609,9 @@ public List<URL> run() throws Exception {
// This is a synchronous call, not an event through dispatcher. We want
// job-init to be done completely here.
dagEventDispatcher.handle(initDagEvent);
// Start the vertex services
startVertexServices(dag);

dag.onStart();

// All components have started, start the job.
/** create a job-start event to get this ball rolling */
DAGEvent startDagEvent = new DAGEventStartDag(currentDAG.getID(), additionalUrlsForClasspath);
Expand Down
16 changes: 16 additions & 0 deletions tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import java.util.Set;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGStatusBuilder;
Expand Down Expand Up @@ -102,4 +104,18 @@ VertexStatusBuilder getVertexStatus(String vertexName,
*/
@Nullable DAGScheduler getDAGScheduler();

void incrementDagCounter(DAGCounter counter, int incrValue);
void setDagCounter(DAGCounter counter, int setValue);
void addUsedContainer(ContainerId containerId);

/**
* Called by the DAGAppMaster when the DAG is started normally or in the event of recovery.
*/
void onStart();

/**
* Called by the DAGAppMaster when the DAG is finished, or there is a currentDAG on AM stop.
* The implementation of this method should be idempontent.
*/
void onFinish();
}
44 changes: 44 additions & 0 deletions tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
Expand Down Expand Up @@ -248,6 +249,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
new CommitCompletedTransition();

private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
private final Set<ContainerId> containersUsedByCurrentDAG = new HashSet<>();

protected static final
StateMachineFactory<DAGImpl, DAGState, DAGEventType, DAGEvent>
Expand Down Expand Up @@ -1441,6 +1443,16 @@ private void updateCpuCounters() {
dagCounters.findCounter(DAGCounter.AM_GC_TIME_MILLIS).setValue(totalDAGGCTime);
}

@Override
public void incrementDagCounter(DAGCounter counter, int incrValue) {
dagCounters.findCounter(counter).increment(incrValue);
}

@Override
public void setDagCounter(DAGCounter counter, int setValue) {
dagCounters.findCounter(counter).setValue(setValue);
}

private DAGState finished(DAGState finalState) {
boolean dagError = false;
try {
Expand Down Expand Up @@ -2542,4 +2554,36 @@ public DAGImpl setLogDirs(String[] logDirs) {
this.logDirs = logDirs;
return this;
}

@Override
public void onStart() {
startVertexServices();
}

@Override
public void onFinish() {
stopVertexServices();
handleUsedContainersOnDagFinish();
}

private void startVertexServices() {
for (Vertex v : getVertices().values()) {
v.startServices();
}
}

void stopVertexServices() {
for (Vertex v : getVertices().values()) {
v.stopServices();
}
}

@Override
public void addUsedContainer(ContainerId containerId) {
containersUsedByCurrentDAG.add(containerId);
}

private void handleUsedContainersOnDagFinish() {
setDagCounter(DAGCounter.TOTAL_CONTAINERS_USED, containersUsedByCurrentDAG.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ public void shutdown() throws Exception {

@Override
public void onContainersAllocated(List<Container> containers) {
super.onContainersAllocated(containers);

AMState appState = getContext().getAMState();
if (stopRequested || appState == AMState.COMPLETED) {
LOG.info("Ignoring {} allocations since app is terminating", containers.size());
Expand Down Expand Up @@ -946,6 +948,9 @@ private void addTaskAssignment(TaskRequest request, HeldContainer hc) {
assignedVertices.set(vertexIndex);
}
cset.add(hc);
if (!hc.isNew()) {
getContext().containerReused(hc.getContainer());
}
hc.assignTask(request);
}

Expand Down Expand Up @@ -1489,6 +1494,10 @@ Object getLastTask() {
return lastRequest != null ? lastRequest.getTask() : null;
}

boolean isNew() {
return lastRequest == null;
}

String getMatchingLocation() {
switch (state) {
case MATCHING_LOCAL:
Expand Down Expand Up @@ -2089,4 +2098,9 @@ protected void afterExecute(Runnable r, Throwable t) {
}
}
}

@Override
public int getHeldContainersCount() {
return heldContainers.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -513,4 +513,9 @@ void preemptTask(DeallocateContainerRequest request) {
}
}
}

@Override
public int getHeldContainersCount() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.AppContext;
Expand Down Expand Up @@ -69,13 +70,24 @@ public void taskAllocated(Object task, Object appCookie, Container container) {
taskSchedulerManager.taskAllocated(schedulerId, task, appCookie, container);
}

@Override
public void containerAllocated(Container container) {
appContext.getCurrentDAG().incrementDagCounter(DAGCounter.TOTAL_CONTAINER_ALLOCATION_COUNT, 1);
}

@Override
public void containerReused(Container container) {
appContext.getCurrentDAG().incrementDagCounter(DAGCounter.TOTAL_CONTAINER_REUSE_COUNT, 1);
}

@Override
public void containerCompleted(Object taskLastAllocated, ContainerStatus containerStatus) {
taskSchedulerManager.containerCompleted(schedulerId, taskLastAllocated, containerStatus);
}

@Override
public void containerBeingReleased(ContainerId containerId) {
appContext.getCurrentDAG().incrementDagCounter(DAGCounter.TOTAL_CONTAINER_RELEASE_COUNT, 1);
taskSchedulerManager.containerBeingReleased(schedulerId, containerId);
}

Expand Down
Loading

0 comments on commit b643f9b

Please sign in to comment.