Skip to content

Commit

Permalink
Add concurrency support to DDB client
Browse files Browse the repository at this point in the history
Signed-off-by: Arjun kumar Giri <[email protected]>
  • Loading branch information
arjunkumargiri committed Jul 18, 2024
1 parent a20ddd8 commit d5caadf
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,13 @@
import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest.Builder;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse;

/**
* DDB implementation of {@link SdkClient}. DDB table name will be mapped to index name.
Expand All @@ -79,6 +81,8 @@ public class DDBOpenSearchClient implements SdkClientDelegate {

private static final String HASH_KEY = "_tenant_id";
private static final String RANGE_KEY = "_id";

private static final String SOURCE = "_source";
private static final String SEQ_NO_KEY = "_seq_no";

private DynamoDbClient dynamoDbClient;
Expand Down Expand Up @@ -108,25 +112,37 @@ public CompletionStage<PutDataObjectResponse> putDataObjectAsync(PutDataObjectRe
final String id = request.id() != null ? request.id() : UUID.randomUUID().toString();
final String tenantId = request.tenantId() != null ? request.tenantId() : DEFAULT_TENANT;
final String tableName = getTableName(request.index());
final GetItemRequest getItemRequest = getGetItemRequest(request.tenantId(), request.id(), request.index());

return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction<PutDataObjectResponse>) () -> {
try {
GetItemResponse getItemResponse = dynamoDbClient.getItem(getItemRequest);
Long sequenceNumber = 1L;
if (getItemResponse != null && getItemResponse.item() != null && getItemResponse.item().containsKey(SEQ_NO_KEY)) {
sequenceNumber = Long.parseLong(getItemResponse.item().get(SEQ_NO_KEY).n()) + 1;
}
String source = Strings.toString(MediaTypeRegistry.JSON, request.dataObject());
JsonNode jsonNode = OBJECT_MAPPER.readTree(source);
Map<String, AttributeValue> item = JsonTransformer.convertJsonObjectToDDBAttributeMap(jsonNode);
Map<String, AttributeValue> sourceMap = JsonTransformer.convertJsonObjectToDDBAttributeMap(jsonNode);
Map<String, AttributeValue> item = new HashMap<>();
item.put(HASH_KEY, AttributeValue.builder().s(tenantId).build());
item.put(RANGE_KEY, AttributeValue.builder().s(id).build());
item.put(SOURCE, AttributeValue.builder().m(sourceMap).build());
item.put(SEQ_NO_KEY, AttributeValue.builder().n(sequenceNumber.toString()).build());
Builder builder = PutItemRequest.builder().tableName(tableName).item(item);
if (!request.overwriteIfExists()) {
builder.conditionExpression("attribute_not_exists(" + HASH_KEY + ") AND attribute_not_exists(" + RANGE_KEY + ")");
}
final PutItemRequest putItemRequest = builder.build();

// TODO need to initialize/return SEQ_NO here
// If document doesn't exist, return 0
// If document exists, overwrite and increment and return SEQ_NO
dynamoDbClient.putItem(putItemRequest);
// TODO need to pass seqNo to simulated response
String simulatedIndexResponse = simulateOpenSearchResponse(request.index(), id, source, Map.of("result", "created"));
String simulatedIndexResponse = simulateOpenSearchResponse(
request.index(),
id,
source,
sequenceNumber,
Map.of("result", "created")
);
return PutDataObjectResponse.builder().id(id).parser(createParser(simulatedIndexResponse)).build();
} catch (IOException e) {
// Rethrow unchecked exception on XContent parsing error
Expand All @@ -142,33 +158,34 @@ public CompletionStage<PutDataObjectResponse> putDataObjectAsync(PutDataObjectRe
*/
@Override
public CompletionStage<GetDataObjectResponse> getDataObjectAsync(GetDataObjectRequest request, Executor executor) {
final String tenantId = request.tenantId() != null ? request.tenantId() : DEFAULT_TENANT;
final GetItemRequest getItemRequest = GetItemRequest
.builder()
.tableName(getTableName(request.index()))
.key(
Map
.ofEntries(
Map.entry(HASH_KEY, AttributeValue.builder().s(tenantId).build()),
Map.entry(RANGE_KEY, AttributeValue.builder().s(request.id()).build())
// TODO need to fetch SEQ_NO_KEY
)
)
.build();
final GetItemRequest getItemRequest = getGetItemRequest(request.tenantId(), request.id(), request.index());
return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction<GetDataObjectResponse>) () -> {
try {
final GetItemResponse getItemResponse = dynamoDbClient.getItem(getItemRequest);
ObjectNode sourceObject;
boolean found;
String sequenceNumberString = null;
if (getItemResponse == null || getItemResponse.item() == null || getItemResponse.item().isEmpty()) {
found = false;
sourceObject = null;
} else {
found = true;
sourceObject = JsonTransformer.convertDDBAttributeValueMapToObjectNode(getItemResponse.item());
sourceObject = JsonTransformer.convertDDBAttributeValueMapToObjectNode(getItemResponse.item().get(SOURCE).m());
if (getItemResponse.item().containsKey(SEQ_NO_KEY)) {
sequenceNumberString = getItemResponse.item().get(SEQ_NO_KEY).n();
}
}
final String source = OBJECT_MAPPER.writeValueAsString(sourceObject);
String simulatedGetResponse = simulateOpenSearchResponse(request.index(), request.id(), source, Map.of("found", found));
final Long sequenceNumber = sequenceNumberString == null || sequenceNumberString.isEmpty()
? null
: Long.parseLong(sequenceNumberString);
String simulatedGetResponse = simulateOpenSearchResponse(
request.index(),
request.id(),
source,
sequenceNumber,
Map.of("found", found)
);
XContentParser parser = JsonXContent.jsonXContent
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, simulatedGetResponse);
// This would consume parser content so we need to create a new parser for the map
Expand Down Expand Up @@ -201,15 +218,15 @@ public CompletionStage<UpdateDataObjectResponse> updateDataObjectAsync(UpdateDat
Map<String, AttributeValue> updateItem = JsonTransformer.convertJsonObjectToDDBAttributeMap(jsonNode);
updateItem.remove(HASH_KEY);
updateItem.remove(RANGE_KEY);
Map<String, AttributeValueUpdate> updateAttributeValue = updateItem
.entrySet()
.stream()
.collect(
Collectors
.toMap(
Map.Entry::getKey,
entry -> AttributeValueUpdate.builder().action(AttributeAction.PUT).value(entry.getValue()).build()
)
Map<String, AttributeValueUpdate> updateAttributeValue = new HashMap<>();
updateAttributeValue
.put(
SOURCE,
AttributeValueUpdate
.builder()
.action(AttributeAction.PUT)
.value(AttributeValue.builder().m(updateItem).build())
.build()
);
Map<String, AttributeValue> updateKey = new HashMap<>();
updateKey.put(HASH_KEY, AttributeValue.builder().s(tenantId).build());
Expand All @@ -219,20 +236,34 @@ public CompletionStage<UpdateDataObjectResponse> updateDataObjectAsync(UpdateDat
.tableName(getTableName(request.index()))
.key(updateKey)
.attributeUpdates(updateAttributeValue);
updateItemRequestBuilder
.updateExpression("SET #seqNo = #seqNo + :incr")
.expressionAttributeNames(Map.of("#seqNo", SEQ_NO_KEY))
.expressionAttributeValues(Map.of(":incr", AttributeValue.builder().n("1").build()));
if (request.ifSeqNo() != null) {
// Get current document version and put in attribute map. Ignore primary term on DDB.
int currentSeqNo = jsonNode.has(SEQ_NO_KEY) ? jsonNode.get(SEQ_NO_KEY).asInt() : 0;
updateItemRequestBuilder
.conditionExpression("#seqNo = :currentSeqNo")
.expressionAttributeNames(Map.of("#seqNo", SEQ_NO_KEY))
.expressionAttributeValues(
Map.of(":currentSeqNo", AttributeValue.builder().n(Integer.toString(currentSeqNo)).build())
Map.of(":currentSeqNo", AttributeValue.builder().n(Long.toString(request.ifSeqNo())).build())
);
}
UpdateItemRequest updateItemRequest = updateItemRequestBuilder.build();
dynamoDbClient.updateItem(updateItemRequest);
// TODO need to pass seqNo to simulated response
String simulatedUpdateResponse = simulateOpenSearchResponse(request.index(), request.id(), source, Map.of("found", true));
UpdateItemResponse updateItemResponse = dynamoDbClient.updateItem(updateItemRequest);
Long sequenceNumber = null;
if (updateItemResponse != null
&& updateItemResponse.attributes() != null
&& updateItemResponse.attributes().containsKey(SEQ_NO_KEY)) {
sequenceNumber = Long.parseLong(updateItemResponse.attributes().get(SEQ_NO_KEY).n());
}
String simulatedUpdateResponse = simulateOpenSearchResponse(
request.index(),
request.id(),
source,
sequenceNumber,
Map.of("result", "updated")
);
return UpdateDataObjectResponse.builder().id(request.id()).parser(createParser(simulatedUpdateResponse)).build();
} catch (ConditionalCheckFailedException ccfe) {
log.error("Document version conflict updating {} in {}: {}", request.id(), request.index(), ccfe.getMessage(), ccfe);
Expand Down Expand Up @@ -273,16 +304,16 @@ public CompletionStage<DeleteDataObjectResponse> deleteDataObjectAsync(DeleteDat
.build();
return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction<DeleteDataObjectResponse>) () -> {
try {
// TODO need to return SEQ_NO here
// If document doesn't exist, increment and return highest seq no ever seen, but we would have to track seqNo here
// If document never existed, return -2 (unassigned) for seq no (probably what we have to do here)
// If document exists, increment and return SEQ_NO
dynamoDbClient.deleteItem(deleteItemRequest);
// TODO need to pass seqNo to simulated response
DeleteItemResponse deleteItemResponse = dynamoDbClient.deleteItem(deleteItemRequest);
Long sequenceNumber = null;
if (deleteItemResponse.attributes() != null && deleteItemResponse.attributes().containsKey(SEQ_NO_KEY)) {
sequenceNumber = Long.parseLong(deleteItemResponse.attributes().get(SEQ_NO_KEY).n());
}
String simulatedDeleteResponse = simulateOpenSearchResponse(
request.index(),
request.id(),
null,
sequenceNumber,
Map.of("result", "deleted")
);
return DeleteDataObjectResponse.builder().id(request.id()).parser(createParser(simulatedDeleteResponse)).build();
Expand Down Expand Up @@ -321,14 +352,36 @@ private XContentParser createParser(String json) throws IOException {
return jsonXContent.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.IGNORE_DEPRECATIONS, json);
}

private String simulateOpenSearchResponse(String index, String id, String source, Map<String, Object> additionalFields) {
private GetItemRequest getGetItemRequest(String requestTenantId, String documentId, String index) {
final String tenantId = requestTenantId != null ? requestTenantId : DEFAULT_TENANT;
final GetItemRequest getItemRequest = GetItemRequest
.builder()
.tableName(getTableName(index))
.key(
Map
.ofEntries(
Map.entry(HASH_KEY, AttributeValue.builder().s(tenantId).build()),
Map.entry(RANGE_KEY, AttributeValue.builder().s(documentId).build())
)
)
.build();
return getItemRequest;
}

private String simulateOpenSearchResponse(
String index,
String id,
String source,
Long sequenceNumber,
Map<String, Object> additionalFields
) {
StringBuilder sb = new StringBuilder("{");
// Fields with a DDB counterpart
sb.append("\"_index\":\"").append(index).append("\",");
sb.append("\"_id\":\"").append(id).append("\",");
// Fields we must simulate using default values
sb.append("\"_primary_term\":").append(UNASSIGNED_PRIMARY_TERM).append(",");
sb.append("\"_seq_no\":").append(UNASSIGNED_SEQ_NO).append(",");
sb.append("\"_primary_term\":").append(sequenceNumber == null ? UNASSIGNED_PRIMARY_TERM : 1).append(",");
sb.append("\"_seq_no\":").append(sequenceNumber == null ? UNASSIGNED_SEQ_NO : sequenceNumber).append(",");
sb.append("\"_version\":").append(-1).append(",");
sb.append("\"_shards\":").append(Strings.toString(MediaTypeRegistry.JSON, new ShardInfo())).append(",");
// Finish up
Expand Down
Loading

0 comments on commit d5caadf

Please sign in to comment.