Skip to content

Commit

Permalink
CDAP-20500 add launch mode to dataproc provisioners
Browse files Browse the repository at this point in the history
Added the concept of a launch mode for runtime jobs.
When set to cluster mode, the workflow driver/Spark client will
be run in a separate container, which is the previous behavior.
When set to client mode, the workflow driver/Spark client will
be run within the runtime job, reducing the overhead required
for the job.

Also refactor DefaultRuntimeJob to reuse the existing
DistributedProgramContainerModule to set up most bindings.
  • Loading branch information
albertshau committed Apr 7, 2023
1 parent 898bcdc commit 8f67079
Show file tree
Hide file tree
Showing 20 changed files with 304 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,4 +189,9 @@ public final class ProgramOptionConstants {
* This is needed for running tethered programs
*/
public static final String PROGRAM_RESOURCE_URI = "programResourceUri";

/**
* Option for the {@link io.cdap.cdap.runtime.spi.runtimejob.LaunchMode} for the run.
*/
public static final String LAUNCH_MODE = "launchMode";
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.twill.api.LocalFile;
import org.apache.twill.api.ResourceSpecification;
import org.apache.twill.api.RuntimeSpecification;
import org.apache.twill.api.TwillPreparer;
import org.apache.twill.api.TwillRunnable;
Expand Down Expand Up @@ -237,7 +238,9 @@ private RuntimeJobInfo createRuntimeJobInfo(
}
}

ResourceSpecification resourceSpecification = runtimeSpec.getResourceSpecification();
return new DefaultRuntimeJobInfo(
getProgramRunId(), resultingFiles, jvmProperties, runtimeJobArguments);
getProgramRunId(), resultingFiles, jvmProperties, runtimeJobArguments,
resourceSpecification.getVirtualCores(), resourceSpecification.getMemorySize());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.name.Names;
import com.google.inject.util.Modules;
import io.cdap.cdap.api.app.ApplicationSpecification;
import io.cdap.cdap.api.artifact.ApplicationClass;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.app.deploy.ConfigResponse;
import io.cdap.cdap.app.deploy.Configurator;
import io.cdap.cdap.app.guice.ClusterMode;
import io.cdap.cdap.app.guice.DefaultProgramRunnerFactory;
import io.cdap.cdap.app.guice.RemoteExecutionDiscoveryModule;
import io.cdap.cdap.app.guice.DistributedProgramContainerModule;
import io.cdap.cdap.app.program.Program;
import io.cdap.cdap.app.program.ProgramDescriptor;
import io.cdap.cdap.app.program.Programs;
Expand All @@ -50,12 +51,11 @@
import io.cdap.cdap.app.runtime.ProgramRunner;
import io.cdap.cdap.app.runtime.ProgramRunnerFactory;
import io.cdap.cdap.app.runtime.ProgramRuntimeProvider;
import io.cdap.cdap.app.runtime.ProgramRuntimeProvider.Mode;
import io.cdap.cdap.app.runtime.ProgramStateWriter;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.discovery.ResolvingDiscoverable;
import io.cdap.cdap.common.guice.ConfigModule;
import io.cdap.cdap.common.guice.IOModule;
import io.cdap.cdap.common.io.Locations;
import io.cdap.cdap.common.lang.jar.BundleJarUtil;
import io.cdap.cdap.common.lang.jar.ClassLoaderFolder;
Expand All @@ -71,9 +71,6 @@
import io.cdap.cdap.internal.app.deploy.pipeline.AppDeploymentInfo;
import io.cdap.cdap.internal.app.deploy.pipeline.AppDeploymentRuntimeInfo;
import io.cdap.cdap.internal.app.deploy.pipeline.AppSpecInfo;
import io.cdap.cdap.internal.app.program.MessagingProgramStatePublisher;
import io.cdap.cdap.internal.app.program.MessagingProgramStateWriter;
import io.cdap.cdap.internal.app.program.ProgramStatePublisher;
import io.cdap.cdap.internal.app.runtime.AbstractListener;
import io.cdap.cdap.internal.app.runtime.BasicArguments;
import io.cdap.cdap.internal.app.runtime.ProgramOptionConstants;
Expand All @@ -86,6 +83,7 @@
import io.cdap.cdap.internal.app.runtime.artifact.RemoteArtifactRepository;
import io.cdap.cdap.internal.app.runtime.artifact.RemoteArtifactRepositoryReader;
import io.cdap.cdap.internal.app.runtime.artifact.RemoteIsolatedPluginFinder;
import io.cdap.cdap.internal.app.runtime.batch.MapReduceProgramRunner;
import io.cdap.cdap.internal.app.runtime.codec.ArgumentsCodec;
import io.cdap.cdap.internal.app.runtime.codec.ProgramOptionsCodec;
import io.cdap.cdap.internal.app.runtime.distributed.DistributedMapReduceProgramRunner;
Expand All @@ -96,27 +94,23 @@
import io.cdap.cdap.internal.app.runtime.monitor.RuntimeMonitors;
import io.cdap.cdap.internal.app.runtime.monitor.ServiceSocksProxyInfo;
import io.cdap.cdap.internal.app.runtime.monitor.TrafficRelayServer;
import io.cdap.cdap.internal.app.runtime.workflow.WorkflowProgramRunner;
import io.cdap.cdap.internal.profile.ProfileMetricService;
import io.cdap.cdap.logging.appender.LogAppenderInitializer;
import io.cdap.cdap.logging.appender.loader.LogAppenderLoaderService;
import io.cdap.cdap.logging.context.LoggingContextHelper;
import io.cdap.cdap.logging.guice.TMSLogAppenderModule;
import io.cdap.cdap.messaging.MessagingService;
import io.cdap.cdap.messaging.guice.MessagingServerRuntimeModule;
import io.cdap.cdap.messaging.server.MessagingHttpService;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.proto.id.ProfileId;
import io.cdap.cdap.proto.id.ProgramId;
import io.cdap.cdap.proto.id.ProgramRunId;
import io.cdap.cdap.runtime.spi.RuntimeMonitorType;
import io.cdap.cdap.runtime.spi.provisioner.Cluster;
import io.cdap.cdap.runtime.spi.runtimejob.LaunchMode;
import io.cdap.cdap.runtime.spi.runtimejob.RuntimeJob;
import io.cdap.cdap.runtime.spi.runtimejob.RuntimeJobEnvironment;
import io.cdap.cdap.security.auth.context.AuthenticationContextModules;
import io.cdap.cdap.security.authorization.AuthorizationEnforcementModule;
import io.cdap.cdap.security.impersonation.CurrentUGIProvider;
import io.cdap.cdap.security.impersonation.UGIProvider;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
Expand Down Expand Up @@ -144,10 +138,12 @@
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.twill.api.ServiceAnnouncer;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.filesystem.Location;
import org.apache.twill.filesystem.LocationFactory;
import org.apache.twill.internal.ServiceListenerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -185,12 +181,22 @@ public void run(RuntimeJobEnvironment runtimeJobEnv) throws Exception {
ProgramOptions programOpts = readJsonFile(
new File(DistributedProgramRunner.PROGRAM_OPTIONS_FILE_NAME),
ProgramOptions.class);

Map<String, String> enhancedSystemArgs = new HashMap<>(programOpts.getArguments().asMap());
LaunchMode launchMode = runtimeJobEnv.getLaunchMode();
enhancedSystemArgs.put(ProgramOptionConstants.LAUNCH_MODE, launchMode.name());
// in client mode, need to add host to the system arguments
if (launchMode == LaunchMode.CLIENT) {
enhancedSystemArgs.put(ProgramOptionConstants.HOST,
InetAddress.getLocalHost().getCanonicalHostName());
}
Arguments systemArgs = new BasicArguments(enhancedSystemArgs);
programOpts = new SimpleProgramOptions(programOpts.getProgramId(),
systemArgs, programOpts.getUserArguments(), programOpts.isDebug());
ProgramRunId programRunId = programOpts.getProgramId()
.run(ProgramRunners.getRunId(programOpts));
ProgramId programId = programRunId.getParent();

Arguments systemArgs = programOpts.getArguments();

// Setup logging context for the program
LoggingContextAccessor.setLoggingContext(
LoggingContextHelper.getLoggingContextWithRunId(programRunId,
Expand Down Expand Up @@ -514,58 +520,67 @@ List<Module> createModules(RuntimeJobEnvironment runtimeJobEnv, CConfiguration c
ProgramRunId programRunId,
ProgramOptions programOpts) {
List<Module> modules = new ArrayList<>();
modules.add(new ConfigModule(cConf));

RuntimeMonitorType runtimeMonitorType = SystemArguments.getRuntimeMonitorType(cConf,
programOpts);
modules.add(RuntimeMonitors.getRemoteAuthenticatorModule(runtimeMonitorType, programOpts));
// doesn't make sense to run a service program in isolated mode
ServiceAnnouncer serviceAnnouncer = new ServiceAnnouncer() {
@Override
public Cancellable announce(String s, int i) {
throw new UnsupportedOperationException("Services are not supported in remote jobs");
}

modules.add(new IOModule());
modules.add(new TMSLogAppenderModule());
modules.add(new RemoteExecutionDiscoveryModule());
modules.add(new AuthorizationEnforcementModule().getDistributedModules());
modules.add(new AuthenticationContextModules().getProgramContainerModule(cConf));
modules.add(new MetricsClientRuntimeModule().getDistributedModules());
modules.add(new MessagingServerRuntimeModule().getStandaloneModules());
@Override
public Cancellable announce(String s, int i, byte[] bytes) {
throw new UnsupportedOperationException("Services are not supported in remote jobs");
}
};
// module for running programs, except with MessagingServer bindings
// instead of MessagingClient. This is because this class runs a local MessagingService
// that the actual program will write to, while the RuntimeClientService relays the messages
// back to the actual system messaging service.
Module programModule = Modules.override(new DistributedProgramContainerModule(
cConf, new Configuration(), programRunId, programOpts, serviceAnnouncer))
.with(new MessagingServerRuntimeModule().getStandaloneModules());
modules.add(programModule);

modules.add(new AbstractModule() {
@Override
protected void configure() {
bind(ClusterMode.class).toInstance(ClusterMode.ISOLATED);
bind(UGIProvider.class).to(CurrentUGIProvider.class).in(Scopes.SINGLETON);

// Bindings from the environment
// Bindings from the environment for program runners
bind(TwillRunner.class).annotatedWith(Constants.AppFabric.ProgramRunner.class)
.toInstance(runtimeJobEnv.getTwillRunner());
bind(LocationFactory.class).toInstance(runtimeJobEnv.getLocationFactory());

MapBinder<ProgramType, ProgramRunner> defaultProgramRunnerBinder = MapBinder.newMapBinder(
binder(), ProgramType.class, ProgramRunner.class);

bind(ProgramRuntimeProvider.Mode.class).toInstance(ProgramRuntimeProvider.Mode.DISTRIBUTED);
if (runtimeJobEnv.getLaunchMode() == LaunchMode.CLIENT) {
bind(ProgramRuntimeProvider.Mode.class).toInstance(Mode.LOCAL);
defaultProgramRunnerBinder.addBinding(ProgramType.MAPREDUCE)
.to(MapReduceProgramRunner.class);
defaultProgramRunnerBinder.addBinding(ProgramType.WORKFLOW)
.to(WorkflowProgramRunner.class);
} else {
bind(ProgramRuntimeProvider.Mode.class).toInstance(Mode.DISTRIBUTED);
defaultProgramRunnerBinder.addBinding(ProgramType.MAPREDUCE)
.to(DistributedMapReduceProgramRunner.class);
defaultProgramRunnerBinder.addBinding(ProgramType.WORKFLOW)
.to(DistributedWorkflowProgramRunner.class);
defaultProgramRunnerBinder.addBinding(ProgramType.WORKER)
.to(DistributedWorkerProgramRunner.class);
}
bind(ProgramRunnerFactory.class).annotatedWith(Constants.AppFabric.ProgramRunner.class)
.to(DefaultProgramRunnerFactory.class).in(Scopes.SINGLETON);
bind(ProgramStatePublisher.class).to(MessagingProgramStatePublisher.class)
.in(Scopes.SINGLETON);
bind(ProgramStateWriter.class).to(MessagingProgramStateWriter.class).in(Scopes.SINGLETON);

defaultProgramRunnerBinder.addBinding(ProgramType.MAPREDUCE)
.to(DistributedMapReduceProgramRunner.class);
defaultProgramRunnerBinder.addBinding(ProgramType.WORKFLOW)
.to(DistributedWorkflowProgramRunner.class);
defaultProgramRunnerBinder.addBinding(ProgramType.WORKER)
.to(DistributedWorkerProgramRunner.class);

bind(ProgramRunnerFactory.class).to(DefaultProgramRunnerFactory.class).in(Scopes.SINGLETON);

bind(ProgramRunId.class).toInstance(programRunId);
bind(RuntimeMonitorType.class).toInstance(runtimeMonitorType);

// needed for app-spec regeneration
install(
new FactoryModuleBuilder()
.implement(Configurator.class, InMemoryConfigurator.class)
.build(ConfiguratorFactory.class)
);

bind(String.class)
.annotatedWith(Names.named(RemoteIsolatedPluginFinder.ISOLATED_PLUGIN_DIR))
.toInstance(programOpts.getArguments().getOption(ProgramOptionConstants.PLUGIN_DIR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,12 @@ public class DefaultRuntimeJobInfo implements RuntimeJobInfo {
private final Map<String, String> jvmProperties;

private final Map<String, String> arguments;

private final int virtualCores;
private final int memoryMb;

public DefaultRuntimeJobInfo(ProgramRunId programRunId, Collection<? extends LocalFile> files,
Map<String, String> jvmProperties) {
this(programRunId, files, jvmProperties, Collections.emptyMap());
}

public DefaultRuntimeJobInfo(ProgramRunId programRunId, Collection<? extends LocalFile> files,
Map<String, String> jvmProperties, Map<String, String> arguments) {
Map<String, String> jvmProperties, Map<String, String> arguments,
int virtualCores, int memoryMb) {
this.info = new ProgramRunInfo.Builder()
.setNamespace(programRunId.getNamespace())
.setApplication(programRunId.getApplication())
Expand All @@ -55,6 +52,8 @@ public DefaultRuntimeJobInfo(ProgramRunId programRunId, Collection<? extends Loc
this.files = Collections.unmodifiableCollection(new ArrayList<>(files));
this.jvmProperties = Collections.unmodifiableMap(new LinkedHashMap<>(jvmProperties));
this.arguments = Collections.unmodifiableMap(arguments);
this.virtualCores = virtualCores;
this.memoryMb = memoryMb;
}

@Override
Expand All @@ -81,4 +80,14 @@ public Map<String, String> getJvmProperties() {
public Map<String, String> getArguments() {
return arguments;
}

/**
 * Returns the virtual core count that was handed to the constructor of this job info.
 *
 * @return the stored {@code virtualCores} value
 */
@Override
public int getVirtualCores() {
  return this.virtualCores;
}

/**
 * Returns the memory amount that was handed to the constructor of this job info.
 *
 * @return the stored {@code memoryMb} value (megabytes, going by the field name)
 */
@Override
public int getMemoryMb() {
  return this.memoryMb;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,26 @@
import com.google.inject.Guice;
import com.google.inject.Injector;
import io.cdap.cdap.app.runtime.Arguments;
import io.cdap.cdap.app.runtime.ProgramRunner;
import io.cdap.cdap.app.runtime.ProgramRunnerFactory;
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.twill.NoopTwillRunnerService;
import io.cdap.cdap.internal.app.deploy.ConfiguratorFactory;
import io.cdap.cdap.internal.app.runtime.BasicArguments;
import io.cdap.cdap.internal.app.runtime.SimpleProgramOptions;
import io.cdap.cdap.internal.app.runtime.SystemArguments;
import io.cdap.cdap.logging.appender.LogAppenderInitializer;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.id.ProgramRunId;
import io.cdap.cdap.runtime.spi.provisioner.Cluster;
import io.cdap.cdap.runtime.spi.provisioner.ClusterStatus;
import io.cdap.cdap.runtime.spi.provisioner.Node;
import io.cdap.cdap.runtime.spi.runtimejob.LaunchMode;
import io.cdap.cdap.runtime.spi.runtimejob.RuntimeJobEnvironment;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import org.apache.twill.api.TwillRunner;
Expand All @@ -51,39 +57,61 @@ public class DefaultRuntimeJobTest {
public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

@Test
public void testInjector() throws Exception {
public void testClusterModeInjector() throws Exception {
testInjector(LaunchMode.CLUSTER);
}

/**
 * Runs the shared {@code testInjector} check with {@link LaunchMode#CLIENT}.
 */
@Test
public void testClientModeInjector() throws Exception {
testInjector(LaunchMode.CLIENT);
}

private void testInjector(LaunchMode launchMode) throws IOException {
CConfiguration cConf = CConfiguration.create();
cConf.set(Constants.CFG_LOCAL_DATA_DIR, TEMP_FOLDER.newFolder().toString());

LocationFactory locationFactory = new LocalLocationFactory(TEMP_FOLDER.newFile());

DefaultRuntimeJob defaultRuntimeJob = new DefaultRuntimeJob();
Arguments systemArgs = new BasicArguments(Collections.singletonMap(SystemArguments.PROFILE_NAME, "test"));
Node node = new Node("test", Node.Type.MASTER, "127.0.0.1", System.currentTimeMillis(), Collections.emptyMap());
Cluster cluster = new Cluster("test", ClusterStatus.RUNNING, Collections.singleton(node), Collections.emptyMap());
ProgramRunId programRunId = NamespaceId.DEFAULT.app("app").workflow("workflow").run(RunIds.generate());
SimpleProgramOptions programOpts = new SimpleProgramOptions(programRunId.getParent(), systemArgs,
new BasicArguments());
Arguments systemArgs =
new BasicArguments(Collections.singletonMap(SystemArguments.PROFILE_NAME, "test"));
Node node = new Node("test", Node.Type.MASTER, "127.0.0.1", System.currentTimeMillis(),
Collections.emptyMap());
Cluster cluster = new Cluster("test", ClusterStatus.RUNNING, Collections.singleton(node),
Collections.emptyMap());
ProgramRunId programRunId =
NamespaceId.DEFAULT.app("app").workflow("workflow").run(RunIds.generate());
SimpleProgramOptions programOpts = new SimpleProgramOptions(programRunId.getParent(),
systemArgs, new BasicArguments());

Injector injector = Guice.createInjector(defaultRuntimeJob.createModules(
new RuntimeJobEnvironment() {

Injector injector = Guice.createInjector(defaultRuntimeJob.createModules(new RuntimeJobEnvironment() {
@Override
public LocationFactory getLocationFactory() {
return locationFactory;
}

@Override
public LocationFactory getLocationFactory() {
return locationFactory;
}
@Override
public TwillRunner getTwillRunner() {
return new NoopTwillRunnerService();
}

@Override
public TwillRunner getTwillRunner() {
return new NoopTwillRunnerService();
}
@Override
public Map<String, String> getProperties() {
return Collections.emptyMap();
}

@Override
public Map<String, String> getProperties() {
return Collections.emptyMap();
}
}, cConf, programRunId, programOpts));
@Override
public LaunchMode getLaunchMode() {
return launchMode;
}
}, cConf, programRunId, programOpts));

injector.getInstance(LogAppenderInitializer.class);
defaultRuntimeJob.createCoreServices(injector, systemArgs, cluster);
injector.getInstance(ConfiguratorFactory.class);
ProgramRunnerFactory programRunnerFactory = injector.getInstance(ProgramRunnerFactory.class);
programRunnerFactory.create(ProgramType.WORKFLOW);
}
}
Loading

0 comments on commit 8f67079

Please sign in to comment.