Skip to content

Commit

Permalink
[fix](broker-load) fix file offset for compressed file apache#24564
Browse files (browse the repository at this point in the history)
Co-authored-by: Kang <[email protected]>
  • Loading branch information
TangSiyang2001 and xiaokang authored Sep 20, 2023
1 parent a2e29d1 commit 0fb79e4
Showing 1 changed file with 7 additions and 6 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TExternalScanRange;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileScanRange;
Expand Down Expand Up @@ -206,27 +207,27 @@ public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context
// header_type
TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path);
context.params.setFormatType(formatType);
context.params.setCompressType(
Util.getOrInferCompressType(context.fileGroup.getCompressType(), fileStatus.path)
);
TFileCompressType compressType =
Util.getOrInferCompressType(context.fileGroup.getCompressType(), fileStatus.path);
context.params.setCompressType(compressType);
List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
context.fileGroup.getColumnNamesFromPath());
// Assign scan range locations only for broker load.
// stream load has only one file, and no need to set multi scan ranges.
if (tmpBytes > bytesPerInstance && jobType != JobType.STREAM_LOAD) {
// Now only support split plain text
if ((formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable)
if (compressType == TFileCompressType.PLAIN
&& (formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable)
|| formatType == TFileFormatType.FORMAT_JSON) {
long rangeBytes = bytesPerInstance - curInstanceBytes;
TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
columnsFromPath);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
curFileOffset += rangeBytes;
} else {
TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes,
TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, leftBytes,
columnsFromPath);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
curFileOffset = 0;
i++;
}

Expand Down

0 comments on commit 0fb79e4

Please sign in to comment.