Skip to content

Commit

Permalink
[fix](parquet) fix time zone error(isAdjustedToUTC=true) in parquet r…
Browse files Browse the repository at this point in the history
…eader (apache#33675)

Revert apache#33217 and add some fix for regression test.
The modification of PR(apache#33217) is wrong, which introduce some issues about the time zone(apache/hudi#11003).
If isAdjustedToUTC = false, the reader should display the same value no mater what local time zone is. For example:
When stored as `1970-01-03 12:00:00`,
If isAdjustedToUTC = true, UTC8 should read as `1970-01-03 20:00:00`, UTC6 should read as `1970-01-03 18:00:00`
If isAdjustedToUTC = false, UTC8 and UTC6 should read as `1970-01-03 12:00:00`, which is the same as `1970-01-03 12:00:00` in UTC0
  • Loading branch information
AshinGau authored Apr 17, 2024
1 parent d810269 commit 3117098
Show file tree
Hide file tree
Showing 12 changed files with 164 additions and 148 deletions.
11 changes: 8 additions & 3 deletions be/src/vec/exec/format/parquet/parquet_column_convert.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
namespace doris::vectorized::parquet {

struct ConvertParams {
// schema.logicalType.TIMESTAMP.isAdjustedToUTC == true
// schema.logicalType.TIMESTAMP.isAdjustedToUTC == false
static const cctz::time_zone utc0;
// schema.logicalType.TIMESTAMP.isAdjustedToUTC == false, we should set local time zone
// schema.logicalType.TIMESTAMP.isAdjustedToUTC == true, we should set local time zone
cctz::time_zone* ctz = nullptr;
size_t offset_days = 0;
int64_t second_mask = 1;
Expand Down Expand Up @@ -72,8 +72,13 @@ struct ConvertParams {
const auto& schema = field_schema->parquet_schema;
if (schema.__isset.logicalType && schema.logicalType.__isset.TIMESTAMP) {
const auto& timestamp_info = schema.logicalType.TIMESTAMP;
if (timestamp_info.isAdjustedToUTC) {
if (!timestamp_info.isAdjustedToUTC) {
// should set timezone to utc+0
// Reference: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#instant-semantics-timestamps-normalized-to-utc
// If isAdjustedToUTC = false, the reader should display the same value no mater what local time zone is. For example:
// When a timestamp is stored as `1970-01-03 12:00:00`,
// if isAdjustedToUTC = true, UTC8 should read as `1970-01-03 20:00:00`, UTC6 should read as `1970-01-03 18:00:00`
// if isAdjustedToUTC = false, UTC8 and UTC6 should read as `1970-01-03 12:00:00`, which is the same as `1970-01-03 12:00:00` in UTC0
ctz = const_cast<cctz::time_zone*>(&utc0);
}
const auto& time_unit = timestamp_info.unit;
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/format/parquet/parquet_pred_cmp.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ class ParquetPredicate {
const auto& schema = col_schema->parquet_schema;
if (schema.__isset.logicalType && schema.logicalType.__isset.TIMESTAMP) {
const auto& timestamp_info = schema.logicalType.TIMESTAMP;
if (timestamp_info.isAdjustedToUTC) {
if (!timestamp_info.isAdjustedToUTC) {
// should set timezone to utc+0
resolved_ctz = cctz::utc_time_zone();
}
Expand Down
12 changes: 7 additions & 5 deletions be/src/vec/sink/writer/vhive_partition_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
io::FSPropertiesRef fs_properties(_write_info.file_type);
fs_properties.properties = &_hadoop_conf;
io::FileDescription file_description = {
.path = fmt::format("{}/{}-{}{}", _write_info.write_path, _file_name, _file_name_index,
_get_file_extension(_file_format_type, _hive_compress_type))};
.path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name())};
_fs = DORIS_TRY(FileFactory::create_fs(fs_properties, file_description));
RETURN_IF_ERROR(_fs->create_file(file_description.path, &_file_writer));

Expand Down Expand Up @@ -193,9 +192,7 @@ THivePartitionUpdate VHivePartitionWriter::_build_partition_update() {
location.__set_write_path(_write_info.write_path);
location.__set_target_path(_write_info.target_path);
hive_partition_update.__set_location(location);
hive_partition_update.__set_file_names(
{fmt::format("{}-{}{}", _file_name, _file_name_index,
_get_file_extension(_file_format_type, _hive_compress_type))});
hive_partition_update.__set_file_names({_get_target_file_name()});
hive_partition_update.__set_row_count(_row_count);
hive_partition_update.__set_file_size(_input_size_in_bytes);
return hive_partition_update;
Expand Down Expand Up @@ -241,5 +238,10 @@ std::string VHivePartitionWriter::_get_file_extension(TFileFormatType::type file
return fmt::format("{}{}", compress_name, file_format_name);
}

std::string VHivePartitionWriter::_get_target_file_name() {
return fmt::format("{}-{}{}", _file_name, _file_name_index,
_get_file_extension(_file_format_type, _hive_compress_type));
}

} // namespace vectorized
} // namespace doris
3 changes: 3 additions & 0 deletions be/src/vec/sink/writer/vhive_partition_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ class VHivePartitionWriter {

inline size_t written_len() { return _file_format_transformer->written_len(); }

private:
std::string _get_target_file_name();

private:
Status _projection_and_filter_block(doris::vectorized::Block& input_block,
const vectorized::IColumn::Filter* filter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ public void setIsTempUser(boolean isTempUser) {

// for USER() function
public UserIdentity getUserIdentity() {
return new UserIdentity(qualifiedUser, remoteIP);
return UserIdentity.createAnalyzedUserIdentWithIp(qualifiedUser, remoteIP);
}

public UserIdentity getCurrentUserIdentity() {
Expand Down

Large diffs are not rendered by default.

Loading

0 comments on commit 3117098

Please sign in to comment.