diff --git a/cdci_data_analysis/flask_app/dispatcher_query.py b/cdci_data_analysis/flask_app/dispatcher_query.py index e086245f..4dfad4e0 100644 --- a/cdci_data_analysis/flask_app/dispatcher_query.py +++ b/cdci_data_analysis/flask_app/dispatcher_query.py @@ -22,6 +22,7 @@ import glob import string import random +import fcntl from flask import jsonify, send_from_directory, make_response from flask import request, g @@ -886,9 +887,13 @@ def get_request_files_dir(self): return request_files_dir.path def set_scratch_dir(self, session_id, job_id=None, verbose=False): - if verbose == True: - print('SETSCRATCH ---->', session_id, - type(session_id), job_id, type(job_id)) + lock_file = f".lock_{self.job_id}" + scratch_dir_retry_attempts = 5 + scratch_dir_retry_delay = 0.2 + scratch_dir_created = True + + if verbose: + print('SETSCRATCH ---->', session_id, type(session_id), job_id, type(job_id)) wd = 'scratch' @@ -898,14 +903,28 @@ def set_scratch_dir(self, session_id, job_id=None, verbose=False): if job_id is not None: wd += '_jid_'+job_id - alias_workdir = self.get_existing_job_ID_path( - wd=FilePath(file_dir=wd).path) - if alias_workdir is not None: - wd = wd+'_aliased' - - wd = FilePath(file_dir=wd) - wd.mkdir() - self.scratch_dir = wd.path + for attempt in range(scratch_dir_retry_attempts): + try: + with open(lock_file, 'w') as lock: + fcntl.flock(lock, fcntl.LOCK_EX | fcntl.LOCK_NB) + alias_workdir = self.get_existing_job_ID_path(wd=FilePath(file_dir=wd).path) + if alias_workdir is not None: + wd = wd + '_aliased' + + wd_path_obj = FilePath(file_dir=wd) + wd_path_obj.mkdir() + self.scratch_dir = wd_path_obj.path + scratch_dir_created = True + break + except (OSError, IOError) as io_e: + scratch_dir_created = False + self.logger.warning(f'Failed to acquire lock for the scratch directory creation, attempt number {attempt + 1} ({scratch_dir_retry_attempts - (attempt + 1)} left), sleeping {scratch_dir_retry_delay} seconds until retry.\nError: {str(io_e)}') + time.sleep(scratch_dir_retry_delay) + + if not scratch_dir_created: + dir_list = glob.glob(f"*_jid_{job_id}*") + sentry.capture_message(f"Failed to acquire lock for directory creation after multiple attempts.\njob_id: {self.job_id}\ndir_list: {dir_list}") + raise InternalError(f"Failed to acquire lock for directory creation after {scratch_dir_retry_attempts} attempts.", status_code=500) def set_temp_dir(self, session_id, job_id=None, verbose=False): if verbose: @@ -1659,9 +1678,7 @@ def set_config(self): def get_existing_job_ID_path(self, wd): # exist same job_ID, different session ID dir_list = glob.glob(f'*_jid_{self.job_id}') - # print('dirs',dir_list) - if dir_list: - dir_list = [d for d in dir_list if 'aliased' not in d] + dir_list = [d for d in dir_list if 'aliased' not in d] if len(dir_list) == 1: if dir_list[0] != wd: @@ -1670,9 +1687,8 @@ def get_existing_job_ID_path(self, wd): alias_dir = None elif len(dir_list) > 1: - sentry.capture_message(f'Found two non aliased identical job_id, dir_list: {dir_list}') - self.logger.warning(f'Found two non aliased identical job_id, dir_list: {dir_list}') - + sentry.capture_message(f'Found two or more non aliased identical job_id, dir_list: {dir_list}') + self.logger.warning(f'Found two or more non aliased identical job_id, dir_list: {dir_list}') raise InternalError("We have encountered an internal error! " "Our team is notified and is working on it. We are sorry! " "When we find a solution we will try to reach you", @@ -1683,6 +1699,7 @@ def get_existing_job_ID_path(self, wd): return alias_dir + def get_file_mtime(self, file): return os.path.getmtime(file) diff --git a/tests/test_server_basic.py b/tests/test_server_basic.py index 6612e4b2..26a21e9a 100644 --- a/tests/test_server_basic.py +++ b/tests/test_server_basic.py @@ -2,6 +2,7 @@ import shutil import urllib import io + import requests import time import uuid @@ -11,6 +12,7 @@ import jwt import glob import pytest +import fcntl from datetime import datetime, timedelta from dateutil import parser, tz from functools import reduce @@ -320,6 +322,55 @@ def test_error_two_scratch_dir_same_job_id(dispatcher_live_fixture): os.rmdir(fake_scratch_dir) +@pytest.mark.not_safe_parallel +@pytest.mark.fast +def test_scratch_dir_creation_lock_error(dispatcher_live_fixture): + DispatcherJobState.remove_scratch_folders() + server = dispatcher_live_fixture + logger.info("constructed server: %s", server) + + encoded_token = jwt.encode(default_token_payload, secret_key, algorithm='HS256') + # issuing a request each, with the same set of parameters + params = dict( + query_status="new", + query_type="Real", + instrument="empty-async", + product_type="dummy", + token=encoded_token + ) + DataServerQuery.set_status('submitted') + # let's generate a fake scratch dir + jdata = ask(server, + params, + expected_query_status=["submitted"], + max_time_s=50, + ) + + job_id = jdata['job_monitor']['job_id'] + session_id = jdata['session_id'] + fake_scratch_dir = f'scratch_sid_01234567890_jid_{job_id}' + os.makedirs(fake_scratch_dir) + + params['job_id'] = job_id + params['session_id'] = session_id + + lock_file = f".lock_{job_id}" + + with open(lock_file, 'w') as f_lock: + fcntl.flock(f_lock, fcntl.LOCK_EX) + + jdata = ask(server, + params, + expected_status_code=500, + expected_query_status=None, + ) + scratch_dir_retry_attempts = 5 + assert jdata['error'] == f"InternalError():Failed to acquire lock for directory creation after {scratch_dir_retry_attempts} attempts." + assert jdata['error_message'] == f"Failed to acquire lock for directory creation after {scratch_dir_retry_attempts} attempts." + os.rmdir(fake_scratch_dir) + os.remove(lock_file) + + @pytest.mark.fast def test_same_request_different_users(dispatcher_live_fixture): server = dispatcher_live_fixture