From 7e9b903ac26e42b4da32dacbabcbad23e9bbe36c Mon Sep 17 00:00:00 2001 From: Drew Farris Date: Mon, 6 Jan 2025 23:30:25 +0000 Subject: [PATCH 1/3] Implements an ArrowRootAllocationProvider spi --- pom.xml | 33 ++++++++++++++++ .../spi/ArrowRootAllocatorProvider.java | 38 +++++++++++++++++++ .../emissary.spi.InitializationProvider | 1 + .../spi/ArrowRootAllocatorProviderTest.java | 30 +++++++++++++++ 4 files changed, 102 insertions(+) create mode 100644 src/main/java/emissary/spi/ArrowRootAllocatorProvider.java create mode 100644 src/test/java/emissary/spi/ArrowRootAllocatorProviderTest.java diff --git a/pom.xml b/pom.xml index 1f12890c08..cc01c5755d 100644 --- a/pom.xml +++ b/pom.xml @@ -51,6 +51,7 @@ ${project.version} ${project.basedir}/contrib/checkstyle.xml + 16.1.0 1.16.0 4.4 1.27.1 @@ -197,6 +198,16 @@ spymemcached ${dep.spymemcached.version} + + org.apache.arrow + arrow-memory-core + ${dep.arrow.version} + + + org.apache.arrow + arrow-memory-netty + ${dep.arrow.version} + org.apache.commons commons-collections4 @@ -401,6 +412,17 @@ net.spy spymemcached + + org.apache.arrow + arrow-memory-core + + + + org.checkerframework + checker-qual + + + org.apache.commons commons-collections4 @@ -523,6 +545,13 @@ error_prone_annotations test + + org.apache.arrow + arrow-memory-netty + test + + true + org.glassfish.jersey.test-framework.providers jersey-test-framework-provider-jetty @@ -1018,6 +1047,10 @@ + + + arrow-git.properties + false true true diff --git a/src/main/java/emissary/spi/ArrowRootAllocatorProvider.java b/src/main/java/emissary/spi/ArrowRootAllocatorProvider.java new file mode 100644 index 0000000000..30de578440 --- /dev/null +++ b/src/main/java/emissary/spi/ArrowRootAllocatorProvider.java @@ -0,0 +1,38 @@ +package emissary.spi; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; + +public class ArrowRootAllocatorProvider implements InitializationProvider { + + private static final Object initalizationLock = new Object(); + private static BufferAllocator arrowRootAllocator = null; + + @Override + public void initialize() { + synchronized (initalizationLock) { + arrowRootAllocator = new RootAllocator(); + } + } + + @Override + public void shutdown() { + synchronized (initalizationLock) { + arrowRootAllocator.close(); + arrowRootAllocator = null; + } + InitializationProvider.super.shutdown(); + } + + public static BufferAllocator getArrowRootAllocator() { + synchronized (initalizationLock) { + if (arrowRootAllocator == null) { + throw new IllegalStateException("Arrow Root Allocator has not been initalized by the " + + "ArrowRootAllocatorProvider or is already shutdown, is emissary.spi.ArrowRootAllocatorProver " + + "listed in META-INF/services/emissary.spi.InitalizationProvider?"); + } else { + return arrowRootAllocator; + } + } + } +} diff --git a/src/main/resources/META-INF/services/emissary.spi.InitializationProvider b/src/main/resources/META-INF/services/emissary.spi.InitializationProvider index 43f3840efa..bb822f5482 100644 --- a/src/main/resources/META-INF/services/emissary.spi.InitializationProvider +++ b/src/main/resources/META-INF/services/emissary.spi.InitializationProvider @@ -1,2 +1,3 @@ emissary.spi.JavaCharSetInitializationProvider emissary.spi.ClassLocationVerificationProvider +emissary.spi.ArrowRootAllocatorProvider \ No newline at end of file diff --git a/src/test/java/emissary/spi/ArrowRootAllocatorProviderTest.java b/src/test/java/emissary/spi/ArrowRootAllocatorProviderTest.java new file mode 100644 index 0000000000..4c96b2f1f7 --- /dev/null +++ b/src/test/java/emissary/spi/ArrowRootAllocatorProviderTest.java @@ -0,0 +1,30 @@ +package emissary.spi; + +import emissary.test.core.junit5.UnitTest; + +import org.apache.arrow.memory.BufferAllocator; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class ArrowRootAllocatorProviderTest extends UnitTest { + @Test + public void testArrowRootAllocatorProvider() { + ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider(); + provider.initialize(); + BufferAllocator allocator = ArrowRootAllocatorProvider.getArrowRootAllocator(); + assertNotNull(allocator); + } + + @Test() + public void testArrowRootAllocatorProviderAfterShutdown() { + ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider(); + provider.initialize(); + BufferAllocator allocatorOne = ArrowRootAllocatorProvider.getArrowRootAllocator(); + assertNotNull(allocatorOne); + provider.shutdown(); + assertThrows(IllegalStateException.class, ArrowRootAllocatorProvider::getArrowRootAllocator, "expected IllegalStateException"); + } +} + From 4ddbc49096e11feb9fb6de2d0f3e52eb938e9410 Mon Sep 17 00:00:00 2001 From: Drew Farris Date: Thu, 9 Jan 2025 19:13:24 +0000 Subject: [PATCH 2/3] Adds documentation and tests for ArrowRootAllocationProvider spi --- .../spi/ArrowRootAllocatorProvider.java | 98 +++++++++++++++++-- .../spi/ArrowRootAllocatorProviderTest.java | 80 ++++++++++++++- 2 files changed, 167 insertions(+), 11 deletions(-) diff --git a/src/main/java/emissary/spi/ArrowRootAllocatorProvider.java b/src/main/java/emissary/spi/ArrowRootAllocatorProvider.java index 30de578440..c12963fe3f 100644 --- a/src/main/java/emissary/spi/ArrowRootAllocatorProvider.java +++ b/src/main/java/emissary/spi/ArrowRootAllocatorProvider.java @@ -2,36 +2,114 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Collection; + +/** + * Provides a central class for obtaining references to the Arrow root memory allocator. Activate this by including it + * in the list of classes in + * + *
+ * META - INF / services / emissary.spi.InitializationProvider
+ * 
+ * + * Classes wishing to get a reference to the Arrow root allocator should call the {@link #getArrowRootAllocator()}. They + * are free to create child allocators as needed, but they are responsible for managing any buffers created using either + * the root or a chile allocator and calling + * + *
+ * close()
+ * 
+ * + * on them when they are no longer needed. The {@link #shutdown()} method will automatically close any child allocators + * created, but will throw an {@link java.lang.IllegalStateException} if there are allocated buffers that have not been + * closed (potentially leaking memory). Provides debug and trace level logging for detailed behavior. + */ public class ArrowRootAllocatorProvider implements InitializationProvider { + private static final Logger logger = LoggerFactory.getLogger(ArrowRootAllocatorProvider.class); - private static final Object initalizationLock = new Object(); + private static final Object allocatorLock = new Object(); private static BufferAllocator arrowRootAllocator = null; @Override public void initialize() { - synchronized (initalizationLock) { + logger.trace("Waiting for allocator lock in initialize()"); + synchronized (allocatorLock) { + logger.debug("Creating new Arrow root allocator"); + + // creates a RootAllocator with the default memory settings, we may consider implementing a limit here + // that is set via a system property here instead. arrowRootAllocator = new RootAllocator(); + + logger.trace("Releasing allocator lock in initialize()"); } } + /** Shuts down the root allocator and any child allocators */ @Override public void shutdown() { - synchronized (initalizationLock) { + logger.trace("Waiting for allocator lock in shutdown()"); + synchronized (allocatorLock) { + logger.trace("Closing Arrow allocators"); + Collection children = arrowRootAllocator.getChildAllocators(); + if (children.isEmpty()) { + logger.trace("Root allocator has no children to close"); + } else { + if (logger.isTraceEnabled()) { + logger.trace("Attempting to clode {} child allocators", children.size()); + } + for (BufferAllocator child : children) { + if (logger.isDebugEnabled()) { + logger.debug("Shutting down child allocator: {}", child.getName()); + } + try { + child.close(); + if (logger.isTraceEnabled()) { + logger.trace("Successfully closed child allocator {}", child.getName()); + } + } catch (IllegalStateException e) { + // it's ok to catch this, another IllegalStateException will be thrown when closing the root allocator. + logger.warn("IllegalStateException when closing child allocator {}, message: {}", child.getName(), e.getMessage()); + } + } + } + + logger.trace("Closing root allocator"); arrowRootAllocator.close(); + logger.debug("Successfully closed root allocator"); arrowRootAllocator = null; + logger.trace("Releasing allocator lock in shutdown()"); } InitializationProvider.super.shutdown(); } + /** + * Obtain a reference to the arrow root allocator. Any buffers or child allocators allocated using this instance must be + * + *
+     * close()
+     * 
+ * + * 'd once they are no longer used. + * + * @return the Arrow root allocator + */ public static BufferAllocator getArrowRootAllocator() { - synchronized (initalizationLock) { - if (arrowRootAllocator == null) { - throw new IllegalStateException("Arrow Root Allocator has not been initalized by the " + - "ArrowRootAllocatorProvider or is already shutdown, is emissary.spi.ArrowRootAllocatorProver " + - "listed in META-INF/services/emissary.spi.InitalizationProvider?"); - } else { - return arrowRootAllocator; + logger.trace("Waiting for allocator lock in getArrowRootAllocator()"); + synchronized (allocatorLock) { + try { + if (arrowRootAllocator == null) { + throw new IllegalStateException("Arrow Root Allocator has not been initialized by the " + + "ArrowRootAllocatorProvider or is already shutdown, is emissary.spi.ArrowRootAllocatorProver " + + "listed in META-INF/services/emissary.spi.InitializationProvider?"); + } else { + logger.trace("Returning root allocator"); + return arrowRootAllocator; + } + } finally { + logger.trace("Releasing allocator lock in getArrowRootAllocator()"); } } } diff --git a/src/test/java/emissary/spi/ArrowRootAllocatorProviderTest.java b/src/test/java/emissary/spi/ArrowRootAllocatorProviderTest.java index 4c96b2f1f7..29a560d27b 100644 --- a/src/test/java/emissary/spi/ArrowRootAllocatorProviderTest.java +++ b/src/test/java/emissary/spi/ArrowRootAllocatorProviderTest.java @@ -2,21 +2,86 @@ import emissary.test.core.junit5.UnitTest; +import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +/** + * Tests various ArrowRootAllocatorProvider scenarios and demonstrates expected behavior when conditions related to + * failing to close various Arrow resources occurs. + */ public class ArrowRootAllocatorProviderTest extends UnitTest { + /** shutdown is clean if no memory has been allocated and no child allocators have been created */ @Test public void testArrowRootAllocatorProvider() { ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider(); provider.initialize(); BufferAllocator allocator = ArrowRootAllocatorProvider.getArrowRootAllocator(); assertNotNull(allocator); + provider.shutdown(); + } + + /** creating a buffer and not closing it will cause a leak */ + @Test + public void testArrowRootAllocatorShutdownLeak() { + final ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider(); + provider.initialize(); + BufferAllocator allocatorOne = ArrowRootAllocatorProvider.getArrowRootAllocator(); + assertNotNull(allocatorOne); + ArrowBuf buffer = allocatorOne.buffer(1024); + assertThrows(IllegalStateException.class, provider::shutdown, + "expected IllegalStateException attempting to shutdown allocator with allocated buffer open"); + } + + /** + * creating a child allocator and not closing it before the root allocator provider is shutdown is OK, as long as that + * child allocator doesn't have any open buffers. The root allocator provider attempts to shut down all children. + */ + @Test + public void testArrowRootAllocatorShutdownChildClean() { + final ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider(); + provider.initialize(); + BufferAllocator allocatorOne = ArrowRootAllocatorProvider.getArrowRootAllocator(); + assertNotNull(allocatorOne); + BufferAllocator allocatorChild = allocatorOne.newChildAllocator("child", 1024, 2048); + assertNotNull(allocatorChild); + } + + /** + * creating a child allocator and not closing its buffers before the root allocator provider is shutdown should fail + * when the root allocator provider attempts to shut down all children. + */ + @Test + public void testArrowRootAllocatorShutdownChildLeak() { + final ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider(); + provider.initialize(); + BufferAllocator allocatorOne = ArrowRootAllocatorProvider.getArrowRootAllocator(); + assertNotNull(allocatorOne); + BufferAllocator allocatorChild = allocatorOne.newChildAllocator("child", 1024, 2048); + allocatorChild.buffer(1024); + assertNotNull(allocatorChild); + assertThrows(IllegalStateException.class, provider::shutdown, + "expected IllegalStateException attempting to shutdown allocator with child allocator open"); + } + + /** both allocated buffers and child allocators must be closed before the root allocator can be shutdown cleanly */ + @Test + public void testArrowRootAllocatorShutdownAfterProperClose() { + final ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider(); + provider.initialize(); + BufferAllocator allocatorOne = ArrowRootAllocatorProvider.getArrowRootAllocator(); + assertNotNull(allocatorOne); + BufferAllocator allocatorChild = allocatorOne.newChildAllocator("child", 1024, 2048); + ArrowBuf buffer = allocatorChild.buffer(1024); + buffer.close(); + allocatorChild.close(); + provider.shutdown(); } + /** the root allocator can't be obtained after shutdown */ @Test() public void testArrowRootAllocatorProviderAfterShutdown() { ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider(); @@ -24,7 +89,20 @@ public void testArrowRootAllocatorProviderAfterShutdown() { BufferAllocator allocatorOne = ArrowRootAllocatorProvider.getArrowRootAllocator(); assertNotNull(allocatorOne); provider.shutdown(); - assertThrows(IllegalStateException.class, ArrowRootAllocatorProvider::getArrowRootAllocator, "expected IllegalStateException"); + assertThrows(IllegalStateException.class, ArrowRootAllocatorProvider::getArrowRootAllocator, + "expected IllegalStateException attempting to get an allocator after shutdown"); + } + + /** the root allocator won't allocate after shutdown */ + @Test + public void testArrowRootAllocatorProviderAllocateAfterShutdown() { + ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider(); + provider.initialize(); + BufferAllocator allocator = ArrowRootAllocatorProvider.getArrowRootAllocator(); + assertNotNull(allocator); + provider.shutdown(); + assertThrows(IllegalStateException.class, () -> allocator.buffer(1024), + "expected IllegalStateException attempting to allocate after provider is shutdown"); } } From 33ef510aaf75acf0ae566bac4879561a638c45e5 Mon Sep 17 00:00:00 2001 From: Drew Farris Date: Thu, 9 Jan 2025 19:47:08 +0000 Subject: [PATCH 3/3] Removed test scope from arrow-memory-netty so ci jobs work --- pom.xml | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/pom.xml b/pom.xml index cc01c5755d..b27474a6ac 100644 --- a/pom.xml +++ b/pom.xml @@ -202,6 +202,13 @@ org.apache.arrow arrow-memory-core ${dep.arrow.version} + + + + org.checkerframework + checker-qual + +
org.apache.arrow @@ -415,13 +422,12 @@ org.apache.arrow arrow-memory-core - - - - org.checkerframework - checker-qual - - + + + org.apache.arrow + arrow-memory-netty + + true org.apache.commons @@ -545,13 +551,6 @@ error_prone_annotations test - - org.apache.arrow - arrow-memory-netty - test - - true - org.glassfish.jersey.test-framework.providers jersey-test-framework-provider-jetty