From c693513e531d9c68d626fa4ee50f7b8d1de2f994 Mon Sep 17 00:00:00 2001 From: Santhosh R Date: Tue, 10 May 2016 14:25:05 +0530 Subject: [PATCH] Initial Implementation of reactive blacklisting by which queries that end up scanning large number of rowscan be blocked for some time. --- Makefile.am | 5 +- src/core/TsdbQuery.java | 11 ++- src/tools/TSDMain.java | 12 +++ src/tsd/BlacklistRpc.java | 26 ++++++ src/tsd/QueryRpc.java | 9 ++ src/tsd/RpcManager.java | 3 +- src/utils/BlacklistManager.java | 83 +++++++++++++++++ src/utils/Config.java | 3 + test/tsd/TestReactiveBlacklisting.java | 124 +++++++++++++++++++++++++ 9 files changed, 273 insertions(+), 3 deletions(-) create mode 100644 src/tsd/BlacklistRpc.java create mode 100644 src/utils/BlacklistManager.java create mode 100644 test/tsd/TestReactiveBlacklisting.java diff --git a/Makefile.am b/Makefile.am index d2ad17539f..6db5182982 100644 --- a/Makefile.am +++ b/Makefile.am @@ -146,6 +146,7 @@ tsdb_SRC := \ src/tsd/AbstractHttpQuery.java \ src/tsd/AnnotationRpc.java \ src/tsd/BadRequestException.java \ + src/tsd/BlacklistRpc.java \ src/tsd/ConnectionManager.java \ src/tsd/GnuplotException.java \ src/tsd/GraphHandler.java \ @@ -181,6 +182,7 @@ tsdb_SRC := \ src/uid/UniqueId.java \ src/uid/UniqueIdFilterPlugin.java \ src/uid/UniqueIdInterface.java \ + src/utils/BlacklistManager.java \ src/utils/ByteArrayPair.java \ src/utils/ByteSet.java \ src/utils/Config.java \ @@ -337,6 +339,7 @@ test_SRC := \ test/tsd/TestQueryExecutor.java \ test/tsd/TestQueryRpc.java \ test/tsd/TestQueryRpcLastDataPoint.java \ + test/tsd/TestReactiveBlacklisting.java \ test/tsd/TestRpcHandler.java \ test/tsd/TestRpcPlugin.java \ test/tsd/TestRpcManager.java \ @@ -357,7 +360,7 @@ test_SRC := \ test/utils/TestJSON.java \ test/utils/TestPair.java \ test/utils/TestPluginLoader.java - + test_plugin_SRC := \ test/plugin/DummyPluginA.java \ test/plugin/DummyPluginB.java \ diff --git a/src/core/TsdbQuery.java b/src/core/TsdbQuery.java index 07a741457a..b46f9525a7 100644 --- a/src/core/TsdbQuery.java +++ b/src/core/TsdbQuery.java @@ -47,6 +47,7 @@ import net.opentsdb.uid.NoSuchUniqueName; import net.opentsdb.uid.UniqueId; import net.opentsdb.utils.DateTime; +import net.opentsdb.utils.BlacklistManager; /** * Non-synchronized implementation of {@link Query}. @@ -136,7 +137,11 @@ final class TsdbQuery implements Query { /** Whether or not to match series with ONLY the given tags */ private boolean explicit_tags; - + + private String metricName; + + private Map allTags; + /** Constructor. */ public TsdbQuery(final TSDB tsdb) { this.tsdb = tsdb; @@ -260,6 +265,8 @@ public void setTimeSeries(final String metric, findGroupBys(); this.metric = tsdb.metrics.getId(metric); + this.metricName = metric; + this.allTags = tags; aggregator = function; this.rate = rate; this.rate_options = rate_options; @@ -638,6 +645,8 @@ public Object call(final ArrayList> rows) DateTime.nanoTime(), scanner_start) > timeout) { throw new InterruptedException("Query timeout exceeded!"); } + + BlacklistManager.checkAndAddToBlacklist(metricName, allTags, nrows); rows_pre_filter += rows.size(); diff --git a/src/tools/TSDMain.java b/src/tools/TSDMain.java index f26a895419..b7c0f15e3c 100644 --- a/src/tools/TSDMain.java +++ b/src/tools/TSDMain.java @@ -36,6 +36,7 @@ import net.opentsdb.core.Const; import net.opentsdb.tsd.PipelineFactory; import net.opentsdb.tsd.RpcManager; +import net.opentsdb.utils.BlacklistManager; import net.opentsdb.utils.Config; import net.opentsdb.utils.FileSystem; import net.opentsdb.utils.Pair; @@ -214,6 +215,9 @@ public static void main(String[] args) throws IOException { bindAddress = InetAddress.getByName(config.getString("tsd.network.bind")); } + initReactiveBlacklisting(config); + + // we validated the network port config earlier final InetSocketAddress addr = new InetSocketAddress(bindAddress, config.getInt("tsd.network.port")); @@ -235,6 +239,14 @@ public static void main(String[] args) throws IOException { // The server is now running in separate threads, we can exit main. } + private static void initReactiveBlacklisting(Config config) { + // Initialise reactive blacklisting + boolean isReactiveBlacklistingEnabled = config.getBoolean("tsd.blacklist.reactive.enabled"); + int rowCountThreshold = config.getInt("tsd.blacklist.reactive.row_count"); + int blockTimeInSeconds = config.getInt("tsd.blacklist.reactive.block_time_seconds"); + BlacklistManager.initBlockListConfiguration(isReactiveBlacklistingEnabled, rowCountThreshold, blockTimeInSeconds); + } + private static StartupPlugin loadStartupPlugins(Config config) { Logger log = LoggerFactory.getLogger(TSDMain.class); diff --git a/src/tsd/BlacklistRpc.java b/src/tsd/BlacklistRpc.java new file mode 100644 index 0000000000..9a195967a6 --- /dev/null +++ b/src/tsd/BlacklistRpc.java @@ -0,0 +1,26 @@ +package net.opentsdb.tsd; + +import com.fasterxml.jackson.databind.ObjectMapper; +import net.opentsdb.core.TSDB; +import net.opentsdb.utils.BlacklistManager; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; + +import java.io.IOException; +import java.util.List; + +/** + * Created by santhosh.r on 26/04/16. + */ +public class BlacklistRpc implements HttpRpc { + @Override + public void execute(TSDB tsdb, HttpQuery query) throws IOException { + if (query.method() != HttpMethod.GET) { + throw new BadRequestException(HttpResponseStatus.METHOD_NOT_ALLOWED, + "Method not allowed", "The HTTP method [" + query.method().getName() + + "] is not permitted for this endpoint"); + } + List blacklistedMetrics = BlacklistManager.getAllBlacklistedMetrics(); + query.sendReply(new ObjectMapper().writeValueAsString(blacklistedMetrics)); + } +} diff --git a/src/tsd/QueryRpc.java b/src/tsd/QueryRpc.java index 2f6d92be24..1dbc7117b4 100644 --- a/src/tsd/QueryRpc.java +++ b/src/tsd/QueryRpc.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import net.opentsdb.utils.BlacklistManager; import org.hbase.async.HBaseException; import org.hbase.async.RpcTimedOutException; import org.hbase.async.Bytes.ByteMap; @@ -155,6 +156,14 @@ private void handleQuery(final TSDB tsdb, final HttpQuery query, throw new BadRequestException(HttpResponseStatus.BAD_REQUEST, e.getMessage(), data_query.toString(), e); } + + List subQueries = data_query.getQueries(); + for(TSSubQuery subQuery : subQueries) { + if(BlacklistManager.isBlacklisted(subQuery.getMetric(), subQuery.getTags())) { + throw new BadRequestException(HttpResponseStatus.BAD_REQUEST, + "Metric Blacklisted : " + subQuery.getMetric(), data_query.toString()); + } + } // if the user tried this query multiple times from the same IP and src port // they'll be rejected on subsequent calls diff --git a/src/tsd/RpcManager.java b/src/tsd/RpcManager.java index 1814326c34..1c167ebe08 100644 --- a/src/tsd/RpcManager.java +++ b/src/tsd/RpcManager.java @@ -294,7 +294,8 @@ private void initializeBuiltinRpcs(final String mode, } http.put("api/search", new SearchRpc()); http.put("api/config", new ShowConfig()); - + http.put("api/blacklist", new BlacklistRpc()); + if (tsdb.getConfig().getString("tsd.no_diediedie").equals("false")) { final DieDieDie diediedie = new DieDieDie(); telnet.put("diediedie", diediedie); diff --git a/src/utils/BlacklistManager.java b/src/utils/BlacklistManager.java new file mode 100644 index 0000000000..0ade8d0659 --- /dev/null +++ b/src/utils/BlacklistManager.java @@ -0,0 +1,83 @@ +package net.opentsdb.utils; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +/** + * Created by santhosh.r on 25/04/16. + * + * Based on the number of rows scanned by a given query blocks any further calls to that same metric/tag combination + * for a specified amount of time. Currently blacklisting is supported only for query based on metric and tag name + * and not on direct querying using TSUID, which is not a common use case for cosmos. + */ +public class BlacklistManager { + private static final Logger logger = LoggerFactory.getLogger(BlacklistManager.class); + + private static boolean isReactiveBlacklistingEnabled = false; + + private static int rowCountThresholdForBlackList = 100000; + + private static int blockingTimeInSeconds = 600; + + private static Cache metricNameCache = CacheBuilder.newBuilder().expireAfterWrite(blockingTimeInSeconds, TimeUnit.SECONDS).build(); + + public static void initBlockListConfiguration(boolean isReactiveBlacklistingEnabled, int rowCountThreshold, int blockingTimeInSeconds) { + BlacklistManager.isReactiveBlacklistingEnabled = isReactiveBlacklistingEnabled; + if (isReactiveBlacklistingEnabled) { + BlacklistManager.rowCountThresholdForBlackList = rowCountThreshold; + BlacklistManager.blockingTimeInSeconds = blockingTimeInSeconds; + metricNameCache = CacheBuilder.newBuilder().expireAfterWrite(blockingTimeInSeconds, TimeUnit.SECONDS).build(); + } + } + + public static void checkAndAddToBlacklist(String metricName, Map tags, int nRowsScanned) { + if (isReactiveBlacklistingEnabled && (metricName != null) && (nRowsScanned > rowCountThresholdForBlackList)) { + String metricNameKey = getMetricNameKey(metricName, tags); + metricNameCache.put(metricNameKey, true); + logger.info("Metric " + metricNameKey + " is blacklisted for " + blockingTimeInSeconds + " seconds"); + } + } + + public static List getAllBlacklistedMetrics() { + List metricsNameList = new ArrayList(); + if (isReactiveBlacklistingEnabled) { + metricsNameList.addAll(metricNameCache.asMap().keySet()); + } + return metricsNameList; + } + + public static boolean isBlacklisted(String metricName, Map tags) { + if (isReactiveBlacklistingEnabled && (metricName != null)) { + String metricNameKey = getMetricNameKey(metricName, tags); + if (metricNameCache.getIfPresent(metricNameKey) != null) { + return true; + } + } + return false; + } + + private static String getMetricNameKey(String metricName, Map tags) { + SortedMap sortedTags = new TreeMap(); + sortedTags.putAll(tags); + StringBuilder blacklistingKey = new StringBuilder(); + blacklistingKey.append(metricName); + blacklistingKey.append("["); + int index = 0; + for (Map.Entry pair : tags.entrySet()) { + if(index != 0) { + blacklistingKey.append(","); + } + blacklistingKey.append(pair.getKey()); + blacklistingKey.append("="); + blacklistingKey.append(pair.getValue()); + index++; + } + blacklistingKey.append("]"); + return blacklistingKey.toString(); + } +} diff --git a/src/utils/Config.java b/src/utils/Config.java index 4927d43fbb..3a69f5a3af 100644 --- a/src/utils/Config.java +++ b/src/utils/Config.java @@ -540,6 +540,9 @@ protected void setDefaults() { + "Content-Type, Accept, Origin, User-Agent, DNT, Cache-Control, " + "X-Mx-ReqToken, Keep-Alive, X-Requested-With, If-Modified-Since"); default_map.put("tsd.query.timeout", "0"); + default_map.put("tsd.blacklist.reactive.enabled", "false"); + default_map.put("tsd.blacklist.reactive.row_count", "100000"); + default_map.put("tsd.blacklist.reactive.block_time_seconds", "600"); for (Map.Entry entry : default_map.entrySet()) { if (!properties.containsKey(entry.getKey())) diff --git a/test/tsd/TestReactiveBlacklisting.java b/test/tsd/TestReactiveBlacklisting.java new file mode 100644 index 0000000000..78d4a76d2a --- /dev/null +++ b/test/tsd/TestReactiveBlacklisting.java @@ -0,0 +1,124 @@ +package net.opentsdb.tsd; + +import com.stumbleupon.async.Deferred; +import net.opentsdb.core.BaseTsdbTest; +import net.opentsdb.core.Query; +import net.opentsdb.core.TSDB; +import net.opentsdb.meta.TestTSUIDQuery; +import net.opentsdb.storage.MockBase; +import net.opentsdb.uid.UniqueId; +import net.opentsdb.utils.BlacklistManager; +import net.opentsdb.utils.Config; +import net.opentsdb.utils.DateTime; +import org.hbase.async.HBaseClient; +import org.hbase.async.KeyValue; +import org.hbase.async.Scanner; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.reflect.Whitebox; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ TSDB.class, HBaseClient.class, Config.class, HttpQuery.class, + Query.class, Deferred.class, UniqueId.class, DateTime.class, KeyValue.class, + Scanner.class }) +public class TestReactiveBlacklisting extends BaseTsdbTest { + private QueryRpc rpc; + + private Map host1tags; + private Map host2tags; + + private final int rowCountThreshold = 0; + private final int blockTimeInSeconds = 10; + + private final String allHostQuery = "/api/query?start=1h-ago&m=sum:sys.cpu.user"; + private final String host1Query1 = "/api/query?start=1h-ago&m=sum:sys.cpu.user{host=web01,datacenter=dc01}"; + private final String host1Query2 = "/api/query?start=1h-ago&m=sum:sys.cpu.user{datacenter=dc01,host=web01}"; + private final String host2Query1 = "/api/query?start=1h-ago&m=sum:sys.cpu.user{host=web02,datacenter=dc01}"; + + @Before + public void beforeLocal() throws Exception { + Whitebox.setInternalState(config, "enable_tsuid_incrementing", true); + Whitebox.setInternalState(config, "enable_realtime_ts", true); + rpc = new QueryRpc(); + storage = new MockBase(tsdb, client, true, true, true, true); + TestTSUIDQuery.setupStorage(tsdb, storage); + host1tags = new HashMap(); + host1tags.put("host", "web01"); + host1tags.put("owner", "web02"); + host2tags = new HashMap(); + host2tags.put("host", "web02"); + host2tags.put("owner", "web01"); + + } + + private void executeQueryWith200Response(String queryString) throws IOException { + final HttpQuery query = NettyMocks.getQuery(tsdb, queryString); + rpc.execute(tsdb, query); + assert (query.response().getStatus().getCode() == 200); + } + + private void executeQueryWith400Blacklist(String queryString) throws IOException { + final HttpQuery query = NettyMocks.getQuery(tsdb, queryString); + boolean exception = false; + try { + rpc.execute(tsdb, query); + } catch (BadRequestException be) { + exception = true; + assert (be.getMessage().startsWith("Metric Blacklisted")); + } + assert (exception == true); + } + + private void toggleReactiveBlacklisting(boolean enable) { + BlacklistManager.initBlockListConfiguration(enable, rowCountThreshold, blockTimeInSeconds); + } + + @Test + public void testWithNoBlacklisting() throws Exception { + toggleReactiveBlacklisting(false); + PowerMockito.mockStatic(DateTime.class); + PowerMockito.when(DateTime.currentTimeMillis()).thenReturn(1461924148000L); + tsdb.addPoint("sys.cpu.user", 1461924131, 1, host1tags); + tsdb.addPoint("sys.cpu.user", 1461924141, 1, host1tags); + tsdb.addPoint("sys.cpu.user", 1461924131, 1, host2tags); + tsdb.addPoint("sys.cpu.user", 1461924141, 1, host2tags); + + executeQueryWith200Response(allHostQuery); + executeQueryWith200Response(allHostQuery); + executeQueryWith200Response(host1Query1); + executeQueryWith200Response(host1Query2); + executeQueryWith200Response(host2Query1); + Thread.sleep(blockTimeInSeconds * 1000); + executeQueryWith200Response(allHostQuery); + executeQueryWith200Response(host1Query2); + } + + @Test + public void testWithBlacklisting() throws Exception { + toggleReactiveBlacklisting(true); + PowerMockito.mockStatic(DateTime.class); + PowerMockito.when(DateTime.currentTimeMillis()).thenReturn(1461924148000L); + tsdb.addPoint("sys.cpu.user", 1461924131, 1, host1tags); + tsdb.addPoint("sys.cpu.user", 1461924141, 1, host1tags); + tsdb.addPoint("sys.cpu.user", 1461924131, 1, host2tags); + tsdb.addPoint("sys.cpu.user", 1461924141, 1, host2tags); + + executeQueryWith200Response(allHostQuery); + executeQueryWith400Blacklist(allHostQuery); + executeQueryWith200Response(host1Query1); + executeQueryWith400Blacklist(host1Query2); + executeQueryWith200Response(host2Query1); + Thread.sleep(blockTimeInSeconds * 1000); + executeQueryWith200Response(allHostQuery); + executeQueryWith200Response(host1Query2); + } + +}