Skip to content

Commit

Permalink
[fix](csv-parsing) Reset state every time after extending output buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
TangSiyang2001 committed Dec 6, 2024
1 parent 08d1e76 commit 1edb495
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ void EncloseCsvLineReaderContext::_on_normal(const uint8_t* start, size_t& len)
_state.forward_to(ReaderState::START);
return;
}
// TODO(tsy): maybe potential bug when a multi-char is not read completely
_idx = len;
}

Expand Down Expand Up @@ -325,13 +324,13 @@ Status NewPlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool
*eof = true;
return Status::OK();
}
_line_reader_ctx->refresh();
int found_line_delimiter = 0;
size_t offset = 0;
bool stream_end = true;
while (!done()) {
// find line delimiter in current decompressed data
uint8_t* cur_ptr = _output_buf + _output_buf_pos;
_line_reader_ctx->refresh();
const uint8_t* pos = _line_reader_ctx->read_line(cur_ptr, output_buf_read_remaining());

if (pos == nullptr) {
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
200000

-- !sql --
100000

Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,60 @@ suite("test_stream_load_big_file_with_special_delimiter", "p1") {

sql "sync"
qt_sql "select count(*) from ${tableName}"

    // Regression case: a big CSV whose multi-character column separator ('@@@')
    // can be truncated at a decompressed-buffer boundary. Guards the fix that
    // resets the line-reader state each time the output buffer is extended.
    tableName = "test_csv_big_file_truncate_delimiter";
    sql """ DROP TABLE IF EXISTS ${tableName} """
    // Wide all-VARCHAR schema mirroring the fixture file's 26 columns.
    sql """
        CREATE TABLE ${tableName} (
            `measureid` VARCHAR(500) NOT NULL,
            `measuretag` VARCHAR(500) NOT NULL,
            `timestamp` VARCHAR(500) NOT NULL,
            `ds` VARCHAR(255) NULL,
            `hh` VARCHAR(255) NULL,
            `meter_id` VARCHAR(500) NULL,
            `maintenance_team` VARCHAR(1000) NULL,
            `psr_class_name` VARCHAR(500) NULL,
            `inst_id` VARCHAR(500) NULL,
            `location_type` VARCHAR(500) NULL,
            `name` VARCHAR(500) NULL,
            `depart` VARCHAR(500) NULL,
            `measurepoint_id` VARCHAR(500) NULL,
            `district` VARCHAR(500) NULL,
            `enddevice_psr_class_name` VARCHAR(500) NULL,
            `enddevice_psr_id` VARCHAR(500) NULL,
            `root_id` VARCHAR(500) NULL,
            `rt` VARCHAR(500) NULL,
            `measurevalue` VARCHAR(500) NULL,
            `dataquality` VARCHAR(500) NULL,
            `datatablename` VARCHAR(500) NULL,
            `tag` VARCHAR(500) NULL,
            `equip_src_id` VARCHAR(500) NULL,
            `root_class_name` VARCHAR(500) NULL,
            `ssid` VARCHAR(500) NULL,
            `sysdate_uep` VARCHAR(500) NULL
        ) ENGINE=OLAP
        DUPLICATE KEY(`measureid`, `measuretag`, `timestamp`, `ds`)
        AUTO PARTITION BY LIST (`ds`)(
        )
        DISTRIBUTED BY HASH(`measureid`) BUCKETS 10
        PROPERTIES (
            "replication_allocation" = "tag.location.default: 1"
        );
    """
    // Stream-load the gzipped fixture. The multi-char separator plus
    // enclose char plus GZ decompression is the combination that exercises
    // the buffer-extension path under test.
    streamLoad {
        table "${tableName}"

        // CSV column order differs from the table's column order, so map explicitly.
        set 'column_separator', '@@@'
        set 'columns', 'hh,ds,meter_id,maintenance_team,measureid,psr_class_name,inst_id,location_type,name,depart,measurepoint_id,district,enddevice_psr_class_name,enddevice_psr_id,root_id,measuretag,rt,measurevalue,timestamp,dataquality,datatablename,tag,equip_src_id,root_class_name,ssid,sysdate_uep'
        set 'enclose', '`'
        set 'format', "CSV"
        set 'compress_type', 'GZ'

        // Fixture checked in alongside this suite; assumed to contain the
        // row count asserted by the qt_sql below — verify against the .out file.
        file 'test_csv_big_file_truncate_delimiter.csv.gz'
    }

    // Wait for load visibility, then pin the loaded row count against the
    // recorded query output.
    sql "sync"
    qt_sql "select count(*) from ${tableName}"

}

0 comments on commit 1edb495

Please sign in to comment.