Skip to content

Commit

Permalink
HDFS-13603: Do not propagate ExecutionException while initializing ED…
Browse files Browse the repository at this point in the history
…EK queues for keys. (#6860)
  • Loading branch information
yzhang559 authored Jun 3, 2024
1 parent 167d4c8 commit f1e2ceb
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -947,11 +947,7 @@ public void flush() throws IOException {
@Override
public void warmUpEncryptedKeys(String... keyNames)
throws IOException {
try {
encKeyVersionQueue.initializeQueuesForKeys(keyNames);
} catch (ExecutionException e) {
throw new IOException(e);
}
encKeyVersionQueue.initializeQueuesForKeys(keyNames);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,24 @@ public ValueQueue(final int numValues, final float lowWaterMark, long expiry,
* Initializes the Value Queues for the provided keys by calling the
* fill Method with "numInitValues" values
* @param keyNames Array of key Names
* @throws ExecutionException executionException.
* @throws IOException if initialization fails for any provided keys
*/
public void initializeQueuesForKeys(String... keyNames)
throws ExecutionException {
public void initializeQueuesForKeys(String... keyNames) throws IOException {
int successfulInitializations = 0;
ExecutionException lastException = null;

for (String keyName : keyNames) {
keyQueues.get(keyName);
try {
keyQueues.get(keyName);
successfulInitializations++;
} catch (ExecutionException e) {
lastException = e;
}
}

if (keyNames.length > 0 && successfulInitializations != keyNames.length) {
throw new IOException(String.format("Failed to initialize %s queues for the provided keys.",
keyNames.length - successfulInitializations), lastException);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,27 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.hadoop.crypto.key.kms.ValueQueue;
import org.apache.hadoop.crypto.key.kms.ValueQueue.QueueRefiller;
import org.apache.hadoop.crypto.key.kms.ValueQueue.SyncGenerationPolicy;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
import org.junit.Assert;
import org.junit.Test;

import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.spy;


public class TestValueQueue {
Logger LOG = LoggerFactory.getLogger(TestValueQueue.class);
Expand Down Expand Up @@ -111,6 +119,41 @@ public void testWarmUp() throws Exception {
vq.shutdown();
}

/**
* Verifies that Queue is initialized (Warmed-up) for partial keys.
*/
@Test(timeout = 30000)
public void testPartialWarmUp() throws Exception {
MockFiller filler = new MockFiller();
ValueQueue<String> vq =
new ValueQueue<>(10, 0.5f, 30000, 1,
SyncGenerationPolicy.ALL, filler);

@SuppressWarnings("unchecked")
LoadingCache<String, LinkedBlockingQueue<KeyProviderCryptoExtension.EncryptedKeyVersion>> kq =
(LoadingCache<String, LinkedBlockingQueue<KeyProviderCryptoExtension.EncryptedKeyVersion>>)
FieldUtils.getField(ValueQueue.class, "keyQueues", true).get(vq);

LoadingCache<String, LinkedBlockingQueue<KeyProviderCryptoExtension.EncryptedKeyVersion>>
kqSpy = spy(kq);
doThrow(new ExecutionException(new Exception())).when(kqSpy).get("k2");
FieldUtils.writeField(vq, "keyQueues", kqSpy, true);

Assert.assertThrows(IOException.class, () -> vq.initializeQueuesForKeys("k1", "k2", "k3"));
verify(kqSpy, times(1)).get("k2");

FillInfo[] fillInfos =
{filler.getTop(), filler.getTop(), filler.getTop()};
Assert.assertEquals(5, fillInfos[0].num);
Assert.assertEquals(5, fillInfos[1].num);
Assert.assertNull(fillInfos[2]);

Assert.assertEquals(new HashSet<>(Arrays.asList("k1", "k3")),
new HashSet<>(Arrays.asList(fillInfos[0].key,
fillInfos[1].key)));
vq.shutdown();
}

/**
* Verifies that the refill task is executed after "checkInterval" if
* num values below "lowWatermark"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,8 @@ SyncGenerationPolicy.LOW_WATERMARK, new EncryptedQueueRefiller()
}

@Override
public void warmUpEncryptedKeys(String... keyNames) throws
IOException {
try {
encKeyVersionQueue.initializeQueuesForKeys(keyNames);
} catch (ExecutionException e) {
throw new IOException(e);
}
public void warmUpEncryptedKeys(String... keyNames) throws IOException {
encKeyVersionQueue.initializeQueuesForKeys(keyNames);
}

@Override
Expand Down

0 comments on commit f1e2ceb

Please sign in to comment.