Skip to content

Commit

Permalink
Merge branch 'master' into patch-1
Browse files Browse the repository at this point in the history
  • Loading branch information
pablocasares authored Aug 26, 2024
2 parents 0e501f0 + 9e0898e commit 16b1487
Show file tree
Hide file tree
Showing 28 changed files with 236 additions and 88 deletions.
13 changes: 13 additions & 0 deletions .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ jobs:
tox-env: py310-core
- python-version: "3.11"
tox-env: py311-core
- python-version: "3.12"
tox-env: py312-core

steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -92,6 +94,8 @@ jobs:
tox-env: py310-postgres
- python-version: "3.11"
tox-env: py311-postgres
- python-version: "3.12"
tox-env: py312-postgres

steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -145,6 +149,8 @@ jobs:
tox-env: py310-aws
- python-version: "3.11"
tox-env: py311-aws
- python-version: "3.12"
tox-env: py312-aws

- python-version: "3.6"
tox-env: py36-unixsocket
Expand All @@ -164,6 +170,9 @@ jobs:
- python-version: "3.11"
tox-env: py311-unixsocket
OVERRIDE_SKIP_CI_TESTS: True
- python-version: "3.12"
tox-env: py312-unixsocket
OVERRIDE_SKIP_CI_TESTS: True

- python-version: "3.6"
tox-env: py36-apache
Expand All @@ -177,6 +186,8 @@ jobs:
tox-env: py310-apache
- python-version: "3.11"
tox-env: py311-apache
- python-version: "3.12"
tox-env: py312-apache

- python-version: "3.6"
tox-env: py36-azureblob
Expand All @@ -190,6 +201,8 @@ jobs:
tox-env: py310-azureblob
- python-version: "3.11"
tox-env: py311-azureblob
- python-version: "3.12"
tox-env: py312-azureblob


- python-version: 3.9
Expand Down
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
:target: https://luigi.readthedocs.io/en/stable/?badge=stable
:alt: Documentation Status

Luigi is a Python (3.6, 3.7, 3.8, 3.9, 3.10, 3.11 tested) package that helps you build complex
Luigi is a Python (3.6, 3.7, 3.8, 3.9, 3.10, 3.11, 3.12 tested) package that helps you build complex
pipelines of batch jobs. It handles dependency resolution, workflow management,
visualization, handling failures, command line integration, and much more.

Expand Down
4 changes: 2 additions & 2 deletions doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -432,13 +432,13 @@ traceback_max_length
Maximum length for traceback included in error email. Default is 5000.


[batch_notifier]
[batch_email]
----------------

Parameters controlling the contents of batch notifications sent from the
scheduler

email_interval_minutes
email_interval
Number of minutes between e-mail sends. Making this larger results in
fewer, bigger e-mails.
Defaults to 60.
Expand Down
2 changes: 1 addition & 1 deletion luigi/__meta__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@
__author__ = 'The Luigi Authors'
__contact__ = 'https://github.com/spotify/luigi'
__license__ = 'Apache License 2.0'
__version__ = '3.5.0'
__version__ = '3.5.1'
__status__ = 'Production'
109 changes: 76 additions & 33 deletions luigi/contrib/azureblob.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import logging
import datetime

from azure.storage.blob import blockblobservice
from azure.storage.blob import BlobServiceClient

from luigi.format import get_default_format
from luigi.target import FileAlreadyExists, FileSystem, AtomicLocalFile, FileSystemTarget
Expand Down Expand Up @@ -62,60 +62,101 @@ def __init__(self, account_name=None, account_key=None, sas_token=None, **kwargs
* `custom_domain` - The custom domain to use. This can be set in the Azure Portal. For example, ‘www.mydomain.com’.
* `token_credential` - A token credential used to authenticate HTTPS requests. The token value should be updated before its expiration.
"""
self.options = {"account_name": account_name, "account_key": account_key, "sas_token": sas_token}
if kwargs.get("custom_domain"):
account_url = "{protocol}://{custom_domain}/{account_name}".format(protocol=kwargs.get("protocol", "https"),
custom_domain=kwargs.get("custom_domain"),
account_name=account_name)
else:
account_url = "{protocol}://{account_name}.blob.{endpoint_suffix}".format(protocol=kwargs.get("protocol",
"https"),
account_name=account_name,
endpoint_suffix=kwargs.get(
"endpoint_suffix",
"core.windows.net"))

self.options = {
"account_name": account_name,
"account_key": account_key,
"account_url": account_url,
"sas_token": sas_token}
self.kwargs = kwargs

@property
def connection(self):
return blockblobservice.BlockBlobService(account_name=self.options.get("account_name"),
account_key=self.options.get("account_key"),
sas_token=self.options.get("sas_token"),
protocol=self.kwargs.get("protocol"),
connection_string=self.kwargs.get("connection_string"),
endpoint_suffix=self.kwargs.get("endpoint_suffix"),
custom_domain=self.kwargs.get("custom_domain"),
is_emulated=self.kwargs.get("is_emulated") or False)
if self.kwargs.get("connection_string"):
return BlobServiceClient.from_connection_string(conn_str=self.kwargs.get("connection_string"),
**self.kwargs)
else:
return BlobServiceClient(account_url=self.options.get("account_url"),
credential=self.options.get("account_key") or self.options.get("sas_token"),
**self.kwargs)

def container_client(self, container_name):
    """
    Return a client object scoped to a single container.

    The client is obtained by calling ``get_container_client`` on
    ``self.connection``.

    :param container_name: name of the container to address.
    :return: the object returned by ``self.connection.get_container_client``
             (presumably an Azure ``ContainerClient`` -- confirm against the SDK).
    """
    return self.connection.get_container_client(container_name)

def blob_client(self, container_name, blob_name):
    """
    Return a client object scoped to a single blob inside a container.

    The container client is resolved first via ``self.container_client`` and
    the blob client is then requested from it with ``get_blob_client``.

    :param container_name: name of the container holding the blob.
    :param blob_name: name of the blob inside that container.
    :return: the object returned by the container client's ``get_blob_client``
             (presumably an Azure ``BlobClient`` -- confirm against the SDK).
    """
    container_client = self.container_client(container_name)
    return container_client.get_blob_client(blob_name)

def upload(self, tmp_path, container, blob, **kwargs):
logging.debug("Uploading file '{tmp_path}' to container '{container}' and blob '{blob}'".format(
tmp_path=tmp_path, container=container, blob=blob))
self.create_container(container)
lease_id = self.connection.acquire_blob_lease(container, blob)\
if self.exists("{container}/{blob}".format(container=container, blob=blob)) else None
lease = None
blob_client = self.blob_client(container, blob)
if blob_client.exists():
lease = blob_client.acquire_lease()
try:
self.connection.create_blob_from_path(container, blob, tmp_path, lease_id=lease_id, progress_callback=kwargs.get("progress_callback"))
with open(tmp_path, 'rb') as data:
blob_client.upload_blob(data,
overwrite=True,
lease=lease,
progress_hook=kwargs.get("progress_callback"))
finally:
if lease_id is not None:
self.connection.release_blob_lease(container, blob, lease_id)
if lease is not None:
lease.release()

def download_as_bytes(self, container, blob, bytes_to_read=None):
start_range, end_range = (0, bytes_to_read-1) if bytes_to_read is not None else (None, None)
logging.debug("Downloading from container '{container}' and blob '{blob}' as bytes".format(
container=container, blob=blob))
return self.connection.get_blob_to_bytes(container, blob, start_range=start_range, end_range=end_range).content
blob_client = self.blob_client(container, blob)
download_stream = blob_client.download_blob(offset=0, length=bytes_to_read) if bytes_to_read \
else blob_client.download_blob()
return download_stream.readall()

def download_as_file(self, container, blob, location):
logging.debug("Downloading from container '{container}' and blob '{blob}' to {location}".format(
container=container, blob=blob, location=location))
return self.connection.get_blob_to_path(container, blob, location)
blob_client = self.blob_client(container, blob)
with open(location, 'wb') as file:
download_stream = blob_client.download_blob()
file.write(download_stream.readall())
return blob_client.get_blob_properties()

def create_container(self, container_name):
return self.connection.create_container(container_name)
if not self.exists(container_name):
return self.connection.create_container(container_name)

def delete_container(self, container_name):
lease_id = self.connection.acquire_container_lease(container_name)
self.connection.delete_container(container_name, lease_id=lease_id)
container_client = self.container_client(container_name)
lease = container_client.acquire_lease()
container_client.delete_container(lease=lease)

def exists(self, path):
container, blob = self.splitfilepath(path)
return self.connection.exists(container, blob)
if blob is None:
return self.container_client(container).exists()
else:
return self.blob_client(container, blob).exists()

def remove(self, path, recursive=True, skip_trash=True):
container, blob = self.splitfilepath(path)
if not self.exists(path):
return False
lease_id = self.connection.acquire_blob_lease(container, blob)
self.connection.delete_blob(container, blob, lease_id=lease_id)

container, blob = self.splitfilepath(path)
blob_client = self.blob_client(container, blob)
lease = blob_client.acquire_lease()
blob_client.delete_blob(lease=lease)
return True

def mkdir(self, path, parents=True, raise_if_exists=False):
Expand Down Expand Up @@ -148,16 +189,18 @@ def copy(self, path, dest):
source_container=source_container, dest_container=dest_container
))

source_lease_id = self.connection.acquire_blob_lease(source_container, source_blob)
destination_lease_id = self.connection.acquire_blob_lease(dest_container, dest_blob) if self.exists(dest) else None
source_blob_client = self.blob_client(source_container, source_blob)
dest_blob_client = self.blob_client(dest_container, dest_blob)
source_lease = source_blob_client.acquire_lease()
destination_lease = dest_blob_client.acquire_lease() if self.exists(dest) else None
try:
return self.connection.copy_blob(source_container, dest_blob, self.connection.make_blob_url(
source_container, source_blob),
destination_lease_id=destination_lease_id, source_lease_id=source_lease_id)
return dest_blob_client.start_copy_from_url(source_url=source_blob_client.url,
source_lease=source_lease,
destination_lease=destination_lease)
finally:
self.connection.release_blob_lease(source_container, source_blob, source_lease_id)
if destination_lease_id is not None:
self.connection.release_blob_lease(dest_container, dest_blob, destination_lease_id)
source_lease.release()
if destination_lease is not None:
destination_lease.release()

def rename_dont_move(self, path, dest):
self.move(path, dest)
Expand Down
4 changes: 2 additions & 2 deletions luigi/contrib/hadoop_jar.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import logging
import os
import pipes
import random
import shlex
import warnings

import luigi.contrib.hadoop
Expand Down Expand Up @@ -110,7 +110,7 @@ def run_job(self, job, tracking_url_callback=None):
arglist += ['-o', 'UserKnownHostsFile=/dev/null',
'-o', 'StrictHostKeyChecking=no']
arglist.append('{}@{}'.format(username, host))
hadoop_arglist = [pipes.quote(arg) for arg in hadoop_arglist]
hadoop_arglist = [shlex.quote(arg) for arg in hadoop_arglist]
arglist.append(' '.join(hadoop_arglist))
else:
if not os.path.exists(job.jar()):
Expand Down
5 changes: 4 additions & 1 deletion luigi/parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ def normalize(self, x):
"""
return x # default impl

def next_in_enumeration(self, _value):
def next_in_enumeration(self, value):
"""
If your Parameter type has an enumerable ordering of values, you can
choose to override this method. This method is used by the
Expand Down Expand Up @@ -389,6 +389,9 @@ def _warn_on_wrong_param_type(self, param_name, param_value):
OptionalParameterTypeWarning,
)

def next_in_enumeration(self, value):
    """
    Return ``None`` regardless of ``value``.

    NOTE(review): presumably this tells callers that the parameter type has
    no enumerable successor -- confirm against the code that consumes
    ``next_in_enumeration``.

    :param value: the current parameter value; it is not inspected.
    :return: always ``None``.
    """
    return None


class OptionalParameter(OptionalParameterMixin, Parameter):
"""Class to parse optional parameters."""
Expand Down
Empty file added luigi/py.typed
Empty file.
4 changes: 2 additions & 2 deletions luigi/static/visualiser/js/graph.js
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ Graph = (function() {
$.each(n.deps, function(i, dep) {
if (nodeIndex[dep]) {
var next_node = nodes[nodeIndex[dep]]
var depth = (selfDependencies ? depth + 1 : classDepths[next_node.name])
placeNodes(next_node, depth);
var next_depth = (selfDependencies ? depth + 1 : classDepths[next_node.name])
placeNodes(next_node, next_depth);
}
});
}
Expand Down
31 changes: 21 additions & 10 deletions luigi/static/visualiser/js/test/graph_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,27 @@ module("graph.js");

test("nodeFromTask", function() {
var task = {
deps: ["B","C"],
taskId: "A",
status: "DONE"
deps: ["B1","C1"],
taskId: "A1",
status: "DONE",
name: "A",
params: {},
priority: 0,
};
var expected = {
taskId: "A",
taskId: "A1",
status: "DONE",
trackingUrl: "#A",
deps: ["B","C"],
depth: -1
trackingUrl: "#A1",
deps: ["B1","C1"],
depth: -1,
name: "A",
params: {},
priority: 0,
};
deepEqual(Graph.testableMethods.nodeFromTask(task), expected);
let graph = {
hashBase: "#"
}
deepEqual(Graph.testableMethods.nodeFromTask.bind(graph)(task), expected);
});

test("uniqueIndexByProperty", function() {
Expand Down Expand Up @@ -70,7 +79,7 @@ test("computeRowsSelfDeps", function () {
var rowSizes = Graph.testableMethods.computeRows(nodes, nodeIndex)
equal(A1.depth, 0)
equal(A2.depth, 1)
equal(rowSizes, [1, 1])
deepEqual(rowSizes, [1, 1])
});

test("computeRowsGrouped", function() {
Expand All @@ -83,6 +92,8 @@ test("computeRowsGrouped", function() {
var D2 = {name: "D", taskId: "D2", deps: [], depth: -1}
var E1 = {name: "E", taskId: "E1", deps: [], depth: -1}
var E2 = {name: "E", taskId: "E2", deps: [], depth: -1}
var nodes = [A0, B0, C1, C2, D0, D1, D2, E1, E2]
var nodeIndex = {"A0": 0, "B0": 1, "C1": 2, "C2": 3, "D0": 4, "D1": 5, "D2": 6, "E1": 7, "E2": 8}
var rowSizes = Graph.testableMethods.computeRows(nodes, nodeIndex)
equal(A0.depth, 0)
equal(B0.depth, 1)
Expand All @@ -93,7 +104,7 @@ test("computeRowsGrouped", function() {
equal(D2.depth, 3)
equal(E1.depth, 4)
equal(E2.depth, 4)
equal(rowSizes, [1, 1, 2, 3, 2])
deepEqual(rowSizes, [1, 1, 2, 3, 2])
});

test("createGraph", function() {
Expand Down
7 changes: 7 additions & 0 deletions luigi/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,13 @@ def wrapped(callback):
return callback
return wrapped

@classmethod
def remove_event_handler(cls, event, callback):
    """
    Remove an event handler previously registered with the ``cls.event_handler`` decorator.

    Only the handlers stored under this exact class (``cls._event_callbacks[cls]``)
    are consulted; entries stored under any other class key are left untouched.

    :param event: the event the callback was registered for.
    :param callback: the callback object to unregister.
    :raises: the lookup or the container's ``remove`` raises if nothing is
             registered for ``cls``/``event`` or the callback is absent
             (presumably ``KeyError`` -- confirm the container type).
    """
    cls._event_callbacks[cls][event].remove(callback)

def trigger_event(self, event, *args, **kwargs):
"""
Trigger that calls all of the specified events associated with this class.
Expand Down
4 changes: 2 additions & 2 deletions luigi/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
from luigi.scheduler import DISABLED, DONE, FAILED, PENDING, UNKNOWN, Scheduler, RetryPolicy
from luigi.scheduler import WORKER_STATE_ACTIVE, WORKER_STATE_DISABLED
from luigi.target import Target
from luigi.task import Task, Config, DynamicRequirements
from luigi.task import Task, Config, DynamicRequirements, flatten
from luigi.task_register import TaskClassException
from luigi.task_status import RUNNING
from luigi.parameter import BoolParameter, FloatParameter, IntParameter, OptionalParameter, Parameter, TimeDeltaParameter
Expand Down Expand Up @@ -185,7 +185,7 @@ def run(self):
missing = []
for dep in self.task.deps():
if not self.check_complete(dep):
nonexistent_outputs = [output for output in dep.output() if not output.exists()]
nonexistent_outputs = [output for output in flatten(dep.output()) if not output.exists()]
if nonexistent_outputs:
missing.append(f'{dep.task_id} ({", ".join(map(str, nonexistent_outputs))})')
else:
Expand Down
Loading

0 comments on commit 16b1487

Please sign in to comment.