diff --git a/sqlx-data.json b/sqlx-data.json index b28f99e6..538e0180 100644 --- a/sqlx-data.json +++ b/sqlx-data.json @@ -1635,6 +1635,175 @@ }, "query": "\n SELECT id, source_room_id, created_by AS \"created_by!: AgentId\", created_at\n FROM edition\n WHERE source_room_id = $1\n AND created_at > COALESCE($2, TO_TIMESTAMP(0))\n ORDER BY created_at DESC\n LIMIT $3\n " }, + "9592ce215031cc32ba20e885782aa6900dc99f052b522f7495861b1a28ff9b39": { + "describe": { + "columns": [ + { + "name": "id", + "ordinal": 0, + "type_info": "Uuid" + }, + { + "name": "room_id", + "ordinal": 1, + "type_info": "Uuid" + }, + { + "name": "kind", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "set", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "label", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "attribute", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "data", + "ordinal": 6, + "type_info": "Jsonb" + }, + { + "name": "binary_data: PostcardBin", + "ordinal": 7, + "type_info": "Bytea" + }, + { + "name": "occurred_at", + "ordinal": 8, + "type_info": "Int8" + }, + { + "name": "created_by!: AgentId", + "ordinal": 9, + "type_info": { + "Custom": { + "kind": { + "Composite": [ + [ + "account_id", + { + "Custom": { + "kind": { + "Composite": [ + [ + "label", + "Text" + ], + [ + "audience", + "Text" + ] + ] + }, + "name": "account_id" + } + } + ], + [ + "label", + "Text" + ] + ] + }, + "name": "agent_id" + } + } + }, + { + "name": "created_at", + "ordinal": 10, + "type_info": "Timestamptz" + }, + { + "name": "deleted_at", + "ordinal": 11, + "type_info": "Timestamptz" + }, + { + "name": "original_occurred_at", + "ordinal": 12, + "type_info": "Int8" + }, + { + "name": "original_created_by: AgentId", + "ordinal": 13, + "type_info": { + "Custom": { + "kind": { + "Composite": [ + [ + "account_id", + { + "Custom": { + "kind": { + "Composite": [ + [ + "label", + "Text" + ], + [ + "audience", + "Text" + ] + ] + }, + "name": "account_id" + } + } + ], + [ + "label", + "Text" + ] + ] + }, + "name": "agent_id" + } + } + }, + { + "name": "removed", + "ordinal": 14, + "type_info": "Bool" + } + ], + "nullable": [ + false, + false, + false, + false, + true, + true, + true, + true, + false, + false, + false, + true, + false, + false, + false + ], + "parameters": { + "Left": [ + "UuidArray", + "Int8" + ] + } + }, + "query": "\n SELECT\n id,\n room_id,\n kind,\n set,\n label,\n attribute,\n data,\n binary_data AS \"binary_data: PostcardBin\",\n occurred_at,\n created_by AS \"created_by!: AgentId\",\n created_at,\n deleted_at,\n original_occurred_at,\n original_created_by as \"original_created_by: AgentId\",\n removed\n FROM event\n WHERE room_id = (\n SELECT r.id FROM room AS r\n INNER JOIN event AS e\n ON r.id = e.room_id\n WHERE e.binary_data IS NULL\n AND e.kind = 'draw'\n AND e.room_id <> ALL($1)\n GROUP BY r.id\n LIMIT 1\n )\n AND binary_data IS NULL\n AND kind = 'draw'\n LIMIT $2\n " + }, "96ca15b6812ff9ec3fc998fe3651d09d83ed927466773ee1da1e84c29d45748c": { "describe": { "columns": [], @@ -2289,174 +2458,6 @@ }, "query": "\n INSERT INTO room_ban (account_id, room_id, reason)\n VALUES ($1, $2, $3) ON CONFLICT (account_id, room_id) DO UPDATE\n SET created_at=room_ban.created_at\n RETURNING\n id,\n account_id AS \"account_id!: AccountId\",\n room_id,\n reason,\n created_at\n " }, - "dd0eeb6ac208cec5afb7b00e06430cd23e523beb54605860605d9adf3cc98b2e": { - "describe": { - "columns": [ - { - "name": "id", - "ordinal": 0, - "type_info": "Uuid" - }, - { - "name": "room_id", - "ordinal": 1, - "type_info": "Uuid" - }, - { - "name": "kind", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "set", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "label", - "ordinal": 4, - "type_info": "Text" - }, - { - "name": "attribute", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "data", - "ordinal": 6, - "type_info": "Jsonb" - }, - { - "name": "binary_data: PostcardBin", - "ordinal": 7, - "type_info": "Bytea" - }, - { - "name": "occurred_at", - "ordinal": 8, - "type_info": "Int8" - }, - { - "name": "created_by!: AgentId", - "ordinal": 9, - "type_info": { - "Custom": { - "kind": { - "Composite": [ - [ - "account_id", - { - "Custom": { - "kind": { - "Composite": [ - [ - "label", - "Text" - ], - [ - "audience", - "Text" - ] - ] - }, - "name": "account_id" - } - } - ], - [ - "label", - "Text" - ] - ] - }, - "name": "agent_id" - } - } - }, - { - "name": "created_at", - "ordinal": 10, - "type_info": "Timestamptz" - }, - { - "name": "deleted_at", - "ordinal": 11, - "type_info": "Timestamptz" - }, - { - "name": "original_occurred_at", - "ordinal": 12, - "type_info": "Int8" - }, - { - "name": "original_created_by: AgentId", - "ordinal": 13, - "type_info": { - "Custom": { - "kind": { - "Composite": [ - [ - "account_id", - { - "Custom": { - "kind": { - "Composite": [ - [ - "label", - "Text" - ], - [ - "audience", - "Text" - ] - ] - }, - "name": "account_id" - } - } - ], - [ - "label", - "Text" - ] - ] - }, - "name": "agent_id" - } - } - }, - { - "name": "removed", - "ordinal": 14, - "type_info": "Bool" - } - ], - "nullable": [ - false, - false, - false, - false, - true, - true, - true, - true, - false, - false, - false, - true, - false, - false, - false - ], - "parameters": { - "Left": [ - "Int8" - ] - } - }, - "query": "\n SELECT\n id,\n room_id,\n kind,\n set,\n label,\n attribute,\n data,\n binary_data AS \"binary_data: PostcardBin\",\n occurred_at,\n created_by AS \"created_by!: AgentId\",\n created_at,\n deleted_at,\n original_occurred_at,\n original_created_by as \"original_created_by: AgentId\",\n removed\n FROM event\n WHERE room_id = (\n SELECT r.id FROM room AS r\n INNER JOIN event AS e\n ON r.id = e.room_id\n WHERE e.binary_data IS NULL\n AND e.kind = 'draw'\n GROUP BY r.id\n LIMIT 1\n )\n AND binary_data IS NULL\n AND kind = 'draw'\n LIMIT $1\n " - }, "e622a1d666432501f753f485ee5146c98ffdec408451404be5264a5f56f3ad41": { "describe": { "columns": [], diff --git a/src/db/event/mod.rs b/src/db/event/mod.rs index acdc5f18..a8fd9f3d 100644 --- a/src/db/event/mod.rs +++ b/src/db/event/mod.rs @@ -828,6 +828,7 @@ pub(crate) async fn insert_account_ban_event( pub(crate) async fn select_not_encoded_events( limit: i64, + skip_rooms: &[Uuid], conn: &mut PgConnection, ) -> sqlx::Result> { sqlx::query_as!( @@ -856,14 +857,16 @@ pub(crate) async fn select_not_encoded_events( ON r.id = e.room_id WHERE e.binary_data IS NULL AND e.kind = 'draw' + AND e.room_id <> ALL($1) GROUP BY r.id LIMIT 1 ) AND binary_data IS NULL - AND kind = 'draw' - LIMIT $1 + AND kind = 'draw' + LIMIT $2 "#, - limit + skip_rooms, + limit, ) .fetch_all(conn) .await diff --git a/src/migration_to_binary_format.rs b/src/migration_to_binary_format.rs index e8adf283..93d3980b 100644 --- a/src/migration_to_binary_format.rs +++ b/src/migration_to_binary_format.rs @@ -140,8 +140,10 @@ async fn do_migrate_to_binary(db: Db, stop: Arc) -> Result<()> { let mut conn = db.acquire().await?; create_temp_table_binary(&mut conn).await?; + let mut skip_rooms = Vec::new(); + loop { - let events = select_not_encoded_events(100_000, &mut conn).await?; + let events = select_not_encoded_events(100_000, &skip_rooms, &mut conn).await?; if events.is_empty() { tracing::info!("DONE"); @@ -149,7 +151,8 @@ async fn do_migrate_to_binary(db: Db, stop: Arc) -> Result<()> { } // all events have the same room id - let filename = format!("{}.json", events[0].room_id()); + let room_id = events[0].room_id(); + let filename = format!("{room_id}.json"); let mut file = tokio::fs::OpenOptions::new() .append(true) .create(true) @@ -185,16 +188,16 @@ async fn do_migrate_to_binary(db: Db, stop: Arc) -> Result<()> { } } - if event_ids.is_empty() { - tracing::info!("failed to encode all events"); - break; - } - - insert_data_into_temp_table_binary(event_ids, event_binary_data, &mut conn).await?; - update_event_data_binary(&mut conn).await?; - cleanup_temp_table(&mut conn).await?; + if !event_ids.is_empty() { + insert_data_into_temp_table_binary(event_ids, event_binary_data, &mut conn).await?; + update_event_data_binary(&mut conn).await?; + cleanup_temp_table(&mut conn).await?; - vacuum(&mut conn).await?; + vacuum(&mut conn).await?; + } else { + tracing::info!(%room_id, "failed to encode whole room"); + skip_rooms.push(room_id); + } if stop.load(std::sync::atomic::Ordering::SeqCst) { break; @@ -286,6 +289,10 @@ async fn do_migrate_to_json(db: Db, dir: String, stop: Arc) -> Resul let mut files = tokio::fs::read_dir(dir).await?; while let Some(entry) = files.next_entry().await? { + if stop.load(std::sync::atomic::Ordering::SeqCst) { + break; + } + if entry.path().extension() != Some(OsStr::new("json")) { continue; } @@ -317,10 +324,6 @@ async fn do_migrate_to_json(db: Db, dir: String, stop: Arc) -> Resul cleanup_temp_table(&mut conn).await?; vacuum(&mut conn).await?; - - if stop.load(std::sync::atomic::Ordering::SeqCst) { - break; - } } Ok(())