[BUG] Shaded Jackson libraries cause issues #973

Open
normanj-bitquill opened this issue Dec 6, 2024 · 3 comments
Labels
bug Something isn't working

Comments

@normanj-bitquill (Contributor)

What is the bug?
The Spark connector (Flint) appears to have issues with JSON. In FlintJacksonParser, it is calling a method on a Spark class to create a JsonFactory.

https://github.com/opensearch-project/opensearch-spark/blob/main/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonParser.scala#L53

The Flint jar currently shades Jackson. Calling the Spark class to create the JsonFactory returns an instance of com.fasterxml.jackson.core.JsonFactory, but because Jackson was shaded, FlintJacksonParser expects an instance of shaded.flint.com.fasterxml.jackson.core.JsonFactory.

This mismatch breaks at least the spark-shell any time Flint needs to parse JSON.

Not shading the Jackson libraries, and instead relying on the ones in the Spark distribution, fixes this problem.
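
For reference, here is a minimal sketch of the kind of sbt-assembly shade rule that produces this relocation (the rename pattern is an assumption inferred from the shaded.flint prefix in the stack trace; the actual rules in the Flint build may differ). Removing the Jackson relocation and relying on the Jackson that ships with Spark avoids the mismatch, because Spark's JSONOptions.buildJsonFactory() is declared to return com.fasterxml.jackson.core.JsonFactory:

// build.sbt (hypothetical excerpt)
assembly / assemblyShadeRules := Seq(
  // This relocation rewrites Flint's references to Jackson classes, including the
  // return type FlintJacksonParser expects from JSONOptions.buildJsonFactory().
  // Dropping this rule leaves those references pointing at Spark's own Jackson.
  ShadeRule.rename("com.fasterxml.jackson.**" -> "shaded.flint.com.fasterxml.jackson.@1").inAll
)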

How can one reproduce the bug?
Steps to reproduce the behavior:

  1. Start an OpenSearch server
  2. Configure a Spark server with the Flint extension and add the settings it needs to connect to the OpenSearch server.
  3. Run spark-shell on the host running the Spark server (see the example command after these steps)
  4. Try to show the results of a query of an OpenSearch index. For example:
    spark.sql("SELECT * FROM dev.default.test").show()
    
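For steps 2 and 3, an invocation along these lines should work (a sketch assuming a local, single-node setup; the jar path, host, and port are placeholders, and the spark.datasource.flint.* option names are taken from the Flint docs):

bin/spark-shell \
  --jars /path/to/flint-spark-integration-assembly-0.7.0-SNAPSHOT.jar \
  --conf "spark.sql.extensions=org.opensearch.flint.spark.FlintSparkExtensions" \
  --conf "spark.datasource.flint.host=localhost" \
  --conf "spark.datasource.flint.port=9200" \
  --conf "spark.datasource.flint.scheme=http"

Configuring the dev catalog referenced in step 4 depends on your setup (for example, Glue), so it is omitted here.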

What is the expected behavior?
The results should be formatted and displayed. For example:

+---+---+
|  x|  y|
+---+---+
|  1|foo|
|  2|bar|
|  3|baz|
+---+---+

What is your host/environment?

  • OS: Mac OS X 15.1.1
  • Version: main branch, built locally as of today
  • Plugins: Spark 3.5.3 with flint-spark-integration-assembly-0.7.0-SNAPSHOT.jar

Do you have any screenshots?
N/A

Do you have any additional context?
Stack Trace:

java.lang.NoSuchMethodError: 'shaded.flint.com.fasterxml.jackson.core.JsonFactory org.apache.spark.sql.catalyst.json.JSONOptions.buildJsonFactory()'
	at org.apache.spark.sql.flint.json.FlintJacksonParser.<init>(FlintJacksonParser.scala:53)
	at org.apache.spark.sql.flint.FlintPartitionReader.parser$lzycompute(FlintPartitionReader.scala:31)
	at org.apache.spark.sql.flint.FlintPartitionReader.parser(FlintPartitionReader.scala:31)
	at org.apache.spark.sql.flint.FlintPartitionReader.safeParser$lzycompute(FlintPartitionReader.scala:39)
	at org.apache.spark.sql.flint.FlintPartitionReader.safeParser(FlintPartitionReader.scala:37)
	at org.apache.spark.sql.flint.FlintPartitionReader.next(FlintPartitionReader.scala:53)
	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:120)
	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:158)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
	at scala.Option.exists(Option.scala:376)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:97)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
24/12/06 22:30:05 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2) (9585a65ef886 executor driver): java.lang.NoSuchMethodError: 'shaded.flint.com.fasterxml.jackson.core.JsonFactory org.apache.spark.sql.catalyst.json.JSONOptions.buildJsonFactory()'
	... (stack trace identical to the one above)

24/12/06 22:30:05 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2) (9585a65ef886 executor driver): java.lang.NoSuchMethodError: 'shaded.flint.com.fasterxml.jackson.core.JsonFactory org.apache.spark.sql.catalyst.json.JSONOptions.buildJsonFactory()'
	... (stack trace identical to the one above)
@normanj-bitquill added the "bug" and "untriaged" labels on Dec 6, 2024
@noCharger (Collaborator)

noCharger commented Jan 8, 2025

@normanj-bitquill Thanks for reporting this issue. Can you provide more details on how to reproduce it? Especially this step: Try to show the results of a query of an OpenSearch index: spark.sql("SELECT * FROM dev.default.test").show()

  • Is this "dev" a Glue catalog and "test" a Glue table? If yes, how did you set up the connection? If not, could you share steps to reproduce?
  • Could you share the CREATE TABLE query? Just curious whether this is due to a specific data type.

I cannot reproduce this locally with the Spark catalog, nor from the managed service:

spark-sql (default)> CREATE TABLE spark_catalog.default.test (name STRING, age INT, address STRUCT<first: STRING, second: STRUCT<city: STRING, street: STRING>>) USING JSON;

> SELECT * FROM spark_catalog.default.test;
Jane Smith      25      {"first":"456 Elm St","second":{"city":"Los Angeles","street":"Hollywood Blvd"}}
Alice Brown     28      {"first":"101 Pine Rd","second":{"city":"San Francisco","street":"Market St"}}
Bob Johnson     35      {"first":"789 Oak Ave","second":{"city":"Chicago","street":"Michigan Ave"}}
John Doe        30      {"first":"123 Main St","second":{"city":"New York","street":"Broadway"}}
Time taken: 0.999 seconds, Fetched 4 row(s)

@dai-chen (Collaborator)

dai-chen commented Jan 9, 2025

@noCharger Based on the stack trace, it seems our Flint data source and reader were invoked. If so, we probably need to read from an OpenSearch index to reproduce the issue?

@noCharger (Collaborator)

@noCharger Based on the stack trace, it seems our Flint data source and reader were invoked. If so, we probably need to read from an OpenSearch index to reproduce the issue?

Thanks, I was able to reproduce this when reading an OpenSearch index:

import org.apache.spark.sql.DataFrame

scala> val reader = spark.read
reader: org.apache.spark.sql.DataFrameReader = org.apache.spark.sql.DataFrameReader@b70d29c

scala>   .format("flint")
res7: org.apache.spark.sql.DataFrameReader = org.apache.spark.sql.DataFrameReader@b70d29c

scala>   .schema("accountId STRING, eventName STRING, eventSource STRING")
res8: org.apache.spark.sql.DataFrameReader = org.apache.spark.sql.DataFrameReader@b70d29c

scala> val df: DataFrame = reader.load("myindex")
df: org.apache.spark.sql.DataFrame = [accountId: string, eventName: string ... 1 more field]

scala> df.show()
25/01/09 10:43:00 WARN RestClient: request [POST http://localhost:9200/myindex/_search?typed_keys=true&max_concurrent_shard_requests=5&ignore_unavailable=false&expand_wildcards=open&allow_no_indices=true&ignore_throttled=true&search_type=query_then_fetch&batched_reduce_size=512&ccs_minimize_roundtrips=true] returned 1 warnings: [299 OpenSearch-2.15.1-SNAPSHOT-943f74e4c38c5a6a7267970919c9019c9cb2610f "Loading the fielddata on the _id field is deprecated and will be removed in future versions. If you require sorting or aggregating on this field you should also include the id in the body of your documents, and map this field as a keyword field that has [doc_values] enabled"]
25/01/09 10:43:00 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoSuchMethodError: 'shaded.flint.com.fasterxml.jackson.core.JsonFactory org.apache.spark.sql.catalyst.json.JSONOptions.buildJsonFactory()'
