From f7a4d0e5d78f4a98c62691b412778ac9dadfd689 Mon Sep 17 00:00:00 2001
From: hw_syl_zenghua
Date: Thu, 31 Oct 2024 19:51:11 +0800
Subject: [PATCH 1/3] merge paral_compact_editting into merge_main

MR-title: compaction with file size condition in parallel
Created-by: hw_syl_zenghua
Author-id: 7155563
MR-id: 7278395
Commit-by: zenghua
Merged-by: hw_syl_zenghua
Description: merge "paral_compact_editting" into "merge_main"

fix ci

Signed-off-by: zenghua

compaction with file size condition in parallel

Signed-off-by: zenghua

See merge request: 42b369588d84469d95d7b738fc58da8e/LakeSoul/for-nanhang!7
---
 .../datasources/LakeSoulFileWriter.scala      | 75 ++++++++++-----
 .../v2/merge/MergeParquetScan.scala           | 30 +++++-
 .../v2/parquet/NativeParquetFileFormat.scala  |  3 +-
 .../parquet/NativeParquetOutputWriter.scala   | 15 ++-
 .../sql/lakesoul/DelayedCommitProtocol.scala  | 39 +++++---
 .../lakesoul/DelayedCopyCommitProtocol.scala  | 36 ++-----
 .../sql/lakesoul/TransactionalWrite.scala     | 13 +--
 .../catalog/LakeSoulScanBuilder.scala         |  1 -
 .../lakesoul/commands/CompactionCommand.scala | 96 +++++++------------
 .../lakesoul/sources/LakeSoulSQLConf.scala    | 19 ++++
 .../spark/sql/vectorized/NativeIOUtils.scala  | 18 +++-
 .../lakesoul/commands/CompactionSuite.scala   | 71 ++++++++------
 .../lakesoul/lakesoul/io/NativeIOWriter.java  |  9 ++
 rust/lakesoul-io/src/lakesoul_io_config.rs    |  4 +
 rust/lakesoul-io/src/lakesoul_writer.rs       |  4 +
 15 files changed, 258 insertions(+), 175 deletions(-)

diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/LakeSoulFileWriter.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/LakeSoulFileWriter.scala
index 880a99649..da78e4b5b 100644
--- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/LakeSoulFileWriter.scala
+++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/LakeSoulFileWriter.scala
@@ -34,14 +34,23 @@ import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
 import org.apache.spark.sql.vectorized.ArrowFakeRowAdaptor
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 import com.dmetasoul.lakesoul.meta.DBConfig.{LAKESOUL_NON_PARTITION_TABLE_PART_DESC, LAKESOUL_RANGE_PARTITION_SPLITTER}
+import com.dmetasoul.lakesoul.meta.DBUtil
 import com.dmetasoul.lakesoul.spark.clean.CleanOldCompaction.splitCompactFilePath
-import org.apache.spark.sql.lakesoul.DelayedCopyCommitProtocol
+import org.apache.spark.sql.execution.datasources.v2.parquet.{NativeParquetCompactionColumnarOutputWriter, NativeParquetOutputWriter}
+import org.apache.spark.sql.lakesoul.{DelayedCommitProtocol, DelayedCopyCommitProtocol}
 import org.apache.spark.sql.types.{DataTypes, StringType, StructField, StructType}
 
 import java.util.{Date, UUID}
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable`
 
 /** A helper object for writing FileFormat data out to a location. */
 object LakeSoulFileWriter extends Logging {
+  val MAX_FILE_SIZE_KEY = "max_file_size"
+  val HASH_BUCKET_ID_KEY = "hash_bucket_id"
+  val SNAPPY_COMPRESS_RATIO = 3
+  val COPY_FILE_WRITER_KEY = "copy_file_writer"
+
   /**
    * Basic work flow of this command is:
   * 1.
Driver side setup, including output committer initialization and data source specific @@ -178,7 +187,11 @@ object LakeSoulFileWriter extends Logging { val nativeIOEnable = sparkSession.sessionState.conf.getConf(LakeSoulSQLConf.NATIVE_IO_ENABLE) def nativeWrap(plan: SparkPlan): RDD[InternalRow] = { - if (isCompaction && !isCDC && !isBucketNumChanged && nativeIOEnable) { + if (isCompaction + && staticBucketId != -1 + && !isCDC + && !isBucketNumChanged + && nativeIOEnable) { plan match { case withPartitionAndOrdering(_, _, child) => return nativeWrap(child) @@ -203,8 +216,8 @@ object LakeSoulFileWriter extends Logging { try { // for compaction, we won't break ordering from batch scan val (rdd, concurrentOutputWriterSpec) = - if (isCompaction && options.getOrElse("copyCompactedFile", "").nonEmpty) { - val data = Seq(InternalRow(options("copyCompactedFile"))) + if (isCompaction && options.getOrElse(COPY_FILE_WRITER_KEY, "false").toBoolean) { + val data = Seq(InternalRow(COPY_FILE_WRITER_KEY)) (sparkSession.sparkContext.parallelize(data), None) } else if (!isBucketNumChanged && (orderingMatched || isCompaction)) { (nativeWrap(empty2NullPlan), None) @@ -410,6 +423,7 @@ object LakeSoulFileWriter extends Logging { private var recordsInFile: Long = _ private val partValue: Option[String] = options.get("partValue").filter(_ != LAKESOUL_NON_PARTITION_TABLE_PART_DESC) .map(_.replace(LAKESOUL_RANGE_PARTITION_SPLITTER, "/")) + private val maxFileSize = options.get(MAX_FILE_SIZE_KEY) /** Given an input row, returns the corresponding `bucketId` */ protected lazy val getBucketId: InternalRow => Int = { @@ -419,26 +433,56 @@ object LakeSoulFileWriter extends Logging { row => proj(row).getInt(0) } + override protected def releaseCurrentWriter(): Unit = { + if (currentWriter != null) { + try { + currentWriter.close() + if (maxFileSize.isDefined) { + currentWriter.asInstanceOf[NativeParquetOutputWriter].flushResult.foreach(result => { + val (partitionDesc, flushResult) = result + val partitionDescList = if (partitionDesc == "-4") { + DBUtil.parsePartitionDesc(options.getOrElse("partValue", LAKESOUL_NON_PARTITION_TABLE_PART_DESC)).asScala.toList + } else { + DBUtil.parsePartitionDesc(partitionDesc).asScala.toList + } + committer.asInstanceOf[DelayedCommitProtocol].addOutputFile(partitionDescList, flushResult.map(_.getFilePath).toList) + }) + } + statsTrackers.foreach(_.closeFile(currentWriter.path())) + } finally { + currentWriter = null + } + } + } + private def newOutputWriter(record: InternalRow): Unit = { recordsInFile = 0 releaseResources() val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext) val suffix = if (bucketSpec.isDefined) { - val bucketIdStr = if (partitionId == -1) { - BucketingUtils.bucketIdToString(getBucketId(record)) + val bucketId = if (partitionId == -1) { + getBucketId(record) } else { - BucketingUtils.bucketIdToString(partitionId) + partitionId } + taskAttemptContext.getConfiguration.set(HASH_BUCKET_ID_KEY, bucketId.toString) + + val bucketIdStr = BucketingUtils.bucketIdToString(bucketId) f"$bucketIdStr.c$fileCounter%03d" + ext } else { f"-c$fileCounter%03d" + ext } + if (maxFileSize.isDefined) { + taskAttemptContext.getConfiguration.set(MAX_FILE_SIZE_KEY, maxFileSize.get) + } + val currentPath = committer.newTaskTempFile( taskAttemptContext, partValue, - suffix) + if (maxFileSize.isDefined) "" else suffix + ) currentWriter = description.outputWriterFactory.newInstance( path = currentPath, @@ -473,21 +517,8 @@ object LakeSoulFileWriter extends Logging { 
customMetrics: Map[String, SQLMetric] = Map.empty) extends FileFormatDataWriter(description, taskAttemptContext, committer, customMetrics) { - private val partValue: Option[String] = options.get("partValue").filter(_ != LAKESOUL_NON_PARTITION_TABLE_PART_DESC) - .map(_.replace(LAKESOUL_RANGE_PARTITION_SPLITTER, "/")) - - /** Given an input row, returns the corresponding `bucketId` */ - protected lazy val getSrcPath: InternalRow => String = { - row => row.get(0, StringType).asInstanceOf[String] - } - override def write(record: InternalRow): Unit = { - val dstPath = committer.newTaskTempFile( - taskAttemptContext, - partValue, - getSrcPath(record)) - - statsTrackers.foreach(_.newFile(dstPath)) + logInfo("copy file") } } diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergeParquetScan.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergeParquetScan.scala index 5137c2007..0f339ea0d 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergeParquetScan.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergeParquetScan.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.sources.{EqualTo, Filter, Not} import org.apache.spark.sql.lakesoul._ import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf +import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf.{COMPACTION_TASK, SCAN_FILE_NUMBER_LIMIT} import org.apache.spark.sql.lakesoul.utils.{SparkUtil, TableInfo, TimestampFormatter} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -412,15 +413,34 @@ case class OnePartitionMergeBucketScan(sparkSession: SparkSession, val fileWithBucketId = groupByPartition.head._2 .groupBy(_.fileBucketId).map(f => (f._1, f._2.toArray)) + val fileNumLimit = options.getOrDefault(SCAN_FILE_NUMBER_LIMIT.key, Int.MaxValue.toString).toInt + val isCompactionTask = options.getOrDefault(COMPACTION_TASK.key, COMPACTION_TASK.defaultValueString).toBoolean + Seq.tabulate(bucketNum) { bucketId => var files = fileWithBucketId.getOrElse(bucketId, Array.empty) - val isSingleFile = files.length == 1 + var groupedFiles = if (fileNumLimit < Int.MaxValue && isCompactionTask) { + val groupedFiles = new ArrayBuffer[Array[MergePartitionedFile]] + for (i <- files.indices by fileNumLimit) { + groupedFiles += files.slice(i, i + fileNumLimit) + } + groupedFiles.toArray + } else { + Array(files) + } - if (!isSingleFile) { - val versionFiles = for (version <- files.indices) yield files(version).copy(writeVersion = version + 1) - files = versionFiles.toArray + var allPartitionIsSingleFile = true + var isSingleFile = false + + for (index <- groupedFiles.indices) { + isSingleFile = groupedFiles(index).length == 1 + if (!isSingleFile) { + val versionFiles = for (elem <- groupedFiles(index).indices) yield groupedFiles(index)(elem).copy(writeVersion = elem) + groupedFiles(index) = versionFiles.toArray + allPartitionIsSingleFile = false + } } - MergeFilePartition(bucketId, Array(files), isSingleFile) + + MergeFilePartition(bucketId, groupedFiles, allPartitionIsSingleFile) } } diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetFileFormat.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetFileFormat.scala index e47044cb5..bc817a9a2 100644 --- 
a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetFileFormat.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetFileFormat.scala @@ -31,7 +31,8 @@ class NativeParquetFileFormat extends FileFormat if (options.getOrElse("isCompaction", "false").toBoolean && !options.getOrElse("isCDC", "false").toBoolean && - !options.getOrElse("isBucketNumChanged", "false").toBoolean + !options.getOrElse("isBucketNumChanged", "false").toBoolean && + options.contains("staticBucketId") ) { new OutputWriterFactory { override def newInstance( diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetOutputWriter.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetOutputWriter.scala index c4d599ef8..9cf9f9139 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetOutputWriter.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetOutputWriter.scala @@ -5,6 +5,7 @@ package org.apache.spark.sql.execution.datasources.v2.parquet import com.dmetasoul.lakesoul.lakesoul.io.NativeIOWriter +import com.dmetasoul.lakesoul.lakesoul.io.NativeIOWriter.FlushResult import org.apache.arrow.memory.BufferAllocator import org.apache.arrow.vector.VectorSchemaRoot import org.apache.arrow.vector.types.pojo.Schema @@ -18,19 +19,29 @@ import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.{GlutenUtils, NativeIOUtils} +import java.util +import scala.collection.JavaConverters.mapAsScalaMapConverter +import scala.collection.mutable + class NativeParquetOutputWriter(val path: String, dataSchema: StructType, timeZoneId: String, context: TaskAttemptContext) extends OutputWriter { val NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE: Int = SQLConf.get.getConf(LakeSoulSQLConf.NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE) private var recordCount = 0 + var flushResult: mutable.Map[String, util.List[FlushResult]] = mutable.Map.empty + val arrowSchema: Schema = ArrowUtils.toArrowSchema(dataSchema, timeZoneId) protected val nativeIOWriter: NativeIOWriter = new NativeIOWriter(arrowSchema) GlutenUtils.setArrowAllocator(nativeIOWriter) nativeIOWriter.setRowGroupRowNumber(NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE) - nativeIOWriter.addFile(path) + if (path.endsWith(".parquet")) { + nativeIOWriter.addFile(path) + } else { + nativeIOWriter.withPrefix(path) + } NativeIOUtils.setNativeIOOptions(nativeIOWriter, NativeIOUtils.getNativeIOOptions(context, new Path(path))) @@ -59,7 +70,7 @@ class NativeParquetOutputWriter(val path: String, dataSchema: StructType, timeZo recordWriter.finish() nativeIOWriter.write(root) - nativeIOWriter.flush() + flushResult = nativeIOWriter.flush().asScala recordWriter.reset() root.close() diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCommitProtocol.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCommitProtocol.scala index 086012cf0..0c82d0215 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCommitProtocol.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCommitProtocol.scala @@ -90,26 +90,39 @@ class DelayedCommitProtocol(jobId: String, } override def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): 
String = { - val filename = getFileName(taskContext, ext) val partitionValues = dir.map(parsePartitions).getOrElse(List.empty[(String, String)]) val unescapedDir = if (partitionValues.nonEmpty) { Some(partitionValues.map(partitionValue => partitionValue._1 + "=" + partitionValue._2).mkString("/")) } else { dir } - val relativePath = randomPrefixLength.map { prefixLength => - getRandomPrefix(prefixLength) // Generate a random prefix as a first choice - }.orElse { - // or else write into the partition unescaped directory if it is partitioned + if (ext.isEmpty) { unescapedDir - }.map { subDir => - new Path(subDir, filename) - }.getOrElse(new Path(filename)) // or directly write out to the output path - - val absolutePath = new Path(path, relativePath).toUri.toString - //returns the absolute path to the file - addedFiles.append((partitionValues, absolutePath)) - absolutePath + .map(new Path(path, _)) + .getOrElse(new Path(path)) + .toUri.toString + } else { + val filename = getFileName(taskContext, ext) + + val relativePath = randomPrefixLength.map { prefixLength => + getRandomPrefix(prefixLength) // Generate a random prefix as a first choice + }.orElse { + // or else write into the partition unescaped directory if it is partitioned + unescapedDir + }.map { subDir => + new Path(subDir, filename) + }.getOrElse(new Path(filename)) // or directly write out to the output path + + + val absolutePath = new Path(path, relativePath).toUri.toString + //returns the absolute path to the file + addedFiles.append((partitionValues, absolutePath)) + absolutePath + } + } + + def addOutputFile(partitionValues: List[(String, String)], files: List[String]): Unit = { + files.foreach(file => addedFiles.append((partitionValues, file))) } override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = { diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCopyCommitProtocol.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCopyCommitProtocol.scala index b46be810e..4284f3026 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCopyCommitProtocol.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCopyCommitProtocol.scala @@ -23,31 +23,16 @@ import scala.util.Random /** * Writes out the files to `path` and returns a list of them in `addedStatuses`. 
*/ -class DelayedCopyCommitProtocol(jobId: String, +class DelayedCopyCommitProtocol(srcFiles: Seq[DataFileInfo], + jobId: String, dstPath: String, randomPrefixLength: Option[Int]) extends DelayedCommitProtocol(jobId, dstPath, randomPrefixLength) with Serializable with Logging { - @transient private var copyFiles: ArrayBuffer[(String, String)] = _ - - override def setupJob(jobContext: JobContext): Unit = { - - } - - override def abortJob(jobContext: JobContext): Unit = { - // TODO: Best effort cleanup - } - - override def setupTask(taskContext: TaskAttemptContext): Unit = { - copyFiles = new ArrayBuffer[(String, String)] - } - - override def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = { - val (srcCompactDir, srcBasePath) = splitCompactFilePath(ext) - copyFiles += dir.getOrElse("-5") -> ext - new Path(dstPath, srcBasePath).toString + throw new UnsupportedOperationException( + s"$this does not support adding files with an absolute path") } override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = { @@ -57,15 +42,14 @@ class DelayedCopyCommitProtocol(jobId: String, override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = { - if (copyFiles.nonEmpty) { - val fs = new Path(copyFiles.head._2).getFileSystem(taskContext.getConfiguration) - val statuses = copyFiles.map { f => - val (partitionDesc, srcPath) = f - val (srcCompactDir, srcBasePath) = splitCompactFilePath(srcPath) + if (srcFiles.nonEmpty) { + val fs = new Path(srcFiles.head.path).getFileSystem(taskContext.getConfiguration) + val statuses = srcFiles.map { srcFile => + val (srcCompactDir, srcBasePath) = splitCompactFilePath(srcFile.path) val dstFile = new Path(dstPath, srcBasePath) - FileUtil.copy(fs, new Path(srcPath), fs, new Path(dstPath, srcBasePath), false, taskContext.getConfiguration) + FileUtil.copy(fs, new Path(srcFile.path), fs, new Path(dstPath, srcBasePath), false, taskContext.getConfiguration) val status = fs.getFileStatus(dstFile) - DataFileInfo(partitionDesc, fs.makeQualified(dstFile).toString, "add", status.getLen, status.getModificationTime) + DataFileInfo(srcFile.range_partitions, fs.makeQualified(dstFile).toString, "add", status.getLen, status.getModificationTime) } new TaskCommitMessage(statuses) diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala index 455e652b8..66109a1d3 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala @@ -12,6 +12,7 @@ import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.Dataset import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.datasources.LakeSoulFileWriter.COPY_FILE_WRITER_KEY import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, LakeSoulFileWriter, WriteJobStatsTracker} import org.apache.spark.sql.execution.{QueryExecution, SQLExecution} import org.apache.spark.sql.functions.{col, when} @@ -103,7 +104,8 @@ trait TransactionalWrite { */ def writeFiles(oriData: Dataset[_], writeOptions: Option[LakeSoulOptions], - isCompaction: Boolean): (Seq[DataFileInfo], Path) = { + isCompaction: Boolean, + copyCompactedFile: Seq[DataFileInfo] = Seq.empty): 
(Seq[DataFileInfo], Path) = { val spark = oriData.sparkSession // LakeSoul always writes timestamp data with timezone=UTC spark.conf.set("spark.sql.session.timeZone", "UTC") @@ -160,7 +162,7 @@ trait TransactionalWrite { options.put("isBucketNumChanged", "false") } val cdcCol = snapshot.getTableInfo.configuration.get(LakeSoulTableProperties.lakeSoulCDCChangePropKey) - if (cdcCol.nonEmpty) { + if (cdcCol.nonEmpty && copyCompactedFile.isEmpty) { options.put("isCDC", "true") val cdcColName = cdcCol.get if (writeOptions.forall(_.options.getOrElse("fullCompaction", "true").equals("true"))) { @@ -214,10 +216,9 @@ trait TransactionalWrite { output.length < data.schema.size) } - val committer = if (writeOptions.exists(_.options.getOrElse("copyCompactedFile", "").nonEmpty)) { - val srcPath = writeOptions.get.options.get("copyCompactedFile") - options.put("copyCompactedFile", srcPath.get) - new DelayedCopyCommitProtocol("lakesoul", outputPath.toString, None) + val committer = if (copyCompactedFile.nonEmpty) { + options.put(COPY_FILE_WRITER_KEY, "true") + new DelayedCopyCommitProtocol(copyCompactedFile, "lakesoul", outputPath.toString, None) } else { getCommitter(outputPath) } diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulScanBuilder.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulScanBuilder.scala index eb47fa81d..830371192 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulScanBuilder.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulScanBuilder.scala @@ -116,7 +116,6 @@ case class LakeSoulScanBuilder(sparkSession: SparkSession, } val writableOptions = mutable.Map.empty[String, String] ++ options.asScala if (fileIndex.snapshotManagement.snapshot.getPartitionInfoArray.forall(p => p.commit_op.equals("CompactionCommit"))) { - println(s"set NATIVE_IO_IS_COMPACTED with ${fileIndex.snapshotManagement.snapshot.getPartitionInfoArray.mkString("Array(", ", ", ")")}") writableOptions.put(NATIVE_IO_IS_COMPACTED.key, "true") } val updatedOptions = new CaseInsensitiveStringMap(writableOptions.asJava) diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CompactionCommand.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CompactionCommand.scala index f74a35bf5..b0df3475c 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CompactionCommand.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CompactionCommand.scala @@ -14,13 +14,14 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.PredicateHelper import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.sql.execution.command.LeafRunnableCommand +import org.apache.spark.sql.execution.datasources.LakeSoulFileWriter.{MAX_FILE_SIZE_KEY, SNAPPY_COMPRESS_RATIO} import org.apache.spark.sql.execution.datasources.v2.merge.MergeDeltaParquetScan import org.apache.spark.sql.execution.datasources.v2.parquet.{NativeParquetScan, ParquetScan} import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation} -import org.apache.spark.sql.functions.{expr, forall} +import org.apache.spark.sql.functions.expr import org.apache.spark.sql.lakesoul.catalog.LakeSoulTableV2 import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors -import 
org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf.RENAME_COMPACTED_FILE +import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf.{COMPACTION_TASK, SCAN_FILE_NUMBER_LIMIT} import org.apache.spark.sql.lakesoul.utils.TableInfo import org.apache.spark.sql.lakesoul.{BatchDataSoulFileIndexV2, LakeSoulOptions, SnapshotManagement, TransactionCommit} import org.apache.spark.sql.types.StructType @@ -29,9 +30,9 @@ import org.apache.spark.sql.{Dataset, Row, SparkSession} import org.apache.spark.util.Utils import java.util.UUID +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import scala.collection.JavaConverters._ case class CompactionCommand(snapshotManagement: SnapshotManagement, conditionString: String, @@ -69,8 +70,7 @@ case class CompactionCommand(snapshotManagement: SnapshotManagement, files: Seq[DataFileInfo], readPartitionInfo: Array[PartitionInfoScala], compactionPath: String, - fullCompaction: Boolean, - copyCompactedFile: String = ""): List[DataCommitInfo] = { + copySrcFiles: Boolean = false): List[DataCommitInfo] = { if (newBucketNum.isEmpty && readPartitionInfo.forall(p => p.commit_op.equals("CompactionCommit") && p.read_files.length == 1)) { logInfo("=========== All Partitions Have Compacted, This Operation Will Cancel!!! ===========") return List.empty @@ -85,7 +85,11 @@ case class CompactionCommand(snapshotManagement: SnapshotManagement, Option(mergeOperatorInfo) ) val option = new CaseInsensitiveStringMap( - Map("basePath" -> tc.tableInfo.table_path_s.get, "isCompaction" -> "true").asJava) + Map("basePath" -> tc.tableInfo.table_path_s.get, + "isCompaction" -> "true", + SCAN_FILE_NUMBER_LIMIT.key -> fileNumLimit.getOrElse(Int.MaxValue).toString, + COMPACTION_TASK.key -> "true" + ).asJava) val partitionNames = readPartitionInfo.head.range_value.split(',').map(p => { p.split('=').head @@ -123,13 +127,19 @@ case class CompactionCommand(snapshotManagement: SnapshotManagement, val map = mutable.HashMap[String, String]() map.put("isCompaction", "true") map.put("compactionPath", compactionPath) - map.put("fullCompaction", fullCompaction.toString) - if (copyCompactedFile.nonEmpty) { - map.put("copyCompactedFile", copyCompactedFile) + + val copyCompactedFiles = if (copySrcFiles) { + files + } else { + Seq.empty } if (readPartitionInfo.nonEmpty) { map.put("partValue", readPartitionInfo.head.range_value) } + if (fileSizeLimit.isDefined) { + map.put("fullCompaction", "false") + map.put(MAX_FILE_SIZE_KEY, (fileSizeLimit.get * SNAPPY_COMPRESS_RATIO).toString) + } if (bucketNumChanged) { map.put("newBucketNum", newBucketNum.get.toString) } else if (tableInfo.hash_partition_columns.nonEmpty) { @@ -140,7 +150,7 @@ case class CompactionCommand(snapshotManagement: SnapshotManagement, } logInfo(s"write CompactData with Option=$map") - val (newFiles, path) = tc.writeFiles(compactDF, Some(new LakeSoulOptions(map.toMap, spark.sessionState.conf)), isCompaction = true) + val (newFiles, path) = tc.writeFiles(compactDF, Some(new LakeSoulOptions(map.toMap, spark.sessionState.conf)), isCompaction = true, copyCompactedFiles) tc.createDataCommitInfo(newFiles, Seq.empty, "", -1)._1 } @@ -195,63 +205,21 @@ case class CompactionCommand(snapshotManagement: SnapshotManagement, def compactSinglePartition(sparkSession: SparkSession, tc: TransactionCommit, files: Seq[DataFileInfo], sourcePartition: PartitionInfoScala): String = { logInfo(s"Compacting Single Partition=${sourcePartition} with ${files.length} files") - val bucketedFiles = if 
(tableInfo.hash_partition_columns.isEmpty || bucketNumChanged) { - Seq(-1 -> files) + val (copyFiles, scanFiles) = if (fileSizeLimit.isEmpty || bucketNumChanged || force) { + (Seq.empty, files) } else { - files.groupBy(_.file_bucket_id) + files.splitAt(files.indexWhere(_.size < fileSizeLimit.get * 0.5)) } - val compactionPath = newCompactPath - val allDataCommitInfo = bucketedFiles.flatMap(groupByBucketId => { - val (bucketId, files) = groupByBucketId - val groupedFiles = if (fileNumLimit.isDefined || fileSizeLimit.isDefined) { - val groupedFiles = new ArrayBuffer[Seq[DataFileInfo]] - var groupHead = 0 - var groupSize = 0L - var groupFileCount = 0 - for (i <- files.indices) { - // each group contains at least one file - if (i == groupHead) { - groupSize += files(i).size - groupFileCount += 1 - } else if (fileSizeLimit.exists(groupSize + files(i).size > _) || fileNumLimit.exists(groupFileCount + 1 > _)) { - // if the file size limit is reached, or the file count limit is reached, we need to start a new group - groupedFiles += files.slice(groupHead, i) - groupHead = i - groupSize = files(i).size - groupFileCount = 1 - } else { - // otherwise, we add the file to the current group - groupSize += files(i).size - groupFileCount += 1 - } - } - // add the last group to the groupedFiles - groupedFiles += files.slice(groupHead, files.length) - groupedFiles - } else { - Seq(files) - } - val fullCompaction = groupedFiles.size == 1 - groupedFiles.flatMap(files => { - lazy val incrementFiles = if (force || newBucketNum.isDefined) { - false - } else { - files.size == 1 && splitCompactFilePath(files.head.path)._1.nonEmpty - } - if (!incrementFiles) { - executeCompaction(sparkSession, tc, files, Array(sourcePartition), compactionPath, fullCompaction) - } else { - logInfo(s"== Partition ${sourcePartition.range_value} has no increment file.") - val origCompactedFile = files.head - if (sparkSession.sessionState.conf.getConf(RENAME_COMPACTED_FILE)) { - renameOldCompactedFile(tc, origCompactedFile, sourcePartition.range_value, compactionPath) - } else { - executeCompaction(sparkSession, tc, files, Array(sourcePartition), compactionPath, fullCompaction, origCompactedFile.path) - } - } - }) - }) + + val compactionPath = newCompactPath + val allDataCommitInfo = new ArrayBuffer[DataCommitInfo] + if (copyFiles.nonEmpty) { + allDataCommitInfo ++= executeCompaction(sparkSession, tc, copyFiles, Array(sourcePartition), compactionPath, true) + } + if (scanFiles.nonEmpty) { + allDataCommitInfo ++= executeCompaction(sparkSession, tc, scanFiles, Array(sourcePartition), compactionPath) + } if (allDataCommitInfo.nonEmpty) { val compactDataCommitInfoId = UUID.randomUUID diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/sources/LakeSoulSQLConf.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/sources/LakeSoulSQLConf.scala index 9bdc4893e..49e50b431 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/sources/LakeSoulSQLConf.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/sources/LakeSoulSQLConf.scala @@ -167,4 +167,23 @@ object LakeSoulSQLConf { """.stripMargin) .booleanConf .createWithDefault(false) + + val SCAN_FILE_NUMBER_LIMIT: ConfigEntry[Int] = + buildConf("scan.file.number.limit") + .doc( + """ + |If SCAN_FILE_NUMBER_LIMIT < Int.MaxValue, Scan will scan file with number less than SCAN_FILE_NUMBER_LIMIT per file group + """.stripMargin) + .intConf + .createWithDefault(Int.MaxValue) + + + val COMPACTION_TASK: ConfigEntry[Boolean] = + 
buildConf("scan.file.size.limit") + .doc( + """ + |If SCAN_FILE_NUMBER_LIMIT < Int.MaxValue, Scan will scan file with number less than SCAN_FILE_NUMBER_LIMIT per file group + """.stripMargin) + .booleanConf + .createWithDefault(false) } diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/vectorized/NativeIOUtils.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/vectorized/NativeIOUtils.scala index 1f550ef7f..c01e7a61f 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/vectorized/NativeIOUtils.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/vectorized/NativeIOUtils.scala @@ -15,6 +15,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.parquet.hadoop.ParquetInputFormat import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.datasources.LakeSoulFileWriter.{HASH_BUCKET_ID_KEY, MAX_FILE_SIZE_KEY} import org.apache.spark.sql.execution.datasources.parquet.{ParquetReadSupport, ParquetWriteSupport} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType @@ -32,10 +33,11 @@ class NativeIOOptions(val s3Bucket: String, val s3Region: String, val fsUser: String, val defaultFS: String, - val virtual_path_style: Boolean + val virtual_path_style: Boolean, + val others: Map[String, String] = Map.empty ) -object NativeIOUtils{ +object NativeIOUtils { def asArrayColumnVector(vectorSchemaRoot: VectorSchemaRoot): Array[ColumnVector] = { asScalaIteratorConverter(vectorSchemaRoot.getFieldVectors.iterator()) @@ -62,6 +64,13 @@ object NativeIOUtils{ var defaultFS = taskAttemptContext.getConfiguration.get("fs.defaultFS") if (defaultFS == null) defaultFS = taskAttemptContext.getConfiguration.get("fs.default.name") val fileSystem = file.getFileSystem(taskAttemptContext.getConfiguration) + var otherOptions = Map[String, String]() + if (taskAttemptContext.getConfiguration.get(HASH_BUCKET_ID_KEY, "").nonEmpty) { + otherOptions += HASH_BUCKET_ID_KEY -> taskAttemptContext.getConfiguration.get(HASH_BUCKET_ID_KEY) + } + if (taskAttemptContext.getConfiguration.get(MAX_FILE_SIZE_KEY, "").nonEmpty) { + otherOptions += MAX_FILE_SIZE_KEY -> taskAttemptContext.getConfiguration.get(MAX_FILE_SIZE_KEY) + } if (hasS3AFileSystemClass) { fileSystem match { case s3aFileSystem: S3AFileSystem => @@ -71,11 +80,11 @@ object NativeIOUtils{ val s3aAccessKey = taskAttemptContext.getConfiguration.get("fs.s3a.access.key") val s3aSecretKey = taskAttemptContext.getConfiguration.get("fs.s3a.secret.key") val virtualPathStyle = taskAttemptContext.getConfiguration.getBoolean("fs.s3a.path.style.access", false) - return new NativeIOOptions(awsS3Bucket, s3aAccessKey, s3aSecretKey, s3aEndpoint, s3aRegion, user, defaultFS, virtualPathStyle) + return new NativeIOOptions(awsS3Bucket, s3aAccessKey, s3aSecretKey, s3aEndpoint, s3aRegion, user, defaultFS, virtualPathStyle, otherOptions) case _ => } } - new NativeIOOptions(null, null, null, null, null, user, defaultFS, false) + new NativeIOOptions(null, null, null, null, null, user, defaultFS, false, otherOptions) } def setNativeIOOptions(nativeIO: NativeIOBase, options: NativeIOOptions): Unit = { @@ -89,6 +98,7 @@ object NativeIOUtils{ options.defaultFS, options.virtual_path_style ) + options.others.foreach(options => nativeIO.setOption(options._1, options._2)) } def setParquetConfigurations(sparkSession: SparkSession, hadoopConf: Configuration, readDataSchema: StructType): Unit = { diff --git 
a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/CompactionSuite.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/CompactionSuite.scala index 810f10db9..cfad5989b 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/CompactionSuite.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/CompactionSuite.scala @@ -547,29 +547,30 @@ class CompactionSuite extends QueryTest // Get initial PartitionInfo count val initialFileCount = getFileList(tablePath).length - println(s"before compact initialPartitionInfoCount=$initialFileCount") + println(s"before ${c}th time compact file count=$initialFileCount") lakeSoulTable.toDF.show // Perform limited compaction (group every compactGroupSize PartitionInfo) lakeSoulTable.compaction(fileNumLimit = Some(compactGroupSize)) // Get PartitionInfo count after compaction - val compactedFileCount = getFileList(tablePath).length + val compactedFileList = getFileList(tablePath) + val compactedFileCount = compactedFileList.length - println(s"after compact compactedPartitionInfoCount=$compactedFileCount") + println(s"after ${c}th time compact file count=$compactedFileCount") lakeSoulTable.toDF.show // Verify results - assert(compactedFileCount < initialFileCount, - s"Compaction should reduce the number of files, but it changed from ${initialFileCount} to $compactedFileCount") - + assert(compactedFileCount <= hashBucketNum, + s"Compaction should have hashBucketNum files, but it has $compactedFileCount") - assert(compactedFileCount >= (initialFileCount - 1) / compactGroupSize + 1, - s"Compaction should produce files above a lower bound, but there are ${compactedFileCount} files") - assert(compactedFileCount <= (initialFileCount - 1) / compactGroupSize + 1 + hashBucketNum, - s"Compaction should produce files below a upper bound, but there are ${compactedFileCount} files") + // assert(compactedFileCount >= (initialFileCount - 1) / compactGroupSize + 1, + // s"Compaction should produce files above a lower bound, but there are ${compactedFileCount} files") + // + // assert(compactedFileCount <= (initialFileCount - 1) / compactGroupSize + 1 + hashBucketNum, + // s"Compaction should produce files below a upper bound, but there are ${compactedFileCount} files") } // Verify data integrity @@ -648,15 +649,15 @@ class CompactionSuite extends QueryTest lakeSoulTable.toDF.show // Verify results - assert(compactedFileCount < initialFileCount, - s"Compaction should reduce the number of files, but it changed from ${initialFileCount} to $compactedFileCount") + assert(compactedFileCount <= hashBucketNum, + s"Compaction should have hashBucketNum files, but it has $compactedFileCount") - assert(compactedFileCount >= (initialFileCount - 1) / compactGroupSize + 1, - s"Compaction should produce files above a lower bound, but there are ${compactedFileCount} files") - - assert(compactedFileCount <= (initialFileCount - 1) / compactGroupSize + 1 + hashBucketNum, - s"Compaction should produce files below a upper bound, but there are ${compactedFileCount} files") + // assert(compactedFileCount >= (initialFileCount - 1) / compactGroupSize + 1, + // s"Compaction should produce files above a lower bound, but there are ${compactedFileCount} files") + // + // assert(compactedFileCount <= (initialFileCount - 1) / compactGroupSize + 1 + hashBucketNum, + // s"Compaction should produce files below a upper bound, but there are ${compactedFileCount} files") } // Verify data integrity @@ -713,23 +714,26 @@ 
class CompactionSuite extends QueryTest // Get initial PartitionInfo count val initialMaxFileSize = getFileList(tablePath).map(_.size).max - println(s"before compact initialMaxFileSize=$initialMaxFileSize") - - // Perform limited compaction (group every compactGroupSize PartitionInfo) + println(s"before ${c}th compact initialMaxFileSize=$initialMaxFileSize") LakeSoulTable.uncached(tablePath) - lakeSoulTable.compaction(fileSizeLimit = Some(compactFileSize), force = false) + spark.time({ + // Perform limited compaction (group every compactGroupSize PartitionInfo) + lakeSoulTable.compaction(fileSizeLimit = Some(compactFileSize), force = false) + // lakeSoulTable.compaction(fileSizeLimit = Some(compactFileSize), force = true) + // lakeSoulTable.compaction() + }) // Get PartitionInfo count after compaction val compactedFiles = getFileList(tablePath) val compactedFileMax = compactedFiles.map(_.size).max - println(s"after compact compactedFileMax=$compactedFileMax") + println(s"after ${c}th compact compactedFileMax=$compactedFileMax") // Verify results - assert(compactedFileMax >= initialMaxFileSize, - s"Compaction should reduce the number of files, but it changed from ${initialMaxFileSize} to $compactedFileMax") + // assert(compactedFileMax >= initialMaxFileSize, + // s"Compaction should increase the max size of files, but it changed from ${initialMaxFileSize} to $compactedFileMax") - assert(compactedFileMax <= DBUtil.parseMemoryExpression(compactFileSize) * 1.2, + assert(compactedFileMax <= DBUtil.parseMemoryExpression(compactFileSize) * 1.1, s"Compaction should produce file with upper-bounded size, but there is a larger ${compactedFileMax} file size") val (compactDir, _) = splitCompactFilePath(compactedFiles.head.path) @@ -755,7 +759,7 @@ class CompactionSuite extends QueryTest val hashBucketNum = 4 val compactRounds = 5 val upsertPerRounds = 10 - val rowsPerUpsert = 1002 + val rowsPerUpsert = 1000 val compactFileSize = "10KB" // Create test data @@ -799,23 +803,26 @@ class CompactionSuite extends QueryTest // Get initial PartitionInfo count val initialMaxFileSize = getFileList(tablePath).map(_.size).max - println(s"before compact initialMaxFileSize=$initialMaxFileSize") + println(s"before ${c}th compact initialMaxFileSize=$initialMaxFileSize") // Perform limited compaction (group every compactGroupSize PartitionInfo) LakeSoulTable.uncached(tablePath) - lakeSoulTable.compaction(fileSizeLimit = Some(compactFileSize), force = false) + spark.time({ + lakeSoulTable.compaction(fileSizeLimit = Some(compactFileSize), force = false) + // lakeSoulTable.compaction(fileSizeLimit = Some(compactFileSize), force = true) + }) // Get PartitionInfo count after compaction val compactedFiles = getFileList(tablePath) val compactedFileMax = compactedFiles.map(_.size).max - println(s"after compact compactedFileMax=$compactedFileMax") + println(s"after ${c}th compact compactedFileMax=$compactedFileMax") // Verify results // assert(compactedFileMax >= initialMaxFileSize, // s"Compaction should reduce the number of files, but it changed from ${initialMaxFileSize} to $compactedFileMax") - assert(compactedFileMax <= DBUtil.parseMemoryExpression(compactFileSize) * 1.2, + assert(compactedFileMax <= DBUtil.parseMemoryExpression(compactFileSize) * 1.1, s"Compaction should produce file with upper-bounded size, but there is a larger ${compactedFileMax} file size") val (compactDir, _) = splitCompactFilePath(compactedFiles.head.path) @@ -825,7 +832,9 @@ class CompactionSuite extends QueryTest // Verify data integrity 
      LakeSoulTable.uncached(tablePath)
-      val compactedData = lakeSoulTable.toDF.orderBy("id", "date").collect()
+      val finalData = lakeSoulTable.toDF.orderBy("id", "date")
+      //      println(finalData.queryExecution)
+      val compactedData = finalData.collect()
       //      println(compactedData.mkString("Array(", ", ", ")"))
       assert(compactedData.length == 6 + rowsPerUpsert * upsertPerRounds * compactRounds / 2,
         s"The compacted data should have ${6 + rowsPerUpsert * upsertPerRounds * compactRounds / 2} rows, but it actually has ${compactedData.length} rows")

diff --git a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOWriter.java b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOWriter.java
index 5af7f9023..7a30b369a 100644
--- a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOWriter.java
+++ b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOWriter.java
@@ -166,6 +166,15 @@ public String getFilePath() {
         public String getFileExistCols() {
             return fileExistCols;
         }
+
+        @Override
+        public String toString() {
+            return "FlushResult{" +
+                    "filePath='" + filePath + '\'' +
+                    ", fileSize=" + fileSize +
+                    ", fileExistCols='" + fileExistCols + '\'' +
+                    '}';
+        }
     }
 
     public static FlushResult decodeFlushResult(String encoded) {
diff --git a/rust/lakesoul-io/src/lakesoul_io_config.rs b/rust/lakesoul-io/src/lakesoul_io_config.rs
index 4230e6cc7..b6dde9c0f 100644
--- a/rust/lakesoul-io/src/lakesoul_io_config.rs
+++ b/rust/lakesoul-io/src/lakesoul_io_config.rs
@@ -173,6 +173,10 @@ impl LakeSoulIOConfig {
         self.option(OPTION_KEY_MEM_LIMIT).map(|x| x.parse().unwrap())
     }
 
+    pub fn max_file_size_option(&self) -> Option<u64> {
+        self.option(OPTION_KEY_MAX_FILE_SIZE).map(|x| x.parse().unwrap())
+    }
+
     pub fn pool_size(&self) -> Option<usize> {
         self.option(OPTION_KEY_POOL_SIZE).map(|x| x.parse().unwrap())
     }
diff --git a/rust/lakesoul-io/src/lakesoul_writer.rs b/rust/lakesoul-io/src/lakesoul_writer.rs
index 370a461d4..3d034c59b 100644
--- a/rust/lakesoul-io/src/lakesoul_writer.rs
+++ b/rust/lakesoul-io/src/lakesoul_writer.rs
@@ -41,6 +41,10 @@ impl SyncSendableMutableLakeSoulWriter {
         let writer = Self::create_writer(writer_config).await?;
         let schema = writer.schema();
 
+        if let Some(max_file_size) = config.max_file_size_option() {
+            config.max_file_size = Some(max_file_size);
+        }
+
         if let Some(mem_limit) = config.mem_limit() {
             if config.use_dynamic_partition {
                 config.max_file_size = Some((mem_limit as f64 * 0.15) as u64);

From a941323e406926ced0465cef327137bcab30e3f2 Mon Sep 17 00:00:00 2001
From: zenghua
Date: Fri, 8 Nov 2024 10:19:33 +0800
Subject: [PATCH 2/3] merge_on_read is required for scanning partial compacted data

Signed-off-by: zenghua
---
 rust/lakesoul-io/src/lakesoul_io_config.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/rust/lakesoul-io/src/lakesoul_io_config.rs b/rust/lakesoul-io/src/lakesoul_io_config.rs
index b6dde9c0f..99bcbf956 100644
--- a/rust/lakesoul-io/src/lakesoul_io_config.rs
+++ b/rust/lakesoul-io/src/lakesoul_io_config.rs
@@ -190,7 +190,8 @@ impl LakeSoulIOConfig {
     }
 
     pub fn is_compacted(&self) -> bool {
-        self.option(OPTION_KEY_IS_COMPACTED).map_or(false, |x| x.eq("true"))
+        // self.option(OPTION_KEY_IS_COMPACTED).map_or(false, |x| x.eq("true"))
+        false
     }
 }

From e856d0078ae6b3325f2d7076004e092a062f76cd Mon Sep 17 00:00:00 2001
From: zenghua
Date: Thu, 14 Nov 2024 18:32:37 +0800
Subject: [PATCH 3/3] fix compaction with no partition column

Signed-off-by: zenghua
---
.../datasources/LakeSoulFileWriter.scala | 21 ++- .../lakesoul/DelayedCopyCommitProtocol.scala | 19 ++- .../sql/lakesoul/TransactionalWrite.scala | 4 +- .../lakesoul/commands/CompactionCommand.scala | 2 +- .../lakesoul/commands/CompactionSuite.scala | 153 ++++++++++++++++++ .../local/LakeSoulLocalJavaWriter.java | 3 +- 6 files changed, 187 insertions(+), 15 deletions(-) diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/LakeSoulFileWriter.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/LakeSoulFileWriter.scala index da78e4b5b..8a7b25345 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/LakeSoulFileWriter.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/LakeSoulFileWriter.scala @@ -50,6 +50,7 @@ object LakeSoulFileWriter extends Logging { val HASH_BUCKET_ID_KEY = "hash_bucket_id" val SNAPPY_COMPRESS_RATIO = 3 val COPY_FILE_WRITER_KEY = "copy_file_writer" + val COPY_FILE_WRITER_SPLITTER = "\u0001" /** * Basic work flow of this command is: @@ -216,8 +217,8 @@ object LakeSoulFileWriter extends Logging { try { // for compaction, we won't break ordering from batch scan val (rdd, concurrentOutputWriterSpec) = - if (isCompaction && options.getOrElse(COPY_FILE_WRITER_KEY, "false").toBoolean) { - val data = Seq(InternalRow(COPY_FILE_WRITER_KEY)) + if (isCompaction && options.getOrElse(COPY_FILE_WRITER_KEY, "").nonEmpty) { + val data = options(COPY_FILE_WRITER_KEY).split(COPY_FILE_WRITER_SPLITTER).map(InternalRow(_)).toSeq (sparkSession.sparkContext.parallelize(data), None) } else if (!isBucketNumChanged && (orderingMatched || isCompaction)) { (nativeWrap(empty2NullPlan), None) @@ -439,9 +440,14 @@ object LakeSoulFileWriter extends Logging { currentWriter.close() if (maxFileSize.isDefined) { currentWriter.asInstanceOf[NativeParquetOutputWriter].flushResult.foreach(result => { - val (partitionDesc, flushResult) = result - val partitionDescList = if (partitionDesc == "-4") { - DBUtil.parsePartitionDesc(options.getOrElse("partValue", LAKESOUL_NON_PARTITION_TABLE_PART_DESC)).asScala.toList + var (partitionDesc, flushResult) = result + partitionDesc = if (partitionDesc == "-4") { + options.getOrElse("partValue", LAKESOUL_NON_PARTITION_TABLE_PART_DESC) + } else { + partitionDesc + } + val partitionDescList = if (partitionDesc == "-5") { + List.empty } else { DBUtil.parsePartitionDesc(partitionDesc).asScala.toList } @@ -516,8 +522,13 @@ object LakeSoulFileWriter extends Logging { options: Map[String, String], customMetrics: Map[String, SQLMetric] = Map.empty) extends FileFormatDataWriter(description, taskAttemptContext, committer, customMetrics) { + private val partValue: Option[String] = options.get("partValue").filter(_ != LAKESOUL_NON_PARTITION_TABLE_PART_DESC) + .map(_.replace(LAKESOUL_RANGE_PARTITION_SPLITTER, "/")) override def write(record: InternalRow): Unit = { + val path = record.get(0, StringType).toString + val currentPath = committer.newTaskTempFile(taskAttemptContext, partValue, path) + statsTrackers.foreach(_.newFile(currentPath)) logInfo("copy file") } } diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCopyCommitProtocol.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCopyCommitProtocol.scala index 4284f3026..0c018acea 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCopyCommitProtocol.scala +++ 
b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCopyCommitProtocol.scala @@ -30,9 +30,17 @@ class DelayedCopyCommitProtocol(srcFiles: Seq[DataFileInfo], extends DelayedCommitProtocol(jobId, dstPath, randomPrefixLength) with Serializable with Logging { + @transient private var addedFileInfo: ArrayBuffer[DataFileInfo] = _ + @transient private var pathToFileInfo: Map[String, Seq[DataFileInfo]] = _ + + override def setupTask(taskContext: TaskAttemptContext): Unit = { + addedFileInfo = new ArrayBuffer[DataFileInfo] + pathToFileInfo = srcFiles.groupBy(_.path) + } + override def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = { - throw new UnsupportedOperationException( - s"$this does not support adding files with an absolute path") + addedFileInfo += pathToFileInfo(ext).head + ext } override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = { @@ -41,10 +49,9 @@ class DelayedCopyCommitProtocol(srcFiles: Seq[DataFileInfo], } override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = { - - if (srcFiles.nonEmpty) { - val fs = new Path(srcFiles.head.path).getFileSystem(taskContext.getConfiguration) - val statuses = srcFiles.map { srcFile => + if (addedFileInfo.nonEmpty) { + val fs = new Path(addedFileInfo.head.path).getFileSystem(taskContext.getConfiguration) + val statuses = addedFileInfo.map { srcFile => val (srcCompactDir, srcBasePath) = splitCompactFilePath(srcFile.path) val dstFile = new Path(dstPath, srcBasePath) FileUtil.copy(fs, new Path(srcFile.path), fs, new Path(dstPath, srcBasePath), false, taskContext.getConfiguration) diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala index 66109a1d3..ce6d43f0d 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala @@ -12,7 +12,7 @@ import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.Dataset import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.execution.datasources.LakeSoulFileWriter.COPY_FILE_WRITER_KEY +import org.apache.spark.sql.execution.datasources.LakeSoulFileWriter.{COPY_FILE_WRITER_KEY, COPY_FILE_WRITER_SPLITTER} import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, LakeSoulFileWriter, WriteJobStatsTracker} import org.apache.spark.sql.execution.{QueryExecution, SQLExecution} import org.apache.spark.sql.functions.{col, when} @@ -217,7 +217,7 @@ trait TransactionalWrite { } val committer = if (copyCompactedFile.nonEmpty) { - options.put(COPY_FILE_WRITER_KEY, "true") + options.put(COPY_FILE_WRITER_KEY, copyCompactedFile.map(_.path).mkString(COPY_FILE_WRITER_SPLITTER)) new DelayedCopyCommitProtocol(copyCompactedFile, "lakesoul", outputPath.toString, None) } else { getCommitter(outputPath) diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CompactionCommand.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CompactionCommand.scala index b0df3475c..1e4f47eb3 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CompactionCommand.scala +++ 
b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CompactionCommand.scala @@ -208,7 +208,7 @@ case class CompactionCommand(snapshotManagement: SnapshotManagement, val (copyFiles, scanFiles) = if (fileSizeLimit.isEmpty || bucketNumChanged || force) { (Seq.empty, files) } else { - files.splitAt(files.indexWhere(_.size < fileSizeLimit.get * 0.5)) + files.splitAt(files.indexWhere(file => file.size < fileSizeLimit.get * 0.5 || splitCompactFilePath(file.path)._2.isEmpty)) } diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/CompactionSuite.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/CompactionSuite.scala index cfad5989b..c8e2ef34b 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/CompactionSuite.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/CompactionSuite.scala @@ -4,10 +4,15 @@ package org.apache.spark.sql.lakesoul.commands +import com.dmetasoul.lakesoul.lakesoul.local.LakeSoulLocalJavaWriter +import com.dmetasoul.lakesoul.lakesoul.local.LakeSoulLocalJavaWriter.ArrowTypeMockDataGenerator +import com.dmetasoul.lakesoul.meta.DBConfig.TableInfoProperty import com.dmetasoul.lakesoul.meta.LakeSoulOptions.SHORT_TABLE_NAME import com.dmetasoul.lakesoul.meta.{DBUtil, DataFileInfo, DataOperation, SparkMetaVersion} import com.dmetasoul.lakesoul.spark.clean.CleanOldCompaction.splitCompactFilePath import com.dmetasoul.lakesoul.tables.LakeSoulTable +import org.apache.arrow.vector.types.{DateUnit, TimeUnit} +import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType} import org.apache.hadoop.fs.Path import org.apache.spark.SparkConf import org.apache.spark.sql.functions.lit @@ -23,6 +28,12 @@ import org.junit.runner.RunWith import org.scalatest.BeforeAndAfterEach import org.scalatestplus.junit.JUnitRunner +import java.time.ZoneId +import java.util +import java.util.{Arrays, List} +import scala.collection.JavaConverters.mapAsJavaMapConverter + + @RunWith(classOf[JUnitRunner]) class CompactionSuite extends QueryTest with SharedSparkSession with BeforeAndAfterEach @@ -920,4 +931,146 @@ class CompactionSuite extends QueryTest sm.getTableInfoOnly } + + test("compaction with concurrent data insertion and compaction") { + withTempDir { tempDir => + // val tempDir = org.apache.spark.util.Utils.createDirectory(System.getProperty("java.io.tmpdir")) + val tablePath = tempDir.getCanonicalPath + // val spark = SparkSession.active + + val hashBucketNum = 4 + val compactRounds = 10 + val compactGapMs = 10000 + val upsertRounds = 100 + val upsertGapMs = 100 + val upsertRows = 1024 + val compactGroupSize = 3 + val cdc = true + val ranges = 2; + + val fields: util.List[Field] = if (cdc) { + util.Arrays.asList( + new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null), + new Field("date", FieldType.nullable(new ArrowType.Utf8), null), + new Field("value", FieldType.nullable(new ArrowType.Int(32, true)), null), + new Field("range", FieldType.nullable(new ArrowType.Int(32, true)), null), + new Field(TableInfoProperty.CDC_CHANGE_COLUMN_DEFAULT, FieldType.nullable(new ArrowType.Utf8), null), + ) + } else { + util.Arrays.asList( + new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null), + new Field("date", FieldType.nullable(new ArrowType.Utf8), null), + new Field("value", FieldType.nullable(new ArrowType.Int(32, true)), null), + new Field("range", FieldType.nullable(new ArrowType.Int(32, true)), null), + ) + } + // if (cdc) fields = 
util.Arrays.asList(new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null), new Field("range", FieldType.nullable(new ArrowType.Int(32, true)), null), new Field("int", FieldType.nullable(new ArrowType.Int(32, true)), null), new Field("utf8", FieldType.nullable(new ArrowType.Utf8), null), new Field("decimal", FieldType.nullable(ArrowType.Decimal.createDecimal(10, 3, null)), null), new Field("boolean", FieldType.nullable(new ArrowType.Bool), null), new Field("date", FieldType.nullable(new ArrowType.Date(DateUnit.DAY)), null), new Field("datetimeSec", FieldType.nullable(new ArrowType.Timestamp(TimeUnit.SECOND, ZoneId.of("UTC").toString)), null), new Field(TableInfoProperty.CDC_CHANGE_COLUMN_DEFAULT, FieldType.notNullable(new ArrowType.Utf8), null), new Field("datetimeMilli", FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, ZoneId.of("UTC").toString)), null)) + // else fields = util.Arrays.asList(new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null), new Field("range", FieldType.nullable(new ArrowType.Int(32, true)), null), new Field("int", FieldType.nullable(new ArrowType.Int(32, true)), null), new Field("utf8", FieldType.nullable(new ArrowType.Utf8), null), new Field("decimal", FieldType.nullable(ArrowType.Decimal.createDecimal(10, 3, null)), null), new Field("boolean", FieldType.nullable(new ArrowType.Bool), null), new Field("date", FieldType.nullable(new ArrowType.Date(DateUnit.DAY)), null), new Field("datetimeSec", FieldType.nullable(new ArrowType.Timestamp(TimeUnit.SECOND, ZoneId.of("UTC").toString)), null), new Field("datetimeMilli", FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, ZoneId.of("UTC").toString)), null)) + + val numCols = fields.size + // Create test data + val df = if (cdc) { + Seq( + (-1, "2023-01-01", 10, ranges + 1, "insert"), + (-2, "2023-01-02", 20, ranges + 1, "insert"), + (-3, "2023-01-03", 30, ranges + 1, "insert"), + (-4, "2023-01-04", 40, ranges + 1, "insert"), + (-5, "2023-01-05", 50, ranges + 1, "insert") + ).toDF("id", "date", "value", "range", TableInfoProperty.CDC_CHANGE_COLUMN_DEFAULT) + } else { + Seq( + (1, "2023-01-01", 10, ranges + 1), + (2, "2023-01-02", 20, ranges + 1), + (3, "2023-01-03", 30, ranges + 1), + (4, "2023-01-04", 40, ranges + 1), + (5, "2023-01-05", 50, ranges + 1) + ).toDF("id", "date", "value", "range") + } + + // Write initial data + df.write + .format("lakesoul") + // .option("rangePartitions", "range") + .option("hashPartitions", "id") + .option(SHORT_TABLE_NAME, "compaction_size_limit_table") + .option("hashBucketNum", hashBucketNum.toString) + .option(TableInfoProperty.CDC_CHANGE_COLUMN, TableInfoProperty.CDC_CHANGE_COLUMN_DEFAULT) + .save(tablePath) + + val lakeSoulTable = LakeSoulTable.forPath(tablePath) + val insertThread = new Thread { + + override def run(): Unit = { + val localWriter = new LakeSoulLocalJavaWriter() + val params = Map( + ("lakesoul.pg.url", "jdbc:postgresql://127.0.0.1:5433/test_lakesoul_meta?stringtype=unspecified"), + ("lakesoul.pg.username", "yugabyte"), + ("lakesoul.pg.password", "yugabyte"), + (LakeSoulLocalJavaWriter.TABLE_NAME, "compaction_size_limit_table") + ) + localWriter.init(params.asJava) + + for (c <- 0 until upsertRounds) { + println(s"upsertRound = $c") + for (i <- c * upsertRows until c * upsertRows + upsertRows) { + val row: Array[AnyRef] = new Array[AnyRef](if (cdc) { + numCols - 1 + } + else { + numCols + }) + var j: Int = 0 + var k: Int = 0 + while (j < numCols) { + if 
(!fields.get(j).getName.contains(TableInfoProperty.CDC_CHANGE_COLUMN_DEFAULT)) { + if (fields.get(j).getName.contains("id")) { + row(k) = i.asInstanceOf[AnyRef] + k += 1 + } + else { + if (fields.get(j).getName.contains("range")) { + row(k) = (i % ranges).asInstanceOf[AnyRef] + k += 1 + } + else { + row(k) = fields.get(j).getType.accept(ArrowTypeMockDataGenerator.INSTANCE) + k += 1 + } + } + } + + j += 1 + } + localWriter.writeAddRow(row) + // if (cdc && i % 7 == 0) { + // localWriter.writeDeleteRow(row) + // } + } + localWriter.commit() + Thread.sleep(upsertGapMs) + } + localWriter.close() + } + } + LakeSoulTable.uncached(tablePath) + val compactionThread = new Thread { + override def run(): Unit = { + for (c <- 1 to compactRounds) { + println(s"compactRound = $c") + + lakeSoulTable.compaction(fileNumLimit = Some(2), fileSizeLimit = Some("10KB"), force = false) + Thread.sleep(compactGapMs) // Simulate compaction delay + } + } + } + // insertThread.start() + // compactionThread.start() + // insertThread.join() + // compactionThread.join() + // val compactedData = lakeSoulTable.toDF.orderBy("id", "date").collect() + // assert(compactedData.length == upsertRounds * upsertRows + 5, s"The compressed data should have 105 rows, but it actually has ${compactedData.length} rows") + } + } + + } diff --git a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/local/LakeSoulLocalJavaWriter.java b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/local/LakeSoulLocalJavaWriter.java index 84523d00d..f2b260c7b 100644 --- a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/local/LakeSoulLocalJavaWriter.java +++ b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/local/LakeSoulLocalJavaWriter.java @@ -181,6 +181,7 @@ void write(Object[] row) { } public void writeAddRow(Object[] row) { +// System.out.println(Arrays.toString(row)); if (cdcColumn != null) { Object[] addRow = new Object[row.length + 1]; for (int i = 0; i < row.length; i++) { @@ -271,7 +272,7 @@ public void close() throws Exception { } } - private static class ArrowTypeMockDataGenerator + public static class ArrowTypeMockDataGenerator implements ArrowType.ArrowTypeVisitor { long count = 0;