Skip to content

Commit

Permalink
Adds documentation and tests for ArrowRootAllocationProvider spi
Browse files Browse the repository at this point in the history
  • Loading branch information
drewfarris committed Jan 9, 2025
1 parent 7e9b903 commit 4ddbc49
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 11 deletions.
98 changes: 88 additions & 10 deletions src/main/java/emissary/spi/ArrowRootAllocatorProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,114 @@

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;

/**
* Provides a central class for obtaining references to the Arrow root memory allocator. Activate this by including it
* in the list of classes in
*
* <pre>
* META - INF / services / emissary.spi.InitializationProvider
* </pre>
*
* Classes wishing to get a reference to the Arrow root allocator should call the {@link #getArrowRootAllocator()}. They
* are free to create child allocators as needed, but they are responsible for managing any buffers created using either
* the root or a chile allocator and calling
*
* <pre>
* close()
* </pre>
*
* on them when they are no longer needed. The {@link #shutdown()} method will automatically close any child allocators
* created, but will throw an {@link java.lang.IllegalStateException} if there are allocated buffers that have not been
* closed (potentially leaking memory). Provides debug and trace level logging for detailed behavior.
*/
public class ArrowRootAllocatorProvider implements InitializationProvider {
private static final Logger logger = LoggerFactory.getLogger(ArrowRootAllocatorProvider.class);

private static final Object initalizationLock = new Object();
private static final Object allocatorLock = new Object();
private static BufferAllocator arrowRootAllocator = null;

@Override
public void initialize() {
synchronized (initalizationLock) {
logger.trace("Waiting for allocator lock in initialize()");
synchronized (allocatorLock) {
logger.debug("Creating new Arrow root allocator");

// creates a RootAllocator with the default memory settings, we may consider implementing a limit here
// that is set via a system property here instead.
arrowRootAllocator = new RootAllocator();

logger.trace("Releasing allocator lock in initialize()");
}
}

/** Shuts down the root allocator and any child allocators */
@Override
public void shutdown() {
synchronized (initalizationLock) {
logger.trace("Waiting for allocator lock in shutdown()");
synchronized (allocatorLock) {
logger.trace("Closing Arrow allocators");
Collection<BufferAllocator> children = arrowRootAllocator.getChildAllocators();
if (children.isEmpty()) {
logger.trace("Root allocator has no children to close");
} else {
if (logger.isTraceEnabled()) {
logger.trace("Attempting to clode {} child allocators", children.size());
}
for (BufferAllocator child : children) {
if (logger.isDebugEnabled()) {
logger.debug("Shutting down child allocator: {}", child.getName());
}
try {
child.close();
if (logger.isTraceEnabled()) {
logger.trace("Successfully closed child allocator {}", child.getName());
}
} catch (IllegalStateException e) {
// it's ok to catch this, another IllegalStateException will be thrown when closing the root allocator.
logger.warn("IllegalStateException when closing child allocator {}, message: {}", child.getName(), e.getMessage());
}
}
}

logger.trace("Closing root allocator");
arrowRootAllocator.close();
logger.debug("Successfully closed root allocator");
arrowRootAllocator = null;
logger.trace("Releasing allocator lock in shutdown()");
}
InitializationProvider.super.shutdown();
}

/**
* Obtain a reference to the arrow root allocator. Any buffers or child allocators allocated using this instance must be
*
* <pre>
* close()
* </pre>
*
* 'd once they are no longer used.
*
* @return the Arrow root allocator
*/
public static BufferAllocator getArrowRootAllocator() {
synchronized (initalizationLock) {
if (arrowRootAllocator == null) {
throw new IllegalStateException("Arrow Root Allocator has not been initalized by the " +
"ArrowRootAllocatorProvider or is already shutdown, is emissary.spi.ArrowRootAllocatorProver " +
"listed in META-INF/services/emissary.spi.InitalizationProvider?");
} else {
return arrowRootAllocator;
logger.trace("Waiting for allocator lock in getArrowRootAllocator()");
synchronized (allocatorLock) {
try {
if (arrowRootAllocator == null) {
throw new IllegalStateException("Arrow Root Allocator has not been initialized by the " +
"ArrowRootAllocatorProvider or is already shutdown, is emissary.spi.ArrowRootAllocatorProver " +
"listed in META-INF/services/emissary.spi.InitializationProvider?");
} else {
logger.trace("Returning root allocator");
return arrowRootAllocator;
}
} finally {
logger.trace("Releasing allocator lock in getArrowRootAllocator()");
}
}
}
Expand Down
80 changes: 79 additions & 1 deletion src/test/java/emissary/spi/ArrowRootAllocatorProviderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,107 @@

import emissary.test.core.junit5.UnitTest;

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

/**
* Tests various ArrowRootAllocatorProvider scenarios and demonstrates expected behavior when conditions related to
* failing to close various Arrow resources occurs.
*/
public class ArrowRootAllocatorProviderTest extends UnitTest {
/** shutdown is clean if no memory has been allocated and no child allocators have been created */
@Test
public void testArrowRootAllocatorProvider() {
ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider();
provider.initialize();
BufferAllocator allocator = ArrowRootAllocatorProvider.getArrowRootAllocator();
assertNotNull(allocator);
provider.shutdown();
}

/** creating a buffer and not closing it will cause a leak */
@Test
public void testArrowRootAllocatorShutdownLeak() {
final ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider();
provider.initialize();
BufferAllocator allocatorOne = ArrowRootAllocatorProvider.getArrowRootAllocator();
assertNotNull(allocatorOne);
ArrowBuf buffer = allocatorOne.buffer(1024);
assertThrows(IllegalStateException.class, provider::shutdown,
"expected IllegalStateException attempting to shutdown allocator with allocated buffer open");
}

/**
* creating a child allocator and not closing it before the root allocator provider is shutdown is OK, as long as that
* child allocator doesn't have any open buffers. The root allocator provider attempts to shut down all children.
*/
@Test
public void testArrowRootAllocatorShutdownChildClean() {
final ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider();
provider.initialize();
BufferAllocator allocatorOne = ArrowRootAllocatorProvider.getArrowRootAllocator();
assertNotNull(allocatorOne);
BufferAllocator allocatorChild = allocatorOne.newChildAllocator("child", 1024, 2048);
assertNotNull(allocatorChild);
}

/**
* creating a child allocator and not closing its buffers before the root allocator provider is shutdown should fail
* when the root allocator provider attempts to shut down all children.
*/
@Test
public void testArrowRootAllocatorShutdownChildLeak() {
final ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider();
provider.initialize();
BufferAllocator allocatorOne = ArrowRootAllocatorProvider.getArrowRootAllocator();
assertNotNull(allocatorOne);
BufferAllocator allocatorChild = allocatorOne.newChildAllocator("child", 1024, 2048);
allocatorChild.buffer(1024);
assertNotNull(allocatorChild);
assertThrows(IllegalStateException.class, provider::shutdown,
"expected IllegalStateException attempting to shutdown allocator with child allocator open");
}

/** both allocated buffers and child allocators must be closed before the root allocator can be shutdown cleanly */
@Test
public void testArrowRootAllocatorShutdownAfterProperClose() {
final ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider();
provider.initialize();
BufferAllocator allocatorOne = ArrowRootAllocatorProvider.getArrowRootAllocator();
assertNotNull(allocatorOne);
BufferAllocator allocatorChild = allocatorOne.newChildAllocator("child", 1024, 2048);
ArrowBuf buffer = allocatorChild.buffer(1024);
buffer.close();
allocatorChild.close();
provider.shutdown();
}

/** the root allocator can't be obtained after shutdown */
@Test()
public void testArrowRootAllocatorProviderAfterShutdown() {
ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider();
provider.initialize();
BufferAllocator allocatorOne = ArrowRootAllocatorProvider.getArrowRootAllocator();
assertNotNull(allocatorOne);
provider.shutdown();
assertThrows(IllegalStateException.class, ArrowRootAllocatorProvider::getArrowRootAllocator, "expected IllegalStateException");
assertThrows(IllegalStateException.class, ArrowRootAllocatorProvider::getArrowRootAllocator,
"expected IllegalStateException attempting to get an allocator after shutdown");
}

/** the root allocator won't allocate after shutdown */
@Test
public void testArrowRootAllocatorProviderAllocateAfterShutdown() {
ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider();
provider.initialize();
BufferAllocator allocator = ArrowRootAllocatorProvider.getArrowRootAllocator();
assertNotNull(allocator);
provider.shutdown();
assertThrows(IllegalStateException.class, () -> allocator.buffer(1024),
"expected IllegalStateException attempting to allocate after provider is shutdown");
}
}

0 comments on commit 4ddbc49

Please sign in to comment.