From bda147c999d170a3227e91e9d4bdbb16ac8f7930 Mon Sep 17 00:00:00 2001 From: Valentin Lorentz Date: Tue, 17 Oct 2023 15:07:16 +0200 Subject: [PATCH 1/2] worker: Log which outputs are missing when task is unexpectedly incomplete --- luigi/contrib/dropbox.py | 3 +++ luigi/contrib/hive.py | 3 +++ luigi/contrib/mongodb.py | 3 +++ luigi/contrib/mssqldb.py | 3 +++ luigi/contrib/mysqldb.py | 3 +++ luigi/contrib/postgres.py | 3 +++ luigi/contrib/presto.py | 3 +++ luigi/contrib/redis_store.py | 3 +++ luigi/contrib/simulate.py | 3 +++ luigi/contrib/sqla.py | 3 +++ luigi/target.py | 3 +++ luigi/worker.py | 9 ++++++++- 12 files changed, 41 insertions(+), 1 deletion(-) diff --git a/luigi/contrib/dropbox.py b/luigi/contrib/dropbox.py index aaa77953b2..52a6b9ae16 100644 --- a/luigi/contrib/dropbox.py +++ b/luigi/contrib/dropbox.py @@ -298,6 +298,9 @@ def __init__(self, path, token, format=None, user_agent="Luigi"): self.client = DropboxClient(token, user_agent) self.format = format or luigi.format.get_default_format() + def __str__(self): + return self.path + @property def fs(self): return self.client diff --git a/luigi/contrib/hive.py b/luigi/contrib/hive.py index 70dc4cab3c..0af1025d27 100644 --- a/luigi/contrib/hive.py +++ b/luigi/contrib/hive.py @@ -485,6 +485,9 @@ def __init__(self, table, partition, database='default', fail_missing_table=True self.client = client or get_default_client() self.fail_missing_table = fail_missing_table + def __str__(self): + return self.path + def exists(self): """ returns `True` if the partition/table exists diff --git a/luigi/contrib/mongodb.py b/luigi/contrib/mongodb.py index 7fa44cca80..640d46652b 100644 --- a/luigi/contrib/mongodb.py +++ b/luigi/contrib/mongodb.py @@ -35,6 +35,9 @@ def __init__(self, mongo_client, index, collection): self._index = index self._collection = collection + def __str__(self): + return '%s/%s' % (self._index, self._collection) + def get_collection(self): """ Return targeted mongo collection to query on diff --git a/luigi/contrib/mssqldb.py b/luigi/contrib/mssqldb.py index 57c0570673..8a70aaa5b0 100644 --- a/luigi/contrib/mssqldb.py +++ b/luigi/contrib/mssqldb.py @@ -68,6 +68,9 @@ def __init__(self, host, database, user, password, table, update_id): self.table = table self.update_id = update_id + def __str__(self): + return self.table + def touch(self, connection=None): """ Mark this update as complete. diff --git a/luigi/contrib/mysqldb.py b/luigi/contrib/mysqldb.py index 45e928314c..804e94fdee 100644 --- a/luigi/contrib/mysqldb.py +++ b/luigi/contrib/mysqldb.py @@ -68,6 +68,9 @@ def __init__(self, host, database, user, password, table, update_id, **cnx_kwarg self.update_id = update_id self.cnx_kwargs = cnx_kwargs + def __str__(self): + return self.table + def touch(self, connection=None): """ Mark this update as complete. diff --git a/luigi/contrib/postgres.py b/luigi/contrib/postgres.py index f1dfebf4b8..719b80a4d7 100644 --- a/luigi/contrib/postgres.py +++ b/luigi/contrib/postgres.py @@ -200,6 +200,9 @@ def __init__( self.table = table self.update_id = update_id + def __str__(self): + return self.table + def touch(self, connection=None): """ Mark this update as complete. diff --git a/luigi/contrib/presto.py b/luigi/contrib/presto.py index dac21ecbef..dd5cd027c6 100644 --- a/luigi/contrib/presto.py +++ b/luigi/contrib/presto.py @@ -131,6 +131,9 @@ def __init__(self, client, catalog, database, table, partition=None): self._client = client self._count = None + def __str__(self): + return self.table + @property def _count_query(self): partition = OrderedDict(self.partition or {1: 1}) diff --git a/luigi/contrib/redis_store.py b/luigi/contrib/redis_store.py index 0a1d3bc60e..5b128d2837 100644 --- a/luigi/contrib/redis_store.py +++ b/luigi/contrib/redis_store.py @@ -73,6 +73,9 @@ def __init__(self, host, port, db, update_id, password=None, socket_timeout=self.socket_timeout, ) + def __str__(self): + return self.marker_key() + def marker_key(self): """ Generate a key for the indicator hash. diff --git a/luigi/contrib/simulate.py b/luigi/contrib/simulate.py index 88ea90664c..240512c8f5 100644 --- a/luigi/contrib/simulate.py +++ b/luigi/contrib/simulate.py @@ -79,6 +79,9 @@ def __init__(self, task_obj): shutil.rmtree(path) logger.debug('Deleted temporary directory %s', path) + def __str__(self): + return self.task_id + def get_path(self): """ Returns a temporary file path based on a MD5 hash generated with the task's name and its arguments diff --git a/luigi/contrib/sqla.py b/luigi/contrib/sqla.py index ad8a352df1..7a64e85130 100644 --- a/luigi/contrib/sqla.py +++ b/luigi/contrib/sqla.py @@ -189,6 +189,9 @@ def __init__(self, connection_string, target_table, update_id, echo=False, conne self.connect_args = connect_args self.marker_table_bound = None + def __str__(self): + return self.marker_table_bound or self.connection_string + @property def engine(self): """ diff --git a/luigi/target.py b/luigi/target.py index 8b333f5326..ff5183e3eb 100644 --- a/luigi/target.py +++ b/luigi/target.py @@ -215,6 +215,9 @@ def __init__(self, path): # cast to str to allow path to be objects like pathlib.PosixPath and py._path.local.LocalPath self.path = str(path) + def __str__(self): + return self.path + @property @abc.abstractmethod def fs(self): diff --git a/luigi/worker.py b/luigi/worker.py index 01a2772a68..c678487fa3 100644 --- a/luigi/worker.py +++ b/luigi/worker.py @@ -182,7 +182,14 @@ def run(self): # checking completeness of self.task so outputs of dependencies are # irrelevant. if self.check_unfulfilled_deps and not _is_external(self.task): - missing = [dep.task_id for dep in self.task.deps() if not self.check_complete(dep)] + missing = [] + for dep in self.task.deps(): + if not self.check_complete(dep): + nonexistent_outputs = [output for output in dep.output() if not output.exists()] + if nonexistent_outputs: + missing.append('%s (%s)' % (dep.task_id, ', '.join(map(str, nonexistent_outputs)))) + else: + missing.append(dep.task_id) if missing: deps = 'dependency' if len(missing) == 1 else 'dependencies' raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing))) From db92ba7fdb34ad9541c239c0fba9d7024dcd2dc9 Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sat, 21 Oct 2023 08:04:02 +0200 Subject: [PATCH 2/2] Apply @dlstadther's suggestions --- luigi/contrib/mongodb.py | 2 +- luigi/contrib/sqla.py | 2 +- luigi/worker.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/luigi/contrib/mongodb.py b/luigi/contrib/mongodb.py index 640d46652b..7e4a6069bf 100644 --- a/luigi/contrib/mongodb.py +++ b/luigi/contrib/mongodb.py @@ -36,7 +36,7 @@ def __init__(self, mongo_client, index, collection): self._collection = collection def __str__(self): - return '%s/%s' % (self._index, self._collection) + return f'{self._index}/{self._collection}' def get_collection(self): """ diff --git a/luigi/contrib/sqla.py b/luigi/contrib/sqla.py index 7a64e85130..f292c21354 100644 --- a/luigi/contrib/sqla.py +++ b/luigi/contrib/sqla.py @@ -190,7 +190,7 @@ def __init__(self, connection_string, target_table, update_id, echo=False, conne self.marker_table_bound = None def __str__(self): - return self.marker_table_bound or self.connection_string + return self.target_table @property def engine(self): diff --git a/luigi/worker.py b/luigi/worker.py index c678487fa3..c3ea777b8a 100644 --- a/luigi/worker.py +++ b/luigi/worker.py @@ -187,7 +187,7 @@ def run(self): if not self.check_complete(dep): nonexistent_outputs = [output for output in dep.output() if not output.exists()] if nonexistent_outputs: - missing.append('%s (%s)' % (dep.task_id, ', '.join(map(str, nonexistent_outputs)))) + missing.append(f'{dep.task_id} ({", ".join(map(str, nonexistent_outputs))})') else: missing.append(dep.task_id) if missing: