Skip to content

Commit

Permalink
[feature](csv)Supports reading CSV data using LF and CRLF as line sep…
Browse files Browse the repository at this point in the history
  • Loading branch information
hubgeter authored Jul 22, 2024
1 parent aff3f29 commit 193be20
Show file tree
Hide file tree
Showing 13 changed files with 851 additions and 22 deletions.
20 changes: 14 additions & 6 deletions be/src/vec/exec/format/csv/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,17 +370,21 @@ Status CsvReader::init_reader(bool is_load) {
_options.converted_from_string = _trim_double_quotes;
_not_trim_enclose = (!_trim_double_quotes && _enclose == '\"');

if (_state != nullptr) {
_keep_cr = _state->query_options().keep_carriage_return;
}

std::shared_ptr<TextLineReaderContextIf> text_line_reader_ctx;
if (_enclose == 0) {
text_line_reader_ctx =
std::make_shared<PlainTextLineReaderCtx>(_line_delimiter, _line_delimiter_length);
text_line_reader_ctx = std::make_shared<PlainTextLineReaderCtx>(
_line_delimiter, _line_delimiter_length, _keep_cr);

_fields_splitter = std::make_unique<PlainCsvTextFieldSplitter>(
_trim_tailing_spaces, false, _value_separator, _value_separator_length, -1);
} else {
text_line_reader_ctx = std::make_shared<EncloseCsvLineReaderContext>(
_line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length,
_file_slot_descs.size() - 1, _enclose, _escape);
_file_slot_descs.size() - 1, _enclose, _escape, _keep_cr);

_fields_splitter = std::make_unique<EncloseCsvTextFieldSplitter>(
_trim_tailing_spaces, !_not_trim_enclose,
Expand Down Expand Up @@ -878,20 +882,24 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
_options.map_key_delim = _params.file_attributes.text_params.mapkv_delimiter[0];
}

if (_state != nullptr) {
_keep_cr = _state->query_options().keep_carriage_return;
}

// create decompressor.
// _decompressor may be nullptr if this is not a compressed file
RETURN_IF_ERROR(_create_decompressor());
std::shared_ptr<TextLineReaderContextIf> text_line_reader_ctx;
if (_enclose == 0) {
text_line_reader_ctx =
std::make_shared<PlainTextLineReaderCtx>(_line_delimiter, _line_delimiter_length);
text_line_reader_ctx = std::make_shared<PlainTextLineReaderCtx>(
_line_delimiter, _line_delimiter_length, _keep_cr);
_fields_splitter = std::make_unique<PlainCsvTextFieldSplitter>(
_trim_tailing_spaces, _trim_double_quotes, _value_separator,
_value_separator_length);
} else {
text_line_reader_ctx = std::make_shared<EncloseCsvLineReaderContext>(
_line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length,
_file_slot_descs.size() - 1, _enclose, _escape);
_file_slot_descs.size() - 1, _enclose, _escape, _keep_cr);
_fields_splitter = std::make_unique<EncloseCsvTextFieldSplitter>(
_trim_tailing_spaces, false,
std::static_pointer_cast<EncloseCsvLineReaderContext>(text_line_reader_ctx),
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/format/csv/csv_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ class CsvReader : public GenericReader {
bool _trim_tailing_spaces = false;
// `should_not_trim` is to manage the case that: user do not expect to trim double quotes but enclose is double quotes
bool _not_trim_enclose = true;
bool _keep_cr = false;

io::IOContext* _io_ctx = nullptr;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include <glog/logging.h>
#include <string.h>

#ifdef __AVX2__
#include <immintrin.h>
#endif
#include <algorithm>
#include <cstddef>
#include <cstring>
Expand All @@ -42,7 +45,6 @@
// leave these 2 size small for debugging

namespace doris {

const uint8_t* EncloseCsvLineReaderContext::read_line_impl(const uint8_t* start,
const size_t length) {
_total_len = length;
Expand Down Expand Up @@ -82,12 +84,11 @@ void EncloseCsvLineReaderContext::on_col_sep_found(const uint8_t* start,
}

size_t EncloseCsvLineReaderContext::update_reading_bound(const uint8_t* start) {
_result = (uint8_t*)memmem(start + _idx, _total_len - _idx, line_delimiter.c_str(),
line_delimiter_len);
_result = call_find_line_sep(start + _idx, _total_len - _idx);
if (_result == nullptr) {
return _total_len;
}
return _result - start + line_delimiter_len;
return _result - start + line_delimiter_length();
}

template <bool SingleChar>
Expand Down
106 changes: 96 additions & 10 deletions be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class TextLineReaderContextIf {
// info about the current line may be record to the ctx, like column seprator pos.
/// @return line delimiter pos if found, otherwise return nullptr.
virtual const uint8_t* read_line(const uint8_t* start, const size_t len) = 0;

/// @return length of line delimiter
[[nodiscard]] virtual size_t line_delimiter_length() const = 0;

Expand All @@ -62,30 +61,117 @@ class BaseTextLineReaderContext : public TextLineReaderContextIf {

public:
explicit BaseTextLineReaderContext(const std::string& line_delimiter_,
const size_t line_delimiter_len_)
: line_delimiter(line_delimiter_), line_delimiter_len(line_delimiter_len_) {}
const size_t line_delimiter_len_, const bool keep_cr_)
: line_delimiter(line_delimiter_),
line_delimiter_len(line_delimiter_len_),
keep_cr(keep_cr_) {
use_memmem = line_delimiter_len != 1 || line_delimiter != "\n" || keep_cr;
if (use_memmem) {
find_line_delimiter_func = &BaseTextLineReaderContext::find_multi_char_line_sep;
} else {
find_line_delimiter_func = &BaseTextLineReaderContext::find_lf_crlf_line_sep;
}
}

inline const uint8_t* read_line(const uint8_t* start, const size_t len) final {
return static_cast<Ctx*>(this)->read_line_impl(start, len);
}

[[nodiscard]] inline size_t line_delimiter_length() const final { return line_delimiter_len; }
[[nodiscard]] inline size_t line_delimiter_length() const final {
return line_delimiter_len + line_crlf;
}

inline void refresh() final { return static_cast<Ctx*>(this)->refresh_impl(); };

inline const uint8_t* find_multi_char_line_sep(const uint8_t* start, const size_t length) {
return static_cast<uint8_t*>(
memmem(start, length, line_delimiter.c_str(), line_delimiter_len));
}

const uint8_t* find_lf_crlf_line_sep(const uint8_t* start, const size_t length) {
line_crlf = false;
if (start == nullptr || length == 0) {
return nullptr;
}
size_t i = 0;
#ifdef __AVX2__
// const uint8_t* end = start + length;
const __m256i newline = _mm256_set1_epi8('\n');
const __m256i carriage_return = _mm256_set1_epi8('\r');

const size_t simd_width = 32;
// Process 32 bytes at a time using AVX2
for (; i + simd_width <= length; i += simd_width) {
__m256i data = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(start + i));

// Compare with '\n' and '\r'
__m256i cmp_newline = _mm256_cmpeq_epi8(data, newline);
__m256i cmp_carriage_return = _mm256_cmpeq_epi8(data, carriage_return);

// Check if there is a match
int mask_newline = _mm256_movemask_epi8(cmp_newline);
int mask_carriage_return = _mm256_movemask_epi8(cmp_carriage_return);

if (mask_newline != 0 || mask_carriage_return != 0) {
int pos_lf = (mask_newline != 0) ? i + __builtin_ctz(mask_newline) : INT32_MAX;
int pos_cr = (mask_carriage_return != 0) ? i + __builtin_ctz(mask_carriage_return)
: INT32_MAX;
if (pos_lf < pos_cr) {
return start + pos_lf;
} else if (pos_cr < pos_lf) {
if (pos_lf != INT32_MAX) {
if (pos_lf - 1 >= 0 && start[pos_lf - 1] == '\r') {
//check xxx\r\r\r\nxxx
line_crlf = true;
return start + pos_lf - 1;
}
// xxx\rxxxx\nxx
return start + pos_lf;
} else if (i + simd_width < length && start[i + simd_width - 1] == '\r' &&
start[i + simd_width] == '\n') {
//check [/r/r/r/r/r/r/rxxx/r] [\nxxxx]
line_crlf = true;
return start + i + simd_width - 1;
}
}
}
}

// Process remaining bytes
#endif
for (; i < length; ++i) {
if (start[i] == '\n') {
return &start[i];
}
if (start[i] == '\r' && (i + 1 < length) && start[i + 1] == '\n') {
line_crlf = true;
return &start[i];
}
}
return nullptr;
}
const uint8_t* call_find_line_sep(const uint8_t* start, const size_t length) {
return (this->*find_line_delimiter_func)(start, length);
}

protected:
const std::string line_delimiter;
const size_t line_delimiter_len;
bool keep_cr = false;
bool line_crlf = false;
bool use_memmem = true;
using FindLineDelimiterFunc = const uint8_t* (BaseTextLineReaderContext::*)(const uint8_t*,
size_t);
FindLineDelimiterFunc find_line_delimiter_func;
};

class PlainTextLineReaderCtx final : public BaseTextLineReaderContext<PlainTextLineReaderCtx> {
public:
explicit PlainTextLineReaderCtx(const std::string& line_delimiter_,
const size_t line_delimiter_len_)
: BaseTextLineReaderContext(line_delimiter_, line_delimiter_len_) {}
const size_t line_delimiter_len_, const bool keep_cr_)
: BaseTextLineReaderContext(line_delimiter_, line_delimiter_len_, keep_cr_) {}

inline const uint8_t* read_line_impl(const uint8_t* start, const size_t length) {
return (uint8_t*)memmem(start, length, line_delimiter.c_str(), line_delimiter_len);
return call_find_line_sep(start, length);
}

inline void refresh_impl() {}
Expand Down Expand Up @@ -119,8 +205,8 @@ class EncloseCsvLineReaderContext final
const size_t line_delimiter_len_,
const std::string& column_sep_,
const size_t column_sep_len_, size_t col_sep_num,
const char enclose, const char escape)
: BaseTextLineReaderContext(line_delimiter_, line_delimiter_len_),
const char enclose, const char escape, const bool keep_cr_)
: BaseTextLineReaderContext(line_delimiter_, line_delimiter_len_, keep_cr_),
_enclose(enclose),
_escape(escape),
_column_sep_len(column_sep_len_),
Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/exec/format/json/new_json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,9 @@ Status NewJsonReader::_open_line_reader() {
}
_line_reader = NewPlainTextLineReader::create_unique(
_profile, _file_reader, _decompressor.get(),
std::make_shared<PlainTextLineReaderCtx>(_line_delimiter, _line_delimiter_length), size,
_current_offset);
std::make_shared<PlainTextLineReaderCtx>(_line_delimiter, _line_delimiter_length,
false),
size, _current_offset);
return Status::OK();
}

Expand Down
17 changes: 17 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String ENABLE_PUSHDOWN_MINMAX_ON_UNIQUE = "enable_pushdown_minmax_on_unique";

public static final String KEEP_CARRIAGE_RETURN = "keep_carriage_return";

public static final String ENABLE_PUSHDOWN_STRING_MINMAX = "enable_pushdown_string_minmax";

// When set use fix replica = true, the fixed replica maybe bad, try to use the health one if
Expand Down Expand Up @@ -1755,6 +1757,12 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
"The maximum number of partitions created during table creation"})
public int createTablePartitionMaxNum = 10000;


@VariableMgr.VarAttr(name = KEEP_CARRIAGE_RETURN,
description = {"在同时处理\r\r\n作为CSV的行分隔符时,是否保留\r",
"When processing both \\n and \\r\\n as CSV line separators, should \\r be retained?"})
public boolean keepCarriageReturn = false;

@VariableMgr.VarAttr(name = FORCE_JNI_SCANNER,
description = {"强制使用jni方式读取外表", "Force the use of jni mode to read external table"})
private boolean forceJniScanner = false;
Expand Down Expand Up @@ -3133,6 +3141,14 @@ public void setEnableUnicodeNameSupport(boolean enableUnicodeNameSupport) {
this.enableUnicodeNameSupport = enableUnicodeNameSupport;
}

public boolean isKeepCarriageReturn() {
return keepCarriageReturn;
}

public void setKeepCarriageReturn(boolean keepCarriageReturn) {
this.keepCarriageReturn = keepCarriageReturn;
}

public boolean isDropTableIfCtasFailed() {
return dropTableIfCtasFailed;
}
Expand Down Expand Up @@ -3395,6 +3411,7 @@ public TQueryOptions toThrift() {

tResult.setReadCsvEmptyLineAsNull(readCsvEmptyLineAsNull);
tResult.setSerdeDialect(getSerdeDialect());
tResult.setKeepCarriageReturn(keepCarriageReturn);
return tResult;
}

Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ struct TQueryOptions {
117: optional bool read_csv_empty_line_as_null = false;

118: optional TSerdeDialect serde_dialect = TSerdeDialect.DORIS;

119: optional bool keep_carriage_return = false; // \n,\r\n split line in CSV.
// For cloud, to control if the content would be written into file cache
1000: optional bool disable_file_cache = false
}
Expand Down
Loading

0 comments on commit 193be20

Please sign in to comment.