Simple Pipeline
Using examples from https://github.com/seanr15/disdat-examples
This section will teach you:
- How to write simple pipelines using `PipeTask` classes and the `pipe_requires()` and `pipe_run()` methods.
- How to programmatically set a bundle's human name in `pipe_requires()`.
- How to specify dependencies (both normal and external).
- Which Task return types Disdat supports, or how bundles present to pipeline tasks.
Setting up an example Python project
Clone our examples GitHub repo (https://github.com/seanr15/disdat-examples).
We'll assume you've cloned it into `$CODE`.
Change directories into your project:
cd $CODE
Assuming you are in your virtual environment, install the example project:
pip install -e .
A two-step pipeline
We will look at a simple two-step pipeline (the file is in the examples repo on GitHub). Here Task B runs after Task A. In Luigi terms, Task B requires Task A.
```python
from disdat.pipe import PipeTask


class A(PipeTask):

    def pipe_requires(self):
        self.set_bundle_name('a')  # <-- Each time A runs, it produces bundle 'a'

    def pipe_run(self):
        print('-------------------')
        print('A is Running!')
        print('-------------------')
        print()
        return 2  # <-- Files, literals, flat collections, and pd.DataFrames may be returned.


class B(PipeTask):

    def pipe_requires(self):
        self.set_bundle_name('b')
        self.add_dependency('a', A, params={})  # <-- B needs A to run with no params

    def pipe_run(self, a):  # <-- Upstream bundles appear as parameters
        print('-------------------')
        print('B is Running!')
        print('-------------------')
        print()
        return a ** 2
```
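To make the "upstream bundles appear as parameters" comment on `B.pipe_run(self, a)` concrete, here is a dependency-free toy model of the wiring. It is not Disdat's implementation: `ToyTask`, `requires()`, `run()` and `build()` are hypothetical names, and the sketch only mimics the idea that each dependency's argument name (here `'a'`) becomes a keyword argument of the downstream task's run method.

```python
from typing import Any, Dict, Type


class ToyTask:
    """Hypothetical stand-in for a PipeTask; not part of Disdat."""

    def requires(self) -> Dict[str, Type["ToyTask"]]:
        # Maps argument name -> upstream task class (loosely like add_dependency).
        return {}

    def run(self, **upstream: Any) -> Any:
        raise NotImplementedError


class ToyA(ToyTask):
    def run(self) -> int:
        return 2


class ToyB(ToyTask):
    def requires(self) -> Dict[str, Type[ToyTask]]:
        return {"a": ToyA}  # mirrors self.add_dependency('a', A, params={})

    def run(self, a: int) -> int:
        return a ** 2


def build(task_cls: Type[ToyTask]) -> Any:
    """Run upstream tasks first, then pass their outputs by argument name."""
    task = task_cls()
    inputs = {name: build(dep) for name, dep in task.requires().items()}
    return task.run(**inputs)


print(build(ToyB))  # A returns 2, so B returns 2 ** 2 == 4
```

Note that `ToyB` never calls `ToyA` directly; the runner derives the order from the declared dependencies, roughly the same split of responsibilities as Luigi's `requires()` and `run()`.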
Observations
Disdat defines tasks as subclasses of `PipeTask`.
Each Task declares the upstream Tasks that must complete before it runs. It does so by adding dependencies in `pipe_requires()`.
In `pipe_requires()` one can:
- Add dependencies via:
  - `self.add_dependency(<arg name>, PipeTask class, params dictionary)`
    Semantic: if the output bundle from A does not exist, run task A first.
  - `self.add_external_dependency(<arg name>, PipeTask class, params dictionary)`
    Semantic: only look for this upstream bundle, and fail if it does not exist. This is equivalent to a Luigi `ExternalTask`.
- Set the bundle's human name via `self.set_bundle_name('b')`.
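The two dependency semantics differ only in what happens when the upstream bundle is missing. The toy sketch below models that with a plain dict standing in for the bundle store; `resolve()` and `MissingBundleError` are hypothetical names, not Disdat APIs, and the sketch looks bundles up by name only, ignoring everything else a real store might consider.

```python
from typing import Any, Callable, Dict


class MissingBundleError(RuntimeError):
    """Raised when an external dependency's bundle is absent (toy model)."""


def resolve(store: Dict[str, Any], name: str,
            producer: Callable[[], Any], external: bool) -> Any:
    """Return the bundle called `name` using one of the two semantics.

    external=False ~ add_dependency: run the producer if the bundle is missing.
    external=True  ~ add_external_dependency: only look, fail if missing.
    """
    if name in store:
        return store[name]
    if external:
        raise MissingBundleError(f"bundle {name!r} does not exist")
    store[name] = producer()  # run the upstream task and keep its output
    return store[name]


store: Dict[str, Any] = {}
print(resolve(store, "a", lambda: 2, external=False))  # missing, so producer runs: 2
print(resolve(store, "a", lambda: 99, external=True))  # found, producer ignored: 2
try:
    resolve(store, "c", lambda: 3, external=True)
except MissingBundleError as err:
    print("external dependency failed:", err)
```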
In `pipe_run()` one can:
- Do whatever you want.
- Return values from which Disdat will create a bundle.
Note: the return type of a task is called the output bundle's presentation.
Here all the bundles present as integer literals.
See more about Disdat Task Return Types.
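As a rough illustration of "presentation", the sketch below sorts a returned value into some of the categories named in the return comment of task A (literals, flat collections, pandas DataFrames). `presentation_of()` is a hypothetical helper, not part of Disdat, and it does not model file outputs; the Task Return Types page remains the authority on what Disdat actually accepts.

```python
from typing import Any

try:
    import pandas as pd
except ImportError:  # pandas is optional for this sketch
    pd = None

LITERAL_TYPES = (bool, int, float, str)


def presentation_of(value: Any) -> str:
    """Hypothetical classifier for a task's return value (not Disdat code)."""
    if isinstance(value, LITERAL_TYPES):
        return "literal"
    if isinstance(value, (list, tuple)) and all(
        isinstance(item, LITERAL_TYPES) for item in value
    ):
        return "flat collection"
    if pd is not None and isinstance(value, pd.DataFrame):
        return "dataframe"
    return "unsupported in this sketch"


print(presentation_of(2))          # literal, like bundles 'a' and 'b'
print(presentation_of([1, 2, 3]))  # flat collection
print(presentation_of([[1]]))      # nested, so unsupported in this sketch
```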