Skip to content

Commit

Permalink
worker: Log which outputs are missing when task is unexpectedly incom…
Browse files Browse the repository at this point in the history
…plete
  • Loading branch information
progval committed Oct 17, 2023
1 parent 319ce20 commit bda147c
Show file tree
Hide file tree
Showing 12 changed files with 41 additions and 1 deletion.
3 changes: 3 additions & 0 deletions luigi/contrib/dropbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ def __init__(self, path, token, format=None, user_agent="Luigi"):
self.client = DropboxClient(token, user_agent)
self.format = format or luigi.format.get_default_format()

def __str__(self):
return self.path

Check warning on line 302 in luigi/contrib/dropbox.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/dropbox.py#L301-L302

Added lines #L301 - L302 were not covered by tests

@property
def fs(self):
return self.client
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,9 @@ def __init__(self, table, partition, database='default', fail_missing_table=True
self.client = client or get_default_client()
self.fail_missing_table = fail_missing_table

def __str__(self):
return self.path

Check warning on line 489 in luigi/contrib/hive.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/hive.py#L489

Added line #L489 was not covered by tests

def exists(self):
"""
returns `True` if the partition/table exists
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ def __init__(self, mongo_client, index, collection):
self._index = index
self._collection = collection

def __str__(self):
return '%s/%s' % (self._index, self._collection)

Check warning on line 39 in luigi/contrib/mongodb.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/mongodb.py#L39

Added line #L39 was not covered by tests

def get_collection(self):
"""
Return targeted mongo collection to query on
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/mssqldb.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ def __init__(self, host, database, user, password, table, update_id):
self.table = table
self.update_id = update_id

def __str__(self):
return self.table

Check warning on line 72 in luigi/contrib/mssqldb.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/mssqldb.py#L71-L72

Added lines #L71 - L72 were not covered by tests

def touch(self, connection=None):
"""
Mark this update as complete.
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/mysqldb.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ def __init__(self, host, database, user, password, table, update_id, **cnx_kwarg
self.update_id = update_id
self.cnx_kwargs = cnx_kwargs

def __str__(self):
return self.table

Check warning on line 72 in luigi/contrib/mysqldb.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/mysqldb.py#L72

Added line #L72 was not covered by tests

def touch(self, connection=None):
"""
Mark this update as complete.
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ def __init__(
self.table = table
self.update_id = update_id

def __str__(self):
return self.table

Check warning on line 204 in luigi/contrib/postgres.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/postgres.py#L204

Added line #L204 was not covered by tests

def touch(self, connection=None):
"""
Mark this update as complete.
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/presto.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ def __init__(self, client, catalog, database, table, partition=None):
self._client = client
self._count = None

def __str__(self):
return self.table

Check warning on line 135 in luigi/contrib/presto.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/presto.py#L135

Added line #L135 was not covered by tests

@property
def _count_query(self):
partition = OrderedDict(self.partition or {1: 1})
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/redis_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ def __init__(self, host, port, db, update_id, password=None,
socket_timeout=self.socket_timeout,
)

def __str__(self):
return self.marker_key()

Check warning on line 77 in luigi/contrib/redis_store.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/redis_store.py#L76-L77

Added lines #L76 - L77 were not covered by tests

def marker_key(self):
"""
Generate a key for the indicator hash.
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/simulate.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ def __init__(self, task_obj):
shutil.rmtree(path)
logger.debug('Deleted temporary directory %s', path)

def __str__(self):
return self.task_id

def get_path(self):
"""
Returns a temporary file path based on a MD5 hash generated with the task's name and its arguments
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/sqla.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ def __init__(self, connection_string, target_table, update_id, echo=False, conne
self.connect_args = connect_args
self.marker_table_bound = None

def __str__(self):
return self.marker_table_bound or self.connection_string

Check warning on line 193 in luigi/contrib/sqla.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/sqla.py#L193

Added line #L193 was not covered by tests

@property
def engine(self):
"""
Expand Down
3 changes: 3 additions & 0 deletions luigi/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ def __init__(self, path):
# cast to str to allow path to be objects like pathlib.PosixPath and py._path.local.LocalPath
self.path = str(path)

def __str__(self):
return self.path

Check warning on line 219 in luigi/target.py

View check run for this annotation

Codecov / codecov/patch

luigi/target.py#L219

Added line #L219 was not covered by tests

@property
@abc.abstractmethod
def fs(self):
Expand Down
9 changes: 8 additions & 1 deletion luigi/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,14 @@ def run(self):
# checking completeness of self.task so outputs of dependencies are
# irrelevant.
if self.check_unfulfilled_deps and not _is_external(self.task):
missing = [dep.task_id for dep in self.task.deps() if not self.check_complete(dep)]
missing = []
for dep in self.task.deps():
if not self.check_complete(dep):
nonexistent_outputs = [output for output in dep.output() if not output.exists()]
if nonexistent_outputs:
missing.append('%s (%s)' % (dep.task_id, ', '.join(map(str, nonexistent_outputs))))

Check warning on line 190 in luigi/worker.py

View check run for this annotation

Codecov / codecov/patch

luigi/worker.py#L190

Added line #L190 was not covered by tests
else:
missing.append(dep.task_id)
if missing:
deps = 'dependency' if len(missing) == 1 else 'dependencies'
raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
Expand Down

0 comments on commit bda147c

Please sign in to comment.