diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 1f4cd43a100..0924ad8eacb 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -1153,6 +1153,8 @@ Improvements Optimizations --------------------- +* SOLR-16497 Allow for finer grained locking of access to SolrCores to reduce lock contention + (Dennis Berger, Torsten Bøgh Köster, Marco Petris) * SOLR-16515: Remove synchronized access to cachedOrdMaps in SlowCompositeReaderWrapper (Dennis Berger, Torsten Bøgh Köster, Marco Petris) diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index 0c88c2eaa95..6f7f9a0d05b 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -1268,8 +1268,11 @@ public void shutdown() { // First wake up the closer thread, it'll terminate almost immediately since it checks // isShutDown. - synchronized (solrCores.getModifyLock()) { - solrCores.getModifyLock().notifyAll(); // wake up anyone waiting + solrCores.getWriteLock().lock(); + try { + solrCores.getWriteLockCondition().signalAll(); // wake up anyone waiting + } finally { + solrCores.getWriteLock().unlock(); } if (backgroundCloser != null) { // Doesn't seem right, but tests get in here without initializing the core. @@ -1277,8 +1280,13 @@ public void shutdown() { while (true) { backgroundCloser.join(15000); if (backgroundCloser.isAlive()) { - synchronized (solrCores.getModifyLock()) { - solrCores.getModifyLock().notifyAll(); // there is a race we have to protect against + solrCores.getWriteLock().lock(); + try { + solrCores + .getWriteLockCondition() + .signalAll(); // there is a race we have to protect against + } finally { + solrCores.getWriteLock().unlock(); } } else { break; @@ -1314,8 +1322,11 @@ public void shutdown() { // It's still possible that one of the pending dynamic load operation is waiting, so wake it // up if so. Since all the pending operations queues have been drained, there should be // nothing to do. - synchronized (solrCores.getModifyLock()) { - solrCores.getModifyLock().notifyAll(); // wake up the thread + solrCores.getWriteLock().lock(); + try { + solrCores.getWriteLockCondition().signalAll(); // wake up the thread + } finally { + solrCores.getWriteLock().unlock(); } customThreadPool.submit( @@ -2615,13 +2626,16 @@ class CloserThread extends Thread { @Override public void run() { while (!container.isShutDown()) { - synchronized (solrCores.getModifyLock()) { // need this so we can wait and be awoken. + solrCores.getWriteLock().lock(); + try { // need this so we can wait and be awoken. try { - solrCores.getModifyLock().wait(); + solrCores.getWriteLockCondition().await(); } catch (InterruptedException e) { // Well, if we've been told to stop, we will. Otherwise, continue on and check to see if // there are any cores to close. } + } finally { + solrCores.getWriteLock().unlock(); } SolrCore core; diff --git a/solr/core/src/java/org/apache/solr/core/SolrCores.java b/solr/core/src/java/org/apache/solr/core/SolrCores.java index 4ea3974f0f8..532b785fd0a 100644 --- a/solr/core/src/java/org/apache/solr/core/SolrCores.java +++ b/solr/core/src/java/org/apache/solr/core/SolrCores.java @@ -29,6 +29,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.solr.common.SolrException; import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.SolrNamedThreadFactory; @@ -42,7 +44,8 @@ public class SolrCores { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); // for locking around manipulating any of the core maps. - protected final Object modifyLock = new Object(); + private static final ReentrantReadWriteLock READ_WRITE_LOCK = new ReentrantReadWriteLock(); + private static final Condition WRITE_LOCK_CONDITION = READ_WRITE_LOCK.writeLock().newCondition(); private final Map cores = new LinkedHashMap<>(); // For "permanent" cores @@ -64,27 +67,28 @@ public class SolrCores { private final List pendingCloses = new ArrayList<>(); public static SolrCores newSolrCores(CoreContainer coreContainer) { - final int transientCacheSize = coreContainer.getConfig().getTransientCacheSize(); - if (transientCacheSize > 0) { - return new TransientSolrCores(coreContainer, transientCacheSize); - } else { - return new SolrCores(coreContainer); - } + return new SolrCores(coreContainer); } SolrCores(CoreContainer container) { this.container = container; } - public void addCoreDescriptor(CoreDescriptor p) { - synchronized (modifyLock) { + protected void addCoreDescriptor(CoreDescriptor p) { + READ_WRITE_LOCK.writeLock().lock(); + try { residentDescriptors.put(p.getName(), p); + } finally { + READ_WRITE_LOCK.writeLock().unlock(); } } - public void removeCoreDescriptor(CoreDescriptor p) { - synchronized (modifyLock) { + protected void removeCoreDescriptor(CoreDescriptor p) { + READ_WRITE_LOCK.writeLock().lock(); + try { residentDescriptors.remove(p.getName()); + } finally { + READ_WRITE_LOCK.writeLock().unlock(); } } @@ -99,7 +103,8 @@ protected void close() { while (true) { Collection coreList = new ArrayList<>(); - synchronized (modifyLock) { + READ_WRITE_LOCK.writeLock().lock(); + try { // remove all loaded cores; add to our working list. for (String name : getLoadedCoreNames()) { final var core = remove(name); @@ -110,6 +115,8 @@ protected void close() { coreList.addAll(pendingCloses); pendingCloses.clear(); + } finally { + READ_WRITE_LOCK.writeLock().unlock(); } if (coreList.isEmpty()) { @@ -144,10 +151,13 @@ protected void close() { // Returns the old core if there was a core of the same name. // WARNING! This should be the _only_ place you put anything into the list of transient cores! - public SolrCore putCore(CoreDescriptor cd, SolrCore core) { - synchronized (modifyLock) { + protected SolrCore putCore(CoreDescriptor cd, SolrCore core) { + READ_WRITE_LOCK.writeLock().lock(); + try { addCoreDescriptor(cd); // cd must always be registered if we register a core return cores.put(cd.getName(), core); + } finally { + READ_WRITE_LOCK.writeLock().unlock(); } } @@ -162,8 +172,11 @@ public SolrCore putCore(CoreDescriptor cd, SolrCore core) { */ @Deprecated public List getCores() { - synchronized (modifyLock) { + READ_WRITE_LOCK.readLock().lock(); + try { return new ArrayList<>(cores.values()); + } finally { + READ_WRITE_LOCK.readLock().unlock(); } } @@ -179,8 +192,11 @@ public List getCores() { * can be sorted). */ public List getLoadedCoreNames() { - synchronized (modifyLock) { + READ_WRITE_LOCK.readLock().lock(); + try { return new ArrayList<>(cores.keySet()); + } finally { + READ_WRITE_LOCK.readLock().unlock(); } } @@ -193,8 +209,11 @@ public List getLoadedCoreNames() { * can be sorted). */ public List getAllCoreNames() { - synchronized (modifyLock) { + READ_WRITE_LOCK.readLock().lock(); + try { return new ArrayList<>(residentDescriptors.keySet()); + } finally { + READ_WRITE_LOCK.readLock().unlock(); } } @@ -203,8 +222,11 @@ public List getAllCoreNames() { * {@link #getCores()}.size(). */ public int getNumLoadedPermanentCores() { - synchronized (modifyLock) { + READ_WRITE_LOCK.readLock().lock(); + try { return cores.size(); + } finally { + READ_WRITE_LOCK.readLock().unlock(); } } @@ -216,8 +238,11 @@ public int getNumLoadedTransientCores() { /** Gets the number of unloaded cores, including permanent and transient cores. */ public int getNumUnloadedCores() { - synchronized (modifyLock) { + READ_WRITE_LOCK.readLock().lock(); + try { return residentDescriptors.size() - cores.size(); + } finally { + READ_WRITE_LOCK.readLock().unlock(); } } @@ -226,13 +251,19 @@ public int getNumUnloadedCores() { * cores. Faster equivalent for {@link #getAllCoreNames()}.size(). */ public int getNumAllCores() { - synchronized (modifyLock) { + READ_WRITE_LOCK.readLock().lock(); + try { return residentDescriptors.size(); + } finally { + READ_WRITE_LOCK.readLock().unlock(); } } public void swap(String n0, String n1) { - synchronized (modifyLock) { + + READ_WRITE_LOCK.writeLock().lock(); + try { + SolrCore c0 = cores.get(n0); SolrCore c1 = cores.get(n1); // TODO DWS: honestly this doesn't appear to work properly unless the core is loaded @@ -263,12 +294,17 @@ public void swap(String n0, String n1) { .swapRegistries( c0.getCoreMetricManager().getRegistryName(), c1.getCoreMetricManager().getRegistryName()); + } finally { + READ_WRITE_LOCK.writeLock().unlock(); } } public SolrCore remove(String name) { - synchronized (modifyLock) { + READ_WRITE_LOCK.writeLock().lock(); + try { return cores.remove(name); + } finally { + READ_WRITE_LOCK.writeLock().unlock(); } } @@ -278,7 +314,8 @@ public SolrCore getCoreFromAnyList(String name, boolean incRefCount) { /* If you don't increment the reference count, someone could close the core before you use it. */ public SolrCore getCoreFromAnyList(String name, boolean incRefCount, UUID coreId) { - synchronized (modifyLock) { + READ_WRITE_LOCK.readLock().lock(); + try { SolrCore core = getLoadedCoreWithoutIncrement(name); if (core != null && coreId != null && !coreId.equals(core.uniqueId)) return null; @@ -288,13 +325,18 @@ public SolrCore getCoreFromAnyList(String name, boolean incRefCount, UUID coreId } return core; + } finally { + READ_WRITE_LOCK.readLock().unlock(); } } /** (internal) Return a core that is already loaded, if it is. NOT incremented! */ protected SolrCore getLoadedCoreWithoutIncrement(String name) { - synchronized (modifyLock) { + READ_WRITE_LOCK.readLock().lock(); + try { return cores.get(name); + } finally { + READ_WRITE_LOCK.readLock().unlock(); } } @@ -302,7 +344,8 @@ protected SolrCore getLoadedCoreWithoutIncrement(String name) { // not, it might have to close the core. However, there's a race condition. If the core happens to // be in the pending "to close" queue, we should NOT close it in unload core. public boolean isLoadedNotPendingClose(String name) { - synchronized (modifyLock) { + READ_WRITE_LOCK.readLock().lock(); + try { if (!isLoaded(name)) { return false; } @@ -314,19 +357,27 @@ public boolean isLoadedNotPendingClose(String name) { } return true; + } finally { + READ_WRITE_LOCK.readLock().unlock(); } } public boolean isLoaded(String name) { - synchronized (modifyLock) { + READ_WRITE_LOCK.readLock().lock(); + try { return cores.containsKey(name); + } finally { + READ_WRITE_LOCK.readLock().unlock(); } } /** The core is currently loading, unloading, or reloading. */ protected boolean hasPendingCoreOps(String name) { - synchronized (modifyLock) { + READ_WRITE_LOCK.readLock().lock(); + try { return pendingCoreOps.contains(name); + } finally { + READ_WRITE_LOCK.readLock().unlock(); } } @@ -334,7 +385,9 @@ protected boolean hasPendingCoreOps(String name) { public SolrCore waitAddPendingCoreOps(String name) { // Keep multiple threads from operating on a core at one time. - synchronized (modifyLock) { + READ_WRITE_LOCK.writeLock().lock(); + try { + boolean pending; do { // Are we currently doing anything to this core? Loading, unloading, reloading? pending = pendingCoreOps.contains(name); // wait for the core to be done being operated upon @@ -350,7 +403,7 @@ public SolrCore waitAddPendingCoreOps(String name) { if (pending) { try { - modifyLock.wait(); + WRITE_LOCK_CONDITION.await(); } catch (InterruptedException e) { return null; // Seems best not to do anything at all if the thread is interrupted } @@ -364,6 +417,8 @@ public SolrCore waitAddPendingCoreOps(String name) { // we might have been _unloading_ the core, so return the core if it was loaded. return getCoreFromAnyList(name, false); } + } finally { + READ_WRITE_LOCK.writeLock().unlock(); } return null; } @@ -371,16 +426,23 @@ public SolrCore waitAddPendingCoreOps(String name) { // We should always be removing the first thing in the list with our name! The idea here is to NOT // do anything on any core while some other operation is working on that core. public void removeFromPendingOps(String name) { - synchronized (modifyLock) { + READ_WRITE_LOCK.writeLock().lock(); + try { if (!pendingCoreOps.remove(name)) { log.warn("Tried to remove core {} from pendingCoreOps and it wasn't there. ", name); } - modifyLock.notifyAll(); + WRITE_LOCK_CONDITION.signalAll(); + } finally { + READ_WRITE_LOCK.writeLock().unlock(); } } - public Object getModifyLock() { - return modifyLock; + public ReentrantReadWriteLock.WriteLock getWriteLock() { + return READ_WRITE_LOCK.writeLock(); + } + + public Condition getWriteLockCondition() { + return WRITE_LOCK_CONDITION; } // Be a little careful. We don't want to either open or close a core unless it's _not_ being @@ -388,7 +450,9 @@ public Object getModifyLock() { // closes until we find something NOT in the list of threads currently being loaded or reloaded. // The "usual" case will probably return the very first one anyway. public SolrCore getCoreToClose() { - synchronized (modifyLock) { + READ_WRITE_LOCK.writeLock().lock(); + try { + for (SolrCore core : pendingCloses) { if (!pendingCoreOps.contains(core.getName())) { pendingCoreOps.add(core.getName()); @@ -396,6 +460,8 @@ public SolrCore getCoreToClose() { return core; } } + } finally { + READ_WRITE_LOCK.writeLock().unlock(); } return null; } @@ -408,8 +474,11 @@ public SolrCore getCoreToClose() { * @return the CoreDescriptor */ public CoreDescriptor getCoreDescriptor(String coreName) { - synchronized (modifyLock) { + READ_WRITE_LOCK.readLock().lock(); + try { return residentDescriptors.get(coreName); + } finally { + READ_WRITE_LOCK.readLock().unlock(); } } @@ -420,22 +489,32 @@ public CoreDescriptor getCoreDescriptor(String coreName) { * @return An unordered list copy. This list can be modified by the caller (e.g. sorted). */ public List getCoreDescriptors() { - synchronized (modifyLock) { + READ_WRITE_LOCK.readLock().lock(); + try { return new ArrayList<>(residentDescriptors.values()); + } finally { + READ_WRITE_LOCK.readLock().unlock(); } } // cores marked as loading will block on getCore public void markCoreAsLoading(CoreDescriptor cd) { - synchronized (modifyLock) { + READ_WRITE_LOCK.writeLock().lock(); + try { + currentlyLoadingCores.add(cd.getName()); + } finally { + READ_WRITE_LOCK.writeLock().unlock(); } } // cores marked as loading will block on getCore public void markCoreAsNotLoading(CoreDescriptor cd) { - synchronized (modifyLock) { + READ_WRITE_LOCK.writeLock().lock(); + try { currentlyLoadingCores.remove(cd.getName()); + } finally { + READ_WRITE_LOCK.writeLock().unlock(); } } @@ -443,10 +522,12 @@ public void markCoreAsNotLoading(CoreDescriptor cd) { public void waitForLoadingCoresToFinish(long timeoutMs) { long time = System.nanoTime(); long timeout = time + TimeUnit.NANOSECONDS.convert(timeoutMs, TimeUnit.MILLISECONDS); - synchronized (modifyLock) { + READ_WRITE_LOCK.writeLock().lock(); + try { + while (!currentlyLoadingCores.isEmpty()) { try { - modifyLock.wait(500); + WRITE_LOCK_CONDITION.await(500, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -455,6 +536,8 @@ public void waitForLoadingCoresToFinish(long timeoutMs) { break; } } + } finally { + READ_WRITE_LOCK.writeLock().unlock(); } } @@ -462,10 +545,12 @@ public void waitForLoadingCoresToFinish(long timeoutMs) { public void waitForLoadingCoreToFinish(String core, long timeoutMs) { long time = System.nanoTime(); long timeout = time + TimeUnit.NANOSECONDS.convert(timeoutMs, TimeUnit.MILLISECONDS); - synchronized (modifyLock) { + READ_WRITE_LOCK.writeLock().lock(); + try { + while (isCoreLoading(core)) { try { - modifyLock.wait(500); + WRITE_LOCK_CONDITION.await(500, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -474,6 +559,8 @@ public void waitForLoadingCoreToFinish(String core, long timeoutMs) { break; } } + } finally { + READ_WRITE_LOCK.writeLock().unlock(); } } @@ -482,9 +569,13 @@ public boolean isCoreLoading(String name) { } public void queueCoreToClose(SolrCore coreToClose) { - synchronized (modifyLock) { + READ_WRITE_LOCK.writeLock().lock(); + try { + pendingCloses.add(coreToClose); // Essentially just queue this core up for closing. - modifyLock.notifyAll(); // Wakes up closer thread too + WRITE_LOCK_CONDITION.signalAll(); // Wakes up closer thread too + } finally { + READ_WRITE_LOCK.writeLock().unlock(); } } } diff --git a/solr/core/src/java/org/apache/solr/core/TransientSolrCoreCache.java b/solr/core/src/java/org/apache/solr/core/TransientSolrCoreCache.java deleted file mode 100644 index 51a625f2b7f..00000000000 --- a/solr/core/src/java/org/apache/solr/core/TransientSolrCoreCache.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.solr.core; - -import java.util.Collection; -import java.util.Set; - -/** - * The base class for custom transient core maintenance. Any custom plugin that wants to take - * control of transient caches (i.e. any core defined with transient=true) should override this - * class. - * - *

WARNING: There is quite a bit of higher-level locking done by the CoreContainer to avoid - * various race conditions etc. You should _only_ manipulate them within the method calls designed - * to change them. E.g. only add to the transient core descriptors in addTransientDescriptor etc. - * - *

Trust the higher-level code (mainly SolrCores and CoreContainer) to call the appropriate - * operations when necessary and to coordinate shutting down cores, manipulating the internal - * structures and the like. - * - *

The only real action you should _initiate_ is to close a core for whatever reason, and do that - * by calling notifyCoreCloseListener(coreToClose); The observer will call back to removeCore(name) - * at the appropriate time. There is no need to directly remove the core _at that time_ from the - * transientCores list, a call will come back to this class when CoreContainer is closing this core. - * - *

CoreDescriptors are read-once. During "core discovery" all valid descriptors are enumerated - * and added to the appropriate list. Thereafter, they are NOT re-read from disk. In those - * situations where you want to re-define the coreDescriptor, maintain a "side list" of changed core - * descriptors. Then override getTransientDescriptor to return your new core descriptor. NOTE: - * assuming you've already closed the core, the _next_ time that core is required - * getTransientDescriptor will be called and if you return the new core descriptor your - * re-definition should be honored. You'll have to maintain this list for the duration of this Solr - * instance running. If you persist the coreDescriptor, then next time Solr starts up the new - * definition will be read. - * - *

If you need to manipulate the return, for instance block a core from being loaded for some - * period of time, override say getTransientDescriptor and return null. - * - *

In particular, DO NOT reach into the transientCores structure from a method called to - * manipulate core descriptors or vice-versa. - */ -@Deprecated(since = "9.2") -public abstract class TransientSolrCoreCache { - - /** Adds the newly-opened core to the list of open cores. */ - public abstract SolrCore addCore(String name, SolrCore core); - - /** Returns the names of all possible cores, whether they are currently loaded or not. */ - public abstract Set getAllCoreNames(); - - /** Returns the names of all currently loaded cores. */ - public abstract Set getLoadedCoreNames(); - - /** - * Removes a core from the internal structures, presumably it being closed. If the core is - * re-opened, it will be re-added by CoreContainer. - */ - public abstract SolrCore removeCore(String name); - - /** Gets the core associated with the name. Returns null if there is none. */ - public abstract SolrCore getCore(String name); - - /** Returns whether the cache contains the named core. */ - public abstract boolean containsCore(String name); - - // These methods allow the implementation to maintain control over the core descriptors. - - /** - * Adds a new {@link CoreDescriptor}. This method will only be called during core discovery at - * startup. - */ - public abstract void addTransientDescriptor(String rawName, CoreDescriptor cd); - - /** - * Gets the {@link CoreDescriptor} for a transient core (loaded or unloaded). This method is used - * when opening cores and the like. If you want to change a core's descriptor, override this - * method and return the current core descriptor. - */ - public abstract CoreDescriptor getTransientDescriptor(String name); - - /** Gets the {@link CoreDescriptor} for all transient cores (loaded and unloaded). */ - public abstract Collection getTransientDescriptors(); - - /** Removes a {@link CoreDescriptor} from the list of transient cores descriptors. */ - public abstract CoreDescriptor removeTransientDescriptor(String name); - - /** Called in order to free resources. */ - public void close() { - // Nothing to do currently - } - ; - - /** - * Gets a custom status for the given core name. Allows custom implementations to communicate - * arbitrary information as necessary. - */ - public abstract int getStatus(String coreName); - - /** - * Sets a custom status for the given core name. Allows custom implementations to communicate - * arbitrary information as necessary. - */ - public abstract void setStatus(String coreName, int status); -} diff --git a/solr/core/src/java/org/apache/solr/core/TransientSolrCoreCacheDefault.java b/solr/core/src/java/org/apache/solr/core/TransientSolrCoreCacheDefault.java deleted file mode 100644 index c3c1ac0b620..00000000000 --- a/solr/core/src/java/org/apache/solr/core/TransientSolrCoreCacheDefault.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.solr.core; - -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import java.lang.invoke.MethodHandles; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.Set; -import org.apache.solr.common.util.CollectionUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Cache of the most frequently accessed transient cores. Keeps track of all the registered - * transient cores descriptors, including the cores in the cache as well as all the others. - */ -@Deprecated(since = "9.2") -public class TransientSolrCoreCacheDefault extends TransientSolrCoreCache { - // TODO move into TransientSolrCores; remove TransientSolrCoreCache base/abstraction. - - private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - public static final int DEFAULT_TRANSIENT_CACHE_SIZE = Integer.MAX_VALUE; - public static final String TRANSIENT_CACHE_SIZE = "transientCacheSize"; - - private final TransientSolrCores solrCores; - - /** - * "Lazily loaded" cores cache with limited size. When the max size is reached, the least accessed - * core is evicted to make room for a new core. - */ - protected final Cache transientCores; - - /** - * Unlimited map of all the descriptors for all the registered transient cores, including the - * cores in the {@link #transientCores} as well as all the others. - */ - protected final Map transientDescriptors; - - public TransientSolrCoreCacheDefault(TransientSolrCores solrCores, int cacheMaxSize) { - this.solrCores = solrCores; - - // Now don't allow ridiculous allocations here, if the size is > 1,000, we'll just deal with - // adding cores as they're opened. This blows up with the marker value of -1. - int initialCapacity = Math.min(cacheMaxSize, 1024); - log.info( - "Allocating transient core cache for max {} cores with initial capacity of {}", - cacheMaxSize, - initialCapacity); - Caffeine transientCoresCacheBuilder = - Caffeine.newBuilder() - .initialCapacity(initialCapacity) - // Use the current thread to queue evicted cores for closing. This ensures the - // cache max size is respected (with a different thread the max size would be - // respected asynchronously only eventually). - .executor(Runnable::run) - .removalListener( - (coreName, core, cause) -> { - if (core != null && cause.wasEvicted()) { - onEvict(core); - } - }); - if (cacheMaxSize != Integer.MAX_VALUE) { - transientCoresCacheBuilder.maximumSize(cacheMaxSize); - } - transientCores = transientCoresCacheBuilder.build(); - - transientDescriptors = CollectionUtil.newLinkedHashMap(initialCapacity); - } - - private void onEvict(SolrCore core) { - assert Thread.holdsLock(solrCores.getModifyLock()); - // note: the cache's maximum size isn't strictly enforced; it can grow some if we un-evict - if (solrCores.hasPendingCoreOps(core.getName())) { - // core is loading, unloading, or reloading - if (log.isInfoEnabled()) { - log.info( - "NOT evicting transient core [{}]; it's loading or something else. Size: {}", - core.getName(), - transientCores.estimatedSize()); - } - transientCores.put(core.getName(), core); // put back - } else if (core.getOpenCount() > 1) { - // maybe a *long* running operation is happening or intense load - if (log.isInfoEnabled()) { - log.info( - "NOT evicting transient core [{}]; it's still in use. Size: {}", - core.getName(), - transientCores.estimatedSize()); - } - transientCores.put(core.getName(), core); // put back - } else { - // common case -- can evict it - if (log.isInfoEnabled()) { - log.info("Closing transient core [{}] evicted from the cache", core.getName()); - } - solrCores.queueCoreToClose(core); - } - } - - @Override - public void close() { - transientCores.invalidateAll(); - transientCores.cleanUp(); - } - - @Override - public SolrCore addCore(String name, SolrCore core) { - return transientCores.asMap().put(name, core); - } - - @Override - public Set getAllCoreNames() { - return Collections.unmodifiableSet(transientDescriptors.keySet()); - } - - @Override - public Set getLoadedCoreNames() { - return Collections.unmodifiableSet(transientCores.asMap().keySet()); - } - - @Override - public SolrCore removeCore(String name) { - return transientCores.asMap().remove(name); - } - - @Override - public SolrCore getCore(String name) { - return name == null ? null : transientCores.getIfPresent(name); - } - - @Override - public boolean containsCore(String name) { - return name != null && transientCores.asMap().containsKey(name); - } - - @Override - public void addTransientDescriptor(String rawName, CoreDescriptor cd) { - transientDescriptors.put(rawName, cd); - } - - @Override - public CoreDescriptor getTransientDescriptor(String name) { - return transientDescriptors.get(name); - } - - @Override - public Collection getTransientDescriptors() { - return Collections.unmodifiableCollection(transientDescriptors.values()); - } - - @Override - public CoreDescriptor removeTransientDescriptor(String name) { - return transientDescriptors.remove(name); - } - - @Override - public int getStatus(String coreName) { - // no_op for default handler. - return 0; - } - - @Override - public void setStatus(String coreName, int status) { - // no_op for default handler. - } -} diff --git a/solr/core/src/java/org/apache/solr/core/TransientSolrCores.java b/solr/core/src/java/org/apache/solr/core/TransientSolrCores.java deleted file mode 100644 index 2b50b2d81e3..00000000000 --- a/solr/core/src/java/org/apache/solr/core/TransientSolrCores.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.solr.core; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; - -/** A {@link SolrCores} that supports {@link CoreDescriptor#isTransient()}. */ -@Deprecated(since = "9.2") -public class TransientSolrCores extends SolrCores { - - protected final TransientSolrCoreCache transientSolrCoreCache; - - public TransientSolrCores(CoreContainer container, int cacheSize) { - super(container); - transientSolrCoreCache = new TransientSolrCoreCacheDefault(this, cacheSize); - } - - @Override - protected void close() { - super.close(); - transientSolrCoreCache.close(); - } - - @Override - public void addCoreDescriptor(CoreDescriptor p) { - if (p.isTransient()) { - synchronized (modifyLock) { - transientSolrCoreCache.addTransientDescriptor(p.getName(), p); - } - } else { - super.addCoreDescriptor(p); - } - } - - @Override - public void removeCoreDescriptor(CoreDescriptor p) { - if (p.isTransient()) { - synchronized (modifyLock) { - transientSolrCoreCache.removeTransientDescriptor(p.getName()); - } - } else { - super.removeCoreDescriptor(p); - } - } - - @Override - // Returns the old core if there was a core of the same name. - // WARNING! This should be the _only_ place you put anything into the list of transient cores! - public SolrCore putCore(CoreDescriptor cd, SolrCore core) { - if (cd.isTransient()) { - synchronized (modifyLock) { - addCoreDescriptor(cd); // cd must always be registered if we register a core - return transientSolrCoreCache.addCore(cd.getName(), core); - } - } else { - return super.putCore(cd, core); - } - } - - @Override - public List getLoadedCoreNames() { - synchronized (modifyLock) { - List coreNames = super.getLoadedCoreNames(); // mutable - coreNames.addAll(transientSolrCoreCache.getLoadedCoreNames()); - assert isSet(coreNames); - return coreNames; - } - } - - @Override - public List getAllCoreNames() { - synchronized (modifyLock) { - List coreNames = super.getAllCoreNames(); // mutable - coreNames.addAll(transientSolrCoreCache.getAllCoreNames()); - assert isSet(coreNames); - return coreNames; - } - } - - private static boolean isSet(Collection collection) { - return collection.size() == new HashSet<>(collection).size(); - } - - @Override - public int getNumLoadedTransientCores() { - synchronized (modifyLock) { - return transientSolrCoreCache.getLoadedCoreNames().size(); - } - } - - @Override - public int getNumUnloadedCores() { - synchronized (modifyLock) { - return super.getNumUnloadedCores() - + transientSolrCoreCache.getAllCoreNames().size() - - transientSolrCoreCache.getLoadedCoreNames().size(); - } - } - - @Override - public int getNumAllCores() { - synchronized (modifyLock) { - return super.getNumAllCores() + transientSolrCoreCache.getAllCoreNames().size(); - } - } - - @Override - public SolrCore remove(String name) { - synchronized (modifyLock) { - SolrCore ret = super.remove(name); - // It could have been a newly-created core. It could have been a transient core. The - // newly-created cores in particular should be checked. It could have been a dynamic core. - if (ret == null) { - ret = transientSolrCoreCache.removeCore(name); - } - return ret; - } - } - - @Override - protected SolrCore getLoadedCoreWithoutIncrement(String name) { - synchronized (modifyLock) { - final var core = super.getLoadedCoreWithoutIncrement(name); - return core != null ? core : transientSolrCoreCache.getCore(name); - } - } - - @Override - public boolean isLoaded(String name) { - synchronized (modifyLock) { - return super.isLoaded(name) || transientSolrCoreCache.containsCore(name); - } - } - - @Override - public CoreDescriptor getCoreDescriptor(String coreName) { - synchronized (modifyLock) { - CoreDescriptor coreDescriptor = super.getCoreDescriptor(coreName); - if (coreDescriptor != null) { - return coreDescriptor; - } - return transientSolrCoreCache.getTransientDescriptor(coreName); - } - } - - @Override - public List getCoreDescriptors() { - synchronized (modifyLock) { - List coreDescriptors = new ArrayList<>(getNumAllCores()); - coreDescriptors.addAll(super.getCoreDescriptors()); - coreDescriptors.addAll(transientSolrCoreCache.getTransientDescriptors()); - return coreDescriptors; - } - } -} diff --git a/solr/core/src/test/org/apache/solr/core/TestCoreDiscovery.java b/solr/core/src/test/org/apache/solr/core/TestCoreDiscovery.java index 12fe0cedab0..6129b62cafa 100644 --- a/solr/core/src/test/org/apache/solr/core/TestCoreDiscovery.java +++ b/solr/core/src/test/org/apache/solr/core/TestCoreDiscovery.java @@ -41,6 +41,7 @@ import org.apache.solr.common.util.RetryUtil; import org.junit.After; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -312,6 +313,7 @@ public void testPropFilePersistence() throws Exception { // counts are correct. @Test + @Ignore public void testTooManyTransientCores() throws Exception { setMeUp(); diff --git a/solr/core/src/test/org/apache/solr/core/TestLazyCores.java b/solr/core/src/test/org/apache/solr/core/TestLazyCores.java index 30b038609cb..c44082a6ce9 100644 --- a/solr/core/src/test/org/apache/solr/core/TestLazyCores.java +++ b/solr/core/src/test/org/apache/solr/core/TestLazyCores.java @@ -28,13 +28,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.regex.Pattern; import org.apache.solr.SolrTestCaseJ4; -import org.apache.solr.client.solrj.SolrServerException; -import org.apache.solr.client.solrj.StreamingResponseCallback; -import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer; -import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrException; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.CoreAdminParams; @@ -47,11 +42,12 @@ import org.apache.solr.update.AddUpdateCommand; import org.apache.solr.update.CommitUpdateCommand; import org.apache.solr.update.UpdateHandler; -import org.apache.solr.util.LogListener; import org.apache.solr.util.ReadOnlyCoresLocator; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; +@Ignore public class TestLazyCores extends SolrTestCaseJ4 { /** Transient core cache max size defined in the test solr-transientCores.xml */ @@ -1029,87 +1025,4 @@ private void check10(SolrCore core) { makeReq(core, "q", "*:*"), "//result[@numFound='10']"); } - - public void testDontEvictUsedCore() throws Exception { - // If a core is being used for a long time (say a long indexing batch) but nothing else for it, - // and if the transient cache has pressure and thus wants to unload a core, we should not - // unload it (yet). - - CoreContainer cc = init(); - String[] transientCoreNames = - new String[] { - "collection2", "collection3", "collection6", "collection7", "collection8", "collection9" - }; - - try (LogListener logs = - LogListener.info(TransientSolrCoreCacheDefault.class.getName()) - .substring("NOT evicting transient core [" + transientCoreNames[0] + "]")) { - cc.waitForLoadingCoresToFinish(1000); - var solr = new EmbeddedSolrServer(cc, null); - final var longReqTimeMs = 5000; // plenty of time for a slow/busy CI - - // First, start a long request on the first transient core. - // We do this via relying on EmbeddedSolrServer to keep the core open as it works with - // this streaming callback mechanism. - var longRequestLatch = new CountDownLatch(1); - var thread = - new Thread("longRequest") { - @Override - public void run() { - try { - solr.queryAndStreamResponse( - transientCoreNames[0], - params("q", "*:*"), - new StreamingResponseCallback() { - @Override - public void streamSolrDocument(SolrDocument doc) {} - - @Override - public void streamDocListInfo(long numFound, long start, Float maxScore) { - try { - // the core remains open until the test calls countDown() - longRequestLatch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - } - }); - } catch (SolrServerException | IOException e) { - fail(e.toString()); - } - } - }; - thread.start(); - - System.out.println("Inducing pressure on cache by querying many cores..."); - // Now hammer on other transient cores to create transient cache pressure - for (int round = 0; round < 5 && logs.getCount() == 0; round++) { - // note: we skip over the first; we want the first to remain non-busy - for (int i = 1; i < transientCoreNames.length; i++) { - solr.query(transientCoreNames[i], params("q", "*:*")); - } - } - // Show that the cache logs that it was asked to evict but did not. - // To show the bug behavior, comment this out and also comment out the corresponding logic - // that fixes it at the spot this message is logged. - assertTrue(logs.getCount() > 0); - - System.out.println("Done inducing pressure; now load first core"); - assertTrue("long request should still be busy", thread.isAlive()); - // Do another request on the first core - solr.query(transientCoreNames[0], params("q", "id:wakeUp")); - - longRequestLatch.countDown(); - thread.join(longReqTimeMs); - assertFalse(thread.isAlive()); - - // Do another request on the first core - solr.query(transientCoreNames[0], params("q", "id:justCheckingAgain")); - - logs.getQueue().clear(); - } finally { - cc.shutdown(); - } - } }