diff --git a/pom.xml b/pom.xml
index d129fdfc489205..d06b80570f5cdd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -159,7 +159,7 @@ flexible messaging model and an intuitive client API.
2.7.5
3.3.5
2.4.10
- 1.2.4
+ 2.16.0
8.5.2
334
2.13
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
index 3b7059cafd1d53..7b7393db31f461 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
@@ -50,12 +50,12 @@
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.client.indices.GetIndexRequest;
-import org.opensearch.common.Strings;
import org.opensearch.common.settings.Settings;
-import org.opensearch.common.unit.ByteSizeUnit;
-import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentType;
+import org.opensearch.core.common.Strings;
+import org.opensearch.core.common.unit.ByteSizeUnit;
+import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
@@ -229,7 +229,8 @@ public boolean indexDocument(String index, String documentId, String documentSou
if (!Strings.isNullOrEmpty(documentId)) {
indexRequest.id(documentId);
}
- indexRequest.type(config.getTypeName());
+ // no longer needed?
+ //indexRequest.type(config.getTypeName());
indexRequest.source(documentSource, XContentType.JSON);
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
@@ -245,7 +246,8 @@ public boolean indexDocument(String index, String documentId, String documentSou
public boolean deleteDocument(String index, String documentId) throws IOException {
DeleteRequest deleteRequest = Requests.deleteRequest(index);
deleteRequest.id(documentId);
- deleteRequest.type(config.getTypeName());
+ // no longer needed?
+ //deleteRequest.type(config.getTypeName());
DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);
if (log.isDebugEnabled()) {
log.debug("delete result {}", deleteResponse.getResult());
@@ -301,7 +303,8 @@ public void appendIndexRequest(BulkProcessor.BulkIndexRequest request) throws IO
if (!Strings.isNullOrEmpty(request.getDocumentId())) {
indexRequest.id(request.getDocumentId());
}
- indexRequest.type(config.getTypeName());
+ // no longer needed?
+ //indexRequest.type(config.getTypeName());
indexRequest.source(request.getDocumentSource(), XContentType.JSON);
if (log.isDebugEnabled()) {
log.debug("append index request id={}, type={}, source={}", request.getDocumentId(), config.getTypeName(),
@@ -314,7 +317,8 @@ public void appendIndexRequest(BulkProcessor.BulkIndexRequest request) throws IO
public void appendDeleteRequest(BulkProcessor.BulkDeleteRequest request) throws IOException {
DeleteRequest deleteRequest = new DeleteRequestWithPulsarRecord(request.getIndex(), request.getRecord());
deleteRequest.id(request.getDocumentId());
- deleteRequest.type(config.getTypeName());
+ // no longer needed?
+ //deleteRequest.type(config.getTypeName());
if (log.isDebugEnabled()) {
log.debug("append delete request id={}, type={}", request.getDocumentId(), config.getTypeName());
}
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 1d77c1bea5ae7a..5045d645d0dc90 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -207,6 +207,11 @@
aws-java-sdk-core
test
+
+ org.projectlombok
+ lombok
+ test
+
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
index 5b8deabb114f60..11ab7c53df7641 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
@@ -49,6 +49,7 @@ protected ElasticsearchContainer createElasticContainer() {
DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH)
.asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
return new ElasticsearchContainer(dockerImageName)
+ .withEnv("OPENSEARCH_INITIAL_ADMIN_PASSWORD", "0pEn7earch!")
.withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m")
.withEnv("bootstrap.memory_lock", "true")
.withEnv("plugins.security.disabled", "true");