Skip to content

Commit

Permalink
[core] close files in CompactRewriter to avoid file leaks.
Browse files Browse the repository at this point in the history
  • Loading branch information
liming30 committed May 27, 2024
1 parent a2363e6 commit 6f7c9d7
Show file tree
Hide file tree
Showing 3 changed files with 377 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.utils.CloseableIterator;
import org.apache.paimon.utils.ExceptionUtils;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.IOUtils;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -123,6 +125,7 @@ private CompactResult rewriteOrProduceChangelog(
CloseableIterator<ChangelogResult> iterator = null;
RollingFileWriter<KeyValue, DataFileMeta> compactFileWriter = null;
RollingFileWriter<KeyValue, DataFileMeta> changelogFileWriter = null;
Exception collectedExceptions = null;

try {
iterator =
Expand Down Expand Up @@ -151,18 +154,22 @@ private CompactResult rewriteOrProduceChangelog(
}
}
}
} catch (Exception e) {
collectedExceptions = e;
} finally {
if (iterator != null) {
iterator.close();
}
if (compactFileWriter != null) {
compactFileWriter.close();
}
if (changelogFileWriter != null) {
changelogFileWriter.close();
try {
IOUtils.closeAll(iterator, compactFileWriter, changelogFileWriter);
} catch (Exception e) {
collectedExceptions = ExceptionUtils.firstOrSuppressed(e, collectedExceptions);
}
}

if (null != collectedExceptions) {
compactFileWriter.abort();
changelogFileWriter.abort();
throw collectedExceptions;
}

List<DataFileMeta> before = extractFilesFromSections(sections);
List<DataFileMeta> after =
compactFileWriter != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.utils.ExceptionUtils;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.IOUtils;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -75,13 +77,31 @@ protected CompactResult rewriteCompaction(
int outputLevel, boolean dropDelete, List<List<SortedRun>> sections) throws Exception {
RollingFileWriter<KeyValue, DataFileMeta> writer =
writerFactory.createRollingMergeTreeFileWriter(outputLevel, FileSource.COMPACT);
RecordReader<KeyValue> reader =
readerForMergeTree(sections, new ReducerMergeFunctionWrapper(mfFactory.create()));
if (dropDelete) {
reader = new DropDeleteReader(reader);
RecordReader<KeyValue> reader = null;
Exception collectedExceptions = null;
try {
reader =
readerForMergeTree(
sections, new ReducerMergeFunctionWrapper(mfFactory.create()));
if (dropDelete) {
reader = new DropDeleteReader(reader);
}
writer.write(new RecordReaderIterator<>(reader));
} catch (Exception e) {
collectedExceptions = e;
} finally {
try {
IOUtils.closeAll(reader, writer);
} catch (Exception e) {
collectedExceptions = ExceptionUtils.firstOrSuppressed(e, collectedExceptions);
}
}
writer.write(new RecordReaderIterator<>(reader));
writer.close();

if (null != collectedExceptions) {
writer.abort();
throw collectedExceptions;
}

List<DataFileMeta> before = extractFilesFromSections(sections);
notifyRewriteCompactBefore(before);
return new CompactResult(before, writer.result());
Expand Down
Loading

0 comments on commit 6f7c9d7

Please sign in to comment.