TokuFT Bulk Loader
Rich Prohaska
- What is the loader?
- How is the loader used?
- When is the loader used?
- How does the loader work?
- What is its performance?
- Possible improvements
- Where is the source?
The purpose of the loader is to bulk load fractal trees with data faster than transactionally inserting that data; the original design goal was 1M rows per second for some types of data. The data is NOT assumed to be presorted, although there is an optimization for presorted input. The cost of the decreased load time is increased space.
All of the rows are stored in temporary loader files. The loader stores this data for ALL of the destination fractal trees that it is building from the ONE source of rows.
The temporary loader files may optionally be compressed.
The loader was designed to use cilk, an extension to the C language for describing parallelism. We used cilk to parallelize the loader's sort algorithms, which was deemed necessary to reach the performance goals. Unfortunately, the cilk runtime did not run well inside the MySQL server (mysqld), so we removed cilk from the loader.
The current loader still has some parallelism, which is discussed later.
The loader is used in two cases (a usage sketch follows the list):
- Load data into a set of empty fractal trees associated with a higher level table or collection. In this case, there can be one or more destination fractal trees.
- Create a new fractal tree index on a higher level table or collection. In this case, there is one destination fractal tree.
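For concreteness, here is a minimal sketch of driving the loader through the DB_LOADER handle in ft-index's db.h (create_loader, put, close). Exact signatures and the LOADER_COMPRESS_INTERMEDIATES flag may differ across versions, and the surrounding env/txn/db setup and all error handling are elided.

```c
#include <stdint.h>
#include <string.h>
#include <db.h>      /* ft-index's db.h, which declares DB_LOADER */

/* Load rows into n destination dictionaries through one loader.
 * Treat the signatures as illustrative; see the headers linked in
 * the source section for the authoritative declarations. */
static int bulk_load(DB_ENV *env, DB_TXN *txn,
                     DB *src_db, int n, DB *dest_dbs[]) {
    uint32_t db_flags[n], dbt_flags[n];
    memset(db_flags, 0, n * sizeof db_flags[0]);
    memset(dbt_flags, 0, n * sizeof dbt_flags[0]);

    DB_LOADER *loader;
    /* LOADER_COMPRESS_INTERMEDIATES asks for compressed temp files. */
    int r = env->create_loader(env, txn, &loader, src_db, n, dest_dbs,
                               db_flags, dbt_flags,
                               LOADER_COMPRESS_INTERMEDIATES);
    if (r) return r;

    DBT key = { .data = "k", .size = 1 };
    DBT val = { .data = "v", .size = 1 };
    r = loader->put(loader, &key, &val);   /* repeat for each source row */
    if (r) { loader->abort(loader); return r; }

    return loader->close(loader);          /* finishes the bulk load */
}
```

Most of the work happens behind put (extraction runs on a background thread) and close (the merge and the tree build), as described below.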
At a high level, the loader does four things:
- Put rows into the loader
- Extract rows for each of the destination fractal trees
- Merge sort the temporary files
- Write fractal tree files from a sorted stream of rows
The application puts rows into the loader. The loader accumulates these rows in an in-memory row set. When the row set gets big enough, it is handed to an extractor thread.
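Roughly, the hand-off looks like the sketch below. The structs, the 64MB budget, and the enqueue_for_extractor hook are illustrative stand-ins, not the actual loader internals.

```c
#include <stdlib.h>
#include <string.h>

/* A row set: a bump-allocated buffer of length-prefixed rows. */
struct row    { size_t klen, vlen; char data[]; };
struct rowset { char *buf; size_t used, cap; };

enum { ROWSET_BUDGET = 64 << 20 };          /* hand off at ~64MB */

/* Stub: the real loader queues the row set to the extractor thread,
 * which sorts it, spills it to a temporary file, and frees it. */
static void enqueue_for_extractor(struct rowset *rs) {
    free(rs->buf);
    free(rs);
}

static int loader_put(struct rowset **rsp,
                      const void *key, size_t klen,
                      const void *val, size_t vlen) {
    struct rowset *rs = *rsp;
    size_t need = (sizeof(struct row) + klen + vlen + 7) & ~(size_t)7;
    if (rs->used + need > rs->cap) {
        enqueue_for_extractor(rs);          /* row set is big enough */
        rs = malloc(sizeof *rs);            /* start a fresh row set */
        if (rs == NULL) return -1;
        rs->used = 0;
        rs->cap  = ROWSET_BUDGET;
        rs->buf  = malloc(rs->cap);
        if (rs->buf == NULL) { free(rs); return -1; }
        *rsp = rs;
    }
    struct row *r = (struct row *)(rs->buf + rs->used);
    r->klen = klen;
    r->vlen = vlen;
    memcpy(r->data, key, klen);
    memcpy(r->data + klen, val, vlen);
    rs->used += need;
    return 0;
}
```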
The extractor receives unsorted row sets from the source fractal tree. For each row in a row set, the extractor generates a row for each destination fractal tree and appends the new row to an in-memory buffer local to that destination tree. When a buffer gets full, its rows are sorted and written to a temporary loader file.
The loader keeps track of the set of temporary files for each of the destination fractal trees.
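The spill step for one destination tree looks roughly like this: sort the buffer, then write it out as one sorted run. The memcmp key order and the length-prefixed framing are simplifications (the real loader uses the tree's comparison function), and the tokuldXXXXXX template mirrors the tokuld* names that show up on disk.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

struct kv { char *key; size_t klen; char *val; size_t vlen; };

/* Memcmp-style comparator; shorter key wins ties. */
static int kv_cmp(const void *a, const void *b) {
    const struct kv *x = a, *y = b;
    size_t n = x->klen < y->klen ? x->klen : y->klen;
    int c = memcmp(x->key, y->key, n);
    if (c) return c;
    return (x->klen > y->klen) - (x->klen < y->klen);
}

/* Sort one full buffer and spill it as a sorted run. */
static int spill_run(struct kv *rows, size_t n) {
    qsort(rows, n, sizeof rows[0], kv_cmp);

    char path[] = "tokuldXXXXXX";           /* one temp file per run */
    int fd = mkstemp(path);
    if (fd < 0) return -1;
    FILE *f = fdopen(fd, "w");
    if (f == NULL) { close(fd); return -1; }

    for (size_t i = 0; i < n; i++) {        /* length-prefixed records */
        fwrite(&rows[i].klen, sizeof rows[i].klen, 1, f);
        fwrite(rows[i].key, 1, rows[i].klen, f);
        fwrite(&rows[i].vlen, sizeof rows[i].vlen, 1, f);
        fwrite(rows[i].val, 1, rows[i].vlen, f);
    }
    return fclose(f);                       /* run is ready to merge */
}
```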
The loader uses a multi-stage merge sort to merge all of the temporary loader files. Remember that each of the loader files is already sorted, so the merge produces one fully sorted stream of rows. The last stage of the merge feeds that stream to a background thread that creates a fractal tree from it.
This is repeated for each of the destination fractal trees.
The merge sort was once parallelized with cilk, but is not currently.
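The merge itself is a standard k-way merge. Here is a toy version, with in-memory string runs standing in for the temporary files and a linear minimum scan standing in for a heap; with many runs, the earlier merge stages combine subsets of the runs so that the final stage stays small.

```c
#include <stdio.h>
#include <string.h>

/* A cursor over one sorted run; a run is just an array of strings
 * here to keep the sketch small (the real runs are files). */
struct cursor { const char **rows; size_t n, pos; };

static const char *peek(const struct cursor *c) {
    return c->pos < c->n ? c->rows[c->pos] : NULL;
}

/* Merge k sorted runs into one sorted stream for the consumer. */
static void kway_merge(struct cursor *runs, int k,
                       void (*emit)(const char *row)) {
    for (;;) {
        int best = -1;
        for (int i = 0; i < k; i++) {
            const char *r = peek(&runs[i]);
            if (r && (best < 0 || strcmp(r, peek(&runs[best])) < 0))
                best = i;
        }
        if (best < 0) break;                /* every run is exhausted */
        emit(runs[best].rows[runs[best].pos++]);
    }
}

static void consume(const char *row) { puts(row); }

int main(void) {
    const char *r1[] = { "a", "d", "e" };
    const char *r2[] = { "b", "c", "f" };
    struct cursor runs[] = { { r1, 3, 0 }, { r2, 3, 0 } };
    kway_merge(runs, 2, consume);           /* prints a..f in order */
    return 0;
}
```

In the loader, the consume callback is effectively the background thread that builds the fractal tree file from the sorted stream.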
Given a sorted stream of rows, the loader puts the rows into fractal tree leaf nodes and writes the nodes. It then builds the tree from the leaves up and writes the non-leaf nodes, and finally finalizes the fractal tree header and block table.
When writing the leaf nodes, the compression of the basement nodes is parallelized with a background thread pool.
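A toy sketch of the leaf-packing loop, with invented helper names and print statements standing in for serialization; in the real loader, flushing a full leaf is where its basement nodes get handed to the compression thread pool.

```c
#include <stdio.h>
#include <string.h>

enum { LEAF_BUDGET = 64 };        /* tiny for the example; think megabytes */

struct leaf { char firstkey[32]; size_t used; int blocknum; };

/* Stand-in: serialize the leaf, compress its basement nodes on the
 * background thread pool, and write the compressed blocks. */
static void write_leaf(const struct leaf *lf) {
    printf("leaf %d: first key %s, %zu bytes\n",
           lf->blocknum, lf->firstkey, lf->used);
}

/* Stand-in: pivots accumulate and later become the non-leaf levels,
 * built from the leaves up once all leaves are written. */
static void add_pivot(const char *key, int blocknum) {
    printf("pivot %s -> block %d\n", key, blocknum);
}

/* Consume the sorted stream one row at a time, packing leaves. */
static void build_leaf(struct leaf *cur, int *next_block,
                       const char *key, const char *val) {
    size_t need = strlen(key) + strlen(val);
    if (cur->used && cur->used + need > LEAF_BUDGET) {
        cur->blocknum = (*next_block)++;
        write_leaf(cur);                    /* flush the full leaf */
        add_pivot(cur->firstkey, cur->blocknum);
        memset(cur, 0, sizeof *cur);
    }
    if (cur->used == 0)                     /* remember the boundary key */
        snprintf(cur->firstkey, sizeof cur->firstkey, "%s", key);
    cur->used += need;                      /* row goes into this leaf */
}

int main(void) {
    struct leaf cur = { "", 0, 0 };
    int next_block = 1;
    const char *keys[] = { "apple", "banana", "cherry", "durian" };
    for (int i = 0; i < 4; i++)
        build_leaf(&cur, &next_block, keys[i], "some value bytes here");
    cur.blocknum = next_block;
    write_leaf(&cur);                       /* flush the final leaf */
    add_pivot(cur.firstkey, cur.blocknum);
    return 0;
}
```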
The loader creates new fractal tree files. The last operation of the loader is to flip the switch inside the plumbing of the fractal tree from the original fractal tree files to the new fractal tree files. There is some special trickery involved.
Each loader steals memory from the cache table for its internal data structures.
Each loader duplicates the data for the new fractal trees in temporary loader files.
How does the loader compare with fractal tree insertions for (1) loading a table and (2) creating an index? The answer has a large bearing on possible improvements to the loader: we can either make the current loader faster or replace it with fractal tree insertions.
There are two options: speed up the loader, or replace it.
The original design for the loader depended heavily on cilk for parallelization. Unfortunately, the Intel cilk runtime did not work well in a database environment, so we removed it. We could implement a stripped-down cilk-like runtime (called polyester) and try again.
Alternatively, we could use fractal tree insertions to replace the current loader implementation underneath the loader interface. The benefit is a large simplification of the code base; the question is how much performance would be lost.
https://github.com/tokutek/ft-index/tree/master/ft/loader
https://trac.tokutek.com/tokudb/wiki/TokuWiki/Imp/BulkLoad
The loader has been the source of many bugs, and it has too much knowledge of fractal tree internals. If the performance difference between the loader and direct fractal tree insertions is not too large, then IMO the loader should be removed.
There are 2 phases: the extract phase and the merge phase.
The extract phase runs on 2 threads. The client thread feeds rows into the loader from MySQL, and the extractor thread generates rows for all of the indexes and sorts the rows for each index into small loader temporary files. You may have seen tokuldXYZ files when the loader is running. At the end of the extract phase, all of the rows have been stored in these temporary files.
The merge phase runs on 2 threads. The client thread runs a multi-stage merge sort of all of the tokuld files. The last stage of the merge feeds a sequence of sorted rows to a background thread that writes them out into a fractal tree file.
There are some additional threads to double buffer the file I/O.
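One way to double buffer is a ping-pong pair of buffers: the producer fills one side while a background thread writes the other, so computation and file I/O overlap. The sketch below is illustrative, not the loader's actual I/O code; startup, shutdown, and error handling are omitted.

```c
#include <pthread.h>
#include <stdio.h>
#include <string.h>

enum { HALF = 1 << 20 };          /* size of each half */

struct dbuf {
    char   data[2][HALF];
    size_t len[2];
    int    full[2];               /* 1 = side is ready for the writer */
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    FILE  *out;
};

/* Background thread: drain the sides alternately as they fill. */
static void *writer_thread(void *arg) {
    struct dbuf *b = arg;
    for (int side = 0;; side ^= 1) {
        pthread_mutex_lock(&b->lock);
        while (!b->full[side])
            pthread_cond_wait(&b->cond, &b->lock);
        pthread_mutex_unlock(&b->lock);

        fwrite(b->data[side], 1, b->len[side], b->out);  /* slow part */

        pthread_mutex_lock(&b->lock);
        b->full[side] = 0;                  /* side may be refilled */
        pthread_cond_broadcast(&b->cond);
        pthread_mutex_unlock(&b->lock);
    }
    return NULL;
}

/* Producer: append to the current side; when it fills, publish it to
 * the writer and flip, waiting only if the writer is still behind. */
static void emit(struct dbuf *b, int *side, const void *p, size_t n) {
    if (b->len[*side] + n > HALF) {
        pthread_mutex_lock(&b->lock);
        b->full[*side] = 1;                 /* hand side to the writer */
        pthread_cond_broadcast(&b->cond);
        int other = *side ^ 1;
        while (b->full[other])
            pthread_cond_wait(&b->cond, &b->lock);
        pthread_mutex_unlock(&b->lock);
        *side = other;
        b->len[*side] = 0;
    }
    memcpy(b->data[*side] + b->len[*side], p, n);
    b->len[*side] += n;
}
```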
A long time ago, cilk parallelized these two phases quite extensively: the two sorts ran in parallel, and cilk scheduled all of the index builds in parallel. We had to remove cilk from the loader because it did not live up to its promise, so the loader is no longer as parallel as it was in version 5.0. Today, each index is built sequentially.
We do have 2 threads running during the extract phase (for each client) and 2 threads during the merge phase. In addition, the last stage of writing out fractal tree files can run on several threads, which do all of the block serialization and compression in parallel.
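To illustrate that last point, here is a minimal worker-pool sketch: several threads compress independent blocks concurrently. The queue discipline is simplified to a shared counter, and the real code must also write the compressed blocks out in the right order.

```c
#include <pthread.h>
#include <stdio.h>

enum { NWORKERS = 4, NBLOCKS = 16 };

/* Shared work list: workers atomically claim the next block number. */
struct pool {
    pthread_mutex_t lock;
    int next;
};

/* Stand-in for serializing and compressing one block. */
static void compress_block(int blocknum) {
    printf("compressed block %d\n", blocknum);
}

static void *worker(void *arg) {
    struct pool *p = arg;
    for (;;) {
        pthread_mutex_lock(&p->lock);
        int b = p->next < NBLOCKS ? p->next++ : -1;
        pthread_mutex_unlock(&p->lock);
        if (b < 0) return NULL;   /* no more blocks to compress */
        compress_block(b);        /* runs in parallel across workers */
    }
}

int main(void) {
    struct pool p = { PTHREAD_MUTEX_INITIALIZER, 0 };
    pthread_t tids[NWORKERS];
    for (int i = 0; i < NWORKERS; i++)
        pthread_create(&tids[i], NULL, worker, &p);
    for (int i = 0; i < NWORKERS; i++)
        pthread_join(tids[i], NULL);
    return 0;
}
```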