Skip to content

Commit

Permalink
(cloud-merge)[feature] Support to create table with "file_cache_ttl_s…
Browse files Browse the repository at this point in the history
…econds" property (apache#32409)
  • Loading branch information
Lchangliang authored Mar 25, 2024
1 parent be313cb commit 3f1224a
Show file tree
Hide file tree
Showing 17 changed files with 124 additions and 15 deletions.
1 change: 1 addition & 0 deletions be/src/cloud/cloud_rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ Status CloudRowsetBuilder::init() {
context.mow_context = mow_context;
context.write_file_cache = _req.write_file_cache;
context.partial_update_info = _partial_update_info;
context.file_cache_ttl_sec = _tablet->ttl_seconds();
// New loaded data is always written to latest shared storage
// TODO(AlexYue): use the passed resource id to retrive the corresponding
// fs to pass to the RowsetWriterContext
Expand Down
83 changes: 78 additions & 5 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,6 @@ Status CloudTablet::capture_rs_readers(const Version& spec_version,
return capture_rs_readers_unlocked(version_path, rs_splits);
}

Status CloudTablet::sync_meta() {
// TODO(lightman): FileCache
return Status::NotSupported("CloudTablet::sync_meta is not implemented");
}

// There are only two tablet_states RUNNING and NOT_READY in cloud mode
// This function will erase the tablet from `CloudTabletMgr` when it can't find this tablet in MS.
Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data) {
Expand Down Expand Up @@ -618,4 +613,82 @@ Status CloudTablet::calc_delete_bitmap_for_compaciton(
return Status::OK();
}

Status CloudTablet::sync_meta() {
if (!config::enable_file_cache) {
return Status::OK();
}

TabletMetaSharedPtr tablet_meta;
auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta);
if (!st.ok()) {
if (st.is<ErrorCode::NOT_FOUND>()) {
// TODO(Lchangliang): recycle_resources_by_self();
}
return st;
}
if (tablet_meta->tablet_state() != TABLET_RUNNING) { // impossible
return Status::InternalError("invalid tablet state. tablet_id={}", tablet_id());
}

auto new_ttl_seconds = tablet_meta->ttl_seconds();
if (_tablet_meta->ttl_seconds() != new_ttl_seconds) {
_tablet_meta->set_ttl_seconds(new_ttl_seconds);
int64_t cur_time = UnixSeconds();
std::shared_lock rlock(_meta_lock);
for (auto& [_, rs] : _rs_version_map) {
for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
int64_t new_expiration_time =
new_ttl_seconds + rs->rowset_meta()->newest_write_timestamp();
new_expiration_time = new_expiration_time > cur_time ? new_expiration_time : 0;
auto file_key = io::BlockFileCache::hash(
io::Path(rs->segment_file_path(seg_id)).filename().native());
auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
file_cache->modify_expiration_time(file_key, new_expiration_time);
}
}
}

auto new_compaction_policy = tablet_meta->compaction_policy();
if (_tablet_meta->compaction_policy() != new_compaction_policy) {
_tablet_meta->set_compaction_policy(new_compaction_policy);
}
auto new_time_series_compaction_goal_size_mbytes =
tablet_meta->time_series_compaction_goal_size_mbytes();
if (_tablet_meta->time_series_compaction_goal_size_mbytes() !=
new_time_series_compaction_goal_size_mbytes) {
_tablet_meta->set_time_series_compaction_goal_size_mbytes(
new_time_series_compaction_goal_size_mbytes);
}
auto new_time_series_compaction_file_count_threshold =
tablet_meta->time_series_compaction_file_count_threshold();
if (_tablet_meta->time_series_compaction_file_count_threshold() !=
new_time_series_compaction_file_count_threshold) {
_tablet_meta->set_time_series_compaction_file_count_threshold(
new_time_series_compaction_file_count_threshold);
}
auto new_time_series_compaction_time_threshold_seconds =
tablet_meta->time_series_compaction_time_threshold_seconds();
if (_tablet_meta->time_series_compaction_time_threshold_seconds() !=
new_time_series_compaction_time_threshold_seconds) {
_tablet_meta->set_time_series_compaction_time_threshold_seconds(
new_time_series_compaction_time_threshold_seconds);
}
auto new_time_series_compaction_empty_rowsets_threshold =
tablet_meta->time_series_compaction_empty_rowsets_threshold();
if (_tablet_meta->time_series_compaction_empty_rowsets_threshold() !=
new_time_series_compaction_empty_rowsets_threshold) {
_tablet_meta->set_time_series_compaction_empty_rowsets_threshold(
new_time_series_compaction_empty_rowsets_threshold);
}
auto new_time_series_compaction_level_threshold =
tablet_meta->time_series_compaction_level_threshold();
if (_tablet_meta->time_series_compaction_level_threshold() !=
new_time_series_compaction_level_threshold) {
_tablet_meta->set_time_series_compaction_level_threshold(
new_time_series_compaction_level_threshold);
}

return Status::OK();
}

} // namespace doris
1 change: 1 addition & 0 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class BaseTablet {
int32_t schema_hash() const { return _tablet_meta->schema_hash(); }
KeysType keys_type() const { return _tablet_meta->tablet_schema()->keys_type(); }
size_t num_key_columns() const { return _tablet_meta->tablet_schema()->num_key_columns(); }
int64_t ttl_seconds() const { return _tablet_meta->ttl_seconds(); }
std::mutex& get_schema_change_lock() { return _schema_change_lock; }
bool enable_unique_key_merge_on_write() const {
#ifdef BE_TEST
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class BetaRowset final : public Rowset {

Status create_reader(RowsetReaderSharedPtr* result) override;

std::string segment_file_path(int segment_id) const;
std::string segment_file_path(int segment_id) const override;

static std::string segment_file_path(const std::string& rowset_dir, const RowsetId& rowset_id,
int segment_id);
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,14 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
_read_context->runtime_state->query_options().disable_file_cache;
}

_read_options.io_ctx.expiration_time =
read_context->ttl_seconds == 0
? 0
: _rowset->rowset_meta()->newest_write_timestamp() + read_context->ttl_seconds;
if (_read_options.io_ctx.expiration_time <= UnixSeconds()) {
_read_options.io_ctx.expiration_time = 0;
}

// load segments
bool should_use_cache = use_cache || _read_context->reader_type == ReaderType::READER_QUERY;
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(_rowset, &_segment_cache_handle,
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ class Rowset : public std::enable_shared_from_this<Rowset> {
// TODO should we rename the method to remove_files() to be more specific?
virtual Status remove() = 0;

virtual std::string segment_file_path(int segment_id) const = 0;

// close to clear the resource owned by rowset
// including: open files, indexes and so on
// NOTICE: can not call this function in multithreads
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/rowset_reader_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ struct RowsetReaderContext {
RowsetId rowset_id;
// slots that cast may be eliminated in storage layer
std::map<std::string, PrimitiveType> target_cast_type_for_variants;
int64_t ttl_seconds = 0;
};

} // namespace doris
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) {
_creation_time = tablet_meta_pb.creation_time();
_cumulative_layer_point = tablet_meta_pb.cumulative_layer_point();
_tablet_uid = TabletUid(tablet_meta_pb.tablet_uid());
_ttl_seconds = tablet_meta_pb.ttl_seconds();
if (tablet_meta_pb.has_tablet_type()) {
_tablet_type = tablet_meta_pb.tablet_type();
} else {
Expand Down Expand Up @@ -647,6 +648,7 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
tablet_meta_pb->set_cumulative_layer_point(cumulative_layer_point());
*(tablet_meta_pb->mutable_tablet_uid()) = tablet_uid().to_proto();
tablet_meta_pb->set_tablet_type(_tablet_type);
tablet_meta_pb->set_ttl_seconds(_ttl_seconds);
switch (tablet_state()) {
case TABLET_NOTREADY:
tablet_meta_pb->set_tablet_state(PB_NOTREADY);
Expand Down
13 changes: 13 additions & 0 deletions be/src/olap/tablet_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,16 @@ class TabletMeta {
return _time_series_compaction_level_threshold;
}

int64_t ttl_seconds() const {
std::shared_lock rlock(_meta_lock);
return _ttl_seconds;
}

void set_ttl_seconds(int64_t ttl_seconds) {
std::lock_guard wlock(_meta_lock);
_ttl_seconds = ttl_seconds;
}

private:
Status _save_meta(DataDir* data_dir);

Expand Down Expand Up @@ -328,6 +338,9 @@ class TabletMeta {
int64_t _time_series_compaction_empty_rowsets_threshold = 0;
int64_t _time_series_compaction_level_threshold = 0;

// cloud
int64_t _ttl_seconds = 0;

mutable std::shared_mutex _meta_lock;
};

Expand Down
1 change: 1 addition & 0 deletions be/src/olap/tablet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) {
_reader_context.common_expr_ctxs_push_down = read_params.common_expr_ctxs_push_down;
_reader_context.output_columns = &read_params.output_columns;
_reader_context.push_down_agg_type_opt = read_params.push_down_agg_type_opt;
_reader_context.ttl_seconds = _tablet->ttl_seconds();

return Status::OK();
}
Expand Down
2 changes: 2 additions & 0 deletions be/test/testutil/mock_rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class MockRowset : public Rowset {
return Status::NotSupported("MockRowset not support this method.");
}

std::string segment_file_path(int segment_id) const override { return ""; }

Status get_segments_key_bounds(std::vector<KeyBoundsPB>* segments_key_bounds) override {
// TODO(zhangchen): remove this after we implemented memrowset.
if (is_mem_rowset_) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,9 @@ public void analyze(Analyzer analyzer) throws AnalysisException {
}
this.needTableStable = false;
this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC;
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS)) {
this.needTableStable = false;
this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC;
} else {
throw new AnalysisException("Unknown table property: " + properties.keySet());
}
Expand Down
7 changes: 4 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -4925,7 +4925,8 @@ public void modifyTableProperties(Database db, OlapTable table, Map<String, Stri
.buildDisableAutoCompaction()
.buildEnableSingleReplicaCompaction()
.buildTimeSeriesCompactionEmptyRowsetsThreshold()
.buildTimeSeriesCompactionLevelThreshold();
.buildTimeSeriesCompactionLevelThreshold()
.buildTTLSeconds();

// need to update partition info meta
for (Partition partition : table.getPartitions()) {
Expand All @@ -4936,7 +4937,7 @@ public void modifyTableProperties(Database db, OlapTable table, Map<String, Stri
ModifyTablePropertyOperationLog info =
new ModifyTablePropertyOperationLog(db.getId(), table.getId(), table.getName(),
properties);
editLog.logModifyInMemory(info);
editLog.logModifyTableProperties(info);
}

public void updateBinlogConfig(Database db, OlapTable table, BinlogConfig newBinlogConfig) {
Expand Down Expand Up @@ -4971,7 +4972,7 @@ public void replayModifyTableProperty(short opCode, ModifyTablePropertyOperation

// need to replay partition info meta
switch (opCode) {
case OperationType.OP_MODIFY_IN_MEMORY:
case OperationType.OP_MODIFY_TABLE_PROPERTIES:
for (Partition partition : olapTable.getPartitions()) {
olapTable.getPartitionInfo().setIsInMemory(partition.getId(), tableProperty.isInMemory());
// storage policy re-use modify in memory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public TableProperty buildProperty(short opCode) {
case OperationType.OP_MODIFY_REPLICATION_NUM:
buildReplicaAllocation();
break;
case OperationType.OP_MODIFY_IN_MEMORY:
case OperationType.OP_MODIFY_TABLE_PROPERTIES:
buildInMemory();
buildMinLoadReplicaNum();
buildStorageMedium();
Expand All @@ -149,6 +149,7 @@ public TableProperty buildProperty(short opCode) {
buildDisableAutoCompaction();
buildTimeSeriesCompactionEmptyRowsetsThreshold();
buildTimeSeriesCompactionLevelThreshold();
buildTTLSeconds();
break;
default:
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ public void readFields(DataInput in) throws IOException {
break;
}
case OperationType.OP_DYNAMIC_PARTITION:
case OperationType.OP_MODIFY_IN_MEMORY:
case OperationType.OP_MODIFY_TABLE_PROPERTIES:
case OperationType.OP_MODIFY_REPLICATION_NUM:
case OperationType.OP_UPDATE_BINLOG_CONFIG: {
data = ModifyTablePropertyOperationLog.read(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) {
break;
}
case OperationType.OP_DYNAMIC_PARTITION:
case OperationType.OP_MODIFY_IN_MEMORY:
case OperationType.OP_MODIFY_TABLE_PROPERTIES:
case OperationType.OP_UPDATE_BINLOG_CONFIG:
case OperationType.OP_MODIFY_REPLICATION_NUM: {
ModifyTablePropertyOperationLog log = (ModifyTablePropertyOperationLog) journal.getData();
Expand Down Expand Up @@ -1831,8 +1831,8 @@ public void logModifyDefaultDistributionBucketNum(ModifyTableDefaultDistribution
logEdit(OperationType.OP_MODIFY_DISTRIBUTION_BUCKET_NUM, info);
}

public long logModifyInMemory(ModifyTablePropertyOperationLog info) {
return logModifyTableProperty(OperationType.OP_MODIFY_IN_MEMORY, info);
public long logModifyTableProperties(ModifyTablePropertyOperationLog info) {
return logModifyTableProperty(OperationType.OP_MODIFY_TABLE_PROPERTIES, info);
}

public long logUpdateBinlogConfig(ModifyTablePropertyOperationLog info) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public class OperationType {
// modify table properties: inMemory, StoragePolicy, IsBeingSynced, CompactionPolicy,
// TimeSeriesCompactionFileCountThreshold, SeriesCompactionTimeThresholdSeconds,
// SkipWriteIndexOnLoad, EnableSingleReplicaCompaction.
public static final short OP_MODIFY_IN_MEMORY = 267;
public static final short OP_MODIFY_TABLE_PROPERTIES = 267;

// set table default distribution bucket num
public static final short OP_MODIFY_DISTRIBUTION_BUCKET_NUM = 268;
Expand Down

0 comments on commit 3f1224a

Please sign in to comment.