
org.apache.spark.sql.AnalysisException: [UNSUPPORTED_DESERIALIZER.DATA_TYPE_MISMATCH] The deserializer is not supported: need a(n) "ARRAY" field but got "MAP<STRING, STRING>". #1295

Open
VIKCT001 opened this issue Sep 20, 2024 · 6 comments


VIKCT001 commented Sep 20, 2024

We recently migrated to Dataproc image version 2.2, which comes with Scala 2.12.18 and Spark 3.5.

package test

import org.apache.spark.sql.SparkSession
import test.Model._

object readbigquery {
  def main(args: Array[String]) = {

    val runLocally = true
    val jobName ="Test Spark Logging Case"

    implicit val spark: SparkSession = Some(SparkSession.builder.appName(jobName))
      .map(sparkSessionBuilder =>
        if (runLocally) sparkSessionBuilder.master("local[2]") else sparkSessionBuilder
      )
      .map(_.getOrCreate())
      .get

    val datasetConfigurationRowsRaw = getDatasetConfigurations(
      spark,
      confProjectId = "test-project",
      mappingsDatasetName = "MAPPINGS",
      datasetConfigurationsTableName = "dataset_configurations"
    )
    println(s"datasetConfigurationRowsRaw:$datasetConfigurationRowsRaw")

  }

  def getDatasetConfigurations(
                                spark: SparkSession,
                                confProjectId: String,
                                mappingsDatasetName: String,
                                datasetConfigurationsTableName: String,
                              ): Seq[DatasetConfigurationRow] = {

    import org.apache.spark.sql.functions._
    import spark.implicits._
    spark.read
      .format("bigquery")
      .option("table", s"$confProjectId.$mappingsDatasetName.$datasetConfigurationsTableName")
      .option("project", confProjectId)
      .load()
      .select(
        col("technology"),
        col("name"),
        col("source_name"),
        col("temporal_unit"),
        col("regional_unit"),
        col("identifier_column_names"),
        col("metadata_column_mappings"),
        col("column_mappings"),
        col("timestamp_column_name"))
      .as[DatasetConfigurationRow]
      .collect()
  }
}
package test

object Model {

case class DatasetConfigurationRow
(
  technology: String,
  name: String,
  identifier_column_names: Seq[String],
  column_mappings: Seq[ColumnMapping],
  timestamp_column_name: String,
)


case class ColumnMapping
(
  mapping_type: String,
  source_column_name: String,
  column_name: String,
  name: String,
  display_name: String,
  description: String,
  keep_source_column: Boolean,
  formula: String,
  functions: Functions
)

case class DataUnitFunction(key: String, value: String)

case class Functions
(
  fun_temporal: String,
  fun_regional: String,
  fun_temporal_unit: Seq[DataUnitFunction],
  fun_regional_unit: Seq[DataUnitFunction]
)

}

Above is a snippet of the actual code, which worked fine with Dataproc image 2.0, Scala 2.12.16, and Spark 3.3.

Here we read a BigQuery table and try to load it into the set of case classes defined above. Now we are facing the issue below:

Exception in thread "main" org.apache.spark.sql.AnalysisException: [UNSUPPORTED_DESERIALIZER.DATA_TYPE_MISMATCH] The deserializer is not supported: need a(n) "ARRAY" field but got "MAP<STRING, STRING>".

Adding the schema of the BigQuery table:

CREATE TABLE `test-project.MAPPINGS.dataset_configurations`
(
  technology STRING,
  name STRING,
  identifier_column_names ARRAY<STRING>,
  column_mappings ARRAY<STRUCT<mapping_type STRING, source_column_name STRING, column_name STRING, name STRING, display_name STRING, description STRING, keep_source_column BOOL, formula STRING, functions STRUCT<fun_temporal STRING, fun_regional STRING, fun_temporal_unit ARRAY<STRUCT<key STRING, value STRING>>, fun_regional_unit ARRAY<STRUCT<key STRING, value STRING>>>>>,
  timestamp_column_name STRING
);
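One quick way to see where the ARRAY vs MAP mismatch comes from is to compare the schema the connector returns with the schema the Dataset encoder expects for `DatasetConfigurationRow`. A minimal sketch, assuming the same table and case classes as above:

```scala
import org.apache.spark.sql.{Encoders, SparkSession}
import test.Model.DatasetConfigurationRow

val spark = SparkSession.builder.appName("schema-check").master("local[2]").getOrCreate()

// Schema as produced by the connector: shows whether fun_temporal_unit /
// fun_regional_unit arrive as map<string,string> or array<struct<key,value>>.
val df = spark.read
  .format("bigquery")
  .option("table", "test-project.MAPPINGS.dataset_configurations")
  .load()
df.printSchema()

// Schema the .as[DatasetConfigurationRow] encoder expects (array<struct<key,value>> here).
println(Encoders.product[DatasetConfigurationRow].schema.treeString)
```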


VIKCT001 commented Sep 26, 2024

I have created sample code to reproduce the error. The same code worked fine with the old connector on Dataproc image version 2.0.

VIKCT001 (Author) commented:

If we compile our code as it was earlier, we see the issue below:

```
Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple sources found for bigquery
(com.google.cloud.spark.bigquery.v2.Spark34BigQueryTableProvider,
com.google.cloud.spark.bigquery.v2.Spark33BigQueryTableProvider), please specify the fully qualified class name.
	at org.apache.spark.sql.errors.QueryCompilationErrors$.findMultipleDataSourceError(QueryCompilationErrors.scala:1576)
```

Our build.sbt, for reference:

ThisBuild / version := "0.1.0-SNAPSHOT"

ThisBuild / scalaVersion := "2.12.16"

val sparkVersion = "3.3.0"

lazy val root = (project in file("."))
  .settings(
    name := "testing"
  )

//libraryDependencies += "org.apache.spark" %% "spark-core" % "3.3.0"
//libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.3.0"

libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion % Provided
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion % Provided
libraryDependencies += "com.google.cloud" % "google-cloud-storage" % "2.22.0"
libraryDependencies += "com.google.cloud" % "google-cloud-bigquery" % "2.10.5"
libraryDependencies += "com.holdenkarau" %% "spark-testing-base" % s"${sparkVersion}_1.4.3"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.8"

davidrabinowitz (Member) commented:

Please don't add the spark-bigquery-connector using --jars/--packages; it is already built into image 2.2. See here if you want to change the version or type.

VIKCT001 (Author) commented:

We are not adding it, as suggested in the documentation, but we are still getting this issue:

Exception in thread "main" org.apache.spark.sql.AnalysisException: [UNSUPPORTED_DESERIALIZER.DATA_TYPE_MISMATCH] 
The deserializer is not supported: need a(n) "ARRAY" field but got "MAP<STRING, STRING>".

Also, we created our cluster with the parameter below:
Metadata: SPARK_BQ_CONNECTOR_VERSION = 0.41.0
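To double-check which connector build is actually picked up on the cluster, one option is to print the jar that the provider class (named in the earlier error) is loaded from. A hedged diagnostic sketch, run inside the Spark job:

```scala
// Diagnostic sketch: locate the jar serving the BigQuery provider class.
// The class name is taken from the "Multiple sources found" error above;
// Class.forName throws ClassNotFoundException if it is not on the classpath.
val cls = Class.forName("com.google.cloud.spark.bigquery.v2.Spark33BigQueryTableProvider")
println(cls.getProtectionDomain.getCodeSource.getLocation)
```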


VIKCT001 commented Sep 26, 2024

I had to change the schema of the case class Functions to solve this.

```
case class Functions
(
  fun_temporal: String,
  fun_regional: String,
  fun_temporal_unit: Option[Map[String, String]] = None,
  fun_regional_unit: Option[Map[String, String]] = None
)
```

Below is the DataFrame schema, which now loads successfully into the case classes:

 |-- technology: string (nullable = true)
 |-- name: string (nullable = true)
 |-- identifier_column_names: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- timestamp_column_name: string (nullable = true)
 |-- column_mappings: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- mapping_type: string (nullable = true)
 |    |    |-- source_column_name: string (nullable = true)
 |    |    |-- column_name: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- display_name: string (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- keep_source_column: boolean (nullable = true)
 |    |    |-- formula: string (nullable = true)
 |    |    |-- functions: struct (nullable = true)
 |    |    |    |-- fun_temporal: string (nullable = true)
 |    |    |    |-- fun_regional: string (nullable = true)
 |    |    |    |-- fun_temporal_unit: map (nullable = false)
 |    |    |    |    |-- key: string
 |    |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |    |-- fun_regional_unit: map (nullable = false)
 |    |    |    |    |-- key: string
 |    |    |    |    |-- value: string (valueContainsNull = true)

Just to mention that the same code worked fine with the old connector, **Scala 2.12.16**, and **Spark 3.3**. This time we had to change our application code to accommodate the connector and version changes.
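A possible alternative that keeps the original `Seq[DataUnitFunction]` model is to convert the MAP columns back into `ARRAY<STRUCT<key, value>>` before calling `.as[...]`, for example with `map_entries`. An untested sketch, assuming `df` is the DataFrame loaded from BigQuery and `spark.implicits._` is in scope:

```scala
import org.apache.spark.sql.functions._

// Untested sketch: rebuild functions.fun_temporal_unit / fun_regional_unit as
// array<struct<key,value>> so the original case classes still deserialize.
val arrayified = df.withColumn(
  "column_mappings",
  transform(col("column_mappings"), cm => {
    val fns = cm.getField("functions")
    cm.withField(
      "functions",
      fns
        .withField("fun_temporal_unit", map_entries(fns.getField("fun_temporal_unit")))
        .withField("fun_regional_unit", map_entries(fns.getField("fun_regional_unit")))
    )
  })
)

val rows = arrayified.as[DatasetConfigurationRow].collect()
```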


VIKCT001 commented Sep 28, 2024

The above solution is not working. Now the code throws a NullPointerException with a different set of data.

Below is the stack trace:

java.lang.NullPointerException: null
	at org.apache.spark.unsafe.UTF8StringBuilder.append(UTF8StringBuilder.java:76) ~[spark-unsafe_2.12-3.5.0.jar:3.5.0]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_fieldToString_0_0$(Unknown Source) ~[?:?]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_fieldToString_1_2$(Unknown Source) ~[?:?]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_elementToString_1$(Unknown Source) ~[?:?]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) ~[?:?]
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) ~[spark-sql_2.12-3.5.0.jar:3.5.0]
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43) ~[spark-sql_2.12-3.5.0.jar:3.5.0]
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388) ~[spark-sql_2.12-3.5.0.jar:3.5.0]
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890) ~[spark-core_2.12-3.5.0.jar:3.5.0]
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890) ~[spark-core_2.12-3.5.0.jar:3.5.0]
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.5.0.jar:3.5.0]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.5.0.jar:3.5.0]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.5.0.jar:3.5.0]
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93) ~[spark-core_2.12-3.5.0.jar:3.5.0]
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) ~[spark-core_2.12-3.5.0.jar:3.5.0]
	at org.apache.spark.scheduler.Task.run(Task.scala:141) ~[spark-core_2.12-3.5.0.jar:3.5.0]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620) ~[spark-core_2.12-3.5.0.jar:3.5.0]
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) ~[spark-common-utils_2.12-3.5.0.jar:3.5.0]
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) ~[spark-common-utils_2.12-3.5.0.jar:3.5.0]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:95) ~[spark-core_2.12-3.5.0.jar:3.5.0]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623) [spark-core_2.12-3.5.0.jar:3.5.0]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at java.base/java.lang.Thread.run(Thread.java:829) [?:?]
24/09/28 23:22:58 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (dnb-uat-pipe-pm-grp-a-transform-v1-m.c.bmas-eu-pm-dnb-uat-pipe.internal executor driver): java.lang.NullPointerException
	at org.apache.spark.unsafe.UTF8StringBuilder.append(UTF8StringBuilder.java:76)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_fieldToString_0_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_fieldToString_1_2$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_elementToString_1$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:95)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
