From 2cb85bc2f90c4a88c204b90cd800c51b2c657826 Mon Sep 17 00:00:00 2001 From: Alberto Gomez Date: Thu, 9 Sep 2021 11:38:16 +0200 Subject: [PATCH 1/7] Make ObserverHolder thread safe by having a thread local observer member --- .../query/internal/QueryTraceJUnitTest.java | 69 ++++++++++++++++--- .../query/internal/QueryObserverHolder.java | 12 ++-- .../cli/functions/DataCommandFunction.java | 6 +- 3 files changed, 71 insertions(+), 16 deletions(-) diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryTraceJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryTraceJUnitTest.java index c02b96a6af65..f7f040fbe58c 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryTraceJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryTraceJUnitTest.java @@ -15,6 +15,7 @@ package org.apache.geode.cache.query.internal; import static org.apache.geode.cache.Region.SEPARATOR; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -60,10 +61,12 @@ public class QueryTraceJUnitTest { @Before public void setUp() throws Exception { CacheUtils.startCache(); + DefaultQuery.testHook = new BeforeQueryExecutionHook(); } @After public void tearDown() throws Exception { + DefaultQuery.testHook = null; CacheUtils.closeCache(); } @@ -104,7 +107,11 @@ public void testTraceOnPartitionedRegionWithTracePrefix() throws Exception { assertTrue(((DefaultQuery) query).isTraced()); SelectResults results = (SelectResults) query.execute(); - assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver); + + // The IndexTrackingObserver should have been set + BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook; + assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class); + // The query should return all elements in region. assertEquals(region.size(), results.size()); QueryObserverHolder.reset(); @@ -141,7 +148,11 @@ public void testTraceOnLocalRegionWithTracePrefix() throws Exception { assertTrue(((DefaultQuery) query).isTraced()); SelectResults results = (SelectResults) query.execute(); - assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver); + + // The IndexTrackingObserver should have been set + BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook; + assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class); + // The query should return all elements in region. assertEquals(region.size(), results.size()); QueryObserverHolder.reset(); @@ -183,7 +194,11 @@ public void testNegTraceOnPartitionedRegionWithTracePrefix() throws Exception { assertFalse(((DefaultQuery) query).isTraced()); SelectResults results = (SelectResults) query.execute(); - assertFalse(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver); + + // The IndexTrackingObserver should not have been set + BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook; + assertThat(hook.getObserver()).isNotInstanceOf(IndexTrackingQueryObserver.class); + // The query should return all elements in region. assertEquals(region.size(), results.size()); QueryObserverHolder.reset(); @@ -223,7 +238,11 @@ public void testNegTraceOnLocalRegionWithTracePrefix() throws Exception { assertFalse(((DefaultQuery) query).isTraced()); SelectResults results = (SelectResults) query.execute(); - assertFalse(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver); + + // The IndexTrackingObserver should not have been set + BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook; + assertThat(hook.getObserver()).isNotInstanceOf(IndexTrackingQueryObserver.class); + // The query should return all elements in region. assertEquals(region.size(), results.size()); QueryObserverHolder.reset(); @@ -262,7 +281,11 @@ public void testTraceOnPartitionedRegionWithTracePrefixNoComments() throws Excep assertTrue(((DefaultQuery) query).isTraced()); SelectResults results = (SelectResults) query.execute(); - assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver); + + // The IndexTrackingObserver should have been set + BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook; + assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class); + // The query should return all elements in region. assertEquals(region.size(), results.size()); QueryObserverHolder.reset(); @@ -296,8 +319,11 @@ public void testTraceOnLocalRegionWithTracePrefixNoComments() throws Exception { assertTrue(((DefaultQuery) query).isTraced()); SelectResults results = (SelectResults) query.execute(); - assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver); - // The query should return all elements in region. + + // The IndexTrackingObserver should have been set + BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook; + assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class); + assertEquals(region.size(), results.size()); QueryObserverHolder.reset(); } @@ -331,7 +357,11 @@ public void testTraceOnPartitionedRegionWithSmallTracePrefixNoComments() throws assertTrue(((DefaultQuery) query).isTraced()); SelectResults results = (SelectResults) query.execute(); - assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver); + + // The IndexTrackingObserver should have been set + BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook; + assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class); + // The query should return all elements in region. assertEquals(region.size(), results.size()); QueryObserverHolder.reset(); @@ -366,7 +396,11 @@ public void testTraceOnLocalRegionWithSmallTracePrefixNoComments() throws Except assertTrue(((DefaultQuery) query).isTraced()); SelectResults results = (SelectResults) query.execute(); - assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver); + + // The IndexTrackingObserver should have been set + BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook; + assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class); + // The query should return all elements in region. assertEquals(region.size(), results.size()); QueryObserverHolder.reset(); @@ -438,4 +472,21 @@ public void testQueryFailLocalRegionWithSmallTracePrefixNoSpace() throws Excepti } } + private class BeforeQueryExecutionHook implements DefaultQuery.TestHook { + private QueryObserver observer = null; + + @Override + public void doTestHook(final SPOTS spot, final DefaultQuery _ignored, + final ExecutionContext executionContext) { + switch (spot) { + case BEFORE_QUERY_EXECUTION: + observer = QueryObserverHolder.getInstance(); + break; + } + } + + public QueryObserver getObserver() { + return observer; + } + } } diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryObserverHolder.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryObserverHolder.java index d3eb12419649..aeb881fa0203 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryObserverHolder.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryObserverHolder.java @@ -49,31 +49,31 @@ public class QueryObserverHolder { * The current observer which will be notified of all query events. */ @MakeNotStatic - private static QueryObserver _instance = NO_OBSERVER; + private static ThreadLocal _instance = ThreadLocal.withInitial(() -> NO_OBSERVER); /** * Set the given observer to be notified of query events. Returns the current observer. */ public static QueryObserver setInstance(QueryObserver observer) { Support.assertArg(observer != null, "setInstance expects a non-null argument!"); - QueryObserver oldObserver = _instance; - _instance = observer; + QueryObserver oldObserver = _instance.get(); + _instance.set(observer); return oldObserver; } public static boolean hasObserver() { - return _instance != NO_OBSERVER; + return _instance.get() != NO_OBSERVER; } /** Return the current QueryObserver instance */ public static QueryObserver getInstance() { - return _instance; + return _instance.get(); } /** * Only for test purposes. */ public static void reset() { - _instance = NO_OBSERVER; + _instance.set(NO_OBSERVER); } } diff --git a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/DataCommandFunction.java b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/DataCommandFunction.java index 18d0ca22a4f9..29730472707a 100644 --- a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/DataCommandFunction.java +++ b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/DataCommandFunction.java @@ -205,12 +205,13 @@ private DataCommandResult select(InternalCache cache, Object principal, String q Query query = qs.newQuery(queryString); DefaultQuery tracedQuery = (DefaultQuery) query; WrappedIndexTrackingQueryObserver queryObserver = null; + QueryObserver origQueryObserver = null; String queryVerboseMsg = null; long startTime = -1; if (tracedQuery.isTraced()) { startTime = NanoTimer.getTime(); queryObserver = new WrappedIndexTrackingQueryObserver(); - QueryObserverHolder.setInstance(queryObserver); + origQueryObserver = QueryObserverHolder.setInstance(queryObserver); } List list = new ArrayList<>(); @@ -237,6 +238,9 @@ private DataCommandResult select(InternalCache cache, Object principal, String q if (queryObserver != null) { QueryObserverHolder.reset(); } + if (tracedQuery.isTraced()) { + QueryObserverHolder.setInstance(origQueryObserver); + } } } From 141e30fce7e7360e5464190cab10b4cd271f84d7 Mon Sep 17 00:00:00 2001 From: Alberto Gomez Date: Mon, 20 Sep 2021 19:24:53 +0200 Subject: [PATCH 2/7] Have a global and thread local observer in QueryObserverHolder --- .../query/internal/QueryObserverHolder.java | 27 ++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryObserverHolder.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryObserverHolder.java index aeb881fa0203..25800e79f595 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryObserverHolder.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryObserverHolder.java @@ -14,6 +14,7 @@ */ package org.apache.geode.cache.query.internal; + import org.apache.geode.annotations.Immutable; import org.apache.geode.annotations.internal.MakeNotStatic; @@ -45,29 +46,46 @@ public class QueryObserverHolder { */ @Immutable private static final QueryObserver NO_OBSERVER = new QueryObserverAdapter(); + /** + * The threadlocal current observer which will be notified of all query events. + */ + private static final ThreadLocal _instance = new ThreadLocal<>(); + /** * The current observer which will be notified of all query events. */ @MakeNotStatic - private static ThreadLocal _instance = ThreadLocal.withInitial(() -> NO_OBSERVER); + private static volatile QueryObserver _globalInstance = NO_OBSERVER; /** * Set the given observer to be notified of query events. Returns the current observer. */ public static QueryObserver setInstance(QueryObserver observer) { Support.assertArg(observer != null, "setInstance expects a non-null argument!"); - QueryObserver oldObserver = _instance.get(); + QueryObserver oldObserver; + if (_instance.get() != null) { + oldObserver = _instance.get(); + } else { + oldObserver = _globalInstance; + } _instance.set(observer); + _globalInstance = observer; return oldObserver; } public static boolean hasObserver() { - return _instance.get() != NO_OBSERVER; + if (_instance.get() != null) { + return _instance.get() != NO_OBSERVER; + } + return _globalInstance != NO_OBSERVER; } /** Return the current QueryObserver instance */ public static QueryObserver getInstance() { - return _instance.get(); + if (_instance.get() != null) { + return _instance.get(); + } + return _globalInstance; } /** @@ -75,5 +93,6 @@ public static QueryObserver getInstance() { */ public static void reset() { _instance.set(NO_OBSERVER); + _globalInstance = NO_OBSERVER; } } From 37d218070d398edec4a29df0f48288ab9939e72c Mon Sep 17 00:00:00 2001 From: Alberto Gomez Date: Tue, 5 Oct 2021 11:31:55 +0200 Subject: [PATCH 3/7] Revert "Have a global and thread local observer in QueryObserverHolder" This reverts commit 141e30fce7e7360e5464190cab10b4cd271f84d7. --- .../query/internal/QueryObserverHolder.java | 27 +++---------------- 1 file changed, 4 insertions(+), 23 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryObserverHolder.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryObserverHolder.java index 25800e79f595..aeb881fa0203 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryObserverHolder.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryObserverHolder.java @@ -14,7 +14,6 @@ */ package org.apache.geode.cache.query.internal; - import org.apache.geode.annotations.Immutable; import org.apache.geode.annotations.internal.MakeNotStatic; @@ -46,46 +45,29 @@ public class QueryObserverHolder { */ @Immutable private static final QueryObserver NO_OBSERVER = new QueryObserverAdapter(); - /** - * The threadlocal current observer which will be notified of all query events. - */ - private static final ThreadLocal _instance = new ThreadLocal<>(); - /** * The current observer which will be notified of all query events. */ @MakeNotStatic - private static volatile QueryObserver _globalInstance = NO_OBSERVER; + private static ThreadLocal _instance = ThreadLocal.withInitial(() -> NO_OBSERVER); /** * Set the given observer to be notified of query events. Returns the current observer. */ public static QueryObserver setInstance(QueryObserver observer) { Support.assertArg(observer != null, "setInstance expects a non-null argument!"); - QueryObserver oldObserver; - if (_instance.get() != null) { - oldObserver = _instance.get(); - } else { - oldObserver = _globalInstance; - } + QueryObserver oldObserver = _instance.get(); _instance.set(observer); - _globalInstance = observer; return oldObserver; } public static boolean hasObserver() { - if (_instance.get() != null) { - return _instance.get() != NO_OBSERVER; - } - return _globalInstance != NO_OBSERVER; + return _instance.get() != NO_OBSERVER; } /** Return the current QueryObserver instance */ public static QueryObserver getInstance() { - if (_instance.get() != null) { - return _instance.get(); - } - return _globalInstance; + return _instance.get(); } /** @@ -93,6 +75,5 @@ public static QueryObserver getInstance() { */ public static void reset() { _instance.set(NO_OBSERVER); - _globalInstance = NO_OBSERVER; } } From 19dd86098b456cc81237bfe7de450ee898e7e09d Mon Sep 17 00:00:00 2001 From: Alberto Gomez Date: Tue, 5 Oct 2021 17:30:13 +0200 Subject: [PATCH 4/7] Fix some test cases --- .../QueryUsingFunctionContextDUnitTest.java | 177 +++++++++--------- 1 file changed, 85 insertions(+), 92 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryUsingFunctionContextDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryUsingFunctionContextDUnitTest.java index 076a9bb51c1e..821deaf4bb23 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryUsingFunctionContextDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryUsingFunctionContextDUnitTest.java @@ -19,12 +19,16 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Properties; import java.util.Set; +import org.apache.geode.cache.query.internal.QueryObserver; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -165,7 +169,6 @@ public final void preTearDownCacheTestCase() throws Exception { @Override public final void postTearDownCacheTestCase() throws Exception { - Invoke.invokeInEveryVM(() -> QueryObserverHolder.reset()); cache = null; Invoke.invokeInEveryVM(new SerializableRunnable() { @Override @@ -205,8 +208,7 @@ public void run2() throws CacheException { for (int i = 0; i < queriesForRR.length; i++) { try { - function = new TestQueryFunction("queryFunctionOnRR"); - + function = new TestQueryFunction("queryFunctionOnRR", QueryObserverForTestQueriesWithFilterKeysOnReplicatedRegion.class.getName()); rcollector = FunctionService.onRegion(CacheFactory.getAnyInstance().getRegion(repRegionName)) .setArguments(queriesForRR[i]).execute(function); @@ -240,7 +242,7 @@ public void run2() throws CacheException { Object[][] r = new Object[1][2]; TestServerQueryFunction func = new TestServerQueryFunction("LDS Server function-1"); - function = new TestQueryFunction("queryFunction-1"); + function = new TestQueryFunction("queryFunction-1", null); QueryUsingFunctionContextDUnitTest test = new QueryUsingFunctionContextDUnitTest(); ArrayList queryResults2 = test.runQueryOnClientUsingFunc(function, PartitionedRegionName1, filter, queries[i]); @@ -276,7 +278,7 @@ public void run2() throws CacheException { filter.add(0); String query = "select * from " + SEPARATOR + " " + repRegionName + " where ID>=0"; TestServerQueryFunction func = new TestServerQueryFunction("LDS Server function-1"); - function = new TestQueryFunction("queryFunction-1"); + function = new TestQueryFunction("queryFunction-1", null); QueryUsingFunctionContextDUnitTest test = new QueryUsingFunctionContextDUnitTest(); try { test.runQueryOnClientUsingFunc(function, repRegionName, filter, query); @@ -288,7 +290,7 @@ public void run2() throws CacheException { query = "select * from " + SEPARATOR + " " + PartitionedRegionName1 + " where ID>=0"; func = new TestServerQueryFunction("LDS Server function-1"); - function = new TestQueryFunction("queryFunction-1"); + function = new TestQueryFunction("queryFunction-1", null); test = new QueryUsingFunctionContextDUnitTest(); try { test.runQueryOnClientUsingFunc(function, PartitionedRegionName1, filter, query); @@ -312,7 +314,7 @@ public void run2() throws CacheException { Set filter = getFilter(0, 1); TestServerQueryFunction func = new TestServerQueryFunction("LDS Server function-1"); - function = new TestQueryFunction("queryFunction-2"); + function = new TestQueryFunction("queryFunction-2", null); for (int i = 0; i < queries.length; i++) { Object[][] r = new Object[1][2]; @@ -342,33 +344,6 @@ public void run2() throws CacheException { @Test public void testQueriesWithFilterKeysOnPRLocalAndRemoteWithBucketDestroy() { - // Set Query Observer in cache on server1 - server1.invoke(new CacheSerializableRunnable("Set QueryObserver in cache on server1") { - @Override - public void run2() throws CacheException { - - class MyQueryObserver extends IndexTrackingQueryObserver { - - @Override - public void startQuery(Query query) { - // Destroy only for first query. - if (query.getQueryString().contains("ID>=0")) { - Region pr = CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1); - Region KeyRegion = null; - for (int i = 3; i < 7; i++) { - KeyRegion = ((PartitionedRegion) pr).getBucketRegion(i/* key */); - if (KeyRegion != null) { - KeyRegion.destroyRegion(); - } - } - } - } - }; - - QueryObserverHolder.setInstance(new MyQueryObserver()); - } - }); - client.invoke(new CacheSerializableRunnable("Test query on client and server") { @Override @@ -376,7 +351,7 @@ public void run2() throws CacheException { Set filter = getFilter(0, 2); TestServerQueryFunction func = new TestServerQueryFunction("LDS Server function-2"); - function = new TestQueryFunction("queryFunction"); + function = new TestQueryFunction("queryFunction", null); for (int i = 0; i < queries.length; i++) { @@ -408,30 +383,6 @@ public void testQueriesWithFilterKeysOnPRWithBucketDestroy() { Object[][] r = new Object[queries.length][2]; Set filter = new HashSet(); - // Close cache on server1 - server1.invoke(new CacheSerializableRunnable("Set QueryObserver in cache on server1") { - @Override - public void run2() throws CacheException { - - class MyQueryObserver extends IndexTrackingQueryObserver { - - @Override - public void startQuery(Query query) { - Region pr = CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1); - Region KeyRegion = null; - for (int i = 0; i < 7; i++) { - KeyRegion = ((PartitionedRegion) pr).getBucketRegion(i/* key */); - if (KeyRegion != null) { - KeyRegion.destroyRegion(); - } - } - } - }; - - QueryObserverHolder.setInstance(new MyQueryObserver()); - } - }); - client.invoke(new CacheSerializableRunnable("Run function on PR") { @Override public void run2() throws CacheException { @@ -442,7 +393,7 @@ public void run2() throws CacheException { for (int i = 0; i < queries.length; i++) { try { - function = new TestQueryFunction("queryFunctionBucketDestroy"); + function = new TestQueryFunction("queryFunctionBucketDestroy", QueryObserverForTestQueriesWithFilterKeysOnPRWithBucketDestroy.class.getName()); rcollector = FunctionService .onRegion(CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1)) @@ -479,30 +430,6 @@ public void testQueriesWithFilterKeysOnPRWithRebalancing() { IgnoredException.addIgnoredException("FunctionException"); IgnoredException.addIgnoredException("IOException"); - // Close cache on server1 - server1.invoke(new CacheSerializableRunnable("Set QueryObserver in cache on server1") { - @Override - public void run2() throws CacheException { - - class MyQueryObserver extends IndexTrackingQueryObserver { - - @Override - public void startQuery(Query query) { - Region pr = CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1); - Region KeyRegion = null; - for (int i = 6; i < 9; i++) { - KeyRegion = ((PartitionedRegion) pr).getBucketRegion(i/* key */); - if (KeyRegion != null) { - KeyRegion.destroyRegion(); - } - } - } - }; - - QueryObserverHolder.setInstance(new MyQueryObserver()); - } - }); - client.invoke(new CacheSerializableRunnable("Run function on PR") { @Override public void run2() throws CacheException { @@ -513,7 +440,7 @@ public void run2() throws CacheException { for (int i = 0; i < queries.length; i++) { try { - function = new TestQueryFunction("queryFunction"); + function = new TestQueryFunction("queryFunction", QueryObserverForTestQueriesWithFilterKeysOnPRWithRebalancing.class.getName()); rcollector = FunctionService .onRegion(CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1)) @@ -561,7 +488,7 @@ public void run2() throws CacheException { filter.add(0); for (int i = 0; i < nonColocatedQueries.length; i++) { - function = new TestQueryFunction("queryFunction-1"); + function = new TestQueryFunction("queryFunction-1", null); QueryUsingFunctionContextDUnitTest test = new QueryUsingFunctionContextDUnitTest(); try { ArrayList queryResults2 = test.runQueryOnClientUsingFunc(function, @@ -586,7 +513,7 @@ public void testJoinQueryPRWithMultipleIndexes() { @Override public void run2() throws CacheException { Set filter = getFilter(0, 1); - function = new TestQueryFunction("queryFunction-2"); + function = new TestQueryFunction("queryFunction-2", null); Object[][] r = new Object[2][2]; QueryUsingFunctionContextDUnitTest test = new QueryUsingFunctionContextDUnitTest(); int j = 0; @@ -616,8 +543,51 @@ public void run2() throws CacheException { }); } - // Helper classes and function + + public static class QueryObserverForTestQueriesWithFilterKeysOnReplicatedRegion extends IndexTrackingQueryObserver { + @Override + public void startQuery(Query query) { + Region pr = CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1); + Region KeyRegion = null; + for (int i = 0; i < 7; i++) { + KeyRegion = ((PartitionedRegion) pr).getBucketRegion(i/* key */); + if (KeyRegion != null) { + KeyRegion.destroyRegion(); + } + } + } + } + + public static class QueryObserverForTestQueriesWithFilterKeysOnPRWithBucketDestroy extends IndexTrackingQueryObserver { + @Override + public void startQuery(Query query) { + Region pr = CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1); + Region KeyRegion = null; + for (int i = 0; i < 7; i++) { + KeyRegion = ((PartitionedRegion) pr).getBucketRegion(i/* key */); + if (KeyRegion != null) { + KeyRegion.destroyRegion(); + } + } + } + } + + public static class QueryObserverForTestQueriesWithFilterKeysOnPRWithRebalancing extends IndexTrackingQueryObserver { + @Override + public void startQuery(Query query) { + Region pr = CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1); + Region KeyRegion = null; + for (int i = 6; i < 9; i++) { + KeyRegion = ((PartitionedRegion) pr).getBucketRegion(i/* key */); + if (KeyRegion != null) { + KeyRegion.destroyRegion(); + } + } + } + } + + public static class TestQueryFunction extends FunctionAdapter { @Override @@ -632,24 +602,46 @@ public boolean isHA() { private final String id; + private final String observerClassName; - public TestQueryFunction(String id) { + + public TestQueryFunction(String id, String observerClassName) { super(); this.id = id; + this.observerClassName = observerClassName; + } + + private QueryObserver getObserverFromClassName(String observerClassName) { + try { + Class clazz = Class.forName(observerClassName); + Constructor constructor = clazz.getConstructor(); + return (QueryObserver) constructor.newInstance(); + } catch (Exception e) { + return null; + } } @Override public void execute(FunctionContext context) { Cache cache = CacheFactory.getAnyInstance(); QueryService queryService = cache.getQueryService(); - ArrayList allQueryResults = new ArrayList(); String qstr = (String) context.getArguments(); + QueryObserver observer = null; + QueryObserver oldObserver = null; try { + observer = getObserverFromClassName(observerClassName); + if (observer != null) { + oldObserver = QueryObserverHolder.setInstance(observer); + } Query query = queryService.newQuery(qstr); context.getResultSender().lastResult( - (ArrayList) ((SelectResults) query.execute((RegionFunctionContext) context)).asList()); + ((SelectResults) query.execute((RegionFunctionContext) context)).asList()); } catch (Exception e) { throw new FunctionException(e); + } finally { + if (observer != null) { + QueryObserverHolder.setInstance(oldObserver); + } } } @@ -657,6 +649,7 @@ public void execute(FunctionContext context) { public String getId() { return this.id; } + } public static class TestServerQueryFunction extends FunctionAdapter { @@ -741,7 +734,7 @@ public void fillValuesInRegions() { } private void registerFunctionOnServers() { - function = new TestQueryFunction("queryFunction"); + function = new TestQueryFunction("queryFunction", null); server1.invoke(PRClientServerTestBase.class, "registerFunction", new Object[] {function}); server2.invoke(PRClientServerTestBase.class, "registerFunction", new Object[] {function}); From 87c0465fb3995b1555d5fc59344c4287d08b2ca6 Mon Sep 17 00:00:00 2001 From: Alberto Gomez Date: Tue, 5 Oct 2021 17:30:38 +0200 Subject: [PATCH 5/7] Revert "Fix some test cases" This reverts commit 19dd86098b456cc81237bfe7de450ee898e7e09d. --- .../QueryUsingFunctionContextDUnitTest.java | 177 +++++++++--------- 1 file changed, 92 insertions(+), 85 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryUsingFunctionContextDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryUsingFunctionContextDUnitTest.java index 821deaf4bb23..076a9bb51c1e 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryUsingFunctionContextDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryUsingFunctionContextDUnitTest.java @@ -19,16 +19,12 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Properties; import java.util.Set; -import org.apache.geode.cache.query.internal.QueryObserver; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -169,6 +165,7 @@ public final void preTearDownCacheTestCase() throws Exception { @Override public final void postTearDownCacheTestCase() throws Exception { + Invoke.invokeInEveryVM(() -> QueryObserverHolder.reset()); cache = null; Invoke.invokeInEveryVM(new SerializableRunnable() { @Override @@ -208,7 +205,8 @@ public void run2() throws CacheException { for (int i = 0; i < queriesForRR.length; i++) { try { - function = new TestQueryFunction("queryFunctionOnRR", QueryObserverForTestQueriesWithFilterKeysOnReplicatedRegion.class.getName()); + function = new TestQueryFunction("queryFunctionOnRR"); + rcollector = FunctionService.onRegion(CacheFactory.getAnyInstance().getRegion(repRegionName)) .setArguments(queriesForRR[i]).execute(function); @@ -242,7 +240,7 @@ public void run2() throws CacheException { Object[][] r = new Object[1][2]; TestServerQueryFunction func = new TestServerQueryFunction("LDS Server function-1"); - function = new TestQueryFunction("queryFunction-1", null); + function = new TestQueryFunction("queryFunction-1"); QueryUsingFunctionContextDUnitTest test = new QueryUsingFunctionContextDUnitTest(); ArrayList queryResults2 = test.runQueryOnClientUsingFunc(function, PartitionedRegionName1, filter, queries[i]); @@ -278,7 +276,7 @@ public void run2() throws CacheException { filter.add(0); String query = "select * from " + SEPARATOR + " " + repRegionName + " where ID>=0"; TestServerQueryFunction func = new TestServerQueryFunction("LDS Server function-1"); - function = new TestQueryFunction("queryFunction-1", null); + function = new TestQueryFunction("queryFunction-1"); QueryUsingFunctionContextDUnitTest test = new QueryUsingFunctionContextDUnitTest(); try { test.runQueryOnClientUsingFunc(function, repRegionName, filter, query); @@ -290,7 +288,7 @@ public void run2() throws CacheException { query = "select * from " + SEPARATOR + " " + PartitionedRegionName1 + " where ID>=0"; func = new TestServerQueryFunction("LDS Server function-1"); - function = new TestQueryFunction("queryFunction-1", null); + function = new TestQueryFunction("queryFunction-1"); test = new QueryUsingFunctionContextDUnitTest(); try { test.runQueryOnClientUsingFunc(function, PartitionedRegionName1, filter, query); @@ -314,7 +312,7 @@ public void run2() throws CacheException { Set filter = getFilter(0, 1); TestServerQueryFunction func = new TestServerQueryFunction("LDS Server function-1"); - function = new TestQueryFunction("queryFunction-2", null); + function = new TestQueryFunction("queryFunction-2"); for (int i = 0; i < queries.length; i++) { Object[][] r = new Object[1][2]; @@ -344,6 +342,33 @@ public void run2() throws CacheException { @Test public void testQueriesWithFilterKeysOnPRLocalAndRemoteWithBucketDestroy() { + // Set Query Observer in cache on server1 + server1.invoke(new CacheSerializableRunnable("Set QueryObserver in cache on server1") { + @Override + public void run2() throws CacheException { + + class MyQueryObserver extends IndexTrackingQueryObserver { + + @Override + public void startQuery(Query query) { + // Destroy only for first query. + if (query.getQueryString().contains("ID>=0")) { + Region pr = CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1); + Region KeyRegion = null; + for (int i = 3; i < 7; i++) { + KeyRegion = ((PartitionedRegion) pr).getBucketRegion(i/* key */); + if (KeyRegion != null) { + KeyRegion.destroyRegion(); + } + } + } + } + }; + + QueryObserverHolder.setInstance(new MyQueryObserver()); + } + }); + client.invoke(new CacheSerializableRunnable("Test query on client and server") { @Override @@ -351,7 +376,7 @@ public void run2() throws CacheException { Set filter = getFilter(0, 2); TestServerQueryFunction func = new TestServerQueryFunction("LDS Server function-2"); - function = new TestQueryFunction("queryFunction", null); + function = new TestQueryFunction("queryFunction"); for (int i = 0; i < queries.length; i++) { @@ -383,6 +408,30 @@ public void testQueriesWithFilterKeysOnPRWithBucketDestroy() { Object[][] r = new Object[queries.length][2]; Set filter = new HashSet(); + // Close cache on server1 + server1.invoke(new CacheSerializableRunnable("Set QueryObserver in cache on server1") { + @Override + public void run2() throws CacheException { + + class MyQueryObserver extends IndexTrackingQueryObserver { + + @Override + public void startQuery(Query query) { + Region pr = CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1); + Region KeyRegion = null; + for (int i = 0; i < 7; i++) { + KeyRegion = ((PartitionedRegion) pr).getBucketRegion(i/* key */); + if (KeyRegion != null) { + KeyRegion.destroyRegion(); + } + } + } + }; + + QueryObserverHolder.setInstance(new MyQueryObserver()); + } + }); + client.invoke(new CacheSerializableRunnable("Run function on PR") { @Override public void run2() throws CacheException { @@ -393,7 +442,7 @@ public void run2() throws CacheException { for (int i = 0; i < queries.length; i++) { try { - function = new TestQueryFunction("queryFunctionBucketDestroy", QueryObserverForTestQueriesWithFilterKeysOnPRWithBucketDestroy.class.getName()); + function = new TestQueryFunction("queryFunctionBucketDestroy"); rcollector = FunctionService .onRegion(CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1)) @@ -430,6 +479,30 @@ public void testQueriesWithFilterKeysOnPRWithRebalancing() { IgnoredException.addIgnoredException("FunctionException"); IgnoredException.addIgnoredException("IOException"); + // Close cache on server1 + server1.invoke(new CacheSerializableRunnable("Set QueryObserver in cache on server1") { + @Override + public void run2() throws CacheException { + + class MyQueryObserver extends IndexTrackingQueryObserver { + + @Override + public void startQuery(Query query) { + Region pr = CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1); + Region KeyRegion = null; + for (int i = 6; i < 9; i++) { + KeyRegion = ((PartitionedRegion) pr).getBucketRegion(i/* key */); + if (KeyRegion != null) { + KeyRegion.destroyRegion(); + } + } + } + }; + + QueryObserverHolder.setInstance(new MyQueryObserver()); + } + }); + client.invoke(new CacheSerializableRunnable("Run function on PR") { @Override public void run2() throws CacheException { @@ -440,7 +513,7 @@ public void run2() throws CacheException { for (int i = 0; i < queries.length; i++) { try { - function = new TestQueryFunction("queryFunction", QueryObserverForTestQueriesWithFilterKeysOnPRWithRebalancing.class.getName()); + function = new TestQueryFunction("queryFunction"); rcollector = FunctionService .onRegion(CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1)) @@ -488,7 +561,7 @@ public void run2() throws CacheException { filter.add(0); for (int i = 0; i < nonColocatedQueries.length; i++) { - function = new TestQueryFunction("queryFunction-1", null); + function = new TestQueryFunction("queryFunction-1"); QueryUsingFunctionContextDUnitTest test = new QueryUsingFunctionContextDUnitTest(); try { ArrayList queryResults2 = test.runQueryOnClientUsingFunc(function, @@ -513,7 +586,7 @@ public void testJoinQueryPRWithMultipleIndexes() { @Override public void run2() throws CacheException { Set filter = getFilter(0, 1); - function = new TestQueryFunction("queryFunction-2", null); + function = new TestQueryFunction("queryFunction-2"); Object[][] r = new Object[2][2]; QueryUsingFunctionContextDUnitTest test = new QueryUsingFunctionContextDUnitTest(); int j = 0; @@ -543,51 +616,8 @@ public void run2() throws CacheException { }); } - // Helper classes and function - - public static class QueryObserverForTestQueriesWithFilterKeysOnReplicatedRegion extends IndexTrackingQueryObserver { - @Override - public void startQuery(Query query) { - Region pr = CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1); - Region KeyRegion = null; - for (int i = 0; i < 7; i++) { - KeyRegion = ((PartitionedRegion) pr).getBucketRegion(i/* key */); - if (KeyRegion != null) { - KeyRegion.destroyRegion(); - } - } - } - } - - public static class QueryObserverForTestQueriesWithFilterKeysOnPRWithBucketDestroy extends IndexTrackingQueryObserver { - @Override - public void startQuery(Query query) { - Region pr = CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1); - Region KeyRegion = null; - for (int i = 0; i < 7; i++) { - KeyRegion = ((PartitionedRegion) pr).getBucketRegion(i/* key */); - if (KeyRegion != null) { - KeyRegion.destroyRegion(); - } - } - } - } - - public static class QueryObserverForTestQueriesWithFilterKeysOnPRWithRebalancing extends IndexTrackingQueryObserver { - @Override - public void startQuery(Query query) { - Region pr = CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1); - Region KeyRegion = null; - for (int i = 6; i < 9; i++) { - KeyRegion = ((PartitionedRegion) pr).getBucketRegion(i/* key */); - if (KeyRegion != null) { - KeyRegion.destroyRegion(); - } - } - } - } - + // Helper classes and function public static class TestQueryFunction extends FunctionAdapter { @Override @@ -602,46 +632,24 @@ public boolean isHA() { private final String id; - private final String observerClassName; - - public TestQueryFunction(String id, String observerClassName) { + public TestQueryFunction(String id) { super(); this.id = id; - this.observerClassName = observerClassName; - } - - private QueryObserver getObserverFromClassName(String observerClassName) { - try { - Class clazz = Class.forName(observerClassName); - Constructor constructor = clazz.getConstructor(); - return (QueryObserver) constructor.newInstance(); - } catch (Exception e) { - return null; - } } @Override public void execute(FunctionContext context) { Cache cache = CacheFactory.getAnyInstance(); QueryService queryService = cache.getQueryService(); + ArrayList allQueryResults = new ArrayList(); String qstr = (String) context.getArguments(); - QueryObserver observer = null; - QueryObserver oldObserver = null; try { - observer = getObserverFromClassName(observerClassName); - if (observer != null) { - oldObserver = QueryObserverHolder.setInstance(observer); - } Query query = queryService.newQuery(qstr); context.getResultSender().lastResult( - ((SelectResults) query.execute((RegionFunctionContext) context)).asList()); + (ArrayList) ((SelectResults) query.execute((RegionFunctionContext) context)).asList()); } catch (Exception e) { throw new FunctionException(e); - } finally { - if (observer != null) { - QueryObserverHolder.setInstance(oldObserver); - } } } @@ -649,7 +657,6 @@ public void execute(FunctionContext context) { public String getId() { return this.id; } - } public static class TestServerQueryFunction extends FunctionAdapter { @@ -734,7 +741,7 @@ public void fillValuesInRegions() { } private void registerFunctionOnServers() { - function = new TestQueryFunction("queryFunction", null); + function = new TestQueryFunction("queryFunction"); server1.invoke(PRClientServerTestBase.class, "registerFunction", new Object[] {function}); server2.invoke(PRClientServerTestBase.class, "registerFunction", new Object[] {function}); From 58c595b950eb51a6940623ee530be55752d97211 Mon Sep 17 00:00:00 2001 From: Alberto Gomez Date: Tue, 5 Oct 2021 17:30:51 +0200 Subject: [PATCH 6/7] Revert "Revert "Have a global and thread local observer in QueryObserverHolder"" This reverts commit 37d218070d398edec4a29df0f48288ab9939e72c. --- .../query/internal/QueryObserverHolder.java | 27 ++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryObserverHolder.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryObserverHolder.java index aeb881fa0203..25800e79f595 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryObserverHolder.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryObserverHolder.java @@ -14,6 +14,7 @@ */ package org.apache.geode.cache.query.internal; + import org.apache.geode.annotations.Immutable; import org.apache.geode.annotations.internal.MakeNotStatic; @@ -45,29 +46,46 @@ public class QueryObserverHolder { */ @Immutable private static final QueryObserver NO_OBSERVER = new QueryObserverAdapter(); + /** + * The threadlocal current observer which will be notified of all query events. + */ + private static final ThreadLocal _instance = new ThreadLocal<>(); + /** * The current observer which will be notified of all query events. */ @MakeNotStatic - private static ThreadLocal _instance = ThreadLocal.withInitial(() -> NO_OBSERVER); + private static volatile QueryObserver _globalInstance = NO_OBSERVER; /** * Set the given observer to be notified of query events. Returns the current observer. */ public static QueryObserver setInstance(QueryObserver observer) { Support.assertArg(observer != null, "setInstance expects a non-null argument!"); - QueryObserver oldObserver = _instance.get(); + QueryObserver oldObserver; + if (_instance.get() != null) { + oldObserver = _instance.get(); + } else { + oldObserver = _globalInstance; + } _instance.set(observer); + _globalInstance = observer; return oldObserver; } public static boolean hasObserver() { - return _instance.get() != NO_OBSERVER; + if (_instance.get() != null) { + return _instance.get() != NO_OBSERVER; + } + return _globalInstance != NO_OBSERVER; } /** Return the current QueryObserver instance */ public static QueryObserver getInstance() { - return _instance.get(); + if (_instance.get() != null) { + return _instance.get(); + } + return _globalInstance; } /** @@ -75,5 +93,6 @@ public static QueryObserver getInstance() { */ public static void reset() { _instance.set(NO_OBSERVER); + _globalInstance = NO_OBSERVER; } } From b6529dfdb3e66d971843b6913fbeab7c91875fa4 Mon Sep 17 00:00:00 2001 From: Alberto Gomez Date: Tue, 5 Oct 2021 17:44:56 +0200 Subject: [PATCH 7/7] QueryObserverHolder getInstance sets thread local instance if not set. --- .../cache/query/internal/QueryObserverHolder.java | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryObserverHolder.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryObserverHolder.java index 25800e79f595..18209390306a 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryObserverHolder.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryObserverHolder.java @@ -62,12 +62,7 @@ public class QueryObserverHolder { */ public static QueryObserver setInstance(QueryObserver observer) { Support.assertArg(observer != null, "setInstance expects a non-null argument!"); - QueryObserver oldObserver; - if (_instance.get() != null) { - oldObserver = _instance.get(); - } else { - oldObserver = _globalInstance; - } + QueryObserver oldObserver = _globalInstance; _instance.set(observer); _globalInstance = observer; return oldObserver; @@ -82,10 +77,10 @@ public static boolean hasObserver() { /** Return the current QueryObserver instance */ public static QueryObserver getInstance() { - if (_instance.get() != null) { - return _instance.get(); + if (_instance.get() == null) { + _instance.set(_globalInstance); } - return _globalInstance; + return _instance.get(); } /**