
Load everything to and from files
0nkery committed Nov 7, 2022
1 parent 22191f4 commit ce2281b
Showing 4 changed files with 42 additions and 242 deletions.
168 changes: 0 additions & 168 deletions sqlx-data.json
@@ -2675,173 +2675,5 @@
}
},
"query": "\n SELECT COUNT(DISTINCT label) as total FROM (\n SELECT\n *,\n bool_or(removed) OVER (\n PARTITION BY room_id, set, label\n ORDER BY occurred_at DESC\n ) AS removed_windowed\n FROM event\n WHERE deleted_at IS NULL\n AND room_id = $1\n AND set = $2\n AND original_occurred_at < $3\n AND occurred_at < COALESCE($4, 9223372036854775807)\n ORDER BY original_occurred_at DESC, label ASC, occurred_at DESC\n ) subq\n WHERE removed_windowed = 'f'\n "
},
"ffe64f4e3f8afb3e66ea3a6cb116091345e328c2dabe01b07a9b7888e3f37667": {
"describe": {
"columns": [
{
"name": "id",
"ordinal": 0,
"type_info": "Uuid"
},
{
"name": "room_id",
"ordinal": 1,
"type_info": "Uuid"
},
{
"name": "kind",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "set",
"ordinal": 3,
"type_info": "Text"
},
{
"name": "label",
"ordinal": 4,
"type_info": "Text"
},
{
"name": "attribute",
"ordinal": 5,
"type_info": "Text"
},
{
"name": "data",
"ordinal": 6,
"type_info": "Jsonb"
},
{
"name": "binary_data: PostcardBin<CompactEvent>",
"ordinal": 7,
"type_info": "Bytea"
},
{
"name": "occurred_at",
"ordinal": 8,
"type_info": "Int8"
},
{
"name": "created_by!: AgentId",
"ordinal": 9,
"type_info": {
"Custom": {
"kind": {
"Composite": [
[
"account_id",
{
"Custom": {
"kind": {
"Composite": [
[
"label",
"Text"
],
[
"audience",
"Text"
]
]
},
"name": "account_id"
}
}
],
[
"label",
"Text"
]
]
},
"name": "agent_id"
}
}
},
{
"name": "created_at",
"ordinal": 10,
"type_info": "Timestamptz"
},
{
"name": "deleted_at",
"ordinal": 11,
"type_info": "Timestamptz"
},
{
"name": "original_occurred_at",
"ordinal": 12,
"type_info": "Int8"
},
{
"name": "original_created_by: AgentId",
"ordinal": 13,
"type_info": {
"Custom": {
"kind": {
"Composite": [
[
"account_id",
{
"Custom": {
"kind": {
"Composite": [
[
"label",
"Text"
],
[
"audience",
"Text"
]
]
},
"name": "account_id"
}
}
],
[
"label",
"Text"
]
]
},
"name": "agent_id"
}
}
},
{
"name": "removed",
"ordinal": 14,
"type_info": "Bool"
}
],
"nullable": [
false,
false,
false,
false,
true,
true,
true,
true,
false,
false,
false,
true,
false,
false,
false
],
"parameters": {
"Left": [
"Int8"
]
}
},
"query": "\n SELECT\n id,\n room_id,\n kind,\n set,\n label,\n attribute,\n data,\n binary_data AS \"binary_data: PostcardBin<CompactEvent>\",\n occurred_at,\n created_by AS \"created_by!: AgentId\",\n created_at,\n deleted_at,\n original_occurred_at,\n original_created_by as \"original_created_by: AgentId\",\n removed\n FROM event\n WHERE room_id = (\n SELECT r.id FROM room AS r\n INNER JOIN event AS e\n ON r.id = e.room_id\n WHERE e.data IS NULL\n AND e.kind = 'draw'\n GROUP BY r.id\n LIMIT 1\n )\n AND data IS NULL\n AND kind = 'draw'\n LIMIT $1\n "
}
}
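
The entries deleted above are sqlx's offline query cache for the dropped select_not_decoded_events query; sqlx-data.json is ordinarily regenerated with `cargo sqlx prepare` from sqlx-cli, so removing the query removes its cached describe/query entry as well.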
43 changes: 0 additions & 43 deletions src/db/event/mod.rs
@@ -878,49 +878,6 @@ pub(crate) async fn select_not_encoded_events(
.await
}

pub(crate) async fn select_not_decoded_events(
limit: i64,
conn: &mut PgConnection,
) -> sqlx::Result<Vec<RawObject>> {
sqlx::query_as!(
RawObject,
r#"
SELECT
id,
room_id,
kind,
set,
label,
attribute,
data,
binary_data AS "binary_data: PostcardBin<CompactEvent>",
occurred_at,
created_by AS "created_by!: AgentId",
created_at,
deleted_at,
original_occurred_at,
original_created_by as "original_created_by: AgentId",
removed
FROM event
WHERE room_id = (
SELECT r.id FROM room AS r
INNER JOIN event AS e
ON r.id = e.room_id
WHERE e.data IS NULL
AND e.kind = 'draw'
GROUP BY r.id
LIMIT 1
)
AND data IS NULL
AND kind = 'draw'
LIMIT $1
"#,
limit
)
.fetch_all(conn)
.await
}

mod binary_encoding;
mod schema;
mod set_state;
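
The removed select_not_decoded_events helper was the database-side source of events for the JSON migration; after this commit the decode path reads its events back from .json dump files on disk instead (see the round-trip sketch after the last file below).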
2 changes: 1 addition & 1 deletion src/main.rs
@@ -114,7 +114,7 @@ async fn main() -> Result<()> {
match var("EVENT_MIGRATE_TO_BINARY") {
Ok(_) => migration_to_binary_format::migrate_to_binary(db).await,
Err(_) => match var("EVENT_MIGRATE_TO_JSON") {
Ok(_) => migration_to_binary_format::migrate_to_json(db).await,
Ok(dir) => migration_to_binary_format::migrate_to_json(db, dir).await,
Err(_) => app::run(db, maybe_ro_db, redis_pool, authz_cache).await,
},
}
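
With this change EVENT_MIGRATE_TO_JSON is no longer a bare on/off flag: its value is forwarded to migrate_to_json as the directory holding the dump files. A hypothetical invocation, with the binary name and path purely illustrative: `EVENT_MIGRATE_TO_JSON=./event-dumps ./event-service`.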
71 changes: 41 additions & 30 deletions src/migration_to_binary_format.rs
@@ -1,10 +1,15 @@
use std::sync::{atomic::AtomicBool, Arc};
use std::{
ffi::OsStr,
str::FromStr,
sync::{atomic::AtomicBool, Arc},
};

use anyhow::Result;
use sqlx::postgres::{PgConnection, PgPool as Db};
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use uuid::Uuid;

use crate::db::event::{select_not_decoded_events, select_not_encoded_events};
use crate::db::event::select_not_encoded_events;

async fn disable_autovacuum(conn: &mut PgConnection) -> sqlx::Result<()> {
sqlx::query!(
@@ -125,6 +130,12 @@ pub(crate) async fn migrate_to_binary(db: Db) -> Result<()> {
Ok(())
}

#[derive(serde::Serialize, serde::Deserialize)]
struct Event {
data: serde_json::Value,
id: Uuid,
}

async fn do_migrate_to_binary(db: Db, stop: Arc<AtomicBool>) -> Result<()> {
let mut conn = db.acquire().await?;
create_temp_table_binary(&mut conn).await?;
Expand Down Expand Up @@ -154,7 +165,7 @@ async fn do_migrate_to_binary(db: Db, stop: Arc<AtomicBool>) -> Result<()> {
match evt.encode_to_binary() {
Ok((id, Some(data), Some(binary_data))) => match binary_data.to_bytes() {
Ok(binary_data) => {
let evt_data = serde_json::to_vec(&data)?;
let evt_data = serde_json::to_vec(&Event { data, id })?;
file.write_all(&evt_data).await?;
file.write_all(b"\n").await?;

Expand Down Expand Up @@ -242,14 +253,14 @@ async fn update_event_data_json(conn: &mut PgConnection) -> sqlx::Result<()> {
Ok(())
}

pub(crate) async fn migrate_to_json(db: Db) -> Result<()> {
pub(crate) async fn migrate_to_json(db: Db, dir: String) -> Result<()> {
{
let mut conn = db.acquire().await?;
disable_autovacuum(&mut conn).await?;
}

let stop = Arc::new(AtomicBool::new(false));
let handle = tokio::spawn(do_migrate_to_json(db.clone(), stop.clone()));
let handle = tokio::spawn(do_migrate_to_json(db.clone(), dir, stop.clone()));
tokio::spawn(async move {
if let Err(err) = tokio::signal::ctrl_c().await {
tracing::error!(%err, "error on signal");
@@ -269,38 +280,38 @@ pub(crate) async fn migrate_to_json(db: Db) -> Result<()> {
Ok(())
}

async fn do_migrate_to_json(db: Db, stop: Arc<AtomicBool>) -> Result<()> {
async fn do_migrate_to_json(db: Db, dir: String, stop: Arc<AtomicBool>) -> Result<()> {
let mut conn = db.acquire().await?;
create_temp_table_json(&mut conn).await?;

loop {
let events = select_not_decoded_events(100_000, &mut conn).await?;
if events.is_empty() {
tracing::info!("DONE");
break;
let mut files = tokio::fs::read_dir(dir).await?;
while let Some(entry) = files.next_entry().await? {
if entry.path().extension() != Some(OsStr::new("json")) {
continue;
}

let mut event_ids = Vec::with_capacity(events.len());
let mut event_data = Vec::with_capacity(events.len());

for evt in events {
match evt.decode_from_binary() {
Ok((id, Some(data))) => {
event_ids.push(id);
event_data.push(data);
}
Ok(_) => {}
Err(err) => {
tracing::error!(%err, "failed to decode event");
}
}
let path = entry.path();
let room_id = match path.file_stem().and_then(|s| s.to_str()) {
Some(room_id) => room_id,
None => continue,
};
if Uuid::from_str(room_id).is_err() {
continue;
}

if event_ids.is_empty() {
tracing::info!("failed to decode all events");
break;
let f = tokio::fs::File::open(path).await?;
let f = tokio::io::BufReader::new(f);
let mut lines = f.lines();

let mut event_ids = Vec::new();
let mut event_data = Vec::new();

while let Some(line) = lines.next_line().await? {
let event: Event = serde_json::from_str(&line)?;
event_ids.push(event.id);
event_data.push(event.data);
}
// TODO: load from files

insert_data_into_temp_table_json(event_ids, event_data, &mut conn).await?;
update_event_data_json(&mut conn).await?;
cleanup_temp_table(&mut conn).await?;
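
Taken together, the two halves of the migration now share a newline-delimited JSON interchange format: do_migrate_to_binary writes one Event { data, id } object per line, and do_migrate_to_json walks a directory of <room_id>.json files and replays each line into the temp table. Below is a minimal, self-contained sketch of that round-trip, not the service's actual code; it assumes tokio (with the fs, io-util, and macros features), serde/serde_json, anyhow, and uuid with its serde feature, and the file name and payload are illustrative.

use std::str::FromStr;

use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use uuid::Uuid;

// Record format taken from the diff: one JSON object per line (NDJSON),
// carrying the original JSONB payload and the event id.
#[derive(serde::Serialize, serde::Deserialize)]
struct Event {
    data: serde_json::Value,
    id: Uuid,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Hypothetical file name; the read side expects `<room_id>.json`.
    let path = "6f2c1e9a-0b7d-4c1e-9a0b-7d4c1e9a0b7d.json";

    // Write side, as in do_migrate_to_binary: serialize each event
    // and terminate it with a newline.
    let event = Event {
        data: serde_json::json!({ "kind": "draw" }),
        id: Uuid::from_str("1c0d5e3f-2a4b-4d6c-8e0f-1a2b3c4d5e6f")?,
    };
    let mut file = tokio::fs::File::create(path).await?;
    file.write_all(&serde_json::to_vec(&event)?).await?;
    file.write_all(b"\n").await?;
    file.flush().await?;

    // Read side, as in do_migrate_to_json: buffer the file and
    // deserialize it line by line.
    let file = tokio::fs::File::open(path).await?;
    let mut lines = tokio::io::BufReader::new(file).lines();
    while let Some(line) = lines.next_line().await? {
        let event: Event = serde_json::from_str(&line)?;
        println!("{} -> {}", event.id, event.data);
    }

    Ok(())
}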
