diff --git a/plugin/src/main/java/org/opensearch/ml/sdkclient/DDBOpenSearchClient.java b/plugin/src/main/java/org/opensearch/ml/sdkclient/DDBOpenSearchClient.java index be35242236..59f10ee792 100644 --- a/plugin/src/main/java/org/opensearch/ml/sdkclient/DDBOpenSearchClient.java +++ b/plugin/src/main/java/org/opensearch/ml/sdkclient/DDBOpenSearchClient.java @@ -61,11 +61,13 @@ import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate; import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; +import software.amazon.awssdk.services.dynamodb.model.DeleteItemResponse; import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest.Builder; import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; +import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse; /** * DDB implementation of {@link SdkClient}. DDB table name will be mapped to index name. @@ -79,6 +81,8 @@ public class DDBOpenSearchClient implements SdkClientDelegate { private static final String HASH_KEY = "_tenant_id"; private static final String RANGE_KEY = "_id"; + + private static final String SOURCE = "_source"; private static final String SEQ_NO_KEY = "_seq_no"; private DynamoDbClient dynamoDbClient; @@ -108,25 +112,37 @@ public CompletionStage putDataObjectAsync(PutDataObjectRe final String id = request.id() != null ? request.id() : UUID.randomUUID().toString(); final String tenantId = request.tenantId() != null ? request.tenantId() : DEFAULT_TENANT; final String tableName = getTableName(request.index()); + final GetItemRequest getItemRequest = getGetItemRequest(request.tenantId(), request.id(), request.index()); + return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction) () -> { try { + GetItemResponse getItemResponse = dynamoDbClient.getItem(getItemRequest); + Long sequenceNumber = 1L; + if (getItemResponse != null && getItemResponse.item() != null && getItemResponse.item().containsKey(SEQ_NO_KEY)) { + sequenceNumber = Long.parseLong(getItemResponse.item().get(SEQ_NO_KEY).n()) + 1; + } String source = Strings.toString(MediaTypeRegistry.JSON, request.dataObject()); JsonNode jsonNode = OBJECT_MAPPER.readTree(source); - Map item = JsonTransformer.convertJsonObjectToDDBAttributeMap(jsonNode); + Map sourceMap = JsonTransformer.convertJsonObjectToDDBAttributeMap(jsonNode); + Map item = new HashMap<>(); item.put(HASH_KEY, AttributeValue.builder().s(tenantId).build()); item.put(RANGE_KEY, AttributeValue.builder().s(id).build()); + item.put(SOURCE, AttributeValue.builder().m(sourceMap).build()); + item.put(SEQ_NO_KEY, AttributeValue.builder().n(sequenceNumber.toString()).build()); Builder builder = PutItemRequest.builder().tableName(tableName).item(item); if (!request.overwriteIfExists()) { builder.conditionExpression("attribute_not_exists(" + HASH_KEY + ") AND attribute_not_exists(" + RANGE_KEY + ")"); } final PutItemRequest putItemRequest = builder.build(); - // TODO need to initialize/return SEQ_NO here - // If document doesn't exist, return 0 - // If document exists, overwrite and increment and return SEQ_NO dynamoDbClient.putItem(putItemRequest); - // TODO need to pass seqNo to simulated response - String simulatedIndexResponse = simulateOpenSearchResponse(request.index(), id, source, Map.of("result", "created")); + String simulatedIndexResponse = simulateOpenSearchResponse( + request.index(), + id, + source, + sequenceNumber, + Map.of("result", "created") + ); return PutDataObjectResponse.builder().id(id).parser(createParser(simulatedIndexResponse)).build(); } catch (IOException e) { // Rethrow unchecked exception on XContent parsing error @@ -142,33 +158,34 @@ public CompletionStage putDataObjectAsync(PutDataObjectRe */ @Override public CompletionStage getDataObjectAsync(GetDataObjectRequest request, Executor executor) { - final String tenantId = request.tenantId() != null ? request.tenantId() : DEFAULT_TENANT; - final GetItemRequest getItemRequest = GetItemRequest - .builder() - .tableName(getTableName(request.index())) - .key( - Map - .ofEntries( - Map.entry(HASH_KEY, AttributeValue.builder().s(tenantId).build()), - Map.entry(RANGE_KEY, AttributeValue.builder().s(request.id()).build()) - // TODO need to fetch SEQ_NO_KEY - ) - ) - .build(); + final GetItemRequest getItemRequest = getGetItemRequest(request.tenantId(), request.id(), request.index()); return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction) () -> { try { final GetItemResponse getItemResponse = dynamoDbClient.getItem(getItemRequest); ObjectNode sourceObject; boolean found; + String sequenceNumberString = null; if (getItemResponse == null || getItemResponse.item() == null || getItemResponse.item().isEmpty()) { found = false; sourceObject = null; } else { found = true; - sourceObject = JsonTransformer.convertDDBAttributeValueMapToObjectNode(getItemResponse.item()); + sourceObject = JsonTransformer.convertDDBAttributeValueMapToObjectNode(getItemResponse.item().get(SOURCE).m()); + if (getItemResponse.item().containsKey(SEQ_NO_KEY)) { + sequenceNumberString = getItemResponse.item().get(SEQ_NO_KEY).n(); + } } final String source = OBJECT_MAPPER.writeValueAsString(sourceObject); - String simulatedGetResponse = simulateOpenSearchResponse(request.index(), request.id(), source, Map.of("found", found)); + final Long sequenceNumber = sequenceNumberString == null || sequenceNumberString.isEmpty() + ? null + : Long.parseLong(sequenceNumberString); + String simulatedGetResponse = simulateOpenSearchResponse( + request.index(), + request.id(), + source, + sequenceNumber, + Map.of("found", found) + ); XContentParser parser = JsonXContent.jsonXContent .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, simulatedGetResponse); // This would consume parser content so we need to create a new parser for the map @@ -201,15 +218,15 @@ public CompletionStage updateDataObjectAsync(UpdateDat Map updateItem = JsonTransformer.convertJsonObjectToDDBAttributeMap(jsonNode); updateItem.remove(HASH_KEY); updateItem.remove(RANGE_KEY); - Map updateAttributeValue = updateItem - .entrySet() - .stream() - .collect( - Collectors - .toMap( - Map.Entry::getKey, - entry -> AttributeValueUpdate.builder().action(AttributeAction.PUT).value(entry.getValue()).build() - ) + Map updateAttributeValue = new HashMap<>(); + updateAttributeValue + .put( + SOURCE, + AttributeValueUpdate + .builder() + .action(AttributeAction.PUT) + .value(AttributeValue.builder().m(updateItem).build()) + .build() ); Map updateKey = new HashMap<>(); updateKey.put(HASH_KEY, AttributeValue.builder().s(tenantId).build()); @@ -219,20 +236,34 @@ public CompletionStage updateDataObjectAsync(UpdateDat .tableName(getTableName(request.index())) .key(updateKey) .attributeUpdates(updateAttributeValue); + updateItemRequestBuilder + .updateExpression("SET #seqNo = #seqNo + :incr") + .expressionAttributeNames(Map.of("#seqNo", SEQ_NO_KEY)) + .expressionAttributeValues(Map.of(":incr", AttributeValue.builder().n("1").build())); if (request.ifSeqNo() != null) { // Get current document version and put in attribute map. Ignore primary term on DDB. - int currentSeqNo = jsonNode.has(SEQ_NO_KEY) ? jsonNode.get(SEQ_NO_KEY).asInt() : 0; updateItemRequestBuilder .conditionExpression("#seqNo = :currentSeqNo") .expressionAttributeNames(Map.of("#seqNo", SEQ_NO_KEY)) .expressionAttributeValues( - Map.of(":currentSeqNo", AttributeValue.builder().n(Integer.toString(currentSeqNo)).build()) + Map.of(":currentSeqNo", AttributeValue.builder().n(Long.toString(request.ifSeqNo())).build()) ); } UpdateItemRequest updateItemRequest = updateItemRequestBuilder.build(); - dynamoDbClient.updateItem(updateItemRequest); - // TODO need to pass seqNo to simulated response - String simulatedUpdateResponse = simulateOpenSearchResponse(request.index(), request.id(), source, Map.of("found", true)); + UpdateItemResponse updateItemResponse = dynamoDbClient.updateItem(updateItemRequest); + Long sequenceNumber = null; + if (updateItemResponse != null + && updateItemResponse.attributes() != null + && updateItemResponse.attributes().containsKey(SEQ_NO_KEY)) { + sequenceNumber = Long.parseLong(updateItemResponse.attributes().get(SEQ_NO_KEY).n()); + } + String simulatedUpdateResponse = simulateOpenSearchResponse( + request.index(), + request.id(), + source, + sequenceNumber, + Map.of("result", "updated") + ); return UpdateDataObjectResponse.builder().id(request.id()).parser(createParser(simulatedUpdateResponse)).build(); } catch (ConditionalCheckFailedException ccfe) { log.error("Document version conflict updating {} in {}: {}", request.id(), request.index(), ccfe.getMessage(), ccfe); @@ -273,16 +304,16 @@ public CompletionStage deleteDataObjectAsync(DeleteDat .build(); return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction) () -> { try { - // TODO need to return SEQ_NO here - // If document doesn't exist, increment and return highest seq no ever seen, but we would have to track seqNo here - // If document never existed, return -2 (unassigned) for seq no (probably what we have to do here) - // If document exists, increment and return SEQ_NO - dynamoDbClient.deleteItem(deleteItemRequest); - // TODO need to pass seqNo to simulated response + DeleteItemResponse deleteItemResponse = dynamoDbClient.deleteItem(deleteItemRequest); + Long sequenceNumber = null; + if (deleteItemResponse.attributes() != null && deleteItemResponse.attributes().containsKey(SEQ_NO_KEY)) { + sequenceNumber = Long.parseLong(deleteItemResponse.attributes().get(SEQ_NO_KEY).n()); + } String simulatedDeleteResponse = simulateOpenSearchResponse( request.index(), request.id(), null, + sequenceNumber, Map.of("result", "deleted") ); return DeleteDataObjectResponse.builder().id(request.id()).parser(createParser(simulatedDeleteResponse)).build(); @@ -321,14 +352,36 @@ private XContentParser createParser(String json) throws IOException { return jsonXContent.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.IGNORE_DEPRECATIONS, json); } - private String simulateOpenSearchResponse(String index, String id, String source, Map additionalFields) { + private GetItemRequest getGetItemRequest(String requestTenantId, String documentId, String index) { + final String tenantId = requestTenantId != null ? requestTenantId : DEFAULT_TENANT; + final GetItemRequest getItemRequest = GetItemRequest + .builder() + .tableName(getTableName(index)) + .key( + Map + .ofEntries( + Map.entry(HASH_KEY, AttributeValue.builder().s(tenantId).build()), + Map.entry(RANGE_KEY, AttributeValue.builder().s(documentId).build()) + ) + ) + .build(); + return getItemRequest; + } + + private String simulateOpenSearchResponse( + String index, + String id, + String source, + Long sequenceNumber, + Map additionalFields + ) { StringBuilder sb = new StringBuilder("{"); // Fields with a DDB counterpart sb.append("\"_index\":\"").append(index).append("\","); sb.append("\"_id\":\"").append(id).append("\","); // Fields we must simulate using default values - sb.append("\"_primary_term\":").append(UNASSIGNED_PRIMARY_TERM).append(","); - sb.append("\"_seq_no\":").append(UNASSIGNED_SEQ_NO).append(","); + sb.append("\"_primary_term\":").append(sequenceNumber == null ? UNASSIGNED_PRIMARY_TERM : 1).append(","); + sb.append("\"_seq_no\":").append(sequenceNumber == null ? UNASSIGNED_SEQ_NO : sequenceNumber).append(","); sb.append("\"_version\":").append(-1).append(","); sb.append("\"_shards\":").append(Strings.toString(MediaTypeRegistry.JSON, new ShardInfo())).append(","); // Finish up diff --git a/plugin/src/test/java/org/opensearch/ml/sdkclient/DDBOpenSearchClientTests.java b/plugin/src/test/java/org/opensearch/ml/sdkclient/DDBOpenSearchClientTests.java index aa342c0560..c83bd41136 100644 --- a/plugin/src/test/java/org/opensearch/ml/sdkclient/DDBOpenSearchClientTests.java +++ b/plugin/src/test/java/org/opensearch/ml/sdkclient/DDBOpenSearchClientTests.java @@ -36,6 +36,7 @@ import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.get.GetResponse; import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.update.UpdateResponse; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.OpenSearchExecutors; @@ -82,6 +83,7 @@ public class DDBOpenSearchClientTests extends OpenSearchTestCase { private static final String HASH_KEY = "_tenant_id"; private static final String RANGE_KEY = "_id"; + private static final String SEQ_NUM = "_seq_no"; private static final String TEST_ID = "123"; private static final String TENANT_ID = "TEST_TENANT_ID"; @@ -151,17 +153,44 @@ public void testPutDataObject_HappyCase() throws IOException { IndexResponse indexActionResponse = IndexResponse.fromXContent(response.parser()); assertEquals(TEST_ID, indexActionResponse.getId()); assertEquals(DocWriteResponse.Result.CREATED, indexActionResponse.getResult()); + assertEquals(1, indexActionResponse.getSeqNo()); PutItemRequest putItemRequest = putItemRequestArgumentCaptor.getValue(); Assert.assertEquals(TEST_INDEX, putItemRequest.tableName()); Assert.assertEquals(TEST_ID, putItemRequest.item().get(RANGE_KEY).s()); Assert.assertEquals(TENANT_ID, putItemRequest.item().get(HASH_KEY).s()); + Assert.assertEquals("1", putItemRequest.item().get(SEQ_NUM).n()); Assert .assertEquals( "attribute_not_exists(" + HASH_KEY + ") AND attribute_not_exists(" + RANGE_KEY + ")", putItemRequest.conditionExpression() ); - Assert.assertEquals("foo", putItemRequest.item().get("data").s()); + Assert.assertEquals("foo", putItemRequest.item().get("_source").m().get("data").s()); + } + + @Test + public void testPutDataObject_ExistingDocument_UpdatesSequenceNumber() throws IOException { + PutDataObjectRequest putRequest = PutDataObjectRequest + .builder() + .index(TEST_INDEX) + .id(TEST_ID) + .overwriteIfExists(false) + .tenantId(TENANT_ID) + .dataObject(testDataObject) + .build(); + Mockito + .when(dynamoDbClient.getItem(Mockito.any(GetItemRequest.class))) + .thenReturn(GetItemResponse.builder().item(ImmutableMap.of(SEQ_NUM, AttributeValue.builder().n("5").build())).build()); + Mockito.when(dynamoDbClient.putItem(Mockito.any(PutItemRequest.class))).thenReturn(PutItemResponse.builder().build()); + PutDataObjectResponse response = sdkClient + .putDataObjectAsync(putRequest, testThreadPool.executor(GENERAL_THREAD_POOL)) + .toCompletableFuture() + .join(); + Mockito.verify(dynamoDbClient).putItem(putItemRequestArgumentCaptor.capture()); + PutItemRequest putItemRequest = putItemRequestArgumentCaptor.getValue(); + IndexResponse indexActionResponse = IndexResponse.fromXContent(response.parser()); + assertEquals(6, indexActionResponse.getSeqNo()); + Assert.assertEquals("6", putItemRequest.item().get(SEQ_NUM).n()); } @Test @@ -185,13 +214,13 @@ public void testPutDataObject_WithComplexData() throws IOException { sdkClient.putDataObjectAsync(putRequest, testThreadPool.executor(GENERAL_THREAD_POOL)).toCompletableFuture().join(); Mockito.verify(dynamoDbClient).putItem(putItemRequestArgumentCaptor.capture()); PutItemRequest putItemRequest = putItemRequestArgumentCaptor.getValue(); - Assert.assertEquals("testString", putItemRequest.item().get("testString").s()); - Assert.assertEquals("123", putItemRequest.item().get("testNumber").n()); - Assert.assertEquals(true, putItemRequest.item().get("testBool").bool()); - Assert.assertEquals("123", putItemRequest.item().get("testList").l().get(0).s()); - Assert.assertEquals("hello", putItemRequest.item().get("testList").l().get(1).s()); - Assert.assertEquals(null, putItemRequest.item().get("testList").l().get(2).s()); - Assert.assertEquals("foo", putItemRequest.item().get("testObject").m().get("data").s()); + Assert.assertEquals("testString", putItemRequest.item().get("_source").m().get("testString").s()); + Assert.assertEquals("123", putItemRequest.item().get("_source").m().get("testNumber").n()); + Assert.assertEquals(true, putItemRequest.item().get("_source").m().get("testBool").bool()); + Assert.assertEquals("123", putItemRequest.item().get("_source").m().get("testList").l().get(0).s()); + Assert.assertEquals("hello", putItemRequest.item().get("_source").m().get("testList").l().get(1).s()); + Assert.assertEquals(null, putItemRequest.item().get("_source").m().get("testList").l().get(2).s()); + Assert.assertEquals("foo", putItemRequest.item().get("_source").m().get("testObject").m().get("data").s()); } @Test @@ -237,7 +266,20 @@ public void testGetDataObject_HappyCase() throws IOException { GetDataObjectRequest getRequest = GetDataObjectRequest.builder().index(TEST_INDEX).id(TEST_ID).tenantId(TENANT_ID).build(); GetItemResponse getItemResponse = GetItemResponse .builder() - .item(Map.ofEntries(Map.entry("data", AttributeValue.builder().s("foo").build()))) + .item( + Map + .ofEntries( + Map + .entry( + "_source", + AttributeValue + .builder() + .m(Map.ofEntries(Map.entry("data", AttributeValue.builder().s("foo").build()))) + .build() + ), + Map.entry(SEQ_NUM, AttributeValue.builder().n("1").build()) + ) + ) .build(); Mockito.when(dynamoDbClient.getItem(Mockito.any(GetItemRequest.class))).thenReturn(getItemResponse); GetDataObjectResponse response = sdkClient @@ -252,11 +294,13 @@ public void testGetDataObject_HappyCase() throws IOException { Assert.assertEquals(TEST_ID, response.id()); Assert.assertEquals("foo", response.source().get("data")); XContentParser parser = response.parser(); + GetResponse getResponse = GetResponse.fromXContent(parser); + Assert.assertEquals(1, getResponse.getSeqNo()); XContentParser dataParser = XContentHelper .createParser( NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, - GetResponse.fromXContent(parser).getSourceAsBytesRef(), + getResponse.getSourceAsBytesRef(), XContentType.JSON ); ensureExpectedToken(XContentParser.Token.START_OBJECT, dataParser.nextToken(), dataParser); @@ -271,19 +315,8 @@ public void testGetDataObject_ComplexDataObject() throws IOException { .item( Map .ofEntries( - Map.entry("testString", AttributeValue.builder().s("testString").build()), - Map.entry("testNumber", AttributeValue.builder().n("123").build()), - Map.entry("testBool", AttributeValue.builder().bool(true).build()), - Map - .entry( - "testList", - AttributeValue.builder().l(Arrays.asList(AttributeValue.builder().s("testString").build())).build() - ), - Map - .entry( - "testObject", - AttributeValue.builder().m(ImmutableMap.of("data", AttributeValue.builder().s("foo").build())).build() - ) + Map.entry("_source", AttributeValue.builder().m(getComplexDataSource()).build()), + Map.entry(SEQ_NUM, AttributeValue.builder().n("1").build()) ) ) .build(); @@ -345,7 +378,9 @@ public void testGetDataObject_DDBException_ThrowsOSException() throws IOExceptio @Test public void testDeleteDataObject_HappyCase() throws IOException { DeleteDataObjectRequest deleteRequest = DeleteDataObjectRequest.builder().id(TEST_ID).index(TEST_INDEX).tenantId(TENANT_ID).build(); - Mockito.when(dynamoDbClient.deleteItem(deleteItemRequestArgumentCaptor.capture())).thenReturn(DeleteItemResponse.builder().build()); + Mockito + .when(dynamoDbClient.deleteItem(deleteItemRequestArgumentCaptor.capture())) + .thenReturn(DeleteItemResponse.builder().attributes(ImmutableMap.of(SEQ_NUM, AttributeValue.builder().n("5").build())).build()); DeleteDataObjectResponse deleteResponse = sdkClient .deleteDataObjectAsync(deleteRequest, testThreadPool.executor(GENERAL_THREAD_POOL)) .toCompletableFuture() @@ -358,10 +393,12 @@ public void testDeleteDataObject_HappyCase() throws IOException { DeleteResponse deleteActionResponse = DeleteResponse.fromXContent(deleteResponse.parser()); assertEquals(TEST_ID, deleteActionResponse.getId()); + assertEquals(5, deleteActionResponse.getSeqNo()); assertEquals(DocWriteResponse.Result.DELETED, deleteActionResponse.getResult()); assertEquals(0, deleteActionResponse.getShardInfo().getFailed()); assertEquals(0, deleteActionResponse.getShardInfo().getSuccessful()); assertEquals(0, deleteActionResponse.getShardInfo().getTotal()); + } @Test @@ -393,12 +430,12 @@ public void updateDataObjectAsync_HappyCase() { assertEquals(TEST_INDEX, updateItemRequest.tableName()); assertEquals(TEST_ID, updateItemRequest.key().get(RANGE_KEY).s()); assertEquals(TENANT_ID, updateItemRequest.key().get(HASH_KEY).s()); - assertEquals("foo", updateItemRequest.attributeUpdates().get("data").value().s()); + assertEquals("foo", updateItemRequest.attributeUpdates().get("_source").value().m().get("data").s()); } @Test - public void updateDataObjectAsync_HappyCaseWithMap() { + public void updateDataObjectAsync_HappyCaseWithMap() throws Exception { UpdateDataObjectRequest updateRequest = UpdateDataObjectRequest .builder() .id(TEST_ID) @@ -406,18 +443,21 @@ public void updateDataObjectAsync_HappyCaseWithMap() { .tenantId(TENANT_ID) .dataObject(Map.of("foo", "bar")) .build(); - Mockito.when(dynamoDbClient.updateItem(updateItemRequestArgumentCaptor.capture())).thenReturn(UpdateItemResponse.builder().build()); + Mockito + .when(dynamoDbClient.updateItem(updateItemRequestArgumentCaptor.capture())) + .thenReturn(UpdateItemResponse.builder().attributes(ImmutableMap.of(SEQ_NUM, AttributeValue.builder().n("5").build())).build()); UpdateDataObjectResponse updateResponse = sdkClient .updateDataObjectAsync(updateRequest, testThreadPool.executor(GENERAL_THREAD_POOL)) .toCompletableFuture() .join(); assertEquals(TEST_ID, updateResponse.id()); UpdateItemRequest updateItemRequest = updateItemRequestArgumentCaptor.getValue(); - assertEquals(TEST_ID, updateRequest.id()); assertEquals(TEST_INDEX, updateItemRequest.tableName()); assertEquals(TEST_ID, updateItemRequest.key().get(RANGE_KEY).s()); assertEquals(TENANT_ID, updateItemRequest.key().get(HASH_KEY).s()); - assertEquals("bar", updateItemRequest.attributeUpdates().get("foo").value().s()); + assertEquals("bar", updateItemRequest.attributeUpdates().get("_source").value().m().get("foo").s()); + UpdateResponse response = UpdateResponse.fromXContent(updateResponse.parser()); + Assert.assertEquals(5, response.getSeqNo()); } @Test @@ -497,4 +537,19 @@ public void searchDataObjectAsync_SystemIndex() { Assert.assertEquals("test_index", searchDataObjectRequestArgumentCaptor.getValue().indices()[0]); } + private Map getComplexDataSource() { + return Map + .ofEntries( + Map.entry("testString", AttributeValue.builder().s("testString").build()), + Map.entry("testNumber", AttributeValue.builder().n("123").build()), + Map.entry("testBool", AttributeValue.builder().bool(true).build()), + Map.entry("testList", AttributeValue.builder().l(Arrays.asList(AttributeValue.builder().s("testString").build())).build()), + Map + .entry( + "testObject", + AttributeValue.builder().m(ImmutableMap.of("data", AttributeValue.builder().s("foo").build())).build() + ) + ); + } + }