Skip to content

Commit

Permalink
coord: make DataflowBuilder::import_into_dataflow stack safe
Browse files Browse the repository at this point in the history
Fixes #8599.
  • Loading branch information
aalexandrov committed Dec 2, 2021
1 parent ee8dcc3 commit f43b016
Showing 1 changed file with 88 additions and 80 deletions.
168 changes: 88 additions & 80 deletions src/coord/src/coord/dataflow_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! isolates that logic from the rest of the somewhat complicated coordinator.
use super::*;
use ore::stack::maybe_grow;

/// Borrows of catalog and indexes sufficient to build dataflow descriptions.
pub struct DataflowBuilder<'a> {
Expand Down Expand Up @@ -66,98 +67,105 @@ impl<'a> DataflowBuilder<'a> {
/// Imports the view, source, or table with `id` into the provided
/// dataflow description.
///
/// The import is skipped when `dataflow.is_imported(id)` already holds.
/// Otherwise, if one of the catalog's enabled indexes on `id` is also present
/// in `self.indexes`, that index is imported via `import_index`. Failing that,
/// the catalog item is imported according to its kind:
///
/// * a table is imported as a source with a `SourceConnector::Local` connector;
/// * a source whose `optimized_expr` is trivial is imported directly; a source
///   with a non-trivial expression is imported under a fresh transient ID and
///   its expression (rewritten to refer to that ID) is imported as a view;
/// * a view has a clone of its `optimized_expr` imported as a view.
///
/// # Panics
///
/// Panics if the transient ID counter overflows, if an indexed item has no
/// desc, or if the catalog item is not a table, source, or view.
fn import_into_dataflow(&mut self, id: &GlobalId, dataflow: &mut DataflowDesc) {
// Avoid importing the item redundantly.
if dataflow.is_imported(id) {
return;
}
// NOTE(review): the whole body runs inside `ore::stack::maybe_grow`; per the
// commit title this is what makes the function stack safe, presumably by
// growing the stack on demand during nested view imports -- confirm against
// the `ore::stack` documentation.
maybe_grow(|| {
// Avoid importing the item redundantly.
if dataflow.is_imported(id) {
return;
}

// A valid index is any index on `id` that is known to the dataflow
// layer, as indicated by its presence in `self.indexes`.
let valid_index = self.catalog.enabled_indexes()[id]
.iter()
.find(|(id, _keys)| self.indexes.contains_key(*id));
if let Some((index_id, keys)) = valid_index {
let index_desc = IndexDesc {
on_id: *id,
keys: keys.to_vec(),
};
let desc = self
.catalog
.get_by_id(id)
.desc()
.expect("indexes can only be built on items with descs");
dataflow.import_index(*index_id, index_desc, desc.typ().clone(), *id);
} else {
// This is only needed in the case of a source with a transformation, but we generate it now to
// get around borrow checker issues.
let transient_id = *self.transient_id_counter;
*self.transient_id_counter = transient_id
.checked_add(1)
.expect("id counter overflows i64");
let entry = self.catalog.get_by_id(id);
match entry.item() {
CatalogItem::Table(table) => {
dataflow.import_source(
*id,
dataflow_types::SourceDesc {
name: entry.name().to_string(),
connector: SourceConnector::Local {
timeline: table.timeline(),
},
operators: None,
bare_desc: table.desc.clone(),
persisted_name: table.persist.as_ref().map(|p| p.stream_name.clone()),
},
*id,
);
}
CatalogItem::Source(source) => {
if source.optimized_expr.0.is_trivial_source() {
// A valid index is any index on `id` that is known to the dataflow
// layer, as indicated by its presence in `self.indexes`.
let valid_index = self.catalog.enabled_indexes()[id]
.iter()
.find(|(id, _keys)| self.indexes.contains_key(*id));
if let Some((index_id, keys)) = valid_index {
let index_desc = IndexDesc {
on_id: *id,
keys: keys.to_vec(),
};
let desc = self
.catalog
.get_by_id(id)
.desc()
.expect("indexes can only be built on items with descs");
dataflow.import_index(*index_id, index_desc, desc.typ().clone(), *id);
} else {
// This is only needed in the case of a source with a transformation, but we generate it now to
// get around borrow checker issues.
let transient_id = *self.transient_id_counter;
*self.transient_id_counter = transient_id
.checked_add(1)
.expect("id counter overflows i64");
let entry = self.catalog.get_by_id(id);
match entry.item() {
CatalogItem::Table(table) => {
dataflow.import_source(
*id,
dataflow_types::SourceDesc {
name: entry.name().to_string(),
connector: source.connector.clone(),
operators: None,
bare_desc: source.bare_desc.clone(),
persisted_name: source.persist_name.clone(),
},
*id,
);
} else {
// From the dataflow layer's perspective, the source transformation is just a view (across which it should be able to do whole-dataflow optimizations).
// Install it as such (giving the source a global transient ID by which the view/transformation can refer to it)
let bare_source_id = GlobalId::Transient(transient_id);
dataflow.import_source(
bare_source_id,
dataflow_types::SourceDesc {
name: entry.name().to_string(),
connector: source.connector.clone(),
connector: SourceConnector::Local {
timeline: table.timeline(),
},
operators: None,
bare_desc: source.bare_desc.clone(),
persisted_name: source.persist_name.clone(),
bare_desc: table.desc.clone(),
persisted_name: table
.persist
.as_ref()
.map(|p| p.stream_name.clone()),
},
*id,
);
let mut transformation = source.optimized_expr.clone();
transformation.0.visit_mut_post(&mut |node| {
match node {
MirRelationExpr::Get { id, .. } if *id == Id::LocalBareSource => {
*id = Id::Global(bare_source_id);
}
_ => {}
};
});
self.import_view_into_dataflow(id, &transformation, dataflow);
}
CatalogItem::Source(source) => {
if source.optimized_expr.0.is_trivial_source() {
dataflow.import_source(
*id,
dataflow_types::SourceDesc {
name: entry.name().to_string(),
connector: source.connector.clone(),
operators: None,
bare_desc: source.bare_desc.clone(),
persisted_name: source.persist_name.clone(),
},
*id,
);
} else {
// From the dataflow layer's perspective, the source transformation is just a view (across which it should be able to do whole-dataflow optimizations).
// Install it as such (giving the source a global transient ID by which the view/transformation can refer to it)
let bare_source_id = GlobalId::Transient(transient_id);
dataflow.import_source(
bare_source_id,
dataflow_types::SourceDesc {
name: entry.name().to_string(),
connector: source.connector.clone(),
operators: None,
bare_desc: source.bare_desc.clone(),
persisted_name: source.persist_name.clone(),
},
*id,
);
let mut transformation = source.optimized_expr.clone();
transformation.0.visit_mut_post(&mut |node| {
match node {
MirRelationExpr::Get { id, .. }
if *id == Id::LocalBareSource =>
{
*id = Id::Global(bare_source_id);
}
_ => {}
};
});
self.import_view_into_dataflow(id, &transformation, dataflow);
}
}
CatalogItem::View(view) => {
let expr = view.optimized_expr.clone();
self.import_view_into_dataflow(id, &expr, dataflow);
}
_ => unreachable!(),
}
CatalogItem::View(view) => {
let expr = view.optimized_expr.clone();
self.import_view_into_dataflow(id, &expr, dataflow);
}
_ => unreachable!(),
}
}
})
}

/// Imports the view with the specified ID and expression into the provided
Expand Down

0 comments on commit f43b016

Please sign in to comment.