
Analysis of why es-spark stops working after the Spark 3 upgrade #72

cjuexuan opened this issue Sep 19, 2020 · 3 comments

@cjuexuan (Owner)

Background

We have been busy upgrading to Spark 3 recently. The code we maintain ourselves is basically done, but the external Elasticsearch data source still has problems. This post describes the problem and how we fixed it.

Symptom

After upgrading to Spark 3, our integration tests showed that some indices could still be read while others could not. The main exception was:

scala.None$ is not a valid external type for schema of string
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.StaticInvoke_3$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_2$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:211)
	... 36 common frames omitted

Analysis

There is more than one way to analyze a problem like this. One is to trace the source code forward until you find exactly where things go wrong. That approach is more rigorous, but it gets tiring when the call chain is long, so I will come back to the forward trace at the end. Here we start with a different way of debugging: run experiments repeatedly, observe the behavior, make a bold hypothesis, and verify it carefully. Let's use this approach to track down and fix the problem first.

Bold hypothesis, careful verification

First, the test results show that not all indices fail, only some of them. Our first instinct is that some data type may not be supported. If we can find the type that cannot be read, we can fix it in a targeted way.

Next, let's dig into the relevant code. First, can we make es-spark read fewer fields? If so, we can bisect the field list and quickly home in on the problematic one.
We know that when the user does not specify a schema, es-spark derives the index schema from the index mapping; the details are in ElasticsearchRelation's lazySchema, which we won't expand on here. If the user does provide a schema, that schema is used instead, so all we need to do is push our own schema into the Elasticsearch DataFrame.
Injecting the schema is simple: copy the esDF code out of EsSparkSQL and add a schema, as shown below.

  sparkSession.read.format("org.elasticsearch.spark.sql")
    .options(esConf.asProperties.asScala.toMap)
    .schema(StructType.fromDDL("bisect over all the index fields here")) // added: user-specified schema
    .load
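
For completeness, here is a rough sketch of how the bisection itself could be driven. This is not code from our patch: the helper names (readable, locate) and the options map are made up, and it assumes a running SparkSession plus the index's full field list in "name type" DDL form, with es.resource already set in the options.

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.types.StructType
  import scala.annotation.tailrec
  import scala.util.Try

  // true if the index can be read (and materialized) with only these fields
  def readable(spark: SparkSession, esOptions: Map[String, String], fields: Seq[String]): Boolean =
    Try {
      spark.read.format("org.elasticsearch.spark.sql")
        .options(esOptions)
        .schema(StructType.fromDDL(fields.mkString(", ")))
        .load()
        .collect() // force evaluation so the row serializer actually runs
    }.isSuccess

  // classic bisection: keep halving the failing field list until one field is left
  @tailrec
  def locate(spark: SparkSession, esOptions: Map[String, String], fields: Seq[String]): Seq[String] =
    if (fields.size <= 1) fields
    else {
      val (left, right) = fields.splitAt(fields.size / 2)
      if (!readable(spark, esOptions, left)) locate(spark, esOptions, left)
      else locate(spark, esOptions, right)
    }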

This leads us to the problematic field, and we can compare it with the fields that work. Skipping over the search itself, the observation is that the problematic field contains empty values while the healthy fields do not, which also matches the exception we just saw:
scala.None$ is not a valid external type for schema of string

Next, let's find out where es-spark produces a None. A plain text search is enough: the es-spark package uses None in five places. In ScalaValueWriter, RowValueReader, and DefaultSource it only appears in conditional checks; the place that actually assigns None is ScalaValueReader's nullValue method:

  def nullValue() = { None }

Its main caller is ScalaValueReader's checkNull method:

  def checkNull(converter: (String, Parser) => Any, value: String, parser: Parser) = {
    if (value != null) {
      if (!StringUtils.hasText(value) && emptyAsNull) {
        nullValue()
      }
      else {
        converter(value, parser).asInstanceOf[AnyRef]
      }
    }
    else {
      nullValue()
    }
  }

This matches what we observed: when a value is empty, es-spark fills it in as None, but on Spark 3 None can no longer be used to represent those empty values, hence the exception above. So we change that line to:

  def nullValue() = { null }

Then we rebuild and publish the jar, and the verification passes.
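
As a quick sanity check with the patched jar, re-reading one of the previously failing indices should now work; the index and field names below (my_index, diagnostics) are placeholders:

  import org.apache.spark.sql.functions.col

  val df = sparkSession.read.format("org.elasticsearch.spark.sql")
    .options(esConf.asProperties.asScala.toMap) // same esConf as in the snippet above
    .load("my_index")

  df.select("diagnostics").collect()           // no longer throws the external type error
  df.filter(col("diagnostics").isNull).count() // empty values now come back as SQL NULL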

Going a step further: forward analysis

At this point the problem is solved, but a curious reader will surely wonder: why did this code run fine on Spark 2.4 and break on Spark 3? We would still like to know exactly what changed in Spark to alter this behavior.

Spark 3

First, we find where the error message (errMsg) comes from: ValidateExternalType, again easy to locate with a text search:

  private lazy val errMsg = s" is not a valid external type for schema of ${expected.simpleString}"

  private lazy val checkType: (Any) => Boolean = expected match {
    case _: DecimalType =>
      (value: Any) => {
        value.isInstanceOf[java.math.BigDecimal] || value.isInstanceOf[scala.math.BigDecimal] ||
          value.isInstanceOf[Decimal]
      }
    case _: ArrayType =>
      (value: Any) => {
        value.getClass.isArray || value.isInstanceOf[Seq[_]]
      }
    case _ =>
      // this is the branch a StringType field falls into: the check below requires the value
      // to be an instance of the expected external boxed type (java.lang.String here),
      // which scala.None$ is not, so eval() throws
      val dataTypeClazz = ScalaReflection.javaBoxedType(dataType)
      (value: Any) => {
        dataTypeClazz.isInstance(value)
      }
  }

  override def eval(input: InternalRow): Any = {
    val result = child.eval(input)
    if (checkType(result)) {
      result
    } else {
      throw new RuntimeException(s"${result.getClass.getName}$errMsg")
    }
  }
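
To make the failure concrete, here is a hedged distillation (not Spark source) of what checkType reduces to for our string column; the external type Spark expects for a StringType field is java.lang.String:

  val checkString: Any => Boolean = (value: Any) => value.isInstanceOf[java.lang.String]

  checkString("foo") // true  -> eval() returns the value unchanged
  checkString(None)  // false -> "scala.None$ is not a valid external type for schema of string"
  checkString(null)  // false, but the expression is never evaluated for real nulls
                     // (see the RowEncoder code below)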

The relevant RowEncoder code:

        val convertedField = if (field.nullable) {
          If(
            Invoke(inputObject, "isNullAt", BooleanType, Literal(index) :: Nil),
            // Because we strip UDTs, `field.dataType` can be different from `fieldValue.dataType`.
            // We should use `fieldValue.dataType` here.
            Literal.create(null, fieldValue.dataType),
            fieldValue
          )
        } else {
          fieldValue
        }

The generated expression ends up as:

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, diagnostics), StringType), true, false) AS diagnostics#50

In other words, if the value in the Row is null, the result is null; otherwise the value is validated and converted to a UTF8String, and it is the validation step that throws the exception we saw.

The entry point of this conversion is DataSourceStrategy:

  private[sql] def toCatalystRDD(
      relation: BaseRelation,
      output: Seq[Attribute],
      rdd: RDD[Row]): RDD[InternalRow] = {
    if (relation.needConversion) {
      val toRow = RowEncoder(StructType.fromAttributes(output)).createSerializer()
      rdd.mapPartitions { iterator =>
        iterator.map(toRow)
      }
    } else {
      rdd.asInstanceOf[RDD[InternalRow]]
    }
  }

Let's summarize the Spark 3 logic:

  1. DataSourceStrategy has to turn the RDD[Row] into an RDD[InternalRow] to satisfy Spark SQL.
  2. The toRow function obtained from RowEncoder.createSerializer performs the per-row conversion.
  3. RowEncoder checks whether each field is nullable; if it is and the value in the Row is null, it emits null, otherwise it reads the value, converts it, and validates its type.
  4. Because es-spark returns None for empty fields, execution falls into the validation logic, and since None is not a String (the external type expected before the UTF8String conversion), the error is thrown; a minimal standalone repro is sketched below.
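
A minimal standalone repro, assuming a Spark 3.0.x dependency on the classpath (this is the same RowEncoder/createSerializer pair that DataSourceStrategy uses above; the field name is arbitrary):

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.catalyst.encoders.RowEncoder
  import org.apache.spark.sql.types.{StringType, StructField, StructType}

  val schema = StructType(Seq(StructField("diagnostics", StringType, nullable = true)))
  val toRow  = RowEncoder(schema).createSerializer()

  toRow(Row(null)) // ok: isNullAt is true, so the null branch generated by RowEncoder is taken
  toRow(Row(None)) // fails: "scala.None$ is not a valid external type for schema of string"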

Spark 2.4

Spark 2.4, however, does not fail, so let's look at the 2.4 logic starting from the same entry point:

  private[this] def toCatalystRDD(
      relation: LogicalRelation,
      output: Seq[Attribute],
      rdd: RDD[Row]): RDD[InternalRow] = {
    if (relation.relation.needConversion) {
      execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
    } else {
      rdd.asInstanceOf[RDD[InternalRow]]
    }
  }
  
  /**
   * Convert the objects inside Row into the types Catalyst expected.
   */
  def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[InternalRow] = {
    data.mapPartitions { iterator =>
      val numColumns = outputTypes.length
      val mutableRow = new GenericInternalRow(numColumns)
      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
      iterator.map { r =>
        var i = 0
        while (i < numColumns) {
          mutableRow(i) = converters(i)(r(i))
          i += 1
        }

        mutableRow
      }
    }
  }
  
  def createToCatalystConverter(dataType: DataType): Any => Any = {
    if (isPrimitive(dataType)) {
      // Although the `else` branch here is capable of handling inbound conversion of primitives,
      // we add some special-case handling for those types here. The motivation for this relates to
      // Java method invocation costs: if we have rows that consist entirely of primitive columns,
      // then returning the same conversion function for all of the columns means that the call site
      // will be monomorphic instead of polymorphic. In microbenchmarks, this actually resulted in
      // a measurable performance impact. Note that this optimization will be unnecessary if we
      // use code generation to construct Scala Row -> Catalyst Row converters.
      def convert(maybeScalaValue: Any): Any = {
        if (maybeScalaValue.isInstanceOf[Option[Any]]) {
          maybeScalaValue.asInstanceOf[Option[Any]].orNull
        } else {
          maybeScalaValue
        }
      }
      convert
    } else {
      getConverterForType(dataType).toCatalyst
    }
  }

The key is the toCatalyst method reached from createToCatalystConverter (for a string column, via getConverterForType(StringType)):

    final def toCatalyst(@Nullable maybeScalaValue: Any): CatalystType = {
      if (maybeScalaValue == null) {
        null.asInstanceOf[CatalystType]
      } else if (maybeScalaValue.isInstanceOf[Option[ScalaInputType]]) {
        val opt = maybeScalaValue.asInstanceOf[Option[ScalaInputType]]
        if (opt.isDefined) {
          toCatalystImpl(opt.get)
        } else {
          null.asInstanceOf[CatalystType] // an empty Option becomes null here instead of reaching type validation
        }
      } else {
        toCatalystImpl(maybeScalaValue.asInstanceOf[ScalaInputType])
      }
    }

Here the Option case is handled gracefully.
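
To sum up the difference, here is a hedged, distilled sketch of the two decision paths for a nullable string column whose value is the None produced by es-spark; the two helpers are illustrative only, not Spark source:

  // Spark 2.4 path (CatalystTypeConverters): the Option is unwrapped before any type check
  def spark24Convert(maybeScalaValue: Any): Any = maybeScalaValue match {
    case null           => null
    case opt: Option[_] => opt.orNull // None -> null, which is then passed through
    case other          => other      // later converted to UTF8String
  }

  // Spark 3 path (RowEncoder + ValidateExternalType): only a real null short-circuits;
  // anything else must already be of the expected external type
  def spark3Validate(value: Any): Any = value match {
    case null                => null // only reached via the isNullAt branch on the Row
    case s: java.lang.String => s    // passes ValidateExternalType, later becomes UTF8String
    case other => throw new RuntimeException(
      s"${other.getClass.getName} is not a valid external type for schema of string")
  }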

Looking through the git log, the change appears to be 23262, where rowToRowRdd was replaced with RowEncoder for performance reasons. Interested readers can trace the related logic themselves.

@scxwhite

Awesome work! I'm also upgrading to Spark 3.0 and hit the same problem; your fix solved it. Did you compile es-spark 3.0 yourselves?
I'm using this one: elastic/elasticsearch-hadoop#1498

@cjuexuan (Owner, Author)

@scxwhite Yes, we compiled it ourselves.

@One-Big-Bug

You're all experts, I learned a lot.
