Run the Pipeline
This section will teach you:
How to run a pipeline
How Disdat automatically versions data from each task in a pipeline
How to add parameters to your pipeline
Running the Pipeline
Using the CLI
Use Disdat's apply command. Pass it the <package>.<module>.<class> that you wish to run:
$ dsdt apply pipelines.dependent_tasks.B
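The <package>.<module>.<class> argument is an ordinary dotted Python path. The sketch below is only an illustration and not Disdat's actual loader (`resolve_task_class` is a hypothetical helper). It shows how such a path can be split into a module to import and a class to look up, using the standard library's `importlib`:

```python
import importlib


def resolve_task_class(dotted_path: str) -> type:
    """Hypothetical helper: turn '<package>.<module>.<class>' into a class object."""
    # Everything before the last dot names the module; the last part names the class.
    module_path, _, class_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"expected <package>.<module>.<class>, got {dotted_path!r}")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


if __name__ == "__main__":
    # A standard-library class keeps the sketch runnable without Disdat installed.
    # With your project importable, "pipelines.dependent_tasks.B" splits the same way:
    # module "pipelines.dependent_tasks", class "B".
    print(resolve_task_class("collections.OrderedDict"))
```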
Then look at the output of the apply command:
INFO: Informed scheduler that task DriverTask_False______839e56525b has status PENDING
INFO: Informed scheduler that task B____ca7a191361 has status PENDING
INFO: Informed scheduler that task A____ca7a191361 has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
INFO: [pid 17720] Worker Worker(salt=887666246, workers=1, host=INTUL173d00074, username=kyocum, pid=17720) running A(output_tags={})
-------------------
A is Running!
-------------------
INFO: [pid 17720] Worker Worker(salt=887666246, workers=1, host=INTUL173d00074, username=kyocum, pid=17720) done A(output_tags={})
INFO: Informed scheduler that task A____ca7a191361 has status DONE
INFO: [pid 17720] Worker Worker(salt=887666246, workers=1, host=INTUL173d00074, username=kyocum, pid=17720) running B(output_tags={})
-------------------
B is Running!
-------------------
INFO: [pid 17720] Worker Worker(salt=887666246, workers=1, host=INTUL173d00074, username=kyocum, pid=17720) done B(output_tags={})
INFO: Informed scheduler that task B____ca7a191361 has status DONE
INFO: [pid 17720] Worker Worker(salt=887666246, workers=1, host=INTUL173d00074, username=kyocum, pid=17720) running DriverTask(output_bundle=-, pipe_params={}, pipe_cls=<class 'pipelines.2_dependent-tasks.B'>, input_tags={}, output_tags={}, force=False)
INFO: [pid 17720] Worker Worker(salt=887666246, workers=1, host=INTUL173d00074, username=kyocum, pid=17720) done DriverTask(output_bundle=-, pipe_params={}, pipe_cls=<class 'pipelines.2_dependent-tasks.B'>, input_tags={}, output_tags={}, force=False)
INFO: Informed scheduler that task DriverTask_False______839e56525b has status DONE
INFO: Worker Worker(salt=887666246, workers=1, host=INTUL173d00074, username=kyocum, pid=17720) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 3 tasks of which:
* 3 ran successfully:
- 1 A(...)
- 1 B(...)
- 1 DriverTask(...)
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
We love output strings that look like "This progress looks :)"!
This means that we ran Task A, then Task B, and the Disdat "driver" task completed as well. The driver task is just there to coordinate the workflow.
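The order in the log (A, then B, then DriverTask) follows from the dependencies: B requires A, and the driver requires B, the task you asked to run. Here is a minimal sketch of that ordering. It uses Python's `graphlib` (3.9+) as a stand-in for Luigi's scheduler, and the dependency graph is written out by hand as an assumption that mirrors this example:

```python
from graphlib import TopologicalSorter

# Hand-written dependency graph (an assumption mirroring this example):
# each key maps a task to the set of tasks it requires.
dependencies = {
    "B": {"A"},           # B depends on A
    "DriverTask": {"B"},  # the driver wraps the task passed to `dsdt apply`
}

# static_order() yields every task only after all of the tasks it requires.
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)  # ['A', 'B', 'DriverTask']
```

Because this graph is a simple chain, only one valid order exists. A graph with independent branches could be run in more than one order.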
Using the API in a notebook
You can also run the same pipeline from Python with the Disdat API (the notebook is on GitHub here):
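import disdat.api as api
from pipelines.dependent_tasks import B

# Name of the Disdat data context that receives the output bundles (placeholder; use your own context)
data_context = 'examples'
api.apply(data_context, B)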
Look at the versioned data bundles
$ dsdt ls
b
a
my.bundle
And here is what's inside the final output bundle, b. Recall that Task B returns 2**2:
$ dsdt cat b
4
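The idea behind named, versioned bundles can be shown with a toy in-memory model. It is purely illustrative. `BundleStore`, `put`, `ls`, `cat` and `history` are hypothetical names, not Disdat's API. The model also assumes, as a simplification, that reading a bundle by name returns its most recent version:

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class Version:
    uid: str
    value: object


@dataclass
class BundleStore:
    """Toy model: every write to a name appends a new version instead of overwriting."""
    bundles: dict = field(default_factory=dict)

    def put(self, name: str, value: object) -> str:
        # Each write gets a fresh unique id, so older versions stay addressable.
        version = Version(uid=uuid.uuid4().hex, value=value)
        self.bundles.setdefault(name, []).append(version)
        return version.uid

    def ls(self) -> list:
        # Bundle names in the order they were first written (a toy choice).
        return list(self.bundles)

    def cat(self, name: str) -> object:
        # Value of the most recent version of the named bundle.
        return self.bundles[name][-1].value

    def history(self, name: str) -> list:
        # Ids of every stored version of the named bundle, oldest first.
        return [v.uid for v in self.bundles[name]]


if __name__ == "__main__":
    store = BundleStore()
    store.put("a", "output of A")  # placeholder value for Task A (hypothetical)
    store.put("b", 2 ** 2)         # Task B returns 2**2
    store.put("b", 2 ** 2)         # a second write appends a version, not an overwrite
    print(store.ls())              # ['a', 'b']
    print(store.cat("b"))          # 4
    print(len(store.history("b"))) # 2
```

Keeping every version instead of overwriting means earlier outputs remain available under their own ids. That property is what the word "versioned" refers to in this model.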