From 3f1224abda1f067f41d99d4d5f9d9b3f9d022812 Mon Sep 17 00:00:00 2001 From: Lightman <31928846+Lchangliang@users.noreply.github.com> Date: Mon, 25 Mar 2024 23:15:01 +0800 Subject: [PATCH] (cloud-merge)[feature] Support to create table with "file_cache_ttl_seconds" property (#32409) --- be/src/cloud/cloud_rowset_builder.cpp | 1 + be/src/cloud/cloud_tablet.cpp | 83 +++++++++++++++++-- be/src/olap/base_tablet.h | 1 + be/src/olap/rowset/beta_rowset.h | 2 +- be/src/olap/rowset/beta_rowset_reader.cpp | 8 ++ be/src/olap/rowset/rowset.h | 2 + be/src/olap/rowset/rowset_reader_context.h | 1 + be/src/olap/tablet_meta.cpp | 2 + be/src/olap/tablet_meta.h | 13 +++ be/src/olap/tablet_reader.cpp | 1 + be/test/testutil/mock_rowset.h | 2 + .../analysis/ModifyTablePropertiesClause.java | 3 + .../java/org/apache/doris/catalog/Env.java | 7 +- .../apache/doris/catalog/TableProperty.java | 3 +- .../apache/doris/journal/JournalEntity.java | 2 +- .../org/apache/doris/persist/EditLog.java | 6 +- .../apache/doris/persist/OperationType.java | 2 +- 17 files changed, 124 insertions(+), 15 deletions(-) diff --git a/be/src/cloud/cloud_rowset_builder.cpp b/be/src/cloud/cloud_rowset_builder.cpp index 72010fa6e9c53c..d8bb50dc5f2a00 100644 --- a/be/src/cloud/cloud_rowset_builder.cpp +++ b/be/src/cloud/cloud_rowset_builder.cpp @@ -65,6 +65,7 @@ Status CloudRowsetBuilder::init() { context.mow_context = mow_context; context.write_file_cache = _req.write_file_cache; context.partial_update_info = _partial_update_info; + context.file_cache_ttl_sec = _tablet->ttl_seconds(); // New loaded data is always written to latest shared storage // TODO(AlexYue): use the passed resource id to retrive the corresponding // fs to pass to the RowsetWriterContext diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 92e6699ceec4d6..08ed09d90f5055 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -95,11 +95,6 @@ Status CloudTablet::capture_rs_readers(const Version& spec_version, return capture_rs_readers_unlocked(version_path, rs_splits); } -Status CloudTablet::sync_meta() { - // TODO(lightman): FileCache - return Status::NotSupported("CloudTablet::sync_meta is not implemented"); -} - // There are only two tablet_states RUNNING and NOT_READY in cloud mode // This function will erase the tablet from `CloudTabletMgr` when it can't find this tablet in MS. Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data) { @@ -618,4 +613,82 @@ Status CloudTablet::calc_delete_bitmap_for_compaciton( return Status::OK(); } +Status CloudTablet::sync_meta() { + if (!config::enable_file_cache) { + return Status::OK(); + } + + TabletMetaSharedPtr tablet_meta; + auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta); + if (!st.ok()) { + if (st.is()) { + // TODO(Lchangliang): recycle_resources_by_self(); + } + return st; + } + if (tablet_meta->tablet_state() != TABLET_RUNNING) { // impossible + return Status::InternalError("invalid tablet state. tablet_id={}", tablet_id()); + } + + auto new_ttl_seconds = tablet_meta->ttl_seconds(); + if (_tablet_meta->ttl_seconds() != new_ttl_seconds) { + _tablet_meta->set_ttl_seconds(new_ttl_seconds); + int64_t cur_time = UnixSeconds(); + std::shared_lock rlock(_meta_lock); + for (auto& [_, rs] : _rs_version_map) { + for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) { + int64_t new_expiration_time = + new_ttl_seconds + rs->rowset_meta()->newest_write_timestamp(); + new_expiration_time = new_expiration_time > cur_time ? new_expiration_time : 0; + auto file_key = io::BlockFileCache::hash( + io::Path(rs->segment_file_path(seg_id)).filename().native()); + auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key); + file_cache->modify_expiration_time(file_key, new_expiration_time); + } + } + } + + auto new_compaction_policy = tablet_meta->compaction_policy(); + if (_tablet_meta->compaction_policy() != new_compaction_policy) { + _tablet_meta->set_compaction_policy(new_compaction_policy); + } + auto new_time_series_compaction_goal_size_mbytes = + tablet_meta->time_series_compaction_goal_size_mbytes(); + if (_tablet_meta->time_series_compaction_goal_size_mbytes() != + new_time_series_compaction_goal_size_mbytes) { + _tablet_meta->set_time_series_compaction_goal_size_mbytes( + new_time_series_compaction_goal_size_mbytes); + } + auto new_time_series_compaction_file_count_threshold = + tablet_meta->time_series_compaction_file_count_threshold(); + if (_tablet_meta->time_series_compaction_file_count_threshold() != + new_time_series_compaction_file_count_threshold) { + _tablet_meta->set_time_series_compaction_file_count_threshold( + new_time_series_compaction_file_count_threshold); + } + auto new_time_series_compaction_time_threshold_seconds = + tablet_meta->time_series_compaction_time_threshold_seconds(); + if (_tablet_meta->time_series_compaction_time_threshold_seconds() != + new_time_series_compaction_time_threshold_seconds) { + _tablet_meta->set_time_series_compaction_time_threshold_seconds( + new_time_series_compaction_time_threshold_seconds); + } + auto new_time_series_compaction_empty_rowsets_threshold = + tablet_meta->time_series_compaction_empty_rowsets_threshold(); + if (_tablet_meta->time_series_compaction_empty_rowsets_threshold() != + new_time_series_compaction_empty_rowsets_threshold) { + _tablet_meta->set_time_series_compaction_empty_rowsets_threshold( + new_time_series_compaction_empty_rowsets_threshold); + } + auto new_time_series_compaction_level_threshold = + tablet_meta->time_series_compaction_level_threshold(); + if (_tablet_meta->time_series_compaction_level_threshold() != + new_time_series_compaction_level_threshold) { + _tablet_meta->set_time_series_compaction_level_threshold( + new_time_series_compaction_level_threshold); + } + + return Status::OK(); +} + } // namespace doris diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index f32139b989bdfa..347f960841c41f 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -60,6 +60,7 @@ class BaseTablet { int32_t schema_hash() const { return _tablet_meta->schema_hash(); } KeysType keys_type() const { return _tablet_meta->tablet_schema()->keys_type(); } size_t num_key_columns() const { return _tablet_meta->tablet_schema()->num_key_columns(); } + int64_t ttl_seconds() const { return _tablet_meta->ttl_seconds(); } std::mutex& get_schema_change_lock() { return _schema_change_lock; } bool enable_unique_key_merge_on_write() const { #ifdef BE_TEST diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h index 46f16f4d50230f..41cecadf783747 100644 --- a/be/src/olap/rowset/beta_rowset.h +++ b/be/src/olap/rowset/beta_rowset.h @@ -49,7 +49,7 @@ class BetaRowset final : public Rowset { Status create_reader(RowsetReaderSharedPtr* result) override; - std::string segment_file_path(int segment_id) const; + std::string segment_file_path(int segment_id) const override; static std::string segment_file_path(const std::string& rowset_dir, const RowsetId& rowset_id, int segment_id); diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index a0dff2613aec0a..f5e0b8e5c62b49 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -238,6 +238,14 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context _read_context->runtime_state->query_options().disable_file_cache; } + _read_options.io_ctx.expiration_time = + read_context->ttl_seconds == 0 + ? 0 + : _rowset->rowset_meta()->newest_write_timestamp() + read_context->ttl_seconds; + if (_read_options.io_ctx.expiration_time <= UnixSeconds()) { + _read_options.io_ctx.expiration_time = 0; + } + // load segments bool should_use_cache = use_cache || _read_context->reader_type == ReaderType::READER_QUERY; RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(_rowset, &_segment_cache_handle, diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index fd5264f3ac048e..f15527ddd6b1bd 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -169,6 +169,8 @@ class Rowset : public std::enable_shared_from_this { // TODO should we rename the method to remove_files() to be more specific? virtual Status remove() = 0; + virtual std::string segment_file_path(int segment_id) const = 0; + // close to clear the resource owned by rowset // including: open files, indexes and so on // NOTICE: can not call this function in multithreads diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h index d5683924a9ec0c..8bfdeda60a8568 100644 --- a/be/src/olap/rowset/rowset_reader_context.h +++ b/be/src/olap/rowset/rowset_reader_context.h @@ -83,6 +83,7 @@ struct RowsetReaderContext { RowsetId rowset_id; // slots that cast may be eliminated in storage layer std::map target_cast_type_for_variants; + int64_t ttl_seconds = 0; }; } // namespace doris diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 322ff03625100e..a15e809b8b8ad9 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -529,6 +529,7 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) { _creation_time = tablet_meta_pb.creation_time(); _cumulative_layer_point = tablet_meta_pb.cumulative_layer_point(); _tablet_uid = TabletUid(tablet_meta_pb.tablet_uid()); + _ttl_seconds = tablet_meta_pb.ttl_seconds(); if (tablet_meta_pb.has_tablet_type()) { _tablet_type = tablet_meta_pb.tablet_type(); } else { @@ -647,6 +648,7 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) { tablet_meta_pb->set_cumulative_layer_point(cumulative_layer_point()); *(tablet_meta_pb->mutable_tablet_uid()) = tablet_uid().to_proto(); tablet_meta_pb->set_tablet_type(_tablet_type); + tablet_meta_pb->set_ttl_seconds(_ttl_seconds); switch (tablet_state()) { case TABLET_NOTREADY: tablet_meta_pb->set_tablet_state(PB_NOTREADY); diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 58201c1c1f1d54..6c5233eac53e61 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -273,6 +273,16 @@ class TabletMeta { return _time_series_compaction_level_threshold; } + int64_t ttl_seconds() const { + std::shared_lock rlock(_meta_lock); + return _ttl_seconds; + } + + void set_ttl_seconds(int64_t ttl_seconds) { + std::lock_guard wlock(_meta_lock); + _ttl_seconds = ttl_seconds; + } + private: Status _save_meta(DataDir* data_dir); @@ -328,6 +338,9 @@ class TabletMeta { int64_t _time_series_compaction_empty_rowsets_threshold = 0; int64_t _time_series_compaction_level_threshold = 0; + // cloud + int64_t _ttl_seconds = 0; + mutable std::shared_mutex _meta_lock; }; diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp index 7b40ff4eae1238..b2afd1360d4e4f 100644 --- a/be/src/olap/tablet_reader.cpp +++ b/be/src/olap/tablet_reader.cpp @@ -262,6 +262,7 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) { _reader_context.common_expr_ctxs_push_down = read_params.common_expr_ctxs_push_down; _reader_context.output_columns = &read_params.output_columns; _reader_context.push_down_agg_type_opt = read_params.push_down_agg_type_opt; + _reader_context.ttl_seconds = _tablet->ttl_seconds(); return Status::OK(); } diff --git a/be/test/testutil/mock_rowset.h b/be/test/testutil/mock_rowset.h index 50065ebe6b43d4..89fbb8cac3b5d7 100644 --- a/be/test/testutil/mock_rowset.h +++ b/be/test/testutil/mock_rowset.h @@ -50,6 +50,8 @@ class MockRowset : public Rowset { return Status::NotSupported("MockRowset not support this method."); } + std::string segment_file_path(int segment_id) const override { return ""; } + Status get_segments_key_bounds(std::vector* segments_key_bounds) override { // TODO(zhangchen): remove this after we implemented memrowset. if (is_mem_rowset_) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java index c2bc7bc7d0dea0..8a3543d3d36f8f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java @@ -299,6 +299,9 @@ public void analyze(Analyzer analyzer) throws AnalysisException { } this.needTableStable = false; this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC; + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS)) { + this.needTableStable = false; + this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC; } else { throw new AnalysisException("Unknown table property: " + properties.keySet()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 10592282155041..2fe4361778e821 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -4925,7 +4925,8 @@ public void modifyTableProperties(Database db, OlapTable table, Map