Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Factor out run_daemon and move to library #829

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/bin/oura/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@ use clap::Parser;
use std::process;

mod console;
mod daemon;
mod run_daemon;

#[derive(Parser)]
#[clap(name = "Oura")]
#[clap(bin_name = "oura")]
#[clap(author, version, about, long_about = None)]
enum Oura {
Daemon(daemon::Args),
Daemon(run_daemon::Args),
}

fn main() {
let args = Oura::parse();

let result = match args {
Oura::Daemon(x) => daemon::run(&x),
Oura::Daemon(x) => run_daemon::run(&x),
};

if let Err(err) = &result {
Expand Down
85 changes: 85 additions & 0 deletions src/bin/oura/run_daemon.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use gasket::daemon::Daemon;
use oura::framework::*;
use std::net::SocketAddr;
use std::sync::Arc;
use tracing::info;
use oura::daemon::{run_daemon, ConfigRoot, MetricsConfig};

use crate::console;

fn setup_tracing() {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::DEBUG)
.finish(),
)
.unwrap();
}

async fn serve_prometheus(
daemon: Arc<Daemon>,
metrics: Option<MetricsConfig>,
) -> Result<(), Error> {
if let Some(metrics) = metrics {
info!("starting metrics exporter");
let runtime = daemon.clone();

let addr: SocketAddr = metrics
.address
.as_deref()
.unwrap_or("0.0.0.0:9186")
.parse()
.map_err(Error::parse)?;

gasket_prometheus::serve(addr, runtime).await;
}

Ok(())
}

pub fn run(args: &Args) -> Result<(), Error> {
if !args.tui {
setup_tracing();
}

let config = ConfigRoot::new(&args.config).map_err(Error::config)?;
let metrics = config.metrics.clone();

let daemon = run_daemon(config)?;

info!("oura is running");

let daemon = Arc::new(daemon);

let tokio_rt = tokio::runtime::Builder::new_multi_thread()
.enable_io()
.enable_time()
.build()
.unwrap();

let prometheus = tokio_rt.spawn(serve_prometheus(daemon.clone(), metrics));
let tui = tokio_rt.spawn(console::render(daemon.clone(), args.tui));

daemon.block();

info!("oura is stopping");

daemon.teardown();
prometheus.abort();
tui.abort();

Ok(())
}


Anviking marked this conversation as resolved.
Show resolved Hide resolved
#[derive(clap::Args)]
#[clap(author, version, about, long_about = None)]
pub struct Args {
/// config file to load by the daemon
#[clap(long, value_parser)]
config: Option<std::path::PathBuf>,

/// display the terminal UI
#[clap(long, action)]
tui: bool,
}
109 changes: 15 additions & 94 deletions src/bin/oura/daemon.rs → src/daemon/mod.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,25 @@

use gasket::daemon::Daemon;
use oura::{cursor, filters, framework::*, sinks, sources};
use crate::{cursor, filters, framework::*, sinks, sources};
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::{sync::Arc, time::Duration};
use tracing::info;

use crate::console;
use std::time::Duration;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsConfig {
pub address: Option<String>,
}

#[derive(Deserialize)]
struct ConfigRoot {
source: sources::Config,
filters: Option<Vec<filters::Config>>,
sink: sinks::Config,
intersect: IntersectConfig,
finalize: Option<FinalizeConfig>,
chain: Option<ChainConfig>,
retries: Option<gasket::retries::Policy>,
cursor: Option<cursor::Config>,
metrics: Option<MetricsConfig>,
pub struct ConfigRoot {
pub source: sources::Config,
pub filters: Option<Vec<filters::Config>>,
pub sink: sinks::Config,
pub intersect: IntersectConfig,
pub finalize: Option<FinalizeConfig>,
pub chain: Option<ChainConfig>,
pub retries: Option<gasket::retries::Policy>,
pub cursor: Option<cursor::Config>,
pub metrics: Option<MetricsConfig>,
}

impl ConfigRoot {
Expand Down Expand Up @@ -94,107 +91,31 @@ fn connect_stages(
Ok(runtime)
}

fn setup_tracing() {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::DEBUG)
.finish(),
)
.unwrap();
}

async fn serve_prometheus(
daemon: Arc<Daemon>,
metrics: Option<MetricsConfig>,
) -> Result<(), Error> {
if let Some(metrics) = metrics {
info!("starting metrics exporter");
let runtime = daemon.clone();

let addr: SocketAddr = metrics
.address
.as_deref()
.unwrap_or("0.0.0.0:9186")
.parse()
.map_err(Error::parse)?;

gasket_prometheus::serve(addr, runtime).await;
}

Ok(())
}

pub fn run(args: &Args) -> Result<(), Error> {
if !args.tui {
setup_tracing();
}

let config = ConfigRoot::new(&args.config).map_err(Error::config)?;

pub fn run_daemon(config: ConfigRoot) -> Result<Daemon, Error> {
let chain = config.chain.unwrap_or_default();
let intersect = config.intersect;
let finalize = config.finalize;
let current_dir = std::env::current_dir().unwrap();
let cursor = config.cursor.unwrap_or_default();
let breadcrumbs = cursor.initial_load()?;

let ctx = Context {
chain,
intersect,
finalize,
current_dir,
breadcrumbs,
};

let source = config.source.bootstrapper(&ctx)?;

let filters = config
.filters
.into_iter()
.flatten()
.map(|x| x.bootstrapper(&ctx))
.collect::<Result<_, _>>()?;

let sink = config.sink.bootstrapper(&ctx)?;

let cursor = cursor.bootstrapper(&ctx)?;

let retries = define_gasket_policy(config.retries.as_ref());

let daemon = connect_stages(source, filters, sink, cursor, retries)?;

info!("oura is running");

let daemon = Arc::new(daemon);

let tokio_rt = tokio::runtime::Builder::new_multi_thread()
.enable_io()
.enable_time()
.build()
.unwrap();

let prometheus = tokio_rt.spawn(serve_prometheus(daemon.clone(), config.metrics));
let tui = tokio_rt.spawn(console::render(daemon.clone(), args.tui));

daemon.block();

info!("oura is stopping");

daemon.teardown();
prometheus.abort();
tui.abort();

Ok(())
Ok(daemon)
}

#[derive(clap::Args)]
#[clap(author, version, about, long_about = None)]
pub struct Args {
/// config file to load by the daemon
#[clap(long, value_parser)]
config: Option<std::path::PathBuf>,

/// display the terminal UI
#[clap(long, action)]
tui: bool,
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ pub mod filters;
pub mod framework;
pub mod sinks;
pub mod sources;
pub mod daemon;