-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[compact]support compact unaware bucket for combine_mode #2858
[compact]support compact unaware bucket for combine_mode #2858
Conversation
f6274ef
to
a43c51e
Compare
65f0781
to
cc99417
Compare
1d80aac
to
c4ef352
Compare
} | ||
|
||
@VisibleForTesting | ||
Iterable<Future<CommitMessage>> result() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any place using this method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done ,this has been removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this has been finished at #3175
|
||
private final FileStoreTable table; | ||
private final String commitUser; | ||
|
||
private transient AppendOnlyFileStoreWrite write; | ||
private UnwareBucketCompactionHelper compactionHelper; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
transient?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done,this has been finished at #3175
} | ||
|
||
public AppendOnlyCompactionTask( | ||
BinaryRow partition, List<DataFileMeta> files, Identifier identifier) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a new class MultiTableAppendOnlyCompactionTask
which extends AppendOnlyCompactionTask
seems better, in this way, we don't need Identifier.EMPTY which may cause hidden bug.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done ,this change has been finished at #3174
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can remove MultiTablesCompactorSourceFunction
if you have renamed it
* unaware bucket table. | ||
* </ol> | ||
*/ | ||
public abstract class AbstractBucketScanLogic<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe another name? Like MultiTableScanBase
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done ,this has been finished at #3179
* This class is responsible for implementing the scanning logic {@link AbstractBucketScanLogic} for | ||
* the table with multi bucket such as dynamic or fixed bucket table. | ||
*/ | ||
public class MultiBucketScanLogic extends AbstractBucketScanLogic<Tuple2<Split, String>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MultiAwareBucketTableScan
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done ,this has been finished at #3179
* This class is responsible for implementing the scanning logic {@link AbstractBucketScanLogic} for | ||
* the table with fix single bucket such as unaware bucket table. | ||
*/ | ||
public class UnwareBucketScanLogic extends AbstractBucketScanLogic<AppendOnlyCompactionTask> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MultiUnawareBuckeTableScan
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done ,this has been finished at #3179
* </ol> | ||
*/ | ||
public abstract class CompactionFileScanner<T> { | ||
protected final AtomicBoolean isRunning; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
可以在 CompactionFileScanner 和 AtomicBoolean 之间加个换行
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
其他的类也一样
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done ,this has been finished at #3179
@Override | ||
public void scan(SourceFunction.SourceContext<T> ctx) throws Exception { | ||
while (isRunning.get()) { | ||
Boolean isEmpty = tableScanLogic.collectFiles(ctx); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Kind of weird here, use 'null' to tag 'end', can we find a better way?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a simple Enum was added for a more clear expression at #3179
c4ef352
to
5366f01
Compare
39ef4b0
to
ad6e873
Compare
splits.forEach(ctx::collect); | ||
public void open(Configuration parameters) throws Exception { | ||
super.open(parameters); | ||
tableScanLogic = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just tableScan?
/** It is responsible for monitoring compactor source of aware bucket table in batch mode. */ | ||
public class CombinedAwareBatchSourceFunction | ||
extends CombinedCompactorSourceFunction<Tuple2<Split, String>> { | ||
private static final Logger LOGGER = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line feed
splits.forEach(ctx::collect); | ||
public void open(Configuration parameters) throws Exception { | ||
super.open(parameters); | ||
tableScanLogic = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just tableScan
*/ | ||
public class CombinedUnawareBatchSourceFunction | ||
extends CombinedCompactorSourceFunction<MultiTableAppendOnlyCompactionTask> { | ||
private static final Logger LOGGER = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line feed
@Override | ||
public void open(Configuration parameters) throws Exception { | ||
super.open(parameters); | ||
tableScanLogic = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
*/ | ||
public class CombinedUnawareStreamingSourceFunction | ||
extends CombinedCompactorSourceFunction<MultiTableAppendOnlyCompactionTask> { | ||
private final long monitorInterval; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line feed
@Override | ||
public void open(Configuration parameters) throws Exception { | ||
super.open(parameters); | ||
tableScanLogic = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
|
||
/** Compaction task for multi table . */ | ||
public class MultiTableAppendOnlyCompactionTask extends AppendOnlyCompactionTask { | ||
private final Identifier tableIdentifier; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line feed
/** Serializer for {@link MultiTableAppendOnlyCompactionTask}. */ | ||
public class MultiTableCompactionTaskSerializer | ||
implements VersionedSerializer<MultiTableAppendOnlyCompactionTask> { | ||
private static final int CURRENT_VERSION = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line feed
* table with multi bucket such as dynamic or fixed bucket table. | ||
*/ | ||
public class MultiAwareBucketTableScan extends MultiTableScanBase<Tuple2<Split, String>> { | ||
protected transient Map<Identifier, BucketsTable> tablesMap; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line feed (new line)
* </ol> | ||
*/ | ||
public abstract class MultiTableScanBase<T> { | ||
private static final Logger LOG = LoggerFactory.getLogger(MultiTableScanBase.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line feed
|
||
/** The Compactor of unaware bucket table to execute {@link AppendOnlyCompactionTask}. */ | ||
public class UnawareBucketCompactor { | ||
private final FileStoreTable table; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line feed
6ecddb2
to
fafc6b0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
Refact the compaction of combine mode for supporting compacting unaware bucket table
Purpose
Linked issue: close #2670
Tests
CompactDatabaseActionITCase.java for IT case in stream and batch mode
CompactionTaskSerializerTest.java for the test of serializing compaction task
AppendOnlyMultiTableCompactionWorkerOperatorTest.java for the test of AppendOnlyMultiTableCompactionWorkerOperator
API and Format
Documentation