BH Projection Framework
Drill provides a Project operator to handle the common projection tasks:
- Remove an unused column.
- Rename a column.
- Promote a column from inside an array or map to top level.
- Reorder columns.
- Create a new column as an expression of existing columns and constants.
Originally, readers loaded all columns, and the Project operator then removed those not needed. The team quickly realized that a performance improvement could be had by asking the reader to omit unprojected columns. As a result, each reader implemented projection push-down in its own way. The projection framework provides a common scan projection push-down framework so that we can test it once and use it everywhere. The projection framework is coupled with the result set loader to form the overall scan framework.
Projection in a scan (AKA "projection push down") must handle several key tasks:
- Parse the project list in the physical plan (provided as a list of SchemaPath elements)
- If the projection is a wildcard ("*", AKA "star query"), create the scan output based on the schema of the data source
- If the projection is explicit (a list of columns), create the scan output based on that list, making up null columns as necessary
- Handle file metadata (AKA "implicit") columns: file name, file path, dirx, and so on.
Projection provides information for other parts of the scan:
- The list of desired columns for the reader.
- The list of null columns for the null column builder.
- The list of file metadata columns for the file metadata builder.
The ScanSchemaOrchestrator class coordinates the tasks described below.
We've noted three groups of columns: "table" columns, null columns and file metadata columns. The scan framework builds these separately. (Each reader need not worry about the null and file metadata columns: there is only one right way to create them.) Suppose we have the following query:
SELECT a, b, dir0, filename, c, d
Suppose that the table can provide columns (c, e, a). Columns (dir0, filename) are file metadata columns. This means that columns (b, d) are null. We create the three groups. Then, we must project them into the output in the order specified by the query:
Table Output
+----------+ +----------+
| c |-----+ +------------>| a |
| e | X | | +--------->| b |
| a |--------+ | +------>| dir |
+----------+ | | | +---->| filename |
File Metadata +-----|--|-|---->| c |
+----------+ | | | +->| d |
| filename |-----------|--|-+ | +----------+
| dir0 |-----------|--+ |
+----------+ | |
Nulls | |
+----------+ | |
| b |-----------+ |
| d |-------------------+
+----------+
Logically, this is just a map of (source, source offset) pairs ordered by output columns. (But, the above picture is more impressive than a list of numbers...)
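To make that mapping concrete, here is a minimal sketch in plain Java (hypothetical names, not the actual Drill classes) of the (source, source offset) list behind the picture above:

```java
import java.util.List;

// Hypothetical sketch (not the actual Drill classes): an output column is a
// pointer to a source group (table, nulls, file metadata) plus an index
// within that group. The output order is the order of the list itself.
public class ProjectionMapSketch {
  enum Source { TABLE, NULLS, FILE_METADATA }

  record OutputColumn(String name, Source source, int sourceIndex) { }

  public static void main(String[] args) {
    // SELECT a, b, dir0, filename, c, d with table columns (c, e, a),
    // metadata columns (filename, dir0), and null columns (b, d):
    List<OutputColumn> projection = List.of(
        new OutputColumn("a",        Source.TABLE,         2),
        new OutputColumn("b",        Source.NULLS,         0),
        new OutputColumn("dir0",     Source.FILE_METADATA, 1),
        new OutputColumn("filename", Source.FILE_METADATA, 0),
        new OutputColumn("c",        Source.TABLE,         0),
        new OutputColumn("d",        Source.NULLS,         1));
    projection.forEach(col -> System.out.println(
        col.name() + " <- " + col.source() + "[" + col.sourceIndex() + "]"));
  }
}
```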
Projection would be moderately challenging if we only had to handle top-level columns. Implement the mapping described above, and we'd be done. But, Drill is far more powerful. Drill uses JSON as its native data model. This means we must deal with maps, arrays, lists and unions.
Drill allows the following kinds of column expressions in the SELECT clause:
- Simple column: c
- Array element: c[2]
- Map member: m.c
- Combinations of the above: m1.m2.c[3]
The simple column is handled via the mapping described above.
Array elements are handled in two steps. A query can request any number of elements: c[2], c[4], c[0]. The scan operator either reads column c or not. So, the scan operator coalesces the array indexes into a single column projection, with an associated set of indexes: c[0, 2, 4]. The reader can decide to read only the specific elements (as in the CSV reader) or read all elements.
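A minimal sketch of that coalescing step (hypothetical names, not Drill's actual scan classes): gather every index mentioned for a column into one projected-column entry.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical sketch: coalesce c[2], c[4], c[0] into a single entry for
// column "c" with the index set {0, 2, 4}.
public class ArrayIndexCoalescer {
  // Maps column name -> sorted set of requested indexes.
  private final Map<String, TreeSet<Integer>> columns = new LinkedHashMap<>();

  public void request(String column, int index) {
    columns.computeIfAbsent(column, k -> new TreeSet<>()).add(index);
  }

  public Map<String, TreeSet<Integer>> projection() {
    return columns;
  }

  public static void main(String[] args) {
    ArrayIndexCoalescer coalescer = new ArrayIndexCoalescer();
    coalescer.request("c", 2);
    coalescer.request("c", 4);
    coalescer.request("c", 0);
    // Prints: {c=[0, 2, 4]}
    System.out.println(coalescer.projection());
  }
}
```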
The Project operator will then remove the unwanted elements. It does this by applying a seemingly odd rule: naming an array index implicitly creates a top-level column that contains that element. Thus, while the scanner may read array c, populated with all (or just the three) array elements, the Project operator creates three distinct top-level columns, then discards the array. So:
Query: SELECT c[2], c[4], c[0] ...
Scan Project: c[0, 2, 4]
Reader: c["fred", "", "barney", "", "wilma"]
Project operator: EXPR$0: "fred", EXPR$1: "barney", EXPR$2: "wilma"
While this makes sense from a SQL perspective, it seems it would be kind of nice to be able to ask for array elements while leaving them in their array structure. (This will be clearer for maps below.)
In any event, the job of the scan project framework is simply to tell the reader that:
- Column "c" is wanted.
- Column "c" must be an array.
- If the reader can help, we only need elements (0, 2, 4).
Drill also allows the user to select specific map elements. Again, Drill maps these to top-level columns.
Suppose we have the project list m.a, m.b, m.c.d. The projection mechanism parses these into a column reference tree that has one entry per map column. The result is: m(a, b, c(d)).
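A small sketch of this parsing step (hypothetical code, not the RequestedTupleImpl class discussed later) shows how dotted paths become one node per map column:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the projection tree: m.a, m.b, m.c.d -> m(a, b, c(d)).
public class ProjectionNode {
  private final String name;
  private final Map<String, ProjectionNode> children = new LinkedHashMap<>();

  public ProjectionNode(String name) { this.name = name; }

  // Walk a dotted path, creating one child node per path segment.
  public void addPath(String path) {
    ProjectionNode node = this;
    for (String segment : path.split("\\.")) {
      node = node.children.computeIfAbsent(segment, ProjectionNode::new);
    }
  }

  @Override
  public String toString() {
    return children.isEmpty() ? name : name + children.values();
  }

  public static void main(String[] args) {
    ProjectionNode root = new ProjectionNode("");
    root.addPath("m.a");
    root.addPath("m.b");
    root.addPath("m.c.d");
    // Prints: [m[a, b, c[d]]]
    System.out.println(root.children.values());
  }
}
```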
Readers such as JSON and Parquet support maps ("objects" in JSON, structures in Parquet.) Here, it is important to remember how Drill implements "maps." If you are a Java, Python or JSON developer, you may read "map" as "a hash table of name/value pairs in which each map can have a distinct set of keys." Drill, however, implements maps more as structs in Impala or Hive: "a nested tuple in which each row has the same set of struct members." So, when you see the Drill "map" type, think "struct" instead.
A reader generally reads rows and structs recursively. Projection anticipates this by representing the projection list for a row in exactly the same way as a projection list for a map. So, a reader can, say, identify that columns x, m are projected at the root (row) level. Then, the reader can learn that, within map m, columns a, b, c are projected. And so on.
This reasoning shows that, when structs (maps) are involved, projection in the scan is worked out recursively with each row/map level applying the same rules. This is why the project mechanism parses maps into a tree.
Readers can elect to read only the projected map members, or all map members and let Drill discard the unwanted ones. (The result set loader does the "discard" step by creating dummy writers for unwanted map members. This means that, in the result set loader, projection and dummy writers also form a recursive tree.)
As with arrays, the Project operator moves referenced map members to the top level.
This seems even more of a bug than a feature when considering maps. Suppose we have this structure:
{ customer_id: 10,
  name: "bob",
  ship_addr: {
    street: "1234 Maple St.",
    city: "Centerville",
    zip: 12345 },
  bill_addr: {
    street: "9999 Nine Pines Way",
    city: "Vacas Muertes",
    zip: 12543 }
}
Suppose I ask for the name and bill_addr:
SELECT name, bill_addr FROM ...
I will get a VarChar and a Map.
Suppose I just want to know the city and zip from the billing address:
SELECT name, bill_addr.city, bill_addr.zip FROM ...
I might expect to get a map, bill_addr, with just two fields. What I actually get are three top-level fields: name, EXPR$0, EXPR$1.
. This seems awkward: I can either get the entire map, or I blow up the map and get top-level columns.
In any event, we can now show the map projection lifecycle as we did for arrays:
Query: SELECT a, m.b, m.c ...
Scan Project: a, m(b, c)
Reader: "fred", {b: "123 Main St.", c: 12345}
Project operator: a: "fred", EXPR$0: "123 Main St.", EXPR$1: 12345
But, the following is the handling for maps as a whole:
Query: SELECT a, m ...
Scan Project: a, m
Reader: "fred", {b: "123 Main St.", c: 12345}
Project operator: a: "fred", m{b: "123 Main St.", c: 12345}
Everything said about top-level columns applies in maps as well. Thus, if m.d is a column that does not exist in the table, the scan operator must create a null column and splice it into the map (so that the Project operator can then promote it to top level). Map entries can also be other maps or columns.
Bottom line: projection must be recursive.
We noted that Drill has three column projection formats:
- Simple column: c
- Array element: c[2]
- Map member: m.c
A reader should validate that the table column is actually of the same "kind" as the projected column:
- Simple column: Any Drill type.
- Array element: Must be a repeated Drill type, a list or a repeated list.
- Map member: The parent column must be a map or repeated map.
Most readers do not presently do this validation as it is, frankly, somewhat obscure. If the reader simply asks for the name of projected column m.c[3], it will learn it is m, and can match an INT (say) of that name.
The new projection mechanism does, in fact, do the above validation as part of the projection process.
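The check itself is simple to sketch (hypothetical enums standing in for Drill's real type metadata, not the actual API):

```java
// Hypothetical sketch of the consistency check between the projected form of a
// column and the type the table actually provides. ColumnKind and TableType
// stand in for Drill's real metadata classes.
public class ProjectionTypeValidator {
  enum ColumnKind { SIMPLE, ARRAY_ELEMENT, MAP_MEMBER }
  enum TableType { SCALAR, REPEATED, MAP, REPEATED_MAP, LIST, REPEATED_LIST }

  static void validate(String name, ColumnKind projectedAs, TableType actual) {
    switch (projectedAs) {
      case SIMPLE:
        return; // any Drill type is acceptable for a simple column reference
      case ARRAY_ELEMENT:
        if (actual != TableType.REPEATED && actual != TableType.LIST
            && actual != TableType.REPEATED_LIST) {
          throw new IllegalStateException(
              "Column " + name + " is projected with an index but is not an array");
        }
        return;
      case MAP_MEMBER:
        if (actual != TableType.MAP && actual != TableType.REPEATED_MAP) {
          throw new IllegalStateException(
              "Column " + name + " is projected as a map member but is not a map");
        }
        return;
    }
  }

  public static void main(String[] args) {
    validate("c", ColumnKind.ARRAY_ELEMENT, TableType.REPEATED); // ok
    validate("m", ColumnKind.MAP_MEMBER, TableType.SCALAR);      // throws
  }
}
```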
Once the projection analysis phase is complete, we have our projection columns parsed into three groups: table, nulls and file metadata. Three distinct mechanisms handle the three cases.
Table projection is done inside the result set loader. That is, the result set loader allows the reader to a) query which columns are projected, and b) to create writers for the entire set of table columns, including unprojected columns.
The scan projection mechanism then takes the data from the projected columns and maps it into the final, output container, merging the table columns with those from the other two mechanisms.
The second mechanism handles null columns. This turns out to be a bit more complex than one might think. The null handling mechanism does the following:
- Determines the type of the null column
- Creates (or reuses) the vectors
- Fills the vectors with nulls to match the table row count
Drill traditionally uses nullable INT for null columns. But, suppose we are reading a column from CSV. If the column existed, it would be VARCHAR. So, logically, if the column does not exist in, say, file a.csv, we should create a nullable VARCHAR column so the types match with b.csv, which does contain the column.
By similar reasoning, nullable INT does not make sense for JSON either, since JSON will never create an INT column. (JSON always creates BIGINT columns.) So, should the column be nullable BIGINT or maybe nullable VARCHAR? If JSON is reading in all-text mode, then clearly the column should be nullable VARCHAR; otherwise there is no right answer.
Suppose the user has projected column c[1]. Should column c be created as nullable INT or as repeated INT? Similarly, if the user projects column m.c, then column c should be a null column, but inside map m.
Suppose that we are reading two files. The first file has columns (a, b, c). The second file does not contain c. Should the type of c be the same as in the first file, or should it revert to nullable INT?
In most cases, all answers are wrong. Drill really needs a way for the user to specify the type. However, until then, the projection framework chooses the type least likely to lead to a hard schema change. (That is, it uses hints from the kind of reader, from the project syntax, and from prior files to suggest the null column type.) Moving forward, Drill needs strong schema rules so that the user can not only predict what Drill will do, but specify the outcome.
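Until such schema rules exist, the hint logic looks roughly like the following sketch (hypothetical code, not the actual null-column handler): prefer a type remembered from a prior file, then a reader-specific default, and only then nullable INT.

```java
import java.util.Optional;

// Hypothetical sketch of the null-column type heuristic. TypeHint stands in
// for Drill's type metadata; the priority order mirrors the reasoning above.
public class NullColumnTypeChooser {
  enum TypeHint { NULLABLE_INT, NULLABLE_BIGINT, NULLABLE_VARCHAR }

  static TypeHint chooseType(Optional<TypeHint> priorFileType,
                             Optional<TypeHint> readerDefault) {
    // 1. Reuse the type seen for this column in an earlier file, if any.
    if (priorFileType.isPresent()) {
      return priorFileType.get();
    }
    // 2. Fall back to a reader-specific default (e.g. VARCHAR for CSV, or
    //    VARCHAR for JSON in all-text mode).
    if (readerDefault.isPresent()) {
      return readerDefault.get();
    }
    // 3. Last resort: Drill's traditional nullable INT.
    return TypeHint.NULLABLE_INT;
  }

  public static void main(String[] args) {
    // CSV reader default, no prior file: NULLABLE_VARCHAR
    System.out.println(chooseType(Optional.empty(),
        Optional.of(TypeHint.NULLABLE_VARCHAR)));
    // No hints at all: NULLABLE_INT
    System.out.println(chooseType(Optional.empty(), Optional.empty()));
  }
}
```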
Drill provides file metadata columns of two kinds:
- Information about the file itself (filename, fully qualified name, path and suffix.)
- Partition directories.
The file metadata manager, derived from existing implementations, resolves metadata columns (a sketch follows the list below):
- The names of columns are specified in session options.
- If the project list is explicit, add only the requested columns.
- If the project list is a wildcard, add all metadata columns. (With an option to disable this feature if/when the planner can do the right thing. See below.)
- Determine the deepest directory scanned in this present scan.
- Gather information for each file as the file is scanned.
- Populate the vectors with data to match the row count of the table batch.
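A minimal sketch of the per-file resolution step (hypothetical code; real Drill reads the metadata column names from session options rather than hard-coding them):

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: compute the file metadata and partition (dirN) values
// for one file, given the scan's root directory. Fixed column names are used
// here for brevity.
public class FileMetadataSketch {
  static Map<String, String> resolve(Path scanRoot, Path file, int maxPartitionDepth) {
    Map<String, String> values = new LinkedHashMap<>();
    values.put("fqn", file.toString());
    values.put("filepath", file.getParent().toString());
    values.put("filename", file.getFileName().toString());
    String name = file.getFileName().toString();
    int dot = name.lastIndexOf('.');
    values.put("suffix", dot < 0 ? "" : name.substring(dot + 1));

    // Partition directories: one dirN column per directory level below the
    // scan root, padded with nulls up to the deepest level seen in the scan.
    Path relative = scanRoot.relativize(file).getParent();
    for (int i = 0; i < maxPartitionDepth; i++) {
      String value = (relative != null && i < relative.getNameCount())
          ? relative.getName(i).toString() : null;
      values.put("dir" + i, value);
    }
    return values;
  }

  public static void main(String[] args) {
    // {fqn=/data/2019/03/orders.csv, filepath=/data/2019/03,
    //  filename=orders.csv, suffix=csv, dir0=2019, dir1=03}
    System.out.println(resolve(Paths.get("/data"),
        Paths.get("/data/2019/03/orders.csv"), 2));
  }
}
```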
The handling of wildcards is surprising (at least to this developer.) If we say:
SELECT *, filename FROM my.csv
Calcite applies a rule that says "wildcard plus anything is a wildcard", so Calcite rewrites the above query as:
SELECT * FROM my.csv
Since we don't know if the user wanted the metadata columns or not, we must select all of them in the reader. Then, the Project operator will discard those that are not wanted. This is, obviously, a horrible performance hit. So, the best solution is to convince Calcite that some columns are special and should not be folded into the wildcard. This is left as an exercise for later.
For now, the metadata mechanism follows the existing rules of including all metadata columns when the query is a wildcard. However, an option exists to turn off this behavior, anticipating that Calcite will eventually be fixed.
Once the three mechanisms have populated their respective columns, the projection mechanism assembles the columns into the output batch in the order requested in the project list. Or, for a wildcard query, in the order (table columns, file metadata columns, partition columns). (Note that this order restores the order prior to Drill 1.12. Jinfeng reversed the order in the 1.12 release.)
If the project list is explicit, the projection mechanism builds the output vector container with columns in the requested order. At present, this is not strictly necessary, as Drill tolerates columns in any order; Drill does not apply a specific column order until the Screen operator.
The problem, however, is that Drill uses a linear search to look up columns by name, and this can be slow for queries with many columns. This projection framework guarantees a fixed order as a baby step toward eventually using the classic position-based indexing for Drill column references.
Another odd aspect of Drill semantics is worth mentioning. Each Drill operator is both a record batch and an operator. As a record batch, the operator represents the "output batch" for that operator. Each downstream operator (or at least some) enforces an invariant:
- If the operator returns OK status, then the set of vectors is identical to those for the previous batch
- Identity extends to the vectors: they must be the same Java objects as the previous batch
- Only the buffers in the vectors can change from batch to batch.
The above invariant is not too hard to maintain for some operators (project or filter, say). But, for the scan operator, it is quite the challenge. The scan operator hosts a series of readers, each of which should be independent of the other readers. That is, the reader for a.csv should not know or care whether it comes before, after, or in another fragment from the reader for b.csv. But, if a.csv and b.csv are hosted in the same scan operator, they must return the same set of vectors.
In the original scan operator, the Mutator class attempts to handle this case. (It is not clear how readers that don't use the Mutator class achieve this invariant.)
The revised projection framework uses a concept called the "vector cache." Each time that a reader asks the result set loader to create a column, the result set loader checks the vector cache to see if a vector for that column has already been created. Thus, if two files both have columns (a, b) of type VARCHAR, then they will use the same vectors to hold the data.
Similarly, if the first file has (a, b), but the second file has only (a), and the user asks for (a, b), then the null column handler will retrieve b from the vector cache and use that existing vector to determine the type of the null column (assuming that the cached vector is nullable or repeated.)
The vector cache implements a tree structure to recursively apply the above rules to maps, repeated maps, unions, repeated unions (AKA "lists"), and repeated lists.
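A minimal sketch of the vector cache idea (hypothetical code; the real cache works on Drill value vectors and, as noted, is tree-structured for maps and other nested types):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the vector cache: key vectors by (name, type) so that
// successive readers in the same scan reuse the same vector objects, preserving
// the downstream invariant that vectors are the same Java objects batch to batch.
public class VectorCacheSketch {
  record VectorKey(String name, String type) { }

  // Stand-in for a Drill ValueVector.
  static class Vector {
    final VectorKey key;
    Vector(VectorKey key) { this.key = key; }
  }

  private final Map<VectorKey, Vector> cache = new HashMap<>();

  // Return the cached vector if one exists for this (name, type);
  // otherwise create, cache, and return a new one.
  public Vector addOrGet(String name, String type) {
    return cache.computeIfAbsent(new VectorKey(name, type), Vector::new);
  }

  public static void main(String[] args) {
    VectorCacheSketch cache = new VectorCacheSketch();
    Vector first = cache.addOrGet("a", "VARCHAR");   // created while reading a.csv
    Vector second = cache.addOrGet("a", "VARCHAR");  // reused while reading b.csv
    System.out.println(first == second);             // true: same Java object
  }
}
```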
The projection mechanism divides the work into a number of steps or phases:
- Scan-level resolution
- File-level resolution
- Table-level resolution
- Final projection
Scan-level projection accepts a projection list in the form of a list of SchemaPath elements. It uses a schema path parser, RequestedTupleImpl, to parse the list into the tree form discussed above. (That is, a.b, a.c becomes a(b, c), and so on.) Note that RequestedTupleImpl is used both here and in the result set loader.
Then, with the parsed columns, the scan-level projection analyzes each column, tagging it if it can be resolved before seeing either file or table information. For example:
- If this is a CSV parser, recognize the special columns[] column.
- Identify file metadata and partition directory columns.
- Identify if the query consists of just a wildcard.
All other columns are left in an "unresolved" state, pending the next phase of resolution.
The discussion above mentioned several "special" columns that Drill recognizes: columns[] and file metadata. However, columns[] is unique to CSV. File metadata applies only to file-based sources, but not to, say, the Kafka reader. To avoid massive if-statements and unwanted tight coupling, the scan-level projection provides a generic framework for column parsing. Format-specific parsing rules reside in a pluggable column parser. Thus, there is a columns[] parser for CSV files and a file metadata parser for files. CSV files, in fact, use both parsers.
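The shape of that pluggable framework can be sketched as follows (hypothetical interfaces, not the actual Drill column-parser classes):

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the pluggable column-parser idea: each format
// contributes a parser that may claim a projected column; unclaimed columns
// fall through to the generic "unresolved" handling.
public class ColumnParserSketch {
  interface ColumnParser {
    // Return a resolved description if this parser recognizes the name.
    Optional<String> parse(String projectedName);
  }

  // CSV-only parser for the special columns[] column.
  static class ColumnsArrayParser implements ColumnParser {
    public Optional<String> parse(String name) {
      return name.equalsIgnoreCase("columns")
          ? Optional.of("columns[] (special CSV column)") : Optional.empty();
    }
  }

  // File-source parser for metadata columns such as filename and dir0.
  static class FileMetadataParser implements ColumnParser {
    public Optional<String> parse(String name) {
      return (name.equals("filename") || name.matches("dir\\d+"))
          ? Optional.of(name + " (file metadata column)") : Optional.empty();
    }
  }

  public static void main(String[] args) {
    // A CSV scan registers both parsers; a Kafka scan would register neither.
    List<ColumnParser> parsers =
        List.of(new ColumnsArrayParser(), new FileMetadataParser());
    for (String col : new String[] {"columns", "dir0", "b"}) {
      String resolved = parsers.stream()
          .map(p -> p.parse(col))
          .flatMap(Optional::stream)
          .findFirst()
          .orElse(col + " (unresolved; wait for file/table schema)");
      System.out.println(resolved);
    }
  }
}
```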
The next step in the resolution is done at the start of each new file (or other reader.) Metadata columns are resolved with the actual file name and directory information for the present reader.
The final resolution step occurs when we learn the schema for a table. As noted earlier, we must handle two cases: "early" and "late" schema. In early schema, the reader determines the schema up front (as in Parquet.) In late schema, the reader figures out the schema as it goes along. Earlier designs handled the two cases separately. But, experience showed that the best approach is simply to wait for the first batch of data. At this point, regardless of how the schema was determined, we have a concrete schema and matching vectors.
At this point, projection resolution takes two distinct paths:
- If the query is a wildcard (SELECT *), then the output project list is the same as the table schema. (WildcardSchemaProjection)
- If the query is explicit (a list of columns), then we must resolve the projection list against the table schema, filling in null columns for those not present in the table schema. (ExplicitSchemaProjection)
The explicit projection must be done recursively down through maps (since the user can request a, m.b, m.c).
In either case, the result is a list (tree, actually) of fully-resolved columns in which each holds two numbers:
- The destination index (which is implied by the column order).
- The source (table, null, metadata) and source index.
See SchemaLevelProjection for the overall implementation.
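A much-simplified sketch of the explicit case (hypothetical code; the real logic is recursive over maps and produces resolved column objects rather than strings):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of explicit schema resolution: match each projected name
// against the table schema; anything missing becomes a null column.
public class ExplicitResolutionSketch {
  public static void main(String[] args) {
    // Project list from the query, in output order.
    List<String> projectList = List.of("a", "b", "c", "d");
    // Schema discovered from the first batch of table data: name -> type.
    Map<String, String> tableSchema = new LinkedHashMap<>();
    tableSchema.put("c", "VARCHAR");
    tableSchema.put("e", "INT");      // present in the table but not projected
    tableSchema.put("a", "BIGINT");

    for (String col : projectList) {
      if (tableSchema.containsKey(col)) {
        System.out.println(col + " -> table column of type " + tableSchema.get(col));
      } else {
        System.out.println(col + " -> null column (type chosen by the null-column handler)");
      }
    }
  }
}
```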
The final step is to perform the physical projection: to assemble the output container based on the mapping provided by the projection list.
The ResolvedTuple class holds the resolved columns for a row or map, and coordinates building the output container for that tuple.
The reader is free to change the schema at any time. (The only meaningful change is to add a column to the end of those that are already present: either in the top-level row or in a map.)
Each time a new column appears, the table-level projection must be recomputed. Perhaps a column which was previously null can now be matched to the new table column. Or, if the query is a wildcard, the new column must be added to the output.
This is another place that illustrates the precariousness of Drill's schema-free approach. Should we allow columns to be added? If we guessed nullable INT for a null column, is it helpful to later discover that it was really a VARCHAR? Should we allow readers to change column types, such as JSON changing from, say, BIGINT to FLOAT8 when we see the value 10 followed by 10.1? Again, there is no good answer. While the solution implemented here has been tested (and so works at the code level), there is no assurance that the solution makes sense to the user. Yet again, having a schema would make this issue far, far more deterministic.
Drill tries to be schema-free when reading data, but it is very much schema-full internally. This disconnection between aspiration and operation sets up a number of interesting dynamics. Suppose we read three files:
- (a, b, c)
- (a, b)
- (b, c)
When we read the first file, we project all three columns (assuming a wildcard projection). When we get to the second table, what do we do? Do we project only columns a and b? Or, do we remember that we used to have c, and continue to project it, filled with nulls?
What happens if column b changes types? Starts as, say, nullable INT, then becomes non-nullable INT. Should we handle this case? (We can't handle the reverse case, obviously.)
Suppose we were to have an explicit project list of (a, b, c). Would our decisions be different? When we notice that column c is not in the second table, should we continue to use the type from the first, or should we change the type to Drill's traditional nullable INT?
There are no good answers: these operations are ambiguous and really depend on what the user wants, so there is no fixed correct answer. Instead, all we can do is take guesses and try to fudge individual cases.
This project tried to implement a number of the above ideas in support of an idea called "schema smoothing": don't cause a schema change exception if there is a path that prevents it. In the above, we continue to use the type of column c for the second file, and the previous type of column a for the third file. (See SchemaSmoother.)
Note that this is just a fudge. Reverse the order of tables and the fudge won't work.
The schema smoothing exercise shows that we can throw code at the problem, but we cannot fundamentally solve it. Moving forward, it would be better to back out the schema smoothing changes and instead give users a deterministic way to specify a schema, along with instructions for how to handle missing columns.
The mechanism described above is complex: far more complex than anticipated when starting the project. As it turned out, most of the details were not apparent from reading the existing code; they instead emerged only from running tests. As a result, unit testing was a vital part of the development of this module. Testing started by running many possible use cases through the code to find holes. This was particularly fruitful when expanding the code from a single flat row to a tree of rows, maps, unions, etc.
Then, as the code matured, the CSV and JSON readers were updated, existing system tests run, and failures analyzed. This pointed out many additional obscure cases (such as the need to project map elements). Unit tests were added to the test framework for this module to cover the new cases, and the code was evolved to pass these new use cases.
If you find yourself working in this area, the only path forward to preserve your sanity is to use TDD: write the tests before (or shortly after) changing the code. Create test cases not just for the one use case of interest, but for all variations of that case.
To drive the point home: this module is too complex to make a change, run a query or two, and declare victory. Thorough unit testing is the only way to keep this code in good shape.