Skip to content

Commit

Permalink
chore(storage): refine error message for stream read offset snapshot (#โ€ฆ
Browse files Browse the repository at this point in the history
โ€ฆ16964)

* chore: refine error message for stream read offset snapshot

* fix
  • Loading branch information
zhyass authored Nov 28, 2024
1 parent 0c3c0e1 commit 8cbda70
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 37 deletions.
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/binder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ impl<'a> Binder {
};

match plan.kind() {
QueryKind::Query { .. } | QueryKind::Explain { .. } => {}
QueryKind::Query | QueryKind::Explain => {}
_ => {
let meta_data_guard = self.metadata.read();
let tables = meta_data_guard.tables();
Expand Down
25 changes: 18 additions & 7 deletions src/query/storages/fuse/src/operations/changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use databend_common_expression::BASE_BLOCK_IDS_COL_NAME;
use databend_storages_common_table_meta::meta::BlockMeta;
use databend_storages_common_table_meta::meta::Location;
use databend_storages_common_table_meta::meta::SegmentInfo;
use databend_storages_common_table_meta::meta::TableSnapshot;
use databend_storages_common_table_meta::table::ChangeType;
use databend_storages_common_table_meta::table::StreamMode;
use databend_storages_common_table_meta::table::OPT_KEY_CHANGE_TRACKING_BEGIN_VER;
Expand Down Expand Up @@ -222,9 +223,8 @@ impl FuseTable {
let latest_segments: HashSet<&Location> =
HashSet::from_iter(&latest_snapshot.segments);

let (base_snapshot, _) =
SnapshotsIO::read_snapshot(base_location.clone(), self.get_operator())
.await?;
let base_snapshot =
self.changes_read_offset_snapshot(base_location).await?;
let base_segments = HashSet::from_iter(&base_snapshot.segments);

// If the base segments are a subset of the latest segments,
Expand Down Expand Up @@ -349,8 +349,7 @@ impl FuseTable {
};

let base_segments = if let Some(snapshot) = base {
let (sn, _) =
SnapshotsIO::read_snapshot(snapshot.to_string(), self.get_operator()).await?;
let sn = self.changes_read_offset_snapshot(snapshot).await?;
HashSet::from_iter(sn.segments.clone())
} else {
HashSet::new()
Expand Down Expand Up @@ -435,8 +434,7 @@ impl FuseTable {
return self.table_statistics(ctx, true, None).await;
};

let (base_snapshot, _) =
SnapshotsIO::read_snapshot(base_location.clone(), self.get_operator()).await?;
let base_snapshot = self.changes_read_offset_snapshot(base_location).await?;
let base_summary = base_snapshot.summary.clone();
let latest_summary = if let Some(snapshot) = self.read_table_snapshot().await? {
snapshot.summary.clone()
Expand Down Expand Up @@ -499,6 +497,19 @@ impl FuseTable {
}
}
}

pub async fn changes_read_offset_snapshot(
&self,
base_location: &String,
) -> Result<Arc<TableSnapshot>> {
match SnapshotsIO::read_snapshot(base_location.to_string(), self.get_operator()).await {
Ok((base_snapshot, _)) => Ok(base_snapshot),
Err(_) => Err(ErrorCode::IllegalStream(format!(
"Failed to read the offset snapshot: {:?}, maybe purged",
base_location
))),
}
}
}

fn replace_push_downs(
Expand Down
16 changes: 1 addition & 15 deletions src/query/storages/fuse/src/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,22 +480,8 @@ impl FuseTable {
}
}

#[inline]
pub fn is_error_recoverable(e: &ErrorCode, is_table_transient: bool) -> bool {
let code = e.code();
code == ErrorCode::TABLE_VERSION_MISMATCHED
|| (is_table_transient && code == ErrorCode::STORAGE_NOT_FOUND)
}

#[inline]
pub fn no_side_effects_in_meta_store(e: &ErrorCode) -> bool {
// currently, the only error that we know, which indicates there are no side effects
// is TABLE_VERSION_MISMATCHED
e.code() == ErrorCode::TABLE_VERSION_MISMATCHED
}

// check if there are any fuse table legacy options
pub fn remove_legacy_options(table_options: &mut BTreeMap<String, String>) {
fn remove_legacy_options(table_options: &mut BTreeMap<String, String>) {
table_options.remove(OPT_KEY_LEGACY_SNAPSHOT_LOC);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,21 @@ where F: SnapshotGenerator + Send + 'static
}

fn is_error_recoverable(&self, e: &ErrorCode) -> bool {
let code = e.code();
// When prev_snapshot_id is some, means it is an alter table column modification or truncate.
// In this case if commit to meta fail and error is TABLE_VERSION_MISMATCHED operation will be aborted.
if self.prev_snapshot_id.is_some() && e.code() == ErrorCode::TABLE_VERSION_MISMATCHED {
if self.prev_snapshot_id.is_some() && code == ErrorCode::TABLE_VERSION_MISMATCHED {
return false;
}
FuseTable::is_error_recoverable(e, self.purge)

code == ErrorCode::TABLE_VERSION_MISMATCHED
|| (self.purge && code == ErrorCode::STORAGE_NOT_FOUND)
}

fn no_side_effects_in_meta_store(e: &ErrorCode) -> bool {
// currently, the only error that we know, which indicates there are no side effects
// is TABLE_VERSION_MISMATCHED
e.code() == ErrorCode::TABLE_VERSION_MISMATCHED
}

fn read_meta(&mut self) -> Result<Event> {
Expand Down Expand Up @@ -469,7 +478,7 @@ where F: SnapshotGenerator + Send + 'static
None => {
// Commit not fulfilled. try to abort the operations.
// if it is safe to do so.
if FuseTable::no_side_effects_in_meta_store(&e) {
if Self::no_side_effects_in_meta_store(&e) {
// if we are sure that table state inside metastore has not been
// modified by this operation, abort this operation.
self.state = State::Abort(e);
Expand Down
4 changes: 1 addition & 3 deletions src/query/storages/stream/src/stream_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use databend_common_pipeline_core::Pipeline;
use databend_common_sql::binder::STREAM_COLUMN_FACTORY;
use databend_common_storages_fuse::io::MetaReaders;
use databend_common_storages_fuse::io::SnapshotHistoryReader;
use databend_common_storages_fuse::io::SnapshotsIO;
use databend_common_storages_fuse::io::TableMetaLocationGenerator;
use databend_common_storages_fuse::FuseTable;
use databend_storages_common_table_meta::table::ChangeType;
Expand Down Expand Up @@ -173,8 +172,7 @@ impl StreamTable {
fuse_table.check_changes_valid(source_desc, self.offset()?)?;

let (base_row_count, base_timsestamp) = if let Some(base_loc) = self.snapshot_loc() {
let (base, _) =
SnapshotsIO::read_snapshot(base_loc.to_string(), fuse_table.get_operator()).await?;
let base = fuse_table.changes_read_offset_snapshot(&base_loc).await?;
(base.summary.row_count, base.timestamp)
} else {
(0, None)
Expand Down
13 changes: 5 additions & 8 deletions src/query/storages/system/src/streams_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use databend_common_meta_app::principal::OwnershipObject;
use databend_common_meta_app::schema::TableIdent;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableMeta;
use databend_common_storages_fuse::io::SnapshotsIO;
use databend_common_storages_fuse::operations::acquire_task_permit;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_stream::stream_table::StreamTable;
Expand Down Expand Up @@ -250,13 +249,11 @@ impl<const T: bool> AsyncSystemTable for StreamsTable<T> {
let fuse_table =
FuseTable::try_from_table(source.as_ref()).unwrap();
if let Some(location) = stream_table.snapshot_loc() {
reason = SnapshotsIO::read_snapshot(
location,
fuse_table.get_operator(),
)
.await
.err()
.map_or("".to_string(), |e| e.display_text());
reason = fuse_table
.changes_read_offset_snapshot(&location)
.await
.err()
.map_or("".to_string(), |e| e.display_text());
}
}
Err(e) => {
Expand Down

0 comments on commit 8cbda70

Please sign in to comment.