forked from y-scope/clp
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(clp-package): Add support for deleting archives that are exclusi…
…vely within a time range. (y-scope#594) Co-authored-by: kirkrodrigues <[email protected]>
- Loading branch information
1 parent
7aea626
commit b8e22da
Showing
13 changed files
with
272 additions
and
72 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
import logging | ||
|
||
# Set up console logging | ||
logging_console_handler = logging.StreamHandler() | ||
logging_formatter = logging.Formatter( | ||
"%(asctime)s.%(msecs)03d %(levelname)s [%(module)s] %(message)s", datefmt="%Y-%m-%dT%H:%M:%S" | ||
) | ||
logging_console_handler.setFormatter(logging_formatter) | ||
|
||
# Set up root logger | ||
root_logger = logging.getLogger() | ||
root_logger.setLevel(logging.INFO) | ||
root_logger.addHandler(logging_console_handler) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
103 changes: 103 additions & 0 deletions
103
components/clp-package-utils/clp_package_utils/scripts/del_archives.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
import argparse | ||
import logging | ||
import subprocess | ||
import sys | ||
from pathlib import Path | ||
|
||
from clp_package_utils.general import ( | ||
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, | ||
dump_container_config, | ||
generate_container_config, | ||
generate_container_name, | ||
generate_container_start_cmd, | ||
get_clp_home, | ||
load_config_file, | ||
validate_and_load_db_credentials_file, | ||
) | ||
|
||
logger = logging.getLogger(__file__) | ||
|
||
|
||
def main(argv): | ||
clp_home = get_clp_home() | ||
default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH | ||
|
||
args_parser = argparse.ArgumentParser( | ||
description="Deletes archives that fall within the specified time range." | ||
) | ||
args_parser.add_argument( | ||
"--config", | ||
"-c", | ||
default=str(default_config_file_path), | ||
help="CLP package configuration file.", | ||
) | ||
args_parser.add_argument( | ||
"--begin-ts", | ||
type=int, | ||
default=0, | ||
help="Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch.", | ||
) | ||
args_parser.add_argument( | ||
"--end-ts", | ||
type=int, | ||
required=True, | ||
help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", | ||
) | ||
parsed_args = args_parser.parse_args(argv[1:]) | ||
|
||
# Validate and load config file | ||
try: | ||
config_file_path = Path(parsed_args.config) | ||
clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) | ||
clp_config.validate_logs_dir() | ||
|
||
# Validate and load necessary credentials | ||
validate_and_load_db_credentials_file(clp_config, clp_home, False) | ||
except: | ||
logger.exception("Failed to load config.") | ||
return -1 | ||
|
||
# Validate the input timestamp | ||
begin_ts = parsed_args.begin_ts | ||
end_ts = parsed_args.end_ts | ||
if begin_ts > end_ts: | ||
logger.error("begin-ts must be <= end-ts") | ||
return -1 | ||
if end_ts < 0 or begin_ts < 0: | ||
logger.error("begin_ts and end_ts must be non-negative.") | ||
return -1 | ||
|
||
container_name = generate_container_name("del-archives") | ||
|
||
container_clp_config, mounts = generate_container_config(clp_config, clp_home) | ||
generated_config_path_on_container, generated_config_path_on_host = dump_container_config( | ||
container_clp_config, clp_config, container_name | ||
) | ||
|
||
necessary_mounts = [mounts.clp_home, mounts.logs_dir, mounts.archives_output_dir] | ||
container_start_cmd = generate_container_start_cmd( | ||
container_name, necessary_mounts, clp_config.execution_container | ||
) | ||
|
||
# fmt: off | ||
del_archive_cmd = [ | ||
"python3", | ||
"-m", "clp_package_utils.scripts.native.del_archives", | ||
"--config", str(generated_config_path_on_container), | ||
str(begin_ts), | ||
str(end_ts) | ||
|
||
] | ||
# fmt: on | ||
|
||
cmd = container_start_cmd + del_archive_cmd | ||
subprocess.run(cmd, check=True) | ||
|
||
# Remove generated files | ||
generated_config_path_on_host.unlink() | ||
|
||
return 0 | ||
|
||
|
||
if "__main__" == __name__: | ||
sys.exit(main(sys.argv)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
139 changes: 139 additions & 0 deletions
139
components/clp-package-utils/clp_package_utils/scripts/native/del_archives.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
import argparse | ||
import logging | ||
import shutil | ||
import sys | ||
from contextlib import closing | ||
from pathlib import Path | ||
from typing import List | ||
|
||
from clp_py_utils.clp_config import Database | ||
from clp_py_utils.sql_adapter import SQL_Adapter | ||
|
||
from clp_package_utils.general import ( | ||
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, | ||
get_clp_home, | ||
load_config_file, | ||
) | ||
|
||
logger = logging.getLogger(__file__) | ||
|
||
|
||
def main(argv): | ||
clp_home = get_clp_home() | ||
default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH | ||
|
||
args_parser = argparse.ArgumentParser( | ||
description="Deletes archives that fall within the specified time range." | ||
) | ||
args_parser.add_argument( | ||
"--config", | ||
"-c", | ||
required=True, | ||
default=str(default_config_file_path), | ||
help="CLP configuration file.", | ||
) | ||
args_parser.add_argument( | ||
"begin_ts", | ||
type=int, | ||
help="Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch.", | ||
) | ||
args_parser.add_argument( | ||
"end_ts", | ||
type=int, | ||
help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", | ||
) | ||
parsed_args = args_parser.parse_args(argv[1:]) | ||
|
||
# Validate and load config file | ||
config_file_path = Path(parsed_args.config) | ||
try: | ||
clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) | ||
clp_config.validate_logs_dir() | ||
except: | ||
logger.exception("Failed to load config.") | ||
return -1 | ||
|
||
database_config = clp_config.database | ||
archives_dir = clp_config.archive_output.directory | ||
if not archives_dir.exists(): | ||
logger.error("`archive_output.directory` doesn't exist.") | ||
return -1 | ||
|
||
return _delete_archives( | ||
archives_dir, | ||
database_config, | ||
parsed_args.begin_ts, | ||
parsed_args.end_ts, | ||
) | ||
|
||
|
||
def _delete_archives( | ||
archives_dir: Path, | ||
database_config: Database, | ||
begin_ts: int, | ||
end_ts: int, | ||
) -> int: | ||
""" | ||
Deletes all archives where `begin_ts <= archive.begin_timestamp` and | ||
`archive.end_timestamp <= end_ts` from both the metadata database and disk. | ||
:param archives_dir: | ||
:param database_config: | ||
:param begin_ts: | ||
:param end_ts: | ||
:return: 0 on success, -1 otherwise. | ||
""" | ||
|
||
archive_ids: List[str] | ||
logger.info("Starting to delete archives from the database.") | ||
try: | ||
sql_adapter = SQL_Adapter(database_config) | ||
clp_db_connection_params = database_config.get_clp_connection_params_and_type(True) | ||
table_prefix = clp_db_connection_params["table_prefix"] | ||
with closing(sql_adapter.create_connection(True)) as db_conn, closing( | ||
db_conn.cursor(dictionary=True) | ||
) as db_cursor: | ||
db_cursor.execute( | ||
f""" | ||
DELETE FROM `{table_prefix}archives` | ||
WHERE begin_timestamp >= %s AND end_timestamp <= %s | ||
RETURNING id | ||
""", | ||
(begin_ts, end_ts), | ||
) | ||
results = db_cursor.fetchall() | ||
|
||
if 0 == len(results): | ||
logger.info("No archives (exclusively) within the specified time range.") | ||
return 0 | ||
|
||
archive_ids = [result["id"] for result in results] | ||
db_cursor.execute( | ||
f""" | ||
DELETE FROM `{table_prefix}files` | ||
WHERE archive_id in ({', '.join(['%s'] * len(archive_ids))}) | ||
""", | ||
archive_ids, | ||
) | ||
db_conn.commit() | ||
except Exception: | ||
logger.exception("Failed to delete archives from the database. Aborting deletion.") | ||
return -1 | ||
|
||
logger.info(f"Finished deleting archives from the database.") | ||
|
||
for archive_id in archive_ids: | ||
archive_path = archives_dir / archive_id | ||
if not archive_path.is_dir(): | ||
logger.warning(f"Archive {archive_id} is not a directory. Skipping deletion.") | ||
continue | ||
|
||
logger.info(f"Deleting archive {archive_id} from disk.") | ||
shutil.rmtree(archive_path) | ||
|
||
logger.info(f"Finished deleting archives from disk.") | ||
|
||
return 0 | ||
|
||
|
||
if "__main__" == __name__: | ||
sys.exit(main(sys.argv)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.