luigi_tools.task

This module provides some specific luigi tasks and associated tools.

Classes

GlobalParamMixin(*args, **kwargs)

Mixin used to add customisable global parameters.

LogTargetMixin(*args, **kwargs)

Mixin used to log output paths of this task.

ParamRef(cls[, name, default])

Class to store parameter reference information.

RemoveCorruptedOutputMixin(*args, **kwargs)

Mixin used to remove incomplete outputs of a failed Task.

RerunMixin(*args, **kwargs)

Mixin used to force a task to run again by setting the 'rerun' parameter to True.

WorkflowTask(*args, **kwargs)

Default task used in workflows.

WorkflowWrapperTask(*args, **kwargs)

Base wrapper class with global parameters.

copy_params(**params_to_copy)

Copy a parameter from another Task.

Exceptions

DuplicatedParameterError

Exception raised when a parameter is duplicated in a task.

GlobalParameterNoValueError

Exception raised when the value of a global parameter cannot be found.

exception luigi_tools.task.DuplicatedParameterError

Bases: Exception

Exception raised when a parameter is duplicated in a task.

class luigi_tools.task.GlobalParamMixin(*args, **kwargs)

Bases: object

Mixin used to add customisable global parameters.

For tasks that inherit from GlobalParamMixin, when a parameter is linked to another one and its value is None, that value is automatically replaced by the value of the linked parameter. See copy_params for details about parameter linking.
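The fallback rule described above can be illustrated with a minimal plain-Python sketch. It does not use luigi or luigi_tools and is not how the mixin is implemented: the classes Source and Linked and the property-based lookup are hypothetical stand-ins, chosen only to show that an unset (None) value resolves to the current value of the linked parameter, while an explicit value is kept.

```python
class Source:
    """Hypothetical stand-in for the task that owns the original parameter."""

    m = 1


class Linked:
    """Hypothetical stand-in for a task whose `another_m` is linked to Source.m."""

    def __init__(self, another_m=None):
        self._another_m = another_m

    @property
    def another_m(self):
        # Fall back to the linked parameter only when no value was given.
        if self._another_m is None:
            return Source.m
        return self._another_m


print(Linked().another_m)             # no value given: falls back to Source.m
print(Linked(another_m=5).another_m)  # explicit value is kept
```

Because the lookup happens when the value is read, the fallback follows the linked parameter's actual value instead of a copy taken at class definition time.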

exception luigi_tools.task.GlobalParameterNoValueError

Bases: Exception

Exception raised when the value of a global parameter cannot be found.

class luigi_tools.task.LogTargetMixin(*args, **kwargs)

Bases: object

Mixin used to log output paths of this task.

Note

To have an effect, this mixin must be placed to the left of the first class inheriting from luigi.Task in the base class list.
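The ordering requirement follows from Python's method resolution order (MRO). The sketch below uses plain Python only: BaseTask and LoggingMixin are hypothetical stand-ins for luigi.Task and the mixin, and hooking into __init__ is an assumption made for illustration, not a statement about how LogTargetMixin is implemented.

```python
class BaseTask:
    """Hypothetical stand-in for luigi.Task; it does not call super().__init__()."""

    def __init__(self):
        self.events = ["BaseTask.__init__"]


class LoggingMixin:
    """Hypothetical mixin that extends __init__ through cooperative super()."""

    def __init__(self):
        super().__init__()
        self.events.append("LoggingMixin.__init__")


class MixinFirst(LoggingMixin, BaseTask):
    """Mixin to the left of the task class: the mixin's __init__ runs."""


class MixinLast(BaseTask, LoggingMixin):
    """Mixin to the right: BaseTask.__init__ is found first and the mixin is skipped."""


print(MixinFirst().events)
print(MixinLast().events)
```

In MixinLast the mixin's method is never reached, because the stand-in base class comes first in the MRO and does not delegate further.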

class luigi_tools.task.ParamRef(cls, name=None, default='__no_default_value__')

Bases: object

Class to store parameter reference information.

class luigi_tools.task.RemoveCorruptedOutputMixin(*args, **kwargs)

Bases: object

Mixin used to remove incomplete outputs of a failed Task.

The clean_failed parameter must be set to True to enable this feature.
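The idea of cleaning up after a failed run can be sketched with the standard library alone. This is not the mixin's implementation: run_and_clean and failing_run are hypothetical helpers, and only the clean_failed flag name comes from the text above. The sketch shows a partial output file being deleted when the run raises and the flag is enabled.

```python
import os
import tempfile


def run_and_clean(run, output_path, clean_failed=False):
    """Call run(); if it raises and clean_failed is True, delete the partial output."""
    try:
        run()
    except Exception:
        if clean_failed and os.path.exists(output_path):
            os.remove(output_path)
        raise  # the failure itself is still reported to the caller


def failing_run(path):
    """Write part of the output, then fail before finishing."""
    with open(path, "w") as f:
        f.write("partial")
    raise RuntimeError("task failed")


with tempfile.TemporaryDirectory() as tmp:
    out = os.path.join(tmp, "result.txt")
    try:
        run_and_clean(lambda: failing_run(out), out, clean_failed=True)
    except RuntimeError:
        pass
    removed = not os.path.exists(out)

print(removed)
```

With clean_failed left at False, the partial file would stay on disk, where a later existence check could mistake it for a complete output.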

class luigi_tools.task.RerunMixin(*args, **kwargs)

Bases: object

Mixin used to force a task to run again by setting the ‘rerun’ parameter to True.

rerun = Trigger to force the task to rerun. (False)

class luigi_tools.task.WorkflowTask(*args, **kwargs)

Bases: GlobalParamMixin, RerunMixin, Task

Default task used in workflows.

This task can be forced to run again by setting the ‘rerun’ parameter to True. It can also copy and link parameters from other tasks.

class luigi_tools.task.WorkflowWrapperTask(*args, **kwargs)

Bases: WorkflowTask, WrapperTask

Base wrapper class with global parameters.

class luigi_tools.task.copy_params(**params_to_copy)

Bases: object

Copy a parameter from another Task.

This decorator takes kwargs where keys are the parameter names and the values are ParamRef instances.

If no default value is given to the ParamRef, two behaviours are possible:

  • If the task inherits from the GlobalParamMixin, the parameter is linked to the one of the base class. The default parameter value is then the actual value of the linked parameter.

  • If the task does not inherit from the GlobalParamMixin, the default value is copied from the linked parameter.

Differences with luigi.util.inherits:

  • luigi.util.inherits sets all the parameters of the other class that the current one is missing. It is not possible to select a subset of parameters.

  • luigi.util.inherits sets the other class’ parameter to the same value as the current one’s parameter. copy_params does not set the same value, except for global parameters with no given value.

Usage:

import luigi
from luigi_tools.task import GlobalParamMixin, ParamRef, copy_params

class AnotherTask(luigi.Task):
    m = luigi.IntParameter(default=1)

# Copy `m` under the same name.
@copy_params(m=ParamRef(AnotherTask))
class MyFirstTask(luigi.Task):
    def run(self):
        print(self.m)  # this will be defined and print 1
        # ...

# Copy `m` under a different name.
@copy_params(another_m=ParamRef(AnotherTask, "m"))
class MySecondTask(luigi.Task):
    def run(self):
        print(self.another_m)  # this will be defined and print 1
        # ...

# Copy `m` under a different name and override its default value.
@copy_params(another_m=ParamRef(AnotherTask, "m", 5))
class MyThirdTask(luigi.Task):
    def run(self):
        print(self.another_m)  # this will be defined and print 5
        # ...

# Link `another_m` to `m` of AnotherTask by using GlobalParamMixin.
@copy_params(another_m=ParamRef(AnotherTask, "m"))
class MyFourthTask(GlobalParamMixin, luigi.Task):
    def run(self):
        # this will be defined and print 1 if self.another_m is not explicitly set
        # (this means that self.another_m == luigi_tools.task._no_default_value)
        print(self.another_m)
        # ...

Warning

copy_params has a limitation when used on a task that inherits from GlobalParamMixin: the task from which a parameter is copied must be instantiated before the inheriting task. It does not have to be executed first; calling task() is enough. This is due to the metaclass magic of luigi.