Skip to content

Commit

Permalink
Merge pull request oda-hub#713 from oda-hub/lock-creation-scratch_dir
Browse files Browse the repository at this point in the history
Lock mechanism for scratch_dir creation, small refactoring
  • Loading branch information
burnout87 authored Oct 8, 2024
2 parents 2d00c76 + 4919457 commit 1b0c79e
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 17 deletions.
51 changes: 34 additions & 17 deletions cdci_data_analysis/flask_app/dispatcher_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import glob
import string
import random
import fcntl

from flask import jsonify, send_from_directory, make_response
from flask import request, g
Expand Down Expand Up @@ -886,9 +887,13 @@ def get_request_files_dir(self):
return request_files_dir.path

def set_scratch_dir(self, session_id, job_id=None, verbose=False):
if verbose == True:
print('SETSCRATCH ---->', session_id,
type(session_id), job_id, type(job_id))
lock_file = f".lock_{self.job_id}"
scratch_dir_retry_attempts = 5
scratch_dir_retry_delay = 0.2
scratch_dir_created = True

if verbose:
print('SETSCRATCH ---->', session_id, type(session_id), job_id, type(job_id))

wd = 'scratch'

Expand All @@ -898,14 +903,28 @@ def set_scratch_dir(self, session_id, job_id=None, verbose=False):
if job_id is not None:
wd += '_jid_'+job_id

alias_workdir = self.get_existing_job_ID_path(
wd=FilePath(file_dir=wd).path)
if alias_workdir is not None:
wd = wd+'_aliased'

wd = FilePath(file_dir=wd)
wd.mkdir()
self.scratch_dir = wd.path
for attempt in range(scratch_dir_retry_attempts):
try:
with open(lock_file, 'w') as lock:
fcntl.flock(lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
alias_workdir = self.get_existing_job_ID_path(wd=FilePath(file_dir=wd).path)
if alias_workdir is not None:
wd = wd + '_aliased'

wd_path_obj = FilePath(file_dir=wd)
wd_path_obj.mkdir()
self.scratch_dir = wd_path_obj.path
scratch_dir_created = True
break
except (OSError, IOError) as io_e:
scratch_dir_created = False
self.logger.warning(f'Failed to acquire lock for the scratch directory creation, attempt number {attempt + 1} ({scratch_dir_retry_attempts - (attempt + 1)} left), sleeping {scratch_dir_retry_delay} seconds until retry.\nError: {str(io_e)}')
time.sleep(scratch_dir_retry_delay)

if not scratch_dir_created:
dir_list = glob.glob(f"*_jid_{job_id}*")
sentry.capture_message(f"Failed to acquire lock for directory creation after multiple attempts.\njob_id: {self.job_id}\ndir_list: {dir_list}")
raise InternalError(f"Failed to acquire lock for directory creation after {scratch_dir_retry_attempts} attempts.", status_code=500)

def set_temp_dir(self, session_id, job_id=None, verbose=False):
if verbose:
Expand Down Expand Up @@ -1659,9 +1678,7 @@ def set_config(self):
def get_existing_job_ID_path(self, wd):
# exist same job_ID, different session ID
dir_list = glob.glob(f'*_jid_{self.job_id}')
# print('dirs',dir_list)
if dir_list:
dir_list = [d for d in dir_list if 'aliased' not in d]
dir_list = [d for d in dir_list if 'aliased' not in d]

if len(dir_list) == 1:
if dir_list[0] != wd:
Expand All @@ -1670,9 +1687,8 @@ def get_existing_job_ID_path(self, wd):
alias_dir = None

elif len(dir_list) > 1:
sentry.capture_message(f'Found two non aliased identical job_id, dir_list: {dir_list}')
self.logger.warning(f'Found two non aliased identical job_id, dir_list: {dir_list}')

sentry.capture_message(f'Found two or more non aliased identical job_id, dir_list: {dir_list}')
self.logger.warning(f'Found two or more non aliased identical job_id, dir_list: {dir_list}')
raise InternalError("We have encountered an internal error! "
"Our team is notified and is working on it. We are sorry! "
"When we find a solution we will try to reach you",
Expand All @@ -1683,6 +1699,7 @@ def get_existing_job_ID_path(self, wd):

return alias_dir


def get_file_mtime(self, file):
return os.path.getmtime(file)

Expand Down
51 changes: 51 additions & 0 deletions tests/test_server_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import shutil
import urllib
import io

import requests
import time
import uuid
Expand All @@ -11,6 +12,7 @@
import jwt
import glob
import pytest
import fcntl
from datetime import datetime, timedelta
from dateutil import parser, tz
from functools import reduce
Expand Down Expand Up @@ -320,6 +322,55 @@ def test_error_two_scratch_dir_same_job_id(dispatcher_live_fixture):
os.rmdir(fake_scratch_dir)


@pytest.mark.not_safe_parallel
@pytest.mark.fast
def test_scratch_dir_creation_lock_error(dispatcher_live_fixture):
DispatcherJobState.remove_scratch_folders()
server = dispatcher_live_fixture
logger.info("constructed server: %s", server)

encoded_token = jwt.encode(default_token_payload, secret_key, algorithm='HS256')
# issuing a request each, with the same set of parameters
params = dict(
query_status="new",
query_type="Real",
instrument="empty-async",
product_type="dummy",
token=encoded_token
)
DataServerQuery.set_status('submitted')
# let's generate a fake scratch dir
jdata = ask(server,
params,
expected_query_status=["submitted"],
max_time_s=50,
)

job_id = jdata['job_monitor']['job_id']
session_id = jdata['session_id']
fake_scratch_dir = f'scratch_sid_01234567890_jid_{job_id}'
os.makedirs(fake_scratch_dir)

params['job_id'] = job_id
params['session_id'] = session_id

lock_file = f".lock_{job_id}"

with open(lock_file, 'w') as f_lock:
fcntl.flock(f_lock, fcntl.LOCK_EX)

jdata = ask(server,
params,
expected_status_code=500,
expected_query_status=None,
)
scratch_dir_retry_attempts = 5
assert jdata['error'] == f"InternalError():Failed to acquire lock for directory creation after {scratch_dir_retry_attempts} attempts."
assert jdata['error_message'] == f"Failed to acquire lock for directory creation after {scratch_dir_retry_attempts} attempts."
os.rmdir(fake_scratch_dir)
os.remove(lock_file)


@pytest.mark.fast
def test_same_request_different_users(dispatcher_live_fixture):
server = dispatcher_live_fixture
Expand Down

0 comments on commit 1b0c79e

Please sign in to comment.