Skip to content

Commit

Permalink
Merge pull request #164 from samply/develop
Browse files Browse the repository at this point in the history
Postgres querying etc.
  • Loading branch information
enola-dkfz authored Oct 14, 2024
2 parents cce9c50 + f316288 commit 740d427
Show file tree
Hide file tree
Showing 17 changed files with 430 additions and 107 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
# Samply.Focus v0.7.0 2024-09-24

In this release, we are extending the supported data backends beyond CQL-enabled FHIR stores. We now support PostgreSQL as well. Usage instructions are included in the Readme.

## Major changes
* PostgreSQL support added



# Focus -- 2023-02-08

This is the initial release of Focus, a task distribution application designed for working with Samply.Beam. Currently, only Samply.Blaze is supported as an endpoint, but other endpoints can easily be integrated.
22 changes: 14 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,41 +1,47 @@
[package]
name = "focus"
version = "0.6.0"
version = "0.7.0"
edition = "2021"
license = "Apache-2.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
base64 = "0.22.1"
http = "0.2"
reqwest = { version = "0.11", default_features = false, features = ["json", "default-tls"] }
reqwest = { version = "0.12", default-features = false, features = ["json", "default-tls"] }
serde = { version = "1.0.152", features = ["serde_derive"] }
serde_json = "1.0"
thiserror = "1.0.38"
chrono = "0.4.31"
indexmap = "2.1.0"
tokio = { version = "1.25.0", default_features = false, features = ["signal", "rt-multi-thread", "macros"] }
tokio = { version = "1.25.0", default-features = false, features = ["signal", "rt-multi-thread", "macros"] }
beam-lib = { git = "https://github.com/samply/beam", branch = "develop", features = ["http-util"] }
laplace_rs = {git = "https://github.com/samply/laplace-rs.git", tag = "v0.3.0" }
uuid = "1.8.0"
rand = { default-features = false, version = "0.8.5" }
futures-util = { version = "0.3", default-features = false, features = ["std"] }
tryhard = "0.5"

# Logging
tracing = { version = "0.1.37", default_features = false }
tracing-subscriber = { version = "0.3.11", default_features = false, features = ["env-filter", "ansi"] }
tracing = { version = "0.1.37", default-features = false }
tracing-subscriber = { version = "0.3.11", default-features = false, features = ["env-filter", "ansi"] }

# Global variables
once_cell = "1.18"

# Command Line Interface
clap = { version = "4", default_features = false, features = ["std", "env", "derive", "help", "color"] }
clap = { version = "4", default-features = false, features = ["std", "env", "derive", "help", "color"] }

# Query via SQL
sqlx = { version = "0.8.2", features = [ "runtime-tokio", "postgres", "macros", "chrono", "rust_decimal", "uuid"], optional = true }
kurtbuilds_sqlx_serde = { version = "0.3.2", features = [ "json", "decimal", "chrono", "uuid"], optional = true }


[features]
default = []
bbmri = []
dktk = []
dktk = ["query-sql"]
query-sql = ["dep:sqlx", "dep:kurtbuilds_sqlx_serde"]

[dev-dependencies]
pretty_assertions = "1.4.0"
Expand Down
45 changes: 30 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,36 @@ BEAM_APP_ID_LONG = "app1.broker.example.com"
### Optional variables

```bash
RETRY_COUNT = "32" # The maximum number of retries for beam and blaze healthchecks, default value: 32
ENDPOINT_TYPE = "blaze" # Type of the endpoint, allowed values: "blaze", "omop", default value: "blaze"
RETRY_COUNT = "32" # The maximum number of retries for beam and blaze healthchecks; default value: 32
ENDPOINT_TYPE = "blaze" # Type of the endpoint, allowed values: "blaze", "omop", "sql", "blaze-and-sql"; default value: "blaze"
EXPORTER_URL = " https://exporter.site/" # The exporter URL
OBFUSCATE = "yes" # Should the results be obfuscated - the "master switch", allowed values: "yes", "no", default value: "yes"
OBFUSCATE_BELOW_10_MODE = "1" # The mode of obfuscating values below 10: 0 - return zero, 1 - return ten, 2 - obfuscate using Laplace distribution and rounding, has no effect if OBFUSCATE = "no", default value: 1
DELTA_PATIENT = "1." # Sensitivity parameter for obfuscating the counts in the Patient stratifier, has no effect if OBFUSCATE = "no", default value: 1
DELTA_SPECIMEN = "20." # Sensitivity parameter for obfuscating the counts in the Specimen stratifier, has no effect if OBFUSCATE = "no", default value: 20
DELTA_DIAGNOSIS = "3." # Sensitivity parameter for obfuscating the counts in the Diagnosis stratifier, has no effect if OBFUSCATE = "no", default value: 3
DELTA_PROCEDURES = "1.7" # Sensitivity parameter for obfuscating the counts in the Procedures stratifier, has no effect if OBFUSCATE = "no", default value: 1.7
DELTA_MEDICATION_STATEMENTS = "2.1" # Sensitivity parameter for obfuscating the counts in the Medication Statements stratifier, has no effect if OBFUSCATE = "no", default value: 2.1
DELTA_HISTO = "20." # Sensitivity parameter for obfuscating the counts in the Histo stratifier, has no effect if OBFUSCATE = "no", default value: 20
EPSILON = "0.1" # Privacy budget parameter for obfuscating the counts in the stratifiers, has no effect if OBFUSCATE = "no", default value: 0.1
ROUNDING_STEP = "10" # The granularity of the rounding of the obfuscated values, has no effect if OBFUSCATE = "no", default value: 10
PROJECTS_NO_OBFUSCATION = "exliquid;dktk_supervisors;exporter;ehds2" # Projects for which the results are not to be obfuscated, separated by ;, default value: "exliquid;dktk_supervisors;exporter;ehds2"
OBFUSCATE = "yes" # Should the results be obfuscated - the "master switch", allowed values: "yes", "no"; default value: "yes"
OBFUSCATE_BELOW_10_MODE = "1" # The mode of obfuscating values below 10: 0 - return zero, 1 - return ten, 2 - obfuscate using Laplace distribution and rounding, has no effect if OBFUSCATE = "no"; default value: 1
DELTA_PATIENT = "1." # Sensitivity parameter for obfuscating the counts in the Patient stratifier, has no effect if OBFUSCATE = "no"; default value: 1
DELTA_SPECIMEN = "20." # Sensitivity parameter for obfuscating the counts in the Specimen stratifier, has no effect if OBFUSCATE = "no"; default value: 20
DELTA_DIAGNOSIS = "3." # Sensitivity parameter for obfuscating the counts in the Diagnosis stratifier, has no effect if OBFUSCATE = "no"; default value: 3
DELTA_PROCEDURES = "1.7" # Sensitivity parameter for obfuscating the counts in the Procedures stratifier, has no effect if OBFUSCATE = "no"; default value: 1.7
DELTA_MEDICATION_STATEMENTS = "2.1" # Sensitivity parameter for obfuscating the counts in the Medication Statements stratifier, has no effect if OBFUSCATE = "no"; default value: 2.1
DELTA_HISTO = "20." # Sensitivity parameter for obfuscating the counts in the Histo stratifier, has no effect if OBFUSCATE = "no"; default value: 20
EPSILON = "0.1" # Privacy budget parameter for obfuscating the counts in the stratifiers, has no effect if OBFUSCATE = "no"; default value: 0.1
ROUNDING_STEP = "10" # The granularity of the rounding of the obfuscated values, has no effect if OBFUSCATE = "no"; default value: 10
PROJECTS_NO_OBFUSCATION = "exliquid;dktk_supervisors;exporter;ehds2" # Projects for which the results are not to be obfuscated, separated by ";" ; default value: "exliquid;dktk_supervisors;exporter;ehds2"
QUERIES_TO_CACHE = "queries_to_cache.conf" # The path to a file containing base64 encoded queries whose results are to be cached. If not set, no results are cached
PROVIDER = "name" #OMOP provider name
PROVIDER_ICON = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABAQMAAAAl21bKAAAAA1BMVEUAAACnej3aAAAAAXRSTlMAQObYZgAAAApJREFUCNdjYAAAAAIAAeIhvDMAAAAASUVORK5CYII=" # Base64 encoded OMOP provider icon
PROVIDER = "name" #EUCAIM provider name
PROVIDER_ICON = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABAQMAAAAl21bKAAAAA1BMVEUAAACnej3aAAAAAXRSTlMAQObYZgAAAApJREFUCNdjYAAAAAIAAeIhvDMAAAAASUVORK5CYII=" # Base64 encoded EUCAIM provider icon
AUTH_HEADER = "ApiKey XXXX" #Authorization header
```

In order to use Postgres querying, a Docker image built with the feature "dktk" needs to be used and this optional variable set:
```bash
POSTGRES_CONNECTION_STRING = "postgresql://postgres:Test.123@localhost:5432/postgres" # Postgres connection string
```

Additionally when using Postgres this optional variable can be set:
```bash
MAX_DB_ATTEMPTS = "8" # Max number of attempts to connect to the database; default value: 8
```

Obfuscating zero counts is by default switched off. To enable obfuscating zero counts, set the env. variable `OBFUSCATE_ZERO`.

Optionally, you can provide the `TLS_CA_CERTIFICATES_DIR` environment variable to add additional trusted certificates, e.g., if you have a TLS-terminating proxy server in place. The application respects the `HTTP_PROXY`, `HTTPS_PROXY`, `ALL_PROXY`, `NO_PROXY`, and their respective lowercase equivalents.
Expand All @@ -80,6 +90,11 @@ Creating a sample task containing an abstract syntax tree (AST) query using curl
curl -v -X POST -H "Content-Type: application/json" --data '{"id":"7fffefff-ffef-fcff-feef-feffffffffff","from":"app1.proxy1.broker","to":["app1.proxy1.broker"],"ttl":"10s","failure_strategy":{"retry":{"backoff_millisecs":1000,"max_tries":5}},"metadata":{"project":"bbmri"},"body":"eyJsYW5nIjoiYXN0IiwicGF5bG9hZCI6ImV5SmhjM1FpT25zaWIzQmxjbUZ1WkNJNklrOVNJaXdpWTJocGJHUnlaVzRpT2x0N0ltOXdaWEpoYm1RaU9pSkJUa1FpTENKamFHbHNaSEpsYmlJNlczc2liM0JsY21GdVpDSTZJazlTSWl3aVkyaHBiR1J5Wlc0aU9sdDdJbXRsZVNJNkltZGxibVJsY2lJc0luUjVjR1VpT2lKRlVWVkJURk1pTENKemVYTjBaVzBpT2lJaUxDSjJZV3gxWlNJNkltMWhiR1VpZlN4N0ltdGxlU0k2SW1kbGJtUmxjaUlzSW5SNWNHVWlPaUpGVVZWQlRGTWlMQ0p6ZVhOMFpXMGlPaUlpTENKMllXeDFaU0k2SW1abGJXRnNaU0o5WFgxZGZWMTlMQ0pwWkNJNkltRTJaakZqWTJZekxXVmlaakV0TkRJMFppMDVaRFk1TFRSbE5XUXhNelZtTWpNME1DSjkifQ=="}' -H "Authorization: ApiKey app1.proxy1.broker App1Secret" http://localhost:8081/v1/tasks
```

Creating a sample SQL task for a `SELECT_TEST` query using curl:
```bash
curl -v -X POST -H "Content-Type: application/json" --data '{"id":"7fffefff-ffef-fcff-feef-feffffffffff","from":"app1.proxy1.broker","to":["app1.proxy1.broker"],"ttl":"10s","failure_strategy":{"retry":{"backoff_millisecs":1000,"max_tries":5}},"metadata":{"project":"exliquid"},"body":"eyJwYXlsb2FkIjoiU0VMRUNUX1RFU1QifQ=="}' -H "Authorization: ApiKey app1.proxy1.broker App1Secret" http://localhost:8081/v1/tasks
```

Creating a sample [Exporter](https://github.com/samply/exporter) "execute" task containing an Exporter query using curl:

```bash
Expand Down
25 changes: 25 additions & 0 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,30 @@ fn build_cqlmap() {
).unwrap();
}

fn build_sqlmap() {
let path = Path::new(&env::var("OUT_DIR").unwrap()).join("sql_replace_map.rs");
let mut file = BufWriter::new(File::create(path).unwrap());

write!(&mut file, r#"
static SQL_REPLACE_MAP: once_cell::sync::Lazy<HashMap<&'static str, &'static str>> = once_cell::sync::Lazy::new(|| {{
let mut map = HashMap::new();
"#).unwrap();

for sqlfile in std::fs::read_dir(Path::new("resources/sql")).unwrap() {
let sqlfile = sqlfile.unwrap();
let sqlfilename = sqlfile.file_name().to_str().unwrap().to_owned();
let sqlcontent = std::fs::read_to_string(sqlfile.path()).unwrap();
write!(&mut file, r####"
map.insert(r###"{sqlfilename}"###, r###"{sqlcontent}"###);
"####).unwrap();
}

writeln!(&mut file, "
map
}});"
).unwrap();
}

fn main() {
build_data::set_GIT_COMMIT_SHORT();
build_data::set_GIT_DIRTY();
Expand All @@ -51,4 +75,5 @@ fn main() {
println!("cargo:rustc-env=SAMPLY_USER_AGENT=Samply.Focus.{}/{}", env!("CARGO_PKG_NAME"), version());

build_cqlmap();
build_sqlmap();
}
5 changes: 5 additions & 0 deletions resources/cql/DHKI_STRAT_ENCOUNTER_STRATIFIER
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
define Encounter:
if InInitialPopulation then [Encounter] else {} as List<Encounter>

define function Departments(encounter FHIR.Encounter):
encounter.identifier.where(system = 'http://dktk.dkfz.de/fhir/sid/hki-department').value.first()
5 changes: 5 additions & 0 deletions resources/cql/DHKI_STRAT_MEDICATION_STRATIFIER
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
define MedicationStatement:
if InInitialPopulation then [MedicationStatement] else {} as List <MedicationStatement>

define function AppliedMedications(medication FHIR.MedicationStatement):
medication.medication.coding.code.last()
8 changes: 8 additions & 0 deletions resources/cql/DHKI_STRAT_SPECIMEN_STRATIFIER
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
define Specimen:
if InInitialPopulation then [Specimen] else {} as List<Specimen>

define function SampleType(specimen FHIR.Specimen):
specimen.type.coding.where(system = 'https://fhir.bbmri.de/CodeSystem/SampleMaterialType').code.first()

define function SampleSubtype(specimen FHIR.Specimen):
specimen.type.text.first()
9 changes: 8 additions & 1 deletion resources/cql/DKTK_STRAT_AGE_STRATIFIER
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,12 @@ from [Condition] C
where C.extension.where(url='http://hl7.org/fhir/StructureDefinition/condition-related').empty() and C.onset is not null
sort by date from onset asc)

define FirstDiagnosis:
First(
from [Condition] C
sort by date from onset asc)

define AgeClass:
if (PrimaryDiagnosis.onset is null) then 'unknown' else ToString((AgeInYearsAt(FHIRHelpers.ToDateTime(PrimaryDiagnosis.onset)) div 10) * 10)
if (PrimaryDiagnosis.onset is null)
then ToString((AgeInYearsAt(FHIRHelpers.ToDateTime(FirstDiagnosis.onset)) div 10) * 10)
else ToString((AgeInYearsAt(FHIRHelpers.ToDateTime(PrimaryDiagnosis.onset)) div 10) * 10)
1 change: 1 addition & 0 deletions resources/sql/SELECT_TEST
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT 10 AS VALUE, quote_literal('Hello Rustaceans') AS GREETING, 4.7 as FLOATY, CURRENT_DATE AS TODAY;
2 changes: 1 addition & 1 deletion src/beam.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;

use beam_lib::{TaskResult, BeamClient, BlockingOptions, MsgId, TaskRequest, RawString};
use http::StatusCode;
use reqwest::StatusCode;
use once_cell::sync::Lazy;
use serde::Serialize;
use tracing::{debug, warn, info};
Expand Down
2 changes: 1 addition & 1 deletion src/blaze.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use http::StatusCode;
use reqwest::StatusCode;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
Expand Down
58 changes: 43 additions & 15 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use std::path::PathBuf;
use std::fmt;
use std::path::PathBuf;

use beam_lib::AppId;
use clap::Parser;
use http::{HeaderValue, Uri};
use reqwest::{header::HeaderValue, Url};
use once_cell::sync::Lazy;
use reqwest::{Certificate, Client, Proxy};
use tracing::{debug, info, warn};

use crate::errors::FocusError;


#[derive(clap::ValueEnum, Clone, PartialEq, Debug)]
pub enum Obfuscate {
No,
Expand All @@ -21,18 +20,25 @@ pub enum Obfuscate {
pub enum EndpointType {
Blaze,
Omop,
#[cfg(feature = "query-sql")]
BlazeAndSql,
#[cfg(feature = "query-sql")]
Sql,
}

impl fmt::Display for EndpointType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
EndpointType::Blaze => write!(f, "blaze"),
EndpointType::Blaze => write!(f, "blaze"),
EndpointType::Omop => write!(f, "omop"),
#[cfg(feature = "query-sql")]
EndpointType::BlazeAndSql => write!(f, "blaze_and_sql"),
#[cfg(feature = "query-sql")]
EndpointType::Sql => write!(f, "sql"),
}
}
}


pub(crate) static CONFIG: Lazy<Config> = Lazy::new(|| {
debug!("Loading config");
Config::load().unwrap_or_else(|e| {
Expand All @@ -53,7 +59,7 @@ const CLAP_FOOTER: &str = "For proxy support, environment variables HTTP_PROXY,
struct CliArgs {
/// The beam proxy's base URL, e.g. https://proxy1.beam.samply.de
#[clap(long, env, value_parser)]
beam_proxy_url: Uri,
beam_proxy_url: Url,

/// This application's beam AppId, e.g. focus.proxy1.broker.samply.de
#[clap(long, env, value_parser)]
Expand All @@ -69,15 +75,15 @@ struct CliArgs {

/// The endpoint base URL, e.g. https://blaze.site/fhir/
#[clap(long, env, value_parser)]
endpoint_url: Option<Uri>,
endpoint_url: Option<Url>,

/// The endpoint base URL, e.g. https://blaze.site/fhir/, for the sake of backward compatibility, use endpoint_url instead
#[clap(long, env, value_parser)]
blaze_url: Option<Uri>,
blaze_url: Option<Url>,

/// The exporter URL, e.g. https://exporter.site/
#[clap(long, env, value_parser)]
exporter_url: Option<Uri>,
exporter_url: Option<Url>,

/// Type of the endpoint, e.g. "blaze", "omop"
#[clap(long, env, value_parser = clap::value_parser!(EndpointType), default_value = "blaze")]
Expand Down Expand Up @@ -128,7 +134,12 @@ struct CliArgs {
rounding_step: usize,

/// Projects for which the results are not to be obfuscated, separated by ;
#[clap(long, env, value_parser, default_value = "exliquid;dktk_supervisors;exporter;ehds2")]
#[clap(
long,
env,
value_parser,
default_value = "exliquid;dktk_supervisors;exporter;ehds2"
)]
projects_no_obfuscation: String,

/// Path to a file containing BASE64 encoded queries whose results are to be cached
Expand All @@ -142,7 +153,7 @@ struct CliArgs {
/// OMOP provider name
#[clap(long, env, value_parser)]
provider: Option<String>,

/// Base64 encoded OMOP provider icon
#[clap(long, env, value_parser)]
provider_icon: Option<String>,
Expand All @@ -151,15 +162,24 @@ struct CliArgs {
#[clap(long, env, value_parser)]
auth_header: Option<String>,

/// Postgres connection string
#[cfg(feature = "query-sql")]
#[clap(long, env, value_parser)]
postgres_connection_string: Option<String>,

/// Max number of attempts to connect to the database
#[cfg(feature = "query-sql")]
#[clap(long, env, value_parser, default_value = "8")]
max_db_attempts: u32,
}

pub(crate) struct Config {
pub beam_proxy_url: Uri,
pub beam_proxy_url: Url,
pub beam_app_id_long: AppId,
pub api_key: String,
pub retry_count: usize,
pub endpoint_url: Uri,
pub exporter_url: Option<Uri>,
pub endpoint_url: Url,
pub exporter_url: Option<Url>,
pub endpoint_type: EndpointType,
pub obfuscate: Obfuscate,
pub obfuscate_zero: bool,
Expand All @@ -178,6 +198,10 @@ pub(crate) struct Config {
pub provider: Option<String>,
pub provider_icon: Option<String>,
pub auth_header: Option<String>,
#[cfg(feature = "query-sql")]
pub postgres_connection_string: Option<String>,
#[cfg(feature = "query-sql")]
pub max_db_attempts: u32,
}

impl Config {
Expand Down Expand Up @@ -219,6 +243,10 @@ impl Config {
provider: cli_args.provider,
provider_icon: cli_args.provider_icon,
auth_header: cli_args.auth_header,
#[cfg(feature = "query-sql")]
postgres_connection_string: cli_args.postgres_connection_string,
#[cfg(feature = "query-sql")]
max_db_attempts: cli_args.max_db_attempts,
client,
};
Ok(config)
Expand Down Expand Up @@ -274,7 +302,7 @@ pub fn prepare_reqwest_client(certs: &Vec<Certificate>) -> Result<reqwest::Clien
),
"all_proxy" => proxies.push(
Proxy::all(v)
.map_err( FocusError::InvalidProxyConfig)?
.map_err(FocusError::InvalidProxyConfig)?
.no_proxy(no_proxy.clone()),
),
_ => (),
Expand Down
Loading

0 comments on commit 740d427

Please sign in to comment.