Skip to content

Commit

Permalink
SDK DDB client update and search interface implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Arjun kumar Giri <[email protected]>
  • Loading branch information
arjunkumargiri committed Jun 24, 2024
1 parent 1277830 commit 4a4b18a
Show file tree
Hide file tree
Showing 5 changed files with 457 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
Expand Down Expand Up @@ -38,30 +42,41 @@
import org.opensearch.sdk.UpdateDataObjectRequest;
import org.opensearch.sdk.UpdateDataObjectResponse;

import lombok.AllArgsConstructor;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;

import lombok.extern.log4j.Log4j2;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

/**
* DDB implementation of {@link SdkClient}. DDB table name will be mapped to index name.
*
*/
@AllArgsConstructor
@Log4j2
public class DDBOpenSearchClient implements SdkClient {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String DEFAULT_TENANT = "DEFAULT_TENANT";

private static final String HASH_KEY = "tenant_id";
private static final String RANGE_KEY = "id";
private static final String SOURCE = "source";

private DynamoDbClient dynamoDbClient;
private RemoteClusterIndicesClient remoteClusterIndicesClient;

public DDBOpenSearchClient(DynamoDbClient dynamoDbClient, RemoteClusterIndicesClient remoteClusterIndicesClient) {
this.dynamoDbClient = dynamoDbClient;
this.remoteClusterIndicesClient = remoteClusterIndicesClient;
}

/**
* DDB implementation to write data objects to DDB table. Tenant ID will be used as hash key and document ID will
Expand All @@ -76,16 +91,18 @@ public CompletionStage<PutDataObjectResponse> putDataObjectAsync(PutDataObjectRe
final String tableName = getTableName(request.index());
return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction<PutDataObjectResponse>) () -> {
String source = Strings.toString(MediaTypeRegistry.JSON, request.dataObject());
final Map<String, AttributeValue> item = Map
.ofEntries(
Map.entry(HASH_KEY, AttributeValue.builder().s(tenantId).build()),
Map.entry(RANGE_KEY, AttributeValue.builder().s(id).build()),
Map.entry(SOURCE, AttributeValue.builder().s(source).build())
);
final PutItemRequest putItemRequest = PutItemRequest.builder().tableName(tableName).item(item).build();
try {
JsonNode jsonNode = OBJECT_MAPPER.readTree(source);
Map<String, AttributeValue> item = convertJsonObjectToItem(jsonNode);
item.put(HASH_KEY, AttributeValue.builder().s(tenantId).build());
item.put(RANGE_KEY, AttributeValue.builder().s(id).build());
final PutItemRequest putItemRequest = PutItemRequest.builder().tableName(tableName).item(item).build();

dynamoDbClient.putItem(putItemRequest);
return new PutDataObjectResponse.Builder().id(id).created(true).build();
dynamoDbClient.putItem(putItemRequest);
return new PutDataObjectResponse.Builder().id(id).created(true).build();
} catch (IOException e) {
throw new OpenSearchStatusException("Failed to parse data object " + request.id(), RestStatus.BAD_REQUEST);
}
}), executor);
}

Expand Down Expand Up @@ -114,7 +131,8 @@ public CompletionStage<GetDataObjectResponse> getDataObjectAsync(GetDataObjectRe
return new GetDataObjectResponse.Builder().id(request.id()).parser(Optional.empty()).build();
}

String source = getItemResponse.item().get(SOURCE).s();
final ObjectNode sourceObject = convertToObjectNode((getItemResponse.item()));
final String source = OBJECT_MAPPER.writeValueAsString(sourceObject);
XContentParser parser = JsonXContent.jsonXContent
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, source);
return new GetDataObjectResponse.Builder().id(request.id()).parser(Optional.of(parser)).build();
Expand All @@ -125,10 +143,37 @@ public CompletionStage<GetDataObjectResponse> getDataObjectAsync(GetDataObjectRe
}), executor);
}

/**
* Makes use of DDB update request to update data object.
*
*/
@Override
public CompletionStage<UpdateDataObjectResponse> updateDataObjectAsync(UpdateDataObjectRequest request, Executor executor) {
// TODO: Implement update
return null;
final String tenantId = request.tenantId() != null ? request.tenantId() : DEFAULT_TENANT;
return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction<UpdateDataObjectResponse>) () -> {
try {
String source = Strings.toString(MediaTypeRegistry.JSON, request.dataObject());
JsonNode jsonNode = OBJECT_MAPPER.readTree(source);
Map<String, AttributeValue> updateItem = convertJsonObjectToItem(jsonNode);
updateItem.put(HASH_KEY, AttributeValue.builder().s(tenantId).build());
updateItem.put(RANGE_KEY, AttributeValue.builder().s(request.id()).build());
UpdateItemRequest updateItemRequest = UpdateItemRequest
.builder()
.tableName(getTableName(request.index()))
.key(updateItem)
.build();
dynamoDbClient.updateItem(updateItemRequest);

return new UpdateDataObjectResponse.Builder().id(request.id()).shardId(request.index()).updated(true).build();
} catch (IOException e) {
log.error("Error updating {} in {}: {}", request.id(), request.index(), e.getMessage(), e);
// Rethrow unchecked exception on update IOException
throw new OpenSearchStatusException(
"Parsing error updating data object " + request.id() + " in index " + request.index(),
RestStatus.BAD_REQUEST
);
}
}), executor);
}

/**
Expand All @@ -155,16 +200,140 @@ public CompletionStage<DeleteDataObjectResponse> deleteDataObjectAsync(DeleteDat
}), executor);
}

/**
* DDB data needs to be synced with opensearch cluster. {@link RemoteClusterIndicesClient} will then be used to
* search data in opensearch cluster.
*
* @param request
* @param executor
* @return Search data object response
*/
@Override
public CompletionStage<SearchDataObjectResponse> searchDataObjectAsync(SearchDataObjectRequest request, Executor executor) {
// TODO will implement this later.

return null;
return this.remoteClusterIndicesClient.searchDataObjectAsync(request, executor);
}

private String getTableName(String index) {
// Table name will be same as index name. As DDB table name does not support dot(.)
// it will be removed from name.
return index.replaceAll("\\.", "");
}

@VisibleForTesting
static Map<String, AttributeValue> convertJsonObjectToItem(JsonNode jsonNode) {
Map<String, AttributeValue> item = new HashMap<>();
Iterator<Map.Entry<String, JsonNode>> fields = jsonNode.fields();

while (fields.hasNext()) {
Map.Entry<String, JsonNode> field = fields.next();

if (field.getValue().isTextual()) {
item.put(field.getKey(), AttributeValue.builder().s(field.getValue().asText()).build());
} else if (field.getValue().isNumber()) {
item.put(field.getKey(), AttributeValue.builder().n(field.getValue().asText()).build());
} else if (field.getValue().isBoolean()) {
item.put(field.getKey(), AttributeValue.builder().bool(field.getValue().asBoolean()).build());
} else if (field.getValue().isNull()) {
item.put(field.getKey(), AttributeValue.builder().nul(true).build());
} else if (field.getValue().isObject()) {
item.put(field.getKey(), AttributeValue.builder().m(convertJsonObjectToItem(field.getValue())).build());
} else if (field.getValue().isArray()) {
item.put(field.getKey(), AttributeValue.builder().l(convertJsonArrayToAttributeValueList(field.getValue())).build());
} else {
throw new IllegalArgumentException("Unsupported field type: " + field.getValue());
}
}

return item;
}

@VisibleForTesting
static List<AttributeValue> convertJsonArrayToAttributeValueList(JsonNode jsonArray) {
List<AttributeValue> attributeValues = new ArrayList<>();

for (JsonNode element : jsonArray) {
if (element.isTextual()) {
attributeValues.add(AttributeValue.builder().s(element.asText()).build());
} else if (element.isNumber()) {
attributeValues.add(AttributeValue.builder().n(element.asText()).build());
} else if (element.isBoolean()) {
attributeValues.add(AttributeValue.builder().bool(element.asBoolean()).build());
} else if (element.isNull()) {
attributeValues.add(AttributeValue.builder().nul(true).build());
} else if (element.isObject()) {
attributeValues.add(AttributeValue.builder().m(convertJsonObjectToItem(element)).build());
} else if (element.isArray()) {
attributeValues.add(AttributeValue.builder().l(convertJsonArrayToAttributeValueList(element)).build());
} else {
throw new IllegalArgumentException("Unsupported field type: " + element);
}

}

return attributeValues;
}

@VisibleForTesting
static ObjectNode convertToObjectNode(Map<String, AttributeValue> item) {
ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();

item.forEach((key, value) -> {
switch (value.type()) {
case S:
objectNode.put(key, value.s());
break;
case N:
objectNode.put(key, value.n());
break;
case BOOL:
objectNode.put(key, value.bool());
break;
case L:
objectNode.put(key, convertToArrayNode(value.l()));
break;
case M:
objectNode.set(key, convertToObjectNode(value.m()));
break;
case NUL:
objectNode.putNull(key);
break;
default:
throw new IllegalArgumentException("Unsupported AttributeValue type: " + value.type());
}
});

return objectNode;

}

@VisibleForTesting
static ArrayNode convertToArrayNode(final List<AttributeValue> attributeValueList) {
ArrayNode arrayNode = OBJECT_MAPPER.createArrayNode();
attributeValueList.forEach(attribute -> {
switch (attribute.type()) {
case S:
arrayNode.add(attribute.s());
break;
case N:
arrayNode.add(attribute.n());
break;
case BOOL:
arrayNode.add(attribute.bool());
break;
case L:
arrayNode.add(convertToArrayNode(attribute.l()));
break;
case M:
arrayNode.add(convertToObjectNode(attribute.m()));
break;
case NUL:
arrayNode.add((JsonNode) null);
break;
default:
throw new IllegalArgumentException("Unsupported AttributeValue type: " + attribute.type());
}
});
return arrayNode;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public SdkClientModule() {
SdkClientModule(String remoteMetadataType, String remoteMetadataEndpoint, String region) {
this.remoteMetadataType = remoteMetadataType;
this.remoteMetadataEndpoint = remoteMetadataEndpoint;
this.region = region;
this.region = region == null ? "us-west-2" : region;
}

@Override
Expand All @@ -80,7 +80,8 @@ protected void configure() {
return;
case AWS_DYNAMO_DB:
log.info("Using dynamo DB as metadata store");
bind(SdkClient.class).toInstance(new DDBOpenSearchClient(createDynamoDbClient()));
bind(SdkClient.class)
.toInstance(new DDBOpenSearchClient(createDynamoDbClient(), new RemoteClusterIndicesClient(createOpenSearchClient())));
return;
default:
log.info("Using local opensearch cluster as metadata store");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ml.sdkclient;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;

@AllArgsConstructor
@Builder
@Getter
public class ComplexDataObject implements ToXContentObject {
private String testString;
private long testNumber;
private boolean testBool;
private List<String> testList;
private TestDataObject testObject;

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();
xContentBuilder.field("testString", this.testString);
xContentBuilder.field("testNumber", this.testNumber);
xContentBuilder.field("testBool", this.testBool);
xContentBuilder.field("testList", this.testList);
xContentBuilder.field("testObject", this.testObject);
return xContentBuilder.endObject();
}

public static ComplexDataObject parse(XContentParser parser) throws IOException {
ComplexDataObjectBuilder builder = ComplexDataObject.builder();
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();
if ("testString".equals(fieldName)) {
builder.testString(parser.text());
} else if ("testNumber".equals(fieldName)) {
builder.testNumber(parser.longValue());
} else if ("testBool".equals(fieldName)) {
builder.testBool(parser.booleanValue());
} else if ("testList".equals(fieldName)) {
ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser);
List<String> list = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
list.add(parser.text());
}
builder.testList(list);
} else if ("testObject".equals(fieldName)) {
builder.testObject(TestDataObject.parse(parser));
}
}
return builder.build();
}
}
Loading

0 comments on commit 4a4b18a

Please sign in to comment.