Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nyurik committed Nov 17, 2023
1 parent a35c61f commit e52b0a9
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 41 deletions.
14 changes: 9 additions & 5 deletions martin/src/args/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
mod connections;
mod environment;
mod pg;
mod root;
mod srv;

pub use connections::{Arguments, State};

mod environment;
pub use environment::{Env, OsEnv};

mod pg;
pub use pg::{BoundsCalcType, PgArgs, DEFAULT_BOUNDS_TIMEOUT};

mod root;
pub use root::{Args, ExtraArgs, MetaArgs};

mod srv;
pub use srv::SrvArgs;
94 changes: 59 additions & 35 deletions martin/src/bin/martin-cp.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
use futures::stream::{self, StreamExt};
use mbtiles::{
create_flat_tables, create_flat_with_hash_tables, create_normalized_tables,
create_tiles_with_hash_view, is_empty_database, MbtTypeCli, Mbtiles,
};
use std::fmt::Display;
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use tokio::sync::mpsc::channel;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

use clap::Parser;
use futures::stream::{self, StreamExt};
use futures::TryStreamExt;
use log::{error, info, log_enabled};
use martin::args::{Args, MetaArgs, OsEnv, PgArgs};
use log::{debug, error, info, log_enabled};
use martin::args::{Args, BoundsCalcType, ExtraArgs, MetaArgs, OsEnv, PgArgs, SrvArgs};
use martin::srv::{get_composite_tile, RESERVED_KEYWORDS};
use martin::{append_rect, Error, TileRect, Xyz};
use martin::{read_config, Config, IdResolver, Result, ServerState};
use martin::{
append_rect, read_config, Config, Error, IdResolver, Result, ServerState, TileRect, Xyz,
};
use mbtiles::{
create_flat_tables, create_flat_with_hash_tables, create_normalized_tables,
create_tiles_with_hash_view, is_empty_database, MbtTypeCli, Mbtiles,
};
use tilejson::Bounds;
use tokio::sync::mpsc::channel;
use tokio::time::Instant;
use tokio::try_join;

const VERSION: &str = env!("CARGO_PKG_VERSION");
const PROGRESS_REPORT_EVERY: u64 = 1000;

#[derive(Parser, Debug, PartialEq, Default)]
#[command(about, version)]
Expand Down Expand Up @@ -52,17 +55,18 @@ pub struct CopyArgs {
#[arg(long)]
pub bbox: Vec<Bounds>,
/// Minimum zoom level to copy
#[arg(long, conflicts_with("zoom_levels"))]
#[arg(long, alias = "minzoom", conflicts_with("zoom_levels"))]
pub min_zoom: Option<u8>,
/// Maximum zoom level to copy
#[arg(
long,
alias = "maxzoom",
conflicts_with("zoom_levels"),
required_unless_present("zoom_levels")
)]
pub max_zoom: Option<u8>,
/// List of zoom levels to copy
#[arg(short, long, value_delimiter = ',')]
#[arg(short, long, alias = "zooms", value_delimiter = ',')]
pub zoom_levels: Vec<u8>,
}

Expand All @@ -79,13 +83,19 @@ async fn start(copy_args: CopierArgs) -> Result<()> {
Config::default()
};

let args = Args {
let mut args = Args {
meta: copy_args.meta,
extras: Default::default(),
srv: Default::default(),
extras: ExtraArgs::default(),
srv: SrvArgs::default(),
pg: copy_args.pg,
};

if let Some(ref mut pg_args) = args.pg {
if pg_args.auto_bounds.is_none() {
pg_args.auto_bounds = Some(BoundsCalcType::Skip);
}
}

args.merge_into_config(&mut config, &env)?;
config.finalize()?;
let sources = config.resolve(IdResolver::new(RESERVED_KEYWORDS)).await?;
Expand All @@ -101,7 +111,7 @@ async fn start(copy_args: CopierArgs) -> Result<()> {

/// Convert longitude and latitude to tile index
fn tile_index(lon: f64, lat: f64, zoom: u8) -> (u32, u32) {
let n = 2.0_f64.powi(zoom as i32);
let n = f64::from(1 << zoom);
let x = ((lon + 180.0) / 360.0 * n).floor() as u32;
let y = ((1.0
- (lat.to_radians().tan() + 1.0 / lat.to_radians().cos()).ln() / std::f64::consts::PI)
Expand Down Expand Up @@ -146,18 +156,21 @@ struct Tile {
}

struct Progress {
// needed to compute elapsed time
start_time: Instant,
total: u64,
done_empty: AtomicU64,
done_tiles: AtomicU64,
empty: AtomicU64,
non_empty: AtomicU64,
}

impl Progress {
pub fn new(tiles: &[TileRect]) -> Self {
let total = tiles.iter().map(TileRect::size).sum();
Progress {
start_time: Instant::now(),
total,
done_empty: AtomicU64::default(),
done_tiles: AtomicU64::default(),
empty: AtomicU64::default(),
non_empty: AtomicU64::default(),
}
}
}
Expand Down Expand Up @@ -229,19 +242,28 @@ pub async fn run_tile_copy(args: CopyArgs, state: ServerState) -> Result<()> {
let mut batch = Vec::new();
while let Some(tile) = rx.recv().await {
let done = if tile.data.is_empty() {
info!("Empty tile {}", tile.xyz);
progress.done_empty.fetch_add(1, Ordering::Relaxed)
debug!("Empty tile {}", tile.xyz);
progress.empty.fetch_add(1, Ordering::Relaxed)
} else {
batch.push((tile.xyz.z, tile.xyz.x, tile.xyz.y, tile.data));
info!("Got tile {}", tile.xyz);
progress.done_tiles.fetch_add(1, Ordering::Relaxed)
debug!("Got tile {}", tile.xyz);
progress.non_empty.fetch_add(1, Ordering::Relaxed)
};
if done % 1000 == 0 {
if done % PROGRESS_REPORT_EVERY == (PROGRESS_REPORT_EVERY - 1) {
let non_empty = progress.non_empty.load(Ordering::Relaxed);
let empty = progress.empty.load(Ordering::Relaxed);
let done = non_empty + empty;
let total = progress.total;
let elapsed = progress.start_time.elapsed();
let elapsed_s = elapsed.as_secs_f32();
let left = Duration::from_secs_f32(if non_empty > 0 {
elapsed_s * (total - done) as f32 / done as f32
} else {
0.0
});
info!(
"Copied {}/{} of {} tiles",
progress.done_tiles.load(Ordering::Relaxed),
progress.done_empty.load(Ordering::Relaxed),
progress.total
"[{elapsed:.1?}] {percent:.2}% / {total} | ✓ {non_empty} □ {empty} | {left:.0?} left",
percent = done * 100 / total
);
mbt.insert_tiles(&mut conn, batch.as_slice()).await?;
batch.clear();
Expand All @@ -256,8 +278,8 @@ pub async fn run_tile_copy(args: CopyArgs, state: ServerState) -> Result<()> {

info!(
"Finished copying {}/{} of {} tiles",
progress.done_tiles.load(Ordering::Relaxed),
progress.done_empty.load(Ordering::Relaxed),
progress.non_empty.load(Ordering::Relaxed),
progress.empty.load(Ordering::Relaxed),
progress.total
);

Expand Down Expand Up @@ -286,10 +308,12 @@ fn on_error<E: Display>(e: E) -> ! {

#[cfg(test)]
mod tests {
use super::*;
use insta::assert_yaml_snapshot;
use std::str::FromStr;

use insta::assert_yaml_snapshot;

use super::*;

#[test]
fn test_tile_index() {
assert_eq!((0, 0), tile_index(-180.0, 85.0511, 0));
Expand Down
3 changes: 2 additions & 1 deletion martin/src/utils/error.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use mbtiles::MbtError;
use std::fmt::Write;
use std::io;
use std::path::PathBuf;

use mbtiles::MbtError;

use crate::file_config::FileError;
use crate::fonts::FontError;
use crate::pg::PgError;
Expand Down

0 comments on commit e52b0a9

Please sign in to comment.