diff --git a/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java b/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java
index bd21f208faac4..3fa4f7de7e717 100644
--- a/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java
+++ b/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java
@@ -9,6 +9,7 @@
package org.elasticsearch.repositories.azure;
import fixture.azure.AzureHttpHandler;
+import fixture.azure.MockAzureBlobStore;
import com.azure.storage.common.policy.RequestRetryOptions;
import com.azure.storage.common.policy.RetryPolicyType;
@@ -184,7 +185,12 @@ long getUploadBlockSize() {
@SuppressForbidden(reason = "this test uses a HttpHandler to emulate an Azure endpoint")
private static class AzureBlobStoreHttpHandler extends AzureHttpHandler implements BlobStoreHttpHandler {
AzureBlobStoreHttpHandler(final String account, final String container) {
- super(account, container, null /* no auth header validation - sometimes it's omitted in these tests (TODO why?) */);
+ super(
+ account,
+ container,
+ null /* no auth header validation - sometimes it's omitted in these tests (TODO why?) */,
+ MockAzureBlobStore.LeaseExpiryPredicate.NEVER_EXPIRE
+ );
}
}
diff --git a/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java b/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java
index 6d5c17c392141..40be0f8ca78c4 100644
--- a/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java
+++ b/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java
@@ -10,6 +10,7 @@
package org.elasticsearch.repositories.azure;
import fixture.azure.AzureHttpFixture;
+import fixture.azure.MockAzureBlobStore;
import com.azure.core.exception.HttpResponseException;
import com.azure.storage.blob.BlobContainerClient;
@@ -60,7 +61,8 @@ public class AzureStorageCleanupThirdPartyTests extends AbstractThirdPartyReposi
System.getProperty("test.azure.container"),
System.getProperty("test.azure.tenant_id"),
System.getProperty("test.azure.client_id"),
- AzureHttpFixture.sharedKeyForAccountPredicate(AZURE_ACCOUNT)
+ AzureHttpFixture.sharedKeyForAccountPredicate(AZURE_ACCOUNT),
+ MockAzureBlobStore.LeaseExpiryPredicate.NEVER_EXPIRE
);
@Override
diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java
index 52bc1ee1399d4..73936d82fc204 100644
--- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java
+++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java
@@ -180,7 +180,7 @@ protected String buildKey(String blobName) {
}
private boolean skipRegisterOperation(ActionListener> listener) {
- return skipCas(listener) || skipIfNotPrimaryOnlyLocationMode(listener);
+ return skipIfNotPrimaryOnlyLocationMode(listener);
}
private boolean skipIfNotPrimaryOnlyLocationMode(ActionListener> listener) {
diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
index 3c64bb9f3b830..b4567a92184fc 100644
--- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
+++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
@@ -40,6 +40,7 @@
import com.azure.storage.blob.models.ListBlobsOptions;
import com.azure.storage.blob.options.BlobParallelUploadOptions;
import com.azure.storage.blob.options.BlockBlobSimpleUploadOptions;
+import com.azure.storage.blob.specialized.BlobLeaseClient;
import com.azure.storage.blob.specialized.BlobLeaseClientBuilder;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
@@ -1010,7 +1011,7 @@ private static BytesReference innerCompareAndExchangeRegister(
}
return currentValue;
} finally {
- leaseClient.releaseLease();
+ bestEffortRelease(leaseClient);
}
} else {
if (expected.length() == 0) {
@@ -1020,6 +1021,29 @@ private static BytesReference innerCompareAndExchangeRegister(
}
}
+ /**
+ * Release the lease, ignoring conflicts due to expiry
+ *
+ * @see Outcomes of lease operations by lease state
+ * @param leaseClient The client for the lease
+ */
+ private static void bestEffortRelease(BlobLeaseClient leaseClient) {
+ try {
+ leaseClient.releaseLease();
+ } catch (BlobStorageException blobStorageException) {
+ if (blobStorageException.getStatusCode() == RestStatus.CONFLICT.getStatus()) {
+ // This is OK, we tried to release a lease that was expired/re-acquired
+ logger.debug(
+ "Ignored conflict on release: errorCode={}, message={}",
+ blobStorageException.getErrorCode(),
+ blobStorageException.getMessage()
+ );
+ } else {
+ throw blobStorageException;
+ }
+ }
+ }
+
private static BytesReference downloadRegisterBlob(
String containerPath,
String blobKey,
diff --git a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerStatsTests.java b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerStatsTests.java
index 6730e5c3c81bd..812d519e60260 100644
--- a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerStatsTests.java
+++ b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerStatsTests.java
@@ -10,6 +10,7 @@
package org.elasticsearch.repositories.azure;
import fixture.azure.AzureHttpHandler;
+import fixture.azure.MockAzureBlobStore;
import org.elasticsearch.common.blobstore.OperationPurpose;
import org.elasticsearch.common.bytes.BytesReference;
@@ -26,7 +27,7 @@ public class AzureBlobContainerStatsTests extends AbstractAzureServerTestCase {
@SuppressForbidden(reason = "use a http server")
@Before
public void configureAzureHandler() {
- httpServer.createContext("/", new AzureHttpHandler(ACCOUNT, CONTAINER, null));
+ httpServer.createContext("/", new AzureHttpHandler(ACCOUNT, CONTAINER, null, MockAzureBlobStore.LeaseExpiryPredicate.NEVER_EXPIRE));
}
public void testOperationPurposeIsReflectedInBlobStoreStats() throws IOException {
diff --git a/modules/repository-azure/src/yamlRestTest/java/org/elasticsearch/repositories/azure/RepositoryAzureClientYamlTestSuiteIT.java b/modules/repository-azure/src/yamlRestTest/java/org/elasticsearch/repositories/azure/RepositoryAzureClientYamlTestSuiteIT.java
index 64dde0248ad2c..b24574da36825 100644
--- a/modules/repository-azure/src/yamlRestTest/java/org/elasticsearch/repositories/azure/RepositoryAzureClientYamlTestSuiteIT.java
+++ b/modules/repository-azure/src/yamlRestTest/java/org/elasticsearch/repositories/azure/RepositoryAzureClientYamlTestSuiteIT.java
@@ -10,6 +10,7 @@
package org.elasticsearch.repositories.azure;
import fixture.azure.AzureHttpFixture;
+import fixture.azure.MockAzureBlobStore;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
@@ -47,7 +48,8 @@ public class RepositoryAzureClientYamlTestSuiteIT extends ESClientYamlSuiteTestC
AZURE_TEST_CONTAINER,
AZURE_TEST_TENANT_ID,
AZURE_TEST_CLIENT_ID,
- decideAuthHeaderPredicate()
+ decideAuthHeaderPredicate(),
+ MockAzureBlobStore.LeaseExpiryPredicate.NEVER_EXPIRE
);
private static Predicate decideAuthHeaderPredicate() {
diff --git a/modules/repository-azure/src/yamlRestTest/resources/rest-api-spec/test/repository_azure/20_repository.yml b/modules/repository-azure/src/yamlRestTest/resources/rest-api-spec/test/repository_azure/20_repository.yml
index a4a7d0b22a0ed..968e93cf9fc55 100644
--- a/modules/repository-azure/src/yamlRestTest/resources/rest-api-spec/test/repository_azure/20_repository.yml
+++ b/modules/repository-azure/src/yamlRestTest/resources/rest-api-spec/test/repository_azure/20_repository.yml
@@ -193,6 +193,20 @@ setup:
container: zHHkfSqlbnBsbpSgvCYtxrEfFLqghXtyPvvvKPNBnRCicNHQLE
client: integration_test
+---
+"Register a read-only repository with a non existing container":
+
+ - do:
+ catch: /repository_verification_exception/
+ snapshot.create_repository:
+ repository: repository
+ body:
+ type: azure
+ settings:
+ container: zHHkfSqlbnBsbpSgvCYtxrEfFLqghXtyPvvvKPNBnRCicNHQLE
+ client: integration_test
+ readonly: true
+
---
"Register a repository with a non existing client":
diff --git a/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpFixture.java b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpFixture.java
index 39105e0a27dc9..ab4d54f4fc451 100644
--- a/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpFixture.java
+++ b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpFixture.java
@@ -45,6 +45,7 @@ public class AzureHttpFixture extends ExternalResource {
private final String clientId;
private final String tenantId;
private final Predicate authHeaderPredicate;
+ private final MockAzureBlobStore.LeaseExpiryPredicate leaseExpiryPredicate;
private HttpServer server;
private HttpServer metadataServer;
@@ -116,7 +117,8 @@ public AzureHttpFixture(
String container,
@Nullable String rawTenantId,
@Nullable String rawClientId,
- Predicate authHeaderPredicate
+ Predicate authHeaderPredicate,
+ MockAzureBlobStore.LeaseExpiryPredicate leaseExpiryPredicate
) {
final var tenantId = Strings.hasText(rawTenantId) ? rawTenantId : null;
final var clientId = Strings.hasText(rawClientId) ? rawClientId : null;
@@ -135,6 +137,7 @@ public AzureHttpFixture(
this.tenantId = tenantId;
this.clientId = clientId;
this.authHeaderPredicate = authHeaderPredicate;
+ this.leaseExpiryPredicate = leaseExpiryPredicate;
}
private String scheme() {
@@ -193,7 +196,10 @@ protected void before() {
}
case HTTP -> {
server = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
- server.createContext("/" + account, new AzureHttpHandler(account, container, actualAuthHeaderPredicate));
+ server.createContext(
+ "/" + account,
+ new AzureHttpHandler(account, container, actualAuthHeaderPredicate, leaseExpiryPredicate)
+ );
server.start();
oauthTokenServiceServer = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
@@ -222,7 +228,10 @@ protected void before() {
final var httpsServer = HttpsServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
this.server = httpsServer;
httpsServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
- httpsServer.createContext("/" + account, new AzureHttpHandler(account, container, actualAuthHeaderPredicate));
+ httpsServer.createContext(
+ "/" + account,
+ new AzureHttpHandler(account, container, actualAuthHeaderPredicate, leaseExpiryPredicate)
+ );
httpsServer.start();
}
{
diff --git a/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java
index bbcfe1f75dc06..904f4581ad2c9 100644
--- a/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java
+++ b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java
@@ -15,7 +15,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.regex.Regex;
@@ -27,7 +26,6 @@
import org.elasticsearch.xcontent.XContentType;
import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
@@ -43,11 +41,11 @@
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import static fixture.azure.MockAzureBlobStore.failTestWithAssertionError;
import static org.elasticsearch.repositories.azure.AzureFixtureHelper.assertValidBlockId;
/**
@@ -56,17 +54,29 @@
@SuppressForbidden(reason = "Uses a HttpServer to emulate an Azure endpoint")
public class AzureHttpHandler implements HttpHandler {
private static final Logger logger = LogManager.getLogger(AzureHttpHandler.class);
+ private static final Pattern RANGE_HEADER_PATTERN = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$");
+ static final String X_MS_LEASE_ID = "x-ms-lease-id";
+ static final String X_MS_PROPOSED_LEASE_ID = "x-ms-proposed-lease-id";
+ static final String X_MS_LEASE_DURATION = "x-ms-lease-duration";
+ static final String X_MS_LEASE_BREAK_PERIOD = "x-ms-lease-break-period";
+ static final String X_MS_BLOB_TYPE = "x-ms-blob-type";
+ static final String X_MS_BLOB_CONTENT_LENGTH = "x-ms-blob-content-length";
- private final Map blobs;
private final String account;
private final String container;
private final Predicate authHeaderPredicate;
-
- public AzureHttpHandler(final String account, final String container, @Nullable Predicate authHeaderPredicate) {
+ private final MockAzureBlobStore mockAzureBlobStore;
+
+ public AzureHttpHandler(
+ final String account,
+ final String container,
+ @Nullable Predicate authHeaderPredicate,
+ MockAzureBlobStore.LeaseExpiryPredicate leaseExpiryPredicate
+ ) {
this.account = Objects.requireNonNull(account);
this.container = Objects.requireNonNull(container);
this.authHeaderPredicate = authHeaderPredicate;
- this.blobs = new ConcurrentHashMap<>();
+ this.mockAzureBlobStore = new MockAzureBlobStore(leaseExpiryPredicate);
}
private static List getAuthHeader(HttpExchange exchange) {
@@ -134,7 +144,7 @@ public void handle(final HttpExchange exchange) throws IOException {
final String blockId = params.get("blockid");
assert assertValidBlockId(blockId);
- blobs.put(blockId, Streams.readFully(exchange.getRequestBody()));
+ mockAzureBlobStore.putBlock(blobPath(exchange), blockId, Streams.readFully(exchange.getRequestBody()), leaseId(exchange));
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
} else if (Regex.simpleMatch("PUT /" + account + "/" + container + "/*comp=blocklist*", request)) {
@@ -145,83 +155,124 @@ public void handle(final HttpExchange exchange) throws IOException {
.map(line -> line.substring(0, line.indexOf("")))
.toList();
- final ByteArrayOutputStream blob = new ByteArrayOutputStream();
- for (String blockId : blockIds) {
- BytesReference block = blobs.remove(blockId);
- assert block != null;
- block.writeTo(blob);
- }
- blobs.put(exchange.getRequestURI().getPath(), new BytesArray(blob.toByteArray()));
+ mockAzureBlobStore.putBlockList(blobPath(exchange), blockIds, leaseId(exchange));
exchange.getResponseHeaders().add("x-ms-request-server-encrypted", "false");
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
+ } else if (Regex.simpleMatch("PUT /" + account + "/" + container + "*comp=lease*", request)) {
+ // Lease Blob (https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob)
+ final String leaseAction = requireHeader(exchange, "x-ms-lease-action");
+
+ switch (leaseAction) {
+ case "acquire" -> {
+ final int leaseDurationSeconds = requireIntegerHeader(exchange, X_MS_LEASE_DURATION);
+ final String proposedLeaseId = exchange.getRequestHeaders().getFirst(X_MS_PROPOSED_LEASE_ID);
+ final String newLeaseId = mockAzureBlobStore.acquireLease(
+ blobPath(exchange),
+ leaseDurationSeconds,
+ proposedLeaseId
+ );
+ exchange.getResponseHeaders().set(X_MS_LEASE_ID, newLeaseId);
+ exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
+ }
+ case "release" -> {
+ final String leaseId = requireHeader(exchange, X_MS_LEASE_ID);
+ mockAzureBlobStore.releaseLease(blobPath(exchange), leaseId);
+ exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
+ }
+ case "break" -> {
+ mockAzureBlobStore.breakLease(blobPath(exchange), getOptionalIntegerHeader(exchange, X_MS_LEASE_BREAK_PERIOD));
+ exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), -1);
+ }
+ case "renew", "change" -> {
+ failTestWithAssertionError("Attempt was made to use not-implemented lease action: " + leaseAction);
+ throw new MockAzureBlobStore.AzureBlobStoreError(
+ RestStatus.NOT_IMPLEMENTED,
+ "NotImplemented",
+ "Attempted to use unsupported lease API: " + leaseAction
+ );
+ }
+ default -> {
+ failTestWithAssertionError("Unrecognized lease action: " + leaseAction);
+ throw new MockAzureBlobStore.BadRequestException(
+ "InvalidHeaderValue",
+ "Invalid x-ms-lease-action header: " + leaseAction
+ );
+ }
+ }
} else if (Regex.simpleMatch("PUT /" + account + "/" + container + "/*", request)) {
// PUT Blob (see https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob)
+ final String blobType = requireHeader(exchange, X_MS_BLOB_TYPE);
final String ifNoneMatch = exchange.getRequestHeaders().getFirst("If-None-Match");
- if ("*".equals(ifNoneMatch)) {
- if (blobs.putIfAbsent(exchange.getRequestURI().getPath(), Streams.readFully(exchange.getRequestBody())) != null) {
- sendError(exchange, RestStatus.CONFLICT);
- return;
- }
- } else {
- blobs.put(exchange.getRequestURI().getPath(), Streams.readFully(exchange.getRequestBody()));
- }
+ mockAzureBlobStore.putBlob(
+ blobPath(exchange),
+ Streams.readFully(exchange.getRequestBody()),
+ blobType,
+ ifNoneMatch,
+ leaseId(exchange)
+ );
exchange.getResponseHeaders().add("x-ms-request-server-encrypted", "false");
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
} else if (Regex.simpleMatch("HEAD /" + account + "/" + container + "/*", request)) {
// Get Blob Properties (see https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties)
- final BytesReference blob = blobs.get(exchange.getRequestURI().getPath());
- if (blob == null) {
- sendError(exchange, RestStatus.NOT_FOUND);
- return;
- }
- exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(blob.length()));
- exchange.getResponseHeaders().add("Content-Length", String.valueOf(blob.length()));
- exchange.getResponseHeaders().add("x-ms-blob-type", "BlockBlob");
+ final MockAzureBlobStore.AzureBlockBlob blob = mockAzureBlobStore.getBlob(blobPath(exchange), leaseId(exchange));
+
+ final Headers responseHeaders = exchange.getResponseHeaders();
+ final BytesReference blobContents = blob.getContents();
+ responseHeaders.add(X_MS_BLOB_CONTENT_LENGTH, String.valueOf(blobContents.length()));
+ responseHeaders.add("Content-Length", String.valueOf(blobContents.length()));
+ responseHeaders.add(X_MS_BLOB_TYPE, blob.type());
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
} else if (Regex.simpleMatch("GET /" + account + "/" + container + "/*", request)) {
- // GET Object (https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html)
- final BytesReference blob = blobs.get(exchange.getRequestURI().getPath());
- if (blob == null) {
- sendError(exchange, RestStatus.NOT_FOUND);
- return;
- }
+ // Get Blob (https://learn.microsoft.com/en-us/rest/api/storageservices/get-blob)
+ final MockAzureBlobStore.AzureBlockBlob blob = mockAzureBlobStore.getBlob(blobPath(exchange), leaseId(exchange));
+ final BytesReference responseContent;
+ final RestStatus successStatus;
// see Constants.HeaderConstants.STORAGE_RANGE_HEADER
final String range = exchange.getRequestHeaders().getFirst("x-ms-range");
- final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$").matcher(range);
- if (matcher.matches() == false) {
- throw new AssertionError("Range header does not match expected format: " + range);
- }
+ if (range != null) {
+ final Matcher matcher = RANGE_HEADER_PATTERN.matcher(range);
+ if (matcher.matches() == false) {
+ throw new MockAzureBlobStore.BadRequestException(
+ "InvalidHeaderValue",
+ "Range header does not match expected format: " + range
+ );
+ }
- final long start = Long.parseLong(matcher.group(1));
- final long end = Long.parseLong(matcher.group(2));
+ final long start = Long.parseLong(matcher.group(1));
+ final long end = Long.parseLong(matcher.group(2));
- if (blob.length() <= start) {
- exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
- exchange.sendResponseHeaders(RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus(), -1);
- return;
- }
+ final BytesReference blobContents = blob.getContents();
+ if (blobContents.length() <= start) {
+ exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
+ exchange.sendResponseHeaders(RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus(), -1);
+ return;
+ }
- var responseBlob = blob.slice(Math.toIntExact(start), Math.toIntExact(Math.min(end - start + 1, blob.length() - start)));
+ responseContent = blobContents.slice(
+ Math.toIntExact(start),
+ Math.toIntExact(Math.min(end - start + 1, blobContents.length() - start))
+ );
+ successStatus = RestStatus.PARTIAL_CONTENT;
+ } else {
+ responseContent = blob.getContents();
+ successStatus = RestStatus.OK;
+ }
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
- exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(responseBlob.length()));
- exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
+ exchange.getResponseHeaders().add(X_MS_BLOB_CONTENT_LENGTH, String.valueOf(responseContent.length()));
+ exchange.getResponseHeaders().add(X_MS_BLOB_TYPE, blob.type());
exchange.getResponseHeaders().add("ETag", "\"blockblob\"");
- exchange.sendResponseHeaders(RestStatus.OK.getStatus(), responseBlob.length());
- responseBlob.writeTo(exchange.getResponseBody());
+ exchange.sendResponseHeaders(successStatus.getStatus(), responseContent.length() == 0 ? -1 : responseContent.length());
+ responseContent.writeTo(exchange.getResponseBody());
} else if (Regex.simpleMatch("DELETE /" + account + "/" + container + "/*", request)) {
// Delete Blob (https://docs.microsoft.com/en-us/rest/api/storageservices/delete-blob)
- final boolean deleted = blobs.entrySet().removeIf(blob -> blob.getKey().startsWith(exchange.getRequestURI().getPath()));
- if (deleted) {
- exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), -1);
- } else {
- exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
- }
+ mockAzureBlobStore.deleteBlob(blobPath(exchange), leaseId(exchange));
+ exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), -1);
} else if (Regex.simpleMatch("GET /" + account + "/" + container + "?*restype=container*comp=list*", request)) {
// List Blobs (https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs)
@@ -239,11 +290,12 @@ public void handle(final HttpExchange exchange) throws IOException {
list.append("").append(delimiter).append("");
}
list.append("");
- for (Map.Entry blob : blobs.entrySet()) {
- if (prefix != null && blob.getKey().startsWith("/" + account + "/" + container + "/" + prefix) == false) {
- continue;
- }
- String blobPath = blob.getKey().replace("/" + account + "/" + container + "/", "");
+ final Map matchingBlobs = mockAzureBlobStore.listBlobs(
+ prefix,
+ leaseId(exchange)
+ );
+ for (Map.Entry blob : matchingBlobs.entrySet()) {
+ final String blobPath = blob.getKey();
if (delimiter != null) {
int fromIndex = (prefix != null ? prefix.length() : 0);
int delimiterPosition = blobPath.indexOf(delimiter, fromIndex);
@@ -259,7 +311,7 @@ public void handle(final HttpExchange exchange) throws IOException {
%s
BlockBlob
- """, blobPath, blob.getValue().length()));
+ """, blobPath, blob.getValue().getContents().length()));
}
if (blobPrefixes.isEmpty() == false) {
blobPrefixes.forEach(p -> list.append("").append(p).append(""));
@@ -294,7 +346,8 @@ public void handle(final HttpExchange exchange) throws IOException {
}
// Process the deletion
- if (blobs.remove("/" + account + toDelete) != null) {
+ try {
+ mockAzureBlobStore.deleteBlob(toDelete, leaseId(exchange));
final String acceptedPart = Strings.format("""
--%s
Content-Type: application/http
@@ -307,32 +360,43 @@ public void handle(final HttpExchange exchange) throws IOException {
""", responseBoundary, contentId, requestId).replaceAll("\n", "\r\n");
response.append(acceptedPart);
- } else {
- final String notFoundBody = Strings.format(
+ } catch (MockAzureBlobStore.AzureBlobStoreError e) {
+ final String errorResponseBody = Strings.format(
"""
- BlobNotFound
The specified blob does not exist.
+ %s
%s
RequestId:%s
Time:%s""",
+ e.getErrorCode(),
+ e.getMessage(),
requestId,
DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now(ZoneId.of("UTC")))
);
- final String notFoundPart = Strings.format("""
- --%s
- Content-Type: application/http
- Content-ID: %s
-
- HTTP/1.1 404 The specified blob does not exist.
- x-ms-error-code: BlobNotFound
- x-ms-request-id: %s
- x-ms-version: 2018-11-09
- Content-Length: %d
- Content-Type: application/xml
-
- %s
- """, responseBoundary, contentId, requestId, notFoundBody.length(), notFoundBody)
- .replaceAll("\n", "\r\n");
- response.append(notFoundPart);
+ final String errorResponsePart = Strings.format(
+ """
+ --%s
+ Content-Type: application/http
+ Content-ID: %s
+
+ HTTP/1.1 %s %s
+ x-ms-error-code: %s
+ x-ms-request-id: %s
+ x-ms-version: 2018-11-09
+ Content-Length: %d
+ Content-Type: application/xml
+
+ %s
+ """,
+ responseBoundary,
+ contentId,
+ e.getRestStatus().getStatus(),
+ e.getMessage(),
+ e.getErrorCode(),
+ requestId,
+ errorResponseBody.length(),
+ errorResponseBody
+ ).replaceAll("\n", "\r\n");
+ response.append(errorResponsePart);
}
// Clear the state
@@ -350,19 +414,18 @@ public void handle(final HttpExchange exchange) throws IOException {
}
contentId = line.split("\\s")[1];
} else if (Regex.simpleMatch("DELETE /" + container + "/*", line)) {
- String blobName = RestUtils.decodeComponent(line.split("(\\s|\\?)")[1]);
+ final String path = RestUtils.decodeComponent(line.split("(\\s|\\?)")[1]);
if (toDelete != null) {
throw new IllegalStateException("Got multiple deletes in a single request?");
}
- toDelete = blobName;
+ toDelete = stripPrefix("/" + container + "/", path);
} else if (Regex.simpleMatch("DELETE /" + account + "/" + container + "/*", line)) {
// possible alternative DELETE url, depending on which method is used in the batch client
String path = RestUtils.decodeComponent(line.split("(\\s|\\?)")[1]);
- String blobName = path.split(account)[1];
if (toDelete != null) {
throw new IllegalStateException("Got multiple deletes in a single request?");
}
- toDelete = blobName;
+ toDelete = stripPrefix("/" + account + "/" + container + "/", path);
}
}
response.append("--").append(responseBoundary).append("--\r\n0\r\n");
@@ -372,20 +435,90 @@ public void handle(final HttpExchange exchange) throws IOException {
logger.debug("--> Sending response:\n{}", response);
exchange.getResponseBody().write(response.toString().getBytes(StandardCharsets.UTF_8));
}
- } else {
- logger.warn("--> Unrecognised request received: {}", request);
- sendError(exchange, RestStatus.BAD_REQUEST);
- }
+ } else if (Regex.simpleMatch("PUT /*/*/*master.dat", request)
+ && Regex.simpleMatch("PUT /" + account + "/" + container + "*", request) == false) {
+ // An attempt to put master.dat to a different container. This is probably
+ // org.elasticsearch.repositories.blobstore.BlobStoreRepository#startVerification
+ throw new MockAzureBlobStore.AzureBlobStoreError(
+ RestStatus.NOT_FOUND,
+ "ContainerNotFound",
+ "The specified container does not exist."
+ );
+ } else if (Regex.simpleMatch("GET /*/*restype=container*comp=list*", request)
+ && Regex.simpleMatch("GET /" + account + "/" + container + "*", request) == false) {
+ // An attempt to list the contents of a different container. This is probably
+ // org.elasticsearch.repositories.blobstore.BlobStoreRepository#startVerification for a read-only
+ // repository
+ throw new MockAzureBlobStore.AzureBlobStoreError(
+ RestStatus.NOT_FOUND,
+ "ContainerNotFound",
+ "The specified container does not exist."
+ );
+ } else {
+ final String message = "You sent a request that is not supported by AzureHttpHandler: " + request;
+ failTestWithAssertionError(message);
+ throw new MockAzureBlobStore.BadRequestException("UnrecognisedRequest", message);
+ }
+ } catch (MockAzureBlobStore.AzureBlobStoreError e) {
+ sendError(exchange, e);
+ } catch (Exception e) {
+ failTestWithAssertionError("Uncaught exception", e);
+ sendError(exchange, RestStatus.INTERNAL_SERVER_ERROR, "InternalError", e.getMessage());
} finally {
exchange.close();
}
}
+ private String requireHeader(HttpExchange exchange, String headerName) {
+ final String headerValue = exchange.getRequestHeaders().getFirst(headerName);
+ if (headerValue == null) {
+ throw new MockAzureBlobStore.BadRequestException("MissingRequiredHeader", "Missing " + headerName + " header");
+ }
+ return headerValue;
+ }
+
+ private int requireIntegerHeader(HttpExchange exchange, String headerName) {
+ final String headerValue = requireHeader(exchange, headerName);
+ try {
+ return Integer.parseInt(headerValue);
+ } catch (NumberFormatException e) {
+ throw new MockAzureBlobStore.BadRequestException("InvalidHeaderValue", "Invalid " + headerName + " header");
+ }
+ }
+
+ @Nullable
+ private Integer getOptionalIntegerHeader(HttpExchange exchange, String headerName) {
+ final String headerValue = exchange.getRequestHeaders().getFirst(headerName);
+ try {
+ return headerValue == null ? null : Integer.parseInt(headerValue);
+ } catch (NumberFormatException e) {
+ throw new MockAzureBlobStore.BadRequestException("InvalidHeaderValue", "Invalid " + headerName + " header");
+ }
+ }
+
+ @Nullable
+ private String leaseId(HttpExchange exchange) {
+ return exchange.getRequestHeaders().getFirst(X_MS_LEASE_ID);
+ }
+
+ private String blobPath(HttpExchange exchange) {
+ return stripPrefix("/" + account + "/" + container + "/", exchange.getRequestURI().getPath());
+ }
+
public Map blobs() {
- return blobs;
+ return mockAzureBlobStore.blobs();
+ }
+
+ public static void sendError(HttpExchange exchange, MockAzureBlobStore.AzureBlobStoreError error) throws IOException {
+ sendError(exchange, error.getRestStatus(), error.getErrorCode(), error.getMessage());
}
public static void sendError(final HttpExchange exchange, final RestStatus status) throws IOException {
+ final String errorCode = toAzureErrorCode(status);
+ sendError(exchange, status, errorCode, status.toString());
+ }
+
+ public static void sendError(HttpExchange exchange, RestStatus restStatus, String errorCode, String errorMessage) throws IOException {
final Headers headers = exchange.getResponseHeaders();
headers.add("Content-Type", "application/xml");
@@ -396,20 +529,19 @@ public static void sendError(final HttpExchange exchange, final RestStatus statu
headers.add("x-ms-request-id", requestId);
}
- final String errorCode = toAzureErrorCode(status);
// see Constants.HeaderConstants.ERROR_CODE
headers.add("x-ms-error-code", errorCode);
if ("HEAD".equals(exchange.getRequestMethod())) {
- exchange.sendResponseHeaders(status.getStatus(), -1L);
+ exchange.sendResponseHeaders(restStatus.getStatus(), -1L);
} else {
final byte[] response = (String.format(Locale.ROOT, """
%s
%s
- """, errorCode, status)).getBytes(StandardCharsets.UTF_8);
- exchange.sendResponseHeaders(status.getStatus(), response.length);
+ """, errorCode, errorMessage)).getBytes(StandardCharsets.UTF_8);
+ exchange.sendResponseHeaders(restStatus.getStatus(), response.length);
exchange.getResponseBody().write(response);
}
}
@@ -428,4 +560,9 @@ private static String toAzureErrorCode(final RestStatus status) {
);
};
}
+
+ private String stripPrefix(String prefix, String toStrip) {
+ assert toStrip.startsWith(prefix);
+ return toStrip.substring(prefix.length());
+ }
}
diff --git a/test/fixtures/azure-fixture/src/main/java/fixture/azure/MockAzureBlobStore.java b/test/fixtures/azure-fixture/src/main/java/fixture/azure/MockAzureBlobStore.java
new file mode 100644
index 0000000000000..c694c27c1293b
--- /dev/null
+++ b/test/fixtures/azure-fixture/src/main/java/fixture/azure/MockAzureBlobStore.java
@@ -0,0 +1,484 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package fixture.azure;
+
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.CompositeBytesReference;
+import org.elasticsearch.common.util.Maps;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
+import org.elasticsearch.rest.RestStatus;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+public class MockAzureBlobStore {
+
+ private static final Logger logger = LogManager.getLogger(MockAzureBlobStore.class);
+ private static final String BLOCK_BLOB_TYPE = "BlockBlob";
+ private static final String PAGE_BLOB_TYPE = "PageBlob";
+ private static final String APPEND_BLOB_TYPE = "AppendBlob";
+
+ private final LeaseExpiryPredicate leaseExpiryPredicate;
+ private final Map blobs;
+
+ /**
+ * Provide the means of triggering lease expiration
+ *
+ * @param leaseExpiryPredicate A Predicate that takes an active lease ID and returns true when it should be expired, or null to never fail leases
+ */
+ public MockAzureBlobStore(LeaseExpiryPredicate leaseExpiryPredicate) {
+ this.blobs = new ConcurrentHashMap<>();
+ this.leaseExpiryPredicate = Objects.requireNonNull(leaseExpiryPredicate);
+ }
+
+ public void putBlock(String path, String blockId, BytesReference content, @Nullable String leaseId) {
+ blobs.compute(path, (p, existing) -> {
+ if (existing != null) {
+ existing.putBlock(blockId, content, leaseId);
+ return existing;
+ } else {
+ final AzureBlockBlob azureBlockBlob = new AzureBlockBlob();
+ azureBlockBlob.putBlock(blockId, content, leaseId);
+ return azureBlockBlob;
+ }
+ });
+ }
+
+ public void putBlockList(String path, List blockIds, @Nullable String leaseId) {
+ final AzureBlockBlob blob = getExistingBlob(path);
+ blob.putBlockList(blockIds, leaseId);
+ }
+
+ public void putBlob(String path, BytesReference contents, String blobType, @Nullable String ifNoneMatch, @Nullable String leaseId) {
+ blobs.compute(path, (p, existingValue) -> {
+ if (existingValue != null) {
+ existingValue.setContents(contents, leaseId, ifNoneMatch);
+ return existingValue;
+ } else {
+ validateBlobType(blobType);
+ final AzureBlockBlob newBlob = new AzureBlockBlob();
+ newBlob.setContents(contents, leaseId);
+ return newBlob;
+ }
+ });
+ }
+
+ private void validateBlobType(String blobType) {
+ if (BLOCK_BLOB_TYPE.equals(blobType)) {
+ return;
+ }
+ final String errorMessage;
+ if (PAGE_BLOB_TYPE.equals(blobType) || APPEND_BLOB_TYPE.equals(blobType)) {
+ errorMessage = "Only BlockBlob is supported. This is a limitation of the MockAzureBlobStore";
+ } else {
+ errorMessage = "Invalid blobType: " + blobType;
+ }
+ // Fail the test and respond with an error
+ failTestWithAssertionError(errorMessage);
+ throw new MockAzureBlobStore.BadRequestException("InvalidHeaderValue", errorMessage);
+ }
+
+ public AzureBlockBlob getBlob(String path, @Nullable String leaseId) {
+ final AzureBlockBlob blob = getExistingBlob(path);
+ // This is the public implementation of "get blob" which will 404 for uncommitted block blobs
+ if (blob.isCommitted() == false) {
+ throw new BlobNotFoundException();
+ }
+ blob.checkLeaseForRead(leaseId);
+ return blob;
+ }
+
+ public void deleteBlob(String path, @Nullable String leaseId) {
+ final AzureBlockBlob blob = getExistingBlob(path);
+ blob.checkLeaseForWrite(leaseId);
+ blobs.remove(path);
+ }
+
+ public Map listBlobs(String prefix, @Nullable String leaseId) {
+ return blobs.entrySet().stream().filter(e -> {
+ if (prefix == null || e.getKey().startsWith(prefix)) {
+ return true;
+ }
+ return false;
+ })
+ .filter(e -> e.getValue().isCommitted())
+ .peek(e -> e.getValue().checkLeaseForRead(leaseId))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ public String acquireLease(String path, int leaseTimeSeconds, @Nullable String proposedLeaseId) {
+ final AzureBlockBlob blob = getExistingBlob(path);
+ return blob.acquireLease(proposedLeaseId, leaseTimeSeconds);
+ }
+
+ public void releaseLease(String path, @Nullable String leaseId) {
+ final AzureBlockBlob blob = getExistingBlob(path);
+ blob.releaseLease(leaseId);
+ }
+
+ public void breakLease(String path, @Nullable Integer leaseBreakPeriod) {
+ final AzureBlockBlob blob = getExistingBlob(path);
+ blob.breakLease(leaseBreakPeriod);
+ }
+
+ public Map blobs() {
+ return Maps.transformValues(blobs, AzureBlockBlob::getContents);
+ }
+
+ private AzureBlockBlob getExistingBlob(String path) {
+ final AzureBlockBlob blob = blobs.get(path);
+ if (blob == null) {
+ throw new BlobNotFoundException();
+ }
+ return blob;
+ }
+
+ static void failTestWithAssertionError(String message) {
+ ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(message));
+ }
+
+ static void failTestWithAssertionError(String message, Throwable throwable) {
+ ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(message, throwable));
+ }
+
+ public class AzureBlockBlob {
+ private final Object writeLock = new Object();
+ private final Lease lease = new Lease();
+ private final Map blocks;
+ private volatile BytesReference contents;
+
+ private AzureBlockBlob() {
+ this.blocks = new ConcurrentHashMap<>();
+ }
+
+ public void putBlock(String blockId, BytesReference content, @Nullable String leaseId) {
+ synchronized (writeLock) {
+ lease.checkLeaseForWrite(leaseId);
+ this.blocks.put(blockId, content);
+ }
+ }
+
+ public void putBlockList(List blockIds, @Nullable String leaseId) throws BadRequestException {
+ synchronized (writeLock) {
+ lease.checkLeaseForWrite(leaseId);
+ final List unresolvedBlocks = blockIds.stream().filter(bId -> blocks.containsKey(bId) == false).toList();
+ if (unresolvedBlocks.isEmpty() == false) {
+ logger.warn("Block list contained non-existent block IDs: {}", unresolvedBlocks);
+ throw new BadRequestException("InvalidBlockList", "The specified blocklist is invalid.");
+ }
+ final BytesReference[] resolvedContents = blockIds.stream().map(blocks::get).toList().toArray(new BytesReference[0]);
+ contents = CompositeBytesReference.of(resolvedContents);
+ }
+ }
+
+ private boolean matches(String ifNoneMatchHeaderValue) {
+ if (ifNoneMatchHeaderValue == null) {
+ return false;
+ }
+ // We only support *
+ if ("*".equals(ifNoneMatchHeaderValue)) {
+ return true;
+ }
+ // Fail the test, trigger an internal server error
+ failTestWithAssertionError("We've only implemented 'If-None-Match: *' in the MockAzureBlobStore");
+ throw new AzureBlobStoreError(
+ RestStatus.INTERNAL_SERVER_ERROR,
+ "UnsupportedHeader",
+ "The test fixture only supports * for If-None-Match"
+ );
+ }
+
+ public synchronized void setContents(BytesReference contents, @Nullable String leaseId) {
+ synchronized (writeLock) {
+ lease.checkLeaseForWrite(leaseId);
+ this.contents = contents;
+ this.blocks.clear();
+ }
+ }
+
+ public void setContents(BytesReference contents, @Nullable String leaseId, @Nullable String ifNoneMatchHeaderValue) {
+ synchronized (writeLock) {
+ if (matches(ifNoneMatchHeaderValue)) {
+ throw new PreconditionFailedException(
+ "TargetConditionNotMet",
+ "The target condition specified using HTTP conditional header(s) is not met."
+ );
+ }
+ setContents(contents, leaseId);
+ }
+ }
+
+ /**
+ * Get the committed contents of the blob
+ *
+ * @return The last committed contents of the blob, or null if the blob is uncommitted
+ */
+ @Nullable
+ public BytesReference getContents() {
+ return contents;
+ }
+
+ public String type() {
+ return BLOCK_BLOB_TYPE;
+ }
+
+ public boolean isCommitted() {
+ return contents != null;
+ }
+
+ @Override
+ public String toString() {
+ return "MockAzureBlockBlob{" + "blocks=" + blocks + ", contents=" + contents + '}';
+ }
+
+ public String acquireLease(@Nullable String proposedLeaseId, int leaseTimeSeconds) {
+ synchronized (writeLock) {
+ return lease.acquire(proposedLeaseId, leaseTimeSeconds);
+ }
+ }
+
+ public void releaseLease(String leaseId) {
+ synchronized (writeLock) {
+ lease.release(leaseId);
+ }
+ }
+
+ public void breakLease(@Nullable Integer leaseBreakPeriod) {
+ synchronized (writeLock) {
+ lease.breakLease(leaseBreakPeriod);
+ }
+ }
+
+ public void checkLeaseForRead(@Nullable String leaseId) {
+ lease.checkLeaseForRead(leaseId);
+ }
+
+ public void checkLeaseForWrite(@Nullable String leaseId) {
+ lease.checkLeaseForWrite(leaseId);
+ }
+ }
+
+ /**
+ * @see acquire/release rules
+ * @see read/write rules
+ */
+ public class Lease {
+
+ /**
+ * Minimal set of states, we don't support breaking/broken
+ */
+ enum State {
+ Available,
+ Leased,
+ Expired,
+ Broken
+ }
+
+ private String leaseId;
+ private State state = State.Available;
+ private int leaseDurationSeconds;
+
+ public synchronized String acquire(@Nullable String proposedLeaseId, int leaseDurationSeconds) {
+ maybeExpire(proposedLeaseId);
+ switch (state) {
+ case Available, Expired, Broken -> {
+ final State prevState = state;
+ state = State.Leased;
+ leaseId = proposedLeaseId != null ? proposedLeaseId : UUID.randomUUID().toString();
+ validateLeaseDuration(leaseDurationSeconds);
+ this.leaseDurationSeconds = leaseDurationSeconds;
+ logger.debug("Granting lease, prior state={}, leaseId={}, expires={}", prevState, leaseId);
+ }
+ case Leased -> {
+ if (leaseId.equals(proposedLeaseId) == false) {
+ logger.debug("Mismatch on acquire - proposed leaseId: {}, active leaseId: {}", proposedLeaseId, leaseId);
+ throw new ConflictException(
+ "LeaseIdMismatchWithLeaseOperation",
+ "The lease ID specified did not match the lease ID for the blob/container."
+ );
+ }
+ validateLeaseDuration(leaseDurationSeconds);
+ }
+ }
+ return leaseId;
+ }
+
+ public synchronized void release(String requestLeaseId) {
+ switch (state) {
+ case Available -> throw new ConflictException(
+ "LeaseNotPresentWithLeaseOperation",
+ "There is currently no lease on the blob/container."
+ );
+ case Leased, Expired, Broken -> {
+ if (leaseId.equals(requestLeaseId) == false) {
+ logger.debug("Mismatch on release - submitted leaseId: {}, active leaseId: {}", requestLeaseId, this.leaseId);
+ throw new ConflictException(
+ "LeaseIdMismatchWithLeaseOperation",
+ "The lease ID specified did not match the lease ID for the blob/container."
+ );
+ }
+ state = State.Available;
+ this.leaseId = null;
+ }
+ }
+ }
+
+ public synchronized void breakLease(Integer leaseBreakPeriod) {
+ // We haven't implemented the "Breaking" state so we don't support 'breaks' for non-infinite leases unless break-period is 0
+ if (leaseDurationSeconds != -1 && (leaseBreakPeriod == null || leaseBreakPeriod != 0)) {
+ failTestWithAssertionError(
+ "MockAzureBlobStore only supports breaking non-infinite leases with 'x-ms-lease-break-period: 0'"
+ );
+ }
+ switch (state) {
+ case Available -> throw new ConflictException(
+ "LeaseNotPresentWithLeaseOperation",
+ "There is currently no lease on the blob/container."
+ );
+ case Leased, Expired, Broken -> state = State.Broken;
+ }
+ }
+
+ public synchronized void checkLeaseForWrite(@Nullable String requestLeaseId) {
+ maybeExpire(requestLeaseId);
+ switch (state) {
+ case Available, Expired, Broken -> {
+ if (requestLeaseId != null) {
+ throw new PreconditionFailedException(
+ "LeaseLost",
+ "A lease ID was specified, but the lease for the blob/container has expired."
+ );
+ }
+ }
+ case Leased -> {
+ if (requestLeaseId == null) {
+ throw new PreconditionFailedException(
+ "LeaseIdMissing",
+ "There is currently a lease on the blob/container and no lease ID was specified in the request."
+ );
+ }
+ if (leaseId.equals(requestLeaseId) == false) {
+ throw new ConflictException(
+ "LeaseIdMismatchWithBlobOperation",
+ "The lease ID specified did not match the lease ID for the blob."
+ );
+ }
+ }
+ }
+ }
+
+ public synchronized void checkLeaseForRead(@Nullable String requestLeaseId) {
+ maybeExpire(requestLeaseId);
+ switch (state) {
+ case Available, Expired, Broken -> {
+ if (requestLeaseId != null) {
+ throw new PreconditionFailedException(
+ "LeaseLost",
+ "A lease ID was specified, but the lease for the blob/container has expired."
+ );
+ }
+ }
+ case Leased -> {
+ if (requestLeaseId != null && requestLeaseId.equals(leaseId) == false) {
+ throw new ConflictException(
+ "LeaseIdMismatchWithBlobOperation",
+ "The lease ID specified did not match the lease ID for the blob."
+ );
+ }
+ }
+ }
+ }
+
+ /**
+ * If there's an active lease, ask the predicate if we should expire the existing it
+ *
+ * @param requestLeaseId The lease of the request
+ */
+ private void maybeExpire(String requestLeaseId) {
+ if (state == State.Leased && leaseExpiryPredicate.shouldExpireLease(leaseId, requestLeaseId)) {
+ logger.debug("Expiring lease, id={}", leaseId);
+ state = State.Expired;
+ }
+ }
+
+ private void validateLeaseDuration(long leaseTimeSeconds) {
+ if (leaseTimeSeconds != -1 && (leaseTimeSeconds < 15 || leaseTimeSeconds > 60)) {
+ throw new BadRequestException(
+ "InvalidHeaderValue",
+ AzureHttpHandler.X_MS_LEASE_DURATION + " must be between 16 and 60 seconds (was " + leaseTimeSeconds + ")"
+ );
+ }
+ }
+ }
+
+ public static class AzureBlobStoreError extends RuntimeException {
+ private final RestStatus restStatus;
+ private final String errorCode;
+
+ public AzureBlobStoreError(RestStatus restStatus, String errorCode, String message) {
+ super(message);
+ this.restStatus = restStatus;
+ this.errorCode = errorCode;
+ }
+
+ public RestStatus getRestStatus() {
+ return restStatus;
+ }
+
+ public String getErrorCode() {
+ return errorCode;
+ }
+ }
+
+ public static class BlobNotFoundException extends AzureBlobStoreError {
+ public BlobNotFoundException() {
+ super(RestStatus.NOT_FOUND, "BlobNotFound", "The specified blob does not exist.");
+ }
+ }
+
+ public static class BadRequestException extends AzureBlobStoreError {
+ public BadRequestException(String errorCode, String message) {
+ super(RestStatus.BAD_REQUEST, errorCode, message);
+ }
+ }
+
+ public static class ConflictException extends AzureBlobStoreError {
+ public ConflictException(String errorCode, String message) {
+ super(RestStatus.CONFLICT, errorCode, message);
+ }
+ }
+
+ public static class PreconditionFailedException extends AzureBlobStoreError {
+ public PreconditionFailedException(String errorCode, String message) {
+ super(RestStatus.PRECONDITION_FAILED, errorCode, message);
+ }
+ }
+
+ public interface LeaseExpiryPredicate {
+
+ LeaseExpiryPredicate NEVER_EXPIRE = (activeLeaseId, requestLeaseId) -> false;
+
+ /**
+ * Should the lease be expired?
+ *
+ * @param activeLeaseId The current active lease ID
+ * @param requestLeaseId The request lease ID (if any)
+ * @return true to expire the lease, false otherwise
+ */
+ boolean shouldExpireLease(String activeLeaseId, @Nullable String requestLeaseId);
+ }
+}
diff --git a/x-pack/plugin/repositories-metering-api/qa/azure/src/javaRestTest/java/org/elasticsearch/xpack/repositories/metering/azure/AzureRepositoriesMeteringIT.java b/x-pack/plugin/repositories-metering-api/qa/azure/src/javaRestTest/java/org/elasticsearch/xpack/repositories/metering/azure/AzureRepositoriesMeteringIT.java
index 7029a38edcb5a..d21dc4b2982f1 100644
--- a/x-pack/plugin/repositories-metering-api/qa/azure/src/javaRestTest/java/org/elasticsearch/xpack/repositories/metering/azure/AzureRepositoriesMeteringIT.java
+++ b/x-pack/plugin/repositories-metering-api/qa/azure/src/javaRestTest/java/org/elasticsearch/xpack/repositories/metering/azure/AzureRepositoriesMeteringIT.java
@@ -7,6 +7,7 @@
package org.elasticsearch.xpack.repositories.metering.azure;
import fixture.azure.AzureHttpFixture;
+import fixture.azure.MockAzureBlobStore;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Booleans;
@@ -37,7 +38,8 @@ public class AzureRepositoriesMeteringIT extends AbstractRepositoriesMeteringAPI
AZURE_TEST_CONTAINER,
System.getProperty("test.azure.tenant_id"),
System.getProperty("test.azure.client_id"),
- AzureHttpFixture.sharedKeyForAccountPredicate(AZURE_TEST_ACCOUNT)
+ AzureHttpFixture.sharedKeyForAccountPredicate(AZURE_TEST_ACCOUNT),
+ MockAzureBlobStore.LeaseExpiryPredicate.NEVER_EXPIRE
);
private static TestTrustStore trustStore = new TestTrustStore(
diff --git a/x-pack/plugin/searchable-snapshots/qa/azure/src/javaRestTest/java/org/elasticsearch/xpack/searchablesnapshots/AzureSearchableSnapshotsIT.java b/x-pack/plugin/searchable-snapshots/qa/azure/src/javaRestTest/java/org/elasticsearch/xpack/searchablesnapshots/AzureSearchableSnapshotsIT.java
index 610b58453716c..f65db6dab1e68 100644
--- a/x-pack/plugin/searchable-snapshots/qa/azure/src/javaRestTest/java/org/elasticsearch/xpack/searchablesnapshots/AzureSearchableSnapshotsIT.java
+++ b/x-pack/plugin/searchable-snapshots/qa/azure/src/javaRestTest/java/org/elasticsearch/xpack/searchablesnapshots/AzureSearchableSnapshotsIT.java
@@ -8,6 +8,7 @@
package org.elasticsearch.xpack.searchablesnapshots;
import fixture.azure.AzureHttpFixture;
+import fixture.azure.MockAzureBlobStore;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Booleans;
@@ -38,7 +39,8 @@ public class AzureSearchableSnapshotsIT extends AbstractSearchableSnapshotsRestT
AZURE_TEST_CONTAINER,
System.getProperty("test.azure.tenant_id"),
System.getProperty("test.azure.client_id"),
- AzureHttpFixture.sharedKeyForAccountPredicate(AZURE_TEST_ACCOUNT)
+ AzureHttpFixture.sharedKeyForAccountPredicate(AZURE_TEST_ACCOUNT),
+ MockAzureBlobStore.LeaseExpiryPredicate.NEVER_EXPIRE
);
private static TestTrustStore trustStore = new TestTrustStore(
diff --git a/x-pack/plugin/snapshot-based-recoveries/qa/azure/src/javaRestTest/java/org/elasticsearch/xpack/snapshotbasedrecoveries/recovery/AzureSnapshotBasedRecoveryIT.java b/x-pack/plugin/snapshot-based-recoveries/qa/azure/src/javaRestTest/java/org/elasticsearch/xpack/snapshotbasedrecoveries/recovery/AzureSnapshotBasedRecoveryIT.java
index 591d4582d5905..8142b40166840 100644
--- a/x-pack/plugin/snapshot-based-recoveries/qa/azure/src/javaRestTest/java/org/elasticsearch/xpack/snapshotbasedrecoveries/recovery/AzureSnapshotBasedRecoveryIT.java
+++ b/x-pack/plugin/snapshot-based-recoveries/qa/azure/src/javaRestTest/java/org/elasticsearch/xpack/snapshotbasedrecoveries/recovery/AzureSnapshotBasedRecoveryIT.java
@@ -8,6 +8,7 @@
package org.elasticsearch.xpack.snapshotbasedrecoveries.recovery;
import fixture.azure.AzureHttpFixture;
+import fixture.azure.MockAzureBlobStore;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Booleans;
@@ -37,7 +38,8 @@ public class AzureSnapshotBasedRecoveryIT extends AbstractSnapshotBasedRecoveryR
AZURE_TEST_CONTAINER,
System.getProperty("test.azure.tenant_id"),
System.getProperty("test.azure.client_id"),
- AzureHttpFixture.sharedKeyForAccountPredicate(AZURE_TEST_ACCOUNT)
+ AzureHttpFixture.sharedKeyForAccountPredicate(AZURE_TEST_ACCOUNT),
+ MockAzureBlobStore.LeaseExpiryPredicate.NEVER_EXPIRE
);
private static TestTrustStore trustStore = new TestTrustStore(
diff --git a/x-pack/plugin/snapshot-repo-test-kit/qa/azure/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/AzureRepositoryAnalysisRestIT.java b/x-pack/plugin/snapshot-repo-test-kit/qa/azure/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/AzureRepositoryAnalysisRestIT.java
index a9b8fe51c01cc..03906b3cf69da 100644
--- a/x-pack/plugin/snapshot-repo-test-kit/qa/azure/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/AzureRepositoryAnalysisRestIT.java
+++ b/x-pack/plugin/snapshot-repo-test-kit/qa/azure/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/AzureRepositoryAnalysisRestIT.java
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
import static org.hamcrest.Matchers.blankOrNullString;
@@ -49,7 +50,10 @@ public class AzureRepositoryAnalysisRestIT extends AbstractRepositoryAnalysisRes
AZURE_TEST_CONTAINER,
AZURE_TEST_TENANT_ID,
AZURE_TEST_CLIENT_ID,
- decideAuthHeaderPredicate()
+ decideAuthHeaderPredicate(),
+ // 5% of the time, in a contended lease scenario, expire the existing lease
+ (currentLeaseId, requestLeaseId) -> currentLeaseId.equals(requestLeaseId) == false
+ && ThreadLocalRandom.current().nextDouble() < 0.05
);
private static Predicate decideAuthHeaderPredicate() {
@@ -78,12 +82,6 @@ private static Predicate decideAuthHeaderPredicate() {
() -> "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=" + fixture.getAddress(),
s -> USE_FIXTURE
)
- .apply(c -> {
- if (USE_FIXTURE) {
- // test fixture does not support CAS yet; TODO fix this
- c.systemProperty("test.repository_test_kit.skip_cas", "true");
- }
- })
.systemProperty(
"tests.azure.credentials.disable_instance_discovery",
() -> "true",