diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs index e21f85497de4..c5b7c4114aaf 100644 --- a/src/query/sql/src/planner/binder/binder.rs +++ b/src/query/sql/src/planner/binder/binder.rs @@ -643,7 +643,7 @@ impl<'a> Binder { }; match plan.kind() { - QueryKind::Query { .. } | QueryKind::Explain { .. } => {} + QueryKind::Query | QueryKind::Explain => {} _ => { let meta_data_guard = self.metadata.read(); let tables = meta_data_guard.tables(); diff --git a/src/query/storages/fuse/src/operations/changes.rs b/src/query/storages/fuse/src/operations/changes.rs index a114fcda3218..ed414cdb7f61 100644 --- a/src/query/storages/fuse/src/operations/changes.rs +++ b/src/query/storages/fuse/src/operations/changes.rs @@ -39,6 +39,7 @@ use databend_common_expression::BASE_BLOCK_IDS_COL_NAME; use databend_storages_common_table_meta::meta::BlockMeta; use databend_storages_common_table_meta::meta::Location; use databend_storages_common_table_meta::meta::SegmentInfo; +use databend_storages_common_table_meta::meta::TableSnapshot; use databend_storages_common_table_meta::table::ChangeType; use databend_storages_common_table_meta::table::StreamMode; use databend_storages_common_table_meta::table::OPT_KEY_CHANGE_TRACKING_BEGIN_VER; @@ -222,9 +223,8 @@ impl FuseTable { let latest_segments: HashSet<&Location> = HashSet::from_iter(&latest_snapshot.segments); - let (base_snapshot, _) = - SnapshotsIO::read_snapshot(base_location.clone(), self.get_operator()) - .await?; + let base_snapshot = + self.changes_read_offset_snapshot(base_location).await?; let base_segments = HashSet::from_iter(&base_snapshot.segments); // If the base segments are a subset of the latest segments, @@ -349,8 +349,7 @@ impl FuseTable { }; let base_segments = if let Some(snapshot) = base { - let (sn, _) = - SnapshotsIO::read_snapshot(snapshot.to_string(), self.get_operator()).await?; + let sn = self.changes_read_offset_snapshot(snapshot).await?; HashSet::from_iter(sn.segments.clone()) } else { HashSet::new() @@ -435,8 +434,7 @@ impl FuseTable { return self.table_statistics(ctx, true, None).await; }; - let (base_snapshot, _) = - SnapshotsIO::read_snapshot(base_location.clone(), self.get_operator()).await?; + let base_snapshot = self.changes_read_offset_snapshot(base_location).await?; let base_summary = base_snapshot.summary.clone(); let latest_summary = if let Some(snapshot) = self.read_table_snapshot().await? { snapshot.summary.clone() @@ -499,6 +497,19 @@ impl FuseTable { } } } + + pub async fn changes_read_offset_snapshot( + &self, + base_location: &String, + ) -> Result> { + match SnapshotsIO::read_snapshot(base_location.to_string(), self.get_operator()).await { + Ok((base_snapshot, _)) => Ok(base_snapshot), + Err(_) => Err(ErrorCode::IllegalStream(format!( + "Failed to read the offset snapshot: {:?}, maybe purged", + base_location + ))), + } + } } fn replace_push_downs( diff --git a/src/query/storages/fuse/src/operations/commit.rs b/src/query/storages/fuse/src/operations/commit.rs index 618366ad3f80..edc0a5c76124 100644 --- a/src/query/storages/fuse/src/operations/commit.rs +++ b/src/query/storages/fuse/src/operations/commit.rs @@ -480,22 +480,8 @@ impl FuseTable { } } - #[inline] - pub fn is_error_recoverable(e: &ErrorCode, is_table_transient: bool) -> bool { - let code = e.code(); - code == ErrorCode::TABLE_VERSION_MISMATCHED - || (is_table_transient && code == ErrorCode::STORAGE_NOT_FOUND) - } - - #[inline] - pub fn no_side_effects_in_meta_store(e: &ErrorCode) -> bool { - // currently, the only error that we know, which indicates there are no side effects - // is TABLE_VERSION_MISMATCHED - e.code() == ErrorCode::TABLE_VERSION_MISMATCHED - } - // check if there are any fuse table legacy options - pub fn remove_legacy_options(table_options: &mut BTreeMap) { + fn remove_legacy_options(table_options: &mut BTreeMap) { table_options.remove(OPT_KEY_LEGACY_SNAPSHOT_LOC); } } diff --git a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs index dd1a23cad263..551d0aa2c1c3 100644 --- a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs @@ -138,12 +138,21 @@ where F: SnapshotGenerator + Send + 'static } fn is_error_recoverable(&self, e: &ErrorCode) -> bool { + let code = e.code(); // When prev_snapshot_id is some, means it is an alter table column modification or truncate. // In this case if commit to meta fail and error is TABLE_VERSION_MISMATCHED operation will be aborted. - if self.prev_snapshot_id.is_some() && e.code() == ErrorCode::TABLE_VERSION_MISMATCHED { + if self.prev_snapshot_id.is_some() && code == ErrorCode::TABLE_VERSION_MISMATCHED { return false; } - FuseTable::is_error_recoverable(e, self.purge) + + code == ErrorCode::TABLE_VERSION_MISMATCHED + || (self.purge && code == ErrorCode::STORAGE_NOT_FOUND) + } + + fn no_side_effects_in_meta_store(e: &ErrorCode) -> bool { + // currently, the only error that we know, which indicates there are no side effects + // is TABLE_VERSION_MISMATCHED + e.code() == ErrorCode::TABLE_VERSION_MISMATCHED } fn read_meta(&mut self) -> Result { @@ -469,7 +478,7 @@ where F: SnapshotGenerator + Send + 'static None => { // Commit not fulfilled. try to abort the operations. // if it is safe to do so. - if FuseTable::no_side_effects_in_meta_store(&e) { + if Self::no_side_effects_in_meta_store(&e) { // if we are sure that table state inside metastore has not been // modified by this operation, abort this operation. self.state = State::Abort(e); diff --git a/src/query/storages/stream/src/stream_table.rs b/src/query/storages/stream/src/stream_table.rs index b43a08db11f8..e3f0bbec4c8d 100644 --- a/src/query/storages/stream/src/stream_table.rs +++ b/src/query/storages/stream/src/stream_table.rs @@ -42,7 +42,6 @@ use databend_common_pipeline_core::Pipeline; use databend_common_sql::binder::STREAM_COLUMN_FACTORY; use databend_common_storages_fuse::io::MetaReaders; use databend_common_storages_fuse::io::SnapshotHistoryReader; -use databend_common_storages_fuse::io::SnapshotsIO; use databend_common_storages_fuse::io::TableMetaLocationGenerator; use databend_common_storages_fuse::FuseTable; use databend_storages_common_table_meta::table::ChangeType; @@ -173,8 +172,7 @@ impl StreamTable { fuse_table.check_changes_valid(source_desc, self.offset()?)?; let (base_row_count, base_timsestamp) = if let Some(base_loc) = self.snapshot_loc() { - let (base, _) = - SnapshotsIO::read_snapshot(base_loc.to_string(), fuse_table.get_operator()).await?; + let base = fuse_table.changes_read_offset_snapshot(&base_loc).await?; (base.summary.row_count, base.timestamp) } else { (0, None) diff --git a/src/query/storages/system/src/streams_table.rs b/src/query/storages/system/src/streams_table.rs index 614839112377..7ac2b487582b 100644 --- a/src/query/storages/system/src/streams_table.rs +++ b/src/query/storages/system/src/streams_table.rs @@ -39,7 +39,6 @@ use databend_common_meta_app::principal::OwnershipObject; use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; -use databend_common_storages_fuse::io::SnapshotsIO; use databend_common_storages_fuse::operations::acquire_task_permit; use databend_common_storages_fuse::FuseTable; use databend_common_storages_stream::stream_table::StreamTable; @@ -250,13 +249,11 @@ impl AsyncSystemTable for StreamsTable { let fuse_table = FuseTable::try_from_table(source.as_ref()).unwrap(); if let Some(location) = stream_table.snapshot_loc() { - reason = SnapshotsIO::read_snapshot( - location, - fuse_table.get_operator(), - ) - .await - .err() - .map_or("".to_string(), |e| e.display_text()); + reason = fuse_table + .changes_read_offset_snapshot(&location) + .await + .err() + .map_or("".to_string(), |e| e.display_text()); } } Err(e) => {