diff --git a/.github/workflows/benchmark.yaml b/.github/workflows/benchmark.yaml new file mode 100644 index 0000000..ea87fbd --- /dev/null +++ b/.github/workflows/benchmark.yaml @@ -0,0 +1,33 @@ +name: benchmark + +on: + workflow_dispatch: {} + +permissions: read-all + +jobs: + benchmark: + runs-on: ubuntu-24.04 + env: + SYNTHEA_POPULATION_SIZE: "100" + steps: + - name: Checkout + uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7 + with: + fetch-depth: 0 + + - name: Install Task + uses: arduino/setup-task@b91d5d2c96a56797b48ac1e0e89220bf64044611 # v2.0.0 + with: + version: "3.38.0" + + - name: Run benchmark + run: | + task run + + - name: Upload results + uses: actions/upload-artifact@0b2256b8c012f0828dc542b3febcab082c67f72b # v4.3.4 + with: + name: benchmark-results + path: | + src/results/ diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..199339b --- /dev/null +++ b/.gitignore @@ -0,0 +1,186 @@ +# Created by https://www.toptal.com/developers/gitignore/api/python +# Edit at https://www.toptal.com/developers/gitignore?templates=python + +### Python ### +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ + +### Python Patch ### +# Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration +poetry.toml + +# ruff +.ruff_cache/ + +# LSP config files +pyrightconfig.json + +# End of https://www.toptal.com/developers/gitignore/api/python + +*.jar +output/ +blazectl + + +results/ +spark-tmp/ + +synthea/ diff --git a/.sqlfluff b/.sqlfluff new file mode 100644 index 0000000..fd1e7f0 --- /dev/null +++ b/.sqlfluff @@ -0,0 +1,8 @@ +[sqlfluff] +dialect = trino +max_line_length = 120 +# RF01 doesn't really work with structs. +exclude_rules = RF01 + +[sqlfluff:rules:capitalisation.identifiers] +extended_capitalisation_policy = lower diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..2b69939 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,16 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python Debugger: Current File", + "type": "debugpy", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal", + "cwd": "${workspaceFolder}/src" + } + ] +} diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..adf4577 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "sqlfluff.executablePath": ".venv/bin/sqlfluff" +} diff --git a/README.md b/README.md index dac208a..0471e84 100644 --- a/README.md +++ b/README.md @@ -1 +1,11 @@ -# analytics-on-fhir-benchmark \ No newline at end of file +# Analytics on FHIR Benchmark + +## Setup + +```sh +git clone https://github.com/bzkf/analytics-on-fhir-benchmark.git +cd analytics-on-fhir-benchmark/ +./setup.sh + +task run +``` diff --git a/Taskfile.yaml b/Taskfile.yaml new file mode 100644 index 0000000..f493874 --- /dev/null +++ b/Taskfile.yaml @@ -0,0 +1,81 @@ +version: "3" + +env: + SYNTHEA_POPULATION_SIZE: 1000 + +vars: + # only here as a comment. not yet used to loop through all at once. 
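  # Untested sketch (assumption: a Task version with `for` loop support, >= 3.28;
  # the CI workflow pins 3.38.0). A wrapper task could run the pipeline once per size,
  # for example:
  #
  #   benchmark-all-sizes:
  #     cmds:
  #       - for: [1000, 5000, 10000, 50000, 100000]
  #         cmd: SYNTHEA_POPULATION_SIZE={{.ITEM}} task run
  #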
+ SYNTHEA_POPULATION_SIZES: + - 1000 + - 5000 + - 10000 + - 50000 + - 100000 + +tasks: + install-dependencies: + cmds: + - curl -LO https://github.com/synthetichealth/synthea/releases/download/master-branch-latest/synthea-with-dependencies.jar + - curl -LO https://repo1.maven.org/maven2/io/trino/trino-cli/453/trino-cli-453-executable.jar + - curl -LO https://github.com/samply/blazectl/releases/download/v0.16.0/blazectl-0.16.0-linux-amd64.tar.gz + - tar xzf blazectl-0.16.0-linux-amd64.tar.gz + - rm blazectl-0.16.0-linux-amd64.tar.gz + - ./blazectl --version + - pip install -r requirements.txt + generates: + - synthea-with-dependencies.jar + - blazectl + - trino-cli-453-executable.jar + + # TODO: could use a task loop to generate synthea data for all sizes at once. + generate-fhir-data: + cmds: + - java -jar synthea-with-dependencies.jar -s 20240711 -cs 20240711 -r 20240711 -p "${SYNTHEA_POPULATION_SIZE}" -c config/synthea.properties --exporter.baseDirectory="./synthea/output-${SYNTHEA_POPULATION_SIZE}/bulk" --exporter.fhir.bulk_data="true" + - java -jar synthea-with-dependencies.jar -s 20240711 -cs 20240711 -r 20240711 -p "${SYNTHEA_POPULATION_SIZE}" -c config/synthea.properties --exporter.baseDirectory="./synthea/output-${SYNTHEA_POPULATION_SIZE}/transactions" --exporter.fhir.bulk_data="false" + + start-servers: + cmds: + - docker compose -f compose.blaze.yaml -f compose.pathling.yaml -f compose.trino.yaml up -d + # after the import is done, we no longer need the pathling server itself + - docker compose -f compose.pathling.yaml stop pathling + + upload-fhir-data: + cmds: + # blaze + # hospitalInformation & practitionerInformation are required for referential integrity. + - curl -X POST --header 'Content-Type:application/fhir+json' --data @"$(find synthea/output-${SYNTHEA_POPULATION_SIZE}/transactions/fhir/ -name 'hospitalInformation*.json')" --url 'http://localhost:8084/fhir' + - curl -X POST --header 'Content-Type:application/fhir+json' --data @"$(find synthea/output-${SYNTHEA_POPULATION_SIZE}/transactions/fhir/ -name 'practitionerInformation*.json')" --url 'http://localhost:8084/fhir' + - ./blazectl upload --concurrency 32 synthea/output-${SYNTHEA_POPULATION_SIZE}/transactions/fhir/ --server http://localhost:8084/fhir + # hapi fhir jpa + # - curl -X POST --header 'Content-Type:application/fhir+json' --data @"$(find output/transactions/fhir/ -name 'hospitalInformation*.json')" --url 'http://localhost:8084/fhir' + # - curl -X POST --header 'Content-Type:application/fhir+json' --data @"$(find output/transactions/fhir/ -name 'practitionerInformation*.json')" --url 'http://localhost:8084/fhir' + # - ./blazectl upload output/transactions/fhir/ --server http://localhost:8084/fhir + + remove-local-synthea-files: + cmds: + - rm -r synthea/ + + run-benchmarks: + dir: src/ + cmds: + - python main.py + + draw-plots: + dir: src/ + cmds: + - python plot.py + + run: + cmds: + - task: install-dependencies + - task: generate-fhir-data + - task: start-servers + - task: upload-fhir-data + - task: remove-local-synthea-files + - task: run-benchmarks + - task: draw-plots + - task: clean + + clean: + cmds: + - docker compose -f compose.blaze.yaml -f compose.pathling.yaml -f compose.trino.yaml down -v --remove-orphans diff --git a/compose.blaze.yaml b/compose.blaze.yaml new file mode 100644 index 0000000..c2a9ac5 --- /dev/null +++ b/compose.blaze.yaml @@ -0,0 +1,35 @@ +services: + blaze: + image: docker.io/samply/blaze:0.29.3@sha256:47527f478ef2ddf932f4e5c882c0266ac179d7a2fc37ed66868d079698541da3 + 
environment: + JAVA_TOOL_OPTIONS: "-Xmx64g" + BASE_URL: "http://localhost:8083" + volumes: + - "blaze-data:/app/data" + ports: + - "127.0.0.1:8083:8080" + + wait-for-blaze: + image: docker.io/curlimages/curl:8.8.0@sha256:73e4d532ea62d7505c5865b517d3704966ffe916609bedc22af6833dc9969bcd + ipc: none + security_opt: + - "no-new-privileges:true" + cap_drop: + - ALL + privileged: false + restart: "no" + environment: + BLAZE_SERVER_URL: http://blaze:8080 + entrypoint: ["/bin/sh", "-c"] + command: + - | + until [ "$(curl -s -o /dev/null -L -w "%{http_code}" "$$BLAZE_SERVER_URL/fhir/metadata")" == "200" ]; do + echo "$(date): Waiting for blaze server @ $$BLAZE_SERVER_URL to be up"; + sleep 5; + done; + depends_on: + blaze: + condition: service_started + +volumes: + blaze-data: {} diff --git a/compose.pathling.yaml b/compose.pathling.yaml new file mode 100644 index 0000000..1ae9fc1 --- /dev/null +++ b/compose.pathling.yaml @@ -0,0 +1,215 @@ +services: + minio: + image: docker.io/bitnami/minio:2024.7.10@sha256:9a1a49c6c460733f6b4070dbf2f189c5523666b7be22993fe20c7fb4b51b19d6 + ipc: none + security_opt: + - "no-new-privileges:true" + cap_drop: + - ALL + privileged: false + environment: + MINIO_UPDATE: "off" + MINIO_CALLHOME_ENABLE: "off" + MINIO_ROOT_USER: "admin" + # kics-scan ignore-line + MINIO_ROOT_PASSWORD: "miniopass" # gitleaks:allow + MINIO_DEFAULT_BUCKETS: "fhir" + MINIO_SCHEME: "http" + volumes: + - minio-data:/bitnami/minio/data:rw + ports: + - "127.0.0.1:9001:9001" + - "127.0.0.1:9000:9000" + deploy: + resources: + limits: + memory: 8G + + wait-for-minio: + image: docker.io/curlimages/curl:8.8.0@sha256:73e4d532ea62d7505c5865b517d3704966ffe916609bedc22af6833dc9969bcd + ipc: none + security_opt: + - "no-new-privileges:true" + cap_drop: + - ALL + privileged: false + restart: "no" + environment: + MINIO_ENDPOINT_URL: http://minio:9000 + entrypoint: ["/bin/sh", "-c"] + command: + - | + until [ "$(curl -s -o /dev/null -L -w "%{http_code}" "$$MINIO_ENDPOINT_URL/minio/health/live")" == "200" ]; do + echo "$(date): Waiting for minio server @ $$MINIO_ENDPOINT_URL to be up"; + sleep 5; + done; + depends_on: + minio: + condition: service_started + + pathling: + image: docker.io/aehrc/pathling:7.0.1@sha256:70177a4eb7a20a5edba7a4957ac6cd245c29e3c306e98c5de59fe2974c1f71b8 + ipc: none + security_opt: + - "no-new-privileges:true" + cap_drop: + - ALL + privileged: false + environment: + JAVA_TOOL_OPTIONS: | + -Xmx64g + -Xss64m + -XX:G1HeapRegionSize=32M + -XX:+ExplicitGCInvokesConcurrent + -XX:+ExitOnOutOfMemoryError + -XX:+HeapDumpOnOutOfMemoryError + -Duser.timezone=UTC + --add-exports=java.base/sun.nio.ch=ALL-UNNAMED + --add-opens=java.base/java.net=ALL-UNNAMED + --add-opens=java.base/java.nio=ALL-UNNAMED + --add-opens=java.base/java.util=ALL-UNNAMED + --add-opens=java.base/java.lang.invoke=ALL-UNNAMED + pathling.storage.warehouseUrl: s3a://fhir + pathling.storage.cacheDatasets: "false" + pathling.query.cacheResults: "false" + pathling.import.allowableSources: file:///tmp/import/ + pathling.terminology.enabled: "false" + pathling.terminology.serverUrl: http://localhost:8080/i-dont-exist + fs.s3a.endpoint: "http://minio:9000" + fs.s3a.access.key: "admin" + fs.s3a.secret.key: "miniopass" + fs.s3a.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem" + fs.s3a.path.style.access: "true" + # spark.sql.parquet.compression.codec: "zstd" + # spark.io.compression.codec: "zstd" + # parquet.compression.codec.zstd.level: "9" + spark.serializer: "org.apache.spark.serializer.KryoSerializer" + spark.master: "local[*]" + 
spark.driver.memory: 24g + ports: + - "127.0.0.1:8082:8080" + - "127.0.0.1:4040:4040" + volumes: + - $PWD/synthea/output-${SYNTHEA_POPULATION_SIZE:?}/bulk/fhir/:/tmp/import/:ro + depends_on: + wait-for-minio: + condition: service_completed_successfully + + wait-for-pathling: + image: docker.io/curlimages/curl:8.8.0@sha256:73e4d532ea62d7505c5865b517d3704966ffe916609bedc22af6833dc9969bcd + ipc: none + security_opt: + - "no-new-privileges:true" + cap_drop: + - ALL + privileged: false + restart: "no" + environment: + PATHLING_URL: http://pathling:8080 + entrypoint: ["/bin/sh", "-c"] + command: + - | + until [ "$(curl -s -o /dev/null -L -w "%{http_code}" "$$PATHLING_URL/fhir/metadata")" == "200" ]; do + echo "$(date): Waiting for pathling server @ $$PATHLING_URL to be up"; + sleep 5; + done; + depends_on: + pathling: + condition: service_started + + import-resources: + image: docker.io/curlimages/curl:8.8.0@sha256:73e4d532ea62d7505c5865b517d3704966ffe916609bedc22af6833dc9969bcd + ipc: none + security_opt: + - "no-new-privileges:true" + cap_drop: + - ALL + privileged: false + restart: "no" + environment: + PATHLING_URL: http://pathling:8080 + entrypoint: ["/bin/sh", "-c"] + command: + - | + curl -X POST --header 'Content-Type: application/fhir+json' --data @/tmp/pathling-import-request.json --url 'http://pathling:8080/fhir/$$import' + volumes: + - $PWD/pathling-import-request.json:/tmp/pathling-import-request.json:ro + depends_on: + wait-for-pathling: + condition: service_completed_successfully + + warehousekeeper: + image: ghcr.io/miracum/util-images/warehousekeeper:v0.1.7@sha256:1487804b409e0f85a1b6e89b7bdd5dd89b730dfc41e2348fbde0810c9f3ea00a + ipc: none + security_opt: + - "no-new-privileges:true" + cap_drop: + - ALL + privileged: false + restart: "no" + environment: + AWS_ALLOW_HTTP: "1" + AWS_REGION: "eu-central-1" + AWS_ENDPOINT_URL: "http://minio:9000" + AWS_ACCESS_KEY_ID: "admin" + AWS_SECRET_ACCESS_KEY: "miniopass" + entrypoint: ["/bin/sh", "-c"] + command: + - | + python3 /opt/warehousekeeper/warehousekeeper.py register --bucket-name=fhir --database-name-prefix=default/ --hive-metastore=thrift://hive-metastore:9083 + python3 /opt/warehousekeeper/warehousekeeper.py optimize --bucket-name=fhir --database-name-prefix=default/ + python3 /opt/warehousekeeper/warehousekeeper.py vacuum --bucket-name=fhir --database-name-prefix=default/ --retention-hours=0 --dry-run=false --enforce-retention-duration=false + volumes: + - $PWD/config/spark-defaults.conf:/opt/spark/conf/spark-defaults.conf:ro + depends_on: + import-resources: + condition: service_completed_successfully + + metastore-db: + image: docker.io/bitnami/postgresql:16.3.0@sha256:b0248a5e2bf4fda5208183d4a6203287828666823a7a57431cfa4d31688bae97 + ipc: private + security_opt: + - "no-new-privileges:true" + cap_drop: + - ALL + privileged: false + restart: unless-stopped + environment: + POSTGRESQL_USERNAME: hive + POSTGRESQL_PASSWORD: hive + POSTGRESQL_DATABASE: metastore + deploy: + resources: + limits: + memory: 2G + + hive-metastore: + image: ghcr.io/miracum/util-images/hive-metastore:v1.2.0@sha256:d8f99a4a16ede640f184dca9bd84bef84f84b26f41b10eb5c62a57bb796268c8 + ipc: none + security_opt: + - "no-new-privileges:true" + cap_drop: + - ALL + privileged: false + restart: unless-stopped + environment: + SERVICE_NAME: metastore + DB_DRIVER: postgres + SERVICE_OPTS: | + -Djavax.jdo.option.ConnectionUserName=hive + -Djavax.jdo.option.ConnectionPassword=hive + AWS_ACCESS_KEY_ID: "admin" + AWS_SECRET_ACCESS_KEY: "miniopass" + AWS_DEFAULT_REGION: 
"eu-central-1" + volumes: + - $PWD/config/hive-site.xml:/opt/hive/conf/hive-site.xml:ro + depends_on: + metastore-db: + condition: service_started + deploy: + resources: + limits: + memory: 2G + +volumes: + minio-data: {} diff --git a/compose.trino.yaml b/compose.trino.yaml new file mode 100644 index 0000000..ffb7811 --- /dev/null +++ b/compose.trino.yaml @@ -0,0 +1,21 @@ +services: + trino: + image: docker.io/trinodb/trino:453@sha256:86f7a156aeeca1dda3a1ba1e97474913d10c75f8fb7cda0da1d29a52b5731e0c + ipc: none + security_opt: + - "no-new-privileges:true" + cap_drop: + - ALL + privileged: false + restart: unless-stopped + environment: + AWS_ACCESS_KEY_ID: "admin" + AWS_SECRET_ACCESS_KEY: "miniopass" + ports: + - "127.0.0.1:8080:8080" + volumes: + - $PWD/config/trino/etc:/usr/lib/trino/etc:ro + - $PWD/config/trino/catalog:/etc/trino/catalog:ro + depends_on: + - hive-metastore + - minio diff --git a/compose.vacuum.yaml b/compose.vacuum.yaml new file mode 100644 index 0000000..88d44b8 --- /dev/null +++ b/compose.vacuum.yaml @@ -0,0 +1,22 @@ +services: + warehousekeeper: + image: ghcr.io/miracum/util-images/warehousekeeper:v0.1.7@sha256:1487804b409e0f85a1b6e89b7bdd5dd89b730dfc41e2348fbde0810c9f3ea00a + ipc: none + security_opt: + - "no-new-privileges:true" + cap_drop: + - ALL + privileged: false + restart: "no" + environment: + AWS_ALLOW_HTTP: "1" + AWS_REGION: "eu-central-1" + AWS_ENDPOINT_URL: "http://minio:9000" + AWS_ACCESS_KEY_ID: "admin" + AWS_SECRET_ACCESS_KEY: "miniopass" + entrypoint: ["/bin/sh", "-c"] + command: + - | + python3 /opt/warehousekeeper/warehousekeeper.py vacuum --bucket-name=fhir --database-name-prefix=default/ --retention-hours=0 --dry-run=false --enforce-retention-duration=false + volumes: + - $PWD/config/spark-defaults.conf:/opt/spark/conf/spark-defaults.conf:ro diff --git a/config/hive-site.xml b/config/hive-site.xml new file mode 100644 index 0000000..f9675de --- /dev/null +++ b/config/hive-site.xml @@ -0,0 +1,58 @@ + + + + hive.metastore.warehouse.dir + s3a://fhir/metastore + + + fs.s3.impl + org.apache.hadoop.fs.s3a.S3AFileSystem + + + fs.s3n.impl + org.apache.hadoop.fs.s3a.S3AFileSystem + + + fs.s3a.endpoint + http://minio:9000 + + + fs.s3a.path.style.access + true + + + javax.jdo.option.ConnectionURL + jdbc:postgresql://metastore-db:5432/metastore + + + javax.jdo.option.ConnectionDriverName + org.postgresql.Driver + + + metastore.storage.schema.reader.impl + org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader + + + hive.security.authorization.createtable.owner.grants + ALL + The set of privileges automatically granted to the owner whenever a table gets + created. 
+ + + + hive.security.metastore.authorization.auth.reads + true + + + hive.users.in.admin.role + admin + + + hive.input.format + io.delta.hive.HiveInputFormat + + + hive.tez.input.format + io.delta.hive.HiveInputFormat + + diff --git a/config/spark-defaults.conf b/config/spark-defaults.conf new file mode 100644 index 0000000..42282d4 --- /dev/null +++ b/config/spark-defaults.conf @@ -0,0 +1 @@ +spark.driver.memory=32g diff --git a/config/synthea.properties b/config/synthea.properties new file mode 100644 index 0000000..c824a07 --- /dev/null +++ b/config/synthea.properties @@ -0,0 +1,4 @@ +exporter.fhir.export = true +exporter.hospital.fhir.export = true +exporter.practitioner.fhir.export = true +exporter.fhir.included_resources = Practitioner,Organization,Patient,Condition,Observation,Encounter diff --git a/config/trino/catalog/fhir.properties b/config/trino/catalog/fhir.properties new file mode 100644 index 0000000..e8f2e98 --- /dev/null +++ b/config/trino/catalog/fhir.properties @@ -0,0 +1,10 @@ +connector.name=delta_lake +delta.enable-non-concurrent-writes=true +hive.s3.endpoint=http://minio:9000 +hive.s3.aws-access-key=${ENV:AWS_ACCESS_KEY_ID} +hive.s3.aws-secret-key=${ENV:AWS_SECRET_ACCESS_KEY} +hive.s3.path-style-access=true +hive.metastore.uri=thrift://hive-metastore:9083 +hive.metastore-cache-ttl=0s +hive.metastore-refresh-interval=5s +hive.metastore.thrift.client.connect-timeout=10s diff --git a/config/trino/etc/config.properties b/config/trino/etc/config.properties new file mode 100644 index 0000000..0b4b617 --- /dev/null +++ b/config/trino/etc/config.properties @@ -0,0 +1,4 @@ +coordinator=true +node-scheduler.include-coordinator=true +http-server.http.port=8080 +discovery.uri=http://localhost:8080 diff --git a/config/trino/etc/jvm.config b/config/trino/etc/jvm.config new file mode 100644 index 0000000..e731415 --- /dev/null +++ b/config/trino/etc/jvm.config @@ -0,0 +1,20 @@ +-server +-Xmx64g +-XX:InitialRAMPercentage=80 +-XX:MaxRAMPercentage=80 +-XX:G1HeapRegionSize=32M +-XX:+ExplicitGCInvokesConcurrent +-XX:+ExitOnOutOfMemoryError +-XX:+HeapDumpOnOutOfMemoryError +-XX:-OmitStackTraceInFastThrow +-XX:ReservedCodeCacheSize=512M +-XX:PerMethodRecompilationCutoff=10000 +-XX:PerBytecodeRecompilationCutoff=10000 +-Djdk.attach.allowAttachSelf=true +-Djdk.nio.maxCachedBufferSize=2000000 +-Dfile.encoding=UTF-8 +# Allow loading dynamic agent used by JOL +-XX:+EnableDynamicAgentLoading +# https://bugs.openjdk.org/browse/JDK-8329528 +-XX:+UnlockDiagnosticVMOptions +-XX:G1NumCollectionsKeepPinned=10000000 diff --git a/config/trino/etc/node.properties b/config/trino/etc/node.properties new file mode 100644 index 0000000..2846d5c --- /dev/null +++ b/config/trino/etc/node.properties @@ -0,0 +1,2 @@ +node.environment=compose +node.data-dir=/data/trino diff --git a/pathling-import-request.json b/pathling-import-request.json new file mode 100644 index 0000000..f09a2f7 --- /dev/null +++ b/pathling-import-request.json @@ -0,0 +1,73 @@ +{ + "resourceType": "Parameters", + "parameter": [ + { + "name": "source", + "part": [ + { + "name": "resourceType", + "valueCode": "Patient" + }, + { + "name": "url", + "valueUrl": "file:///tmp/import/Patient.ndjson" + }, + { + "name": "mode", + "valueCode": "overwrite" + } + ] + }, + { + "name": "source", + "part": [ + { + "name": "resourceType", + "valueCode": "Condition" + }, + { + "name": "url", + "valueUrl": "file:///tmp/import/Condition.ndjson" + }, + { + "name": "mode", + "valueCode": "overwrite" + } + ] + }, + { + "name": "source", + "part": [ + { + "name": 
"resourceType", + "valueCode": "Encounter" + }, + { + "name": "url", + "valueUrl": "file:///tmp/import/Encounter.ndjson" + }, + { + "name": "mode", + "valueCode": "overwrite" + } + ] + }, + { + "name": "source", + "part": [ + { + "name": "resourceType", + "valueCode": "Observation" + }, + { + "name": "url", + "valueUrl": "file:///tmp/import/Observation.ndjson" + }, + { + "name": "mode", + "valueCode": "overwrite" + } + ] + } + ] +} diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..86f9661 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,9 @@ +fhir-pyrate==0.2.1 +trino==0.329.0 +sqlfluff==3.1.0 +pathling==7.0.1 +loguru==0.7.2 +polars==1.3.0 +seaborn==0.13.2 +docker==7.1.0 +deltalake==0.19.2 diff --git a/setup.sh b/setup.sh new file mode 100644 index 0000000..e7be375 --- /dev/null +++ b/setup.sh @@ -0,0 +1,22 @@ +#!/bin/bash + +sh -c "$(curl --location https://taskfile.dev/install.sh)" -- -d -b /usr/local/bin + +# Add Docker's official GPG key: +apt-get update +apt-get install -y ca-certificates +install -m 0755 -d /etc/apt/keyrings +curl -fsSL https://download.docker.com/linux/ubuntu/gpg -o /etc/apt/keyrings/docker.asc +chmod a+r /etc/apt/keyrings/docker.asc + +# Add the repository to Apt sources: +echo \ + "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/ubuntu \ + $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \ + tee /etc/apt/sources.list.d/docker.list > /dev/null + +apt-get update +apt-get install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin python3-pip python3.12-venv openjdk-21-jre-headless + +python3 -m venv .venv +source .venv/bin/activate diff --git a/src/benchmark.py b/src/benchmark.py new file mode 100644 index 0000000..9a282a9 --- /dev/null +++ b/src/benchmark.py @@ -0,0 +1,43 @@ +from abc import ABC, abstractmethod +from dataclasses import dataclass +import datetime +from enum import Enum + + +class QueryType(Enum): + AGGREGATE = "aggregate" + COUNT = "count" + EXTRACT = "extract" + + def __str__(self): + return str(self.value).lower() + +class QueryEngine(Enum): + PATHLING = "pathling" + PYRATE = "pyrate" + TRINO = "trino" + + def __str__(self): + return str(self.value).lower() + + +@dataclass +class BenchmarkRunResult: + run_id: str + start_timestamp: datetime.datetime + engine: str + query: str + query_type: QueryType + total_duration_seconds: float + write_to_file_duration_seconds: float + fetch_duration_seconds: float + post_process_duration_seconds: float = 0 + trino_cpu_time_seconds: float = 0 + trino_wall_time_seconds: float = 0 + trino_elapsed_time_seconds: float = 0 + + +class Benchmark(ABC): + @abstractmethod + def run_all_queries(self): + pass diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..27eafeb --- /dev/null +++ b/src/main.py @@ -0,0 +1,119 @@ +import datetime +from pathlib import Path +import sys +import time +from loguru import logger +import pandas as pd +import docker +import gc +import time + +from pathling_benchmark import PathlingBenchmark +from pyrate_benchmark import PyrateBenchmark +from trino_benchmark import TrinoBenchmark + +NUM_RUNS_PER_ENGINE: int = 5 + + +def main() -> int: + results = pd.DataFrame() + + docker_client = docker.from_env() + + logger.info("Setting up benchmarks") + trino = TrinoBenchmark() + pyrate = PyrateBenchmark() + pathling = PathlingBenchmark() + + resources_to_count = ["Patient", "Observation", "Encounter", "Condition"] + + resource_count_total = 
trino.get_resource_counts_total(resources_to_count) + + resource_counts = trino.get_resource_counts(resources_to_count) + + logger.info("Resource counts: {resource_counts}", resource_counts=resource_counts) + + logger.info( + "Running {n} rounds per engine against {resource_count_total} total resources", + n=NUM_RUNS_PER_ENGINE, + resource_count_total=resource_count_total, + ) + + benchmark_timestamp = datetime.datetime.now(datetime.UTC) + + failed_run_count = 0 + + for i in range(NUM_RUNS_PER_ENGINE): + logger.info( + "Run {i} out of {total_runs}", i=i + 1, total_runs=NUM_RUNS_PER_ENGINE + ) + + # trino + trino_results = trino.run_all_queries(run_id=str(i)) + results = pd.concat([results, pd.DataFrame(trino_results)]) + docker_client.containers.get("analytics-on-fhir-benchmark-minio-1").restart() + docker_client.containers.get("analytics-on-fhir-benchmark-trino-1").restart() + gc.collect() + + logger.info("Done with trino. Waiting for 30s") + time.sleep(30) + + # pathling + # we occasionally observe transient OOM issues, so add retries here + max_retries = 3 + retry_count = 0 + while retry_count < max_retries: + try: + pathling_results = pathling.run_all_queries(run_id=str(i)) + results = pd.concat([results, pd.DataFrame(pathling_results)]) + pathling.reset() + break + except Exception as exc: + logger.error( + "Pathling benchmark failed {error}. Attempt {retry_count} out of {max_retries}.", + retry_count=retry_count, + max_retries=max_retries, + error=exc, + ) + failed_run_count += 1 + retry_count += 1 + + logger.info("Done with pathling. Waiting for 30s") + time.sleep(30) + + # pyrate + pyrate_results = pyrate.run_all_queries(run_id=str(i)) + results = pd.concat([results, pd.DataFrame(pyrate_results)]) + docker_client.containers.get("analytics-on-fhir-benchmark-blaze-1").restart() + gc.collect() + + logger.info("Done with pyrate. Waiting for 30s") + time.sleep(30) + + logger.info( + "All benchmarks completed. Failed runs: {failed_run_count}", + failed_run_count=failed_run_count, + ) + output_dir = Path.cwd() / "results" / "benchmark-runs" + output_dir.mkdir(parents=True, exist_ok=True) + + results["benchmark_timestamp"] = benchmark_timestamp + + # append the resource_count_total as a fixed-value column. Makes it easier to later facet by it. 
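    # (the per-resource-type loop below also adds constant columns such as
    # resource_count_patient or resource_count_condition next to resource_count_total,
    # which plot.py uses to bucket and facet the runs)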
+ results["resource_count_total"] = resource_count_total + + for resource_type in resource_counts.keys(): + results[f"resource_count_{resource_type.lower()}"] = resource_counts[ + resource_type + ] + + results.to_csv( + output_dir + / f"{time.strftime("%Y%m%d-%H%M%S")}-{resource_count_total}-benchmark-results.csv", + index=False, + ) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/pathling_benchmark.py b/src/pathling_benchmark.py new file mode 100644 index 0000000..a81ef9f --- /dev/null +++ b/src/pathling_benchmark.py @@ -0,0 +1,253 @@ +import datetime +import time +from pathling import PathlingContext, Expression as exp +from pyspark.sql import SparkSession, DataFrame +from pyspark.sql.functions import count_distinct +from loguru import logger +from pathlib import Path + +from benchmark import Benchmark, BenchmarkRunResult, QueryType + + +class PathlingBenchmark(Benchmark): + def __init__(self): + self._init_pc() + logger.info("Completed initialization.") + + def _init_pc(self): + spark = ( + SparkSession.builder.config( + "spark.jars.packages", + "au.csiro.pathling:library-runtime:7.0.1,io.delta:delta-spark_2.12:3.2.0,org.apache.hadoop:hadoop-aws:3.3.4", + ) + .config("fs.s3a.access.key", "admin") + .config("fs.s3a.secret.key", "miniopass") + .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") + .config( + "spark.sql.catalog.spark_catalog", + "org.apache.spark.sql.delta.catalog.DeltaCatalog", + ) + .config( + "spark.driver.memory", + "64g", + ) + .config( + "spark.hadoop.fs.s3a.endpoint", + "localhost:9000", + ) + .config( + "spark.hadoop.fs.s3a.connection.ssl.enabled", + "false", + ) + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .config( + "spark.hadoop.fs.s3a.path.style.access", + "true", + ) + .config("spark.local.dir", Path.cwd() / "spark-tmp") + .getOrCreate() + ) + + self.pc = PathlingContext.create( + spark, enable_delta=True, enable_terminology=False + ) + + def run_all_queries(self, run_id: str) -> list[BenchmarkRunResult]: + output_folder_base = Path.cwd() / "results" / "pathling" + + data = self.pc.read.delta("s3a://fhir/default") + + results = [] + queries = { + QueryType.EXTRACT: [ + { + "query_name": "gender-age", + "resource_type": "Patient", + "columns": [ + exp("Patient.id", "patient_id"), + exp("Patient.birthDate", "patient_birthdate"), + exp("Patient.gender", "patient_gender"), + ], + "filters": [ + "Patient.gender = 'female' and Patient.birthDate >= @1970-01-01" + ], + }, + { + "query_name": "diabetes", + "resource_type": "Condition", + "columns": [ + exp("Condition.id", "condition_id"), + exp( + "Condition.code.coding.where(system='http://snomed.info/sct' and (code='73211009' or code='427089005' or code='44054006')).code", + "condition_snomed_code", + ), + exp("Condition.onsetDateTime", "condition_onset"), + exp("Condition.encounter.resolve().id", "encounter_id"), + exp( + "Condition.encounter.resolve().period.start", + "encounter_period_start", + ), + exp( + "Condition.encounter.resolve().period.end", + "encounter_period_end", + ), + exp("Condition.encounter.resolve().status", "encounter_status"), + exp( + "Condition.subject.resolve().ofType(Patient).id", + "patient_id", + ), + exp( + "Condition.subject.resolve().ofType(Patient).birthDate", + "patient_birthdate", + ), + ], + "filters": [ + "Condition.code.coding.where(system='http://snomed.info/sct' and (code='73211009' or code='427089005' or code='44054006')).exists()", + "Condition.encounter.resolve().period.start >= 
@2020-01-01", + "Condition.subject.resolve().ofType(Patient).birthDate >= @1970-01-01", + ], + }, + { + "query_name": "hemoglobin", + "resource_type": "Observation", + "columns": [ + exp( + "Observation.subject.resolve().ofType(Patient).id", + "patient_id", + ), + exp( + "Observation.subject.resolve().ofType(Patient).birthDate", + "patient_birthdate", + ), + exp("Observation.id", "observation_id"), + exp( + "Observation.code.coding.where(system = 'http://loinc.org').code", + "loinc_code", + ), + exp( + "Observation.valueQuantity.where(system = 'http://unitsofmeasure.org').code", + "value_quantity_ucum_code", + ), + exp( + "Observation.valueQuantity.where(system = 'http://unitsofmeasure.org').value", + "value_quantity_value", + ), + exp("Observation.effectiveDateTime", "effective_datetime"), + exp( + "Observation.subject.reference", + "observation_patient_reference", + ), + ], + "filters": [ + "Observation.exists((code.coding.exists(system='http://loinc.org' and code='718-7') and valueQuantity.exists(system='http://unitsofmeasure.org' and code='g/dL') and valueQuantity.value > 25) " + + "or (code.coding.exists(system='http://loinc.org' and (code='17856-6' or code='4548-4' or code='4549-2')) and valueQuantity.exists(system='http://unitsofmeasure.org' and code='%') and valueQuantity.value > 5))", + ], + }, + ], + QueryType.AGGREGATE: [ + { + "query_name": "observations-by-code", + "resource_type": "Observation", + "aggregations": [exp("count()", "num_observations")], + "groupings": [ + exp("code.coding", "coding"), + ], + "filters": [], + "order_by": "num_observations", + }, + ], + QueryType.COUNT: [ + { + "query_name": "gender-age", + }, + { + "query_name": "diabetes", + }, + { + "query_name": "hemoglobin", + }, + ], + } + + start_timestamp = datetime.datetime.now(datetime.UTC) + + for query_type in QueryType: + output_folder = output_folder_base / str(query_type) + output_folder.mkdir(parents=True, exist_ok=True) + + for query in queries[query_type]: + query_name = query["query_name"] + logger.info( + "Running {query_type} query {query_name}", + query_type=query_type, + query_name=query_name, + ) + timings_start = time.perf_counter() + + df: DataFrame = None + + if query_type == QueryType.AGGREGATE: + df = data.aggregate( + resource_type=query["resource_type"], + aggregations=query["aggregations"], + groupings=query["groupings"], + filters=query["filters"], + ) + df = df.orderBy(query["order_by"], ascending=False).select( + "coding.display", + "coding.code", + "coding.system", + "num_observations", + ) + else: + # re-use the query with the same name in the list of "extract" queries + if query_type == QueryType.COUNT: + query = [ + q + for q in queries[QueryType.EXTRACT] + if q["query_name"] == query_name + ][0] + + df = data.extract( + resource_type=query["resource_type"], + columns=query["columns"], + filters=query["filters"], + ) + + if query_type == QueryType.COUNT: + # in case of the diabetes query, count the condition id, not the patient id + if query_name == "diabetes": + df = df.agg(count_distinct("condition_id")) + else: + df = df.agg(count_distinct("patient_id")) + else: + df = df.orderBy("patient_id", ascending=True) + + df.write.option("header", "true").format("csv").mode("overwrite").save( + (output_folder / f"{query_name}.csv").as_posix() + ) + + duration_total = time.perf_counter() - timings_start + + result = BenchmarkRunResult( + run_id=run_id, + start_timestamp=start_timestamp, + engine="pathling", + query=query_name, + query_type=query_type, + 
total_duration_seconds=duration_total, + write_to_file_duration_seconds=0, + fetch_duration_seconds=0, + post_process_duration_seconds=0, + ) + results.append(result) + + df.unpersist(blocking=True) + + return results + + def reset(self): + self.pc.spark.catalog.clearCache() + self.pc.spark.sparkContext._jvm.System.gc() + self.pc.spark.stop() + self._init_pc() diff --git a/src/plot.py b/src/plot.py new file mode 100644 index 0000000..7c7895d --- /dev/null +++ b/src/plot.py @@ -0,0 +1,153 @@ +from pathlib import Path +import pandas as pd +import seaborn as sns +from loguru import logger + +from benchmark import QueryType + +def compute_relative_duration(group): + # Find the duration for the FHIR-PYrate engine as the reference + reference_row = group[group["engine"] == "FHIR-PYrate"] + if not reference_row.empty: + reference_duration = reference_row["total_duration_seconds"].values[0] + else: + raise ValueError("reference row not found!") + + # Calculate relative duration by dividing each duration by the reference + group["relative_duration"] = reference_duration / group["total_duration_seconds"] + return group + +df = pd.DataFrame() + +results_dir_path = Path.cwd() / "results" / "benchmark-runs" + +for file in results_dir_path.glob("*.csv"): + if file.stem.startswith("_"): + logger.info("Skipping {file}", file=file) + continue + + logger.info("Adding {file} to dataset", file=file) + df = pd.concat([df, pd.read_csv(file)]) + +record_counts = [1000, 5000, 10000, 50000, 100000] + +df["resource_count_total_categorical"] = df["resource_count_total"].astype("category") +df["engine"] = ( + df["engine"] + .astype("category") + .cat.rename_categories( + {"pyrate": "FHIR-PYrate", "pathling": "Pathling", "trino": "Trino"} + ) +) +df["query_type"] = df["query_type"].astype("category") + +df["record_count_numeric"] = ( + df["resource_count_patient"] + .apply(lambda value: min(record_counts, key=lambda x: abs(x - value))) + .astype("int") +) + +df["record_count_categorical"] = df["record_count_numeric"].astype("category") + + +logger.info(df) +logger.info(df.dtypes) + +results_dir = Path.cwd() / "results" +results_dir.mkdir(parents=True, exist_ok=True) + +# this results in some NA values in the means since `observations-by-code` is +# only used in the `aggregate` scenario and in turn all other scenarios +# are not ran against the `observations-by-code` query. 
+df[df["record_count_categorical"] == 100000].groupby( + ["engine", "query_type", "query", "record_count_categorical"] +)["total_duration_seconds"].mean().dropna().drop( + columns=("record_count_categorical") +).to_csv( + results_dir / "summary-largest-record-count.csv" +) + +means_largest_record_count = pd.read_csv( + results_dir / "summary-largest-record-count.csv" +) + +logger.info(means_largest_record_count) + +# Apply the function to each (query, query_type) group +relative_data = means_largest_record_count.groupby(["query", "query_type"]).apply( + compute_relative_duration +) + +# Sort and display +relative_data = relative_data.sort_values(by=["query", "query_type", "engine"]) + +logger.info(relative_data) + +mean_relative_performance = ( + relative_data.groupby(["engine", "query_type"])["relative_duration"].mean().dropna() +) + +logger.info(mean_relative_performance) + +output_dir = results_dir / "plots" +output_dir.mkdir(parents=True, exist_ok=True) + +sns.set_theme(style="whitegrid", font="sans-serif", context="paper") + +df_original = df + +for query_type in [QueryType.EXTRACT, QueryType.COUNT, QueryType.AGGREGATE]: + query_type_str = str(query_type) + + (output_dir / query_type_str).mkdir(parents=True, exist_ok=True) + + df = df_original[df_original["query_type"] == query_type_str] + + col_order = ["gender-age", "diabetes", "hemoglobin"] + titles = "{col_name}" + col_wrap = 2 + + if query_type == QueryType.AGGREGATE: + col_order = ["observations-by-code"] + titles = "" + col_wrap = None + + g = sns.catplot( + data=df, + kind="bar", + x="record_count_categorical", + y="total_duration_seconds", + hue="engine", + col="query", + palette="Set2", + errorbar=("ci", 95), + col_order=col_order, + err_kws={"color": ".1", "linewidth": 1}, + capsize=0.2, + ) + + if query_type != QueryType.AGGREGATE: + sns.move_legend(g, "upper left", bbox_to_anchor=(0.05, 0.85), frameon=False) + + g.legend.set_title("Query Engine") + g.set_titles(titles) + g.set_axis_labels("Record Count", "Mean duration (seconds)") + + for facet in g.axes_dict.keys(): + ax = g.axes_dict[facet] + logger.info("{facet}, {ax}", facet=facet, ax=ax) + ax.set_yscale("log") + + for p in ax.patches: + ax.text( + p.get_x(), + p.get_height() * 1.3, + "{0:.1f}".format(p.get_height()), # Used to format it K representation + color="black", + rotation="horizontal", + size="small", + ) + + g.figure.savefig( + output_dir / query_type_str / "duration-by-resource-count-facetted-by-query.png" + ) diff --git a/src/pyrate_benchmark.py b/src/pyrate_benchmark.py new file mode 100644 index 0000000..4f8f294 --- /dev/null +++ b/src/pyrate_benchmark.py @@ -0,0 +1,265 @@ +import datetime +import os +import time +from fhir_pyrate import Ahoy, Pirate +from pathlib import Path +from loguru import logger +from pandas import DataFrame +import pandas as pd + +from benchmark import Benchmark, BenchmarkRunResult, QueryType + +PAGE_SIZE: int = 10000 + + +class PyrateBenchmark(Benchmark): + def __init__(self): + os.environ["FHIR_USER"] = "any" + os.environ["FHIR_PASSWORD"] = "any" + + auth = Ahoy(auth_type="BasicAuth", auth_method="env") + + self.search = Pirate( + auth=auth, + base_url="http://localhost:8083/fhir/", + print_request_url=False, # TODO: useful for debugging + ) + logger.info("Completed initialization.") + + def run_all_queries(self, run_id: str) -> list[BenchmarkRunResult]: + output_folder_base = Path.cwd() / "results" / "pyrate" + + results = [] + queries = { + QueryType.EXTRACT: [ + { + "query_name": "gender-age", + "resource_type": "Patient", 
+ "request_params": { + "birthdate": "ge1970-01-01", + "gender": "female", + "_count": PAGE_SIZE, + "_sort": "_id", + }, + "fhir_paths": [ + ("patient_id", "Patient.id"), + ("patient_birthdate", "Patient.birthDate"), + ("patient_gender", "Patient.gender"), + ], + "post_process": None, + }, + { + "query_name": "diabetes", + "resource_type": "Condition", + "request_params": { + "encounter.date": "ge2020-01-01", + "code": "http://snomed.info/sct|73211009,http://snomed.info/sct|427089005,http://snomed.info/sct|44054006", + "subject:Patient.birthdate": "ge1970-01-01", + "_include": "Condition:encounter", + "_include": "Condition:patient", + "_count": PAGE_SIZE, + "_sort": "_id", + }, + "fhir_paths": [ + ("condition_id", "Condition.id"), + ( + "condition_snomed_code", + "Condition.code.coding.where(system = 'http://snomed.info/sct' and (code = '73211009' or code = '427089005' or code = '44054006')).code", + ), + ("condition_onset", "Condition.onsetDateTime"), + ("condition_patient_reference", "Condition.subject.reference"), + ("encounter_id", "Encounter.id"), + ("encounter_patient_reference", "Encounter.subject.reference"), + ("encounter_period_start", "Encounter.period.start"), + ("encounter_period_end", "Encounter.period.end"), + ("encounter_status", "Encounter.status"), + ("patient_id", "Patient.id"), + ("patient_birthdate", "Patient.birthDate"), + ], + "post_process": None, + }, + { + "query_name": "hemoglobin", + "resource_type": "Observation", + "request_params": { + "code-value-quantity": "http://loinc.org|718-7$gt25|http://unitsofmeasure.org|g/dL,http://loinc.org|17856-6$gt5|http://unitsofmeasure.org|%,http://loinc.org|4548-4$gt5|http://unitsofmeasure.org|%,http://loinc.org|4549-2$gt5|http://unitsofmeasure.org|%", + "_include": "Observation:patient", + "_count": PAGE_SIZE, + "_sort": "_id", + }, + "fhir_paths": [ + ("patient_id", "Patient.id"), + ("patient_birthdate", "Patient.birthDate"), + ("observation_id", "Observation.id"), + ( + "loinc_code", + "Observation.code.coding.where(system = 'http://loinc.org').code", + ), + ( + "value_quantity_ucum_code", + "Observation.valueQuantity.where(system = 'http://unitsofmeasure.org').code", + ), + ( + "value_quantity_value", + "Observation.valueQuantity.where(system = 'http://unitsofmeasure.org').value", + ), + ("effective_datetime", "Observation.effectiveDateTime"), + ( + "observation_patient_reference", + "Observation.subject.reference", + ), + ], + "post_process": None, + }, + ], + QueryType.AGGREGATE: [ + { + "query_name": "observations-by-code", + "resource_type": "Observation", + "request_params": { + "_count": PAGE_SIZE, + "_sort": "_id", + }, + "fhir_paths": [ + ( + "display", + "Observation.code.coding.display", + ), + ( + "code", + "Observation.code.coding.code", + ), + ( + "code_system", + "Observation.code.coding.system", + ), + ], + "post_process": lambda df: self._post_process_observations_by_code( + df + ), + }, + ], + QueryType.COUNT: [ + { + "query_name": "gender-age", + "resource_type": "Patient", + "request_params": { + "birthdate": "ge1970-01-01", + "gender": "female", + "_summary": "count", + "_sort": "_id", + }, + "fhir_paths": [], + "post_process": None, + }, + { + "query_name": "diabetes", + "resource_type": "Condition", + "request_params": { + "encounter.date": "ge2020-01-01", + "code": "http://snomed.info/sct|73211009,http://snomed.info/sct|427089005,http://snomed.info/sct|44054006", + "subject:Patient.birthdate": "ge1970-01-01", + "_summary": "count", + "_sort": "_id", + }, + "fhir_paths": [], + "post_process": None, + 
}, + { + "query_name": "hemoglobin", + "resource_type": "Patient", + "request_params": { + "_has:Observation:patient:code-value-quantity": "http://loinc.org|718-7$gt25|http://unitsofmeasure.org|g/dL,http://loinc.org|17856-6$gt5|http://unitsofmeasure.org|%,http://loinc.org|4548-4$gt5|http://unitsofmeasure.org|%,http://loinc.org|4549-2$gt5|http://unitsofmeasure.org|%", + "_summary": "count", + "_sort": "_id", + }, + "fhir_paths": [], + "post_process": None, + }, + ], + } + + start_timestamp = datetime.datetime.now(datetime.UTC) + + for query_type in QueryType: + output_folder = output_folder_base / str(query_type) + output_folder.mkdir(parents=True, exist_ok=True) + + for query in queries[query_type]: + query_name = query["query_name"] + logger.info( + "Running {query_type} query {query_name}", + query_type=query_type, + query_name=query_name, + ) + timings_start = time.perf_counter() + + df: DataFrame = None + + if query_type == QueryType.COUNT: + # special handling for the count case + count = self.search.get_bundle_total( + resource_type=query["resource_type"], + request_params=query["request_params"], + ) + df = DataFrame(data={"count": [count]}) + else: + df = self.search.steal_bundles_to_dataframe( + resource_type=query["resource_type"], + request_params=query["request_params"], + fhir_paths=query["fhir_paths"], + ) + + fetch_done_timestamp = time.perf_counter() + fetch_duration = fetch_done_timestamp - timings_start + + post_process_duration = 0 + if query["post_process"] is not None: + post_process_start = time.perf_counter() + df = query["post_process"](df) + post_process_duration = time.perf_counter() - post_process_start + + write_to_file_start = time.perf_counter() + if isinstance(df, DataFrame): + df.to_csv(output_folder / f"{query_name}.csv", index=False) + else: + for resource_type in df.keys(): + df[resource_type].to_csv( + output_folder / f"{query_name}-{resource_type}.csv", + index=False, + ) + + write_to_file_duration = time.perf_counter() - write_to_file_start + duration_total = time.perf_counter() - timings_start + + result = BenchmarkRunResult( + run_id=run_id, + start_timestamp=start_timestamp, + engine="pyrate", + query=query_name, + query_type=query_type, + total_duration_seconds=duration_total, + write_to_file_duration_seconds=write_to_file_duration, + fetch_duration_seconds=fetch_duration, + post_process_duration_seconds=post_process_duration, + ) + results.append(result) + + return results + + def _post_process_observations_by_code(self, df: DataFrame): + if not isinstance(df, DataFrame): + logger.warning( + "fetch result is not a DataFrame. 
Instead: {df_type}", + df_type=type(df), + ) + return df + + processed = ( + df.astype(str) + .groupby(["display", "code", "code_system"]) + .size() + .reset_index(name="num_observations") + .sort_values(by="num_observations", ascending=False) + ) + return processed diff --git a/src/queries/aggregate/observations-by-code.sql b/src/queries/aggregate/observations-by-code.sql new file mode 100644 index 0000000..433d277 --- /dev/null +++ b/src/queries/aggregate/observations-by-code.sql @@ -0,0 +1,11 @@ +SELECT + code_coding.display, + code_coding.code, + code_coding.system AS code_system, + COUNT(*) AS num_observations +FROM fhir.default.observation, UNNEST(code.coding) AS code_coding +GROUP BY + code_coding.code, + code_coding.system, + code_coding.display +ORDER BY COUNT(*) DESC diff --git a/src/queries/count/diabetes.sql b/src/queries/count/diabetes.sql new file mode 100644 index 0000000..5e7970c --- /dev/null +++ b/src/queries/count/diabetes.sql @@ -0,0 +1,10 @@ +SELECT COUNT(DISTINCT condition.id) +FROM fhir.default.condition +LEFT JOIN fhir.default.encounter ON condition.encounter.reference = CONCAT('Encounter/', encounter.id) +LEFT JOIN UNNEST(condition.code.coding) AS condition_coding ON TRUE +LEFT JOIN fhir.default.patient AS patient ON encounter.subject.reference = CONCAT('Patient/', patient.id) +WHERE + DATE(FROM_ISO8601_TIMESTAMP(encounter.period.start)) >= DATE('2020-01-01') + AND condition_coding.system = 'http://snomed.info/sct' + AND condition_coding.code IN ('73211009', '427089005', '44054006') + AND DATE(patient.birthdate) >= DATE('1970-01-01') diff --git a/src/queries/count/gender-age.sql b/src/queries/count/gender-age.sql new file mode 100644 index 0000000..fdd3ee0 --- /dev/null +++ b/src/queries/count/gender-age.sql @@ -0,0 +1,3 @@ +SELECT COUNT(DISTINCT patient.id) +FROM fhir.default.patient +WHERE patient.gender = 'female' AND DATE(patient.birthdate) >= DATE('1970-01-01') diff --git a/src/queries/count/hemoglobin.sql b/src/queries/count/hemoglobin.sql new file mode 100644 index 0000000..e855898 --- /dev/null +++ b/src/queries/count/hemoglobin.sql @@ -0,0 +1,17 @@ +SELECT COUNT(DISTINCT patient.id) +FROM fhir.default.observation +LEFT JOIN fhir.default.patient AS patient ON observation.subject.reference = CONCAT('Patient/', patient.id) +LEFT JOIN UNNEST(observation.code.coding) AS observation_code_coding ON TRUE +WHERE + observation_code_coding.system = 'http://loinc.org' + AND valuequantity.system = 'http://unitsofmeasure.org' + AND ( + observation_code_coding.code = '718-7' + AND valuequantity.code = 'g/dL' + AND valuequantity.value > 25.0 + ) + OR ( + observation_code_coding.code IN ('17856-6', '4548-4', '4549-2') + AND valuequantity.code = '%' + AND valuequantity.value > 5 + ) diff --git a/src/queries/extract/diabetes.sql b/src/queries/extract/diabetes.sql new file mode 100644 index 0000000..8b55ecc --- /dev/null +++ b/src/queries/extract/diabetes.sql @@ -0,0 +1,20 @@ +SELECT + condition.id AS condition_id, + condition_coding.code AS condition_snomed_code, + condition.onsetdatetime AS condition_onset, + encounter.id AS encounter_id, + encounter.period.start AS encounter_period_start, + encounter.period."end" AS encounter_period_end, + encounter.status AS encounter_status, + patient.id AS patient_id, + patient.birthdate AS patient_birthdate +FROM fhir.default.condition AS condition +LEFT JOIN fhir.default.encounter ON condition.encounter.reference = CONCAT('Encounter/', encounter.id) +LEFT JOIN UNNEST(condition.code.coding) AS condition_coding ON TRUE +LEFT JOIN 
fhir.default.patient AS patient ON encounter.subject.reference = CONCAT('Patient/', patient.id) +WHERE + DATE(FROM_ISO8601_TIMESTAMP(encounter.period.start)) >= DATE('2020-01-01') + AND condition_coding.system = 'http://snomed.info/sct' + AND condition_coding.code IN ('73211009', '427089005', '44054006') + AND DATE(patient.birthdate) >= DATE('1970-01-01') +ORDER BY patient.id ASC diff --git a/src/queries/extract/gender-age.sql b/src/queries/extract/gender-age.sql new file mode 100644 index 0000000..c7e92f9 --- /dev/null +++ b/src/queries/extract/gender-age.sql @@ -0,0 +1,7 @@ +SELECT + id AS patient_id, + birthdate AS patient_birthdate, + gender AS patient_gender +FROM fhir.default.patient +WHERE gender = 'female' AND date(birthdate) >= date('1970-01-01') +ORDER BY patient.id ASC diff --git a/src/queries/extract/hemoglobin.sql b/src/queries/extract/hemoglobin.sql new file mode 100644 index 0000000..9a52e6a --- /dev/null +++ b/src/queries/extract/hemoglobin.sql @@ -0,0 +1,26 @@ +SELECT + patient.id AS patient_id, + patient.birthdate AS patient_birthdate, + observation.id AS observation_id, + observation_code_coding.code AS loinc_code, + valuequantity.code AS value_quantity_ucum_code, + valuequantity.value AS value_quantity_value, + observation.effectivedatetime AS effective_datetime, + observation.subject.reference AS observation_patient_reference +FROM fhir.default.observation +LEFT JOIN fhir.default.patient ON observation.subject.reference = CONCAT('Patient/', patient.id) +LEFT JOIN UNNEST(observation.code.coding) AS observation_code_coding ON TRUE +WHERE + observation_code_coding.system = 'http://loinc.org' + AND valuequantity.system = 'http://unitsofmeasure.org' + AND ( + observation_code_coding.code = '718-7' + AND valuequantity.code = 'g/dL' + AND valuequantity.value > 25.0 + ) + OR ( + observation_code_coding.code IN ('17856-6', '4548-4', '4549-2') + AND valuequantity.code = '%' + AND valuequantity.value > 5 + ) +ORDER BY patient.id ASC diff --git a/src/trino_benchmark.py b/src/trino_benchmark.py new file mode 100644 index 0000000..2542ac2 --- /dev/null +++ b/src/trino_benchmark.py @@ -0,0 +1,136 @@ +import datetime +import trino +import pandas as pd +from pathlib import Path +from loguru import logger +import time + +from benchmark import Benchmark, BenchmarkRunResult, QueryType + + +class TrinoBenchmark(Benchmark): + def __init__(self): + self.trino_connection = trino.dbapi.connect( + host="localhost", + port="8080", + user="trino", + catalog="fhir", + schema="default", + ) + logger.info("Completed initialization.") + + def run_all_queries(self, run_id: str) -> list[BenchmarkRunResult]: + logger.info("Begin trino benchmarking") + queries_base_path = Path.cwd() / "queries" + + results = [] + start_timestamp = datetime.datetime.now(datetime.UTC) + + for query_type in QueryType: + queries_dir_path = queries_base_path / str(query_type) + logger.info( + "Looking for sql files in {queries_dir_path}", + queries_dir_path=queries_dir_path, + ) + + for file in queries_dir_path.glob("*.sql"): + query_name = file.stem + logger.info( + "Running {query_type} query {query_name}", + query_type=query_type, + query_name=query_name, + ) + + output_file_name = f"{query_name}.csv" + output_folder = Path.cwd() / "results" / "trino" / str(query_type) + output_folder.mkdir(parents=True, exist_ok=True) + output_file_path = output_folder / output_file_name + + logger.info( + "Output file path set to {output_file_path}", + output_file_path=output_file_path, + ) + + query = file.read_text() + + cursor = 
self.trino_connection.cursor() + + # technically, the query is likely first executed on fetchall + timings_start = time.perf_counter() + cursor.execute(query) + + # The DBAPI implementation in trino.dbapi provides methods to retrieve fewer rows for example Cursor.fetchone() or Cursor.fetchmany(). + # By default Cursor.fetchmany() fetches one row. Please set trino.dbapi.Cursor.arraysize accordingly. + # So this could probably be optimized. + rows = cursor.fetchall() + + fetch_done_timestamp = time.perf_counter() + fetch_duration = fetch_done_timestamp - timings_start + + # TODO: try pd.read_sql_query + df = pd.DataFrame(rows, columns=[i[0] for i in cursor.description]) + + df.to_csv(output_file_path, index=False) + + write_to_file_duration = time.perf_counter() - fetch_done_timestamp + + duration_total = time.perf_counter() - timings_start + + logger.info( + "Total duration: {duration_total:0.4f} s", + duration_total=duration_total, + ) + + cursor.close() + + result = BenchmarkRunResult( + run_id=run_id, + start_timestamp=start_timestamp, + engine="trino", + query=query_name, + query_type=query_type, + total_duration_seconds=duration_total, + write_to_file_duration_seconds=write_to_file_duration, + fetch_duration_seconds=fetch_duration, + post_process_duration_seconds=0, + trino_cpu_time_seconds=cursor.stats["cpuTimeMillis"] / 1000.0, + trino_wall_time_seconds=cursor.stats["wallTimeMillis"] / 1000.0, + trino_elapsed_time_seconds=cursor.stats["elapsedTimeMillis"] + / 1000.0, + ) + + results.append(result) + + return results + + def get_resource_counts_total(self, resource_types: list[str]) -> int: + cursor = self.trino_connection.cursor() + total = 0 + for resource_type in resource_types: + query = ( + f"SELECT COUNT(DISTINCT(id)) AS count FROM fhir.default.{resource_type}" + ) + + cursor.execute(query) + row = cursor.fetchone() + + total = total + row[0] + cursor.close() + return total + + def get_resource_counts(self, resource_types: list[str]) -> dict[str, int]: + cursor = self.trino_connection.cursor() + result = {} + for resource_type in resource_types: + query = ( + f"SELECT COUNT(DISTINCT(id)) AS count FROM fhir.default.{resource_type}" + ) + + cursor.execute(query) + row = cursor.fetchone() + + result[resource_type] = row[0] + + cursor.close() + + return result
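The comments in src/trino_benchmark.py point at two possible optimizations: fetching rows in batches via Cursor.fetchmany() with an explicit arraysize instead of a single fetchall(), and letting pandas issue the query (the pd.read_sql_query TODO). Below is an untested sketch of both, assuming the Trino container from compose.trino.yaml is reachable on localhost:8080; the query itself is only illustrative.

```python
import pandas as pd
import trino

# Same connection settings as TrinoBenchmark.__init__ in src/trino_benchmark.py.
conn = trino.dbapi.connect(
    host="localhost", port=8080, user="trino", catalog="fhir", schema="default"
)

sql = "SELECT id, gender FROM fhir.default.patient"  # illustrative query

# Variant 1: stream the result set in batches instead of one fetchall().
cur = conn.cursor()
cur.arraysize = 10_000  # rows returned per fetchmany() call
cur.execute(sql)
rows = []
while True:
    batch = cur.fetchmany()
    if not batch:
        break
    rows.extend(batch)
df = pd.DataFrame(rows, columns=[col[0] for col in cur.description])
cur.close()

# Variant 2: let pandas drive the DBAPI connection (the TODO in trino_benchmark.py).
df2 = pd.read_sql_query(sql, conn)
```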