
"behavior.on.null.values": "delete" doesn't work properly #203

Open
OpenSanfi opened this issue May 29, 2023 · 1 comment

@OpenSanfi

I'm working with opensearch-connector-for-apache-kafka-3.0.0.
The `"behavior.on.null.values": "delete"` setting doesn't work reliably.
Sometimes the delete succeeds.
Other times the document is not deleted, and several duplicate documents end up in the indexes.
This is my OpenSearch sink configuration:

```json
{
  "name": "open-sink-yyy-user-profiling-xxx",
  "config": {
    "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
    "type.name": "kafkaconnect-yyy-crsm",
    "behavior.on.null.values": "delete",
    "connection.password": "",
    "topics": "yyy-attributes-category-xxx,yyy-business-user-profiles-xxx,yyy-user-roles-xxx,yyy-business-users-xxx,yyy-offering-business-users-xxx,yyy-users-xxx,yyy-offering-users-xxx,yyy-user-attributes-xxx,yyy-user-relationships-xxx,yyy-user-invitations-xxx,yyy-users-devices-xxx,yyy-iam-group-mappings-xxx",
    "tasks.max": "10",
    "batch.size": "500",
    "connection.timeout.ms": "30000",
    "connection.username": "",
    "max.retries": "12",
    "retry.backoff.ms": "1000",
    "schema.ignore": "true",
    "value.converter.schemas.enable": "false",
    "name": "open-sink-yyy-user-profiling-xxx",
    "errors.tolerance": "all",
    "connection.url": "http://dcpp-opensearch-cluster:9201",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "read.timeout.ms": "30000",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "behavior.on.version.conflict": "warn"
  }
}
```
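For context, a minimal sketch of what `"behavior.on.null.values": "delete"` is expected to do: a record whose value is null (a tombstone) becomes a delete of the document with the same ID, while any other record becomes an index request. The `ignore`/`delete`/`fail` modes and every name below (`NullValueRouting`, `BulkAction`, `toAction`) are illustrative assumptions, not the connector's actual code.

```java
import java.util.Optional;

/**
 * Hypothetical model of how a sink could turn one Kafka record into a bulk action.
 * All names here are illustrative; they are not the connector's API.
 */
public class NullValueRouting {

    /** Assumed modes for handling records whose value is null. */
    enum NullValueBehavior { IGNORE, DELETE, FAIL }

    /** A bulk action: "index" carries a body, "delete" only needs the document ID. */
    record BulkAction(String type, String docId, String body) {}

    static Optional<BulkAction> toAction(String docId, String value, NullValueBehavior behavior) {
        if (value != null) {
            // Normal record: index (create or overwrite) the document under docId.
            return Optional.of(new BulkAction("index", docId, value));
        }
        // Tombstone: what happens depends on the configured behavior.
        return switch (behavior) {
            case IGNORE -> Optional.empty(); // drop the record, nothing is sent
            case DELETE -> Optional.of(new BulkAction("delete", docId, null));
            case FAIL -> throw new IllegalStateException("null value for document " + docId);
        };
    }

    public static void main(String[] args) {
        System.out.println(toAction("user-42", "{\"name\":\"a\"}", NullValueBehavior.DELETE));
        System.out.println(toAction("user-42", null, NullValueBehavior.DELETE));
    }
}
```

The detail that matters for this issue is that the delete targets `docId`, so it only removes something if that ID matches the one the document was originally indexed under.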

We also tried without the `"behavior.on.version.conflict": "warn"` setting,

but in that case the connector fails and stops processing every operation, with this exception:

```
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to bulk processing after total of 1 attempt(s)
	at io.aiven.kafka.connect.opensearch.RetryUtil.callWithRetry(RetryUtil.java:137)
	at io.aiven.kafka.connect.opensearch.BulkProcessor$BulkTask.execute(BulkProcessor.java:367)
	at io.aiven.kafka.connect.opensearch.BulkProcessor$BulkTask.call(BulkProcessor.java:356)
	at io.aiven.kafka.connect.opensearch.BulkProcessor$BulkTask.call(BulkProcessor.java:337)
	... 4 more
Caused by: org.apache.kafka.connect.errors.ConnectException: One of the item in the bulk response failed. Reason: [yyy-user-invitations-xxx/0FNsOYA_R2-Qx7JM04k6Fw][[yyy-user-invitations-xxx][0]] OpenSearchException[OpenSearch exception [type=version_conflict_engine_exception, reason=[yyy-1f585694-df20-4a97-bb29-0e73fb5a4832]: version conflict, current version [3] is higher or equal to the one provided [0]]]
	at io.aiven.kafka.connect.opensearch.BulkProcessor$BulkTask.handleVersionConflict(BulkProcessor.java:429)
	at io.aiven.kafka.connect.opensearch.BulkProcessor$BulkTask.lambda$execute$0(BulkProcessor.java:382)
	at io.aiven.kafka.connect.opensearch.RetryUtil.callWithRetry(RetryUtil.java:119)
	... 7 more
```
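The version conflict in the trace follows the rule spelled out in the error message itself: a write is rejected when the stored version is higher than or equal to the provided one (here 3 versus 0). The in-memory model below is a hypothetical illustration of that comparison only; `ExternalVersionCheck` and `write` are made-up names, and the trace does not show where the connector takes version [0] from.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical in-memory model of the version check described by the error:
 * a write is rejected when the stored version is >= the provided version.
 */
public class ExternalVersionCheck {

    // Document ID -> currently stored version.
    private final Map<String, Long> storedVersions = new HashMap<>();

    /** Returns true if the write is accepted; false models a version_conflict_engine_exception. */
    boolean write(String docId, long providedVersion) {
        Long current = storedVersions.get(docId);
        if (current != null && current >= providedVersion) {
            // "current version [current] is higher or equal to the one provided [providedVersion]"
            return false;
        }
        storedVersions.put(docId, providedVersion);
        return true;
    }

    public static void main(String[] args) {
        ExternalVersionCheck index = new ExternalVersionCheck();
        System.out.println(index.write("doc-1", 3)); // true: no existing document
        System.out.println(index.write("doc-1", 0)); // false: 3 >= 0, as in the trace above
    }
}
```

With `"behavior.on.version.conflict": "warn"`, the conflicting operation is presumably logged and skipped instead of failing the task, which would be consistent with some deletes silently not being applied.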

Please can you help me?
Regards
Gabriella

@freitasskeeled

Not sure if it helps, but we had a similar problem and found that it was related to the default value of `key.ignore.id.strategy`, which is `topic.partition.offset`. We changed it to `record.key` and the problem stopped.

For us the change made sense, but whether it fits will depend on your use case and setup.
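To illustrate the suggestion above: if document IDs are built from topic, partition and offset, a tombstone for a key arrives at a new offset and therefore gets a new ID, so the delete would never hit the document indexed earlier, and each update of the same key would land in a new document. The sketch assumes a `topic+partition+offset` ID format purely for illustration (the connector's exact format may differ), and `DocIdStrategies`/`docId` are hypothetical names.

```java
/**
 * Hypothetical illustration of two document-ID strategies.
 * The "topic+partition+offset" format is an assumption, not necessarily the connector's.
 */
public class DocIdStrategies {

    enum IdStrategy { TOPIC_PARTITION_OFFSET, RECORD_KEY }

    static String docId(IdStrategy strategy, String topic, int partition, long offset, String key) {
        return switch (strategy) {
            // ID changes with every record, even for the same key.
            case TOPIC_PARTITION_OFFSET -> topic + "+" + partition + "+" + offset;
            // ID is stable for a given key, so later records overwrite or delete the same document.
            case RECORD_KEY -> key;
        };
    }

    public static void main(String[] args) {
        String topic = "yyy-users-xxx";
        // The same key is written at offset 10, then a tombstone for it arrives at offset 11.
        for (IdStrategy s : IdStrategy.values()) {
            String indexed = docId(s, topic, 0, 10, "user-42");
            String deleted = docId(s, topic, 0, 11, "user-42");
            System.out.println(s + ": delete hits indexed document? " + indexed.equals(deleted));
        }
    }
}
```

Running `main` prints `false` for the offset-based strategy and `true` for the key-based one.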
