Skip to content

Commit

Permalink
refactor: remove RawEventLogger (#1713)
Browse files Browse the repository at this point in the history
* refactor: remove RawEventLogger

* update

* update

* update

* update

* update

* update
  • Loading branch information
wsxiaoys authored Mar 26, 2024
1 parent c39395c commit f0e55f6
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 85 deletions.
50 changes: 19 additions & 31 deletions crates/tabby-common/src/api/event.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::sync::Arc;

use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

Expand All @@ -19,19 +17,19 @@ pub struct LogEventRequest {
pub elapsed: Option<u32>,
}

#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
pub struct Choice {
pub index: u32,
pub text: String,
}

#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "snake_case")]
pub enum SelectKind {
Line,
}

#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "snake_case")]
pub enum Event {
View {
Expand Down Expand Up @@ -79,13 +77,13 @@ pub enum Event {
},
}

#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
pub struct Message {
pub role: String,
pub content: String,
}

#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
pub struct Segments {
pub prefix: String,
#[serde(skip_serializing_if = "Option::is_none")]
Expand All @@ -94,35 +92,18 @@ pub struct Segments {
pub clipboard: Option<String>,
}

pub trait EventLogger: Send + Sync {
fn log(&self, e: Event);
}

#[derive(Serialize, Deserialize)]
pub struct Log {
#[derive(Serialize, Deserialize, Debug)]
pub struct LogEntry {
pub ts: u128,
pub event: Event,
}

pub trait RawEventLogger: Send + Sync {
fn log(&self, content: String);
}

impl<T: RawEventLogger> EventLogger for T {
fn log(&self, e: Event) {
let content = serdeconv::to_json_string(&Log {
impl From<Event> for LogEntry {
fn from(event: Event) -> Self {
Self {
ts: timestamp(),
event: e,
})
.unwrap();

self.log(content);
}
}

impl RawEventLogger for Arc<dyn RawEventLogger> {
fn log(&self, content: String) {
(**self).log(content)
event,
}
}
}

Expand All @@ -134,3 +115,10 @@ fn timestamp() -> u128 {
.expect("Time went backwards")
.as_millis()
}

pub trait EventLogger: Send + Sync {
fn log(&self, x: Event) {
self.write(x.into())
}
fn write(&self, x: LogEntry);
}
12 changes: 5 additions & 7 deletions crates/tabby/src/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ use clap::Args;
use hyper::StatusCode;
use tabby_common::{
api,
api::{
code::CodeSearch,
event::{EventLogger, RawEventLogger},
},
api::{code::CodeSearch, event::EventLogger},
config::Config,
usage,
};
Expand Down Expand Up @@ -125,7 +122,8 @@ pub async fn main(config: &Config, args: &ServeArgs) {

#[cfg(feature = "ee")]
let ws = tabby_webserver::public::WebserverHandle::new().await;
let logger: Arc<dyn RawEventLogger>;

let logger: Arc<dyn EventLogger>;
#[cfg(feature = "ee")]
{
logger = ws.logger();
Expand All @@ -134,9 +132,9 @@ pub async fn main(config: &Config, args: &ServeArgs) {
{
logger = Arc::new(crate::services::event::create_logger());
}
let code = Arc::new(create_code_search());

let api = api_router(args, config, Arc::new(logger.clone()), code.clone()).await;
let code = Arc::new(create_code_search());
let api = api_router(args, config, logger.clone(), code.clone()).await;
let ui = Router::new()
.merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi()));

Expand Down
21 changes: 16 additions & 5 deletions crates/tabby/src/services/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use std::{path::PathBuf, time::Duration};

use chrono::Utc;
use lazy_static::lazy_static;
use tabby_common::{api::event::RawEventLogger, path};
use tabby_common::{
api::event::{EventLogger, LogEntry},
path,
};
use tokio::{
io::AsyncWriteExt,
sync::mpsc::{unbounded_channel, UnboundedSender},
Expand Down Expand Up @@ -98,16 +101,24 @@ impl EventWriter {

struct EventService;

impl RawEventLogger for EventService {
fn log(&self, content: String) {
if let Err(err) = WRITER.send(content) {
impl EventLogger for EventService {
fn write(&self, x: LogEntry) {
let json = match serdeconv::to_json_string(&x) {
Ok(json) => json,
Err(err) => {
error!("Failed to serialize event into json {}", err);
return;
}
};

if let Err(err) = WRITER.send(json) {
error!("Failed to write event to file: {}", err);
}
}
}

#[allow(unused)]
pub fn create_logger() -> impl RawEventLogger + 'static {
pub fn create_logger() -> impl EventLogger + 'static {
EventService
}

Expand Down
23 changes: 18 additions & 5 deletions ee/tabby-db/src/user_completions.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use anyhow::Result;
use std::time::Duration;

use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use sqlx::{prelude::FromRow, query};

Expand All @@ -21,15 +23,21 @@ pub struct UserCompletionDAO {
impl DbConn {
pub async fn create_user_completion(
&self,
ts: u128,
user_id: i32,
completion_id: String,
language: String,
) -> Result<i32> {
let duration = Duration::from_millis(ts as u64);
let created_at =
DateTime::from_timestamp(duration.as_secs() as i64, duration.subsec_nanos())
.context("Invalid created_at timestamp")?;
let res = query!(
"INSERT INTO user_completions (user_id, completion_id, language) VALUES (?, ?, ?);",
"INSERT INTO user_completions (user_id, completion_id, language, created_at) VALUES (?, ?, ?, ?);",
user_id,
completion_id,
language
language,
created_at
)
.execute(&self.pool)
.await?;
Expand All @@ -38,13 +46,18 @@ impl DbConn {

pub async fn add_to_user_completion(
&self,
ts: u128,
completion_id: &str,
views: i64,
selects: i64,
dismisses: i64,
) -> Result<()> {
query!("UPDATE user_completions SET views = views + ?, selects = selects + ?, dismisses = dismisses + ? WHERE completion_id = ?",
views, selects, dismisses, completion_id).execute(&self.pool).await?;
let duration = Duration::from_millis(ts as u64);
let updated_at =
DateTime::from_timestamp(duration.as_secs() as i64, duration.subsec_nanos())
.context("Invalid updated_at timestamp")?;
query!("UPDATE user_completions SET views = views + ?, selects = selects + ?, dismisses = dismisses + ?, updated_at = ? WHERE completion_id = ?",
views, selects, dismisses, completion_id, updated_at).execute(&self.pool).await?;
Ok(())
}

Expand Down
6 changes: 3 additions & 3 deletions ee/tabby-webserver/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use axum::{
};
use hyper::{Body, StatusCode};
use juniper_axum::{graphiql, graphql, playground};
use tabby_common::api::{code::CodeSearch, event::RawEventLogger, server_setting::ServerSetting};
use tabby_common::api::{code::CodeSearch, event::EventLogger, server_setting::ServerSetting};
use tabby_db::DbConn;
use tracing::warn;

Expand All @@ -22,7 +22,7 @@ use crate::{

pub struct WebserverHandle {
db: DbConn,
event_logger: Arc<dyn RawEventLogger>,
event_logger: Arc<dyn EventLogger>,
}

impl WebserverHandle {
Expand All @@ -32,7 +32,7 @@ impl WebserverHandle {
WebserverHandle { db, event_logger }
}

pub fn logger(&self) -> Arc<dyn RawEventLogger + 'static> {
pub fn logger(&self) -> Arc<dyn EventLogger + 'static> {
self.event_logger.clone()
}

Expand Down
11 changes: 5 additions & 6 deletions ee/tabby-webserver/src/hub/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize};
use tabby_common::{
api::{
code::{CodeSearch, CodeSearchError, SearchResponse},
event::RawEventLogger,
event::{EventLogger, LogEntry},
},
config::{RepositoryAccess, RepositoryConfig},
};
Expand All @@ -21,7 +21,7 @@ pub use crate::schema::worker::WorkerKind;

#[tarpc::service]
pub trait Hub {
async fn log_event(content: String);
async fn write_log(x: LogEntry);

async fn search(q: String, limit: usize, offset: usize) -> SearchResponse;

Expand Down Expand Up @@ -70,12 +70,11 @@ pub async fn create_worker_client(
WorkerClient(HubClient::new(Default::default(), WebSocketTransport::from(socket)).spawn())
}

impl RawEventLogger for WorkerClient {
fn log(&self, content: String) {
impl EventLogger for WorkerClient {
fn write(&self, x: LogEntry) {
let context = tarpc::context::current();
let client = self.0.clone();

tokio::spawn(async move { client.log_event(context, content).await });
tokio::spawn(async move { client.write_log(context, x).await });
}
}

Expand Down
9 changes: 6 additions & 3 deletions ee/tabby-webserver/src/hub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use axum::{
};
use hyper::{Body, StatusCode};
use juniper_axum::extract::AuthBearer;
use tabby_common::{api::code::SearchResponse, config::RepositoryConfig};
use tabby_common::{
api::{code::SearchResponse, event::LogEntry},
config::RepositoryConfig,
};
use tarpc::server::{BaseChannel, Channel};
use tracing::warn;
use websocket::WebSocketTransport;
Expand Down Expand Up @@ -100,8 +103,8 @@ impl Drop for HubImpl {

#[tarpc::server]
impl Hub for Arc<HubImpl> {
async fn log_event(self, _context: tarpc::context::Context, content: String) {
self.ctx.logger().log(content)
async fn write_log(self, _context: tarpc::context::Context, x: LogEntry) {
self.ctx.logger().write(x)
}

async fn search(
Expand Down
4 changes: 2 additions & 2 deletions ee/tabby-webserver/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use juniper_axum::{
relay::{self, Connection},
FromAuth,
};
use tabby_common::api::{code::CodeSearch, event::RawEventLogger};
use tabby_common::api::{code::CodeSearch, event::EventLogger};
use tracing::error;
use validator::{Validate, ValidationErrors};
use worker::{Worker, WorkerService};
Expand All @@ -44,7 +44,7 @@ pub trait ServiceLocator: Send + Sync {
fn auth(&self) -> Arc<dyn AuthenticationService>;
fn worker(&self) -> Arc<dyn WorkerService>;
fn code(&self) -> Arc<dyn CodeSearch>;
fn logger(&self) -> Arc<dyn RawEventLogger>;
fn logger(&self) -> Arc<dyn EventLogger>;
fn job(&self) -> Arc<dyn JobService>;
fn repository(&self) -> Arc<dyn RepositoryService>;
fn email(&self) -> Arc<dyn EmailService>;
Expand Down
Loading

0 comments on commit f0e55f6

Please sign in to comment.