diff --git a/sqlx-data.json b/sqlx-data.json index 75367b46..b28f99e6 100644 --- a/sqlx-data.json +++ b/sqlx-data.json @@ -2675,173 +2675,5 @@ } }, "query": "\n SELECT COUNT(DISTINCT label) as total FROM (\n SELECT\n *,\n bool_or(removed) OVER (\n PARTITION BY room_id, set, label\n ORDER BY occurred_at DESC\n ) AS removed_windowed\n FROM event\n WHERE deleted_at IS NULL\n AND room_id = $1\n AND set = $2\n AND original_occurred_at < $3\n AND occurred_at < COALESCE($4, 9223372036854775807)\n ORDER BY original_occurred_at DESC, label ASC, occurred_at DESC\n ) subq\n WHERE removed_windowed = 'f'\n " - }, - "ffe64f4e3f8afb3e66ea3a6cb116091345e328c2dabe01b07a9b7888e3f37667": { - "describe": { - "columns": [ - { - "name": "id", - "ordinal": 0, - "type_info": "Uuid" - }, - { - "name": "room_id", - "ordinal": 1, - "type_info": "Uuid" - }, - { - "name": "kind", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "set", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "label", - "ordinal": 4, - "type_info": "Text" - }, - { - "name": "attribute", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "data", - "ordinal": 6, - "type_info": "Jsonb" - }, - { - "name": "binary_data: PostcardBin", - "ordinal": 7, - "type_info": "Bytea" - }, - { - "name": "occurred_at", - "ordinal": 8, - "type_info": "Int8" - }, - { - "name": "created_by!: AgentId", - "ordinal": 9, - "type_info": { - "Custom": { - "kind": { - "Composite": [ - [ - "account_id", - { - "Custom": { - "kind": { - "Composite": [ - [ - "label", - "Text" - ], - [ - "audience", - "Text" - ] - ] - }, - "name": "account_id" - } - } - ], - [ - "label", - "Text" - ] - ] - }, - "name": "agent_id" - } - } - }, - { - "name": "created_at", - "ordinal": 10, - "type_info": "Timestamptz" - }, - { - "name": "deleted_at", - "ordinal": 11, - "type_info": "Timestamptz" - }, - { - "name": "original_occurred_at", - "ordinal": 12, - "type_info": "Int8" - }, - { - "name": "original_created_by: AgentId", - "ordinal": 13, - "type_info": { - "Custom": { - "kind": { - "Composite": [ - [ - "account_id", - { - "Custom": { - "kind": { - "Composite": [ - [ - "label", - "Text" - ], - [ - "audience", - "Text" - ] - ] - }, - "name": "account_id" - } - } - ], - [ - "label", - "Text" - ] - ] - }, - "name": "agent_id" - } - } - }, - { - "name": "removed", - "ordinal": 14, - "type_info": "Bool" - } - ], - "nullable": [ - false, - false, - false, - false, - true, - true, - true, - true, - false, - false, - false, - true, - false, - false, - false - ], - "parameters": { - "Left": [ - "Int8" - ] - } - }, - "query": "\n SELECT\n id,\n room_id,\n kind,\n set,\n label,\n attribute,\n data,\n binary_data AS \"binary_data: PostcardBin\",\n occurred_at,\n created_by AS \"created_by!: AgentId\",\n created_at,\n deleted_at,\n original_occurred_at,\n original_created_by as \"original_created_by: AgentId\",\n removed\n FROM event\n WHERE room_id = (\n SELECT r.id FROM room AS r\n INNER JOIN event AS e\n ON r.id = e.room_id\n WHERE e.data IS NULL\n AND e.kind = 'draw'\n GROUP BY r.id\n LIMIT 1\n )\n AND data IS NULL\n AND kind = 'draw'\n LIMIT $1\n " } } \ No newline at end of file diff --git a/src/db/event/mod.rs b/src/db/event/mod.rs index 66f76b90..b69edc79 100644 --- a/src/db/event/mod.rs +++ b/src/db/event/mod.rs @@ -878,49 +878,6 @@ pub(crate) async fn select_not_encoded_events( .await } -pub(crate) async fn select_not_decoded_events( - limit: i64, - conn: &mut PgConnection, -) -> sqlx::Result> { - sqlx::query_as!( - RawObject, - r#" - SELECT - id, - room_id, - kind, - set, - label, - attribute, - data, - binary_data AS "binary_data: PostcardBin", - occurred_at, - created_by AS "created_by!: AgentId", - created_at, - deleted_at, - original_occurred_at, - original_created_by as "original_created_by: AgentId", - removed - FROM event - WHERE room_id = ( - SELECT r.id FROM room AS r - INNER JOIN event AS e - ON r.id = e.room_id - WHERE e.data IS NULL - AND e.kind = 'draw' - GROUP BY r.id - LIMIT 1 - ) - AND data IS NULL - AND kind = 'draw' - LIMIT $1 - "#, - limit - ) - .fetch_all(conn) - .await -} - mod binary_encoding; mod schema; mod set_state; diff --git a/src/main.rs b/src/main.rs index d5309660..1a73cc21 100644 --- a/src/main.rs +++ b/src/main.rs @@ -114,7 +114,7 @@ async fn main() -> Result<()> { match var("EVENT_MIGRATE_TO_BINARY") { Ok(_) => migration_to_binary_format::migrate_to_binary(db).await, Err(_) => match var("EVENT_MIGRATE_TO_JSON") { - Ok(_) => migration_to_binary_format::migrate_to_json(db).await, + Ok(dir) => migration_to_binary_format::migrate_to_json(db, dir).await, Err(_) => app::run(db, maybe_ro_db, redis_pool, authz_cache).await, }, } diff --git a/src/migration_to_binary_format.rs b/src/migration_to_binary_format.rs index c3a63122..2065c604 100644 --- a/src/migration_to_binary_format.rs +++ b/src/migration_to_binary_format.rs @@ -1,10 +1,15 @@ -use std::sync::{atomic::AtomicBool, Arc}; +use std::{ + ffi::OsStr, + str::FromStr, + sync::{atomic::AtomicBool, Arc}, +}; use anyhow::Result; use sqlx::postgres::{PgConnection, PgPool as Db}; -use tokio::io::AsyncWriteExt; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; +use uuid::Uuid; -use crate::db::event::{select_not_decoded_events, select_not_encoded_events}; +use crate::db::event::select_not_encoded_events; async fn disable_autovacuum(conn: &mut PgConnection) -> sqlx::Result<()> { sqlx::query!( @@ -125,6 +130,12 @@ pub(crate) async fn migrate_to_binary(db: Db) -> Result<()> { Ok(()) } +#[derive(serde::Serialize, serde::Deserialize)] +struct Event { + data: serde_json::Value, + id: Uuid, +} + async fn do_migrate_to_binary(db: Db, stop: Arc) -> Result<()> { let mut conn = db.acquire().await?; create_temp_table_binary(&mut conn).await?; @@ -154,7 +165,7 @@ async fn do_migrate_to_binary(db: Db, stop: Arc) -> Result<()> { match evt.encode_to_binary() { Ok((id, Some(data), Some(binary_data))) => match binary_data.to_bytes() { Ok(binary_data) => { - let evt_data = serde_json::to_vec(&data)?; + let evt_data = serde_json::to_vec(&Event { data, id })?; file.write_all(&evt_data).await?; file.write(b"\n").await?; @@ -242,14 +253,14 @@ async fn update_event_data_json(conn: &mut PgConnection) -> sqlx::Result<()> { Ok(()) } -pub(crate) async fn migrate_to_json(db: Db) -> Result<()> { +pub(crate) async fn migrate_to_json(db: Db, dir: String) -> Result<()> { { let mut conn = db.acquire().await?; disable_autovacuum(&mut conn).await?; } let stop = Arc::new(AtomicBool::new(false)); - let handle = tokio::spawn(do_migrate_to_json(db.clone(), stop.clone())); + let handle = tokio::spawn(do_migrate_to_json(db.clone(), dir, stop.clone())); tokio::spawn(async move { if let Err(err) = tokio::signal::ctrl_c().await { tracing::error!(%err, "error on signal"); @@ -269,38 +280,38 @@ pub(crate) async fn migrate_to_json(db: Db) -> Result<()> { Ok(()) } -async fn do_migrate_to_json(db: Db, stop: Arc) -> Result<()> { +async fn do_migrate_to_json(db: Db, dir: String, stop: Arc) -> Result<()> { let mut conn = db.acquire().await?; create_temp_table_json(&mut conn).await?; - loop { - let events = select_not_decoded_events(100_000, &mut conn).await?; - if events.is_empty() { - tracing::info!("DONE"); - break; + let mut files = tokio::fs::read_dir(dir).await?; + while let Some(entry) = files.next_entry().await? { + if entry.path().extension() != Some(OsStr::new("json")) { + continue; } - let mut event_ids = Vec::with_capacity(events.len()); - let mut event_data = Vec::with_capacity(events.len()); - - for evt in events { - match evt.decode_from_binary() { - Ok((id, Some(data))) => { - event_ids.push(id); - event_data.push(data); - } - Ok(_) => {} - Err(err) => { - tracing::error!(%err, "failed to decode event"); - } - } + let path = entry.path(); + let room_id = match path.file_stem().and_then(|s| s.to_str()) { + Some(room_id) => room_id, + None => continue, + }; + if let Err(_) = Uuid::from_str(room_id) { + continue; } - if event_ids.is_empty() { - tracing::info!("failed to decode all events"); - break; + let f = tokio::fs::File::open(path).await?; + let f = tokio::io::BufReader::new(f); + let mut lines = f.lines(); + + let mut event_ids = Vec::new(); + let mut event_data = Vec::new(); + + while let Some(line) = lines.next_line().await? { + let event: Event = serde_json::from_str(&line)?; + event_ids.push(event.id); + event_data.push(event.data); } -// TODO: load from files + insert_data_into_temp_table_json(event_ids, event_data, &mut conn).await?; update_event_data_json(&mut conn).await?; cleanup_temp_table(&mut conn).await?;