This package extends and adds new features to the luigi package. Here are a few examples of these features:
- add a new BoolParameter that automatically switches to explicit parsing when the default value is True (otherwise it is not possible to set it to False using the CLI).
- add several types of optional parameters.
- add an OutputLocalTarget class to help build an output tree.
- add a mixin that adds a --rerun parameter that forces a given task to run again even if its targets exist, and also reruns all the tasks that depend on this one.
- add a mixin to remove the output of failed tasks, which is likely to be corrupted or incomplete. This reproduces the default behaviour of a Snakemake rule (the equivalent of a Task).
- add a new @copy_params mechanism to copy the parameters from one task to another (@inherits gives the same object to all the inheriting tasks, while @copy_params only copies the definition of the parameter so each inheriting task can be given a different value).
- add functions to get and display the dependency graph of a given task.
- add a mechanism to set up templates for the luigi.cfg files, so the user just has to update specific values instead of copying the entire file.
Please read the complete API documentation for more details.
This package should be installed using pip:
    pip install luigi-tools
The Luigi package describes itself as follows:
Luigi is a Python (2.7, 3.6, 3.7 tested) package that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization, handling failures, command line integration, and much more.
The luigi-tools package aims to make Luigi easier to use for developers. The following sections present a few examples of its main features.
A Luigi BoolParameter can be parsed in two ways: implicit or explicit. The explicit way requires the user to enter a value, either True or False. On the contrary, the implicit way requires no value and will just set the value to True if the parameter is given. This is not compatible with a default value set to True, as it is not possible to set the value to False using the CLI in this case.
If you want to automatically set the parsing to explicit when the default value is True, use the BoolParameter provided by luigi_tools:
    from luigi.task import Task
    from luigi_tools import BoolParameter


    class MyTask(Task):
        a_boolean_parameter = BoolParameter(default=True)

        def run(self):
            pass
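The difference between implicit and explicit parsing can be illustrated without Luigi. The following is only a conceptual sketch based on the standard argparse module, not the implementation used by luigi-tools; the names parse_bool, build_parser and --flag are hypothetical.

```python
import argparse


def parse_bool(text):
    """Convert an explicit command-line value into a boolean."""
    lowered = text.lower()
    if lowered in ("true", "1", "yes"):
        return True
    if lowered in ("false", "0", "no"):
        return False
    raise argparse.ArgumentTypeError(f"not a boolean: {text!r}")


def build_parser(default):
    """Build a parser with one boolean option, choosing the parsing from the default."""
    parser = argparse.ArgumentParser()
    if default:
        # Explicit parsing: a value is required, so False can be requested.
        parser.add_argument("--flag", type=parse_bool, default=True)
    else:
        # Implicit parsing: the mere presence of the option means True.
        parser.add_argument("--flag", action="store_true", default=False)
    return parser


explicit = build_parser(default=True)
implicit = build_parser(default=False)

flag_default = explicit.parse_args([]).flag
flag_disabled = explicit.parse_args(["--flag", "false"]).flag
flag_enabled = implicit.parse_args(["--flag"]).flag
```

With implicit parsing and a default of True, there would be no way to obtain False from the command line, which is why the sketch switches to explicit parsing in that case.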
Target with prefix
Luigi workflows are based on Target objects that represent the state of a step of the workflow. These targets can be anything but are often files in a result directory tree. To avoid specifying the result directory for each target, one can use the OutputLocalTarget class and give it a prefix, so that all targets based on this class are located in the same directory.
    import luigi
    from luigi.task import Task
    from luigi_tools.target import OutputLocalTarget


    class MyTask(Task):
        def run(self):
            pass

        def output(self):
            # The target will point to the file result_directory/filename.ext
            return OutputLocalTarget("filename.ext")


    # Set the default prefix (it could also be called inside another Task)
    OutputLocalTarget.set_default_prefix("result_directory")

    # Run the task (the task can also be called with the CLI as usual)
    luigi.build([MyTask()], local_scheduler=True)
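The idea of a class-level prefix can be sketched in plain Python. The PrefixedTarget class below is a hypothetical stand-in, not the actual OutputLocalTarget, and it assumes that the path is resolved lazily so that a prefix set after the target is created still applies.

```python
from pathlib import Path


class PrefixedTarget:
    """Hypothetical target whose path is resolved under a shared prefix."""

    _default_prefix = None  # shared by all instances of the class

    def __init__(self, relative_path, prefix=None):
        self.relative_path = Path(relative_path)
        self._prefix = prefix

    @classmethod
    def set_default_prefix(cls, prefix):
        cls._default_prefix = Path(prefix) if prefix is not None else None

    @property
    def path(self):
        # Resolve lazily: an instance prefix wins over the class default.
        prefix = self._prefix if self._prefix is not None else self._default_prefix
        if prefix is None:
            return self.relative_path
        return Path(prefix) / self.relative_path


target = PrefixedTarget("filename.ext")
PrefixedTarget.set_default_prefix("result_directory")
resolved = target.path
```

Here resolved points to filename.ext inside result_directory, even though the prefix was set after the target was created.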
In Luigi, the states of the tasks are deduced from their targets. If the targets exist, the task is assumed to have already been completed and is thus skipped if the workflow is run again. This behavior is usually good to avoid performing computations that are already completed. Nevertheless, sometimes it is desirable to overwrite a former result, especially during the development process. For this reason, a mixin that adds a --rerun parameter to a task is introduced. When this parameter is set to True, all the targets of this task are deleted as well as the targets of the tasks that depend on this one, so all the tasks that are related to this task will run again. As for any mixin, it must be placed to the left of the Task class in the inheritance list.
    from luigi.task import Task
    from luigi_tools.task import RerunMixin


    class MyTask(RerunMixin, Task):
        def run(self):
            pass
Now the task MyTask has a boolean parameter --rerun which can be set from the CLI:
    luigi -m my_module mytask --rerun
    luigi -m my_module another_task_that_depends_on_mytask --MyTask-rerun
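The effect of --rerun on a workflow can be sketched as a graph traversal. This is a simplified illustration and not the code of RerunMixin: the dependents mapping, the task names and the tasks_to_rerun helper are hypothetical, and removing an entry from a dictionary stands in for deleting a target.

```python
def tasks_to_rerun(task, dependents):
    """Return the task and every task that directly or indirectly depends on it."""
    selected = []
    stack = [task]
    seen = set()
    while stack:
        current = stack.pop()
        if current in seen:
            continue
        seen.add(current)
        selected.append(current)
        stack.extend(dependents.get(current, []))
    return selected


# Hypothetical workflow: B and C need A, D needs B, E is unrelated.
dependents = {"A": ["B", "C"], "B": ["D"], "C": [], "D": []}
existing_targets = {"A": "a.out", "B": "b.out", "C": "c.out", "D": "d.out", "E": "e.out"}

for name in tasks_to_rerun("B", dependents):
    existing_targets.pop(name, None)  # stands in for deleting the target
```

After rerunning B in this sketch, only the targets of A, C and E remain, so B and D would be executed again on the next run.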
Clear the output of failed tasks
When a task fails unexpectedly, it may leave an incomplete or corrupted output that leads to wrong results downstream. With the RemoveCorruptedOutputMixin, Luigi automatically removes the output targets of the tasks that failed. This is the default behaviour in other workflow management systems such as Snakemake.
    import luigi
    from luigi_tools.task import RemoveCorruptedOutputMixin


    class TaskA(RemoveCorruptedOutputMixin, luigi.Task):
        """TaskA can remove its output upon failure."""
        pass
The clean_failed parameter is false by default and must explicitly be set to true to enable the cleanup. This allows users to set it to false to debug the output without changing the code.
    luigi -m my_module TaskA --clean_failed true
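The clean-up behaviour can be sketched with the standard library only. The run_and_clean and failing_step functions below are hypothetical and do not reflect how RemoveCorruptedOutputMixin hooks into Luigi; they only show the principle of deleting a partial output when a step fails.

```python
import os
import tempfile


def run_and_clean(run, output_path, clean_failed=False):
    """Run a step and, if it fails and cleaning is enabled, delete its output."""
    try:
        run(output_path)
    except Exception:
        if clean_failed and os.path.exists(output_path):
            os.remove(output_path)
        raise  # the failure is still propagated to the caller


def failing_step(path):
    # Write a partial result, then fail before the output is complete.
    with open(path, "w") as handle:
        handle.write("partial")
    raise RuntimeError("step failed")


workdir = tempfile.mkdtemp()
kept = os.path.join(workdir, "kept.txt")
cleaned = os.path.join(workdir, "cleaned.txt")

for path, clean in ((kept, False), (cleaned, True)):
    try:
        run_and_clean(failing_step, path, clean_failed=clean)
    except RuntimeError:
        pass  # ignore the error here to inspect the remaining files
```

With cleaning disabled the partial file stays on disk for debugging, while with cleaning enabled it is removed.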
In some situations, several tasks have a few parameters in common. This can lead to painful situations, and Luigi provides some dedicated tools to deal with this, such as the @inherits decorator. Nevertheless, the tools provided by Luigi have a major drawback: all the tasks with the inherited parameter will have the same value for this parameter. In some situations, one wants to be able to give different values to a task with an inherited parameter, especially during the development process. This is possible with the @copy_params decorator:
    import luigi
    from luigi.task import Task
    from luigi_tools.task import ParamRef, copy_params


    class TaskA(Task):
        a = luigi.Parameter(default="default_value_a")


    @copy_params(a=ParamRef(TaskA))
    class TaskB(Task):
        b = luigi.Parameter(default="b")
Here the class TaskB has two parameters:
- a with default_value_a as default value.
- b with b as default value.
It is also possible to change the name of the parameter or to change the default value:
    import luigi
    from luigi.task import Task
    from luigi_tools.task import ParamRef, copy_params


    class TaskA(Task):
        a = luigi.Parameter(default="default_value_a")


    @copy_params(
        a=ParamRef(TaskA),
        aa=ParamRef(TaskA, "a"),
        a_default=ParamRef(TaskA, "a", "given_default_value"),
        a_none=ParamRef(TaskA, "a", None),
    )
    class TaskB(Task):
        b = luigi.Parameter(default="b")
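In this case the class TaskB has 5 parameters:
- a with default_value_a as default value.
- aa with default_value_a as default value (copied from the parameter a of TaskA).
- a_default with given_default_value as default value.
- a_none with None as default value.
- b with b as default value.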
Note that the second parameter of ParamRef is the name of the inherited parameter in the parent class. If it is not given, it is assumed that the parameter has the same name in both the inheriting and the parent classes.
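The difference between sharing a parameter object and copying its definition can be illustrated in plain Python. The Param, ParentTask, SharingTask and CopyingTask classes below are hypothetical and much simpler than Luigi parameters; the sketch only shows why a copied definition can be changed independently.

```python
import copy


class Param:
    """Hypothetical minimal parameter definition holding only a default value."""

    def __init__(self, default):
        self.default = default


class ParentTask:
    a = Param(default="default_value_a")


class SharingTask:
    # Same object as in the parent: both classes see every change.
    a = ParentTask.a


class CopyingTask:
    # Independent copy of the definition: it can be changed on its own.
    a = copy.copy(ParentTask.a)


# Changing the copied definition leaves the parent untouched.
CopyingTask.a.default = "other_value"
```

In this sketch SharingTask.a and ParentTask.a are one and the same object, while CopyingTask.a carries its own default.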
In addition to the @copy_params decorator, it is possible to use the GlobalParamMixin. A task with this mixin gains a new feature for the parameters inherited using @copy_params: if the default value is not changed in ParamRef and no specific value is given for the task, then the task takes the value of the parameter it was copied from. Combining the @copy_params decorator with GlobalParamMixin allows many ways of dealing with the parameters.
    import luigi
    from luigi.task import Task
    from luigi_tools.task import GlobalParamMixin, ParamRef, copy_params


    class TaskA(Task):
        a = luigi.Parameter(default="default_value_a")


    @copy_params(a=ParamRef(TaskA))
    class TaskB(GlobalParamMixin, Task):
        b = luigi.Parameter(default="b")
If TaskB is called with the following configuration:

    [TaskA]
    a = "value for a"

    [TaskB]
    b = "value for b"

then the parameter a of TaskB has the value "value for a". If TaskB did not inherit from GlobalParamMixin, then it would have the value default_value_a.
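The resolution order described above can be sketched as a small lookup function. This is an assumption-based illustration, not the implementation of GlobalParamMixin: the resolve function and its arguments are hypothetical, and the sketch ignores the case where the default value is changed in ParamRef.

```python
_MISSING = object()


def resolve(task, name, origin, config, defaults, use_global=True):
    """Pick the value of a copied parameter.

    task, name: the inheriting task and its parameter name.
    origin: (parent task, parent parameter name) the parameter was copied from.
    """
    # 1. A value given specifically for the inheriting task always wins.
    value = config.get(task, {}).get(name, _MISSING)
    if value is not _MISSING:
        return value
    # 2. With the mixin-like behaviour, fall back to the parent's configured value.
    if use_global:
        parent_task, parent_name = origin
        value = config.get(parent_task, {}).get(parent_name, _MISSING)
        if value is not _MISSING:
            return value
    # 3. Otherwise use the default copied from the parent definition.
    return defaults[origin]


config = {"TaskA": {"a": "value for a"}, "TaskB": {"b": "value for b"}}
defaults = {("TaskA", "a"): "default_value_a"}

with_mixin = resolve("TaskB", "a", ("TaskA", "a"), config, defaults)
without_mixin = resolve("TaskB", "a", ("TaskA", "a"), config, defaults, use_global=False)
```

With the configuration of the example, with_mixin is the value configured for TaskA and without_mixin is the copied default.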
The luigi-tools package provides several functions to get the dependency graph of a task and to render it using GraphViz. This can be very useful to show how the tasks of a workflow relate to each other.
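To give an idea of what rendering a dependency graph involves, the sketch below converts a mapping of tasks to Graphviz DOT text using plain Python. The to_dot function and the input format are hypothetical and do not correspond to the functions provided by luigi-tools.

```python
def to_dot(dependencies, name="workflow"):
    """Render a mapping {task: [required tasks]} as Graphviz DOT text."""
    lines = [f"digraph {name} {{"]
    for task in sorted(dependencies):
        lines.append(f'    "{task}";')
        for required in sorted(dependencies[task]):
            # Edge from the task to the task it requires.
            lines.append(f'    "{task}" -> "{required}";')
    lines.append("}")
    return "\n".join(lines)


# Hypothetical workflow: TaskB requires TaskA.
dot_text = to_dot({"TaskA": [], "TaskB": ["TaskA"]})
print(dot_text)
```

The resulting text can be saved to a file and passed to the Graphviz dot command to produce an image.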
Funding & Acknowledgment
The development of this software was supported by funding to the Blue Brain Project, a research center of the École polytechnique fédérale de Lausanne (EPFL), from the Swiss government’s ETH Board of the Swiss Federal Institutes of Technology.
For license and authors, see
Copyright © 2021 Blue Brain Project/EPFL