Skip to content

Commit

Permalink
TEZ-4527: Add generic and pluggable hooks for DAGs and task attempts (#…
Browse files Browse the repository at this point in the history
…324) (Shohei Okumiya reviewed by Laszlo Bodor)
  • Loading branch information
okumin authored Dec 22, 2024
1 parent ca15119 commit 1084699
Show file tree
Hide file tree
Showing 10 changed files with 260 additions and 43 deletions.
21 changes: 19 additions & 2 deletions tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -2297,12 +2297,14 @@ static Set<String> getPropertySet() {
public static final String TEZ_MRREADER_CONFIG_UPDATE_PROPERTIES = "tez.mrreader.config.update.properties";

/**
* Frequency at which thread dump should be captured. Supports TimeUnits.
* Frequency at which thread dumps should be captured. Supports TimeUnits. This is effective only
* when org.apache.tez.dag.app.ThreadDumpDAGHook is listed in tez.am.hooks or
* org.apache.tez.runtime.task.ThreadDumpTaskAttemptHook is listed in tez.task.attempt.hooks.
*/
@ConfigurationScope(Scope.DAG)
@ConfigurationProperty
public static final String TEZ_THREAD_DUMP_INTERVAL = "tez.thread.dump.interval";
public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "0ms";
public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "100ms";

/**
* Limits the amount of data that can be written to LocalFileSystem by a Task.
Expand All @@ -2312,4 +2314,19 @@ static Set<String> getPropertySet() {
public static final String TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES = "tez.task.local-fs.write-limit.bytes";
public static final long TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES_DEFAULT = -1;

/**
* Comma-separated list of hook classes implementing org.apache.tez.runtime.hook.TezDAGHook.
* e.g. org.apache.tez.dag.app.ThreadDumpDAGHook
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty
public static final String TEZ_AM_HOOKS = TEZ_AM_PREFIX + "hooks";

/**
* Comma-separated list of hook classes implementing org.apache.tez.runtime.hook.TezTaskAttemptHook.
* e.g. org.apache.tez.runtime.task.ThreadDumpTaskAttemptHook
*/
@ConfigurationScope(Scope.DAG)
@ConfigurationProperty
public static final String TEZ_TASK_ATTEMPT_HOOKS = TEZ_TASK_PREFIX + "attempt.hooks";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.runtime.hook;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.records.TezDAGID;

/**
* A hook which is instantiated and triggered before and after a DAG is executed.
*
* Implementations are listed by class name in tez.am.hooks. DAGAppMaster creates a new instance
* of every listed class each time it starts executing a DAG, so an instance only ever sees a
* single start call. Instances are created reflectively from the class name alone; presumably a
* public no-argument constructor is required (TODO confirm against
* ReflectionUtils.createClazzInstance).
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface TezDAGHook {
/**
* Invoked before the DAG starts.
*
* DAGAppMaster calls this right after instantiating the hook, in the order the classes are
* listed in the configuration.
*
* @param id the DAG id
* @param conf the configuration of the DAG that is about to run
*/
void start(TezDAGID id, Configuration conf);

/**
* Invoked after the DAG finishes.
*
* DAGAppMaster calls this when it handles the DAG_FINISHED event and again while the
* application master service is stopping, so the same instance can receive this call more than
* once. Implementations should tolerate repeated calls.
*/
void stop();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.runtime.hook;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.records.TezTaskAttemptID;

/**
* A hook which is instantiated and triggered before and after a task attempt is executed.
*
* Implementations are listed by class name in tez.task.attempt.hooks. TezChild creates a new
* instance of every listed class right before it runs a task attempt. Instances are created
* reflectively from the class name alone; presumably a public no-argument constructor is
* required (TODO confirm against ReflectionUtils.createClazzInstance).
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface TezTaskAttemptHook {
/**
* Invoked before the task attempt starts.
*
* TezChild calls this right after instantiating the hook and before the task runner is run,
* in the order the classes are listed in the configuration.
*
* @param id the task attempt id
* @param conf the task configuration
*/
void start(TezTaskAttemptID id, Configuration conf);

/**
* Invoked after the task attempt finishes.
*
* TezChild calls this from a finally block around the task run, so it is invoked whether the
* attempt completed normally or threw.
*/
void stop();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

@Private
package org.apache.tez.runtime.hook;

import org.apache.hadoop.classification.InterfaceAudience.Private;
23 changes: 17 additions & 6 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.tez.Utils;
import org.apache.tez.client.CallerContext;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.SessionNotRunning;
Expand Down Expand Up @@ -187,7 +188,7 @@
import org.apache.tez.dag.utils.Simple2LevelVersionComparator;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.hadoop.shim.HadoopShimsLoader;
import org.apache.tez.runtime.TezThreadDumpHelper;
import org.apache.tez.runtime.hook.TezDAGHook;
import org.apache.tez.util.LoggingUtils;
import org.apache.tez.util.TezMxBeanResourceCalculator;
import org.codehaus.jettison.json.JSONException;
Expand Down Expand Up @@ -343,7 +344,7 @@ public class DAGAppMaster extends AbstractService {
Map<Service, ServiceWithDependency> services =
new LinkedHashMap<Service, ServiceWithDependency>();
private ThreadLocalMap mdcContext;
private TezThreadDumpHelper tezThreadDumpHelper = TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER;
private TezDAGHook[] hooks = {};

public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
Expand Down Expand Up @@ -770,7 +771,9 @@ protected synchronized void handle(DAGAppMasterEvent event) {
"DAGAppMaster Internal Error occurred");
break;
case DAG_FINISHED:
tezThreadDumpHelper.stop();
for (TezDAGHook hook : hooks) {
hook.stop();
}
DAGAppMasterEventDAGFinished finishEvt =
(DAGAppMasterEventDAGFinished) event;
String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
Expand Down Expand Up @@ -2226,8 +2229,10 @@ public Void run() throws Exception {
execService.shutdownNow();
}

// Check if the thread dump service is up in any case, if yes attempt a shutdown
tezThreadDumpHelper.stop();
// Try to shut down any hooks that are still active
for (TezDAGHook hook : hooks) {
hook.stop();
}

super.serviceStop();
}
Expand Down Expand Up @@ -2599,7 +2604,13 @@ private void countHeldContainers(DAG newDAG) {
private void startDAGExecution(DAG dag, final Map<String, LocalResource> additionalAmResources)
throws TezException {
currentDAG = dag;
tezThreadDumpHelper = TezThreadDumpHelper.getInstance(dag.getConf()).start(dag.getID().toString());
final Configuration conf = dag.getConf();
final String[] hookClasses = conf.getStrings(TezConfiguration.TEZ_AM_HOOKS, new String[0]);
hooks = new TezDAGHook[hookClasses.length];
for (int i = 0; i < hooks.length; i++) {
hooks[i] = ReflectionUtils.createClazzInstance(hookClasses[i]);
hooks[i].start(dag.getID(), conf);
}

// Try localizing the actual resources.
List<URL> additionalUrlsForClasspath;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.dag.app;

import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.runtime.TezThreadDumpHelper;
import org.apache.tez.runtime.hook.TezDAGHook;

/**
 * A DAG hook which dumps thread information periodically.
 *
 * <p>{@link #start(TezDAGID, Configuration)} obtains a {@link TezThreadDumpHelper} from the DAG
 * configuration and starts it under the DAG id; {@link #stop()} stops that helper.</p>
 */
public class ThreadDumpDAGHook implements TezDAGHook {
  // Helper started for the current DAG; null until start() succeeds and again after stop().
  private TezThreadDumpHelper helper;

  /**
   * Starts periodic thread dumps for the given DAG.
   *
   * <p>Any exception thrown while creating or starting the helper propagates to the caller and
   * leaves this hook without a helper.</p>
   *
   * @param id the DAG id; its string form is passed to the helper as the dump name
   * @param conf the configuration the helper is created from
   */
  @Override
  public void start(TezDAGID id, Configuration conf) {
    helper = TezThreadDumpHelper.getInstance(conf).start(id.toString());
  }

  /**
   * Stops the helper if one is currently held.
   *
   * <p>This is a no-op when {@code start} was never called, when {@code start} failed before a
   * helper was assigned, or when the helper was already stopped by an earlier call. The hook can
   * be stopped both when the DAG finishes and when the application master shuts down, so the
   * call must not fail in any of those states.</p>
   */
  @Override
  public void stop() {
    // Take a local copy and clear the field first so a repeated call sees no helper.
    final TezThreadDumpHelper current = helper;
    helper = null;
    if (current != null) {
      current.stop();
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Appender;
import org.apache.tez.common.Preconditions;
import org.apache.tez.common.TezContainerLogAppender;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezUncheckedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,10 +47,9 @@

public class TezThreadDumpHelper {

public static final NoopTezThreadDumpHelper NOOP_TEZ_THREAD_DUMP_HELPER = new NoopTezThreadDumpHelper();
private long duration = 0L;
private Path basePath = null;
private FileSystem fs = null;
private final long duration;
private final Path basePath;
private final FileSystem fs;

private static final ThreadMXBean THREAD_BEAN = ManagementFactory.getThreadMXBean();
private static final Logger LOG = LoggerFactory.getLogger(TezThreadDumpHelper.class);
Expand All @@ -70,21 +71,17 @@ private TezThreadDumpHelper(long duration, Configuration conf) throws IOExceptio
"path: {}", duration, basePath);
}

public TezThreadDumpHelper() {
}

public static TezThreadDumpHelper getInstance(Configuration conf) {
long periodicThreadDumpFrequency =
conf.getTimeDuration(TEZ_THREAD_DUMP_INTERVAL, TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);

if (periodicThreadDumpFrequency > 0) {
try {
return new TezThreadDumpHelper(periodicThreadDumpFrequency, conf);
} catch (IOException e) {
LOG.warn("Can not initialize periodic thread dump service", e);
}
long periodicThreadDumpFrequency = conf.getTimeDuration(TEZ_THREAD_DUMP_INTERVAL,
TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
Preconditions.checkArgument(periodicThreadDumpFrequency > 0, "%s must be positive duration",
TEZ_THREAD_DUMP_INTERVAL);

try {
return new TezThreadDumpHelper(periodicThreadDumpFrequency, conf);
} catch (IOException e) {
throw new TezUncheckedException("Can not initialize periodic thread dump service", e);
}
return NOOP_TEZ_THREAD_DUMP_HELPER;
}

public TezThreadDumpHelper start(String name) {
Expand Down Expand Up @@ -178,18 +175,4 @@ private String getTaskName(long id, String taskName) {
return id + " (" + taskName + ")";
}
}

private static class NoopTezThreadDumpHelper extends TezThreadDumpHelper {

@Override
public TezThreadDumpHelper start(String name) {
// Do Nothing
return this;
}

@Override
public void stop() {
// Do Nothing
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.log4j.helpers.ThreadLocalMap;
import org.apache.tez.common.ContainerContext;
import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezExecutors;
import org.apache.tez.common.TezLocalResource;
Expand All @@ -69,10 +70,10 @@
import org.apache.tez.dag.utils.RelocalizationUtils;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.hadoop.shim.HadoopShimsLoader;
import org.apache.tez.runtime.TezThreadDumpHelper;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
import org.apache.tez.runtime.hook.TezTaskAttemptHook;
import org.apache.tez.runtime.internals.api.TaskReporterInterface;
import org.apache.tez.util.LoggingUtils;

Expand Down Expand Up @@ -120,7 +121,6 @@ public class TezChild {
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private final String user;
private final boolean updateSysCounters;
private TezThreadDumpHelper tezThreadDumpHelper = TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER;

private Multimap<String, String> startedInputsMap = HashMultimap.create();
private final boolean ownUmbilical;
Expand Down Expand Up @@ -295,7 +295,13 @@ public ContainerExecutionResult run() throws IOException, InterruptedException,
hadoopShim, sharedExecutor);

boolean shouldDie;
tezThreadDumpHelper = TezThreadDumpHelper.getInstance(taskConf).start(attemptId.toString());
final String[] hookClasses = taskConf
.getStrings(TezConfiguration.TEZ_TASK_ATTEMPT_HOOKS, new String[0]);
final TezTaskAttemptHook[] hooks = new TezTaskAttemptHook[hookClasses.length];
for (int i = 0; i < hooks.length; i++) {
hooks[i] = ReflectionUtils.createClazzInstance(hookClasses[i]);
hooks[i].start(attemptId, taskConf);
}
try {
TaskRunner2Result result = taskRunner.run();
LOG.info("TaskRunner2Result: {}", result);
Expand All @@ -314,7 +320,9 @@ public ContainerExecutionResult run() throws IOException, InterruptedException,
e, "TaskExecutionFailure: " + e.getMessage());
}
} finally {
tezThreadDumpHelper.stop();
for (TezTaskAttemptHook hook : hooks) {
hook.stop();
}
FileSystem.closeAllForUGI(childUGI);
}
}
Expand Down
Loading

0 comments on commit 1084699

Please sign in to comment.