From 7d91350d2ef83a0fb2f199dd65225ec51e319353 Mon Sep 17 00:00:00 2001 From: Zairon Jacobs Date: Tue, 20 Aug 2024 16:35:36 +0200 Subject: [PATCH] re-add scheduled tasks with apscheduler --- harmony_api/core/settings.py | 10 +++---- harmony_api/scheduler.py | 32 +++++++++++++++------- harmony_api/services/instruments_cache.py | 4 +-- main.py | 33 ++++++++++------------- requirements.txt | 1 + 5 files changed, 44 insertions(+), 36 deletions(-) diff --git a/harmony_api/core/settings.py b/harmony_api/core/settings.py index 2d34432..b870f64 100644 --- a/harmony_api/core/settings.py +++ b/harmony_api/core/settings.py @@ -41,10 +41,10 @@ class Settings(BaseSettings): # General harmony_api config VERSION: str = "2.0" APP_TITLE: str = "Harmony API" - TIKA_ENDPOINT: str = os.getenv("TIKA_ENDPOINT", "") - OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY") - AZURE_OPENAI_API_KEY: str = os.getenv("AZURE_OPENAI_API_KEY") - AZURE_OPENAI_ENDPOINT: str = os.getenv("AZURE_OPENAI_ENDPOINT") + TIKA_ENDPOINT: str = os.getenv("TIKA_ENDPOINT", "http://tika:9998") + OPENAI_API_KEY: str | None = os.getenv("OPENAI_API_KEY") + AZURE_OPENAI_API_KEY: str | None = os.getenv("AZURE_OPENAI_API_KEY") + AZURE_OPENAI_ENDPOINT: str | None = os.getenv("AZURE_OPENAI_ENDPOINT") GOOGLE_APPLICATION_CREDENTIALS: dict = GOOGLE_APPLICATION_CREDENTIALS @@ -79,7 +79,7 @@ class ProdSettings(Settings): } -def get_settings(): +def get_settings() -> Union[DevSettings | ProdSettings]: env = os.getenv("STAGE", "dev") settings_type = { "dev": DevSettings(), diff --git a/harmony_api/scheduler.py b/harmony_api/scheduler.py index a3e3f0c..18726da 100644 --- a/harmony_api/scheduler.py +++ b/harmony_api/scheduler.py @@ -24,32 +24,44 @@ SOFTWARE. """ -from fastapi.concurrency import run_in_threadpool -# from rocketry import Rocketry -# from rocketry.conds import cron +from apscheduler.jobstores.memory import MemoryJobStore +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.executors.pool import ThreadPoolExecutor from harmony_api.services.instruments_cache import InstrumentsCache from harmony_api.services.vectors_cache import VectorsCache -# app = Rocketry(executation="async") +# Jobstores +jobstores = { + "default": MemoryJobStore() +} +# Executors +executors = { + "default": ThreadPoolExecutor(), +} -# @app.task(cron("0 */12 * * *")) -async def do_every_12th_hour(): +scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, timezone="UTC") + + +@scheduler.scheduled_job( + "cron", year="*", month="*", day="*", hour="*/12", minute="0", second="0" +) +def do_every_12th_hour(): """ - Save the caches to disk every 12th hour. + Save the caches to disk. - Runs at minute 0 past every 12th hour + Runs at minute 0 past every 12th hour. """ # Save instruments cache to disk try: - await run_in_threadpool(InstrumentsCache().save) + InstrumentsCache().save() except (Exception,) as e: print(f"Could not save instruments cache: {str(e)}.") # Save vectors cache to disk try: - await run_in_threadpool(VectorsCache().save) + VectorsCache().save() except (Exception,) as e: print(f"Could not save vectors cache: {str(e)}.") diff --git a/harmony_api/services/instruments_cache.py b/harmony_api/services/instruments_cache.py index 07420d4..bc86584 100644 --- a/harmony_api/services/instruments_cache.py +++ b/harmony_api/services/instruments_cache.py @@ -65,7 +65,7 @@ def __load(self): # Dict to instruments cache_parsed: dict[str, List[Instrument]] = {} for key, value in cache.items(): - instruments = [Instrument.parse_obj(x) for x in value] + instruments = [Instrument.model_validate(x) for x in value] cache_parsed[key] = instruments self.__cache = cache_parsed @@ -113,7 +113,7 @@ def save(self): # Instruments to dict cache_parsed: dict[str, List] = {} for key, value in self.__cache.items(): - instruments = [x.model_dump(mode="json")() for x in value] + instruments = [x.model_dump(mode="json") for x in value] cache_parsed[key] = instruments with open(cache_file_path, "w", encoding="utf8") as file: diff --git a/main.py b/main.py index 3c87293..fff97c0 100644 --- a/main.py +++ b/main.py @@ -29,6 +29,7 @@ sys.path.append("./harmony/src") import asyncio +from contextlib import asynccontextmanager import uvicorn from fastapi import FastAPI @@ -40,7 +41,7 @@ from harmony_api.routers.info_router import router as info_router from harmony_api.routers.text_router import router as text_router from harmony_api.services.instruments_cache import InstrumentsCache -# from harmony_api.scheduler import app as app_rocketry +from harmony_api.scheduler import scheduler from harmony_api.services.vectors_cache import VectorsCache description = """ @@ -50,10 +51,18 @@ You can try Harmony at harmonydata.ac.uk/harmony_api and you can read our blog at harmonydata.ac.uk. """ + +@asynccontextmanager +async def lifespan(_: FastAPI): + scheduler.start() + + yield + app_fastapi = FastAPI( title=settings.APP_TITLE, description=description, version=settings.VERSION, + lifespan=lifespan, docs_url="/docs", contact={ "name": "Thomas Wood", @@ -82,25 +91,13 @@ app_fastapi.include_router(info_router, tags=["Info"]) -class Server(uvicorn.Server): - """ - Custom uvicorn.Server - Override signals and include Rocketry - """ - - def handle_exit(self, sig: int, frame): - # app_rocketry.session.shut_down() - - return super().handle_exit(sig, frame) - - async def main(): # Load cache print("INFO:\t Loading cache...") InstrumentsCache() VectorsCache() - server = Server( + server = uvicorn.Server( config=uvicorn.Config( app=app_fastapi, host=settings.SERVER_HOST, @@ -112,12 +109,10 @@ async def main(): ) api = asyncio.create_task(server.serve()) - # scheduler = asyncio.create_task(app_rocketry.serve()) - # Start both applications (FastAPI & Rocketry) - print("INFO:\t Starting applications...") - # await asyncio.wait([api, scheduler]) - await asyncio.wait(api) + # Start FastAPI + print("INFO:\t Starting application...") + await asyncio.wait([api]) if __name__ == "__main__": diff --git a/requirements.txt b/requirements.txt index 2c8d628..b6a7c7f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,3 +16,4 @@ vertexai==1.49.0 numpy==1.26.4 sklearn-crfsuite==0.5.0 scikit-learn==1.5.0 +APScheduler==3.10.4 \ No newline at end of file