From d1c9eaad9b8b86bf74382cc826ce5dfbb36229b0 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Tue, 10 Sep 2024 15:22:49 -0700 Subject: [PATCH] trying to upgrade the client --- pom.xml | 2 +- .../OpenSearchHighLevelRestClient.java | 18 +++++++++++------- tests/integration/pom.xml | 5 +++++ .../io/sinks/OpenSearchSinkTester.java | 1 + 4 files changed, 18 insertions(+), 8 deletions(-) diff --git a/pom.xml b/pom.xml index d129fdfc48920..d06b80570f5cd 100644 --- a/pom.xml +++ b/pom.xml @@ -159,7 +159,7 @@ flexible messaging model and an intuitive client API. 2.7.5 3.3.5 2.4.10 - 1.2.4 + 2.16.0 8.5.2 334 2.13 diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java index 3b7059cafd1d5..7b7393db31f46 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java @@ -50,12 +50,12 @@ import org.opensearch.client.indices.CreateIndexRequest; import org.opensearch.client.indices.CreateIndexResponse; import org.opensearch.client.indices.GetIndexRequest; -import org.opensearch.common.Strings; import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.ByteSizeUnit; -import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.common.Strings; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.builder.SearchSourceBuilder; @@ -229,7 +229,8 @@ public boolean indexDocument(String index, String documentId, String documentSou if (!Strings.isNullOrEmpty(documentId)) { indexRequest.id(documentId); } - indexRequest.type(config.getTypeName()); + // no longer needed? + //indexRequest.type(config.getTypeName()); indexRequest.source(documentSource, XContentType.JSON); IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT); @@ -245,7 +246,8 @@ public boolean indexDocument(String index, String documentId, String documentSou public boolean deleteDocument(String index, String documentId) throws IOException { DeleteRequest deleteRequest = Requests.deleteRequest(index); deleteRequest.id(documentId); - deleteRequest.type(config.getTypeName()); + // no longer needed? + //deleteRequest.type(config.getTypeName()); DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT); if (log.isDebugEnabled()) { log.debug("delete result {}", deleteResponse.getResult()); @@ -301,7 +303,8 @@ public void appendIndexRequest(BulkProcessor.BulkIndexRequest request) throws IO if (!Strings.isNullOrEmpty(request.getDocumentId())) { indexRequest.id(request.getDocumentId()); } - indexRequest.type(config.getTypeName()); + // no longer needed? + //indexRequest.type(config.getTypeName()); indexRequest.source(request.getDocumentSource(), XContentType.JSON); if (log.isDebugEnabled()) { log.debug("append index request id={}, type={}, source={}", request.getDocumentId(), config.getTypeName(), @@ -314,7 +317,8 @@ public void appendIndexRequest(BulkProcessor.BulkIndexRequest request) throws IO public void appendDeleteRequest(BulkProcessor.BulkDeleteRequest request) throws IOException { DeleteRequest deleteRequest = new DeleteRequestWithPulsarRecord(request.getIndex(), request.getRecord()); deleteRequest.id(request.getDocumentId()); - deleteRequest.type(config.getTypeName()); + // no longer needed? + //deleteRequest.type(config.getTypeName()); if (log.isDebugEnabled()) { log.debug("append delete request id={}, type={}", request.getDocumentId(), config.getTypeName()); } diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml index 1d77c1bea5ae7..5045d645d0dc9 100644 --- a/tests/integration/pom.xml +++ b/tests/integration/pom.xml @@ -207,6 +207,11 @@ aws-java-sdk-core test + + org.projectlombok + lombok + test + diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java index 5b8deabb114f6..11ab7c53df764 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java @@ -49,6 +49,7 @@ protected ElasticsearchContainer createElasticContainer() { DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH) .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch"); return new ElasticsearchContainer(dockerImageName) + .withEnv("OPENSEARCH_INITIAL_ADMIN_PASSWORD", "0pEn7earch!") .withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m") .withEnv("bootstrap.memory_lock", "true") .withEnv("plugins.security.disabled", "true");