diff --git a/ircjournal/src/db.rs b/ircjournal/src/db.rs index faf0cda..89a5b4e 100644 --- a/ircjournal/src/db.rs +++ b/ircjournal/src/db.rs @@ -2,6 +2,7 @@ use crate::{ model::{Datetime, NewMessage, ServerChannel}, Database, }; +use sqlx::{Postgres, QueryBuilder}; use std::time::Duration; pub async fn create_db(uri: &str) -> Result { @@ -29,54 +30,40 @@ pub async fn last_message_ts(db: &Database, sc: &ServerChannel) -> Option { { - if $messages.is_empty() { - return Some(0); - } - // TODO: https://github.com/launchbadge/sqlx/issues/294, https://github.com/launchbadge/sqlx/issues/1240. - let mut v_channel: Vec<&str> = Vec::with_capacity($messages.len()); - let mut v_nick: Vec> = Vec::with_capacity($messages.len()); - let mut v_line: Vec> = Vec::with_capacity($messages.len()); - let mut v_opcode: Vec> = Vec::with_capacity($messages.len()); - let mut v_oper_nick: Vec> = Vec::with_capacity($messages.len()); - let mut v_payload: Vec> = Vec::with_capacity($messages.len()); - let mut v_timestamp: Vec = Vec::with_capacity($messages.len()); - $messages.iter().for_each(|m| { - v_channel.push(m.channel.as_ref().unwrap()); - v_nick.push(m.nick.clone()); - v_line.push(m.line.clone()); - v_opcode.push(m.opcode.clone()); - v_oper_nick.push(m.oper_nick.clone()); - v_payload.push(m.payload.clone()); - v_timestamp.push(m.timestamp); - }); - // language=sql - sqlx::query($body) - .bind(v_channel) - .bind(v_nick) - .bind(v_line) - .bind(v_opcode) - .bind(v_oper_nick) - .bind(v_payload) - .bind(v_timestamp) - .execute($db) - .await - .ok() - .map(|info| info.rows_affected()) - } } +fn push_message_values<'a>(builder: &mut QueryBuilder<'a, Postgres>, messages: &'a [NewMessage]) { + builder.push_values(messages, |mut b, message| { + b /**/ + .push_bind(message.channel.as_ref().expect("no channel")) + .push_bind(message.nick.clone()) + .push_bind(message.line.clone()) + .push_bind(message.opcode.clone()) + .push_bind(message.oper_nick.clone()) + .push_bind(message.payload.clone()) + .push_bind(message.timestamp); + }); +} + +async fn execute_batch_insert_messages( + mut builder: QueryBuilder<'_, Postgres>, + db: &Database, +) -> Option { + builder + .build() + .execute(db) + .await + .ok() + .map(|info| info.rows_affected()) } pub async fn batch_insert_messages(db: &Database, messages: &[NewMessage]) -> Option { // language=sql - batch_messages!( - db, - messages, + let mut builder = QueryBuilder::new( r#" INSERT INTO message ("channel", "nick", "line", "opcode", "oper_nick", "payload", "timestamp") - SELECT * FROM UNNEST($1, $2, $3, $4, $5, $6, $7) - "# - ) + "#, + ); + push_message_values(&mut builder, messages); + execute_batch_insert_messages(builder, db).await } pub async fn batch_insert_messages_and_notify( @@ -84,16 +71,20 @@ pub async fn batch_insert_messages_and_notify( messages: &[NewMessage], ) -> Option { // language=sql - batch_messages!( - db, - messages, + let mut builder = QueryBuilder::new( r#" WITH new_rows AS ( INSERT INTO message ("channel", "nick", "line", "opcode", "oper_nick", "payload", "timestamp") - SELECT * FROM UNNEST($1, $2, $3, $4, $5, $6, $7) + "#, + ); + push_message_values(&mut builder, messages); + // language=sql + builder.push( + r#" RETURNING * ) SELECT pg_notify('new_message', row_to_json(row)::text) FROM new_rows row - "# - ) + "#, + ); + execute_batch_insert_messages(builder, db).await }