Skip to content

Commit

Permalink
Merge remote-tracking branch 'github/fix_mets_server_zombies'
Browse files Browse the repository at this point in the history
  • Loading branch information
kba committed Oct 10, 2024
2 parents 02c6eff + 3e736a7 commit 9391f49
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 121 deletions.
2 changes: 2 additions & 0 deletions src/ocrd/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
\b
{config.describe('OCRD_NETWORK_RABBITMQ_CLIENT_CONNECT_ATTEMPTS')}
\b
{config.describe('OCRD_NETWORK_RABBITMQ_HEARTBEAT')}
\b
{config.describe('OCRD_PROFILE_FILE')}
\b
{config.describe('OCRD_PROFILE', wrap_text=False)}
Expand Down
89 changes: 49 additions & 40 deletions src/ocrd/mets_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,28 +157,27 @@ def save(self):
Request writing the changes to the file system
"""
if not self.multiplexing_mode:
self.session.request("PUT", url=self.url)
return self.session.request("PUT", url=self.url).text
else:
self.session.request(
return self.session.request(
"POST",
self.url,
json=MpxReq.save(self.ws_dir_path)
)
).json()["text"]

def stop(self):
"""
Request stopping the mets server
"""
try:
if not self.multiplexing_mode:
self.session.request("DELETE", self.url)
return
return self.session.request("DELETE", self.url).text
else:
self.session.request(
return self.session.request(
"POST",
self.url,
json=MpxReq.stop(self.ws_dir_path)
)
).json()["text"]
except ConnectionError:
# Expected because we exit the process without returning
pass
Expand Down Expand Up @@ -348,12 +347,12 @@ def __args_wrapper(
@staticmethod
def save(ws_dir_path: str) -> Dict:
return MpxReq.__args_wrapper(
ws_dir_path, method_type="PUT", response_type="empty", request_url="", request_data={})
ws_dir_path, method_type="PUT", response_type="text", request_url="", request_data={})

@staticmethod
def stop(ws_dir_path: str) -> Dict:
return MpxReq.__args_wrapper(
ws_dir_path, method_type="DELETE", response_type="empty", request_url="", request_data={})
ws_dir_path, method_type="DELETE", response_type="text", request_url="", request_data={})

@staticmethod
def reload(ws_dir_path: str) -> Dict:
Expand Down Expand Up @@ -438,15 +437,16 @@ def kill_process(mets_server_pid: int):
pass

def shutdown(self):
pid = os.getpid()
self.log.info(f"Shutdown method of mets server[{pid}] invoked, sending SIGTERM signal.")
os.kill(pid, signal.SIGTERM)
if self.is_uds:
if Path(self.url).exists():
self.log.warning(f"Due to a server shutdown, removing the existing UDS socket file: {self.url}")
Path(self.url).unlink()
# os._exit because uvicorn catches SystemExit raised by sys.exit
_exit(0)

def startup(self):
self.log.info("Starting up METS server")
self.log.info(f"Configuring the Mets Server")

workspace = self.workspace

Expand All @@ -472,51 +472,70 @@ def save():
"""
Write current changes to the file system
"""
return workspace.save_mets()
workspace.save_mets()
response = Response(content="The Mets Server is writing changes to disk.", media_type='text/plain')
self.log.info(f"PUT / -> {response.__dict__}")
return response

@app.delete(path='/')
async def stop():
def stop():
"""
Stop the mets server
"""
getLogger('ocrd.models.ocrd_mets').info(f'Shutting down METS Server {self.url}')
workspace.save_mets()
response = Response(content="The Mets Server will shut down soon...", media_type='text/plain')
self.shutdown()
self.log.info(f"DELETE / -> {response.__dict__}")
return response

@app.post(path='/reload')
async def workspace_reload_mets():
def workspace_reload_mets():
"""
Reload mets file from the file system
"""
workspace.reload_mets()
return Response(content=f'Reloaded from {workspace.directory}', media_type="text/plain")
response = Response(content=f"Reloaded from {workspace.directory}", media_type='text/plain')
self.log.info(f"POST /reload -> {response.__dict__}")
return response

@app.get(path='/unique_identifier', response_model=str)
async def unique_identifier():
return Response(content=workspace.mets.unique_identifier, media_type='text/plain')
response = Response(content=workspace.mets.unique_identifier, media_type='text/plain')
self.log.info(f"GET /unique_identifier -> {response.__dict__}")
return response

@app.get(path='/workspace_path', response_model=str)
async def workspace_path():
return Response(content=workspace.directory, media_type="text/plain")
response = Response(content=workspace.directory, media_type="text/plain")
self.log.info(f"GET /workspace_path -> {response.__dict__}")
return response

@app.get(path='/physical_pages', response_model=OcrdPageListModel)
async def physical_pages():
return {'physical_pages': workspace.mets.physical_pages}
response = {'physical_pages': workspace.mets.physical_pages}
self.log.info(f"GET /physical_pages -> {response}")
return response

@app.get(path='/file_groups', response_model=OcrdFileGroupListModel)
async def file_groups():
return {'file_groups': workspace.mets.file_groups}
response = {'file_groups': workspace.mets.file_groups}
self.log.info(f"GET /file_groups -> {response}")
return response

@app.get(path='/agent', response_model=OcrdAgentListModel)
async def agents():
return OcrdAgentListModel.create(workspace.mets.agents)
response = OcrdAgentListModel.create(workspace.mets.agents)
self.log.info(f"GET /agent -> {response.__dict__}")
return response

@app.post(path='/agent', response_model=OcrdAgentModel)
async def add_agent(agent: OcrdAgentModel):
kwargs = agent.dict()
kwargs['_type'] = kwargs.pop('type')
workspace.mets.add_agent(**kwargs)
return agent
response = agent
self.log.info(f"POST /agent -> {response.__dict__}")
return response

@app.get(path="/file", response_model=OcrdFileListModel)
async def find_files(
Expand All @@ -533,7 +552,9 @@ async def find_files(
found = workspace.mets.find_all_files(
fileGrp=file_grp, ID=file_id, pageId=page_id, mimetype=mimetype, local_filename=local_filename, url=url
)
return OcrdFileListModel.create(found)
response = OcrdFileListModel.create(found)
self.log.info(f"GET /file -> {response.__dict__}")
return response

@app.post(path='/file', response_model=OcrdFileModel)
async def add_file(
Expand All @@ -556,17 +577,16 @@ async def add_file(
# Add to workspace
kwargs = file_resource.dict()
workspace.add_file(**kwargs, force=force)
return file_resource
response = file_resource
self.log.info(f"POST /file -> {response.__dict__}")
return response

# ------------- #

if self.is_uds:
# Create socket and change to world-readable and -writable to avoid permission errors
self.log.debug(f"chmod 0o677 {self.url}")
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
if Path(self.url).exists() and not is_socket_in_use(self.url):
# remove leftover unused socket which blocks startup
Path(self.url).unlink()
server.bind(self.url) # creates the socket file
atexit.register(self.shutdown)
server.close()
Expand All @@ -578,16 +598,5 @@ async def add_file(
uvicorn_kwargs['log_config'] = None
uvicorn_kwargs['access_log'] = False

self.log.debug("Starting uvicorn")
self.log.info("Starting the uvicorn Mets Server")
uvicorn.run(app, **uvicorn_kwargs)


def is_socket_in_use(socket_path):
if Path(socket_path).exists():
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
client.connect(socket_path)
except OSError:
return False
client.close()
return True
20 changes: 9 additions & 11 deletions src/ocrd_network/processing_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,26 +583,20 @@ async def _cancel_cached_dependent_jobs(self, workspace_key: str, job_id: str) -
)

async def _consume_cached_jobs_of_workspace(
self, workspace_key: str, mets_server_url: str
self, workspace_key: str, mets_server_url: str, path_to_mets: str
) -> List[PYJobInput]:

# Check whether the internal queue for the workspace key still exists
if workspace_key not in self.cache_processing_requests.processing_requests:
self.log.debug(f"No internal queue available for workspace with key: {workspace_key}")
return []

# decrease the internal cache counter by 1
request_counter = self.cache_processing_requests.update_request_counter(
workspace_key=workspace_key, by_value=-1
)
self.log.debug(f"Internal processing job cache counter value: {request_counter}")
if not len(self.cache_processing_requests.processing_requests[workspace_key]):
if (workspace_key not in self.cache_processing_requests.processing_requests or
not len(self.cache_processing_requests.processing_requests[workspace_key])):
if request_counter <= 0:
# Shut down the Mets Server for the workspace_key since no
# more internal callbacks are expected for that workspace
self.log.debug(f"Stopping the mets server: {mets_server_url}")

self.deployer.stop_uds_mets_server(mets_server_url=mets_server_url)
self.deployer.stop_uds_mets_server(mets_server_url=mets_server_url, path_to_mets=path_to_mets)

try:
# The queue is empty - delete it
Expand All @@ -618,6 +612,10 @@ async def _consume_cached_jobs_of_workspace(
else:
self.log.debug(f"Internal request cache is empty but waiting for {request_counter} result callbacks.")
return []
# Check whether the internal queue for the workspace key still exists
if workspace_key not in self.cache_processing_requests.processing_requests:
self.log.debug(f"No internal queue available for workspace with key: {workspace_key}")
return []
consumed_requests = await self.cache_processing_requests.consume_cached_requests(workspace_key=workspace_key)
return consumed_requests

Expand Down Expand Up @@ -652,7 +650,7 @@ async def remove_job_from_request_cache(self, result_message: PYResultMessage):
raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error)

consumed_cached_jobs = await self._consume_cached_jobs_of_workspace(
workspace_key=workspace_key, mets_server_url=mets_server_url
workspace_key=workspace_key, mets_server_url=mets_server_url, path_to_mets=path_to_mets
)
await self.push_cached_jobs_to_agents(processing_jobs=consumed_cached_jobs)

Expand Down
4 changes: 2 additions & 2 deletions src/ocrd_network/rabbitmq_utils/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any, Optional, Union
from pika import BasicProperties, BlockingConnection, ConnectionParameters, PlainCredentials
from pika.adapters.blocking_connection import BlockingChannel
from ocrd_utils import config
from .constants import (
DEFAULT_EXCHANGER_NAME,
DEFAULT_EXCHANGER_TYPE,
Expand Down Expand Up @@ -69,8 +70,7 @@ def open_blocking_connection(
port=port,
virtual_host=vhost,
credentials=credentials,
# TODO: The heartbeat should not be disabled (0)!
heartbeat=0
heartbeat=config.OCRD_NETWORK_RABBITMQ_HEARTBEAT
),
)
return blocking_connection
Expand Down
44 changes: 23 additions & 21 deletions src/ocrd_network/runtime_data/deployer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"""
from __future__ import annotations
from pathlib import Path
from subprocess import Popen, run as subprocess_run
import psutil
from time import sleep
from typing import Dict, List, Union

Expand All @@ -30,6 +30,8 @@ def __init__(self, config_path: str) -> None:
self.data_hosts: List[DataHost] = parse_hosts_data(ps_config["hosts"])
self.internal_callback_url = ps_config.get("internal_callback_url", None)
self.mets_servers: Dict = {} # {"mets_server_url": "mets_server_pid"}
# This is required to store UDS urls that are multiplexed through the TCP proxy and are not preserved anywhere
self.mets_servers_paths: Dict = {} # {"ws_dir_path": "mets_server_url"}
self.use_tcp_mets = ps_config.get("use_tcp_mets", False)

# TODO: Reconsider this.
Expand Down Expand Up @@ -152,27 +154,27 @@ def start_uds_mets_server(self, ws_dir_path: str) -> Path:
"Removing to avoid any weird behavior before starting the server.")
Path(mets_server_url).unlink()
self.log.info(f"Starting UDS mets server: {mets_server_url}")
pid = OcrdMetsServer.create_process(mets_server_url=mets_server_url, ws_dir_path=ws_dir_path, log_file=log_file)
self.mets_servers[mets_server_url] = pid
pid = OcrdMetsServer.create_process(mets_server_url=str(mets_server_url), ws_dir_path=str(ws_dir_path), log_file=str(log_file))
self.mets_servers[str(mets_server_url)] = pid
self.mets_servers_paths[str(ws_dir_path)] = str(mets_server_url)
return mets_server_url

def stop_uds_mets_server(self, mets_server_url: str, stop_with_pid: bool = False) -> None:
def stop_uds_mets_server(self, mets_server_url: str, path_to_mets: str) -> None:
self.log.info(f"Stopping UDS mets server: {mets_server_url}")
if stop_with_pid:
if Path(mets_server_url) not in self.mets_servers:
message = f"UDS Mets server not found at URL: {mets_server_url}"
self.log.exception(message)
raise Exception(message)
mets_server_pid = self.mets_servers[Path(mets_server_url)]
OcrdMetsServer.kill_process(mets_server_pid=mets_server_pid)
if Path(mets_server_url).exists():
self.log.warning(f"Deployer is removing the existing UDS socket file: {mets_server_url}")
Path(mets_server_url).unlink()
return
# TODO: Reconsider this again
# Not having this sleep here causes connection errors
# on the last request processed by the processing worker.
# Sometimes 3 seconds is enough, sometimes not.
sleep(5)
stop_mets_server(mets_server_url=mets_server_url)
self.log.info(f"Path to the mets file: {path_to_mets}")
self.log.debug(f"mets_server: {self.mets_servers}")
self.log.debug(f"mets_server_paths: {self.mets_servers_paths}")
workspace_path = str(Path(path_to_mets).parent)
mets_server_url_uds = self.mets_servers_paths[workspace_path]
mets_server_pid = self.mets_servers[mets_server_url_uds]
self.log.info(f"Terminating mets server with pid: {mets_server_pid}")
p = psutil.Process(mets_server_pid)
stop_mets_server(self.log, mets_server_url=mets_server_url, ws_dir_path=workspace_path)
if p.is_running():
p.wait()
self.log.info(f"Terminated mets server with pid: {mets_server_pid}")
else:
self.log.info(f"Mets server with pid: {mets_server_pid} has already terminated.")
del self.mets_servers_paths[workspace_path]
del self.mets_servers[mets_server_url_uds]
return
Loading

0 comments on commit 9391f49

Please sign in to comment.