From 52c5cc88bbabfa804f30c027bcee9d99aec6e0f4 Mon Sep 17 00:00:00 2001 From: qqeasonchen Date: Mon, 3 Aug 2020 11:57:09 +0800 Subject: [PATCH] merge change list from eventmesh --- build.gradle | 150 +++++---- defibus-broker/conf/checkstyle.xml | 292 +++++++++++++----- .../broker/client/AdjustQueueNumStrategy.java | 26 +- .../broker/topic/DeFiTopicConfigManager.java | 2 +- .../MessageRedirectManagerTest.java | 1 - defibus-client/conf/checkstyle.xml | 292 +++++++++++++----- .../client/common/DeFiBusClientConfig.java | 11 + .../impl/producer/DeFiBusProducerImpl.java | 3 +- .../producer/HealthyMessageQueueSelector.java | 62 +++- .../defibus/consumer/DeFiBusPushConsumer.java | 15 +- .../defibus/producer/DeFiBusProducer.java | 5 + .../client/impl/DeFiBusClientManagerTest.java | 7 - .../factory/DeFiBusClientInstanceTest.java | 24 -- .../client/producer/DeFiBusProducerTest.java | 53 ---- .../HealthyMessageQueueSelectorTest.java | 10 +- defibus-common/conf/checkstyle.xml | 292 +++++++++++++----- .../webank/defibus/common/DeFiBusVersion.java | 6 +- defibus-examples/conf/checkstyle.xml | 292 +++++++++++++----- .../defibus/examples/rpc/RequestProducer.java | 11 +- .../examples/rpc/ResponseConsumer.java | 37 +-- .../rpc/ResponseConsumerAutoReply.java | 23 +- .../defibus/examples/simple/PubProducer.java | 9 +- .../defibus/examples/simple/SubConsumer.java | 19 +- defibus-namesrv/conf/checkstyle.xml | 292 +++++++++++++----- .../namesrv/DeFiBusNamesrvStartup.java | 14 + defibus-tools/conf/checkstyle.xml | 292 +++++++++++++----- .../defibus/tools/admin/DeFiBusAdminExt.java | 27 ++ .../tools/command/DeFiBusAdminStartup.java | 219 ++++++++++++- .../topic/UpdateTopicPermSubCommand.java | 207 +++++++++++++ gradle.properties | 2 +- script/broker_watchdog.sh | 17 + script/namesrv_watchdog.sh | 16 + script/runadmin.cmd | 22 +- script/runadmin.sh | 24 +- script/runbroker.cmd | 22 +- script/runbroker.sh | 24 +- script/runbroker_cloud.sh | 23 +- script/runnamesrv.cmd | 22 +- script/runnamesrv.sh | 23 +- script/stop.cmd | 22 +- script/stop.sh | 24 +- settings.gradle | 1 + 42 files changed, 2135 insertions(+), 800 deletions(-) create mode 100644 defibus-tools/src/main/java/cn/webank/defibus/tools/admin/DeFiBusAdminExt.java create mode 100644 defibus-tools/src/main/java/cn/webank/defibus/tools/command/topic/UpdateTopicPermSubCommand.java diff --git a/build.gradle b/build.gradle index ffcf456..7e77ddd 100644 --- a/build.gradle +++ b/build.gradle @@ -22,22 +22,25 @@ buildscript { maven { url "https://maven.aliyun.com/repository/public" } + maven { url "https://plugins.gradle.org/m2/" } + } dependencies { - classpath("net.sourceforge.pmd:pmd-java:5.4.1") - classpath("com.puppycrawl.tools:checkstyle:6.16.1") - classpath("com.google.code.findbugs:findbugs:3.0.1") - classpath("gradle.plugin.com.github.kt3k.coveralls:coveralls-gradle-plugin:2.8.4") + //classpath("net.sourceforge.pmd:pmd-java:5.4.1") + //classpath("com.puppycrawl.tools:checkstyle:6.16.1") + classpath("gradle.plugin.com.github.spotbugs.snom:spotbugs-gradle-plugin:4.0.7") + classpath('com.github.spotbugs:spotbugs:4.0.0') } } allprojects { apply plugin: 'java' + clean.doFirst { delete 'build' delete 'dist' @@ -49,7 +52,7 @@ allprojects { } } -task tar(overwrite: true, type: Tar) { +task tar(type: Tar) { extension = 'tar.gz' compression = Compression.GZIP archiveName = project.name + '_' + project.version + '.' + extension @@ -59,7 +62,7 @@ task tar(overwrite: true, type: Tar) { } } -task zip(overwrite: true, type: Zip) { +task zip(type: Zip) { extension = 'zip' archiveName = project.name + '.' + project.version + '.' + extension destinationDir = new File(projectDir, 'build') @@ -69,7 +72,8 @@ task zip(overwrite: true, type: Zip) { } subprojects { - apply plugin: "java" + + apply plugin: "maven" apply plugin: "eclipse" apply plugin: "idea" @@ -77,32 +81,66 @@ subprojects { apply plugin: "jacoco" apply plugin: "checkstyle" apply plugin: "pmd" - apply plugin: "findbugs" - apply plugin: "com.github.kt3k.coveralls" + apply plugin: 'com.github.spotbugs' [compileJava, compileTestJava, javadoc]*.options*.encoding = 'UTF-8' compileJava.options.compilerArgs << "-Xlint:unchecked" << "-Xlint:deprecation" jacoco { - toolVersion = "0.7.7.201606060606" + toolVersion = "0.8.5" reportsDir = file("$buildDir/reports/jacoco") } + jacocoTestReport { +// sourceSets sourceSets.main +// executionData files("$buildDir/jacoco/jacocoTest.exec") + + reports { + xml.enabled false + csv.enabled false + html.destination file("${buildDir}/reports/jacoco") + } + } + checkstyle { - checkstyle.toolVersion = "6.19" + toolVersion = "8.32" ignoreFailures = true sourceSets = [sourceSets.main] configFile = 'conf/checkstyle.xml' as File + showViolations true + } + + tasks.withType(Checkstyle) { + reports { + xml.enabled false + html.enabled true + } } - findbugs { + spotbugs { + toolVersion = '4.0.2' ignoreFailures = true - findbugsTest.enabled = false - sourceSets = [sourceSets.main] + effort = "default" + reportLevel = "default" + showProgress = true } - tasks.withType(FindBugs) { +// tasks.withType(com.github.spotbugs.SpotBugsTask) { +// sourceDirs = [sourceSets.main] +// reports { +// xml.enabled = false +// html.enabled = true +// } +// } + spotbugsMain { + reports { + xml.enabled = false + html.enabled = true + } + } + + spotbugsTest { reports { xml.enabled = false html.enabled = true @@ -116,74 +154,47 @@ subprojects { } } + pmd { + consoleOutput = true + toolVersion = "6.23.0" + rulePriority = 5 + ruleSets = ["category/java/errorprone.xml", "category/java/bestpractices.xml"] ignoreFailures = true - pmdTest.enabled = false - sourceSets = [sourceSets.main] - ruleSets = [ - 'java-basic', - 'java-braces', - 'java-clone', - 'java-codesize', - 'java-comments', - 'java-controversial', - 'java-coupling', - 'java-design', - 'java-empty', - 'java-finalizers', - 'java-imports', - 'java-optimizations', - 'java-strictexception', - 'java-strings', - 'java-typeresolution', - 'java-unnecessary', - 'java-unusedcode' - ] } - task jacocoAllTestReport(type: JacocoReport, dependsOn: ["test"]) { - sourceSets sourceSets.main - executionData files("$buildDir/jacoco/test.exec") - reports { - html.enabled = true // human readable - xml.enabled = true // required by coveralls - } - } - coveralls{ - jacocoReportPath="${buildDir}/reports/jacoco/jacocoAllTestReport/jacocoAllTestReport.xml" - } List junit = [ - "junit:junit:4.12" + "junit:junit:4.12" ] List apache_commons = [ - "org.apache.commons:commons-collections4:4.1", - "commons-beanutils:commons-beanutils:1.9.3", - "org.apache.commons:commons-lang3:3.6", - "commons-codec:commons-codec:1.10" + "org.apache.commons:commons-collections4:4.1", + "commons-beanutils:commons-beanutils:1.9.3", + "org.apache.commons:commons-lang3:3.6", + "commons-codec:commons-codec:1.10" ] List logback = [ - "org.slf4j:slf4j-api:1.7.25" + "org.slf4j:slf4j-api:1.7.25" ] List guava = [ - "com.google.guava:guava:20.0" + "com.google.guava:guava:20.0" ] List fastjson = [ - "com.alibaba:fastjson:1.2.61" + "com.alibaba:fastjson:1.2.71" ] List common_io = [ - "commons-io:commons-io:2.4" + "commons-io:commons-io:2.4" ] List assertj = [ - "org.assertj:assertj-core:2.6.0" + "org.assertj:assertj-core:2.6.0" ] List mock = [ @@ -202,12 +213,12 @@ subprojects { jar { manifest { attributes("Specification-Version": project.version, - "Specification-Vendor": "WeBank, Inc.", - "Specification-Title": project.name, - "Implementation-Version": project.version, - "Implementation-Vendor": "WeBank, Inc.", - "Implementation-Title": project.name, - "Build-Jdk": project.findProperty("jdk") + "Specification-Vendor": "WeBank, Inc.", + "Specification-Title": project.name, + "Implementation-Version": project.version, + "Implementation-Vendor": "WeBank, Inc.", + "Implementation-Title": project.name, + "Build-Jdk": project.findProperty("jdk") ) } } @@ -286,5 +297,18 @@ subprojects { resolutionStrategy.cacheDynamicVersionsFor 0, TimeUnit.SECONDS } + uploadArchives { + repositories { + mavenDeployer { + snapshotRepository(url: 'file://D:\\LocalRepo') { + authentication(userName: 'Your user name', password: 'Your password') + } + repository(url: 'file://D:\\LocalRepo') { + authentication(userName: 'Your user name', password: 'Your password') + } + } + } + } + } diff --git a/defibus-broker/conf/checkstyle.xml b/defibus-broker/conf/checkstyle.xml index ef3a533..3d02411 100644 --- a/defibus-broker/conf/checkstyle.xml +++ b/defibus-broker/conf/checkstyle.xml @@ -1,43 +1,59 @@ - + + + + Checkstyle configuration that checks the Google coding conventions from Google Java Style + that can be found at https://google.github.io/styleguide/javaguide.html - + Checkstyle is very configurable. Be sure to read the documentation at + http://checkstyle.org (or in your downloaded distribution). - - - - + To completely disable a check, just comment it out or delete it from the file. + To suppress certain violations please review suppression filters. + + Authors: Max Vetrenko, Ruslan Diachenko, Roman Ivanov. + --> + + + + + + + + + + + + + + + + + + + + + + + + + + + + - + value="\\u00(09|0(a|A)|0(c|C)|0(d|D)|22|27|5(C|c))|\\(0(10|11|12|14|15|42|47)|134)"/> + @@ -46,29 +62,63 @@ Checkstyle configuration that checks the WeBank Mumble Team coding convertions. - + + + - + + + + - - + - + + + + + + value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, STATIC_INIT, + INSTANCE_INIT, ANNOTATION_DEF, ENUM_DEF"/> + + + + + + - + + value="WhitespaceAround: ''{0}'' is not followed by whitespace. Empty blocks may only be represented as '{}' when not part of a multi-block statement (4.1.3)"/> + @@ -78,101 +128,187 @@ Checkstyle configuration that checks the WeBank Mumble Team coding convertions. + - + + - + + + + + + + + + + + + + + + + + + + - + - + + - + + - + + + + + + + + + + - - - + + - + - + + + + + - - - - + + + + + + + + + - - - + + - + + + + + + + + + + + + - + value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, + LT, MINUS, MOD, NOT_EQUAL, PLUS, QUESTION, SL, SR, STAR, METHOD_REF "/> - + + + + - - - + + + - + - - - - + + + + + + + + - + @@ -180,18 +316,14 @@ Checkstyle configuration that checks the WeBank Mumble Team coding convertions. - - - - - - + + - - + + + + - - - diff --git a/defibus-broker/src/main/java/cn/webank/defibus/broker/client/AdjustQueueNumStrategy.java b/defibus-broker/src/main/java/cn/webank/defibus/broker/client/AdjustQueueNumStrategy.java index 360294e..ed89963 100644 --- a/defibus-broker/src/main/java/cn/webank/defibus/broker/client/AdjustQueueNumStrategy.java +++ b/defibus-broker/src/main/java/cn/webank/defibus/broker/client/AdjustQueueNumStrategy.java @@ -17,9 +17,6 @@ package cn.webank.defibus.broker.client; -import cn.webank.defibus.broker.DeFiBrokerController; -import cn.webank.defibus.common.DeFiBusConstant; -import io.netty.channel.Channel; import java.util.List; import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -27,6 +24,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; + +import io.netty.channel.Channel; + import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; @@ -36,6 +36,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import cn.webank.defibus.broker.DeFiBrokerController; +import cn.webank.defibus.common.DeFiBusConstant; + public class AdjustQueueNumStrategy { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final DeFiBrokerController deFiBrokerController; @@ -81,7 +84,8 @@ private void adjustQueueNumByConsumerCount(String topic, AdjustType scaleType) { case DECREASE_QUEUE_NUM: adjustWriteQueueNumByConsumerCount(topic, 0, scaleType); - long delayTimeMillis = deFiBrokerController.getDeFiBusBrokerConfig().getScaleQueueSizeDelayTimeMinute() * 60 * 1000; + long delayTimeMinutes = Math.min(deFiBrokerController.getDeFiBusBrokerConfig().getScaleQueueSizeDelayTimeMinute(), 10); + long delayTimeMillis = delayTimeMinutes * 60 * 1000; adjustReadQueueNumByConsumerCount(topic, delayTimeMillis, scaleType); break; } @@ -95,7 +99,7 @@ private void adjustReadQueueNumByConsumerCount(String topic, long delayMills, Ad public void run() { TopicConfig topicConfig = deFiBrokerController.getTopicConfigManager().getTopicConfigTable().get(topic); if (topicConfig != null) { - synchronized (topicConfig.getTopicName()) { + synchronized (topicConfig) { //query again to ensure it's newest topicConfig = deFiBrokerController.getTopicConfigManager().getTopicConfigTable().get(topic); @@ -114,7 +118,7 @@ public void run() { log.info("try adjust read queue size to {} for [{}], prev: {}, {}", adjustReadQueueSize, topic, topicConfig.getReadQueueNums(), mode); if (adjustReadQueueSize < topicConfig.getWriteQueueNums()) { log.info("adjust read queues to {} for [{}] fail. read queue size can't less than write queue size[{}]. {}", - adjustReadQueueSize, topic, topicConfig.getWriteQueueNums(), mode); + adjustReadQueueSize, topic, topicConfig.getWriteQueueNums(), mode); return; } boolean canAdjustReadQueueSize = isCanAdjustReadQueueSize(topic, adjustReadQueueSize); @@ -158,7 +162,7 @@ private void adjustWriteQueueNumByConsumerCount(String topic, long delayMills, A public void run() { TopicConfig topicConfig = deFiBrokerController.getTopicConfigManager().getTopicConfigTable().get(topic); if (topicConfig != null) { - synchronized (topicConfig.getTopicName()) { + synchronized (topicConfig) { //query again to ensure it's newest topicConfig = deFiBrokerController.getTopicConfigManager().getTopicConfigTable().get(topic); @@ -182,7 +186,7 @@ public void run() { notifyWhenTopicConfigChange(topic); } else { log.info("adjust write queues to {} for [{}] fail. target write queue size can't less than 0 or greater than read queue size[{}]. mode: {}", - adjustWriteQueueSize, topic, topicConfig.getReadQueueNums(), mode); + adjustWriteQueueSize, topic, topicConfig.getReadQueueNums(), mode); } } else { log.info("no need to adjust write queue size for [{}]. now [w:{}/r:{}]. {}", topic, topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums(), mode); @@ -243,10 +247,10 @@ private long nearbyClients(Set cidList) { public boolean test(String cid) { String[] cidArr = cid.split(DeFiBusConstant.INSTANCE_NAME_SEPERATER); if (cidArr.length > 2) { - String idc = cidArr[cidArr.length-1]; + String idc = cidArr[cidArr.length - 1]; String clusterName = deFiBrokerController.getBrokerConfig().getBrokerClusterName(); if (clusterName.toUpperCase().startsWith(idc) || - idc.startsWith(clusterName.toUpperCase())) { + idc.startsWith(clusterName.toUpperCase())) { return true; } } @@ -328,7 +332,7 @@ private boolean isAllMessageConsumed(String topic, Set groups, int queue long ackOffset = deFiBrokerController.getConsumeQueueManager().queryOffset(group, topic, queueId); if (ackOffset < maxOffset) { log.info("not finish consume message for topic: {} by group : {}, queueId: {}, ackOffset: {}, maxOffset: {}", - topic, group, queueId, ackOffset, maxOffset); + topic, group, queueId, ackOffset, maxOffset); return false; } } diff --git a/defibus-broker/src/main/java/cn/webank/defibus/broker/topic/DeFiTopicConfigManager.java b/defibus-broker/src/main/java/cn/webank/defibus/broker/topic/DeFiTopicConfigManager.java index b895cbd..52e03ee 100644 --- a/defibus-broker/src/main/java/cn/webank/defibus/broker/topic/DeFiTopicConfigManager.java +++ b/defibus-broker/src/main/java/cn/webank/defibus/broker/topic/DeFiTopicConfigManager.java @@ -189,7 +189,7 @@ public void decode(String jsonString) { public String encode(final boolean prettyFormat) { //check consistency of TopicConfigManager and DeFiTopicConfigManager boolean isChanged = false; - for (Map.Entry entry : this.brokerController.getTopicConfigManager().getTopicConfigTable().entrySet()) { + for (Entry entry : this.brokerController.getTopicConfigManager().getTopicConfigTable().entrySet()) { String topic = entry.getKey(); if (this.extTopicConfigTable.get(topic) == null) { this.extTopicConfigTable.put(topic, new DeFiBusTopicConfig(topic)); diff --git a/defibus-broker/src/test/java/cn/webank/defibus/broker/consumequeue/MessageRedirectManagerTest.java b/defibus-broker/src/test/java/cn/webank/defibus/broker/consumequeue/MessageRedirectManagerTest.java index 2bf037a..1884b27 100644 --- a/defibus-broker/src/test/java/cn/webank/defibus/broker/consumequeue/MessageRedirectManagerTest.java +++ b/defibus-broker/src/test/java/cn/webank/defibus/broker/consumequeue/MessageRedirectManagerTest.java @@ -18,7 +18,6 @@ import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.Before; import org.junit.Test; -import org.mockito.Spy; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.spy; diff --git a/defibus-client/conf/checkstyle.xml b/defibus-client/conf/checkstyle.xml index ef3a533..3d02411 100644 --- a/defibus-client/conf/checkstyle.xml +++ b/defibus-client/conf/checkstyle.xml @@ -1,43 +1,59 @@ - + + + + Checkstyle configuration that checks the Google coding conventions from Google Java Style + that can be found at https://google.github.io/styleguide/javaguide.html - + Checkstyle is very configurable. Be sure to read the documentation at + http://checkstyle.org (or in your downloaded distribution). - - - - + To completely disable a check, just comment it out or delete it from the file. + To suppress certain violations please review suppression filters. + + Authors: Max Vetrenko, Ruslan Diachenko, Roman Ivanov. + --> + + + + + + + + + + + + + + + + + + + + + + + + + + + + - + value="\\u00(09|0(a|A)|0(c|C)|0(d|D)|22|27|5(C|c))|\\(0(10|11|12|14|15|42|47)|134)"/> + @@ -46,29 +62,63 @@ Checkstyle configuration that checks the WeBank Mumble Team coding convertions. - + + + - + + + + - - + - + + + + + + value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, STATIC_INIT, + INSTANCE_INIT, ANNOTATION_DEF, ENUM_DEF"/> + + + + + + - + + value="WhitespaceAround: ''{0}'' is not followed by whitespace. Empty blocks may only be represented as '{}' when not part of a multi-block statement (4.1.3)"/> + @@ -78,101 +128,187 @@ Checkstyle configuration that checks the WeBank Mumble Team coding convertions. + - + + - + + + + + + + + + + + + + + + + + + + - + - + + - + + - + + + + + + + + + + - - - + + - + - + + + + + - - - - + + + + + + + + + - - - + + - + + + + + + + + + + + + - + value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, + LT, MINUS, MOD, NOT_EQUAL, PLUS, QUESTION, SL, SR, STAR, METHOD_REF "/> - + + + + - - - + + + - + - - - - + + + + + + + + - + @@ -180,18 +316,14 @@ Checkstyle configuration that checks the WeBank Mumble Team coding convertions. - - - - - - + + - - + + + + - - - diff --git a/defibus-client/src/main/java/cn/webank/defibus/client/common/DeFiBusClientConfig.java b/defibus-client/src/main/java/cn/webank/defibus/client/common/DeFiBusClientConfig.java index cb557ab..73e0353 100644 --- a/defibus-client/src/main/java/cn/webank/defibus/client/common/DeFiBusClientConfig.java +++ b/defibus-client/src/main/java/cn/webank/defibus/client/common/DeFiBusClientConfig.java @@ -58,6 +58,8 @@ public class DeFiBusClientConfig { private long pullTimeDelayMillsWhenFlowControl = 50; private long pullTimeDelayMillsWhenSuspend = 500; + private int minMqNumWhenSendLocal = 1; + public String getProducerGroup() { return producerGroup; } @@ -270,6 +272,14 @@ public void setPullTimeDelayMillsWhenSuspend(long pullTimeDelayMillsWhenSuspend) this.pullTimeDelayMillsWhenSuspend = pullTimeDelayMillsWhenSuspend; } + public int getMinMqNumWhenSendLocal() { + return minMqNumWhenSendLocal; + } + + public void setMinMqNumWhenSendLocal(int minMqNumWhenSendLocal) { + this.minMqNumWhenSendLocal = minMqNumWhenSendLocal; + } + @Override public String toString() { return "DeFiBusClientConfig{" + "producerGroup='" + producerGroup + '\'' + @@ -299,6 +309,7 @@ public void setPullTimeDelayMillsWhenSuspend(long pullTimeDelayMillsWhenSuspend) ", pullTimeDelayMillsWhenExcept=" + pullTimeDelayMillsWhenExcept + ", pullTimeDelayMillsWhenFlowControl=" + pullTimeDelayMillsWhenFlowControl + ", pullTimeDelayMillsWhenSuspend=" + pullTimeDelayMillsWhenSuspend + + ", minMqNumWhenSendLocal=" + minMqNumWhenSendLocal + '}'; } } diff --git a/defibus-client/src/main/java/cn/webank/defibus/client/impl/producer/DeFiBusProducerImpl.java b/defibus-client/src/main/java/cn/webank/defibus/client/impl/producer/DeFiBusProducerImpl.java index 5c85420..f5e7b01 100644 --- a/defibus-client/src/main/java/cn/webank/defibus/client/impl/producer/DeFiBusProducerImpl.java +++ b/defibus-client/src/main/java/cn/webank/defibus/client/impl/producer/DeFiBusProducerImpl.java @@ -70,7 +70,8 @@ public class DeFiBusProducerImpl { public DeFiBusProducerImpl(DeFiBusProducer deFiBusProducer, DeFiBusClientConfig deFiBusClientConfig, DeFiBusClientInstance deFiBusClientInstance) { this.deFiBusProducer = deFiBusProducer; - this.messageQueueSelector = new HealthyMessageQueueSelector(new MessageQueueHealthManager(deFiBusClientConfig.getQueueIsolateTimeMillis())); + this.messageQueueSelector = new HealthyMessageQueueSelector(new MessageQueueHealthManager(deFiBusClientConfig.getQueueIsolateTimeMillis()), + deFiBusClientConfig.getMinMqNumWhenSendLocal()); executorService = deFiBusClientInstance.getExecutorService(); scheduledExecutorService = deFiBusClientInstance.getScheduledExecutorService(); diff --git a/defibus-client/src/main/java/cn/webank/defibus/client/impl/producer/HealthyMessageQueueSelector.java b/defibus-client/src/main/java/cn/webank/defibus/client/impl/producer/HealthyMessageQueueSelector.java index c64fc00..d0f616b 100644 --- a/defibus-client/src/main/java/cn/webank/defibus/client/impl/producer/HealthyMessageQueueSelector.java +++ b/defibus-client/src/main/java/cn/webank/defibus/client/impl/producer/HealthyMessageQueueSelector.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.collections4.MapUtils; @@ -35,15 +36,15 @@ public class HealthyMessageQueueSelector implements MessageQueueSelector { private static final Logger LOGGER = LoggerFactory.getLogger(HealthyMessageQueueSelector.class); - private final AtomicInteger sendWhichQueue = new AtomicInteger(0); - private final AtomicInteger sendWhichLocalQueue = new AtomicInteger(0); - private final AtomicInteger sendWhichRemoteQueue = new AtomicInteger(0); + private final ConcurrentHashMap topicSendIndex = new ConcurrentHashMap<>(); private final MessageQueueHealthManager messageQueueHealthManager; + private int minMqCountWhenSendLocal = 1; private Map sendNearbyMapping = new HashMap<>(); private Set localBrokers = new HashSet(); - public HealthyMessageQueueSelector(MessageQueueHealthManager messageQueueHealthManager) { + public HealthyMessageQueueSelector(MessageQueueHealthManager messageQueueHealthManager, int minMqCountWhenSendLocal) { this.messageQueueHealthManager = messageQueueHealthManager; + this.minMqCountWhenSendLocal = minMqCountWhenSendLocal; } @Override @@ -61,10 +62,17 @@ public MessageQueue select(List mqs, Message msg, final Object sel if (pub2local) { List localMQs = new ArrayList<>(); List remoteMqs = new ArrayList<>(); - separateLocalAndRemoteMQs(mqs, localBrokers, localMQs, remoteMqs); + HashMap localBrokerMQCount = separateLocalAndRemoteMQs(mqs, localBrokers, localMQs, remoteMqs); + + for (String brokerName : localBrokerMQCount.keySet()) { + //if MQ num less than threshold, send msg to all broker + if (localBrokerMQCount.get(brokerName) <= minMqCountWhenSendLocal) { + localMQs.addAll(remoteMqs); + } + } //try select a mq from local idc first - MessageQueue candidate = selectMessageQueue(localMQs, sendWhichLocalQueue, lastOne, msg); + MessageQueue candidate = selectMessageQueue(localMQs, lastOne, msg); if (candidate != null) { ((AtomicReference) selectedResultRef).set(candidate); LOGGER.debug("select local mq [{}], {}", candidate.toString(), msg); @@ -72,7 +80,7 @@ public MessageQueue select(List mqs, Message msg, final Object sel } //try select a mq from other idc if cannot select one from local idc - candidate = selectMessageQueue(remoteMqs, sendWhichRemoteQueue, lastOne, msg); + candidate = selectMessageQueue(remoteMqs, lastOne, msg); if (candidate != null) { ((AtomicReference) selectedResultRef).set(candidate); LOGGER.debug("select remote mq [{}], {}", candidate.toString(), msg); @@ -80,7 +88,7 @@ public MessageQueue select(List mqs, Message msg, final Object sel } } else { //try select a mq from all mqs - MessageQueue candidate = selectMessageQueue(mqs, sendWhichQueue, lastOne, msg); + MessageQueue candidate = selectMessageQueue(mqs, lastOne, msg); if (candidate != null) { ((AtomicReference) selectedResultRef).set(candidate); LOGGER.debug("select global mq [{}], {}", candidate.toString(), msg); @@ -90,7 +98,8 @@ public MessageQueue select(List mqs, Message msg, final Object sel //try select a mq which is not isolated if no mq satisfy all limits for (int j = 0; j < mqs.size(); j++) { - int pos = Math.abs(sendWhichQueue.getAndIncrement()) % mqs.size(); + int index = this.getSendIndex(msg.getTopic()); + int pos = Math.abs(index) % mqs.size(); MessageQueue candidate = mqs.get(pos); if (isQueueHealthy(candidate)) { ((AtomicReference) selectedResultRef).set(candidate); @@ -102,7 +111,8 @@ public MessageQueue select(List mqs, Message msg, final Object sel //in case of retry, still try select a mq from another broker if all mq isolated if (lastOne != null) { for (int j = 0; j < mqs.size(); j++) { - int pos = Math.abs(sendWhichQueue.getAndIncrement()) % mqs.size(); + int index = this.getSendIndex(msg.getTopic()); + int pos = Math.abs(index) % mqs.size(); MessageQueue candidate = mqs.get(pos); if (!lastOne.getBrokerName().equals(candidate.getBrokerName())) { ((AtomicReference) selectedResultRef).set(candidate); @@ -113,7 +123,8 @@ public MessageQueue select(List mqs, Message msg, final Object sel } //select a mq from all mqs anyway if no mq satisfy any limits - int pos = Math.abs(sendWhichQueue.getAndIncrement()) % mqs.size(); + int index = this.getSendIndex(msg.getTopic()); + int pos = Math.abs(index) % mqs.size(); MessageQueue candidate = mqs.get(pos); ((AtomicReference) selectedResultRef).set(candidate); LOGGER.debug("select any mq [{}], {}", candidate.toString(), msg); @@ -121,7 +132,7 @@ public MessageQueue select(List mqs, Message msg, final Object sel } - private MessageQueue selectMessageQueue(List mqs, AtomicInteger index, MessageQueue lastOneSelected, + private MessageQueue selectMessageQueue(List mqs, MessageQueue lastOneSelected, Message msg) { boolean isRetry = (lastOneSelected != null); List candidateMqs = mqs; @@ -129,7 +140,8 @@ private MessageQueue selectMessageQueue(List mqs, AtomicInteger in candidateMqs = filterMqsByBrokerName(mqs, lastOneSelected.getBrokerName()); } for (int i = 0; i < candidateMqs.size(); i++) { - int pos = Math.abs(index.getAndIncrement()) % candidateMqs.size(); + int index = this.getSendIndex(msg.getTopic()); + int pos = Math.abs(index) % candidateMqs.size(); MessageQueue candidate = candidateMqs.get(pos); if (isQueueHealthy(candidate)) { return candidate; @@ -154,20 +166,26 @@ private List filterMqsByBrokerName(final List mqs, S return result; } - private void separateLocalAndRemoteMQs(List mqs, Set localBrokers, + private HashMap separateLocalAndRemoteMQs(List mqs, Set localBrokers, List localMQs, List remoteMQs) { if (localMQs == null) localMQs = new ArrayList<>(); if (remoteMQs == null) remoteMQs = new ArrayList<>(); - + HashMap brokerMQCount = new HashMap<>(); for (MessageQueue mq : mqs) { if (localBrokers.contains(mq.getBrokerName())) { localMQs.add(mq); + Integer count = brokerMQCount.get(mq.getBrokerName()); + if (count == null) { + count = 0; + } + brokerMQCount.put(mq.getBrokerName(), count+1); } else { remoteMQs.add(mq); } } + return brokerMQCount; } public MessageQueueHealthManager getMessageQueueHealthManager() { @@ -185,4 +203,18 @@ public Set getLocalBrokers() { public void setLocalBrokers(Set localBrokers) { this.localBrokers = localBrokers; } + + private int getSendIndex(String topic) { + AtomicInteger index = topicSendIndex.get(topic); + if (index == null) { + topicSendIndex.putIfAbsent(topic, new AtomicInteger(0)); + index = topicSendIndex.get(topic); + } + int result = Math.abs(index.getAndIncrement()); + if (result < 0) { + index.set(0); + result = index.getAndIncrement(); + } + return result; + } } diff --git a/defibus-client/src/main/java/cn/webank/defibus/consumer/DeFiBusPushConsumer.java b/defibus-client/src/main/java/cn/webank/defibus/consumer/DeFiBusPushConsumer.java index d4dc91c..1b36bf3 100644 --- a/defibus-client/src/main/java/cn/webank/defibus/consumer/DeFiBusPushConsumer.java +++ b/defibus-client/src/main/java/cn/webank/defibus/consumer/DeFiBusPushConsumer.java @@ -36,7 +36,6 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.RPCHook; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +68,11 @@ public DeFiBusPushConsumer(final DeFiBusClientConfig deFiBusClientConfig) { defaultMQPushConsumer.setVipChannelEnabled(false); } + /** + * start the consumer which will begin to connect with the broker and then message can be consumed. + * If the consumer has been already started, nothing will happen. + * @throws MQClientException + */ public void start() throws MQClientException { if (isStart.compareAndSet(false, true)) { @@ -129,10 +133,19 @@ public void shutdown() { } } + /** + * register a message listener which specify the callback message how message should be consumed. The message will be consumed in a standalone thread pool. + * @param messageListener + */ public void registerMessageListener(MessageListenerConcurrently messageListener) { this.defaultMQPushConsumer.registerMessageListener(messageListener); } + /** + * subscirbe a topic so that the consumer can consume message from. Typically, you should subscribe topic first then start the consumer + * @param topic topic name that the consumer needs to subscribe + * @throws MQClientException + */ public void subscribe(String topic) throws MQClientException { this.defaultMQPushConsumer.subscribe(topic, "*"); LOG.info("add subscription [{}] to consumer", topic); diff --git a/defibus-client/src/main/java/cn/webank/defibus/producer/DeFiBusProducer.java b/defibus-client/src/main/java/cn/webank/defibus/producer/DeFiBusProducer.java index 1689e34..80dc1fc 100644 --- a/defibus-client/src/main/java/cn/webank/defibus/producer/DeFiBusProducer.java +++ b/defibus-client/src/main/java/cn/webank/defibus/producer/DeFiBusProducer.java @@ -66,6 +66,11 @@ public DeFiBusProducer(DeFiBusClientConfig deFiBusClientConfig) { this.deFiBusClientConfig = deFiBusClientConfig; } + /** + * start the producer which will begin to connect with the broker. A producer MUST call this method before sending any message + * If the producer has been already started, nothing will happen. + * @throws MQClientException + */ public void start() throws MQClientException { if (isStart.compareAndSet(false, true)) { try { diff --git a/defibus-client/src/test/java/cn/webank/defibus/client/impl/DeFiBusClientManagerTest.java b/defibus-client/src/test/java/cn/webank/defibus/client/impl/DeFiBusClientManagerTest.java index 0621ee1..62c2b38 100644 --- a/defibus-client/src/test/java/cn/webank/defibus/client/impl/DeFiBusClientManagerTest.java +++ b/defibus-client/src/test/java/cn/webank/defibus/client/impl/DeFiBusClientManagerTest.java @@ -17,13 +17,6 @@ package cn.webank.defibus.client.impl; -import cn.webank.defibus.client.DeFiBusClientManager; -import cn.webank.defibus.client.impl.factory.DeFiBusClientInstance; -import org.apache.rocketmq.client.ClientConfig; -import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.junit.Test; - import static org.assertj.core.api.Assertions.assertThat; public class DeFiBusClientManagerTest { diff --git a/defibus-client/src/test/java/cn/webank/defibus/client/impl/factory/DeFiBusClientInstanceTest.java b/defibus-client/src/test/java/cn/webank/defibus/client/impl/factory/DeFiBusClientInstanceTest.java index 71df815..615766b 100644 --- a/defibus-client/src/test/java/cn/webank/defibus/client/impl/factory/DeFiBusClientInstanceTest.java +++ b/defibus-client/src/test/java/cn/webank/defibus/client/impl/factory/DeFiBusClientInstanceTest.java @@ -17,31 +17,7 @@ package cn.webank.defibus.client.impl.factory; -import cn.webank.defibus.client.DeFiBusClientManager; -import cn.webank.defibus.client.impl.DeFiBusClientAPIImpl; -import cn.webank.defibus.client.impl.hook.DeFiBusClientHookFactory; -import cn.webank.defibus.common.util.ReflectUtil; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import org.apache.rocketmq.client.ClientConfig; -import org.apache.rocketmq.client.exception.MQBrokerException; -import org.apache.rocketmq.client.impl.factory.MQClientInstance; -import org.apache.rocketmq.common.protocol.route.BrokerData; -import org.apache.rocketmq.common.protocol.route.QueueData; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; -import org.apache.rocketmq.remoting.exception.RemotingConnectException; -import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; -import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.runners.MockitoJUnitRunner; - import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.when; //@RunWith(MockitoJUnitRunner.class) public class DeFiBusClientInstanceTest { diff --git a/defibus-client/src/test/java/cn/webank/defibus/client/producer/DeFiBusProducerTest.java b/defibus-client/src/test/java/cn/webank/defibus/client/producer/DeFiBusProducerTest.java index 8758fd7..cea5ad5 100644 --- a/defibus-client/src/test/java/cn/webank/defibus/client/producer/DeFiBusProducerTest.java +++ b/defibus-client/src/test/java/cn/webank/defibus/client/producer/DeFiBusProducerTest.java @@ -17,61 +17,8 @@ package cn.webank.defibus.client.producer; -import cn.webank.defibus.client.DeFiBusClientManager; -import cn.webank.defibus.client.common.DeFiBusClientConfig; -import cn.webank.defibus.client.impl.DeFiBusClientAPIImpl; -import cn.webank.defibus.client.impl.factory.DeFiBusClientInstance; -import cn.webank.defibus.client.impl.producer.RRCallback; -import cn.webank.defibus.client.impl.producer.RRResponseFuture; -import cn.webank.defibus.client.impl.producer.ResponseTable; -import cn.webank.defibus.common.DeFiBusConstant; -import cn.webank.defibus.producer.DeFiBusProducer; -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.rocketmq.client.ClientConfig; -import org.apache.rocketmq.client.exception.MQBrokerException; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.hook.SendMessageContext; -import org.apache.rocketmq.client.impl.CommunicationMode; -import org.apache.rocketmq.client.impl.factory.MQClientInstance; -import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; -import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.common.message.MessageAccessor; -import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; -import org.apache.rocketmq.common.protocol.route.BrokerData; -import org.apache.rocketmq.common.protocol.route.QueueData; -import org.apache.rocketmq.common.protocol.route.TopicRouteData; -import org.apache.rocketmq.remoting.exception.RemotingException; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.Spy; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.junit.MockitoJUnitRunner; -import org.mockito.stubbing.Answer; - import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.nullable; -import static org.mockito.Mockito.when; //@RunWith(MockitoJUnitRunner.class) public class DeFiBusProducerTest { diff --git a/defibus-client/src/test/java/cn/webank/defibus/client/producer/HealthyMessageQueueSelectorTest.java b/defibus-client/src/test/java/cn/webank/defibus/client/producer/HealthyMessageQueueSelectorTest.java index a0ee2e2..81a749a 100644 --- a/defibus-client/src/test/java/cn/webank/defibus/client/producer/HealthyMessageQueueSelectorTest.java +++ b/defibus-client/src/test/java/cn/webank/defibus/client/producer/HealthyMessageQueueSelectorTest.java @@ -45,7 +45,7 @@ public void testLocalValidQueue() { // PowerMockito.when(producerImplMock.getLocalBrokers()).thenReturn(locBrokers); MessageQueueHealthManager manager = new MessageQueueHealthManager(60 * 1000); - HealthyMessageQueueSelector selector = new HealthyMessageQueueSelector(manager); + HealthyMessageQueueSelector selector = new HealthyMessageQueueSelector(manager, 1); selector.setLocalBrokers(locBrokers); List mqs = new ArrayList<>(); @@ -65,7 +65,7 @@ public void testErrorQueue() { locBrokers.add("localIDC"); MessageQueueHealthManager manager = new MessageQueueHealthManager(60 * 1000); - HealthyMessageQueueSelector selector = new HealthyMessageQueueSelector(manager); + HealthyMessageQueueSelector selector = new HealthyMessageQueueSelector(manager, 1); selector.setLocalBrokers(locBrokers); List mqs = new ArrayList<>(); @@ -92,7 +92,7 @@ public void testOtherValidQueue() { locBrokers.add("localIDC"); MessageQueueHealthManager manager = new MessageQueueHealthManager(60 * 1000); - HealthyMessageQueueSelector selector = new HealthyMessageQueueSelector(manager); + HealthyMessageQueueSelector selector = new HealthyMessageQueueSelector(manager, 1); selector.setLocalBrokers(locBrokers); List mqs = new ArrayList<>(); @@ -122,7 +122,7 @@ public void testBizTopic() { localBrokers.add(localBrokerName); MessageQueueHealthManager manager = new MessageQueueHealthManager(60 * 1000); - HealthyMessageQueueSelector selector = new HealthyMessageQueueSelector(manager); + HealthyMessageQueueSelector selector = new HealthyMessageQueueSelector(manager, 1); selector.setLocalBrokers(localBrokers); //construct mq data @@ -193,7 +193,7 @@ public void testRetryBizTopic() { } MessageQueueHealthManager manager = new MessageQueueHealthManager(60 * 1000); - HealthyMessageQueueSelector selector = new HealthyMessageQueueSelector(manager); + HealthyMessageQueueSelector selector = new HealthyMessageQueueSelector(manager, 1); selector.setLocalBrokers(localBrokers); //construct mq data diff --git a/defibus-common/conf/checkstyle.xml b/defibus-common/conf/checkstyle.xml index ef3a533..3d02411 100644 --- a/defibus-common/conf/checkstyle.xml +++ b/defibus-common/conf/checkstyle.xml @@ -1,43 +1,59 @@ - + + + + Checkstyle configuration that checks the Google coding conventions from Google Java Style + that can be found at https://google.github.io/styleguide/javaguide.html - + Checkstyle is very configurable. Be sure to read the documentation at + http://checkstyle.org (or in your downloaded distribution). - - - - + To completely disable a check, just comment it out or delete it from the file. + To suppress certain violations please review suppression filters. + + Authors: Max Vetrenko, Ruslan Diachenko, Roman Ivanov. + --> + + + + + + + + + + + + + + + + + + + + + + + + + + + + - + value="\\u00(09|0(a|A)|0(c|C)|0(d|D)|22|27|5(C|c))|\\(0(10|11|12|14|15|42|47)|134)"/> + @@ -46,29 +62,63 @@ Checkstyle configuration that checks the WeBank Mumble Team coding convertions. - + + + - + + + + - - + - + + + + + + value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, STATIC_INIT, + INSTANCE_INIT, ANNOTATION_DEF, ENUM_DEF"/> + + + + + + - + + value="WhitespaceAround: ''{0}'' is not followed by whitespace. Empty blocks may only be represented as '{}' when not part of a multi-block statement (4.1.3)"/> + @@ -78,101 +128,187 @@ Checkstyle configuration that checks the WeBank Mumble Team coding convertions. + - + + - + + + + + + + + + + + + + + + + + + + - + - + + - + + - + + + + + + + + + + - - - + + - + - + + + + + - - - - + + + + + + + + + - - - + + - + + + + + + + + + + + + - + value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, + LT, MINUS, MOD, NOT_EQUAL, PLUS, QUESTION, SL, SR, STAR, METHOD_REF "/> - + + + + - - - + + + - + - - - - + + + + + + + + - + @@ -180,18 +316,14 @@ Checkstyle configuration that checks the WeBank Mumble Team coding convertions. - - - - - - + + - - + + + + - - - diff --git a/defibus-common/src/main/java/cn/webank/defibus/common/DeFiBusVersion.java b/defibus-common/src/main/java/cn/webank/defibus/common/DeFiBusVersion.java index f9ebc81..389ffc5 100644 --- a/defibus-common/src/main/java/cn/webank/defibus/common/DeFiBusVersion.java +++ b/defibus-common/src/main/java/cn/webank/defibus/common/DeFiBusVersion.java @@ -22,7 +22,7 @@ public class DeFiBusVersion { public static String getVersionDesc(int value) { try { - DeFiBusVersion.Version v = DeFiBusVersion.Version.values()[value]; + Version v = Version.values()[value]; return v.name(); } catch (Exception e) { } @@ -30,8 +30,8 @@ public static String getVersionDesc(int value) { return "HigherVersion"; } - public static DeFiBusVersion.Version value2Version(int value) { - return DeFiBusVersion.Version.values()[value]; + public static Version value2Version(int value) { + return Version.values()[value]; } public enum Version { diff --git a/defibus-examples/conf/checkstyle.xml b/defibus-examples/conf/checkstyle.xml index ef3a533..3d02411 100644 --- a/defibus-examples/conf/checkstyle.xml +++ b/defibus-examples/conf/checkstyle.xml @@ -1,43 +1,59 @@ - + + + + Checkstyle configuration that checks the Google coding conventions from Google Java Style + that can be found at https://google.github.io/styleguide/javaguide.html - + Checkstyle is very configurable. Be sure to read the documentation at + http://checkstyle.org (or in your downloaded distribution). - - - - + To completely disable a check, just comment it out or delete it from the file. + To suppress certain violations please review suppression filters. + + Authors: Max Vetrenko, Ruslan Diachenko, Roman Ivanov. + --> + + + + + + + + + + + + + + + + + + + + + + + + + + + + - + value="\\u00(09|0(a|A)|0(c|C)|0(d|D)|22|27|5(C|c))|\\(0(10|11|12|14|15|42|47)|134)"/> + @@ -46,29 +62,63 @@ Checkstyle configuration that checks the WeBank Mumble Team coding convertions. - + + + - + + + + - - + - + + + + + + value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, STATIC_INIT, + INSTANCE_INIT, ANNOTATION_DEF, ENUM_DEF"/> + + + + + + - + + value="WhitespaceAround: ''{0}'' is not followed by whitespace. Empty blocks may only be represented as '{}' when not part of a multi-block statement (4.1.3)"/> + @@ -78,101 +128,187 @@ Checkstyle configuration that checks the WeBank Mumble Team coding convertions. + - + + - + + + + + + + + + + + + + + + + + + + - + - + + - + + - + + + + + + + + + + - - - + + - + - + + + + + - - - - + + + + + + + + + - - - + + - + + + + + + + + + + + + - + value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, + LT, MINUS, MOD, NOT_EQUAL, PLUS, QUESTION, SL, SR, STAR, METHOD_REF "/> - + + + + - - - + + + - + - - - - + + + + + + + + - + @@ -180,18 +316,14 @@ Checkstyle configuration that checks the WeBank Mumble Team coding convertions. - - - - - - + + - - + + + + - - - diff --git a/defibus-examples/src/main/java/cn/webank/defibus/examples/rpc/RequestProducer.java b/defibus-examples/src/main/java/cn/webank/defibus/examples/rpc/RequestProducer.java index b5bf323..8cccab0 100644 --- a/defibus-examples/src/main/java/cn/webank/defibus/examples/rpc/RequestProducer.java +++ b/defibus-examples/src/main/java/cn/webank/defibus/examples/rpc/RequestProducer.java @@ -54,14 +54,13 @@ public static void main(String[] args) throws MQClientException { } else { logger.info("request success. cost: {}ms. reply msg: {}", cost, reply); } - } catch (MQClientException e) { + } catch (MQClientException | RemotingException | InterruptedException | MQBrokerException e) { logger.warn("{}", e); - } catch (RemotingException e) { - logger.warn("{}", e); - } catch (InterruptedException e) { - logger.warn("{}", e); - } catch (MQBrokerException e) { + } catch (Exception e) { logger.warn("{}", e); + } finally { + // normally , we ONLY shutdown DeFiBusProducer when the application exits. In this sample, we shutdown the producer when message is sent. + deFiBusProducer.shutdown(); } } } diff --git a/defibus-examples/src/main/java/cn/webank/defibus/examples/rpc/ResponseConsumer.java b/defibus-examples/src/main/java/cn/webank/defibus/examples/rpc/ResponseConsumer.java index 417bf1a..ff585db 100644 --- a/defibus-examples/src/main/java/cn/webank/defibus/examples/rpc/ResponseConsumer.java +++ b/defibus-examples/src/main/java/cn/webank/defibus/examples/rpc/ResponseConsumer.java @@ -17,13 +17,8 @@ package cn.webank.defibus.examples.rpc; -import cn.webank.defibus.client.common.DeFiBusClientConfig; -import cn.webank.defibus.client.common.DeFiBusClientUtil; -import cn.webank.defibus.common.DeFiBusConstant; -import cn.webank.defibus.consumer.DeFiBusMessageListenerConcurrently; -import cn.webank.defibus.consumer.DeFiBusPushConsumer; -import cn.webank.defibus.producer.DeFiBusProducer; import java.util.List; + import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.exception.MQBrokerException; @@ -36,22 +31,26 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import cn.webank.defibus.client.common.DeFiBusClientConfig; +import cn.webank.defibus.client.common.DeFiBusClientUtil; +import cn.webank.defibus.common.DeFiBusConstant; +import cn.webank.defibus.consumer.DeFiBusMessageListenerConcurrently; +import cn.webank.defibus.consumer.DeFiBusPushConsumer; +import cn.webank.defibus.producer.DeFiBusProducer; + public class ResponseConsumer { private static final Logger logger = LoggerFactory.getLogger(ResponseConsumer.class); public static void main(String[] args) throws MQClientException { String topic = "RequestTopic"; DeFiBusClientConfig deFiBusClientConfig = new DeFiBusClientConfig(); - deFiBusClientConfig.setConsumerGroup("Your-group-name"); deFiBusClientConfig.setPullBatchSize(32); deFiBusClientConfig.setThreadPoolCoreSize(12); deFiBusClientConfig.setClusterPrefix("XL"); - DeFiBusProducer deFiBusProducer = new DeFiBusProducer(deFiBusClientConfig); deFiBusProducer.setNamesrvAddr("127.0.0.1:9876"); deFiBusProducer.start(); - DeFiBusPushConsumer deFiBusPushConsumer = new DeFiBusPushConsumer(deFiBusClientConfig); deFiBusPushConsumer.setNamesrvAddr("127.0.0.1:9876"); deFiBusPushConsumer.registerMessageListener(new DeFiBusMessageListenerConcurrently() { @@ -64,36 +63,34 @@ public ConsumeConcurrentlyStatus handleMessage(List msgs, ConsumeCon } else { try { logger.info("begin handle: " + msg.toString()); - Message replyMsg = DeFiBusClientUtil.createReplyMessage(msg, ("I am replying content").getBytes()); deFiBusProducer.reply(replyMsg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { logger.info("reply success. {}", msg.toString()); } - @Override public void onException(Throwable e) { logger.info("reply fail. {}", msg.toString(), e); } }); - } catch (InterruptedException e) { - logger.warn("{}", e); - } catch (RemotingException e) { - logger.warn("{}", e); - } catch (MQClientException e) { - logger.warn("{}", e); - } catch (MQBrokerException e) { + } catch (InterruptedException | RemotingException | MQClientException | MQBrokerException e) { logger.warn("{}", e); } } - } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); - deFiBusPushConsumer.subscribe(topic); deFiBusPushConsumer.start(); + + //shutdown the consumer when application exits. + Runtime.getRuntime().addShutdownHook(new Thread(){ + @Override + public void run() { + deFiBusPushConsumer.shutdown(); + } + }); } } diff --git a/defibus-examples/src/main/java/cn/webank/defibus/examples/rpc/ResponseConsumerAutoReply.java b/defibus-examples/src/main/java/cn/webank/defibus/examples/rpc/ResponseConsumerAutoReply.java index f861e69..36eb9c5 100644 --- a/defibus-examples/src/main/java/cn/webank/defibus/examples/rpc/ResponseConsumerAutoReply.java +++ b/defibus-examples/src/main/java/cn/webank/defibus/examples/rpc/ResponseConsumerAutoReply.java @@ -17,16 +17,17 @@ package cn.webank.defibus.examples.rpc; -import cn.webank.defibus.client.common.DeFiBusClientConfig; -import cn.webank.defibus.consumer.DeFiBusMessageListenerConcurrentlyWithReply; -import cn.webank.defibus.consumer.DeFiBusPushConsumer; -import cn.webank.defibus.producer.DeFiBusProducer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import cn.webank.defibus.client.common.DeFiBusClientConfig; +import cn.webank.defibus.consumer.DeFiBusMessageListenerConcurrentlyWithReply; +import cn.webank.defibus.consumer.DeFiBusPushConsumer; +import cn.webank.defibus.producer.DeFiBusProducer; + /** * the example of responder with auto reply */ @@ -44,9 +45,8 @@ public String handleMessage(MessageExt msg, ConsumeConcurrentlyContext context) //1. biz handle logic //2. create reply content - String replyContent = "A reply message content"; - return replyContent; + return "A reply message content"; } }); try { @@ -55,6 +55,17 @@ public String handleMessage(MessageExt msg, ConsumeConcurrentlyContext context) deFiBusPushConsumer.start(); } catch (MQClientException e) { e.printStackTrace(); + } finally { + deFiBusProducer.shutdown(); } + + //shutdown the consumer when application exits. + Runtime.getRuntime().addShutdownHook(new Thread(){ + @Override + public void run() { + deFiBusPushConsumer.shutdown(); + } + }); + } } diff --git a/defibus-examples/src/main/java/cn/webank/defibus/examples/simple/PubProducer.java b/defibus-examples/src/main/java/cn/webank/defibus/examples/simple/PubProducer.java index 8de4203..aab3f60 100644 --- a/defibus-examples/src/main/java/cn/webank/defibus/examples/simple/PubProducer.java +++ b/defibus-examples/src/main/java/cn/webank/defibus/examples/simple/PubProducer.java @@ -41,12 +41,11 @@ public static void main(String[] args) throws MQClientException { Message msg = new Message(topic, content.getBytes()); try { deFiBusProducer.publish(msg); - } catch (MQClientException e) { - logger.warn("{}", e); - } catch (RemotingException e) { - logger.warn("{}", e); - } catch (InterruptedException e) { + } catch (MQClientException | RemotingException | InterruptedException e) { logger.warn("{}", e); + } finally { + // normally , we only shutdown DeFiBusProducer when the application exits. In this sample, we shutdown the producer when message is sent. + deFiBusProducer.shutdown(); } } } diff --git a/defibus-examples/src/main/java/cn/webank/defibus/examples/simple/SubConsumer.java b/defibus-examples/src/main/java/cn/webank/defibus/examples/simple/SubConsumer.java index 56695ec..3c8af43 100644 --- a/defibus-examples/src/main/java/cn/webank/defibus/examples/simple/SubConsumer.java +++ b/defibus-examples/src/main/java/cn/webank/defibus/examples/simple/SubConsumer.java @@ -17,11 +17,8 @@ package cn.webank.defibus.examples.simple; -import cn.webank.defibus.client.common.DeFiBusClientConfig; -import cn.webank.defibus.consumer.DeFiBusMessageListenerConcurrently; -import cn.webank.defibus.consumer.DeFiBusPushConsumer; -import cn.webank.defibus.producer.DeFiBusProducer; import java.util.List; + import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.exception.MQClientException; @@ -29,6 +26,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import cn.webank.defibus.client.common.DeFiBusClientConfig; +import cn.webank.defibus.consumer.DeFiBusMessageListenerConcurrently; +import cn.webank.defibus.consumer.DeFiBusPushConsumer; +import cn.webank.defibus.producer.DeFiBusProducer; + public class SubConsumer { private static final Logger logger = LoggerFactory.getLogger(SubConsumer.class); @@ -58,5 +60,14 @@ public ConsumeConcurrentlyStatus handleMessage(List msgs, ConsumeCon deFiBusPushConsumer.subscribe(topic); deFiBusPushConsumer.start(); + + //shutdown the consumer when application exits. + Runtime.getRuntime().addShutdownHook(new Thread(){ + @Override + public void run() { + deFiBusPushConsumer.shutdown(); + } + }); + } } diff --git a/defibus-namesrv/conf/checkstyle.xml b/defibus-namesrv/conf/checkstyle.xml index ef3a533..3d02411 100644 --- a/defibus-namesrv/conf/checkstyle.xml +++ b/defibus-namesrv/conf/checkstyle.xml @@ -1,43 +1,59 @@ - + + + + Checkstyle configuration that checks the Google coding conventions from Google Java Style + that can be found at https://google.github.io/styleguide/javaguide.html - + Checkstyle is very configurable. Be sure to read the documentation at + http://checkstyle.org (or in your downloaded distribution). - - - - + To completely disable a check, just comment it out or delete it from the file. + To suppress certain violations please review suppression filters. + + Authors: Max Vetrenko, Ruslan Diachenko, Roman Ivanov. + --> + + + + + + + + + + + + + + + + + + + + + + + + + + + + - + value="\\u00(09|0(a|A)|0(c|C)|0(d|D)|22|27|5(C|c))|\\(0(10|11|12|14|15|42|47)|134)"/> + @@ -46,29 +62,63 @@ Checkstyle configuration that checks the WeBank Mumble Team coding convertions. - + + + - + + + + - - + - + + + + + + value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, STATIC_INIT, + INSTANCE_INIT, ANNOTATION_DEF, ENUM_DEF"/> + + + + + + - + + value="WhitespaceAround: ''{0}'' is not followed by whitespace. Empty blocks may only be represented as '{}' when not part of a multi-block statement (4.1.3)"/> + @@ -78,101 +128,187 @@ Checkstyle configuration that checks the WeBank Mumble Team coding convertions. + - + + - + + + + + + + + + + + + + + + + + + + - + - + + - + + - + + + + + + + + + + - - - + + - + - + + + + + - - - - + + + + + + + + + - - - + + - + + + + + + + + + + + + - + value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, + LT, MINUS, MOD, NOT_EQUAL, PLUS, QUESTION, SL, SR, STAR, METHOD_REF "/> - + + + + - - - + + + - + - - - - + + + + + + + + - + @@ -180,18 +316,14 @@ Checkstyle configuration that checks the WeBank Mumble Team coding convertions. - - - - - - + + - - + + + + - - - diff --git a/defibus-namesrv/src/main/java/cn/webank/defibus/namesrv/DeFiBusNamesrvStartup.java b/defibus-namesrv/src/main/java/cn/webank/defibus/namesrv/DeFiBusNamesrvStartup.java index 5b9528c..a35aeee 100644 --- a/defibus-namesrv/src/main/java/cn/webank/defibus/namesrv/DeFiBusNamesrvStartup.java +++ b/defibus-namesrv/src/main/java/cn/webank/defibus/namesrv/DeFiBusNamesrvStartup.java @@ -18,8 +18,22 @@ package cn.webank.defibus.namesrv; import org.apache.rocketmq.namesrv.NamesrvStartup; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.MixAll; public class DeFiBusNamesrvStartup { + + private static final String DEFAULT_ROCKETMQ_HOME_PATH = "."; + + //init default rocketmq home path. + public static void initRocketMQHomePath() { + String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); + if (StringUtils.isBlank(rocketmqHome)) { + System.setProperty(MixAll.ROCKETMQ_HOME_PROPERTY, DEFAULT_ROCKETMQ_HOME_PATH); + } + } + + public static void main(String[] args) { NamesrvStartup.main0(args); } diff --git a/defibus-tools/conf/checkstyle.xml b/defibus-tools/conf/checkstyle.xml index ef3a533..3d02411 100644 --- a/defibus-tools/conf/checkstyle.xml +++ b/defibus-tools/conf/checkstyle.xml @@ -1,43 +1,59 @@ - + + + + Checkstyle configuration that checks the Google coding conventions from Google Java Style + that can be found at https://google.github.io/styleguide/javaguide.html - + Checkstyle is very configurable. Be sure to read the documentation at + http://checkstyle.org (or in your downloaded distribution). - - - - + To completely disable a check, just comment it out or delete it from the file. + To suppress certain violations please review suppression filters. + + Authors: Max Vetrenko, Ruslan Diachenko, Roman Ivanov. + --> + + + + + + + + + + + + + + + + + + + + + + + + + + + + - + value="\\u00(09|0(a|A)|0(c|C)|0(d|D)|22|27|5(C|c))|\\(0(10|11|12|14|15|42|47)|134)"/> + @@ -46,29 +62,63 @@ Checkstyle configuration that checks the WeBank Mumble Team coding convertions. - + + + - + + + + - - + - + + + + + + value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, STATIC_INIT, + INSTANCE_INIT, ANNOTATION_DEF, ENUM_DEF"/> + + + + + + - + + value="WhitespaceAround: ''{0}'' is not followed by whitespace. Empty blocks may only be represented as '{}' when not part of a multi-block statement (4.1.3)"/> + @@ -78,101 +128,187 @@ Checkstyle configuration that checks the WeBank Mumble Team coding convertions. + - + + - + + + + + + + + + + + + + + + + + + + - + - + + - + + - + + + + + + + + + + - - - + + - + - + + + + + - - - - + + + + + + + + + - - - + + - + + + + + + + + + + + + - + value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, + LT, MINUS, MOD, NOT_EQUAL, PLUS, QUESTION, SL, SR, STAR, METHOD_REF "/> - + + + + - - - + + + - + - - - - + + + + + + + + - + @@ -180,18 +316,14 @@ Checkstyle configuration that checks the WeBank Mumble Team coding convertions. - - - - - - + + - - + + + + - - - diff --git a/defibus-tools/src/main/java/cn/webank/defibus/tools/admin/DeFiBusAdminExt.java b/defibus-tools/src/main/java/cn/webank/defibus/tools/admin/DeFiBusAdminExt.java new file mode 100644 index 0000000..68abd18 --- /dev/null +++ b/defibus-tools/src/main/java/cn/webank/defibus/tools/admin/DeFiBusAdminExt.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.webank.defibus.tools.admin; + +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; + +public class DeFiBusAdminExt extends DefaultMQAdminExt { + public DeFiBusAdminExt(RPCHook rpcHook, long timeoutMillis) { + super(rpcHook, timeoutMillis); + } +} diff --git a/defibus-tools/src/main/java/cn/webank/defibus/tools/command/DeFiBusAdminStartup.java b/defibus-tools/src/main/java/cn/webank/defibus/tools/command/DeFiBusAdminStartup.java index 2db1efc..561fa43 100644 --- a/defibus-tools/src/main/java/cn/webank/defibus/tools/command/DeFiBusAdminStartup.java +++ b/defibus-tools/src/main/java/cn/webank/defibus/tools/command/DeFiBusAdminStartup.java @@ -17,10 +17,225 @@ package cn.webank.defibus.tools.command; -import org.apache.rocketmq.tools.command.MQAdminStartup; +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.joran.JoranConfigurator; +import ch.qos.logback.core.joran.spi.JoranException; +import cn.webank.defibus.tools.command.topic.UpdateTopicPermSubCommand; +import com.alibaba.fastjson.JSONObject; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.AclUtils; +import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.common.MQVersion; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.broker.*; +import org.apache.rocketmq.tools.command.cluster.CLusterSendMsgRTCommand; +import org.apache.rocketmq.tools.command.cluster.ClusterListSubCommand; +import org.apache.rocketmq.tools.command.connection.ConsumerConnectionSubCommand; +import org.apache.rocketmq.tools.command.connection.ProducerConnectionSubCommand; +import org.apache.rocketmq.tools.command.consumer.*; +import org.apache.rocketmq.tools.command.message.*; +import org.apache.rocketmq.tools.command.namesrv.*; +import org.apache.rocketmq.tools.command.offset.CloneGroupOffsetCommand; +import org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand; +import org.apache.rocketmq.tools.command.queue.QueryConsumeQueueCommand; +import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand; +import org.apache.rocketmq.tools.command.topic.*; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; public class DeFiBusAdminStartup { + protected static List subCommandList = new ArrayList(); + public static void main(String[] args) { - MQAdminStartup.main(args); + main0(args); + } + + public static void main0(String[] args) { + System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); + + initCommand(); + + try { + initLogback(); + switch (args.length) { + case 0: + printHelp(); + break; + case 2: + if (args[0].equals("help")) { + SubCommand cmd = findSubCommand(args[1]); + if (cmd != null) { + Options options = ServerUtil.buildCommandlineOptions(new Options()); + options = cmd.buildCommandlineOptions(options); + if (options != null) { + ServerUtil.printCommandLineHelp("mqadmin " + cmd.commandName(), options); + } + } else { + System.out.printf("The sub command %s not exist.%n", args[1]); + } + break; + } + case 1: + default: + SubCommand cmd = findSubCommand(args[0]); + if (cmd != null) { + String[] subargs = parseSubArgs(args); + + Options options = ServerUtil.buildCommandlineOptions(new Options()); + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), + new PosixParser()); + if (null == commandLine) { + return; + } + + if (commandLine.hasOption('n')) { + String namesrvAddr = commandLine.getOptionValue('n'); + System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr); + } + + cmd.execute(commandLine, options, getAclRPCHook()); + } else { + System.out.printf("The sub command %s not exist.%n", args[0]); + } + break; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void initCommand() { + initCommand(new UpdateTopicSubCommand()); + initCommand(new DeleteTopicSubCommand()); + initCommand(new UpdateSubGroupSubCommand()); + initCommand(new DeleteSubscriptionGroupCommand()); + initCommand(new UpdateBrokerConfigSubCommand()); + initCommand(new UpdateTopicPermSubCommand()); + + initCommand(new TopicRouteSubCommand()); + initCommand(new TopicStatusSubCommand()); + initCommand(new TopicClusterSubCommand()); + + initCommand(new BrokerStatusSubCommand()); + initCommand(new QueryMsgByIdSubCommand()); + initCommand(new QueryMsgByKeySubCommand()); + initCommand(new QueryMsgByUniqueKeySubCommand()); + initCommand(new QueryMsgByOffsetSubCommand()); + + initCommand(new PrintMessageSubCommand()); + initCommand(new PrintMessageByQueueCommand()); + initCommand(new SendMsgStatusCommand()); + initCommand(new BrokerConsumeStatsSubCommad()); + + initCommand(new ProducerConnectionSubCommand()); + initCommand(new ConsumerConnectionSubCommand()); + initCommand(new ConsumerProgressSubCommand()); + initCommand(new ConsumerStatusSubCommand()); + initCommand(new CloneGroupOffsetCommand()); + + initCommand(new ClusterListSubCommand()); + initCommand(new TopicListSubCommand()); + + initCommand(new UpdateKvConfigCommand()); + initCommand(new DeleteKvConfigCommand()); + + initCommand(new WipeWritePermSubCommand()); + initCommand(new ResetOffsetByTimeCommand()); + + initCommand(new UpdateOrderConfCommand()); + initCommand(new CleanExpiredCQSubCommand()); + initCommand(new CleanUnusedTopicCommand()); + + initCommand(new StartMonitoringSubCommand()); + initCommand(new StatsAllSubCommand()); + + initCommand(new AllocateMQSubCommand()); + + initCommand(new CheckMsgSendRTCommand()); + initCommand(new CLusterSendMsgRTCommand()); + + initCommand(new GetNamesrvConfigCommand()); + initCommand(new UpdateNamesrvConfigCommand()); + initCommand(new GetBrokerConfigCommand()); + + initCommand(new QueryConsumeQueueCommand()); + initCommand(new SendMessageCommand()); + initCommand(new ConsumeMessageCommand()); + } + + private static void initLogback() throws JoranException { + String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); + + LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); + JoranConfigurator configurator = new JoranConfigurator(); + configurator.setContext(lc); + lc.reset(); + configurator.doConfigure(rocketmqHome + "/conf/logback_tools.xml"); + } + + private static void printHelp() { + System.out.printf("The most commonly used mqadmin commands are:%n"); + for (SubCommand cmd : subCommandList) { + System.out.printf(" %-20s %s%n", cmd.commandName(), cmd.commandDesc()); + } + + System.out.printf("%nSee 'mqadmin help ' for more information on a specific command.%n"); + } + + private static SubCommand findSubCommand(final String name) { + for (SubCommand cmd : subCommandList) { + if (cmd.commandName().toUpperCase().equals(name.toUpperCase())) { + return cmd; + } + } + + return null; + } + + private static String[] parseSubArgs(String[] args) { + if (args.length > 1) { + String[] result = new String[args.length - 1]; + for (int i = 0; i < args.length - 1; i++) { + result[i] = args[i + 1]; + } + return result; + } + return null; + } + + public static void initCommand(SubCommand command) { + subCommandList.add(command); + } + + public static RPCHook getAclRPCHook() { + String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); + String fileName = "/conf/tools.yml"; + JSONObject yamlDataObject = AclUtils.getYamlDataObject(fileHome + fileName, + JSONObject.class); + + if (yamlDataObject == null || yamlDataObject.isEmpty()) { + System.out.printf(" Cannot find conf file %s, acl is not be enabled.%n", fileHome + fileName); + return null; + } + + String accessKey = yamlDataObject.getString("accessKey"); + String secretKey = yamlDataObject.getString("secretKey"); + + if (StringUtils.isBlank(accessKey) || StringUtils.isBlank(secretKey)) { + System.out.printf("AccessKey or secretKey is blank, the acl is not enabled.%n"); + return null; + } + return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)); } } diff --git a/defibus-tools/src/main/java/cn/webank/defibus/tools/command/topic/UpdateTopicPermSubCommand.java b/defibus-tools/src/main/java/cn/webank/defibus/tools/command/topic/UpdateTopicPermSubCommand.java new file mode 100644 index 0000000..0db2518 --- /dev/null +++ b/defibus-tools/src/main/java/cn/webank/defibus/tools/command/topic/UpdateTopicPermSubCommand.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.webank.defibus.tools.command.topic; + +import cn.webank.defibus.tools.admin.DeFiBusAdminExt; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.io.FileUtils; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.QueueData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.tools.command.SubCommand; + +import java.io.File; +import java.util.*; + +public class UpdateTopicPermSubCommand implements SubCommand { + ClusterInfo clusterInfo = null; + + @Override + public String commandName() { + return "updateTopicPerm"; + } + + @Override + public String commandDesc() { + return "Update topic perm"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("b", "brokerAddr", true, "create topic to which broker"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("t", "topic", true, "topic name"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("f", "topicList", true, "read the topic list by file path, split with '\\n'"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("s", "sleetTime", true, "sleep time between create two topic(ms)"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("p", "perm", true, "set topic's permission(2|4|6), intro[2:W; 4:R; 6:RW]"); + opt.setRequired(true); + options.addOption(opt); + + return options; + } + + @Override + public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) { + DeFiBusAdminExt deFiBusAdminExt = new DeFiBusAdminExt(rpcHook, 3000L); + deFiBusAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + try { + deFiBusAdminExt.start(); + TopicConfig topicConfig = new TopicConfig(); + + long sleepTime = 1000; + if (commandLine.hasOption("s")) { + sleepTime = Long.parseLong(commandLine.getOptionValue("s").trim()); + } + + List topicList = new ArrayList<>(); + if (commandLine.hasOption("t")) { + String[] topicArr = commandLine.getOptionValue("t").trim().split(";"); + topicList = Arrays.asList(topicArr); + } else if (commandLine.hasOption("f")) { + String path = commandLine.getOptionValue("f").trim(); + topicList = FileUtils.readLines(new File(path)); + } + for (String topic : topicList) { + try { + updateTopicPerm(topic, deFiBusAdminExt, topicConfig, sleepTime, commandLine); + } catch (Exception e) { + System.out.println("[WARN] update topic[" + topic + "] perm failed ,exception info: " + e.getMessage()); + try { + updateTopicPerm(topic, deFiBusAdminExt, topicConfig, sleepTime, commandLine); + } catch (Exception e1) { + System.out.println("[WARN] try again ,update topic[" + topic + "] perm failed,exception info: " + e1.getMessage()); + } + } + } +// ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); + } catch (Exception e) { + e.printStackTrace(); + } finally { + deFiBusAdminExt.shutdown(); + } + return; + } + + private void updateTopicPerm(String topic, DeFiBusAdminExt deFiBusAdminExt, TopicConfig topicConfig, long sleepTime, CommandLine commandLine) throws InterruptedException, MQClientException, RemotingException, MQBrokerException { + TopicRouteData topicRouteData = deFiBusAdminExt.examineTopicRouteInfo(topic); + assert topicRouteData != null; + List queueDatas = topicRouteData.getQueueDatas(); + assert queueDatas != null && queueDatas.size() > 0; + + Set brokerAddrs = new HashSet<>(); + if (commandLine.hasOption('b')) { + brokerAddrs.add(commandLine.getOptionValue('b').trim()); + } else { + brokerAddrs = fetchMasterAddrByTopic(deFiBusAdminExt, topic); + } + if (brokerAddrs.size() == 0) { + System.out.println("[WARN] can not get brokerAddr for topic[" + topic + "]"); + return; + } + for (String brokerAddr : brokerAddrs) { + String brokerName = getBrokerNameByBrokerAddr(deFiBusAdminExt, brokerAddr); + QueueData queueData = null; + for (QueueData queueDataTemp : queueDatas) { + if (brokerName.equals(queueDataTemp.getBrokerName())) { + queueData = queueDataTemp; + break; + } + } + if (queueData == null) { + System.out.println("[WARN] topic[" + topic + "] get queueData failed for brokerAddr[" + brokerAddr + "]"); + return; + } + topicConfig.setTopicName(topic); + topicConfig.setWriteQueueNums(queueData.getWriteQueueNums()); + topicConfig.setReadQueueNums(queueData.getReadQueueNums()); + topicConfig.setPerm(queueData.getPerm()); + topicConfig.setTopicSysFlag(queueData.getTopicSynFlag()); + + //new perm + int perm = Integer.parseInt(commandLine.getOptionValue("p").trim()); + int oldPerm = topicConfig.getPerm(); + if (perm == oldPerm) { + System.out.printf("new perm equals to the old one!%n"); + } + topicConfig.setPerm(perm); + deFiBusAdminExt.createAndUpdateTopicConfig(brokerAddr, topicConfig); + Thread.sleep(sleepTime); + System.out.printf("update topic[%s] perm from %s to %s in %s success.%n", topic, oldPerm, perm, brokerAddr); + } + } + + private Set fetchMasterAddrByTopic(DeFiBusAdminExt defaultMQAdminExt, String topic) throws RemotingException, MQClientException, InterruptedException { + Set masterSet = new HashSet<>(); + TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic); + for (BrokerData bd : topicRouteData.getBrokerDatas()) { + String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID); + if (masterAddr != null) { + masterSet.add(masterAddr); + } else { + System.out.println("there is no master alive in " + bd.getBrokerName() + ", skip this group"); + } + } + + return masterSet; + } + + private String getBrokerNameByBrokerAddr(DeFiBusAdminExt deFiBusAdminExt, String brokerAddr) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + String brokerName = null; + if (clusterInfo == null) { + clusterInfo = deFiBusAdminExt.examineBrokerClusterInfo(); + } + Map brokerDataMap = clusterInfo.getBrokerAddrTable(); + for (Map.Entry entry : brokerDataMap.entrySet()) { + Map brokerAddrs = entry.getValue().getBrokerAddrs(); + if (brokerAddrs.containsValue(brokerAddr)) { + brokerName = entry.getKey(); + break; + } + } + if (brokerName == null) { + System.out.println("can not get brokerName for brokerAddr[" + brokerAddr + "]"); + System.exit(-1); + } else { + System.out.println("brokerAddr[" + brokerAddr + "]'s name is " + brokerName); + } + return brokerName; + } +} diff --git a/gradle.properties b/gradle.properties index 022e772..0b010e9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -group=cn.webank.defibus +group=com.webank.defibus version=1.0.0 rocketmqVersion=4.4.0 jdk=1.8 diff --git a/script/broker_watchdog.sh b/script/broker_watchdog.sh index 71c3d04..b0a051b 100644 --- a/script/broker_watchdog.sh +++ b/script/broker_watchdog.sh @@ -1,3 +1,20 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + if [ ! -f "pid.file" ]; then Result_pid="noPid" else diff --git a/script/namesrv_watchdog.sh b/script/namesrv_watchdog.sh index 2811779..0982936 100644 --- a/script/namesrv_watchdog.sh +++ b/script/namesrv_watchdog.sh @@ -1,3 +1,19 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. Result=$(ps -ef|grep NamesrvStartup|grep -v grep | awk '{print $2}') if [ "" == "$Result" ] diff --git a/script/runadmin.cmd b/script/runadmin.cmd index eb1191c..61a1170 100644 --- a/script/runadmin.cmd +++ b/script/runadmin.cmd @@ -1,16 +1,18 @@ @echo off -rem Copyright (C) @2017 Webank Group Holding Limited +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at rem -rem Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -rem in compliance with the License. You may obtain a copy of the License at -rem -rem http://www.apache.org/licenses/LICENSE-2.0 -rem -rem Unless required by applicable law or agreed to in writing, software distributed under the License -rem is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -rem or implied. See the License for the specific language governing permissions and limitations under -rem the License. +rem http://www.apache.org/licenses/LICENSE-2.0 rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. if not exist "%JAVA_HOME%\bin\java.exe" echo Please set the JAVA_HOME variable in your environment, We need java(x64)! & EXIT /B 1 diff --git a/script/runadmin.sh b/script/runadmin.sh index 0fc6bb4..4e19a72 100644 --- a/script/runadmin.sh +++ b/script/runadmin.sh @@ -1,16 +1,20 @@ #!/bin/sh -# Copyright (C) @2017 Webank Group Holding Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -# in compliance with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at # -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing permissions and limitations under -# the License. +# http://www.apache.org/licenses/LICENSE-2.0 # +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + #=========================================================================================== # Java Environment Setting diff --git a/script/runbroker.cmd b/script/runbroker.cmd index b9c5cd5..3cf476e 100644 --- a/script/runbroker.cmd +++ b/script/runbroker.cmd @@ -1,16 +1,18 @@ @echo off -rem Copyright (C) @2017 Webank Group Holding Limited +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at rem -rem Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -rem in compliance with the License. You may obtain a copy of the License at -rem -rem http://www.apache.org/licenses/LICENSE-2.0 -rem -rem Unless required by applicable law or agreed to in writing, software distributed under the License -rem is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -rem or implied. See the License for the specific language governing permissions and limitations under -rem the License. +rem http://www.apache.org/licenses/LICENSE-2.0 rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. if not exist "%JAVA_HOME%\bin\java.exe" echo Please set the JAVA_HOME variable in your environment, We need java(x64)! & EXIT /B 1 set "JAVA=%JAVA_HOME%\bin\java.exe" diff --git a/script/runbroker.sh b/script/runbroker.sh index 3fd5f68..f86d72b 100644 --- a/script/runbroker.sh +++ b/script/runbroker.sh @@ -1,16 +1,20 @@ #!/bin/sh -# Copyright (C) @2017 Webank Group Holding Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -# in compliance with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at # -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing permissions and limitations under -# the License. +# http://www.apache.org/licenses/LICENSE-2.0 # +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + #=========================================================================================== # Java Environment Setting diff --git a/script/runbroker_cloud.sh b/script/runbroker_cloud.sh index c7c97b5..efafa39 100644 --- a/script/runbroker_cloud.sh +++ b/script/runbroker_cloud.sh @@ -1,16 +1,19 @@ #!/bin/sh -# Copyright (C) @2017 Webank Group Holding Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -# in compliance with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at # -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing permissions and limitations under -# the License. +# http://www.apache.org/licenses/LICENSE-2.0 # +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. #=========================================================================================== # Java Environment Setting diff --git a/script/runnamesrv.cmd b/script/runnamesrv.cmd index 4cee714..b3344ad 100644 --- a/script/runnamesrv.cmd +++ b/script/runnamesrv.cmd @@ -1,16 +1,18 @@ @echo off -rem Copyright (C) @2017 Webank Group Holding Limited +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at rem -rem Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -rem in compliance with the License. You may obtain a copy of the License at -rem -rem http://www.apache.org/licenses/LICENSE-2.0 -rem -rem Unless required by applicable law or agreed to in writing, software distributed under the License -rem is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -rem or implied. See the License for the specific language governing permissions and limitations under -rem the License. +rem http://www.apache.org/licenses/LICENSE-2.0 rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. if not exist "%JAVA_HOME%\bin\java.exe" echo Please set the JAVA_HOME variable in your environment, We need java(x64)! & EXIT /B 1 set "JAVA=%JAVA_HOME%\bin\java.exe" diff --git a/script/runnamesrv.sh b/script/runnamesrv.sh index cfe31f4..f4395da 100644 --- a/script/runnamesrv.sh +++ b/script/runnamesrv.sh @@ -1,16 +1,19 @@ #!/bin/sh -# Copyright (C) @2017 Webank Group Holding Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -# in compliance with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at # -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing permissions and limitations under -# the License. +# http://www.apache.org/licenses/LICENSE-2.0 # +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. #=========================================================================================== # Java Environment Setting diff --git a/script/stop.cmd b/script/stop.cmd index 2b466e1..6c2ad3f 100644 --- a/script/stop.cmd +++ b/script/stop.cmd @@ -1,16 +1,18 @@ @echo off -rem Copyright (C) @2017 Webank Group Holding Limited +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at rem -rem Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -rem in compliance with the License. You may obtain a copy of the License at -rem -rem http://www.apache.org/licenses/LICENSE-2.0 -rem -rem Unless required by applicable law or agreed to in writing, software distributed under the License -rem is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -rem or implied. See the License for the specific language governing permissions and limitations under -rem the License. +rem http://www.apache.org/licenses/LICENSE-2.0 rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. if not exist "%JAVA_HOME%\bin\jps.exe" echo Please set the JAVA_HOME variable in your environment, We need java(x64)! & EXIT /B 1 diff --git a/script/stop.sh b/script/stop.sh index 8c67c34..d4dfb13 100644 --- a/script/stop.sh +++ b/script/stop.sh @@ -1,18 +1,20 @@ #!/bin/sh - -# Copyright (C) @2017 Webank Group Holding Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -# in compliance with the License. You may obtain a copy of the License at +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing permissions and limitations under -# the License. +# http://www.apache.org/licenses/LICENSE-2.0 # +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + echo "Now removing crontab....." crontab -l | grep -v broker_watchdog | grep -v namesrv_watchdog > tmp_crontab.txt diff --git a/settings.gradle b/settings.gradle index 82af252..1317b35 100644 --- a/settings.gradle +++ b/settings.gradle @@ -23,3 +23,4 @@ if (jdkVersion.equals('1.7')) { include 'defibus-common', 'defibus-client', 'defibus-tools', 'defibus-broker', 'defibus-namesrv', 'defibus-examples' } +