Skip to content

Commit

Permalink
feat(robot-server): Support downgrades without clearing robot-server …
Browse files Browse the repository at this point in the history
…data (#14329)
  • Loading branch information
SyntaxColoring authored Jan 30, 2024
1 parent 5dfd25e commit 4ad5c36
Show file tree
Hide file tree
Showing 8 changed files with 680 additions and 67 deletions.
2 changes: 1 addition & 1 deletion robot-server/robot_server/app_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async def on_startup() -> None:
],
)
start_initializing_persistence(
app_state=app.state, persistence_directory=persistence_directory
app_state=app.state, persistence_directory_root=persistence_directory
)
initialize_notification_client(
app_state=app.state,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from robot_server.deck_configuration.store import DeckConfigurationStore
from robot_server.hardware import get_deck_type
from robot_server.persistence import get_persistence_directory
from robot_server.persistence import get_active_persistence_directory


# This needs to be kept in sync with opentrons.execute, which reads this file.
Expand All @@ -27,7 +27,7 @@
async def get_deck_configuration_store(
app_state: AppState = fastapi.Depends(get_app_state),
deck_type: DeckType = fastapi.Depends(get_deck_type),
persistence_directory: Path = fastapi.Depends(get_persistence_directory),
persistence_directory: Path = fastapi.Depends(get_active_persistence_directory),
) -> DeckConfigurationStore:
"""Return the server's singleton `DeckConfigurationStore`."""
# It's important that this dependency doesn't do anything that might fail, like reading
Expand Down
4 changes: 2 additions & 2 deletions robot-server/robot_server/persistence/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
start_initializing_persistence,
clean_up_persistence,
get_sql_engine,
get_persistence_directory,
get_active_persistence_directory,
get_persistence_resetter,
)
from ._persistence_directory import PersistenceResetter
Expand Down Expand Up @@ -34,7 +34,7 @@
"clean_up_persistence",
# dependencies and types for use by FastAPI endpoint functions
"get_sql_engine",
"get_persistence_directory",
"get_active_persistence_directory",
"PersistenceResetter",
"get_persistence_resetter",
]
150 changes: 114 additions & 36 deletions robot-server/robot_server/persistence/_fastapi_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
from ._database import create_sql_engine
from ._persistence_directory import (
PersistenceResetter,
prepare as prepare_persistence_directory,
prepare_active_subdirectory,
prepare_root,
)


Expand All @@ -29,9 +30,12 @@
_log = logging.getLogger(__name__)


_directory_init_task_accessor = AppStateAccessor["asyncio.Task[Path]"](
"persistence_directory_init_task"
_root_persistence_directory_init_task_accessor = AppStateAccessor["asyncio.Task[Path]"](
"persistence_root_directory_init_task"
)
_active_persistence_directory_init_task_accessor = AppStateAccessor[
"asyncio.Task[Path]"
]("persistence_active_subdirectory_init_task")
_sql_engine_init_task_accessor = AppStateAccessor["asyncio.Task[SQLEngine]"](
"persistence_sql_engine_init_task"
)
Expand All @@ -52,36 +56,51 @@ class DatabaseFailedToInitialize(ErrorDetails):
title: str = "Database Failed to Initialize"


def start_initializing_persistence(
app_state: AppState, persistence_directory: Optional[Path]
def start_initializing_persistence( # noqa: C901
app_state: AppState, persistence_directory_root: Optional[Path]
) -> None:
"""Initialize the persistence layer to get it ready for use by endpoint functions.

This should be called exactly once, as part of server startup.
It will return immediately while initialization continues in the background.
"""

async def init_directory_and_log() -> Path:
async def init_root_persistence_directory() -> Path:
try:
return await prepare_persistence_directory(
persistence_directory=persistence_directory
return await prepare_root(persistence_directory_root)
except Exception:
_log.exception(
"Exception initializing persistence directory root in the background."
)
raise

async def init_active_persistence_directory() -> Path:
try:
root_prep_task = _root_persistence_directory_init_task_accessor.get_from(
app_state
)
assert root_prep_task is not None
prepared_root = await root_prep_task

active_subdirectory = await prepare_active_subdirectory(prepared_root)
return active_subdirectory

except Exception:
_log.exception(
"Exception initializing persistence directory in the background."
"Exception initializing active persistence directory in the background."
)
raise

async def init_sql_engine_and_log() -> SQLEngine:
async def init_sql_engine() -> SQLEngine:
try:
directory_prep_task = _directory_init_task_accessor.get_from(
app_state=app_state
subdirectory_prep_task = (
_active_persistence_directory_init_task_accessor.get_from(app_state)
)
assert directory_prep_task is not None
prepared_persistence_directory = await directory_prep_task
assert subdirectory_prep_task is not None
prepared_subdirectory = await subdirectory_prep_task

sql_engine = await to_thread.run_sync(
create_sql_engine, prepared_persistence_directory / _DATABASE_FILE
create_sql_engine, prepared_subdirectory / _DATABASE_FILE
)
return sql_engine

Expand All @@ -90,18 +109,29 @@ async def init_sql_engine_and_log() -> SQLEngine:
raise

assert (
_directory_init_task_accessor.get_from(app_state=app_state) is None
and _sql_engine_init_task_accessor.get_from(app_state=app_state) is None
_root_persistence_directory_init_task_accessor.get_from(app_state) is None
and _active_persistence_directory_init_task_accessor.get_from(app_state) is None
and _sql_engine_init_task_accessor.get_from(app_state) is None
), "Cannot initialize more than once."

# We keep initialization of the persistence directory separate, and do not combine
# it with initialization of the SQL engine. This lets PersistenceResetter remain
# usable even if initializing the SQL engine fails, which is important to let users
# recover from corrupt databases.
directory_init_task = asyncio.create_task(init_directory_and_log())
_directory_init_task_accessor.set_on(app_state=app_state, value=directory_init_task)
# We keep initialization of the root persistence directory separate, and do not
# combine it with initialization of the active subdirectory or the SQL engine.
# This lets PersistenceResetter remain usable even if initializing the SQL engine
# fails, which is important to let users recover from corrupt databases.

root_directory_init_task = asyncio.create_task(init_root_persistence_directory())
_root_persistence_directory_init_task_accessor.set_on(
app_state=app_state, value=root_directory_init_task
)

active_subdirectory_init_task = asyncio.create_task(
init_active_persistence_directory()
)
_active_persistence_directory_init_task_accessor.set_on(
app_state=app_state, value=active_subdirectory_init_task
)

sql_engine_init_task = asyncio.create_task(init_sql_engine_and_log())
sql_engine_init_task = asyncio.create_task(init_sql_engine())
_sql_engine_init_task_accessor.set_on(
app_state=app_state, value=sql_engine_init_task
)
Expand All @@ -113,12 +143,19 @@ async def clean_up_persistence(app_state: AppState) -> None:
This should be called exactly once at server shutdown.
"""
sql_engine_init_task = _sql_engine_init_task_accessor.get_from(app_state=app_state)
directory_init_task = _directory_init_task_accessor.get_from(app_state=app_state)
active_subdirectory_init_task = (
_root_persistence_directory_init_task_accessor.get_from(app_state=app_state)
)
root_directory_init_task = _root_persistence_directory_init_task_accessor.get_from(
app_state=app_state
)
if sql_engine_init_task is not None:
sql_engine = await sql_engine_init_task
sql_engine.dispose()
if directory_init_task is not None:
await directory_init_task
if active_subdirectory_init_task is not None:
await active_subdirectory_init_task
if root_directory_init_task is not None:
await root_directory_init_task


async def get_sql_engine(
Expand Down Expand Up @@ -149,23 +186,64 @@ async def get_sql_engine(
) from exception


async def get_persistence_directory(
async def get_active_persistence_directory(
app_state: AppState = Depends(get_app_state),
) -> Path:
"""Return the path to the server's persistence directory."""
initialize_task = _directory_init_task_accessor.get_from(app_state)
"""Return the path to the server's persistence directory.

If you need to keep something in a file to persist it across reboots,
you should put it in here.

Specifically, this returns the subdirectory that's meant for this robot server
version--the "active" subdirectory. Other subdirectories may exist for other
versions.

This directory is initialized in the background, starting when the server boots.
This initialization can entail time-consuming migrations.
If this is called before that initialization completes, this will raise an
appropriate HTTP-facing error to indicate that the server is busy.
"""
initialize_task = _root_persistence_directory_init_task_accessor.get_from(app_state)
assert (
initialize_task is not None
), "Forgot to start persistence directory initialization as part of server startup?"

# Unlike get_sql_engine(), we don't expect the background initialization task
# to take long. Patiently wait until it completes instead of immediately returning
# 503 "service unavailable".
return await initialize_task
try:
return initialize_task.result()

except asyncio.InvalidStateError as exception:
raise DatabaseNotYetInitialized().as_error(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE
) from exception

except asyncio.CancelledError as exception:
raise DatabaseFailedToInitialize(
detail="Database initialization cancelled."
).as_error(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR) from exception

except Exception as exception:
raise DatabaseFailedToInitialize(detail=str(exception)).as_error(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
) from exception


async def _get_persistence_directory_root(
app_state: AppState = Depends(get_app_state),
) -> Path:
"""Return the root persistence directory.

It may be undergoing creation or a reset. This will only return after that's done.
"""
init_task = _root_persistence_directory_init_task_accessor.get_from(app_state)
assert (
init_task is not None
), "Forgot to initialize persistence directory root as part of server startup?"
return await init_task


async def get_persistence_resetter(
persistence_directory: Path = Depends(get_persistence_directory),
# We want to reset everything, not only the *active* persistence directory.
directory_to_reset: Path = Depends(_get_persistence_directory_root),
) -> PersistenceResetter:
"""Get a `PersistenceResetter` to reset the robot-server's stored data."""
return PersistenceResetter(persistence_directory)
return PersistenceResetter(directory_to_reset)
Loading

0 comments on commit 4ad5c36

Please sign in to comment.