Analysis of Spark's StorageLevel source code #10

cjuexuan opened this issue Feb 18, 2016 · 0 comments
Spark storage

The fully qualified class name is org.apache.spark.storage.StorageLevel.

The class

The primary constructor defines five fields:

class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {

  override def readExternal(in: ObjectInput): Unit = ???

  override def writeExternal(out: ObjectOutput): Unit = ???
}

Not everything needs to be serialized: after the object is restored, internal child objects can simply be recreated rather than shipped across the wire. That is why the class implements Externalizable, with its readExternal and writeExternal methods, instead of plain Serializable; those two methods are invoked during serialization and deserialization respectively.
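As a rough illustration of that contract (the Tag class below is hypothetical, not part of Spark): only the fields written in writeExternal cross the wire, and derived state is rebuilt inside readExternal.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, Externalizable,
  ObjectInput, ObjectInputStream, ObjectOutput, ObjectOutputStream}

// Hypothetical class: only `name` is serialized; the derived `cached`
// field is recreated in readExternal instead of being written out.
class Tag(var name: String) extends Externalizable {
  var cached: Int = name.length            // derived; never written out
  def this() = this("")                    // Externalizable needs a no-arg constructor
  override def writeExternal(out: ObjectOutput): Unit = out.writeUTF(name)
  override def readExternal(in: ObjectInput): Unit = {
    name = in.readUTF()
    cached = name.length                   // rebuilt on restore
  }
}

// Drive the two methods directly (the serialization machinery would call
// them for us whenever writeObject/readObject meets an Externalizable).
val buf = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(buf)
new Tag("disk").writeExternal(oos)
oos.close()

val restored = new Tag()
restored.readExternal(new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray)))
println(restored.name + " " + restored.cached) // disk 4
```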

Next, the auxiliary constructors:

  def this() = this(false, true, false, false)

  // TODO: Also add fields for caching priority, dataset ID, and flushing.
  private def this(flags: Int, replication: Int) {
    this((flags & 8) != 0, (flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication)
  }

One is public and one is private; we will come back to how the private one is used shortly.

Next come the accessors, plus a sanity check and hashCode:

  def useDisk: Boolean = _useDisk
  def useMemory: Boolean = _useMemory
  def useOffHeap: Boolean = _useOffHeap
  def deserialized: Boolean = _deserialized
  def replication: Int = _replication

  assert(replication < 40, "Replication restricted to be less than 40 for calculating hash codes")

  override def hashCode(): Int = toInt * 41 + replication

The assert makes it explicit that replication must be less than 40 (it is folded into hashCode as toInt * 41 + replication). useOffHeap exists mainly to avoid JVM GC overhead and save JVM heap memory; note that an off-heap level supports neither useDisk nor useMemory, does not support deserialized, and requires replication to equal 1. The source:

  if (useOffHeap) {
    require(!useDisk, "Off-heap storage level does not support using disk")
    require(!useMemory, "Off-heap storage level does not support using heap memory")
    require(!deserialized, "Off-heap storage level does not support deserialized storage")
    require(replication == 1, "Off-heap storage level does not support multiple replication")
  }
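These constraints can be seen in action with a standalone sketch (validate is my own helper mirroring the requires above, not Spark's code):

```scala
import scala.util.Try

// Mirrors the constructor checks: an off-heap level must not also use
// disk, heap memory, or deserialized storage, and must have replication 1.
def validate(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
             deserialized: Boolean, replication: Int): Unit = {
  if (useOffHeap) {
    require(!useDisk, "Off-heap storage level does not support using disk")
    require(!useMemory, "Off-heap storage level does not support using heap memory")
    require(!deserialized, "Off-heap storage level does not support deserialized storage")
    require(replication == 1, "Off-heap storage level does not support multiple replication")
  }
}

validate(false, false, true, false, 1)              // OFF_HEAP shape: passes
val bad = Try(validate(true, false, true, false, 1)) // off-heap plus disk
println(bad.isFailure) // true: the first require throws IllegalArgumentException
```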

There is an interesting method here, toInt:

def toInt: Int = {
    var ret = 0
    if (_useDisk) {
      ret |= 8
    }
    if (_useMemory) {
      ret |= 4
    }
    if (_useOffHeap) {
      ret |= 2
    }
    if (_deserialized) {
      ret |= 1
    }
    ret
  }

This method packs the four boolean flags into a binary bit mask, which makes the storage configuration cheap to compare and to persist. If only _useDisk is set, the final ret is binary 1000:

object A extends App{
  val _useDisk = true
  val _useMemory = false
  val _useOffHeap = false
  val _deserialized = false

  def toInt: Int = {
    var ret = 0
    if (_useDisk) {
      ret |= 8
    }
    if (_useMemory) {
      ret |= 4
    }
    if (_useOffHeap) {
      ret |= 2
    }
    if (_deserialized) {
      ret |= 1
    }
    ret
  }
  println(Integer.toBinaryString(toInt)) // 1000, binary for 8

}

The clone and equals methods are then easy to follow:

  override def clone(): StorageLevel = {
    new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
  }

  override def equals(other: Any): Boolean = other match {
    case s: StorageLevel =>
      s.useDisk == useDisk &&
      s.useMemory == useMemory &&
      s.useOffHeap == useOffHeap &&
      s.deserialized == deserialized &&
      s.replication == replication
    case _ =>
      false
  }

The validity check is:

def isValid: Boolean = (useMemory || useDisk || useOffHeap) && (replication > 0)

That is, a level is valid as long as replication is greater than 0 and at least one of memory, disk, or off-heap storage is enabled.
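A standalone sketch of this check (the predicate is copied from the line above; the helper function itself is mine, not Spark's):

```scala
// Valid iff at least one storage medium is enabled and replication is positive.
def isValid(useMemory: Boolean, useDisk: Boolean, useOffHeap: Boolean,
            replication: Int): Boolean =
  (useMemory || useDisk || useOffHeap) && (replication > 0)

println(isValid(false, false, false, 1)) // false: the NONE level stores nowhere
println(isValid(false, true, false, 1))  // true: the DISK_ONLY shape
```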
Now for the two interface methods that were left unimplemented earlier.
First, writeExternal, which is called during serialization:

  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
    out.writeByte(toInt)
    out.writeByte(_replication)
  }

First, a look at the Utils.tryOrIOException helper it relies on:

  def tryOrIOException(block: => Unit) {
    try {
      block
    } catch {
      case e: IOException => throw e
      case NonFatal(t) => throw new IOException(t)
    }
  }

It wraps the block so callers do not have to catch IOException at every call site.
writeExternal then calls toInt to capture the useDisk, useMemory, useOffHeap, and deserialized state in a single byte: if only useDisk is true, it writes 8, i.e. binary 1000. It also writes replication as a second byte. Two bytes are enough to preserve the whole object state; a neat bit of design.
Now the deserialization side:

  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
    val flags = in.readByte()
    _useDisk = (flags & 8) != 0
    _useMemory = (flags & 4) != 0
    _useOffHeap = (flags & 2) != 0
    _deserialized = (flags & 1) != 0
    _replication = in.readByte()
  }

If we wrote 8 during serialization, flags is now 8; 8 & 8 yields 8 (binary 1000), which is non-zero, so _useDisk becomes true, while the other & operations yield 0 and leave their flags false. The original values are thus recovered exactly during deserialization.
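The two-byte wire format can be exercised end to end. A sketch using plain DataOutputStream/DataInputStream in place of the ObjectOutput/ObjectInput that Spark receives (the 8 and 2 below model DISK_ONLY_2):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Byte 1 is the toInt bit mask, byte 2 is the replication count.
val bytes = new ByteArrayOutputStream()
val out = new DataOutputStream(bytes)
out.writeByte(8) // toInt: useDisk only -> binary 1000
out.writeByte(2) // _replication

// Read the two bytes back and decode the mask, as readExternal does.
val in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray))
val flags = in.readByte()
val useDisk = (flags & 8) != 0
val useMemory = (flags & 4) != 0
val replication = in.readByte().toInt
println((useDisk, useMemory, replication)) // (true,false,2)
```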
Lastly in the class, readResolve (which swaps in the cached instance after deserialization), toString, and description:

  @throws(classOf[IOException])
  private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)

  override def toString: String = {
    s"StorageLevel($useDisk, $useMemory, $useOffHeap, $deserialized, $replication)"
  }
  def description: String = {
    var result = ""
    result += (if (useDisk) "Disk " else "")
    result += (if (useMemory) "Memory " else "")
    result += (if (useOffHeap) "ExternalBlockStore " else "")
    result += (if (deserialized) "Deserialized " else "Serialized ")
    result += s"${replication}x Replicated"
    result
  }

Nothing much to explain here; they are straightforward.
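To see what description produces, here is a standalone re-implementation of the same logic (the helper function is mine; the body mirrors the code above):

```scala
// Same string-building logic as StorageLevel.description.
def description(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
                deserialized: Boolean, replication: Int): String = {
  var result = ""
  result += (if (useDisk) "Disk " else "")
  result += (if (useMemory) "Memory " else "")
  result += (if (useOffHeap) "ExternalBlockStore " else "")
  result += (if (deserialized) "Deserialized " else "Serialized ")
  result += s"${replication}x Replicated"
  result
}

// The MEMORY_AND_DISK_SER_2 shape: disk + memory, serialized, two replicas.
println(description(true, true, false, false, 2)) // Disk Memory Serialized 2x Replicated
```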

The companion object

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)

DISK_ONLY builds new StorageLevel(true, false, false, false, 1): useDisk is true, everything else is false, and replication takes its default of 1, so the call is abbreviated to new StorageLevel(true, false, false, false).
Likewise, DISK_ONLY_2 is the same level with replication = 2, and so on for the other _2 variants.
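Combining this with toInt from earlier, the named levels map onto small bit masks (disk = 8, memory = 4, off-heap = 2, deserialized = 1). A sketch, with toInt reproduced as a free function outside the class:

```scala
// Same bit assignments as StorageLevel.toInt.
def toInt(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
          deserialized: Boolean): Int = {
  var ret = 0
  if (useDisk) ret |= 8
  if (useMemory) ret |= 4
  if (useOffHeap) ret |= 2
  if (deserialized) ret |= 1
  ret
}

println(toInt(true, false, false, false)) // DISK_ONLY           -> 8  (1000)
println(toInt(false, true, false, true))  // MEMORY_ONLY         -> 5  (0101)
println(toInt(true, true, false, false))  // MEMORY_AND_DISK_SER -> 12 (1100)
```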

The fromString method simply pattern-matches a name onto one of the levels above:

  @DeveloperApi
  def fromString(s: String): StorageLevel = s match {
    case "NONE" => NONE
    case "DISK_ONLY" => DISK_ONLY
    case "DISK_ONLY_2" => DISK_ONLY_2
    case "MEMORY_ONLY" => MEMORY_ONLY
    case "MEMORY_ONLY_2" => MEMORY_ONLY_2
    case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER
    case "MEMORY_ONLY_SER_2" => MEMORY_ONLY_SER_2
    case "MEMORY_AND_DISK" => MEMORY_AND_DISK
    case "MEMORY_AND_DISK_2" => MEMORY_AND_DISK_2
    case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER
    case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2
    case "OFF_HEAP" => OFF_HEAP
    case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $s")
  }

Next, two private members: a cache built on the thread-safe ConcurrentHashMap, and a method that uses the atomic putIfAbsent to populate or look up a level. putIfAbsent behaves like put except that it only inserts when the key is absent; if the key is already present, the existing value is kept.

  private[spark] val storageLevelCache = new ConcurrentHashMap[StorageLevel, StorageLevel]()

  private[spark] def getCachedStorageLevel(level: StorageLevel): StorageLevel = {
    storageLevelCache.putIfAbsent(level, level)
    storageLevelCache.get(level)
  }
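The canonicalization trick can be sketched with a stand-in case class (Level below is mine, not Spark's; a case class gets structural equals/hashCode for free, which StorageLevel instead defines by hand):

```scala
import java.util.concurrent.ConcurrentHashMap

// Stand-in for StorageLevel with value-based equality.
case class Level(useDisk: Boolean, replication: Int)

val cache = new ConcurrentHashMap[Level, Level]()
def getCached(level: Level): Level = {
  cache.putIfAbsent(level, level) // no-op if an equal key is already present
  cache.get(level)                // always returns the first-seen instance
}

val a = getCached(Level(true, 1))
val b = getCached(Level(true, 1)) // equal, but a different object
println(a eq b) // true: both resolve to the single cached instance
```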

Finally, several apply methods:

  /**
   * :: DeveloperApi ::
   * Create a new StorageLevel object without setting useOffHeap.
   (Note: this scaladoc is wrong; the overload actually requires setting every attribute, including useOffHeap.)
   */
  @DeveloperApi
  def apply(
      useDisk: Boolean,
      useMemory: Boolean,
      useOffHeap: Boolean,
      deserialized: Boolean,
      replication: Int): StorageLevel = {
    getCachedStorageLevel(
      new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication))
  }

  /**
   * :: DeveloperApi ::
   * Create a new StorageLevel object.
   (This overload fixes useOffHeap to false; callers do not pass it.)
   */
  @DeveloperApi
  def apply(
      useDisk: Boolean,
      useMemory: Boolean,
      deserialized: Boolean,
      replication: Int = 1): StorageLevel = {
    getCachedStorageLevel(new StorageLevel(useDisk, useMemory, false, deserialized, replication))
  }

  /**
   * :: DeveloperApi ::
   * Create a new StorageLevel object from its integer representation.
   (Builds the level from flags and replication, via the private auxiliary constructor shown earlier.)
   */
  @DeveloperApi
  def apply(flags: Int, replication: Int): StorageLevel = {
    getCachedStorageLevel(new StorageLevel(flags, replication))
  }

  /**
   * :: DeveloperApi ::
   * Read StorageLevel object from ObjectInput stream.
   (The deserialization path: reads a level back from an ObjectInput.)
   */
  @DeveloperApi
  def apply(in: ObjectInput): StorageLevel = {
    val obj = new StorageLevel()
    obj.readExternal(in)
    getCachedStorageLevel(obj)
  }

That is the source. As noted, some of the original scaladoc comments are inaccurate, so I have added corrected notes inline above.
The Java API:

package org.apache.spark.api.java;

import org.apache.spark.storage.StorageLevel;

/**
 * Expose some commonly useful storage level constants.
 */
public class StorageLevels {
  public static final StorageLevel NONE = create(false, false, false, false, 1);
  public static final StorageLevel DISK_ONLY = create(true, false, false, false, 1);
  public static final StorageLevel DISK_ONLY_2 = create(true, false, false, false, 2);
  public static final StorageLevel MEMORY_ONLY = create(false, true, false, true, 1);
  public static final StorageLevel MEMORY_ONLY_2 = create(false, true, false, true, 2);
  public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, false, 1);
  public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, false, 2);
  public static final StorageLevel MEMORY_AND_DISK = create(true, true, false, true, 1);
  public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, false, true, 2);
  public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, false, 1);
  public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, false, 2);
  public static final StorageLevel OFF_HEAP = create(false, false, true, false, 1);

  /**
   * Create a new StorageLevel object.
   * @param useDisk saved to disk, if true
   * @param useMemory saved to memory, if true
   * @param deserialized saved as deserialized objects, if true
   * @param replication replication factor
   */
  @Deprecated
  public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized,
      int replication) {
    return StorageLevel.apply(useDisk, useMemory, false, deserialized, replication);
  }

  /**
   * Create a new StorageLevel object.
   * @param useDisk saved to disk, if true
   * @param useMemory saved to memory, if true
   * @param useOffHeap saved to Tachyon, if true
   * @param deserialized saved as deserialized objects, if true
   * @param replication replication factor
   */
  public static StorageLevel create(
    boolean useDisk,
    boolean useMemory,
    boolean useOffHeap,
    boolean deserialized,
    int replication) {
    return StorageLevel.apply(useDisk, useMemory, useOffHeap, deserialized, replication);
  }
}

This is also easy to follow. To wrap up, the official usage guidance:

Usage

| Storage Level | Meaning |
| --- | --- |
| MEMORY_ONLY | Store the RDD as deserialized Java objects in the JVM. If it does not fit in memory, some partitions are not cached and are recomputed on demand. This is the default level. |
| MEMORY_AND_DISK | Store the RDD as deserialized Java objects in the JVM. Partitions that do not fit in memory spill to disk and are read back from there when needed. |
| MEMORY_ONLY_SER | Store the RDD as serialized objects (one byte array per partition). Usually more space-efficient than deserialized objects, especially with a fast serializer, but more CPU-intensive to read. |
| MEMORY_AND_DISK_SER | Like MEMORY_ONLY_SER, but partitions that do not fit in memory spill to disk instead of being recomputed on demand. |
| DISK_ONLY | Store the RDD partitions only on disk. |
| MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | Same as the levels above, but each partition is replicated on two cluster nodes, i.e. replication = 2. |
| OFF_HEAP (experimental) | Store the RDD in serialized form in Tachyon. Compared with MEMORY_ONLY_SER, OFF_HEAP reduces garbage-collection overhead and allows executors to be smaller and to share a pool of memory, which is attractive under high concurrency. Also, because the RDD resides in Tachyon, an executor crash does not lose the data. In this mode the memory in Tachyon is discardable, so Tachyon does not attempt to rebuild blocks it has evicted. |

Usage advice:
Prefer the default level first; if memory is tight, use MEMORY_ONLY_SER to save space. Do not fall back to disk unless absolutely necessary, i.e. unless you are working with very large volumes of data.

cjuexuan added the spark label Feb 18, 2016