Skip to content

Commit

Permalink
fix copying with diff
Browse files Browse the repository at this point in the history
  • Loading branch information
nyurik committed Oct 4, 2023
1 parent b172516 commit caceae6
Show file tree
Hide file tree
Showing 49 changed files with 945 additions and 207 deletions.
164 changes: 87 additions & 77 deletions martin-mbtiles/src/copier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::mbtiles::MbtType::{Flat, FlatWithHash, Normalized};
use crate::mbtiles::{detach_db, MbtType};
use crate::queries::{
create_flat_tables, create_flat_with_hash_tables, create_normalized_tables,
create_tiles_with_hash_view,
create_tiles_with_hash_view, is_empty_database,
};
use crate::{MbtError, Mbtiles, AGG_TILES_HASH, AGG_TILES_HASH_IN_DIFF};

Expand Down Expand Up @@ -138,49 +138,48 @@ impl MbtileCopierInt {
}

pub async fn run(self) -> MbtResult<SqliteConnection> {
// src file connection is not needed after this point, as it will be attached to the dst file
// src and diff file connections are not needed later, as they will be attached to the dst file
let src_type = self.src_mbtiles.open_and_detect_type().await?;
let dif = if let Some(dif_file) = &self.options.diff_with_file {
let dif_file = Mbtiles::new(dif_file)?;
let dif_type = dif_file.open_and_detect_type().await?;
Some((dif_file, dif_type))
} else {
None
};

let mut conn = self.dst_mbtiles.open_or_new().await?;
let is_empty_db = is_empty_database(&mut conn).await?;
self.src_mbtiles.attach_to(&mut conn, "sourceDb").await?;

let src_path = self.src_mbtiles.filepath();
let dst_path = self.dst_mbtiles.filepath();
let dst_type;
if let Some((dif_mbt, dif_type)) = &dif {
if !is_empty_db {
return Err(MbtError::NonEmptyTargetFile(self.options.dst_file));
}
dst_type = self.options.dst_type.unwrap_or(src_type);
dif_mbt.attach_to(&mut conn, "diffDb").await?;
let dif_path = dif_mbt.filepath();
info!("Comparing {src_path} ({src_type}) and {dif_path} ({dif_type}) into a new file {dst_path} ({dst_type})");
} else if is_empty_db {
dst_type = self.options.dst_type.unwrap_or(src_type);
info!("Copying {src_path} ({src_type}) to a new file {dst_path} ({dst_type})");
} else {
dst_type = self.dst_mbtiles.detect_type(&mut conn).await?;
info!("Copying {src_path} ({src_type}) to an existing file {dst_path} ({dst_type})");
}

let is_empty = query!("SELECT 1 as has_rows FROM sqlite_schema LIMIT 1")
.fetch_optional(&mut conn)
.await?
.is_none();

let dst_type = if is_empty {
let dst_type = self.options.dst_type.unwrap_or(src_type);
info!(
"Copying {} ({src_type}) to a new file {} ({dst_type})",
self.options.src_file.display(),
self.options.dst_file.display()
);
if is_empty_db {
self.init_new_schema(&mut conn, src_type, dst_type).await?;
dst_type
} else if self.options.diff_with_file.is_some() {
return Err(MbtError::NonEmptyTargetFile(self.options.dst_file));
} else {
let dst_type = self.dst_mbtiles.detect_type(&mut conn).await?;
info!(
"Copying {} ({src_type}) to an existing file {} ({dst_type})",
self.options.src_file.display(),
self.options.dst_file.display()
);
self.src_mbtiles.attach_to(&mut conn, "sourceDb").await?;
dst_type
};
}

let (on_dupl, sql_cond) = self.get_on_duplicate_sql(dst_type);

let (select_from, query_args) = {
let select_from = if let Some(dif_file) = &self.options.diff_with_file {
let dif_file = Mbtiles::new(dif_file)?;
let dif_type = dif_file.open_and_detect_type().await?;
info!(
"Copying only the data not found in {} ({dif_type})",
dif_file.filepath(),
);
dif_file.attach_to(&mut conn, "diffDb").await?;
Self::get_select_from_with_diff(dst_type, dif_type)
let select_from = if let Some((_, dif_type)) = &dif {
Self::get_select_from_with_diff(dst_type, *dif_type)
} else {
Self::get_select_from(dst_type, src_type).to_string()
};
Expand All @@ -198,30 +197,34 @@ impl MbtileCopierInt {

// SAFETY: this is safe as long as handle_lock is valid
let rusqlite_conn = unsafe { rusqlite::Connection::from_handle(handle) }?;

match dst_type {
Flat => rusqlite_conn.execute(
&format!("INSERT {on_dupl} INTO tiles {select_from} {sql_cond}"),
params_from_iter(query_args),
)?,
FlatWithHash => rusqlite_conn.execute(
&format!("INSERT {on_dupl} INTO tiles_with_hash {select_from} {sql_cond}"),
params_from_iter(query_args),
)?,
Flat => {
let sql = format!("INSERT {on_dupl} INTO tiles {select_from} {sql_cond}");
debug!("Copying to {dst_type} with {sql} {query_args:?}");
rusqlite_conn.execute(&sql, params_from_iter(query_args))?
}
FlatWithHash => {
let sql =
format!("INSERT {on_dupl} INTO tiles_with_hash {select_from} {sql_cond}");
debug!("Copying to {dst_type} with {sql} {query_args:?}");
rusqlite_conn.execute(&sql, params_from_iter(query_args))?
}
Normalized => {
rusqlite_conn.execute(
&format!(
"INSERT {on_dupl} INTO map (zoom_level, tile_column, tile_row, tile_id)
let sql = format!(
"INSERT {on_dupl} INTO map (zoom_level, tile_column, tile_row, tile_id)
SELECT zoom_level, tile_column, tile_row, hash as tile_id
FROM ({select_from} {sql_cond})"
),
params_from_iter(&query_args),
)?;
rusqlite_conn.execute(
&format!(
"INSERT OR IGNORE INTO images SELECT tile_data, hash FROM ({select_from})"
),
params_from_iter(query_args),
)?
);
debug!("Copying to {dst_type} with {sql} {query_args:?}");
rusqlite_conn.execute(&sql, params_from_iter(&query_args))?;
let sql = format!(
"INSERT OR IGNORE INTO images (tile_id, tile_data)
SELECT hash as tile_id, tile_data
FROM ({select_from})"
);
debug!("Copying to {dst_type} with {sql} {query_args:?}");
rusqlite_conn.execute(&sql, params_from_iter(query_args))?
}
};

Expand Down Expand Up @@ -275,8 +278,6 @@ impl MbtileCopierInt {
.await?;
query!("VACUUM").execute(&mut *conn).await?;

self.src_mbtiles.attach_to(&mut *conn, "sourceDb").await?;

if src == dst {
// DB objects must be created in a specific order: tables, views, triggers, indexes.
debug!("Copying DB schema verbatim");
Expand Down Expand Up @@ -342,40 +343,49 @@ impl MbtileCopierInt {
}

fn get_select_from_with_diff(dst_type: MbtType, diff_type: MbtType) -> String {
let (hash_col_sql, new_tiles_with_hash) = if dst_type == Flat {
let (hash_col_sql, diff_tiles) = if dst_type == Flat {
("", "diffDb.tiles")
} else {
match diff_type {
Flat => (", hex(md5(tile_data)) as hash", "diffDb.tiles"),
FlatWithHash => (", new_tiles_with_hash.tile_hash as hash", "diffDb.tiles_with_hash"),
Normalized => (", new_tiles_with_hash.hash",
Flat => (", hex(md5(diffTiles.tile_data)) as hash", "diffDb.tiles"),
FlatWithHash => (", diffTiles.tile_hash as hash", "diffDb.tiles_with_hash"),
Normalized => (", diffTiles.hash",
"(SELECT zoom_level, tile_column, tile_row, tile_data, map.tile_id AS hash
FROM diffDb.map JOIN diffDb.images ON diffDb.map.tile_id = diffDb.images.tile_id)"),
}
};

format!("SELECT COALESCE(sourceDb.tiles.zoom_level, new_tiles_with_hash.zoom_level) as zoom_level,
COALESCE(sourceDb.tiles.tile_column, new_tiles_with_hash.tile_column) as tile_column,
COALESCE(sourceDb.tiles.tile_row, new_tiles_with_hash.tile_row) as tile_row,
new_tiles_with_hash.tile_data as tile_data
format!(
"SELECT COALESCE(sourceTiles.zoom_level, diffTiles.zoom_level) as zoom_level
, COALESCE(sourceTiles.tile_column, diffTiles.tile_column) as tile_column
, COALESCE(sourceTiles.tile_row, diffTiles.tile_row) as tile_row
, diffTiles.tile_data as tile_data
{hash_col_sql}
FROM sourceDb.tiles FULL JOIN {new_tiles_with_hash} AS new_tiles_with_hash
ON sourceDb.tiles.zoom_level = new_tiles_with_hash.zoom_level
AND sourceDb.tiles.tile_column = new_tiles_with_hash.tile_column
AND sourceDb.tiles.tile_row = new_tiles_with_hash.tile_row
WHERE (sourceDb.tiles.tile_data != new_tiles_with_hash.tile_data
OR sourceDb.tiles.tile_data ISNULL
OR new_tiles_with_hash.tile_data ISNULL)")
FROM sourceDb.tiles AS sourceTiles FULL JOIN {diff_tiles} AS diffTiles
ON sourceTiles.zoom_level = diffTiles.zoom_level
AND sourceTiles.tile_column = diffTiles.tile_column
AND sourceTiles.tile_row = diffTiles.tile_row
WHERE (sourceTiles.tile_data != diffTiles.tile_data
OR sourceTiles.tile_data ISNULL
OR diffTiles.tile_data ISNULL)"
)
}

fn get_select_from(dst_type: MbtType, src_type: MbtType) -> &'static str {
if dst_type == Flat {
"SELECT * FROM sourceDb.tiles WHERE TRUE"
"SELECT zoom_level, tile_column, tile_row, tile_data FROM sourceDb.tiles WHERE TRUE"
} else {
match src_type {
Flat => "SELECT zoom_level, tile_column, tile_row, tile_data, hex(md5(tile_data)) as hash FROM sourceDb.tiles WHERE TRUE",
FlatWithHash => "SELECT zoom_level, tile_column, tile_row, tile_data, tile_hash AS hash FROM sourceDb.tiles_with_hash WHERE TRUE",
Normalized => "SELECT zoom_level, tile_column, tile_row, tile_data, map.tile_id AS hash FROM sourceDb.map JOIN sourceDb.images ON sourceDb.map.tile_id = sourceDb.images.tile_id WHERE TRUE"
Flat => "SELECT zoom_level, tile_column, tile_row, tile_data, hex(md5(tile_data)) as hash
FROM sourceDb.tiles
WHERE TRUE",
FlatWithHash => "SELECT zoom_level, tile_column, tile_row, tile_data, tile_hash AS hash
FROM sourceDb.tiles_with_hash
WHERE TRUE",
Normalized => "SELECT zoom_level, tile_column, tile_row, tile_data, map.tile_id AS hash
FROM sourceDb.map JOIN sourceDb.images
ON sourceDb.map.tile_id = sourceDb.images.tile_id
WHERE TRUE"
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion martin-mbtiles/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub enum MbtError {
#[error("No tiles found")]
NoTilesFound,

#[error("The destination file {0} is non-empty")]
#[error("The destination file {0} is not empty. Some operations like creating a diff file require the destination file to be non-existent or empty.")]
NonEmptyTargetFile(PathBuf),

#[error("The file {0} does not have the required uniqueness constraint")]
Expand Down
11 changes: 11 additions & 0 deletions martin-mbtiles/src/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,17 @@ use sqlx::{query, Executor as _, SqliteExecutor};

use crate::errors::MbtResult;

/// Checks whether the attached SQLite database contains no schema objects at all
/// (no tables, views, indexes, or triggers), i.e. it was just created.
pub async fn is_empty_database<T>(conn: &mut T) -> MbtResult<bool>
where
    for<'e> &'e mut T: SqliteExecutor<'e>,
{
    // sqlite_schema lists every object in the DB; a single row is enough to
    // prove the database is non-empty, hence the LIMIT 1 probe.
    let first_object = query!("SELECT 1 as has_rows FROM sqlite_schema LIMIT 1")
        .fetch_optional(&mut *conn)
        .await?;
    Ok(first_object.is_none())
}

pub async fn is_normalized_tables_type<T>(conn: &mut T) -> MbtResult<bool>
where
for<'e> &'e mut T: SqliteExecutor<'e>,
Expand Down
74 changes: 43 additions & 31 deletions martin-mbtiles/tests/mbtiles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ macro_rules! copy_to {
($function:tt, $src:expr, $dst:tt, $dst_type:expr, $skip_agg:expr) => {{
let func = stringify!($function);
let name = stringify!($dst);
let file = format!("file:{func}_{name}?mode=memory&cache=shared");
let file = format!("file:{func}--{name}?mode=memory&cache=shared");
let (dst, cn_dst) = open(&file).await?;
let mut opt = copier(&$src, &dst);
opt.skip_agg_tiles_hash = $skip_agg;
Expand All @@ -120,10 +120,10 @@ macro_rules! copy_to {
async fn copy_and_convert() -> MbtResult<()> {
let mem = Mbtiles::new(":memory:")?;

let (orig, _cn_orig) = new_source!(test_copy, orig, INSERT_METADATA_V1, INSERT_TILES_V1);
let (flat, _cn_flat, _) = copy_to!(test_copy, orig, flat, Flat);
let (hash, _cn_hash, _) = copy_to!(test_copy, orig, hash, FlatWithHash);
let (norm, _cn_norm, _) = copy_to!(test_copy, orig, norm, Normalized);
let (orig, _cn_orig) = new_source!(cp_conv, orig, INSERT_METADATA_V1, INSERT_TILES_V1);
let (flat, _cn_flat, _) = copy_to!(cp_conv, orig, flat, Flat);
let (hash, _cn_hash, _) = copy_to!(cp_conv, orig, hash, FlatWithHash);
let (norm, _cn_norm, _) = copy_to!(cp_conv, orig, norm, Normalized);

for (frm, src) in &[("flat", &flat), ("hash", &hash), ("norm", &norm)] {
// Same content, but also will include agg_tiles_hash metadata value
Expand Down Expand Up @@ -160,43 +160,55 @@ async fn copy_and_convert() -> MbtResult<()> {
#[actix_rt::test]
async fn diff_and_apply() -> MbtResult<()> {
let (orig_v1, _cn_orig_v1) =
new_source!(test_diff, orig_v1, INSERT_METADATA_V1, INSERT_TILES_V1);
let (flat_v1, _cn_flat_v1, _) = copy_hash!(test_diff, orig_v1, flat_v1, Flat);
let (hash_v1, _cn_hash_v1, _) = copy_hash!(test_diff, orig_v1, hash_v1, FlatWithHash);
let (norm_v1, _cn_norm_v1, _) = copy_hash!(test_diff, orig_v1, norm_v1, Normalized);
new_source!(dif_aply, orig_v1, INSERT_METADATA_V1, INSERT_TILES_V1);
let (flat_v1, _cn_flat_v1, _) = copy_hash!(dif_aply, orig_v1, flat_v1, Flat);
let (hash_v1, _cn_hash_v1, _) = copy_hash!(dif_aply, orig_v1, hash_v1, FlatWithHash);
let (norm_v1, _cn_norm_v1, _) = copy_hash!(dif_aply, orig_v1, norm_v1, Normalized);

let (orig_v2, _cn_orig_v2) =
new_source!(test_diff, orig_v2, INSERT_METADATA_V2, INSERT_TILES_V2);
let (flat_v2, _cn_flat_v2, dmp_flat_v2) = copy_hash!(test_diff, orig_v2, flat_v2, Flat);
let (hash_v2, _cn_hash_v2, dmp_hash_v2) = copy_hash!(test_diff, orig_v2, hash_v2, FlatWithHash);
let (norm_v2, _cn_norm_v2, dmp_norm_v2) = copy_hash!(test_diff, orig_v2, norm_v2, Normalized);
new_source!(dif_aply, orig_v2, INSERT_METADATA_V2, INSERT_TILES_V2);
let (flat_v2, _cn_flat_v2, dmp_flat_v2) = copy_hash!(dif_aply, orig_v2, flat_v2, Flat);
let (hash_v2, _cn_hash_v2, dmp_hash_v2) = copy_hash!(dif_aply, orig_v2, hash_v2, FlatWithHash);
let (norm_v2, _cn_norm_v2, dmp_norm_v2) = copy_hash!(dif_aply, orig_v2, norm_v2, Normalized);

for (frm, v1, v2, dump_v2) in &[
let types = &[
("flat", &flat_v1, &flat_v2, dmp_flat_v2),
("hash", &hash_v1, &hash_v2, dmp_hash_v2),
("norm", &norm_v1, &norm_v2, dmp_norm_v2),
] {
let (dff, _cn_dff) = open!(test_diff, format!("{frm}-dff"));
let (v2a, mut cn_v2a) = open!(test_diff, format!("{frm}-v2a"));
];

for (frm_type, v1, _, _) in types {
for (to_type, _, v2, dump_v2) in types {
let pair = format!("{frm_type}-{to_type}");
let (dff, _cn_dff) = open!(dif_aply, format!("{pair}-dff"));
let (v2a, mut cn_v2a) = open!(dif_aply, format!("{pair}-v2a"));

// Diff v1 with v2, and copy to diff anything that's different (i.e. mathematically: v2-v1)
let mut diff_with = copier(v1, &dff);
diff_with.diff_with_file = Some(path(v2));
assert_snapshot!("delta", pair, dump(&mut diff_with.run().await?).await?);

// Diff v1 with v2, and copy to diff anything that's different (i.e. mathematically: v2-v1)
let mut diff_with = copier(v1, &dff);
diff_with.diff_with_file = Some(path(v2));
assert_snapshot!(frm, "diff_v2-v1", dump(&mut diff_with.run().await?).await?);
// Copy v1 -> v2a, and apply dff to v2a
copier(v1, &v2a).run().await?;
apply_diff(path(&v2a), path(&dff)).await?;

// Copy v1 -> v2a, and apply dff to v2a
copier(v1, &v2a).run().await?;
apply_diff(path(&v2a), path(&dff)).await?;
let dump_v2a = dump(&mut cn_v2a).await?;
assert_snapshot!("applied", pair, &dump_v2a);

let dump_v2a = dump(&mut cn_v2a).await?;
assert_snapshot!(frm, "applied-diff", &dump_v2a);
let expected_dump = if frm_type != to_type {
eprintln!("TODO: implement convert copying {frm_type} -> {to_type}");
continue;
} else if frm_type == &"norm" {
eprintln!("FIXME: norm->norm diff is not working yet");
continue;
} else {
dump_v2
};

// FIXME!!!
if frm != &"norm" {
pretty_assertions::assert_eq!(
&dump_v2a,
dump_v2,
"v2a should be identical to v2 (type = {frm})"
expected_dump,
"v2a should be identical to v2 (type {frm_type} -> {to_type})"
);
}
}
Expand Down Expand Up @@ -261,7 +273,7 @@ async fn dump(conn: &mut SqliteConnection) -> MbtResult<Vec<SqliteEntry>> {
.map(|v| format!(r#""{v}""#)),
"BLOB" => row
.get::<Option<Vec<u8>>, _>(idx)
.map(|v| format!(r#""{}""#, from_utf8(&v).unwrap())),
.map(|v| format!("blob({})", from_utf8(&v).unwrap())),
_ => panic!("Unknown column type: {typ}"),
})
.unwrap_or("NULL".to_string())
Expand Down
6 changes: 3 additions & 3 deletions martin-mbtiles/tests/snapshots/[email protected]
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ CREATE TABLE tiles (
tile_data blob,
PRIMARY KEY(zoom_level, tile_column, tile_row))'''
values = [
'( 1, 0, 0, "same" )',
'( 1, 0, 1, "edit-v1" )',
'( 1, 1, 1, "remove" )',
'( 1, 0, 0, blob(same) )',
'( 1, 0, 1, blob(edit-v1) )',
'( 1, 1, 1, blob(remove) )',
]

[[]]
Expand Down
6 changes: 3 additions & 3 deletions martin-mbtiles/tests/snapshots/[email protected]
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ CREATE TABLE tiles (
tile_data blob,
PRIMARY KEY(zoom_level, tile_column, tile_row))'''
values = [
'( 1, 0, 0, "same" )',
'( 1, 0, 1, "edit-v1" )',
'( 1, 1, 1, "remove" )',
'( 1, 0, 0, blob(same) )',
'( 1, 0, 1, blob(edit-v1) )',
'( 1, 1, 1, blob(remove) )',
]

[[]]
Expand Down
Loading

0 comments on commit caceae6

Please sign in to comment.