diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/flink-streamis-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/flink/FlinkStreamisConfigAutowired.java b/streamis-jobmanager/streamis-job-log/job-log-collector/flink-streamis-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/flink/FlinkStreamisConfigAutowired.java index ba175600..a5459021 100644 --- a/streamis-jobmanager/streamis-job-log/job-log-collector/flink-streamis-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/flink/FlinkStreamisConfigAutowired.java +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/flink-streamis-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/flink/FlinkStreamisConfigAutowired.java @@ -2,6 +2,7 @@ import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.StreamisLogAppenderConfig; import com.webank.wedatasphere.streamis.jobmanager.log.collector.log4j2.StreamisLog4j2AppenderConfig; +import com.webank.wedatasphere.streamis.jobmanager.log.collector.log4j2.filters.KeywordThresholdFilter; import com.webank.wedatasphere.streamis.jobmanager.plugin.StreamisConfigAutowired; import org.apache.commons.lang3.StringUtils; import org.apache.flink.configuration.Configuration; @@ -56,12 +57,17 @@ public StreamisLogAppenderConfig logAppenderConfig(StreamisLogAppenderConfig.Bui if ("LevelMatch".equals(filterStrategy)) { ((StreamisLog4j2AppenderConfig.Builder)builder).withFilter(LevelMatchFilter.newBuilder().setOnMatch(Filter.Result.ACCEPT).setOnMismatch(Filter.Result.DENY) .setLevel(Level.getLevel(this.configuration.getString(LOG_FILTER_LEVEL_MATCH))).build()); - } else if ("ThresholdFilter".equals(filterStrategy)) { + } else if ("ThresholdMatch".equals(filterStrategy)) { ((StreamisLog4j2AppenderConfig.Builder)builder).withFilter(ThresholdFilter.createFilter(Level .getLevel(this.configuration.getString(LOG_FILTER_THRESHOLD_MATCH)), Filter.Result.ACCEPT, Filter.Result.DENY)); } else if ("RegexMatch".equals(filterStrategy)) { ((StreamisLog4j2AppenderConfig.Builder)builder).withFilter(RegexFilter.createFilter(this.configuration.getString(LOG_FILTER_REGEX), null, true, Filter.Result.ACCEPT, Filter.Result.DENY)); + } else if ("Keyword".equals(filterStrategy)){ + ((StreamisLog4j2AppenderConfig.Builder)builder).withFilter( + new KeywordThresholdFilter( + StringUtils.split(this.configuration.getString(LOG_FILTER_KEYWORDS), ","), + StringUtils.split(this.configuration.getString(LOG_FILTER_KEYWORDS_EXCLUDE), ","))); } } } @@ -69,7 +75,8 @@ public StreamisLogAppenderConfig logAppenderConfig(StreamisLogAppenderConfig.Bui if (hadoopUser.equals("") || hadoopUser.equals("")){ hadoopUser = System.getProperty("user.name"); } - return builder.setRpcConnTimeout(this.configuration.getInteger(LOG_RPC_CONN_TIMEOUT)) + return builder.setDebugMode(this.configuration.getBoolean(DEBUG_MODE)) + .setRpcConnTimeout(this.configuration.getInteger(LOG_RPC_CONN_TIMEOUT)) .setRpcSocketTimeout(this.configuration.getInteger(LOG_RPC_SOCKET_TIMEOUT)) .setRpcSendRetryCnt(this.configuration.getInteger(LOG_RPC_SEND_RETRY_COUNT)) .setRpcServerRecoveryTimeInSec(this.configuration.getInteger(LOG_RPC_SERVER_RECOVERY_TIME)) @@ -81,10 +88,13 @@ public StreamisLogAppenderConfig logAppenderConfig(StreamisLogAppenderConfig.Bui hadoopUser)) .setRpcCacheSize(this.configuration.getInteger(LOG_RPC_CACHE_SIZE)) .setRpcCacheMaxConsumeThread(this.configuration.getInteger(LOG_PRC_CACHE_MAX_CONSUME_THREAD)) + .setDiscard(this.configuration.getBoolean(LOG_RPC_CACHE_DISCARD)) + .setDiscardWindow(this.configuration.getInteger(LOG_RPC_CACHE_DISCARD_WINDOW)) .setRpcBufferSize(this.configuration.getInteger(LOG_RPC_BUFFER_SIZE)) .setRpcBufferExpireTimeInSec(this.configuration.getInteger(LOG_RPC_BUFFER_EXPIRE_TIME)).build(); } + /** * According to : * String launchCommand = diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/flink-streamis-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/flink/FlinkStreamisConfigDefine.java b/streamis-jobmanager/streamis-job-log/job-log-collector/flink-streamis-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/flink/FlinkStreamisConfigDefine.java index 379f15e0..5f23ae68 100644 --- a/streamis-jobmanager/streamis-job-log/job-log-collector/flink-streamis-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/flink/FlinkStreamisConfigDefine.java +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/flink-streamis-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/flink/FlinkStreamisConfigDefine.java @@ -2,6 +2,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import scala.Int; import java.util.List; @@ -86,8 +87,19 @@ public class FlinkStreamisConfigDefine { * Max cache consume threads in log RPC module */ public static final ConfigOption LOG_PRC_CACHE_MAX_CONSUME_THREAD = ConfigOptions.key("stream.log.rpc.cache.max-consume-thread") - .intType().defaultValue(10).withDescription("Max cache consume threads in log RPC module"); + .intType().defaultValue(2).withDescription("Max cache consume threads in log RPC module"); + /** + * If discard the useless log + */ + public static final ConfigOption LOG_RPC_CACHE_DISCARD = ConfigOptions.key("stream.log.rpc.cache.discard") + .booleanType().defaultValue(true).withDescription("If discard the useless log"); + + /** + * The window size of discarding + */ + public static final ConfigOption LOG_RPC_CACHE_DISCARD_WINDOW = ConfigOptions.key("stream.log.rpc.cache.discard-window") + .intType().defaultValue(2).withDescription("The window size of discarding"); /** * Buffer size in log RPC module */ @@ -104,7 +116,7 @@ public class FlinkStreamisConfigDefine { * Log filter strategy list */ public static final ConfigOption> LOG_FILTER_STRATEGIES = ConfigOptions.key("stream.log.filter.strategies") - .stringType().asList().defaultValues("LevelMatch").withDescription("Log filter strategy list"); + .stringType().asList().defaultValues("Keyword").withDescription("Log filter strategy list"); /** * Level value of LevelMatch filter strategy @@ -122,4 +134,22 @@ public class FlinkStreamisConfigDefine { */ public static final ConfigOption LOG_FILTER_REGEX = ConfigOptions.key("stream.log.filter.regex.value") .stringType().defaultValue(".*").withDescription("Regex value of RegexMatch filter strategy"); + + /** + * Accept keywords of Keyword filter strategy + */ + public static final ConfigOption LOG_FILTER_KEYWORDS = ConfigOptions.key("stream.log.filter.keywords") + .stringType().defaultValue("ERROR").withDescription("Accept keywords of Keyword filter strategy"); + + /** + * Exclude keywords of Keyword filter strategy + */ + public static final ConfigOption LOG_FILTER_KEYWORDS_EXCLUDE = ConfigOptions.key("stream.log.filter.keywords.exclude") + .stringType().defaultValue("").withDescription("Exclude keywords of Keyword filter strategy"); + + /** + * Debug mode + */ + public static final ConfigOption DEBUG_MODE = ConfigOptions.key("stream.log.debug") + .booleanType().defaultValue(false).withDescription("Debug mode"); } diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/flink-streamis-log-collector/src/test/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/flink/FlinkConfigurationLoadTest.java b/streamis-jobmanager/streamis-job-log/job-log-collector/flink-streamis-log-collector/src/test/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/flink/FlinkConfigurationLoadTest.java index 6958957a..79ad4601 100644 --- a/streamis-jobmanager/streamis-job-log/job-log-collector/flink-streamis-log-collector/src/test/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/flink/FlinkConfigurationLoadTest.java +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/flink-streamis-log-collector/src/test/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/flink/FlinkConfigurationLoadTest.java @@ -3,12 +3,15 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Enumeration; import java.util.Objects; import java.util.Properties; public class FlinkConfigurationLoadTest { + private static final Logger LOG = LoggerFactory.getLogger(FlinkConfigurationLoadTest.class); @Test public void loadConfiguration() { String configDir = Objects.requireNonNull(FlinkConfigurationLoadTest.class.getResource("/")).getFile(); @@ -21,4 +24,5 @@ public void loadConfiguration() { } GlobalConfiguration.loadConfiguration(configDir, dynamicConfiguration); } + } diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/flink-streamis-log-collector/src/test/resources/META-INF/services/com.webank.wedatasphere.streamis.jobmanager.plugin.StreamisConfigAutowired b/streamis-jobmanager/streamis-job-log/job-log-collector/flink-streamis-log-collector/src/test/resources/META-INF/services/com.webank.wedatasphere.streamis.jobmanager.plugin.StreamisConfigAutowired new file mode 100644 index 00000000..dc13253b --- /dev/null +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/flink-streamis-log-collector/src/test/resources/META-INF/services/com.webank.wedatasphere.streamis.jobmanager.plugin.StreamisConfigAutowired @@ -0,0 +1 @@ +com.webank.wedatasphere.streamis.jobmanager.log.collector.flink.FlinkStreamisConfigAutowired \ No newline at end of file diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/flink-streamis-log-collector/src/test/resources/log4j2.xml b/streamis-jobmanager/streamis-job-log/job-log-collector/flink-streamis-log-collector/src/test/resources/log4j2.xml new file mode 100644 index 00000000..ee3f4125 --- /dev/null +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/flink-streamis-log-collector/src/test/resources/log4j2.xml @@ -0,0 +1,37 @@ + + + + + + + + + + + + + ` + + + + + + + + \ No newline at end of file diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/config/RpcLogSenderConfig.java b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/config/RpcLogSenderConfig.java index 0fbc563d..0fb03185 100644 --- a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/config/RpcLogSenderConfig.java +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/config/RpcLogSenderConfig.java @@ -38,6 +38,10 @@ public class RpcLogSenderConfig { */ private int maxDelayTimeInSec = 60; + /** + * If open debug mode + */ + private boolean debugMode = false; /** * Auth config */ @@ -149,6 +153,14 @@ public void setServerRecoveryTimeInSec(int serverRecoveryTimeInSec) { this.serverRecoveryTimeInSec = serverRecoveryTimeInSec; } + public boolean isDebugMode() { + return debugMode; + } + + public void setDebugMode(boolean debugMode) { + this.debugMode = debugMode; + } + @Override public String toString() { return "RpcLogSenderConfig{" + @@ -161,6 +173,8 @@ public String toString() { ", authConfig=" + authConfig + ", cacheConfig=" + cacheConfig + ", bufferConfig=" + bufferConfig + + ", debug=" + debugMode + '}'; } + } diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/config/SendLogCacheConfig.java b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/config/SendLogCacheConfig.java index 1caaedb2..e40a630c 100644 --- a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/config/SendLogCacheConfig.java +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/config/SendLogCacheConfig.java @@ -13,7 +13,17 @@ public class SendLogCacheConfig { /** * Max number of consuming thread */ - private int maxConsumeThread = 10; + private int maxConsumeThread = 2; + + /** + * The switch to discard log + */ + private boolean discard = true; + + /** + * Discard window in second + */ + private int discardWindow = 2; public SendLogCacheConfig(){ @@ -40,11 +50,32 @@ public void setMaxConsumeThread(int maxConsumeThread) { this.maxConsumeThread = maxConsumeThread; } + public boolean isDiscard() { + return discard; + } + + public void setDiscard(boolean discard) { + this.discard = discard; + } + + public int getDiscardWindow() { + return discardWindow; + } + + public void setDiscardWindow(int discardWindow) { + this.discardWindow = discardWindow; + } + @Override public String toString() { return "SendLogCacheConfig{" + "size=" + size + ", maxConsumeThread=" + maxConsumeThread + + ", discard=" + discard + + ", discardWindow=" + discardWindow + '}'; } + + + } diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/config/StreamisLogAppenderConfig.java b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/config/StreamisLogAppenderConfig.java index 35680913..76fbd0c9 100644 --- a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/config/StreamisLogAppenderConfig.java +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/config/StreamisLogAppenderConfig.java @@ -1,6 +1,9 @@ package com.webank.wedatasphere.streamis.jobmanager.log.collector.config; +import com.webank.wedatasphere.streamis.jobmanager.log.collector.message.filters.LogMessageFilter; + import java.util.ArrayList; +import java.util.List; import java.util.Objects; import java.util.Optional; @@ -14,9 +17,15 @@ public class StreamisLogAppenderConfig { protected final RpcLogSenderConfig senderConfig; - protected StreamisLogAppenderConfig(String applicationName, RpcLogSenderConfig rpcLogSenderConfig){ + /** + * Message filters + */ + protected final List messageFilters; + protected StreamisLogAppenderConfig(String applicationName, RpcLogSenderConfig rpcLogSenderConfig, + List messageFilters){ this.applicationName = applicationName; this.senderConfig = null != rpcLogSenderConfig? rpcLogSenderConfig : new RpcLogSenderConfig(); + this.messageFilters = messageFilters; } public static class Builder{ @@ -30,6 +39,11 @@ public static class Builder{ */ protected final RpcLogSenderConfig rpcLogSenderConfig; + /** + * Message filters + */ + protected final List messageFilters = new ArrayList<>(); + public Builder(String applicationName, RpcLogSenderConfig rpcLogSenderConfig){ this.applicationName = applicationName; @@ -189,8 +203,47 @@ public StreamisLogAppenderConfig.Builder setRpcBufferExpireTimeInSec(int expireT return this; } + /** + * Add log message filter + * @param messageFilter message filter + * @return builder + */ + public StreamisLogAppenderConfig.Builder withMessageFilter(LogMessageFilter messageFilter){ + this.messageFilters.add(messageFilter); + return this; + } + + /** + * Set to discard the useless log + * @param discard discard + * @return builder + */ + public StreamisLogAppenderConfig.Builder setDiscard(boolean discard){ + this.rpcLogSenderConfig.getCacheConfig().setDiscard(discard); + return this; + } + + /** + * Set the window size of discarding + * @param windowSize + * @return + */ + public StreamisLogAppenderConfig.Builder setDiscardWindow(int windowSize){ + this.rpcLogSenderConfig.getCacheConfig().setDiscardWindow(windowSize); + return this; + } + /** + * Switch to debug + * @param debugMode debug mode + * @return builder + */ + public StreamisLogAppenderConfig.Builder setDebugMode(boolean debugMode){ + this.rpcLogSenderConfig.setDebugMode(debugMode); + return this; + } + public StreamisLogAppenderConfig build(){ - return new StreamisLogAppenderConfig(applicationName, rpcLogSenderConfig); + return new StreamisLogAppenderConfig(applicationName, rpcLogSenderConfig, messageFilters); } } public String getApplicationName() { @@ -202,4 +255,7 @@ public RpcLogSenderConfig getSenderConfig() { return senderConfig; } + public List getMessageFilters() { + return messageFilters; + } } diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/message/filters/KeywordMessageFilter.java b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/message/filters/KeywordMessageFilter.java new file mode 100644 index 00000000..5d12ea07 --- /dev/null +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/message/filters/KeywordMessageFilter.java @@ -0,0 +1,126 @@ +package com.webank.wedatasphere.streamis.jobmanager.log.collector.message.filters; + +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Comparator; +import java.util.regex.Pattern; + +/** + * Message filter of keyword + */ +public class KeywordMessageFilter implements LogMessageFilter{ + + /** + * Accept keywords + */ + private final String[] acceptKeywords; + + /** + * Regex pattern of accept keywords + */ + private Pattern acceptPattern; + /** + * Exclude keywords + */ + private final String[] excludeKeywords; + + /** + * Regex pattern of exclude keywords + */ + private Pattern excludePattern; + + /** + * Flags for pattern + */ + private int patternFlag = 0; + + public KeywordMessageFilter(String[] acceptKeywords, String[] excludeKeywords){ + this(acceptKeywords, excludeKeywords, null); + } + + public KeywordMessageFilter(String[] acceptKeywords, String[] excludeKeywords, String[] patternFlags){ + this.acceptKeywords = acceptKeywords; + this.excludeKeywords = excludeKeywords; + try { + this.patternFlag = toPatternFlags(patternFlags); + } catch (IllegalAccessException e) { + // Ignore + } + // Build regex pattern + if (acceptKeywords != null && acceptKeywords.length > 0){ + this.acceptPattern = toMatchPattern(acceptKeywords, this.patternFlag); + } + if (excludeKeywords != null && excludeKeywords.length > 0){ + this.excludePattern = toMatchPattern(excludeKeywords, this.patternFlag); + } + } + + @Override + public boolean doFilter(String logger, String message) { + boolean accept = true; + if (null != acceptPattern){ + accept = acceptPattern.matcher(message).find(); + } + if (accept && null != excludePattern){ + accept = !excludePattern.matcher(message).find(); + } + return accept; + } + + /** + * Convert to pattern + * @param keywords keyword array + * @param flag pattern flag + * @return Regex pattern + */ + protected Pattern toMatchPattern(String[] keywords, int flag){ + StringBuilder patternStr = new StringBuilder("("); + for(int i = 0; i < keywords.length; i++){ + patternStr.append(keywords[i]); + if (i != keywords.length - 1){ + patternStr.append("|"); + } + } + patternStr.append(")"); + return Pattern.compile(patternStr.toString(), flag); + } + + /** + * Convert the pattern flag array to int + * @param patternFlags flag string array + * @return int value + * @throws IllegalArgumentException + * @throws IllegalAccessException + */ + private static int toPatternFlags(final String[] patternFlags) throws IllegalArgumentException, + IllegalAccessException { + if (patternFlags == null || patternFlags.length == 0) { + return 0; + } + final Field[] fields = Pattern.class.getDeclaredFields(); + final Comparator comparator = Comparator.comparing(Field::getName); + Arrays.sort(fields, comparator); + final String[] fieldNames = new String[fields.length]; + for (int i = 0; i < fields.length; i++) { + fieldNames[i] = fields[i].getName(); + } + int flags = 0; + for (final String test : patternFlags) { + final int index = Arrays.binarySearch(fieldNames, test); + if (index >= 0) { + final Field field = fields[index]; + flags |= field.getInt(Pattern.class); + } + } + return flags; + } + + public final String[] getAcceptKeywords(){ + return this.acceptKeywords; + } + + public final String[] getExcludeKeywords(){ + return this.excludeKeywords; + } + +} diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/message/filters/LogMessageFilter.java b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/message/filters/LogMessageFilter.java new file mode 100644 index 00000000..bc778bea --- /dev/null +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/message/filters/LogMessageFilter.java @@ -0,0 +1,15 @@ +package com.webank.wedatasphere.streamis.jobmanager.log.collector.message.filters; + +/** + * Log message filter, filter the message content (layout formatted) + */ +public interface LogMessageFilter { + /** + * Filter formatted message + * @param logger logger name + * @param message message content + * @return if match the filter + */ + boolean doFilter(String logger, String message); + +} diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/message/filters/LogMessageFilterAdapter.java b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/message/filters/LogMessageFilterAdapter.java new file mode 100644 index 00000000..f98427af --- /dev/null +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/message/filters/LogMessageFilterAdapter.java @@ -0,0 +1,13 @@ +package com.webank.wedatasphere.streamis.jobmanager.log.collector.message.filters; + +/** + * Interface for adaptor + */ +public interface LogMessageFilterAdapter { + + /** + * Message filter + * @return filter + */ + LogMessageFilter getLogMessageFilter(); +} diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/sender/AbstractRpcLogSender.java b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/sender/AbstractRpcLogSender.java index 3afc3414..14e3ccee 100644 --- a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/sender/AbstractRpcLogSender.java +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/sender/AbstractRpcLogSender.java @@ -11,6 +11,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -35,6 +36,7 @@ public abstract class AbstractRpcLogSender implements R * Max thread num of send */ int maxCacheConsume; + /** * Connect config */ @@ -57,7 +59,6 @@ public AbstractRpcLogSender(RpcLogSenderConfig rpcSenderConfig){ this.cacheSize = cacheConfig.getSize(); this.maxCacheConsume = cacheConfig.getMaxConsumeThread(); this.sendBufSize = rpcSenderConfig.getBufferConfig().getSize(); - if (sendBufSize > cacheSize) { throw new IllegalArgumentException("Size of send buffer is larger than cache size"); } @@ -121,10 +122,15 @@ protected RpcLogContext getOrCreateRpcLogContext(){ if (null == this.rpcLogContext){ synchronized (this){ if (null == this.rpcLogContext){ - SendLogCache logCache = new QueuedSendLogCache(this.cacheSize, false); + // Use fair lock + SendLogCache logCache = new QueuedSendLogCache(this.cacheSize, + this.rpcSenderConfig.getCacheConfig().isDiscard(), + this.rpcSenderConfig.getCacheConfig().getDiscardWindow() * 1000,false); this.rpcLogContext = new RpcLogContext(logCache); - // Start cache consumer - this.rpcLogContext.startCacheConsumer(); + // Start cache consumers + for(int i = 0; i < maxCacheConsume; i++) { + this.rpcLogContext.startCacheConsumer(); + } } } @@ -153,7 +159,7 @@ private class RpcLogContext{ /** * Futures of consumers */ - private final Map> sendLogCacheConsumers = new ConcurrentHashMap<>(); + private final LinkedList> sendLogCacheConsumers = new LinkedList<>(); /** * Context lock */ @@ -181,33 +187,38 @@ public Thread newThread(Runnable r) { }); } - public void startCacheConsumer(){ + public boolean startCacheConsumer(){ + if (consumers >= maxCacheConsume) { +// throw new IllegalStateException("Over the limit number of cache consumers: [" + maxCacheConsume + "]"); + return false; + } this.ctxLock.lock(); try { - if (consumers >= maxCacheConsume) { - throw new IllegalStateException("Over the limit number of cache consumers: [" + maxCacheConsume + "]"); + if (consumers < maxCacheConsume) { + String id = UUID.randomUUID().toString(); + SendBuffer sendBuffer = new ImmutableSendBuffer<>(sendBufSize); + SendLogCacheConsumer consumer = new SendLogCacheConsumer(id, logCache, sendBuffer, rpcSenderConfig) { + @Override + protected void onFlushAndSend(SendBuffer sendBuffer) { + // First to aggregate the buffer + E aggEntity = aggregateBuffer(sendBuffer); + Optional.ofNullable(getSendLogExceptionStrategy()).ifPresent( + strategy -> strategy.doSend(() -> { + doSend(aggEntity, rpcSenderConfig); + return null; + }, sendBuffer)); + } + }; + Future future = this.consumePool.submit(consumer); + consumer.setFuture(future); + sendLogCacheConsumers.add(consumer); + this.consumers++; + return true; } - String id = UUID.randomUUID().toString(); - SendBuffer sendBuffer = new ImmutableSendBuffer<>(sendBufSize); - SendLogCacheConsumer consumer = new SendLogCacheConsumer(id, logCache, sendBuffer, rpcSenderConfig) { - @Override - protected void onFlushAndSend(SendBuffer sendBuffer) { - // First to aggregate the buffer - E aggEntity = aggregateBuffer(sendBuffer); - Optional.ofNullable(getSendLogExceptionStrategy()).ifPresent( - strategy -> strategy.doSend(() -> { - doSend(aggEntity, rpcSenderConfig); - return null; - }, sendBuffer)); - } - }; - Future future = this.consumePool.submit(consumer); - consumer.setFuture(future); - sendLogCacheConsumers.put(id, consumer); - this.consumers++; } finally { this.ctxLock.unlock(); } + return false; } public SendLogCache getLogCache(){ @@ -215,12 +226,24 @@ public SendLogCache getLogCache(){ } /** - * Destroy cache consumer - * @param id id + * Destroy cache consumer(select the tail one) */ - public void destroyCacheConsumer(String id){ - SendLogCacheConsumer consumer = sendLogCacheConsumers.remove(id); - consumer.shutdown(); + public boolean destroyCacheConsumer(){ + if (this.consumers <= 1){ + return false; + } + this.ctxLock.lock(); + try { + if (this.consumers > 1 && this.sendLogCacheConsumers.size() > 1) { + SendLogCacheConsumer consumer = sendLogCacheConsumers.removeLast(); + consumer.shutdown(); + this.consumers --; + return true; + } + } finally { + this.ctxLock.unlock(); + } + return false; } /** @@ -229,7 +252,9 @@ public void destroyCacheConsumer(String id){ public void destroyCacheConsumers(){ this.ctxLock.lock(); try { - sendLogCacheConsumers.forEach( (key, consumer)-> consumer.shutdown()); + sendLogCacheConsumers.forEach(SendLogCacheConsumer::shutdown); + sendLogCacheConsumers.clear(); + this.consumers = 0; } finally { this.ctxLock.unlock(); } @@ -252,6 +277,29 @@ private class QueuedSendLogCache implements SendLogCache{ // Count int count; + // Wait time in caching + final AtomicLong cacheWaitTime = new AtomicLong(0); + + // Wait time in taking + final AtomicLong takeWaitTime = new AtomicLong(0); + + // Performance of processing + final AtomicLong process = new AtomicLong(0); + + // Control flow + final AtomicLong control = new AtomicLong(Long.MAX_VALUE - 1); + + // If enable to discard log + boolean discard; + + int discardCount = 0; + + // Time clock + long clock = System.currentTimeMillis(); + + // interval to control + long controlInterval = 1 * 1000; + // Reentrant lock final ReentrantLock lock; @@ -261,11 +309,15 @@ private class QueuedSendLogCache implements SendLogCache{ // Condition for waiting puts(cacheLog) private final Condition notFull; - public QueuedSendLogCache(int capacity, boolean fair) { + public QueuedSendLogCache(int capacity, boolean discard, int discardWind, boolean fair) { this.items = new Object[capacity]; lock = new ReentrantLock(fair); this.notEmpty = lock.newCondition(); this.notFull = lock.newCondition(); + this.discard = discard; + // Make the discard window size as the control interval + this.controlInterval = discardWind; + this.clock = System.currentTimeMillis() + controlInterval; } @Override @@ -273,10 +325,23 @@ public void cacheLog(T logElement) throws InterruptedException { // Skip the null element if (Objects.nonNull(logElement)){ final ReentrantLock lock = this.lock; - lock.lockInterruptibly(); + boolean tryLock = lock.tryLock(); + if (!tryLock){ + lock.lockInterruptibly(); + } try{ + flowControl(); + if (discard && control.decrementAndGet() <= 0){ + if (logElement.mark() < 2){ + discardCount++; + return; + } + } while (count == items.length){ +// System.out.println("The queue is full, maybe lost the data"); + long ws = System.currentTimeMillis(); notFull.await(); + cacheWaitTime.addAndGet(System.currentTimeMillis() - ws); } enqueue(logElement); }finally{ @@ -324,13 +389,17 @@ public T takeLog(long timeout, TimeUnit unit) throws InterruptedException { T element; lock.lockInterruptibly(); try{ + flowControl(); while (count == 0){ - if (nanos < 0){ + long ws = System.currentTimeMillis(); + if (nanos <= 0){ return null; } nanos = notEmpty.awaitNanos(nanos); + takeWaitTime.addAndGet(System.currentTimeMillis() - ws); } element = dequeue(); + process.incrementAndGet(); } finally { lock.unlock(); } @@ -387,6 +456,7 @@ public int drainLogsTo(SendBuffer sendBuffer, int maxElements) { final ReentrantLock lock = this.lock; lock.lock(); try{ + flowControl(); int n = Math.min(maxElements, count); int take = takeIndex; int i = 0; @@ -406,6 +476,7 @@ public int drainLogsTo(SendBuffer sendBuffer, int maxElements) { break; } } + process.addAndGet(i); return i; } finally { if (i > 0){ @@ -472,5 +543,31 @@ private T dequeue(){ notFull.signal(); return element; } + + /** + * Flow control + */ + private void flowControl(){ + long ws = System.currentTimeMillis(); + if (clock <= ws) { + long interval = ws - clock + controlInterval; + clock = ws + controlInterval; + if (rpcSenderConfig.isDebugMode()) { + System.out.println("cacheWait: " + cacheWaitTime.get() + ", takeWait:" + takeWaitTime.get() + ", discarded: " + discardCount); + } + if (takeWaitTime.get() <= 0 && process.get() > 0){ + this.control.set((long) ((double)process.get() * ((double)controlInterval / (double)interval))); + if (rpcSenderConfig.isDebugMode()) { + System.out.println("new window control: " + this.control.get()); + } + } else { + this.control.set(Long.MAX_VALUE); + } + cacheWaitTime.set(0); + takeWaitTime.set(0); + process.set(0); + discardCount = 0; + } + } } } diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/sender/http/HttpClientTool.java b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/sender/http/HttpClientTool.java index e67bec6d..12f3f7da 100644 --- a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/sender/http/HttpClientTool.java +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/sender/http/HttpClientTool.java @@ -55,7 +55,7 @@ public static HttpClient createHttpClient(RpcLogSenderConfig rpcSenderConfig){ defaultHeaders.add(new BasicHeader(rpcSenderConfig.getAuthConfig().getTokenCodeKey(), tokenValue)); } clientBuilder.setDefaultRequestConfig(requestConfig).setDefaultHeaders(defaultHeaders) - .useSystemProperties().setMaxConnTotal(maxConn); + .useSystemProperties().setMaxConnTotal(maxConn).setMaxConnPerRoute(maxConn); CloseableHttpClient httpClient = clientBuilder.build(); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/utils/StringUtils.java b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/utils/StringUtils.java new file mode 100644 index 00000000..4b1660e6 --- /dev/null +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector-core/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/utils/StringUtils.java @@ -0,0 +1,22 @@ +package com.webank.wedatasphere.streamis.jobmanager.log.utils; + +/** + * Tool to operate str + */ +public class StringUtils { + + /** + * Convert string to array + * @param input string + * @param delimiter delimiter + * @return array + */ + public static String[] convertStrToArray(String input, String delimiter){ + if (null != input && !input.trim().equals("") && + !input.equals(delimiter.trim())){ + return input.split(","); + } + return null; + } + +} diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/StreamisRpcLogAppender.java b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/StreamisRpcLogAppender.java index c4674106..a82f44cb 100644 --- a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/StreamisRpcLogAppender.java +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/StreamisRpcLogAppender.java @@ -4,6 +4,7 @@ import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.RpcLogSenderConfig; import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.StreamisLogAppenderConfig; import com.webank.wedatasphere.streamis.jobmanager.log.collector.log4j2.StreamisLog4j2AppenderConfig; +import com.webank.wedatasphere.streamis.jobmanager.log.collector.message.filters.LogMessageFilter; import com.webank.wedatasphere.streamis.jobmanager.log.collector.sender.StreamisRpcLogSender; import com.webank.wedatasphere.streamis.jobmanager.log.entities.StreamisLogEvent; import com.webank.wedatasphere.streamis.jobmanager.plugin.StreamisConfigAutowired; @@ -23,6 +24,8 @@ import java.util.List; import java.util.Objects; import java.util.ServiceLoader; +import java.util.function.BiFunction; +import java.util.function.Function; /** * Streamis rpc log appender @@ -44,6 +47,12 @@ public class StreamisRpcLogAppender extends AbstractAppender { * Cache */ private final LogCache logCache; + + /** + * Filter function + */ + private BiFunction messageFilterFunction = (logger, message) -> true; + protected StreamisRpcLogAppender(String name, Filter filter, Layout layout, boolean ignoreExceptions, Property[] properties, @@ -55,18 +64,31 @@ protected StreamisRpcLogAppender(String name, Filter filter, this.rpcLogSender.setExceptionListener((subject, t, message) -> LOGGER.error((null != subject? subject.getClass().getSimpleName() : "") + ": " + message, t)); this.logCache = this.rpcLogSender.getOrCreateLogCache(); + List messageFilters = appenderConfig.getMessageFilters(); + if (null != messageFilters && messageFilters.size() > 0){ + messageFilterFunction = (logger, message) ->{ + for(LogMessageFilter messageFilter : messageFilters){ + if (!messageFilter.doFilter(logger, message)){ + return false; + } + } + return true; + }; + } Runtime.getRuntime().addShutdownHook(new Thread(this.rpcLogSender::close)); } @Override public void append(LogEvent event) { String content = new String(getLayout().toByteArray(event)); - // Transform to stream log event; - StreamisLogEvent logEvent = new StreamisLogEvent(content, System.currentTimeMillis()); - try { - this.logCache.cacheLog(logEvent); - } catch (InterruptedException e) { - LOGGER.error("StreamisRpcLogAppender: {} interrupted when cache the log into the RPC sender, message: {}", this.getName(), e.getMessage()); + if (messageFilterFunction.apply(event.getLoggerName(), content)) { + // Transform to stream log event; + StreamisLogEvent logEvent = new StreamisLogEvent(content, event.getTimeMillis()); + try { + this.logCache.cacheLog(logEvent); + } catch (InterruptedException e) { + LOGGER.error("StreamisRpcLogAppender: {} interrupted when cache the log into the RPC sender, message: {}", this.getName(), e.getMessage()); + } } } diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/log4j2/StreamisLog4j2AppenderConfig.java b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/log4j2/StreamisLog4j2AppenderConfig.java index f68d7721..adf7dfe0 100644 --- a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/log4j2/StreamisLog4j2AppenderConfig.java +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/log4j2/StreamisLog4j2AppenderConfig.java @@ -2,6 +2,8 @@ import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.RpcLogSenderConfig; import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.StreamisLogAppenderConfig; +import com.webank.wedatasphere.streamis.jobmanager.log.collector.message.filters.LogMessageFilter; +import com.webank.wedatasphere.streamis.jobmanager.log.collector.message.filters.LogMessageFilterAdapter; import org.apache.logging.log4j.core.Filter; import org.apache.logging.log4j.core.filter.CompositeFilter; @@ -19,8 +21,8 @@ public class StreamisLog4j2AppenderConfig extends StreamisLogAppenderConfig { private final Filter filter; public StreamisLog4j2AppenderConfig(String applicationName, Filter filter, - RpcLogSenderConfig rpcLogSenderConfig){ - super(applicationName, rpcLogSenderConfig); + RpcLogSenderConfig rpcLogSenderConfig, List messageFilters){ + super(applicationName, rpcLogSenderConfig, messageFilters); this.filter = filter; } @@ -45,7 +47,11 @@ public Builder(String applicationName, Filter filter, RpcLogSenderConfig rpcLogS */ public StreamisLog4j2AppenderConfig.Builder setFilter(Filter filter){ this.filters.clear(); + this.messageFilters.clear(); this.filters.add(filter); + if (filter instanceof LogMessageFilterAdapter){ + this.messageFilters.add(((LogMessageFilterAdapter) filter).getLogMessageFilter()); + } return this; } @@ -56,6 +62,9 @@ public StreamisLog4j2AppenderConfig.Builder setFilter(Filter filter){ */ public StreamisLog4j2AppenderConfig.Builder withFilter(Filter filter){ filters.add(filter); + if (filter instanceof LogMessageFilterAdapter){ + this.messageFilters.add(((LogMessageFilterAdapter) filter).getLogMessageFilter()); + } return this; } @@ -70,7 +79,7 @@ public StreamisLog4j2AppenderConfig build(){ } else if (!filters.isEmpty()){ logFilter = filters.get(0); } - return new StreamisLog4j2AppenderConfig(applicationName, logFilter, rpcLogSenderConfig); + return new StreamisLog4j2AppenderConfig(applicationName, logFilter, rpcLogSenderConfig, messageFilters); } } public Filter getFilter() { diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/log4j2/config/RpcLogSenderConfig.java b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/log4j2/config/RpcLogSenderConfig.java index 7be52646..f9dff1d1 100644 --- a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/log4j2/config/RpcLogSenderConfig.java +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/log4j2/config/RpcLogSenderConfig.java @@ -26,12 +26,15 @@ public static RpcLogSenderConfig createConfig( @PluginAttribute("address") String address, @PluginAttribute("sendRetryCnt") String sendRetryCnt, @PluginAttribute("connectionTimeout") String connectionTimeout, @PluginAttribute("socketTimeout") String socketTimeout, @PluginAttribute("serverRecoveryTimeInSec") String serverRecoveryTimeInSec, @PluginAttribute("maxDelayTimeInSec") String maxDelayTimeInSec, + @PluginAttribute("debugMode")String debugMode, @PluginElement("AuthConfig")RpcAuthConfig authConfig, @PluginElement("SendLogCache") SendLogCacheConfig cacheConfig, @PluginElement("SendBuffer")SendBufferConfig bufferConfig){ - return new RpcLogSenderConfig(address, Integers.parseInt(sendRetryCnt, 3), + RpcLogSenderConfig config = new RpcLogSenderConfig(address, Integers.parseInt(sendRetryCnt, 3), Integers.parseInt(connectionTimeout, 3000), Integers.parseInt(socketTimeout, 15000), Integers.parseInt(serverRecoveryTimeInSec, 5), Integers.parseInt(maxDelayTimeInSec, 60), authConfig, cacheConfig, bufferConfig); + config.setDebugMode(Boolean.parseBoolean(debugMode)); + return config; } } diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/log4j2/filters/KeywordThresholdFilter.java b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/log4j2/filters/KeywordThresholdFilter.java new file mode 100644 index 00000000..59a2a3da --- /dev/null +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/log4j2/filters/KeywordThresholdFilter.java @@ -0,0 +1,84 @@ +package com.webank.wedatasphere.streamis.jobmanager.log.collector.log4j2.filters; + +import com.webank.wedatasphere.streamis.jobmanager.log.collector.message.filters.KeywordMessageFilter; +import com.webank.wedatasphere.streamis.jobmanager.log.collector.message.filters.LogMessageFilter; +import com.webank.wedatasphere.streamis.jobmanager.log.collector.message.filters.LogMessageFilterAdapter; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.Marker; +import org.apache.logging.log4j.core.Filter; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.Logger; +import org.apache.logging.log4j.core.filter.AbstractFilter; +import org.apache.logging.log4j.message.Message; + +import java.util.Optional; + +/** + * Threshold filter with keyword + */ +public class KeywordThresholdFilter extends AbstractFilter implements LogMessageFilterAdapter { + + /** + * Level + */ + private final Level level; + + /** + * Message filter + */ + private final KeywordMessageFilter messageFilter; + public KeywordThresholdFilter(String[] acceptKeywords, String[] excludeKeywords){ + // Use accept and deny match + super(Filter.Result.ACCEPT, Filter.Result.DENY); + // If accept keywords is empty, set the log level to warn + if (null == acceptKeywords || acceptKeywords.length <= 0){ + this.level = Level.WARN; + System.out.println("The keywords is empty, set the log threshold level >= " + this.level); + } else { + this.level = Level.ALL; + } + this.messageFilter = new KeywordMessageFilter(acceptKeywords, excludeKeywords); + } + + @Override + public Result filter(LogEvent event) { + return filter(event.getLevel()); + } + + @Override + public Result filter(Logger logger, Level level, Marker marker, Message msg, Throwable t) { + return filter(level); + } + + @Override + public Result filter(Logger logger, Level level, Marker marker, Object msg, Throwable t) { + return filter(level); + } + + @Override + public Result filter(Logger logger, Level level, Marker marker, String msg, Object... params) { + return filter(level); + } + + private Result filter(final Level level){ + return level.isMoreSpecificThan(this.level) ? onMatch : onMismatch; + } + + public Level getLevel() { + return level; + } + + @Override + public String toString() { + return level.toString() + + "|acceptKeywords:[" + + Optional.ofNullable(this.messageFilter.getAcceptKeywords()).orElse(new String[]{}).length + + "]|excludeKeywords:[" + + Optional.ofNullable(this.messageFilter.getExcludeKeywords()).orElse(new String[]{}).length + "]" ; + } + + @Override + public LogMessageFilter getLogMessageFilter() { + return this.messageFilter; + } +} diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector/src/test/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/StreamisLogAppenderTest.java b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector/src/test/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/StreamisLogAppenderTest.java index 2cec0ec4..0bc49c13 100644 --- a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector/src/test/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/StreamisLogAppenderTest.java +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector/src/test/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/StreamisLogAppenderTest.java @@ -8,18 +8,22 @@ public class StreamisLogAppenderTest { private static final Logger LOG = LoggerFactory.getLogger(StreamisLogAppenderTest.class); @Test public void appenderLog() throws InterruptedException { - int total = 1000; - int tps = 100; + int total = 10000; + int tps = 1000; long timer = System.currentTimeMillis() + 1000; - for(int i = 0; i < total; i ++){ - if (i > 0 && i % tps == 0){ + for (int i = 0; i < total; i++) { + if (i > 0 && i % tps == 0) { long sleep = timer - System.currentTimeMillis(); - if (sleep > 0){ - Thread.sleep(sleep); + if (sleep > 0) { + try { + Thread.sleep(sleep); + } catch (InterruptedException e) { + e.printStackTrace(); + } } timer = System.currentTimeMillis() + 1000; } - LOG.info("Stream Log appender test, sequence id: " + i); + LOG.info("ERROR: Stream Log appender test, sequence id: " + i); } } } diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector/src/test/resources/log4j2.xml b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector/src/test/resources/log4j2.xml index ee3f4125..27aff1d6 100644 --- a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector/src/test/resources/log4j2.xml +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector/src/test/resources/log4j2.xml @@ -20,7 +20,7 @@ - diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector1x/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/StreamisRpcLogAppender.java b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector1x/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/StreamisRpcLogAppender.java index ccbaf02f..90a28abf 100644 --- a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector1x/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/StreamisRpcLogAppender.java +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector1x/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/StreamisRpcLogAppender.java @@ -3,6 +3,7 @@ import com.webank.wedatasphere.streamis.jobmanager.log.collector.cache.LogCache; import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.RpcLogSenderConfig; import com.webank.wedatasphere.streamis.jobmanager.log.collector.log4j1.StreamisLog4jAppenderConfig; +import com.webank.wedatasphere.streamis.jobmanager.log.collector.message.filters.LogMessageFilter; import com.webank.wedatasphere.streamis.jobmanager.log.collector.sender.StreamisRpcLogSender; import com.webank.wedatasphere.streamis.jobmanager.log.entities.StreamisLogEvent; import com.webank.wedatasphere.streamis.jobmanager.plugin.StreamisConfigAutowired; @@ -13,6 +14,7 @@ import org.apache.log4j.spi.LoggingEvent; import java.util.*; +import java.util.function.BiFunction; /** * Rpc appender for log4j1 @@ -24,6 +26,7 @@ public class StreamisRpcLogAppender extends AppenderSkeleton { */ private String applicationName; + private String filterEnable = "true"; /** * Appender config */ @@ -45,18 +48,25 @@ public class StreamisRpcLogAppender extends AppenderSkeleton { */ private LogCache logCache; + /** + * Filter function + */ + private BiFunction messageFilterFunction = (logger, message) -> false; @Override protected void append(LoggingEvent loggingEvent) { String content = super.getLayout().format(loggingEvent); - // Transform to stream log event; - StreamisLogEvent logEvent = new StreamisLogEvent(content, System.currentTimeMillis()); - if (Objects.nonNull(logCache)){ - try { - this.logCache.cacheLog(logEvent); - } catch (InterruptedException e) { - LogLog.error("StreamisRpcLogAppender: " + this.getName() + - " interrupted when cache the log into the RPC sender, message: " + e.getMessage()); + if (messageFilterFunction.apply(loggingEvent.getLoggerName(), content)) { + // Transform to stream log event; + // System.currentTimeMills() -> loggingEvent.getTimeStamp() + StreamisLogEvent logEvent = new StreamisLogEvent(content, loggingEvent.getTimeStamp()); + if (Objects.nonNull(logCache)) { + try { + this.logCache.cacheLog(logEvent); + } catch (InterruptedException e) { + LogLog.error("StreamisRpcLogAppender: " + this.getName() + + " interrupted when cache the log into the RPC sender, message: " + e.getMessage()); + } } } } @@ -81,6 +91,9 @@ public void activateOptions() { if (Objects.isNull(getLayout())){ setLayout(new SimpleLayout()); } + if (System.getProperty("filter.enable") == null){ + System.setProperty("filter.enable", filterEnable); + } // Search the config autowired class List configAutowiredEntities = new ArrayList<>(); StreamisLog4jAppenderConfig logAppenderConfig = null; @@ -108,13 +121,24 @@ public void activateOptions() { // First to clear the filters clearFilters(); // Then to add filter - logAppenderConfig.getFilters().forEach(this :: addFilter); + logAppenderConfig.getFilters().forEach(this::addFilter); System.out.println("StreamisRpcLogAppender: init with config => " + logAppenderConfig); this.rpcLogSender = new StreamisRpcLogSender(this.appenderConfig.getApplicationName(), this.appenderConfig.getSenderConfig()); this.rpcLogSender.setExceptionListener((subject, t, message) -> LogLog.error((null != subject? subject.getClass().getSimpleName() : "") + ": " + message, t)); this.logCache = this.rpcLogSender.getOrCreateLogCache(); + List messageFilters = appenderConfig.getMessageFilters(); + if (null != messageFilters && messageFilters.size() > 0){ + messageFilterFunction = (logger, message) ->{ + for(LogMessageFilter messageFilter : messageFilters){ + if (!messageFilter.doFilter(logger, message)){ + return false; + } + } + return true; + }; + } } @@ -130,6 +154,14 @@ public void setAppName(String applicationName) { this.applicationName = applicationName; } + public String getFilterEnable() { + return filterEnable; + } + + public void setFilterEnable(String filterEnable) { + this.filterEnable = filterEnable; + } + public void setRpcAddress(String address){ this.rpcLogSenderConfig.setAddress(address); } @@ -186,4 +218,16 @@ public void setRpcBufferSize(int bufferSize){ public void setRpcBufferExpireTimeInSec(int expireTimeInSec){ this.rpcLogSenderConfig.getBufferConfig().setExpireTimeInSec(expireTimeInSec); } + + public void setDebugMode(boolean debugMode){ + this.rpcLogSenderConfig.setDebugMode(debugMode); + } + + public void setDiscard(boolean discard){ + this.rpcLogSenderConfig.getCacheConfig().setDiscard(discard); + } + + public void setDiscardWindow(int window){ + this.rpcLogSenderConfig.getCacheConfig().setDiscardWindow(window); + } } diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector1x/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/log4j1/StreamisLog4jAppenderConfig.java b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector1x/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/log4j1/StreamisLog4jAppenderConfig.java index 84489c15..f10bef45 100644 --- a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector1x/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/log4j1/StreamisLog4jAppenderConfig.java +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector1x/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/log4j1/StreamisLog4jAppenderConfig.java @@ -2,6 +2,8 @@ import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.RpcLogSenderConfig; import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.StreamisLogAppenderConfig; +import com.webank.wedatasphere.streamis.jobmanager.log.collector.message.filters.LogMessageFilter; +import com.webank.wedatasphere.streamis.jobmanager.log.collector.message.filters.LogMessageFilterAdapter; import org.apache.log4j.Priority; import org.apache.log4j.spi.Filter; @@ -24,8 +26,8 @@ public class StreamisLog4jAppenderConfig extends StreamisLogAppenderConfig { private final Priority threshold; protected StreamisLog4jAppenderConfig(String applicationName, Priority threshold, List filters, - RpcLogSenderConfig rpcLogSenderConfig) { - super(applicationName, rpcLogSenderConfig); + RpcLogSenderConfig rpcLogSenderConfig, List messageFilters) { + super(applicationName, rpcLogSenderConfig, messageFilters); this.threshold = threshold; this.filters.addAll(filters); } @@ -52,12 +54,19 @@ public Builder(String applicationName, Priority threshold, Filter filter,RpcLogS public StreamisLog4jAppenderConfig.Builder setFilter(Filter filter){ this.filters.clear(); + this.messageFilters.clear(); this.filters.add(filter); + if (filter instanceof LogMessageFilterAdapter){ + this.messageFilters.add(((LogMessageFilterAdapter) filter).getLogMessageFilter()); + } return this; } public StreamisLog4jAppenderConfig.Builder withFilter(Filter filter){ filters.add(filter); + if (filter instanceof LogMessageFilterAdapter){ + this.messageFilters.add(((LogMessageFilterAdapter) filter).getLogMessageFilter()); + } return this; } @@ -66,12 +75,18 @@ public StreamisLog4jAppenderConfig.Builder withFilter(Filter filter){ * @param threshold threshold * @return builder */ - public StreamisLog4jAppenderConfig.Builder threshold(Priority threshold){ - this.threshold = threshold; + public StreamisLog4jAppenderConfig.Builder threshold(Priority threshold, boolean needMoreSpecific){ + if (needMoreSpecific){ + if (this.threshold == null || threshold.isGreaterOrEqual(this.threshold)){ + this.threshold = threshold; + } + }else { + this.threshold = threshold; + } return this; } public StreamisLog4jAppenderConfig build(){ - return new StreamisLog4jAppenderConfig(applicationName, threshold, filters, rpcLogSenderConfig); + return new StreamisLog4jAppenderConfig(applicationName, threshold, filters, rpcLogSenderConfig, messageFilters); } } diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector1x/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/log4j1/filters/KeywordAllMatchFilter.java b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector1x/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/log4j1/filters/KeywordAllMatchFilter.java new file mode 100644 index 00000000..1fe60b30 --- /dev/null +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector1x/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/log4j1/filters/KeywordAllMatchFilter.java @@ -0,0 +1,31 @@ +package com.webank.wedatasphere.streamis.jobmanager.log.collector.log4j1.filters; + +import com.webank.wedatasphere.streamis.jobmanager.log.collector.message.filters.KeywordMessageFilter; +import com.webank.wedatasphere.streamis.jobmanager.log.collector.message.filters.LogMessageFilter; +import com.webank.wedatasphere.streamis.jobmanager.log.collector.message.filters.LogMessageFilterAdapter; +import org.apache.log4j.spi.Filter; +import org.apache.log4j.spi.LoggingEvent; + +/** + * All match filter with keyword + */ +public class KeywordAllMatchFilter extends Filter implements LogMessageFilterAdapter { + + /** + * Message filter + */ + private final KeywordMessageFilter messageFilter; + + public KeywordAllMatchFilter(String[] acceptKeywords, String[] excludeKeywords){ + this.messageFilter = new KeywordMessageFilter(acceptKeywords, excludeKeywords); + } + @Override + public int decide(LoggingEvent event) { + return Filter.ACCEPT; + } + + @Override + public LogMessageFilter getLogMessageFilter() { + return this.messageFilter; + } +} diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector1x/src/test/resources/log4j.properties b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector1x/src/test/resources/log4j.properties index 85e2a916..8801938a 100644 --- a/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector1x/src/test/resources/log4j.properties +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/streamis-job-log-collector1x/src/test/resources/log4j.properties @@ -26,6 +26,7 @@ log4j.appender.console.layout.ConversionPattern= %d{ISO8601} %-5p (%t) %p %c{1} log4j.appender.stream=com.webank.wedatasphere.streamis.jobmanager.log.collector.StreamisRpcLogAppender log4j.appender.stream.appName=stream_applicatioin log4j.appender.stream.Threshold=INFO +log4j.appender.stream.filterEnable=false log4j.appender.stream.layout=org.apache.log4j.PatternLayout log4j.appender.stream.layout.ConversionPattern= %d{ISO8601} %-5p (%t) [%F:%M(%L)] - %m%n log4j.appender.stream.rpcConnTimeout=3000 diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/xspark-streamis-log-collector/pom.xml b/streamis-jobmanager/streamis-job-log/job-log-collector/xspark-streamis-log-collector/pom.xml index 364eed1c..9866eede 100644 --- a/streamis-jobmanager/streamis-job-log/job-log-collector/xspark-streamis-log-collector/pom.xml +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/xspark-streamis-log-collector/pom.xml @@ -15,6 +15,8 @@ 8 8 + 1.2.17 + 1.7.12 @@ -23,6 +25,27 @@ streamis-job-log-collector1x ${streamis.version} + + + org.slf4j + slf4j-api + ${slf4j.version} + provided + + + + org.slf4j + slf4j-log4j12 + ${slf4j.version} + provided + + + + log4j + log4j + ${log4j.version} + provided + diff --git a/streamis-jobmanager/streamis-job-log/job-log-collector/xspark-streamis-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/spark/SparkStreamisConfigAutowired.java b/streamis-jobmanager/streamis-job-log/job-log-collector/xspark-streamis-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/spark/SparkStreamisConfigAutowired.java index a2a73adb..2d92da75 100644 --- a/streamis-jobmanager/streamis-job-log/job-log-collector/xspark-streamis-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/spark/SparkStreamisConfigAutowired.java +++ b/streamis-jobmanager/streamis-job-log/job-log-collector/xspark-streamis-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/spark/SparkStreamisConfigAutowired.java @@ -1,15 +1,24 @@ package com.webank.wedatasphere.streamis.jobmanager.log.collector.spark; import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.StreamisLogAppenderConfig; +import com.webank.wedatasphere.streamis.jobmanager.log.collector.log4j1.StreamisLog4jAppenderConfig; +import com.webank.wedatasphere.streamis.jobmanager.log.collector.log4j1.filters.KeywordAllMatchFilter; +import com.webank.wedatasphere.streamis.jobmanager.log.utils.StringUtils; import com.webank.wedatasphere.streamis.jobmanager.plugin.StreamisConfigAutowired; +import org.apache.log4j.Level; import java.util.Optional; - /** * Autoconfigure the streamis config in Spark environment */ public class SparkStreamisConfigAutowired implements StreamisConfigAutowired { + private static final String DEBUG_MODE = "log.debug.mode"; + + private static final String DISCARD_SWITCH = "log.discard"; + + private static final String DISCARD_WINDOW = "log.discard.window"; + private static final String APP_NAME_CONFIG = "app.name"; private static final String SERVER_ADDRESS_CONFIG = "streamis.url"; @@ -19,9 +28,31 @@ public class SparkStreamisConfigAutowired implements StreamisConfigAutowired { private static final String PROJECT_NAME_CONFIG = "project.name"; private static final String DEFAULT_COLLECTOR_URI = "/api/rest_j/v1/streamis/streamJobManager/log/collect/events"; + + private static final String FILTER_ENABLE = "filter.enable"; + + private static final String FILTER_KEYWORD = "filter.keywords"; + + private static final String FILTER_KEYWORD_EXCLUDE = "filter.keywords.exclude"; @Override public StreamisLogAppenderConfig logAppenderConfig(StreamisLogAppenderConfig.Builder builder) throws Exception { // Load the config from system properties + String debugMode = System.getProperty(DEBUG_MODE, "false"); + if (null != debugMode && debugMode.equals("true")){ + builder.setDebugMode(true); + } + String discard = System.getProperty(DISCARD_SWITCH, "true"); + if (null != discard && discard.equals("true")){ + builder.setDiscard(true); + } + String discardWind = System.getProperty(DISCARD_WINDOW, "2"); + if (null != discardWind){ + try{ + builder.setDiscardWindow(Integer.parseInt(discardWind)); + } catch (Exception e){ + // Ignore + } + } Optional.ofNullable(System.getProperty(APP_NAME_CONFIG)).ifPresent(appName -> { String projectName = System.getProperty(PROJECT_NAME_CONFIG); if (null != projectName && !projectName.trim().equals("")){ @@ -51,6 +82,25 @@ public StreamisLogAppenderConfig logAppenderConfig(StreamisLogAppenderConfig.Bui } System.out.println("Spark env to streamis: log user =>" + user); builder.setRpcAuthTokenUser(user); + // Set filter + boolean filterEnable = true; + try { + filterEnable = Boolean.parseBoolean(System.getProperty(FILTER_ENABLE, "true")); + }catch (Exception e){ + // ignore + } + if (filterEnable && builder instanceof StreamisLog4jAppenderConfig.Builder){ + StreamisLog4jAppenderConfig.Builder log4jBuilder = ((StreamisLog4jAppenderConfig.Builder) builder); + String[] acceptKeywords = StringUtils.convertStrToArray(System.getProperty(FILTER_KEYWORD, "ERROR"), ","); + KeywordAllMatchFilter keywordAllMatchFilter = new KeywordAllMatchFilter( + acceptKeywords, + StringUtils.convertStrToArray(System.getProperty(FILTER_KEYWORD_EXCLUDE), ",")); + if (null == acceptKeywords || acceptKeywords.length <=0 ){ + System.out.println("The keywords is empty, set the log threshold level >= " + Level.WARN); + log4jBuilder.threshold(Level.WARN, true); + } + log4jBuilder.setFilter(keywordAllMatchFilter); + } return builder.build(); } } diff --git a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/pom.xml b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/pom.xml index fa3bf44e..8b4714e2 100644 --- a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/pom.xml +++ b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/pom.xml @@ -27,5 +27,11 @@ org.apache.linkis linkis-module + + junit + junit + ${junit.version} + test + \ No newline at end of file diff --git a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/StreamisJobLogAutoConfiguration.java b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/StreamisJobLogAutoConfiguration.java index 7b839893..f3f32e36 100644 --- a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/StreamisJobLogAutoConfiguration.java +++ b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/StreamisJobLogAutoConfiguration.java @@ -1,4 +1,24 @@ package com.webank.wedatasphere.streamis.jobmanager.log.server; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.JobLogStorage; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.StreamisJobLogStorage; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.bucket.StorageThresholdDriftPolicy; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.loadbalancer.RoundRobinLoadBalancer; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.loadbalancer.SimpleLoadBalancer; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration public class StreamisJobLogAutoConfiguration { + + @Bean(initMethod = "init", destroyMethod = "destroy") + @ConditionalOnMissingBean(JobLogStorage.class) + public JobLogStorage streamisJobLogStorage(){ + StreamisJobLogStorage jobLogStorage = new StreamisJobLogStorage(); + jobLogStorage.addLoadBalancer(new RoundRobinLoadBalancer()); + jobLogStorage.addLoadBalancer(new SimpleLoadBalancer()); + jobLogStorage.setBucketDriftPolicy(new StorageThresholdDriftPolicy()); + return jobLogStorage; + } } diff --git a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/config/StreamJobLogConfig.java b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/config/StreamJobLogConfig.java index c857d524..1f6777e9 100644 --- a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/config/StreamJobLogConfig.java +++ b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/config/StreamJobLogConfig.java @@ -8,6 +8,26 @@ */ public class StreamJobLogConfig { + /** + * Set the log restful api as no-auth + */ + public static final CommonVars NO_AUTH_REST = CommonVars.apply("wds.stream.job.log.restful.no-auth", false); + + /** + * The threshold of log storage + */ + public static final CommonVars STORAGE_THRESHOLD = CommonVars.apply("wds.stream.job.log.storage.threshold", 0.9); + + /** + * Max weight of storage context + */ + public static final CommonVars STORAGE_CONTEXT_MAX_WEIGHT = CommonVars.apply("wds.stream.job.log.storage.context.max-weight", 5); + + /** + * Paths of storage context + */ + public static final CommonVars STORAGE_CONTEXT_PATHS = CommonVars.apply("wds.stream.job.log.storage.context.paths", "/data/stream/log"); + /** * Bucket monitor name */ diff --git a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/restful/JobLogRestfulApi.java b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/restful/JobLogRestfulApi.java index e9d19c65..27104311 100644 --- a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/restful/JobLogRestfulApi.java +++ b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/restful/JobLogRestfulApi.java @@ -1,12 +1,10 @@ package com.webank.wedatasphere.streamis.jobmanager.log.server.restful; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.webank.wedatasphere.streamis.jobmanager.log.entities.StreamisLogEvent; +import com.webank.wedatasphere.streamis.jobmanager.log.server.config.StreamJobLogConfig; import com.webank.wedatasphere.streamis.jobmanager.log.server.entities.StreamisLogEvents; import com.webank.wedatasphere.streamis.jobmanager.log.server.exception.StreamJobLogException; import com.webank.wedatasphere.streamis.jobmanager.log.server.service.StreamisJobLogService; import org.apache.commons.lang.StringUtils; -import org.apache.linkis.common.utils.JsonUtils; import org.apache.linkis.server.Message; import org.apache.linkis.server.security.SecurityFilter; import org.slf4j.Logger; @@ -18,7 +16,6 @@ import javax.annotation.Resource; import javax.servlet.http.HttpServletRequest; -import java.io.IOException; @RestController @RequestMapping(path = "/streamis/streamJobManager/log") @@ -36,9 +33,24 @@ public Message collectEvents(@RequestBody StreamisLogEvents events, HttpServletR if (StringUtils.isBlank(events.getAppName())){ return Message.ok("Ignore the stream log events without application name"); } - String userName = SecurityFilter.getLoginUsername(request); - if (StringUtils.isBlank(userName)){ - throw new StreamJobLogException(-1, "The request should has token user"); + String userName; + if (StreamJobLogConfig.NO_AUTH_REST.getValue()){ + userName = request.getHeader("Token-User"); + if (StringUtils.isBlank(userName)){ + try { + userName = SecurityFilter.getLoginUsername(request); + }catch(Exception e){ + // Ignore + } + if (StringUtils.isBlank(userName)){ + userName = "hadoop"; + } + } + }else { + userName = SecurityFilter.getLoginUsername(request); + if (StringUtils.isBlank(userName)) { + throw new StreamJobLogException(-1, "The request should has token user"); + } } this.streamisJobLogService.store(userName, events); result = Message.ok(); diff --git a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/JobLogStorage.java b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/JobLogStorage.java index e7059033..4299104d 100644 --- a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/JobLogStorage.java +++ b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/JobLogStorage.java @@ -2,6 +2,9 @@ import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.bucket.JobLogBucket; import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.bucket.JobLogBucketConfig; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.bucket.JobLogBucketDriftPolicy; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.context.JobLogStorageContextListener; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.loadbalancer.JobLogStorageLoadBalancer; /** * Storage of job log @@ -17,10 +20,27 @@ public interface JobLogStorage { */ JobLogBucket getOrCreateBucket(String userName, String appName, JobLogBucketConfig bucketConfig); + /** + * Set bucket drift policy + * @param bucketDriftPolicy bucket drift policy + */ + void setBucketDriftPolicy(JobLogBucketDriftPolicy bucketDriftPolicy); + + /** + * Add context listener + * @param listener listener + */ + void addContextListener(JobLogStorageContextListener listener); + + /** + * Add load balancer + * @param loadBalancer load balancer + */ + void addLoadBalancer(JobLogStorageLoadBalancer loadBalancer); /** * Init method */ - void init(); + void init() throws Exception; /** * Destroy method diff --git a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/StreamisJobLogStorage.java b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/StreamisJobLogStorage.java index 154c8f6f..ccadd721 100644 --- a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/StreamisJobLogStorage.java +++ b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/StreamisJobLogStorage.java @@ -1,9 +1,16 @@ package com.webank.wedatasphere.streamis.jobmanager.log.server.storage; import com.webank.wedatasphere.streamis.jobmanager.log.server.config.StreamJobLogConfig; +import com.webank.wedatasphere.streamis.jobmanager.log.server.exception.StreamJobLogException; import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.bucket.JobLogBucket; import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.bucket.JobLogBucketConfig; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.bucket.JobLogBucketDriftPolicy; import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.bucket.JobLogBucketState; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.context.*; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.loadbalancer.JobLogStorageLoadBalancer; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.loadbalancer.RoundRobinLoadBalancer; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.utils.MemUtils; +import org.apache.commons.lang.StringUtils; import org.apache.linkis.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -11,12 +18,13 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.text.SimpleDateFormat; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -25,19 +33,39 @@ /** * Job log storage */ -@Component public class StreamisJobLogStorage implements JobLogStorage{ private static final Logger LOG = LoggerFactory.getLogger(StreamisJobLogStorage.class); + + /** + * Storage context + */ + private final List storageContexts = new CopyOnWriteArrayList<>(); + + /** + * Drift policy + */ + private JobLogBucketDriftPolicy bucketDriftPolicy; /** * Buckets */ private final Map buckets = new ConcurrentHashMap<>(); + /** + * Context listeners + */ + private final List contextListeners = new ArrayList<>(); + + /** + * Load balancer + */ + private final List loadBalancers = new ArrayList<>(); + /** * Constructor cache */ private final Map> bucketConstructors = new ConcurrentHashMap<>(); + /** * To monitor the status of buckets */ @@ -47,26 +75,30 @@ public class StreamisJobLogStorage implements JobLogStorage{ public JobLogBucket getOrCreateBucket(String userName, String appName, JobLogBucketConfig bucketConfig) { String bucketName = toBucketName(userName, appName); return buckets.computeIfAbsent(bucketName, name -> { - Class bucketClass = bucketConfig.getBucketClass(); - if (Objects.nonNull(bucketClass)) { - Constructor constructor = bucketConstructors.computeIfAbsent(bucketClass.getName(), className -> { - Constructor[] constructors = bucketClass.getConstructors(); - Constructor matchConstructor = null; - for (Constructor constructor1 : constructors) { - Class[] inputParams = constructor1.getParameterTypes(); - if (inputParams.length >= 2 && inputParams[0].equals(String.class) - && inputParams[1].equals(JobLogBucketConfig.class)) { - matchConstructor = constructor1; - break; + // First to choose context + JobLogStorageContext context = chooseStorageContext(bucketName, bucketConfig); + if (null != context){ + Class bucketClass = bucketConfig.getBucketClass(); + if (Objects.nonNull(bucketClass)) { + Constructor constructor = bucketConstructors.computeIfAbsent(bucketClass.getName(), className -> { + Constructor[] constructors = bucketClass.getConstructors(); + Constructor matchConstructor = null; + for (Constructor constructor1 : constructors) { + Class[] inputParams = constructor1.getParameterTypes(); + if (inputParams.length >= 3 && inputParams[0].equals(String.class) + && inputParams[1].equals(JobLogStorageContext.class) && inputParams[2].equals(JobLogBucketConfig.class)) { + matchConstructor = constructor1; + break; + } + } + return matchConstructor; + }); + if (Objects.nonNull(constructor)) { + try { + return (JobLogBucket) constructor.newInstance(bucketName, context, bucketConfig); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { + LOG.warn("Cannot create storage log bucket from [{}]", bucketClass.getName(), e); } - } - return matchConstructor; - }); - if (Objects.nonNull(constructor)) { - try { - return (JobLogBucket) constructor.newInstance(bucketName, bucketConfig); - } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { - LOG.warn("Cannot create storage log bucket from [{}]", bucketClass.getName(), e); } } } @@ -74,9 +106,31 @@ public JobLogBucket getOrCreateBucket(String userName, String appName, JobLogBuc }); } + @Override + public void setBucketDriftPolicy(JobLogBucketDriftPolicy bucketDriftPolicy) { + this.bucketDriftPolicy = bucketDriftPolicy; + } + + @Override + public void addContextListener(JobLogStorageContextListener listener) { + this.contextListeners.add(listener); + } + + @Override + public void addLoadBalancer(JobLogStorageLoadBalancer loadBalancer) { + this.loadBalancers.add(loadBalancer); + if (loadBalancer instanceof JobLogStorageContextListener){ + addContextListener((JobLogStorageContextListener) loadBalancer); + } + } + @Override @PostConstruct - public synchronized void init() { + public synchronized void init() throws Exception{ + initStorageContexts(StringUtils.split(StreamJobLogConfig.STORAGE_CONTEXT_PATHS.getValue(), ",")); + onContextEvent(new ContextLaunchEvent(new ArrayList<>(this.storageContexts))); + // Init load balancer + initLoadBalancers(); if (Objects.isNull(monitorThread)){ monitorThread = Utils.defaultScheduler().scheduleAtFixedRate(() -> { String threadName = Thread.currentThread().getName(); @@ -84,8 +138,17 @@ public synchronized void init() { Thread.currentThread().setName(StreamJobLogConfig.BUCKET_MONITOR_NAME.getValue()); long maxIdleTime = StreamJobLogConfig.BUCKET_MAX_IDLE_TIME.getValue().toLong(); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + // Update the storage context + JobLogStorageContext[] contexts = this.storageContexts.toArray(new JobLogStorageContext[0]); + try { + updateContextWeight(contexts); + // Notify the listener to refresh the context information + onContextEvent(new ContextRefreshAllEvent()); + } catch (IOException e) { + LOG.warn("Unable to calculate weight array of storage context list", e); + } if (buckets.size() > 0) { - StringBuilder builder = new StringBuilder("Buckets in LogStorage: [\n"); + StringBuilder builder = new StringBuilder("Buckets(").append(buckets.size()).append(") in LogStorage: [\n"); buckets.forEach((bucketName, bucket) -> { JobLogBucketState bucketState = bucket.getBucketState(); builder.append("bucket: [ name: ") @@ -95,17 +158,29 @@ public synchronized void init() { .append(", write-rate: ").append(bucketState.getBucketWriteRate()).append("/s") .append(", last-write-time: ").append(dateFormat.format(bucketState.getBucketWriteTime())) .append(" ]\n"); + boolean closeBucket = false; if (bucketState.getBucketWriteTime() + maxIdleTime <= System.currentTimeMillis()) { LOG.info("Close the idle bucket: [ name: {}, last-write-time: {} ]", bucketName, dateFormat.format(bucketState.getBucketWriteTime())); - bucket.close(); + closeBucket = true; + } if (Objects.nonNull(bucketDriftPolicy) && bucketDriftPolicy.onPolicy(bucket, contexts)){ + LOG.info("Drift the bucket: [ name: {}, last-write-time: {} ]", bucketName, + dateFormat.format(bucketState.getBucketWriteTime())); + closeBucket = true; + } + if (closeBucket) { // Delete the bucket + // First to move the bucket from map, then close it buckets.remove(bucketName); + bucket.close(); } - }); LOG.info(builder.toString()); } + } catch (Throwable e){ + assert LOG != null; + LOG.warn("Some exception happened in monitor thread", e); + //Ignore } finally { Thread.currentThread().setName(threadName); } @@ -114,15 +189,6 @@ public synchronized void init() { } } - /** - * Bucket name - * @param userName username - * @param appName app name - * @return bucket name - */ - private String toBucketName(String userName, String appName){ - return userName + "." + appName; - } @Override @PreDestroy @@ -133,4 +199,134 @@ public void destroy() { monitorThread.cancel(true); } } + + /** + * Choose storage context + * @param bucketName bucket name + * @param jobLogBucketConfig bucket config + * @return storage context + */ + private JobLogStorageContext chooseStorageContext(String bucketName, JobLogBucketConfig jobLogBucketConfig){ + JobLogStorageContext context; + for(JobLogStorageLoadBalancer balancer : loadBalancers){ + context = balancer.chooseContext(bucketName, jobLogBucketConfig); + if (null != context){ + return context; + } + } + return null; + } + + /** + * Init load balancers + */ + private void initLoadBalancers(){ + for(JobLogStorageLoadBalancer loadBalancer : this.loadBalancers){ + loadBalancer.init(); + } + // Sort the load balancer + this.loadBalancers.sort(Comparator.comparingInt(JobLogStorageLoadBalancer::priority).reversed()); + } + /** + * Init the storage context + * @param storagePaths storage paths + */ + private void initStorageContexts(String[] storagePaths) throws StreamJobLogException { + LOG.info("Init the storage context: [" + StringUtils.join(storagePaths, ",") + "]"); + for(String storagePath : storagePaths){ + if (StringUtils.isNotBlank(storagePath)) { + // TODO the score of context + this.storageContexts.add(new JobLogStorageContext(storagePath, 1.0)); + } + } + if (!this.storageContexts.isEmpty()) { + int size = this.storageContexts.size(); + try { + updateContextWeight(storageContexts.toArray(new JobLogStorageContext[size])); + } catch (IOException e) { + throw new StreamJobLogException(-1, "Unable to calculate weight array of storage context list", e); + } + } + } + + private void updateContextWeight(JobLogStorageContext[] contexts) throws IOException { + double[] weights = calculateContextWeight(contexts); + StringBuilder builder = new StringBuilder("Update storage context weights:[\n"); + for(int i = 0 ; i < weights.length; i ++){ + JobLogStorageContext context = contexts[i]; + builder.append(context.getStorePath()).append(" => ").append(weights[i]); + if (i != weights.length - 1){ + builder.append(", "); + } + context.setStoreWeight(weights[i]); + } + builder.append("\n]"); + LOG.info(builder.toString()); + } + /** + * Calculate the base weight of storage context + * @param contexts context array + */ + private double[] calculateContextWeight(JobLogStorageContext[] contexts) throws IOException { + double[] weights = new double[contexts.length]; + if (contexts.length > 0) { + int maxNormalizeWt = StreamJobLogConfig.STORAGE_CONTEXT_MAX_WEIGHT.getValue(); + double storageThreshold = StreamJobLogConfig.STORAGE_THRESHOLD.getValue(); + if (maxNormalizeWt < 1){ + maxNormalizeWt = 1; + } + double maxWeight = Double.MIN_VALUE; + double minWeight = Double.MAX_VALUE; + int i = 0; + for (; i < weights.length; i++) { + JobLogStorageContext context = contexts[0]; + long usableSpace = context.getUsableSpace(); + long totalSpace = context.getTotalSpace(); + double usage = (double)(totalSpace - usableSpace) / (double)totalSpace; + double weight = 0d; + if (usage >= storageThreshold){ + LOG.warn("The usage of storage context:[{}] reach the threshold: {} > {}, set the weight of it to 0", + context.getStorePath(), usage, storageThreshold); + } else { + long freeSpaceInGB = MemUtils.convertToGB(usableSpace, "B"); + if (freeSpaceInGB <= 0) { + freeSpaceInGB = 1; + } + weight = context.getScore() * (double) freeSpaceInGB; + } + weights[i] = weight; + if (weight > maxWeight){ + maxWeight = weight; + } + if (weight < minWeight){ + minWeight = weight; + } + } + double sub = maxWeight - minWeight; + i = i - 1; + for (; i >= 0; i--){ + weights[i] = (sub > 0? (maxNormalizeWt - 1) * (weights[i] - minWeight) * sub : 0) + 1; + } + } + return weights; + } + + /** + * Produce context event + * @param event event + */ + private void onContextEvent(JobLogStorageContextListener.ContextEvent event){ + for(JobLogStorageContextListener listener : contextListeners){ + listener.onContextEvent(event); + } + } + /** + * Bucket name + * @param userName username + * @param appName app name + * @return bucket name + */ + private String toBucketName(String userName, String appName){ + return userName + "." + appName; + } } diff --git a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/bucket/JobLogBucket.java b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/bucket/JobLogBucket.java index b55ce818..463edab7 100644 --- a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/bucket/JobLogBucket.java +++ b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/bucket/JobLogBucket.java @@ -1,5 +1,7 @@ package com.webank.wedatasphere.streamis.jobmanager.log.server.storage.bucket; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.context.JobLogStorageContext; + /** * Job log bucket for streamis */ @@ -17,6 +19,11 @@ public interface JobLogBucket { */ JobLogStorageWriter getBucketStorageWriter(); + /** + * Get storage context + * @return context + */ + JobLogStorageContext getStorageContext(); /** * Bucket name * @return bucket name diff --git a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/bucket/JobLogBucketConfig.java b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/bucket/JobLogBucketConfig.java index 831cccc6..7264986b 100644 --- a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/bucket/JobLogBucketConfig.java +++ b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/bucket/JobLogBucketConfig.java @@ -29,11 +29,6 @@ public JobLogBucketConfig(){ */ private Class bucketClass; - /** - * Root path for bucket - */ - private String bucketRootPath = StreamJobLogConfig.BUCKET_ROOT_PATH.getValue(); - /** * Attribute */ @@ -67,14 +62,6 @@ public void setBucketClass(Class bucketClass) { this.bucketClass = bucketClass; } - public String getBucketRootPath() { - return bucketRootPath; - } - - public void setBucketRootPath(String bucketRootPath) { - this.bucketRootPath = bucketRootPath; - } - public Map getAttributes() { return attributes; } diff --git a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/bucket/JobLogBucketDriftPolicy.java b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/bucket/JobLogBucketDriftPolicy.java new file mode 100644 index 00000000..147f8faf --- /dev/null +++ b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/bucket/JobLogBucketDriftPolicy.java @@ -0,0 +1,15 @@ +package com.webank.wedatasphere.streamis.jobmanager.log.server.storage.bucket; + +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.context.JobLogStorageContext; + +/** + * Drift policy + */ +public interface JobLogBucketDriftPolicy { + /** + * Decide whether you should drift the bucket + * @param bucket bucket + * @return + */ + boolean onPolicy(JobLogBucket bucket, JobLogStorageContext[] contexts); +} diff --git a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/bucket/Log4j2JobLogBucket.java b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/bucket/Log4j2JobLogBucket.java index 0915e0fa..6d7b6318 100644 --- a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/bucket/Log4j2JobLogBucket.java +++ b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/bucket/Log4j2JobLogBucket.java @@ -1,6 +1,7 @@ package com.webank.wedatasphere.streamis.jobmanager.log.server.storage.bucket; import com.webank.wedatasphere.streamis.jobmanager.log.entities.LogElement; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.context.JobLogStorageContext; import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.StringUtils; import org.apache.linkis.common.conf.CommonVars; @@ -21,8 +22,12 @@ import java.io.File; import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; /** * Job log bucket for log4j @@ -39,6 +44,10 @@ public class Log4j2JobLogBucket implements JobLogBucket{ */ private final String bucketName; + /** + * Logger name + */ + private final String loggerName; /** * Logger context */ @@ -49,6 +58,10 @@ public class Log4j2JobLogBucket implements JobLogBucket{ */ private final Logger logger; + /** + * Storage context + */ + private final JobLogStorageContext storageContext; /** * Storage writer */ @@ -69,20 +82,41 @@ public class Log4j2JobLogBucket implements JobLogBucket{ */ private long preIntervalTime; + /** + * Active thread + */ + private final AtomicLong activeThread = new AtomicLong(0); /** * Interval counter */ private final AtomicLong intervalCounter = new AtomicLong(0); + /** + * Shutdown flag + */ + private final AtomicBoolean isShutdown = new AtomicBoolean(false); + + /** + * Shutdown lock + */ + private final ReentrantLock shutdownLock = new ReentrantLock(); + + /** + * Shutdown condition + */ + private final Condition canShutdown = shutdownLock.newCondition(); /** * Store the write rate */ private double writeRate; - public Log4j2JobLogBucket(String bucketName, JobLogBucketConfig config){ + public Log4j2JobLogBucket(String bucketName, JobLogStorageContext storageContext, JobLogBucketConfig config){ this.bucketName = bucketName; + // Build unique logger name + this.loggerName = bucketName + System.currentTimeMillis() + "_" + Thread.currentThread().getId(); + this.storageContext = storageContext; // Create logger context this.loggerContext = (LoggerContext) LogManager.getContext(false); - this.logger = initLogger(this.bucketName, config, this.loggerContext); + this.logger = initLogger(this.bucketName, this.loggerName, this.storageContext, config, this.loggerContext); this.jobLogStorageWriter = createStorageWriter(); this.jobLogBucketState = createBucketState(); } @@ -96,6 +130,11 @@ public JobLogStorageWriter getBucketStorageWriter() { return this.jobLogStorageWriter; } + @Override + public JobLogStorageContext getStorageContext() { + return this.storageContext; + } + @Override public String getBucketName() { return this.bucketName; @@ -103,20 +142,34 @@ public String getBucketName() { @Override public void close() { + this.isShutdown.set(true); + this.shutdownLock.lock(); + try{ + if (activeThread.get() > 0) { + if (!this.canShutdown.await(5, TimeUnit.SECONDS)) { + LOG.warn("Shutdown the bucket: [{}] directly because the timeout of waiting", bucketName); + } + } + } catch (InterruptedException e) { + // Ignore + } finally { + this.shutdownLock.unlock(); + } Configuration log4jConfig = this.loggerContext.getConfiguration(); // First to stop appender - log4jConfig.getAppender(this.bucketName).stop(); - log4jConfig.getLoggerConfig(this.bucketName).removeAppender(this.bucketName); - log4jConfig.removeLogger(this.bucketName); + log4jConfig.getAppender(this.loggerName).stop(); + log4jConfig.getLoggerConfig(this.loggerName).removeAppender(this.loggerName); + log4jConfig.removeLogger(this.loggerName); loggerContext.updateLoggers(); } - private synchronized Logger initLogger(String bucketName, JobLogBucketConfig config, LoggerContext loggerContext){ + private synchronized Logger initLogger(String bucketName, String loggerName, + JobLogStorageContext storageContext, JobLogBucketConfig config, LoggerContext loggerContext){ Configuration log4jConfig = loggerContext.getConfiguration(); - String fileName = resolveFileName(config.getBucketRootPath(), bucketName); + String fileName = resolveFileName(storageContext.getStorePath().toString(), bucketName); RollingFileAppender appender = RollingFileAppender.newBuilder() .setLayout(PatternLayout.newBuilder().withPattern(config.getLogLayOutPattern()).build()) - .setName(bucketName) + .setName(loggerName) // .withFileOwner() .withFileName(fileName) .withFilePattern(resolveFilePattern(fileName, config.getBucketPartCompress())) @@ -128,14 +181,14 @@ private synchronized Logger initLogger(String bucketName, JobLogBucketConfig con log4jConfig.addAppender(appender); LoggerConfig loggerConfig = LoggerConfig.newBuilder().withAdditivity(false).withLevel(Level.ALL) .withRefs(new AppenderRef[]{ - AppenderRef.createAppenderRef(bucketName, null, null) + AppenderRef.createAppenderRef(loggerName, null, null) }) - .withLoggerName(bucketName).withConfig(log4jConfig).build(); + .withLoggerName(loggerName).withConfig(log4jConfig).build(); loggerConfig.addAppender(appender, null, null); - log4jConfig.addLogger(bucketName, loggerConfig); + log4jConfig.addLogger(loggerName, loggerConfig); // Should we update the logger context ? loggerContext.updateLoggers(); - return loggerContext.getLogger(bucketName); + return loggerContext.getLogger(loggerName); } /** @@ -146,16 +199,34 @@ private JobLogStorageWriter createStorageWriter(){ return new JobLogStorageWriter() { @Override public void write(LogElement logEl) { - String[] contents = logEl.getContents(); - if (null != contents){ - for(String content : contents){ - write(content); + activeThread.incrementAndGet(); + try { + String[] contents = logEl.getContents(); + if (null != contents) { + for (String content : contents) { + write(content, true); + } + } + }finally { + if (activeThread.decrementAndGet() <= 0 && isShutdown.get()){ + notifyShutdown(); } } } @Override public void write(String logLine) { + activeThread.incrementAndGet(); + try { + write(logLine, false); + }finally { + if (activeThread.decrementAndGet() <= 0 && isShutdown.get()){ + notifyShutdown(); + } + } + } + + private void write(String logLine, boolean batch){ logger.info(logLine); long currentTime = System.currentTimeMillis(); long intervalCnt = intervalCounter.getAndIncrement(); @@ -167,9 +238,7 @@ public void write(String logLine) { intervalCounter.set(0); } lastWriteTime = currentTime; - } - @Override public void close() { // Ignore @@ -187,7 +256,7 @@ private JobLogBucketState createBucketState(){ @Override public String getBucketPath() { if (StringUtils.isBlank(bucketPath)) { - Appender appender = loggerContext.getConfiguration().getAppender(bucketName); + Appender appender = loggerContext.getConfiguration().getAppender(loggerName); if (appender instanceof RollingFileAppender) { bucketPath = new File(((RollingFileAppender) appender).getFileName()).getParent(); } @@ -216,6 +285,15 @@ public long getBucketWriteTime() { } }; } + + private void notifyShutdown(){ + this.shutdownLock.lock(); + try{ + this.canShutdown.notifyAll(); + }finally { + this.shutdownLock.unlock(); + } + } /** * Create rollover strategy * @param configuration configuration diff --git a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/bucket/StorageThresholdDriftPolicy.java b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/bucket/StorageThresholdDriftPolicy.java new file mode 100644 index 00000000..608faa75 --- /dev/null +++ b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/bucket/StorageThresholdDriftPolicy.java @@ -0,0 +1,23 @@ +package com.webank.wedatasphere.streamis.jobmanager.log.server.storage.bucket; + +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.context.JobLogStorageContext; + +public class StorageThresholdDriftPolicy implements JobLogBucketDriftPolicy{ + @Override + public boolean onPolicy(JobLogBucket bucket, JobLogStorageContext[] contexts) { + JobLogStorageContext bucketContext = bucket.getStorageContext(); + // Means that the storage context is not healthy + if (bucketContext.getStoreWeight() <= 0){ + // Find the available context + boolean hasRest = false; + for(JobLogStorageContext context : contexts){ + if (context.getStoreWeight() > 0){ + hasRest = true; + break; + } + } + return hasRest; + } + return false; + } +} diff --git a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/context/ContextDownEvent.java b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/context/ContextDownEvent.java new file mode 100644 index 00000000..c1964376 --- /dev/null +++ b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/context/ContextDownEvent.java @@ -0,0 +1,20 @@ +package com.webank.wedatasphere.streamis.jobmanager.log.server.storage.context; + +/** + * Means that the storage context has been downed + */ +public class ContextDownEvent implements JobLogStorageContextListener.ContextEvent { + + /** + * Context id + */ + private final String contextId; + + public ContextDownEvent(String contextId){ + this.contextId = contextId; + } + + public String getContextId() { + return contextId; + } +} diff --git a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/context/ContextLaunchEvent.java b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/context/ContextLaunchEvent.java new file mode 100644 index 00000000..59de63d6 --- /dev/null +++ b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/context/ContextLaunchEvent.java @@ -0,0 +1,23 @@ +package com.webank.wedatasphere.streamis.jobmanager.log.server.storage.context; + +import java.util.List; + +/** + * Means that the storage context has been launched + */ +public class ContextLaunchEvent implements JobLogStorageContextListener.ContextEvent { + + /** + * Storage contexts + */ + private final List contexts; + + public ContextLaunchEvent(List contexts){ + this.contexts = contexts; + } + + public List getContextList() { + return contexts; + } + +} diff --git a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/context/ContextRefreshAllEvent.java b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/context/ContextRefreshAllEvent.java new file mode 100644 index 00000000..b585e571 --- /dev/null +++ b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/context/ContextRefreshAllEvent.java @@ -0,0 +1,8 @@ +package com.webank.wedatasphere.streamis.jobmanager.log.server.storage.context; + +/** + * Just a sign that to refresh all the storage context + */ +public class ContextRefreshAllEvent implements JobLogStorageContextListener.ContextEvent { + +} diff --git a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/context/JobLogStorageContext.java b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/context/JobLogStorageContext.java new file mode 100644 index 00000000..9ffd9522 --- /dev/null +++ b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/context/JobLogStorageContext.java @@ -0,0 +1,144 @@ +package com.webank.wedatasphere.streamis.jobmanager.log.server.storage.context; + +import com.webank.wedatasphere.streamis.jobmanager.log.server.exception.StreamJobLogException; + +import java.io.IOException; +import java.nio.file.*; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.UUID; + +/** + * Storage context (represent the driver/disk) + */ +public class JobLogStorageContext{ + + /** + * Context id + */ + private final String id; + + /** + * Store path + */ + private final Path storePath; + + /** + * Store information + */ + private final FileStore storeInfo; + /** + * Score of storage context + */ + private final double score; + + /** + * Storage weight + */ + private double storeWeight; + + public JobLogStorageContext(String path, double score){ + this.id = UUID.randomUUID().toString(); + this.storePath = Paths.get(path); + this.storeInfo = initStorePath(this.storePath); + this.score = score; + } + + + private FileStore initStorePath(Path path){ + if (Files.notExists(path)){ + try { + Files.createDirectories(this.storePath, + PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rwxrwxr--"))); + } catch (IOException e) { + throw new StreamJobLogException.Runtime(-1, + "Cannot make the storage path directory: [" + path + "], message: " + e.getMessage()); + } + // Allow dir link + } else if (!Files.isDirectory(path)){ + throw new StreamJobLogException.Runtime(-1, + "the storage path: [" + path + "] is not directory" ); + } + try { + return Files.getFileStore(path); + } catch (IOException e) { + throw new StreamJobLogException.Runtime(-1, + "Fail to get the storage information in path: [" + path + "], message: " + e.getMessage()); + } + } + + public Path getStorePath() { + return storePath; + } + + /** + * Score + * @return score value + */ + public double getScore() { + return score; + } + + public String getId() { + return id; + } + + /** + * Total space + * @return bytes return + * @throws IOException + */ + public long getTotalSpace() throws IOException { + long result = storeInfo.getTotalSpace(); + if (result < 0){ + result = Long.MAX_VALUE; + } + return result; + } + + /** + * Usable space + * @return bytes return + * @throws IOException + */ + public long getUsableSpace() throws IOException { + long result = storeInfo.getUsableSpace(); + if (result < 0){ + result = Long.MAX_VALUE; + } + return result; + } + + /** + * Unallocated space + * @return bytes return + * @throws IOException + */ + public long getUnallocatedSpace() throws IOException{ + long result = storeInfo.getUnallocatedSpace(); + if (result < 0){ + result = Long.MAX_VALUE; + } + return result; + } + + public double getStoreWeight() { + return storeWeight; + } + + public void setStoreWeight(double storeWeight) { + this.storeWeight = storeWeight; + } + + @Override + public boolean equals(Object o) { + if (o instanceof JobLogStorageContext){ + return this.id.equals(((JobLogStorageContext) o).id); + } + return super.equals(o); + } + + @Override + public int hashCode() { + return this.id.hashCode(); + } +} diff --git a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/context/JobLogStorageContextListener.java b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/context/JobLogStorageContextListener.java new file mode 100644 index 00000000..77432a2b --- /dev/null +++ b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/context/JobLogStorageContextListener.java @@ -0,0 +1,17 @@ +package com.webank.wedatasphere.streamis.jobmanager.log.server.storage.context; + +/** + * Context listener + */ +public interface JobLogStorageContextListener { + + /** + * Listen the context event + * @param event event + */ + void onContextEvent(ContextEvent event); + + interface ContextEvent{ + + } +} diff --git a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/loadbalancer/JobLogStorageLoadBalancer.java b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/loadbalancer/JobLogStorageLoadBalancer.java new file mode 100644 index 00000000..06d6186d --- /dev/null +++ b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/loadbalancer/JobLogStorageLoadBalancer.java @@ -0,0 +1,27 @@ +package com.webank.wedatasphere.streamis.jobmanager.log.server.storage.loadbalancer; + +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.bucket.JobLogBucketConfig; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.context.JobLogStorageContext; + +public interface JobLogStorageLoadBalancer { + /** + * Init method + */ + void init(); + + /** + * The order + * @return priority value + */ + default int priority(){ + return -1; + } + + /** + * Choose storage context + * @param bucketName bucket name + * @param config bucket config + * @return + */ + JobLogStorageContext chooseContext(String bucketName, JobLogBucketConfig config); +} diff --git a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/loadbalancer/RoundRobinLoadBalancer.java b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/loadbalancer/RoundRobinLoadBalancer.java new file mode 100644 index 00000000..8d77d5fd --- /dev/null +++ b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/loadbalancer/RoundRobinLoadBalancer.java @@ -0,0 +1,199 @@ +package com.webank.wedatasphere.streamis.jobmanager.log.server.storage.loadbalancer; + +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.bucket.JobLogBucketConfig; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.context.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.locks.ReentrantLock; + + +/** + * Round-robin load balancer + */ +public class RoundRobinLoadBalancer implements JobLogStorageLoadBalancer, JobLogStorageContextListener { + + private static final Logger LOG = LoggerFactory.getLogger(RoundRobinLoadBalancer.class); + + /** + * Candidate array + */ + private StorageContextInfo[] candidates = new StorageContextInfo[0]; + + /** + * Lock for candidate array + */ + private final ReentrantLock candidateLock = new ReentrantLock(); + @Override + public void onContextEvent(ContextEvent event) { + if (event instanceof ContextLaunchEvent){ + onLaunchContexts(((ContextLaunchEvent) event).getContextList()); + } else if (event instanceof ContextDownEvent){ + onDownContext(((ContextDownEvent) event).getContextId()); + } else if (event instanceof ContextRefreshAllEvent){ + onRefreshAllContext(); + } + } + + @Override + public int priority() { + return Integer.MAX_VALUE; + } + + @Override + public void init() { + + } + + @Override + public JobLogStorageContext chooseContext(String bucketName, JobLogBucketConfig config) { + updateCandidateContextWeight(); + candidateLock.lock(); + try { + int index = selectContext(candidates); + if (index >= 0){ + StorageContextInfo info = this.candidates[index]; + info.cwt = info.cwt -1; + LOG.info("Round-Robin chosen context: {} for bucket: {}", info.context.getStorePath(), bucketName); + return info.context; + } + }finally { + candidateLock.unlock(); + } + return null; + } + + private static class StorageContextInfo{ + + /** + * Storage context + */ + final JobLogStorageContext context; + + /** + * If the context is working + */ + boolean online = true; + + /** + * Weight value + */ + int wt; + + /** + * Dynamic weight + */ + int cwt; + + public StorageContextInfo(JobLogStorageContext context){ + this.context = context; + this.wt = (int)Math.floor(context.getStoreWeight()); + this.cwt = wt; + } + + public void refreshWeight(){ + this.wt = (int)Math.floor(context.getStoreWeight()); + if (this.cwt > this.wt){ + this.cwt = this.wt; + } + } + } + + /** + * Select context + * @param infoArray info array + * @return index + */ + private int selectContext(StorageContextInfo[] infoArray){ + int u = 0; + int reset = -1; + while (true){ + for (int i = 0; i < infoArray.length; i ++){ + if (!infoArray[i].online || infoArray[i].cwt <= 0){ + continue; + } + u = i; + while (i < infoArray.length - 1){ + i ++; + if (!infoArray[i].online || infoArray[i].cwt <= 0){ + continue; + } + if ((infoArray[u].wt * 1000 / infoArray[i].wt < + infoArray[u].cwt * 1000 / infoArray[i].cwt)){ + return u; + } + u = i; + } + return u; + } + if (++reset > 0){ + return -1; + } + for (StorageContextInfo info : infoArray){ + info.cwt = info.wt; + } + } + + } + /** + * Enlarge the candidate array of context info + * @param contexts context list + */ + private void onLaunchContexts(List contexts){ + if (contexts.size() > 0){ + candidateLock.lock(); + try{ + StorageContextInfo[] source = candidates; + int newSize = source.length + contexts.size(); + StorageContextInfo[] dest = new StorageContextInfo[newSize]; + System.arraycopy(source, 0, dest, 0, source.length); + int offset = source.length; + for(JobLogStorageContext context : contexts){ + dest[offset++] = new StorageContextInfo(context); + } + this.candidates = dest; + }finally { + candidateLock.unlock(); + } + } + } + + /** + * Mark the context has been downed + * @param contextId context id + */ + private void onDownContext(String contextId){ + // Need to lock the array ? + candidateLock.lock(); + try{ + for (StorageContextInfo info : candidates) { + if (contextId.equals(info.context.getId())) { + info.online = false; + return; + } + } + } finally { + candidateLock.unlock(); + } + } + + /** + * Refresh all the context + */ + private void onRefreshAllContext(){ + candidateLock.lock(); + try{ + // Update the dynamic weight + for (StorageContextInfo info : candidates) { + info.refreshWeight(); + } + } finally { + candidateLock.unlock(); + } + } + private void updateCandidateContextWeight(){ + // Empty method + } + +} diff --git a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/loadbalancer/SimpleLoadBalancer.java b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/loadbalancer/SimpleLoadBalancer.java new file mode 100644 index 00000000..e11e9fa9 --- /dev/null +++ b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/loadbalancer/SimpleLoadBalancer.java @@ -0,0 +1,51 @@ +package com.webank.wedatasphere.streamis.jobmanager.log.server.storage.loadbalancer; + +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.bucket.JobLogBucketConfig; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.context.ContextDownEvent; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.context.ContextLaunchEvent; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.context.JobLogStorageContext; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.context.JobLogStorageContextListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.List; + +/** + * Simple load balancer + */ +public class SimpleLoadBalancer implements JobLogStorageLoadBalancer, JobLogStorageContextListener { + + private static final Logger LOG = LoggerFactory.getLogger(SimpleLoadBalancer.class); + + private final List contexts = new ArrayList<>(); + + private final SecureRandom random = new SecureRandom(); + @Override + public void onContextEvent(ContextEvent event) { + if (event instanceof ContextLaunchEvent){ + contexts.addAll(((ContextLaunchEvent) event).getContextList()); + } else if (event instanceof ContextDownEvent){ + contexts.removeIf(context -> context.getId().equals(((ContextDownEvent) event).getContextId())); + } + } + + @Override + public void init() { + + } + + @Override + public JobLogStorageContext chooseContext(String bucketName, JobLogBucketConfig config) { + JobLogStorageContext context = randomSelectContext(this.contexts); + if (null != context){ + LOG.info("Random chosen context: {} for bucket: {}", context.getStorePath(), bucketName); + } + return context; + } + + private JobLogStorageContext randomSelectContext(List candidates){ + return candidates.get(random.nextInt(candidates.size())); + } +} diff --git a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/utils/MemUtils.java b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/utils/MemUtils.java new file mode 100644 index 00000000..8ef2f3ec --- /dev/null +++ b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/utils/MemUtils.java @@ -0,0 +1,234 @@ +package com.webank.wedatasphere.streamis.jobmanager.log.server.storage.utils; + +import org.apache.commons.lang.StringUtils; + +import java.util.HashMap; +import java.util.Map; + +/** + * Mem utils + */ +public class MemUtils { + private static final Map UNIT_MAP = new HashMap<>(); + static{ + UNIT_MAP.put("G", StoreUnit.GB); + UNIT_MAP.put("GB", StoreUnit.GB); + UNIT_MAP.put("B", StoreUnit.B); + UNIT_MAP.put("M", StoreUnit.MB); + UNIT_MAP.put("MB", StoreUnit.MB); + UNIT_MAP.put("K", StoreUnit.KB); + UNIT_MAP.put("KB", StoreUnit.KB); + } + public static long convertToGB(long size, String unitFlag){ + if(size < 0){ + return -1L; + } + if(StringUtils.isNotBlank(unitFlag)){ + StoreUnit storeUnit = UNIT_MAP.get(unitFlag.trim().toUpperCase()); + if(null != storeUnit){ + return storeUnit.toGB(size); + } + } + return -1L; + } + + public static long convertToMB(long size, String unitFlag){ + if(size < 0){ + return -1L; + } + if(StringUtils.isNotBlank(unitFlag)){ + StoreUnit storeUnit = UNIT_MAP.get(unitFlag.trim().toUpperCase()); + if(null != storeUnit){ + return storeUnit.toMB(size); + } + } + return -1L; + } + + public static long convertToByte(long size, String unitFlag){ + if(size < 0){ + return -1L; + } + if(StringUtils.isNotBlank(unitFlag)){ + StoreUnit storeUnit = UNIT_MAP.get(unitFlag.trim().toUpperCase()); + if(null != storeUnit){ + return storeUnit.toB(size); + } + } + return -1L; + } + public enum StoreUnit { + /** + * byte + */ + B { + @Override + public long toB(long s){ + return s; + } + + @Override + public long toKB(long s){ + return s/(C1/C0); + } + + @Override + public long toMB(long s) { + return s/(C2/C0); + } + + @Override + public long toGB(long s) { + return s/(C3/C0); + } + + @Override + public long toTB(long s) { + return s/(C4/C0); + } + }, + /** + * kb + */ + KB{ + @Override + public long toB(long s){ + return x(s, C1/C0, Long.MAX_VALUE/(C1/C0)); + } + + @Override + public long toKB(long s){ + return s; + } + + @Override + public long toMB(long s) { + return s/(C2/C1); + } + + @Override + public long toGB(long s) { + return s/(C3/C1); + } + + @Override + public long toTB(long s) { + return s/(C4/C0); + } + }, + MB{ + @Override + public long toB(long s){ + return x(s, C2/C0, Long.MAX_VALUE/(C2/C0)); + } + + @Override + public long toKB(long s){ + return x(s, C2/C1, Long.MAX_VALUE/(C2/C1)); + } + + @Override + public long toMB(long s) { + return s; + } + + @Override + public long toGB(long s) { + return s/(C3/C2); + } + + @Override + public long toTB(long s) { + return s/(C4/C2); + } + }, + GB{ + @Override + public long toB(long s){ + return x(s, C3/C0, Long.MAX_VALUE/(C3/C0)); + } + + @Override + public long toKB(long s){ + return x(s, C3/C1, Long.MAX_VALUE/(C3/C1)); + } + + @Override + public long toMB(long s) { + return x(s, C3/C2, Long.MAX_VALUE/(C3/C2)); + } + + @Override + public long toGB(long s) { + return s; + } + + @Override + public long toTB(long s) { + return s/(C4/C3); + } + }, + TB{ + @Override + public long toB(long s){ + return x(s, C4/C0, Long.MAX_VALUE/(C4/C0)); + } + + @Override + public long toKB(long s){ + return x(s, C4/C1, Long.MAX_VALUE/(C4/C1)); + } + + @Override + public long toMB(long s) { + return x(s, C4/C2, Long.MAX_VALUE/(C4/C2)); + } + + @Override + public long toGB(long s) { + return x(s, C4/C3, Long.MAX_VALUE/(C4/C3)); + } + + @Override + public long toTB(long s) { + return s; + } + }; + + public long toB(long s){ + throw new AbstractMethodError(); + } + + public long toKB(long s){ + throw new AbstractMethodError(); + } + + public long toMB(long s){ + throw new AbstractMethodError(); + } + + public long toGB(long s){ + throw new AbstractMethodError(); + } + + public long toTB(long s){ + throw new AbstractMethodError(); + } + } + + static long x(long d, long m, long over){ + if(d > over){ + return Long.MAX_VALUE; + } + if(d < -over){ + return Long.MIN_VALUE; + } + return d * m; + } + static final long C0 = 1L; + static final long C1 = C0 * 1024L; + static final long C2 = C1 * 1024L; + static final long C3 = C2 * 1024L; + static final long C4 = C3 * 1024L; + +} \ No newline at end of file diff --git a/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/test/java/com/webank/wedatasphere/streamis/jobmanager/log/server/JobLogStorageTest.java b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/test/java/com/webank/wedatasphere/streamis/jobmanager/log/server/JobLogStorageTest.java new file mode 100644 index 00000000..a74d3895 --- /dev/null +++ b/streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/test/java/com/webank/wedatasphere/streamis/jobmanager/log/server/JobLogStorageTest.java @@ -0,0 +1,103 @@ +package com.webank.wedatasphere.streamis.jobmanager.log.server; + +import com.webank.wedatasphere.streamis.jobmanager.log.server.config.StreamJobLogConfig; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.JobLogStorage; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.StreamisJobLogStorage; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.bucket.JobLogBucket; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.bucket.JobLogBucketConfig; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.bucket.StorageThresholdDriftPolicy; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.context.JobLogStorageContext; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.loadbalancer.RoundRobinLoadBalancer; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.loadbalancer.SimpleLoadBalancer; +import com.webank.wedatasphere.streamis.jobmanager.log.server.storage.utils.MemUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.linkis.common.conf.BDPConfiguration; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URL; +import java.util.Objects; + +public class JobLogStorageTest { + + private static final Logger LOG = LoggerFactory.getLogger(JobLogStorageTest.class); + @Test + public void storageContext() throws IOException { + URL url = JobLogStorageTest.class.getResource("/"); + if (null != url){ + JobLogStorageContext context = new JobLogStorageContext(url.getPath(), 1.0d); + System.out.println("disk total(bytes): " + context.getTotalSpace()); + System.out.println("disk total(gb): " + MemUtils.convertToGB(context.getTotalSpace(), "B")); + System.out.println("disk usable(bytes): " + context.getUsableSpace()); + System.out.println("disk usable(gb): " + MemUtils.convertToGB(context.getUsableSpace(), "B")); + } + } + @Test + public void calculateWeight() throws IOException { + JobLogStorageContext candidate1 = new JobLogStorageContext(Objects.requireNonNull(JobLogStorage.class.getResource("/")) + .getPath(), 1.0d); + JobLogStorageContext candidate2 = new JobLogStorageContext(Objects.requireNonNull(JobLogStorage.class.getResource("/")) + .getPath(), 1.0d); + JobLogStorageContext[] contexts = new JobLogStorageContext[]{candidate1, candidate2}; + double[] weights = new double[contexts.length]; + int maxNormalizeWt = StreamJobLogConfig.STORAGE_CONTEXT_MAX_WEIGHT.getValue(); + double storageThreshold = StreamJobLogConfig.STORAGE_THRESHOLD.getValue(); + if (maxNormalizeWt < 1){ + maxNormalizeWt = 1; + } + double maxWeight = Double.MIN_VALUE; + double minWeight = Double.MAX_VALUE; + int i = 0; + for (; i < weights.length; i++) { + JobLogStorageContext context = contexts[0]; + long usableSpace = context.getUsableSpace(); + long totalSpace = context.getTotalSpace(); + double usage = (double)(totalSpace - usableSpace) / (double)totalSpace; + double weight = 0d; + if (usage >= storageThreshold){ + LOG.info("The usage of storage context:[{}] reach the threshold: {} > {}, set the weight of it to 0", + context.getStorePath(), usage, storageThreshold); + } else { + long freeSpaceInGB = MemUtils.convertToGB(usableSpace, "B"); + if (freeSpaceInGB <= 0) { + freeSpaceInGB = 1; + } + weight = context.getScore() * (double) freeSpaceInGB; + } + weights[i] = weight; + if (weight > maxWeight){ + maxWeight = weight; + } + if (weight < minWeight){ + minWeight = weight; + } + } + double sub = maxWeight - minWeight; + i = i - 1; + for (; i >= 0; i--){ + weights[i] = (sub > 0? (maxNormalizeWt - 1) * (weights[i] - minWeight) * sub : 0) + 1; + } + System.out.println(StringUtils.join(weights, '|')); + } + + @Test + public void startLogStorage() throws Exception { + BDPConfiguration.set("wds.stream.job.log.storage.context.paths", Objects.requireNonNull(JobLogStorage.class.getResource("/")) + .getPath()); + JobLogStorage storage = createJobLogStorage(); + storage.init(); + JobLogBucket bucket = storage.getOrCreateBucket("hadoop", "test-app", new JobLogBucketConfig()); + bucket.getBucketStorageWriter().write("Hello world"); + Thread.sleep(1000); + storage.destroy(); + } + private JobLogStorage createJobLogStorage(){ + StreamisJobLogStorage jobLogStorage = new StreamisJobLogStorage(); + jobLogStorage.addLoadBalancer(new RoundRobinLoadBalancer()); + jobLogStorage.addLoadBalancer(new SimpleLoadBalancer()); + jobLogStorage.setBucketDriftPolicy(new StorageThresholdDriftPolicy()); + return jobLogStorage; + } +}