Skip to content

Commit

Permalink
Merge branch 'master' into dropbox-root-namespace-id
Browse files Browse the repository at this point in the history
  • Loading branch information
dlstadther authored Nov 30, 2024
2 parents fe2157a + b1d3ca3 commit 6db62e5
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 8 deletions.
11 changes: 11 additions & 0 deletions doc/central_scheduler.rst
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,14 @@ The task history has the following pages:
a listing of all runs of the task ``{name}`` restricted to runs with ``params`` matching the given history.
The ``params`` is a json blob describing the parameters,
e.g. ``data={"foo": "bar"}`` looks for a task with ``foo=bar``.
* ``/history/by_task_id/{task_id}``
the latest run of a task given the ``{task_id}``. It is different from just ``{id}``
and is a derivative of ``params``. It is available via ``{task_id}`` property of a
``luigi.Task`` instance or via `luigi.task.task_id_str
<https://luigi.readthedocs.io/en/stable/api/luigi.task.html#luigi.task.task_id_str>`_.
This kind of representation is useful for concisely recording URLs in a history tree.
Example screenshot:

.. figure:: history_by_task_id.png
:alt: By task_id screenshot

Binary file added doc/history_by_task_id.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion luigi/contrib/ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ def get(self, path, local_path):

self._close()

os.rename(tmp_local_path, local_path)
os.replace(tmp_local_path, local_path)

def _sftp_get(self, path, tmp_local_path):
self.conn.get(path, tmp_local_path)
Expand Down
2 changes: 1 addition & 1 deletion luigi/contrib/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ def get(self, path, local_path):

tmp_local_path = local_path + '-luigi-tmp-%09d' % random.randrange(0, 10_000_000_000)
self._scp("%s:%s" % (self.remote_context._host_ref(), path), tmp_local_path)
os.rename(tmp_local_path, local_path)
os.replace(tmp_local_path, local_path)


class AtomicRemoteFileWriter(luigi.format.OutputPipeProcessWrapper):
Expand Down
7 changes: 7 additions & 0 deletions luigi/db_task_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,13 @@ def find_task_by_id(self, id, session=None):
with self._session(session) as session:
return session.query(TaskRecord).get(id)

def find_task_by_task_id(self, task_id, session=None):
"""
Find task with the given task ID.
"""
with self._session(session) as session:
return session.query(TaskRecord).filter(TaskRecord.task_id == task_id).all()[-1]


class TaskParameter(Base): # type: ignore
"""
Expand Down
6 changes: 3 additions & 3 deletions luigi/local_target.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class atomic_file(AtomicLocalFile):
"""

def move_to_final_destination(self):
os.rename(self.tmp_path, self.path)
os.replace(self.tmp_path, self.path)

def generate_tmp_path(self, path):
return path + '-luigi-tmp-%09d' % random.randrange(0, 10_000_000_000)
Expand Down Expand Up @@ -109,12 +109,12 @@ def move(self, old_path, new_path, raise_if_exists=False):
if d and not os.path.exists(d):
self.mkdir(d)
try:
os.rename(old_path, new_path)
os.replace(old_path, new_path)
except OSError as err:
if err.errno == errno.EXDEV:
new_path_tmp = '%s-%09d' % (new_path, random.randint(0, 999999999))
shutil.copy(old_path, new_path_tmp)
os.rename(new_path_tmp, new_path)
os.replace(new_path_tmp, new_path)
os.remove(old_path)
else:
raise err
Expand Down
8 changes: 8 additions & 0 deletions luigi/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,13 @@ def get(self, id):
self.render("show.html", task=task)


class ByTaskIdHandler(BaseTaskHistoryHandler):
def get(self, task_id):
with self._scheduler.task_history._session(None) as session:
task = self._scheduler.task_history.find_task_by_task_id(task_id, session)
self.render("show.html", task=task)


class ByParamsHandler(BaseTaskHistoryHandler):
def get(self, name):
payload = self.get_argument('data', default="{}")
Expand Down Expand Up @@ -314,6 +321,7 @@ def app(scheduler):
(r'/history', RecentRunHandler, {'scheduler': scheduler}),
(r'/history/by_name/(.*?)', ByNameHandler, {'scheduler': scheduler}),
(r'/history/by_id/(.*?)', ByIdHandler, {'scheduler': scheduler}),
(r'/history/by_task_id/(.*?)', ByTaskIdHandler, {'scheduler': scheduler}),
(r'/history/by_params/(.*?)', ByParamsHandler, {'scheduler': scheduler}),
(r'/metrics', MetricsHandler, {'scheduler': scheduler})
]
Expand Down
6 changes: 3 additions & 3 deletions test/local_target_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,16 @@ def rename_across_filesystems(src, dst):
err.errno = EXDEV
raise err

real_rename = os.rename
real_rename = os.replace

def mockrename(src, dst):
def mockreplace(src, dst):
if '-across-fs' in src:
real_rename(src, dst)
else:
rename_across_filesystems(src, dst)

copy = '%s-across-fs' % self.copy
with mock.patch('os.rename', mockrename):
with mock.patch('os.replace', mockreplace):
t.move(copy)

self.assertFalse(os.path.exists(self.path))
Expand Down

0 comments on commit 6db62e5

Please sign in to comment.