[SPARK-26368][SQL] Make it clear that getOrInferFileFormatSchema doesn't create InMemoryFileIndex

## What changes were proposed in this pull request?
While reading the code, it was difficult to follow the life cycle of the InMemoryFileIndex used by getOrInferFileFormatSchema: sometimes one is passed in by the caller, and other times one is created inside getOrInferFileFormatSchema itself. The life cycle is easier to understand if we move the creation out to the call sites.

## How was this patch tested?
This is a simple code move and should be covered by existing tests.
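The pattern this commit applies can be illustrated with a minimal, self-contained Scala sketch (all names here — Index, buildIndex, inferSchema — are hypothetical, not Spark APIs): instead of an Option parameter with a fallback constructed inside the callee, the caller passes a thunk `() => Index`, and the callee binds it to a `lazy val` so the expensive construction runs at most once, and only when actually needed.

```scala
object LazyIndexSketch {
  // Stand-in for InMemoryFileIndex; hypothetical, for illustration only.
  final case class Index(paths: Seq[String])

  var buildCount = 0 // tracks how often the expensive path runs

  // Stand-in for createInMemoryFileIndex: the "expensive" construction.
  def buildIndex(): Index = {
    buildCount += 1
    Index(Seq("/data/part=0"))
  }

  // Callee no longer decides when or whether to build the index; creation
  // is fully owned by the caller-provided thunk.
  def inferSchema(havePartitionColumns: Boolean,
                  getIndex: () => Index): String = {
    lazy val tempIndex = getIndex() // materialized at most once, only if used
    if (havePartitionColumns) "schema-from-registered-columns"
    else s"schema-inferred-from-${tempIndex.paths.size}-paths"
  }

  def main(args: Array[String]): Unit = {
    // Streaming-style call: partition columns already known, thunk never runs.
    assert(inferSchema(havePartitionColumns = true, () => buildIndex())
      == "schema-from-registered-columns")
    assert(buildCount == 0)

    // Batch-style call: thunk runs exactly once via the lazy val.
    assert(inferSchema(havePartitionColumns = false, () => buildIndex())
      == "schema-inferred-from-1-paths")
    assert(buildCount == 1)
  }
}
```

The refactoring keeps the original laziness guarantee (nothing expensive happens unless the schema actually has to be inferred) while making it visible at each call site where the index comes from.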

Closes apache#23317 from rxin/SPARK-26368.

Authored-by: Reynold Xin <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
rxin authored and gatorsmile committed Dec 14, 2018
1 parent 93139af commit 2d8838d
Showing 1 changed file with 13 additions and 12 deletions.
@@ -122,21 +122,14 @@ case class DataSource(
    * be any further inference in any triggers.
    *
    * @param format the file format object for this DataSource
-   * @param fileIndex optional [[InMemoryFileIndex]] for getting partition schema and file list
+   * @param getFileIndex [[InMemoryFileIndex]] for getting partition schema and file list
    * @return A pair of the data schema (excluding partition columns) and the schema of the partition
    *         columns.
    */
   private def getOrInferFileFormatSchema(
       format: FileFormat,
-      fileIndex: Option[InMemoryFileIndex] = None): (StructType, StructType) = {
-    // The operations below are expensive therefore try not to do them if we don't need to, e.g.,
-    // in streaming mode, we have already inferred and registered partition columns, we will
-    // never have to materialize the lazy val below
-    lazy val tempFileIndex = fileIndex.getOrElse {
-      val globbedPaths =
-        checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false)
-      createInMemoryFileIndex(globbedPaths)
-    }
+      getFileIndex: () => InMemoryFileIndex): (StructType, StructType) = {
+    lazy val tempFileIndex = getFileIndex()
 
     val partitionSchema = if (partitionColumns.isEmpty) {
       // Try to infer partitioning, because no DataSource in the read path provides the partitioning
@@ -236,7 +229,15 @@
             "you may be able to create a static DataFrame on that directory with " +
             "'spark.read.load(directory)' and infer schema from it.")
       }
-      val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
+
+      val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, () => {
+        // The operations below are expensive therefore try not to do them if we don't need to,
+        // e.g., in streaming mode, we have already inferred and registered partition columns,
+        // we will never have to materialize the lazy val below
+        val globbedPaths =
+          checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false)
+        createInMemoryFileIndex(globbedPaths)
+      })
       SourceInfo(
         s"FileSource[$path]",
         StructType(dataSchema ++ partitionSchema),
@@ -370,7 +371,7 @@ case class DataSource(
       } else {
         val index = createInMemoryFileIndex(globbedPaths)
         val (resultDataSchema, resultPartitionSchema) =
-          getOrInferFileFormatSchema(format, Some(index))
+          getOrInferFileFormatSchema(format, () => index)
         (index, resultDataSchema, resultPartitionSchema)
       }
