[Spark][CherryPick] compaction with file size condition in parallel #556

Open · wants to merge 3 commits into base: main
@@ -34,14 +34,24 @@ import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
import org.apache.spark.sql.vectorized.ArrowFakeRowAdaptor
import org.apache.spark.util.{SerializableConfiguration, Utils}
import com.dmetasoul.lakesoul.meta.DBConfig.{LAKESOUL_NON_PARTITION_TABLE_PART_DESC, LAKESOUL_RANGE_PARTITION_SPLITTER}
import com.dmetasoul.lakesoul.meta.DBUtil
import com.dmetasoul.lakesoul.spark.clean.CleanOldCompaction.splitCompactFilePath
import org.apache.spark.sql.lakesoul.DelayedCopyCommitProtocol
import org.apache.spark.sql.execution.datasources.v2.parquet.{NativeParquetCompactionColumnarOutputWriter, NativeParquetOutputWriter}
import org.apache.spark.sql.lakesoul.{DelayedCommitProtocol, DelayedCopyCommitProtocol}
import org.apache.spark.sql.types.{DataTypes, StringType, StructField, StructType}

import java.util.{Date, UUID}
import scala.collection.JavaConverters.mapAsScalaMapConverter
import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable`

/** A helper object for writing FileFormat data out to a location. */
object LakeSoulFileWriter extends Logging {
val MAX_FILE_SIZE_KEY = "max_file_size"
val HASH_BUCKET_ID_KEY = "hash_bucket_id"
val SNAPPY_COMPRESS_RATIO = 3
val COPY_FILE_WRITER_KEY = "copy_file_writer"
val COPY_FILE_WRITER_SPLITTER = "\u0001"

/**
* Basic work flow of this command is:
* 1. Driver side setup, including output committer initialization and data source specific
@@ -178,7 +188,11 @@ object LakeSoulFileWriter extends Logging {
val nativeIOEnable = sparkSession.sessionState.conf.getConf(LakeSoulSQLConf.NATIVE_IO_ENABLE)

def nativeWrap(plan: SparkPlan): RDD[InternalRow] = {
if (isCompaction && !isCDC && !isBucketNumChanged && nativeIOEnable) {
if (isCompaction
&& staticBucketId != -1
&& !isCDC
&& !isBucketNumChanged
&& nativeIOEnable) {
plan match {
case withPartitionAndOrdering(_, _, child) =>
return nativeWrap(child)
@@ -203,8 +217,8 @@ object LakeSoulFileWriter extends Logging {
try {
// for compaction, we won't break ordering from batch scan
val (rdd, concurrentOutputWriterSpec) =
if (isCompaction && options.getOrElse("copyCompactedFile", "").nonEmpty) {
val data = Seq(InternalRow(options("copyCompactedFile")))
if (isCompaction && options.getOrElse(COPY_FILE_WRITER_KEY, "").nonEmpty) {
val data = options(COPY_FILE_WRITER_KEY).split(COPY_FILE_WRITER_SPLITTER).map(InternalRow(_)).toSeq
(sparkSession.sparkContext.parallelize(data), None)
} else if (!isBucketNumChanged && (orderingMatched || isCompaction)) {
(nativeWrap(empty2NullPlan), None)
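A minimal sketch of the round trip implied by the branch above, using the identifiers already in scope (the paths and the driver-side packing step are assumptions, not shown in this diff): the already-compacted source files are joined into a single option value with the "\u0001" splitter and split back into one row per file for the copy writer.
val srcPaths = Seq("/warehouse/tbl/compact_old/part-00000.parquet", "/warehouse/tbl/compact_old/part-00001.parquet")
val writeOptions = Map(COPY_FILE_WRITER_KEY -> srcPaths.mkString(COPY_FILE_WRITER_SPLITTER))
val data = writeOptions(COPY_FILE_WRITER_KEY).split(COPY_FILE_WRITER_SPLITTER).map(InternalRow(_)).toSeq
// one InternalRow per source file; each row later drives one file copy in the copy writer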
@@ -410,6 +424,7 @@ object LakeSoulFileWriter extends Logging {
private var recordsInFile: Long = _
private val partValue: Option[String] = options.get("partValue").filter(_ != LAKESOUL_NON_PARTITION_TABLE_PART_DESC)
.map(_.replace(LAKESOUL_RANGE_PARTITION_SPLITTER, "/"))
private val maxFileSize = options.get(MAX_FILE_SIZE_KEY)

/** Given an input row, returns the corresponding `bucketId` */
protected lazy val getBucketId: InternalRow => Int = {
@@ -419,26 +434,61 @@
row => proj(row).getInt(0)
}

override protected def releaseCurrentWriter(): Unit = {
if (currentWriter != null) {
try {
currentWriter.close()
if (maxFileSize.isDefined) {
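// With a size limit set, the native writer may have rolled several files; collect
// them from flushResult and register them with the commit protocol so every rolled
// file is included in the task's commit.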
currentWriter.asInstanceOf[NativeParquetOutputWriter].flushResult.foreach(result => {
var (partitionDesc, flushResult) = result
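// "-4" and "-5" appear to be sentinel partition descriptors returned by the native
// layer (an assumption based on the handling here): "-4" is replaced with the actual
// partition value and "-5" marks a non-partitioned table.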
partitionDesc = if (partitionDesc == "-4") {
options.getOrElse("partValue", LAKESOUL_NON_PARTITION_TABLE_PART_DESC)
} else {
partitionDesc
}
val partitionDescList = if (partitionDesc == "-5") {
List.empty
} else {
DBUtil.parsePartitionDesc(partitionDesc).asScala.toList
}
committer.asInstanceOf[DelayedCommitProtocol].addOutputFile(partitionDescList, flushResult.map(_.getFilePath).toList)
})
}
statsTrackers.foreach(_.closeFile(currentWriter.path()))
} finally {
currentWriter = null
}
}
}

private def newOutputWriter(record: InternalRow): Unit = {
recordsInFile = 0
releaseResources()

val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext)
val suffix = if (bucketSpec.isDefined) {
val bucketIdStr = if (partitionId == -1) {
BucketingUtils.bucketIdToString(getBucketId(record))
val bucketId = if (partitionId == -1) {
getBucketId(record)
} else {
BucketingUtils.bucketIdToString(partitionId)
partitionId
}
taskAttemptContext.getConfiguration.set(HASH_BUCKET_ID_KEY, bucketId.toString)

val bucketIdStr = BucketingUtils.bucketIdToString(bucketId)
f"$bucketIdStr.c$fileCounter%03d" + ext
} else {
f"-c$fileCounter%03d" + ext
}

if (maxFileSize.isDefined) {
taskAttemptContext.getConfiguration.set(MAX_FILE_SIZE_KEY, maxFileSize.get)
}
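// With max_file_size set, pass an empty suffix so the commit protocol returns only the
// partition directory; the native writer then rolls one or more files under that prefix
// and reports them back through flushResult.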

val currentPath = committer.newTaskTempFile(
taskAttemptContext,
partValue,
suffix)
if (maxFileSize.isDefined) "" else suffix
)

currentWriter = description.outputWriterFactory.newInstance(
path = currentPath,
@@ -472,22 +522,14 @@
options: Map[String, String],
customMetrics: Map[String, SQLMetric] = Map.empty)
extends FileFormatDataWriter(description, taskAttemptContext, committer, customMetrics) {

private val partValue: Option[String] = options.get("partValue").filter(_ != LAKESOUL_NON_PARTITION_TABLE_PART_DESC)
.map(_.replace(LAKESOUL_RANGE_PARTITION_SPLITTER, "/"))

/** Given an input row, returns the corresponding `bucketId` */
protected lazy val getSrcPath: InternalRow => String = {
row => row.get(0, StringType).asInstanceOf[String]
}

override def write(record: InternalRow): Unit = {
val dstPath = committer.newTaskTempFile(
taskAttemptContext,
partValue,
getSrcPath(record))

statsTrackers.foreach(_.newFile(dstPath))
val path = record.get(0, StringType).toString
val currentPath = committer.newTaskTempFile(taskAttemptContext, partValue, path)
statsTrackers.foreach(_.newFile(currentPath))
logInfo("copy file")
}
}

@@ -28,6 +28,7 @@ import org.apache.spark.sql.sources.{EqualTo, Filter, Not}
import org.apache.spark.sql.lakesoul._
import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors
import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf.{COMPACTION_TASK, SCAN_FILE_NUMBER_LIMIT}
import org.apache.spark.sql.lakesoul.utils.{SparkUtil, TableInfo, TimestampFormatter}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -412,15 +413,34 @@ case class OnePartitionMergeBucketScan(sparkSession: SparkSession,
val fileWithBucketId = groupByPartition.head._2
.groupBy(_.fileBucketId).map(f => (f._1, f._2.toArray))

val fileNumLimit = options.getOrDefault(SCAN_FILE_NUMBER_LIMIT.key, Int.MaxValue.toString).toInt
val isCompactionTask = options.getOrDefault(COMPACTION_TASK.key, COMPACTION_TASK.defaultValueString).toBoolean
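// The two options above cap how many files one compaction task merges (SCAN_FILE_NUMBER_LIMIT)
// and restrict the limit to compaction scans (COMPACTION_TASK), leaving ordinary reads untouched.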

Seq.tabulate(bucketNum) { bucketId =>
var files = fileWithBucketId.getOrElse(bucketId, Array.empty)
val isSingleFile = files.length == 1
var groupedFiles = if (fileNumLimit < Int.MaxValue && isCompactionTask) {
val groupedFiles = new ArrayBuffer[Array[MergePartitionedFile]]
for (i <- files.indices by fileNumLimit) {
groupedFiles += files.slice(i, i + fileNumLimit)
}
groupedFiles.toArray
} else {
Array(files)
}

if (!isSingleFile) {
val versionFiles = for (version <- files.indices) yield files(version).copy(writeVersion = version + 1)
files = versionFiles.toArray
var allPartitionIsSingleFile = true
var isSingleFile = false

for (index <- groupedFiles.indices) {
isSingleFile = groupedFiles(index).length == 1
if (!isSingleFile) {
val versionFiles = for (elem <- groupedFiles(index).indices) yield groupedFiles(index)(elem).copy(writeVersion = elem)
groupedFiles(index) = versionFiles.toArray
allPartitionIsSingleFile = false
}
}
MergeFilePartition(bucketId, Array(files), isSingleFile)

MergeFilePartition(bucketId, groupedFiles, allPartitionIsSingleFile)
}
}
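A minimal standalone sketch of the grouping performed above (file count and limit invented): with SCAN_FILE_NUMBER_LIMIT = 3 and seven files in a bucket, the slice loop yields groups of sizes 3, 3 and 1, and each multi-file group is assigned per-group write versions.
val files = (1 to 7).toArray
val fileNumLimit = 3
val grouped = (files.indices by fileNumLimit).map(i => files.slice(i, i + fileNumLimit)).toArray
assert(grouped.map(_.length).sameElements(Array(3, 3, 1))) // 7 files split into 3 + 3 + 1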

@@ -31,7 +31,8 @@ class NativeParquetFileFormat extends FileFormat

if (options.getOrElse("isCompaction", "false").toBoolean &&
!options.getOrElse("isCDC", "false").toBoolean &&
!options.getOrElse("isBucketNumChanged", "false").toBoolean
!options.getOrElse("isBucketNumChanged", "false").toBoolean &&
options.contains("staticBucketId")
) {
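// Use the native columnar compaction writer only when a static bucket id is supplied,
// i.e. when this task writes a single hash bucket (mirrors the staticBucketId != -1
// check in LakeSoulFileWriter.nativeWrap).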
new OutputWriterFactory {
override def newInstance(
@@ -5,6 +5,7 @@
package org.apache.spark.sql.execution.datasources.v2.parquet

import com.dmetasoul.lakesoul.lakesoul.io.NativeIOWriter
import com.dmetasoul.lakesoul.lakesoul.io.NativeIOWriter.FlushResult
import org.apache.arrow.memory.BufferAllocator
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.types.pojo.Schema
@@ -18,19 +19,29 @@ import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.{GlutenUtils, NativeIOUtils}

import java.util
import scala.collection.JavaConverters.mapAsScalaMapConverter
import scala.collection.mutable

class NativeParquetOutputWriter(val path: String, dataSchema: StructType, timeZoneId: String, context: TaskAttemptContext) extends OutputWriter {

val NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE: Int = SQLConf.get.getConf(LakeSoulSQLConf.NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE)

private var recordCount = 0

var flushResult: mutable.Map[String, util.List[FlushResult]] = mutable.Map.empty

val arrowSchema: Schema = ArrowUtils.toArrowSchema(dataSchema, timeZoneId)

protected val nativeIOWriter: NativeIOWriter = new NativeIOWriter(arrowSchema)

GlutenUtils.setArrowAllocator(nativeIOWriter)
nativeIOWriter.setRowGroupRowNumber(NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE)
nativeIOWriter.addFile(path)
if (path.endsWith(".parquet")) {
nativeIOWriter.addFile(path)
} else {
nativeIOWriter.withPrefix(path)
}

NativeIOUtils.setNativeIOOptions(nativeIOWriter, NativeIOUtils.getNativeIOOptions(context, new Path(path)))

@@ -59,7 +70,7 @@ class NativeParquetOutputWriter(val path: String, dataSchema: StructType, timeZo
recordWriter.finish()

nativeIOWriter.write(root)
nativeIOWriter.flush()
flushResult = nativeIOWriter.flush().asScala
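// flush() returns, per partition descriptor, the FlushResult list of files the native
// writer actually produced; LakeSoulFileWriter.releaseCurrentWriter forwards these to
// DelayedCommitProtocol.addOutputFile.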

recordWriter.reset()
root.close()
@@ -90,26 +90,39 @@ class DelayedCommitProtocol(jobId: String,
}

override def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
val filename = getFileName(taskContext, ext)
val partitionValues = dir.map(parsePartitions).getOrElse(List.empty[(String, String)])
val unescapedDir = if (partitionValues.nonEmpty) {
Some(partitionValues.map(partitionValue => partitionValue._1 + "=" + partitionValue._2).mkString("/"))
} else {
dir
}
val relativePath = randomPrefixLength.map { prefixLength =>
getRandomPrefix(prefixLength) // Generate a random prefix as a first choice
}.orElse {
// or else write into the partition unescaped directory if it is partitioned
if (ext.isEmpty) {
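// An empty ext is the sized-compaction case: return only the partition directory and
// let the native writer create the actual files underneath it.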
unescapedDir
}.map { subDir =>
new Path(subDir, filename)
}.getOrElse(new Path(filename)) // or directly write out to the output path

val absolutePath = new Path(path, relativePath).toUri.toString
//returns the absolute path to the file
addedFiles.append((partitionValues, absolutePath))
absolutePath
.map(new Path(path, _))
.getOrElse(new Path(path))
.toUri.toString
} else {
val filename = getFileName(taskContext, ext)

val relativePath = randomPrefixLength.map { prefixLength =>
getRandomPrefix(prefixLength) // Generate a random prefix as a first choice
}.orElse {
// or else write into the partition unescaped directory if it is partitioned
unescapedDir
}.map { subDir =>
new Path(subDir, filename)
}.getOrElse(new Path(filename)) // or directly write out to the output path


val absolutePath = new Path(path, relativePath).toUri.toString
//returns the absolute path to the file
addedFiles.append((partitionValues, absolutePath))
absolutePath
}
}

def addOutputFile(partitionValues: List[(String, String)], files: List[String]): Unit = {
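// Called from LakeSoulFileWriter.releaseCurrentWriter with the files reported by the
// native writer's flush(), so size-split outputs are committed like regular ones.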
files.foreach(file => addedFiles.append((partitionValues, file)))
}

override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
@@ -23,31 +23,24 @@ import scala.util.Random
/**
* Writes out the files to `path` and returns a list of them in `addedStatuses`.
*/
class DelayedCopyCommitProtocol(jobId: String,
class DelayedCopyCommitProtocol(srcFiles: Seq[DataFileInfo],
jobId: String,
dstPath: String,
randomPrefixLength: Option[Int])
extends DelayedCommitProtocol(jobId, dstPath, randomPrefixLength)
with Serializable with Logging {

@transient private var copyFiles: ArrayBuffer[(String, String)] = _

override def setupJob(jobContext: JobContext): Unit = {

}

override def abortJob(jobContext: JobContext): Unit = {
// TODO: Best effort cleanup
}
@transient private var addedFileInfo: ArrayBuffer[DataFileInfo] = _
@transient private var pathToFileInfo: Map[String, Seq[DataFileInfo]] = _

override def setupTask(taskContext: TaskAttemptContext): Unit = {
copyFiles = new ArrayBuffer[(String, String)]
addedFileInfo = new ArrayBuffer[DataFileInfo]
pathToFileInfo = srcFiles.groupBy(_.path)
}


override def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
val (srcCompactDir, srcBasePath) = splitCompactFilePath(ext)
copyFiles += dir.getOrElse("-5") -> ext
new Path(dstPath, srcBasePath).toString
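// ext carries the source file path: record the matching DataFileInfo and defer the
// actual copy to commitTask.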
addedFileInfo += pathToFileInfo(ext).head
ext
}

override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
@@ -56,16 +49,14 @@ class DelayedCopyCommitProtocol(jobId: String,
}

override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {

if (copyFiles.nonEmpty) {
val fs = new Path(copyFiles.head._2).getFileSystem(taskContext.getConfiguration)
val statuses = copyFiles.map { f =>
val (partitionDesc, srcPath) = f
val (srcCompactDir, srcBasePath) = splitCompactFilePath(srcPath)
if (addedFileInfo.nonEmpty) {
val fs = new Path(addedFileInfo.head.path).getFileSystem(taskContext.getConfiguration)
val statuses = addedFileInfo.map { srcFile =>
val (srcCompactDir, srcBasePath) = splitCompactFilePath(srcFile.path)
val dstFile = new Path(dstPath, srcBasePath)
FileUtil.copy(fs, new Path(srcPath), fs, new Path(dstPath, srcBasePath), false, taskContext.getConfiguration)
FileUtil.copy(fs, new Path(srcFile.path), fs, new Path(dstPath, srcBasePath), false, taskContext.getConfiguration)
val status = fs.getFileStatus(dstFile)
DataFileInfo(partitionDesc, fs.makeQualified(dstFile).toString, "add", status.getLen, status.getModificationTime)
DataFileInfo(srcFile.range_partitions, fs.makeQualified(dstFile).toString, "add", status.getLen, status.getModificationTime)
}

new TaskCommitMessage(statuses)