From 193be20c867b212f5bdae01c8af6712a8480f655 Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Mon, 22 Jul 2024 22:53:04 +0800 Subject: [PATCH] [feature](csv)Supports reading CSV data using LF and CRLF as line separators. (#37687) (#38099) bp #37687 --- be/src/vec/exec/format/csv/csv_reader.cpp | 20 +- be/src/vec/exec/format/csv/csv_reader.h | 1 + .../new_plain_text_line_reader.cpp | 9 +- .../file_reader/new_plain_text_line_reader.h | 106 +++++++- .../vec/exec/format/json/new_json_reader.cpp | 5 +- .../org/apache/doris/qe/SessionVariable.java | 17 ++ gensrc/thrift/PaloInternalService.thrift | 2 + .../lf_crlf_and_quotes.csv | 134 ++++++++++ .../lf_crlf_and_quotes.csv.gz | Bin 0 -> 337 bytes .../lf_crlf_not_quotes.csv | 135 ++++++++++ .../lf_crlf_not_quotes.csv.gz | Bin 0 -> 264 bytes .../tvf/test_tvf_csv_line_end.out | 201 +++++++++++++++ .../tvf/test_tvf_csv_line_end.groovy | 243 ++++++++++++++++++ 13 files changed, 851 insertions(+), 22 deletions(-) create mode 100644 regression-test/data/external_table_p0/tvf/test_csv_line_end_lf_crlf/lf_crlf_and_quotes.csv create mode 100644 regression-test/data/external_table_p0/tvf/test_csv_line_end_lf_crlf/lf_crlf_and_quotes.csv.gz create mode 100644 regression-test/data/external_table_p0/tvf/test_csv_line_end_lf_crlf/lf_crlf_not_quotes.csv create mode 100644 regression-test/data/external_table_p0/tvf/test_csv_line_end_lf_crlf/lf_crlf_not_quotes.csv.gz create mode 100644 regression-test/data/external_table_p0/tvf/test_tvf_csv_line_end.out create mode 100644 regression-test/suites/external_table_p0/tvf/test_tvf_csv_line_end.groovy diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 7894b5c57ae415..efd77045997f1c 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -370,17 +370,21 @@ Status CsvReader::init_reader(bool is_load) { _options.converted_from_string = _trim_double_quotes; _not_trim_enclose = (!_trim_double_quotes && _enclose == '\"'); + if (_state != nullptr) { + _keep_cr = _state->query_options().keep_carriage_return; + } + std::shared_ptr text_line_reader_ctx; if (_enclose == 0) { - text_line_reader_ctx = - std::make_shared(_line_delimiter, _line_delimiter_length); + text_line_reader_ctx = std::make_shared( + _line_delimiter, _line_delimiter_length, _keep_cr); _fields_splitter = std::make_unique( _trim_tailing_spaces, false, _value_separator, _value_separator_length, -1); } else { text_line_reader_ctx = std::make_shared( _line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length, - _file_slot_descs.size() - 1, _enclose, _escape); + _file_slot_descs.size() - 1, _enclose, _escape, _keep_cr); _fields_splitter = std::make_unique( _trim_tailing_spaces, !_not_trim_enclose, @@ -878,20 +882,24 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) { _options.map_key_delim = _params.file_attributes.text_params.mapkv_delimiter[0]; } + if (_state != nullptr) { + _keep_cr = _state->query_options().keep_carriage_return; + } + // create decompressor. // _decompressor may be nullptr if this is not a compressed file RETURN_IF_ERROR(_create_decompressor()); std::shared_ptr text_line_reader_ctx; if (_enclose == 0) { - text_line_reader_ctx = - std::make_shared(_line_delimiter, _line_delimiter_length); + text_line_reader_ctx = std::make_shared( + _line_delimiter, _line_delimiter_length, _keep_cr); _fields_splitter = std::make_unique( _trim_tailing_spaces, _trim_double_quotes, _value_separator, _value_separator_length); } else { text_line_reader_ctx = std::make_shared( _line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length, - _file_slot_descs.size() - 1, _enclose, _escape); + _file_slot_descs.size() - 1, _enclose, _escape, _keep_cr); _fields_splitter = std::make_unique( _trim_tailing_spaces, false, std::static_pointer_cast(text_line_reader_ctx), diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h index 65eba62a54c997..3b600190459343 100644 --- a/be/src/vec/exec/format/csv/csv_reader.h +++ b/be/src/vec/exec/format/csv/csv_reader.h @@ -288,6 +288,7 @@ class CsvReader : public GenericReader { bool _trim_tailing_spaces = false; // `should_not_trim` is to manage the case that: user do not expect to trim double quotes but enclose is double quotes bool _not_trim_enclose = true; + bool _keep_cr = false; io::IOContext* _io_ctx = nullptr; diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp index 8dce6e589afde2..415e4c1e34933e 100644 --- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp +++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp @@ -21,6 +21,9 @@ #include #include +#ifdef __AVX2__ +#include +#endif #include #include #include @@ -42,7 +45,6 @@ // leave these 2 size small for debugging namespace doris { - const uint8_t* EncloseCsvLineReaderContext::read_line_impl(const uint8_t* start, const size_t length) { _total_len = length; @@ -82,12 +84,11 @@ void EncloseCsvLineReaderContext::on_col_sep_found(const uint8_t* start, } size_t EncloseCsvLineReaderContext::update_reading_bound(const uint8_t* start) { - _result = (uint8_t*)memmem(start + _idx, _total_len - _idx, line_delimiter.c_str(), - line_delimiter_len); + _result = call_find_line_sep(start + _idx, _total_len - _idx); if (_result == nullptr) { return _total_len; } - return _result - start + line_delimiter_len; + return _result - start + line_delimiter_length(); } template diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h index 0b0d9f133fae00..c91b503cbe5c0d 100644 --- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h +++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h @@ -47,7 +47,6 @@ class TextLineReaderContextIf { // info about the current line may be record to the ctx, like column seprator pos. /// @return line delimiter pos if found, otherwise return nullptr. virtual const uint8_t* read_line(const uint8_t* start, const size_t len) = 0; - /// @return length of line delimiter [[nodiscard]] virtual size_t line_delimiter_length() const = 0; @@ -62,30 +61,117 @@ class BaseTextLineReaderContext : public TextLineReaderContextIf { public: explicit BaseTextLineReaderContext(const std::string& line_delimiter_, - const size_t line_delimiter_len_) - : line_delimiter(line_delimiter_), line_delimiter_len(line_delimiter_len_) {} + const size_t line_delimiter_len_, const bool keep_cr_) + : line_delimiter(line_delimiter_), + line_delimiter_len(line_delimiter_len_), + keep_cr(keep_cr_) { + use_memmem = line_delimiter_len != 1 || line_delimiter != "\n" || keep_cr; + if (use_memmem) { + find_line_delimiter_func = &BaseTextLineReaderContext::find_multi_char_line_sep; + } else { + find_line_delimiter_func = &BaseTextLineReaderContext::find_lf_crlf_line_sep; + } + } inline const uint8_t* read_line(const uint8_t* start, const size_t len) final { return static_cast(this)->read_line_impl(start, len); } - [[nodiscard]] inline size_t line_delimiter_length() const final { return line_delimiter_len; } + [[nodiscard]] inline size_t line_delimiter_length() const final { + return line_delimiter_len + line_crlf; + } inline void refresh() final { return static_cast(this)->refresh_impl(); }; + inline const uint8_t* find_multi_char_line_sep(const uint8_t* start, const size_t length) { + return static_cast( + memmem(start, length, line_delimiter.c_str(), line_delimiter_len)); + } + + const uint8_t* find_lf_crlf_line_sep(const uint8_t* start, const size_t length) { + line_crlf = false; + if (start == nullptr || length == 0) { + return nullptr; + } + size_t i = 0; +#ifdef __AVX2__ + // const uint8_t* end = start + length; + const __m256i newline = _mm256_set1_epi8('\n'); + const __m256i carriage_return = _mm256_set1_epi8('\r'); + + const size_t simd_width = 32; + // Process 32 bytes at a time using AVX2 + for (; i + simd_width <= length; i += simd_width) { + __m256i data = _mm256_loadu_si256(reinterpret_cast(start + i)); + + // Compare with '\n' and '\r' + __m256i cmp_newline = _mm256_cmpeq_epi8(data, newline); + __m256i cmp_carriage_return = _mm256_cmpeq_epi8(data, carriage_return); + + // Check if there is a match + int mask_newline = _mm256_movemask_epi8(cmp_newline); + int mask_carriage_return = _mm256_movemask_epi8(cmp_carriage_return); + + if (mask_newline != 0 || mask_carriage_return != 0) { + int pos_lf = (mask_newline != 0) ? i + __builtin_ctz(mask_newline) : INT32_MAX; + int pos_cr = (mask_carriage_return != 0) ? i + __builtin_ctz(mask_carriage_return) + : INT32_MAX; + if (pos_lf < pos_cr) { + return start + pos_lf; + } else if (pos_cr < pos_lf) { + if (pos_lf != INT32_MAX) { + if (pos_lf - 1 >= 0 && start[pos_lf - 1] == '\r') { + //check xxx\r\r\r\nxxx + line_crlf = true; + return start + pos_lf - 1; + } + // xxx\rxxxx\nxx + return start + pos_lf; + } else if (i + simd_width < length && start[i + simd_width - 1] == '\r' && + start[i + simd_width] == '\n') { + //check [/r/r/r/r/r/r/rxxx/r] [\nxxxx] + line_crlf = true; + return start + i + simd_width - 1; + } + } + } + } + + // Process remaining bytes +#endif + for (; i < length; ++i) { + if (start[i] == '\n') { + return &start[i]; + } + if (start[i] == '\r' && (i + 1 < length) && start[i + 1] == '\n') { + line_crlf = true; + return &start[i]; + } + } + return nullptr; + } + const uint8_t* call_find_line_sep(const uint8_t* start, const size_t length) { + return (this->*find_line_delimiter_func)(start, length); + } + protected: const std::string line_delimiter; const size_t line_delimiter_len; + bool keep_cr = false; + bool line_crlf = false; + bool use_memmem = true; + using FindLineDelimiterFunc = const uint8_t* (BaseTextLineReaderContext::*)(const uint8_t*, + size_t); + FindLineDelimiterFunc find_line_delimiter_func; }; - class PlainTextLineReaderCtx final : public BaseTextLineReaderContext { public: explicit PlainTextLineReaderCtx(const std::string& line_delimiter_, - const size_t line_delimiter_len_) - : BaseTextLineReaderContext(line_delimiter_, line_delimiter_len_) {} + const size_t line_delimiter_len_, const bool keep_cr_) + : BaseTextLineReaderContext(line_delimiter_, line_delimiter_len_, keep_cr_) {} inline const uint8_t* read_line_impl(const uint8_t* start, const size_t length) { - return (uint8_t*)memmem(start, length, line_delimiter.c_str(), line_delimiter_len); + return call_find_line_sep(start, length); } inline void refresh_impl() {} @@ -119,8 +205,8 @@ class EncloseCsvLineReaderContext final const size_t line_delimiter_len_, const std::string& column_sep_, const size_t column_sep_len_, size_t col_sep_num, - const char enclose, const char escape) - : BaseTextLineReaderContext(line_delimiter_, line_delimiter_len_), + const char enclose, const char escape, const bool keep_cr_) + : BaseTextLineReaderContext(line_delimiter_, line_delimiter_len_, keep_cr_), _enclose(enclose), _escape(escape), _column_sep_len(column_sep_len_), diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index fabb699d6f3f76..44f92aab34896b 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -421,8 +421,9 @@ Status NewJsonReader::_open_line_reader() { } _line_reader = NewPlainTextLineReader::create_unique( _profile, _file_reader, _decompressor.get(), - std::make_shared(_line_delimiter, _line_delimiter_length), size, - _current_offset); + std::make_shared(_line_delimiter, _line_delimiter_length, + false), + size, _current_offset); return Status::OK(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 046dc28c4a4dee..047bd2f10fb559 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -552,6 +552,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_PUSHDOWN_MINMAX_ON_UNIQUE = "enable_pushdown_minmax_on_unique"; + public static final String KEEP_CARRIAGE_RETURN = "keep_carriage_return"; + public static final String ENABLE_PUSHDOWN_STRING_MINMAX = "enable_pushdown_string_minmax"; // When set use fix replica = true, the fixed replica maybe bad, try to use the health one if @@ -1755,6 +1757,12 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) { "The maximum number of partitions created during table creation"}) public int createTablePartitionMaxNum = 10000; + + @VariableMgr.VarAttr(name = KEEP_CARRIAGE_RETURN, + description = {"在同时处理\r和\r\n作为CSV的行分隔符时,是否保留\r", + "When processing both \\n and \\r\\n as CSV line separators, should \\r be retained?"}) + public boolean keepCarriageReturn = false; + @VariableMgr.VarAttr(name = FORCE_JNI_SCANNER, description = {"强制使用jni方式读取外表", "Force the use of jni mode to read external table"}) private boolean forceJniScanner = false; @@ -3133,6 +3141,14 @@ public void setEnableUnicodeNameSupport(boolean enableUnicodeNameSupport) { this.enableUnicodeNameSupport = enableUnicodeNameSupport; } + public boolean isKeepCarriageReturn() { + return keepCarriageReturn; + } + + public void setKeepCarriageReturn(boolean keepCarriageReturn) { + this.keepCarriageReturn = keepCarriageReturn; + } + public boolean isDropTableIfCtasFailed() { return dropTableIfCtasFailed; } @@ -3395,6 +3411,7 @@ public TQueryOptions toThrift() { tResult.setReadCsvEmptyLineAsNull(readCsvEmptyLineAsNull); tResult.setSerdeDialect(getSerdeDialect()); + tResult.setKeepCarriageReturn(keepCarriageReturn); return tResult; } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 53aefc41010b7d..6b0df79f14ca47 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -305,6 +305,8 @@ struct TQueryOptions { 117: optional bool read_csv_empty_line_as_null = false; 118: optional TSerdeDialect serde_dialect = TSerdeDialect.DORIS; + + 119: optional bool keep_carriage_return = false; // \n,\r\n split line in CSV. // For cloud, to control if the content would be written into file cache 1000: optional bool disable_file_cache = false } diff --git a/regression-test/data/external_table_p0/tvf/test_csv_line_end_lf_crlf/lf_crlf_and_quotes.csv b/regression-test/data/external_table_p0/tvf/test_csv_line_end_lf_crlf/lf_crlf_and_quotes.csv new file mode 100644 index 00000000000000..2564cafe1add8b --- /dev/null +++ b/regression-test/data/external_table_p0/tvf/test_csv_line_end_lf_crlf/lf_crlf_and_quotes.csv @@ -0,0 +1,134 @@ +1 ,Alice , 30 ,New York +w,w,w,"w" +"w",w,"w","w" +"10","abc" ,"ttt","def" +2,Bob,25,Los Angeles +3,"Ch a rlie ",35,Chicago +4,abc,def,sss +w,w,w,w + 5 , ttt , d e f ,sss +w,w,w,w +" wrweqreqer ","234 ", 32323 ,"3232" +w, w, w,w +w,"w",w,"w" +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,"w",w,w +w,w,w,w +w,w,w, w +w,w, w , w +w,w,w,w +w,"w","w",w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +"w","w",w,"w" +""w,"w",w,"w +""w,"w",w,"w +""w,"w",w,"w +""w,"w",w,"w +""w,"w",w,"w +w,w,w,w +w,w,w,w +w,"w","w","w" +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,""w +w,w,w,w +w,w,w,w +w,w,"w",w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +"w","w","w","w" +"w","w","w","w" +"w","w","w","w" +"w","w","w","w" +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +"w","w","w","w" +"w","w","w","w" +w,w,w,"w" +"w","w","w","w" +"w","w","w","w" +"w","w","w","w" +"w","w","w","w" +"w","w","w","w" +w,w,w,w +w,w,w,w +"w","w","w","w" +w,w,w,w +w,w,w,w +"w","w","w","w" +w,w,w,w +w,w,w,w +"w","w","w","w" +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +"w","w","w","w" +"w","w","w","w" +"w","w","w","w" +"w","w","w","w" +"w","w","w","w" +"w","w","w","w" +"w","w","w","w" +"w","w","w","w" +"w","w","w","w" +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +w,w,w,w +12,34,"abc ",def +"w",w,w,"w" +"w","w","w","w" diff --git a/regression-test/data/external_table_p0/tvf/test_csv_line_end_lf_crlf/lf_crlf_and_quotes.csv.gz b/regression-test/data/external_table_p0/tvf/test_csv_line_end_lf_crlf/lf_crlf_and_quotes.csv.gz new file mode 100644 index 0000000000000000000000000000000000000000..84f73cb2100789812eaeab67d1afc8d2c19dc951 GIT binary patch literal 337 zcmV-X0j~ZZiwFqDM3rU$18inrV{&X}Utw-!UvYJBbY*icV{>)@#Z}F2gD?!fNBZ8O z^4W(%-~qZ`r`>m?G;Pt)Pys2gKZl=CK@oy=T7qIbvHjVOVgdks$g|Wq2GD!(hq1fQ zqWprg>?T=)lBH!kG1v>2oKg}qs_R;$v$>E8`K>r{81s8k?GE2plbZ@kGaYY0f-=t> zk_=)U^YNCY$+bW~RK((w#S>Rm)!|zxuMoQ^1y0s83@&hbjiuKm*wUJNY3`;J6bf}D z1Ui%{u{uhf1)9$~*5~Ivjp^iIFq`OLG+Kqx9@>&mH+?lmKn)ls@%)BbFFm|gsh{c`c->?gp&WSq1)@)l^Fkf-n@^lgu4F zz*|TQ2T-=gy-UEVL?W?m((7CK1kqMZV`78_)Ayz`)7ONUR&`Yh;!D~Jv)DEAjL1;? zb^tAkmy{PJX{~854iTeuQxHpO({ONcv4Q7_l2+DA zl2U3kPOTGkU%wL{dN9%mT&1`QNNd7465> backends = sql """ show backends """ + assertTrue(backends.size() > 0) + def be_id = backends[0][0] + def dataFilePath = context.config.dataPath + "/external_table_p0/tvf/test_csv_line_end_lf_crlf" + + def outFilePath="/test_csv_line_end_lf_crlf" + + for (List backend : backends) { + def be_host = backend[1] + scpFiles ("root", be_host, dataFilePath, outFilePath, false); + } + + String filename = "lf_crlf_not_quotes.csv" + + sql """set enable_nereids_planner=true""" + sql """set enable_fallback_to_original_planner=false""" + + sql """ set keep_carriage_return = true; """ + // qt_csv_1""" + // select * from local( + // "file_path" = "${outFilePath}/${filename}", + // "backend_id" = "${be_id}", + // "format" = "csv", + // "column_separator" = "," + // ) + // order by c1,c2,c3,c4; + // """ + + qt_csv_2""" + select * from local( + "file_path" = "${outFilePath}/${filename}", + "backend_id" = "${be_id}", + "format" = "csv", + "column_separator" = "," + ) where length(c4) >= 2 + order by c1,c2,c3,c4; + """ + List> result1 = sql """ + select * from local( + "file_path" = "${outFilePath}/${filename}", + "backend_id" = "${be_id}", + "format" = "csv", + "column_separator" = "," + ) where length(c4) >= 2 + order by c1,c2,c3,c4; + """ + List> result2 = sql """ + select * from local( + "file_path" = "${outFilePath}/${filename}.gz", + "backend_id" = "${be_id}", + "format" = "csv", + "column_separator" = ",", + "compress_type"="gz" + ) where length(c4) >= 2 + order by c1,c2,c3,c4; + """ + log.info("result2 = ${result2}") + assertTrue(result1.size() == result2.size()); + for(int i =0 ;i < result1.size();i++){ + for(int j =0 ; j< result1.size();j++) { + assertTrue(result1[i][j] == result2[i][j] ); + } + } + + sql """ set keep_carriage_return = false; """ + + // qt_csv_3 """ + // select * from local( + // "file_path" = "${outFilePath}/${filename}", + // "backend_id" = "${be_id}", + // "format" = "csv", + // "column_separator" = "," + // ) + // order by c1,c2,c3,c4; + // """ + + qt_csv_4 """ + select * from local( + "file_path" = "${outFilePath}/${filename}", + "backend_id" = "${be_id}", + "format" = "csv", + "column_separator" = "," + ) where length(c4) >= 2 + order by c1,c2,c3,c4; + """ + result1 = sql """ + select * from local( + "file_path" = "${outFilePath}/${filename}", + "backend_id" = "${be_id}", + "format" = "csv", + "column_separator" = "," + ) where length(c4) >= 2 + order by c1,c2,c3,c4; + """ + result2 = sql """ + select * from local( + "file_path" = "${outFilePath}/${filename}.gz", + "backend_id" = "${be_id}", + "format" = "csv", + "column_separator" = ",", + "compress_type"="gz" + ) where length(c4) >= 2 + order by c1,c2,c3,c4; + """ + log.info("result2 = ${result2}") + assertTrue(result1.size() == result2.size()); + for(int i =0 ;i < result1.size();i++){ + for(int j =0 ; j< result1.size();j++) { + assertTrue(result1[i][j] == result2[i][j] ); + } + } + + + filename = "lf_crlf_and_quotes.csv" + + sql """ set keep_carriage_return = true; """ + // qt_csv_5""" + // select * from local( + // "file_path" = "${outFilePath}/${filename}", + // "backend_id" = "${be_id}", + // "format" = "csv", + // "column_separator" = ",", + // "trim_double_quotes"="true" + // ) + // order by c1,c2,c3,c4; + // """ + + qt_csv_6""" + select * from local( + "file_path" = "${outFilePath}/${filename}", + "backend_id" = "${be_id}", + "format" = "csv", + "column_separator" = ",", + "trim_double_quotes"="true" + ) where length(c4) >= 2 + order by c1,c2,c3,c4; + """ + + result1 = sql """ + select * from local( + "file_path" = "${outFilePath}/${filename}", + "backend_id" = "${be_id}", + "format" = "csv", + "column_separator" = ",", + "trim_double_quotes"="true" + ) where length(c4) >= 2 + order by c1,c2,c3,c4; + """ + result2 = sql """ + select * from local( + "file_path" = "${outFilePath}/${filename}.gz", + "backend_id" = "${be_id}", + "format" = "csv", + "column_separator" = ",", + "trim_double_quotes"="true", + "compress_type"="gz" + ) where length(c4) >= 2 + order by c1,c2,c3,c4; + """ + log.info("result2 = ${result2}") + assertTrue(result1.size() == result2.size()); + for(int i =0 ;i < result1.size();i++){ + for(int j =0 ; j< result1.size();j++) { + assertTrue(result1[i][j] == result2[i][j] ); + } + } + + sql """ set keep_carriage_return = false; """ + + // qt_csv_7 """ + // select * from local( + // "file_path" = "${outFilePath}/${filename}", + // "backend_id" = "${be_id}", + // "format" = "csv", + // "column_separator" = ",", + // "trim_double_quotes"="true" + // ) + // order by c1,c2,c3,c4; + // """ + + qt_csv_8 """ + select * from local( + "file_path" = "${outFilePath}/${filename}", + "backend_id" = "${be_id}", + "format" = "csv", + "column_separator" = ",", + "trim_double_quotes"="true" + ) where length(c4) >= 2 + order by c1,c2,c3,c4; + """ + result1 = sql """ + select * from local( + "file_path" = "${outFilePath}/${filename}", + "backend_id" = "${be_id}", + "format" = "csv", + "column_separator" = ",", + "trim_double_quotes"="true" + ) where length(c4) >= 2 + order by c1,c2,c3,c4; + """ + result2 = sql """ + select * from local( + "file_path" = "${outFilePath}/${filename}.gz", + "backend_id" = "${be_id}", + "format" = "csv", + "column_separator" = ",", + "trim_double_quotes"="true", + "compress_type"="gz" + ) where length(c4) >= 2 + order by c1,c2,c3,c4; + """ + log.info("result2 = ${result2}") + assertTrue(result1.size() == result2.size()); + for(int i =0 ;i < result1.size();i++){ + for(int j =0 ; j< result1.size();j++) { + assertTrue(result1[i][j] == result2[i][j] ); + } + } + + + + +}