diff --git a/cdap-master/src/main/java/io/cdap/cdap/master/environment/k8s/MetadataServiceMain.java b/cdap-master/src/main/java/io/cdap/cdap/master/environment/k8s/MetadataServiceMain.java index b145d9e50b18..8ef580633609 100644 --- a/cdap-master/src/main/java/io/cdap/cdap/master/environment/k8s/MetadataServiceMain.java +++ b/cdap-master/src/main/java/io/cdap/cdap/master/environment/k8s/MetadataServiceMain.java @@ -59,15 +59,19 @@ import java.util.List; import javax.annotation.Nullable; import org.apache.twill.zookeeper.ZKClientService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The main class to run metadata service. Also, the dataset op executor is running this process as * well. */ public class MetadataServiceMain extends AbstractServiceMain { + private static final Logger LOG = LoggerFactory.getLogger(MetadataServiceMain.class); + private CConfiguration cConf; /** - * Main entry point + * Main entry point. */ public static void main(String[] args) throws Exception { main(MetadataServiceMain.class, args); @@ -76,6 +80,7 @@ public static void main(String[] args) throws Exception { @Override protected List getServiceModules(MasterEnvironment masterEnv, EnvironmentOptions options, CConfiguration cConf) { + this.cConf = cConf; return Arrays.asList( new MessagingServiceModule(cConf), new NamespaceQueryAdminModule(), @@ -113,7 +118,15 @@ protected void addServices(Injector injector, List services, EnvironmentOptions options) { services.add(injector.getInstance(MetadataService.class)); services.add(injector.getInstance(MetadataSubscriberService.class)); - services.add(injector.getInstance(MetadataConsumerSubscriberService.class)); + + if (cConf != null && cConf.getStringCollection( + Constants.MetadataConsumer.METADATA_CONSUMER_EXTENSIONS_ENABLED_LIST).isEmpty()) { + LOG.info("Skipping enabling MetadataConsumerSubscriberService, " + + "no metadata consumer extensions are enabled."); + } else { + services.add(injector.getInstance(MetadataConsumerSubscriberService.class)); + } + Binding zkBinding = injector.getExistingBinding( Key.get(ZKClientService.class)); if (zkBinding != null) {