[Bug]: Illegal mutation in kafka-to-bigquery #2036

Open
atognolag opened this issue Nov 26, 2024 · 1 comment
Labels
bug (Something isn't working), needs triage, p2

Comments

@atognolag

Related Template(s)

kafka_to_bigquery_flex

Template Version

2024-11-06-01_rc00

What happened?

When using the DirectRunner I got the following error:

BigQueryWriteUtils.BigQueryDynamicWrite/ConvertGenericRecordToTableRow/ParMultiDo(GenericRecordToTableRow) illegaly mutated value

The root cause is convertRequiredField in BigQueryAvroUtils.java, which empties the input object's ByteBuffer.
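To illustrate the kind of mutation described above, here is a minimal sketch using only the JDK. It is not the actual code from BigQueryAvroUtils; the class and method names (ByteBufferReadSketch, encodeDraining, encodePreserving) are hypothetical, and it assumes the bytes are read with a relative `get`, which advances the buffer's position. Reading through `duplicate()` is one possible way to leave the caller's buffer untouched; it is not necessarily what the fix does.

```java
import java.nio.ByteBuffer;
import java.util.Base64;

public class ByteBufferReadSketch {

    // Hypothetical mutating read: the relative get(byte[]) advances the
    // position of the caller's buffer, so nothing remains afterwards.
    static String encodeDraining(ByteBuffer buf) {
        byte[] bytes = new byte[buf.remaining()];
        buf.get(bytes);
        return Base64.getEncoder().encodeToString(bytes);
    }

    // Hypothetical non-mutating read: duplicate() shares the content but has
    // its own position and limit, so the caller's buffer is left as it was.
    static String encodePreserving(ByteBuffer buf) {
        ByteBuffer view = buf.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return Base64.getEncoder().encodeToString(bytes);
    }

    public static void main(String[] args) {
        ByteBuffer input = ByteBuffer.wrap(new byte[] {0x07});

        String safe = encodePreserving(input);
        System.out.println("after preserving read, remaining = " + input.remaining());

        String drained = encodeDraining(input);
        System.out.println("after draining read, remaining = " + input.remaining());

        // Both reads see the same bytes; only the side effect differs.
        System.out.println("same encoding: " + safe.equals(drained));
    }
}
```

Both methods return the same encoded string; the difference is only whether the input buffer still has bytes left to read afterwards.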

Relevant log output

{"message":"com.google.cloud.teleport.v2.common.UncaughtExceptionLogger - The template launch failed.\norg.apache.beam.sdk.util.IllegalMutationException: PTransform BigQueryWriteUtils.BigQueryDynamicWrite/ConvertGenericRecordToTableRow/ParMultiDo(GenericRecordToTableRow) illegaly mutated value FailsafeElement{originalPayload\u003dorg.apache.beam.sdk.io.kafka.KafkaRecord@c606b0d5, payload\u003d{\"data\": {\"id\": 1, \"name\": \"Ale\", \"age\": 37}, \"beforeData\": {\"id\": null, \"name\": \"Ale\", \"age\": 37}, \"headers\": {\"operation\": \"UPDATE\", \"changeSequence\": \"20241125155727630000000000000000005\", \"timestamp\": \"1970-01-01T00:00:00.000\", \"streamPosition\": \"00000000/0B9D7D68.2.00000000/0B9D7D68\", \"transactionId\": \"DD220A00000000000000000000000000\", \"changeMask\": \"\\u0000\", \"columnMask\": \"\\u0007\", \"transactionEventCounter\": 1, \"transactionLastEvent\": true}}, errorMessage\u003dnull, stacktrace\u003dnull} of class class com.google.cloud.teleport.v2.values.FailsafeElement. 
Input values must not be mutated in any way.\n\tat org.apache.beam.runners.direct.ImmutabilityEnforcementFactory$ImmutabilityCheckingEnforcement.verifyUnmodified(ImmutabilityEnforcementFactory.java:154)\n\tat org.apache.beam.runners.direct.ImmutabilityEnforcementFactory$ImmutabilityCheckingEnforcement.afterElement(ImmutabilityEnforcementFactory.java:131)\n\tat org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:175)\n\tat org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\n","severity":"ERROR"}

org.apache.beam.sdk.util.IllegalMutationException: PTransform BigQueryWriteUtils.BigQueryDynamicWrite/ConvertGenericRecordToTableRow/ParMultiDo(GenericRecordToTableRow) illegaly mutated value FailsafeElement{originalPayload=org.apache.beam.sdk.io.kafka.KafkaRecord@c606b0d5, payload={"data": {"id": 1, "name": "Ale", "age": 37}, "beforeData": {"id": null, "name": "Ale", "age": 37}, "headers": {"operation": "UPDATE", "changeSequence": "20241125155727630000000000000000005", "timestamp": "1970-01-01T00:00:00.000", "streamPosition": "00000000/0B9D7D68.2.00000000/0B9D7D68", "transactionId": "DD220A00000000000000000000000000", "changeMask": "\u0000", "columnMask": "\u0007", "transactionEventCounter": 1, "transactionLastEvent": true}}, errorMessage=null, stacktrace=null} of class class com.google.cloud.teleport.v2.values.FailsafeElement. Input values must not be mutated in any way.

	at org.apache.beam.runners.direct.ImmutabilityEnforcementFactory$ImmutabilityCheckingEnforcement.verifyUnmodified(ImmutabilityEnforcementFactory.java:154)
	at org.apache.beam.runners.direct.ImmutabilityEnforcementFactory$ImmutabilityCheckingEnforcement.afterElement(ImmutabilityEnforcementFactory.java:131)
	at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:175)
	at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
	at --- Async.Stack.Trace --- (captured by IntelliJ IDEA debugger)
	at java.base/java.util.concurrent.FutureTask.<init>(FutureTask.java:151)
	at java.base/java.util.concurrent.AbstractExecutorService.newTaskFor(AbstractExecutorService.java:93)
	at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:117)
	at org.apache.beam.runners.direct.TransformExecutorServices$ParallelTransformExecutor.schedule(TransformExecutorServices.java:79)
	at org.apache.beam.runners.direct.ExecutorServiceParallelExecutor.evaluateBundle(ExecutorServiceParallelExecutor.java:242)
	at org.apache.beam.runners.direct.ExecutorServiceParallelExecutor.process(ExecutorServiceParallelExecutor.java:218)
	at org.apache.beam.runners.direct.ExecutorServiceParallelExecutor.process(ExecutorServiceParallelExecutor.java:63)
	at org.apache.beam.runners.direct.QuiescenceDriver.processBundle(QuiescenceDriver.java:179)
	at org.apache.beam.runners.direct.QuiescenceDriver.processBundle(QuiescenceDriver.java:162)
	at org.apache.beam.runners.direct.QuiescenceDriver.applyUpdate(QuiescenceDriver.java:150)
	at org.apache.beam.runners.direct.QuiescenceDriver.drive(QuiescenceDriver.java:129)
	at org.apache.beam.runners.direct.ExecutorServiceParallelExecutor$2.run(ExecutorServiceParallelExecutor.java:187)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)

@atognolag
Author

Created PR #2037 (Avoid illegal-mutation error) to fix this.
