Run the Pipeline

This section will teach you:

  • How to run a pipeline

  • How Disdat automatically versions data from each task in a pipeline

  • How to add parameters to your pipeline

Running the Pipeline

Using the CLI

Use Disdat's apply command, passing it the <package>.<module>.<class> of the final task you want to run. Disdat also runs the tasks it depends on.

$ dsdt apply pipelines.dependent_tasks.B

Then look at the output:

INFO: Informed scheduler that task   DriverTask_False______839e56525b   has status   PENDING
INFO: Informed scheduler that task   B____ca7a191361   has status   PENDING
INFO: Informed scheduler that task   A____ca7a191361   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
INFO: [pid 17720] Worker Worker(salt=887666246, workers=1, host=INTUL173d00074, username=kyocum, pid=17720) running   A(output_tags={})
-------------------
A is Running!
-------------------

INFO: [pid 17720] Worker Worker(salt=887666246, workers=1, host=INTUL173d00074, username=kyocum, pid=17720) done      A(output_tags={})
INFO: Informed scheduler that task   A____ca7a191361   has status   DONE
INFO: [pid 17720] Worker Worker(salt=887666246, workers=1, host=INTUL173d00074, username=kyocum, pid=17720) running   B(output_tags={})
-------------------
B is Running!
-------------------

INFO: [pid 17720] Worker Worker(salt=887666246, workers=1, host=INTUL173d00074, username=kyocum, pid=17720) done      B(output_tags={})
INFO: Informed scheduler that task   B____ca7a191361   has status   DONE
INFO: [pid 17720] Worker Worker(salt=887666246, workers=1, host=INTUL173d00074, username=kyocum, pid=17720) running   DriverTask(output_bundle=-, pipe_params={}, pipe_cls=<class 'pipelines.2_dependent-tasks.B'>, input_tags={}, output_tags={}, force=False)
INFO: [pid 17720] Worker Worker(salt=887666246, workers=1, host=INTUL173d00074, username=kyocum, pid=17720) done      DriverTask(output_bundle=-, pipe_params={}, pipe_cls=<class 'pipelines.2_dependent-tasks.B'>, input_tags={}, output_tags={}, force=False)
INFO: Informed scheduler that task   DriverTask_False______839e56525b   has status   DONE
INFO: Worker Worker(salt=887666246, workers=1, host=INTUL173d00074, username=kyocum, pid=17720) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 3 ran successfully:
    - 1 A(...)
    - 1 B(...)
    - 1 DriverTask(...)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====
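
The IDs in the log, such as B____ca7a191361 and DriverTask_False______839e56525b, are Luigi task IDs. The sketch below re-creates the naming scheme in Luigi's `luigi.task.task_id_str`, as we understand it. An ID is built from three parts: the task name, a short summary of the first few parameter values (sorted by name, with non-alphanumeric characters replaced by `_`), and the first 10 hex digits of an MD5 hash of the parameters. The constants and the assumption that `output_tags={}` is serialized as the string `"{}"` may differ between Luigi versions, so treat this as an illustration rather than Luigi's own code. Because only the parameters are hashed, A and B share the same hash suffix, which matches the log above.

```python
import hashlib
import json
import re

# Constants modeled on luigi.task (assumption: values as in recent Luigi releases).
INCLUDE_PARAMS = 3    # how many parameter values go into the readable summary
TRUNCATE_PARAMS = 16  # max characters kept from each summarized value
TRUNCATE_HASH = 10    # hex digits of the MD5 hash that are kept
INVALID_CHARS = re.compile(r"[^A-Za-z0-9_]")


def task_id_str(task_family, params):
    """Build an ID like 'B____<hash>' from a task name and its
    string-serialized parameters."""
    # Hash a canonical JSON encoding of the parameters (not the task name).
    param_str = json.dumps(params, separators=(",", ":"), sort_keys=True)
    param_hash = hashlib.md5(param_str.encode("utf-8")).hexdigest()
    # Readable summary: the first few values, taken in parameter-name order.
    summary = "_".join(
        params[name][:TRUNCATE_PARAMS] for name in sorted(params)[:INCLUDE_PARAMS]
    )
    summary = INVALID_CHARS.sub("_", summary)
    return "{}_{}_{}".format(task_family, summary, param_hash[:TRUNCATE_HASH])


# A and B each take one empty output_tags dict, assumed serialized as "{}".
a_id = task_id_str("A", {"output_tags": "{}"})
b_id = task_id_str("B", {"output_tags": "{}"})
print(a_id)  # "A____" followed by 10 hex digits
print(b_id)  # "B____" followed by the same 10 hex digits
```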

We love output strings that look like "This progress looks :)"!

This means that Task A ran, then Task B, and finally the Disdat "driver" task completed. The driver task is only there to coordinate the workflow.
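
The run order in the log follows from the dependencies: a task runs only after every task it requires is done. Below is a minimal sketch of that rule using the standard library's `graphlib` (Python 3.9+). The dependency dict is a hypothetical model of this example, in which B requires A and the driver task requires B; Luigi's real scheduler does much more (workers, retries, skipping tasks that are already complete).

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical dependency graph: each task maps to the tasks it requires.
requires = {
    "A": set(),
    "B": {"A"},
    "DriverTask": {"B"},
}

# static_order() yields every task only after all of its requirements.
run_order = list(TopologicalSorter(requires).static_order())
for task in run_order:
    print(task, "is Running!")
```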

Using the API in a notebook

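You can also run the same pipeline from Python with the Disdat API (the example notebook is on GitHub):

import disdat.api as api
from pipelines.dependent_tasks import B

# Name of the Disdat data context to run in (example name; use one you have created).
data_context = 'examples'

# Run Task B, plus the tasks it depends on (here Task A), in that context.
api.apply(data_context, B)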

Look at the versioned data bundles

$ dsdt ls
b
a
my.bundle

Now look at what's inside the final output bundle, b. Recall that Task B returns 2**2:

$ dsdt cat b
4
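
To make the versioning idea concrete, here is a toy, in-memory model. It is not Disdat's implementation, and the class and method names (`ToyContext`, `add`, `versions`) are hypothetical. It assumes that each task output is stored as a new, immutable bundle with a unique ID, that `ls` lists each bundle name once, and that `cat` returns the data of the latest version. The string stored for Task A is only a placeholder.

```python
import uuid
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Bundle:
    """One immutable, versioned output of a task (toy model)."""
    name: str
    data: object
    bundle_id: str = field(default_factory=lambda: uuid.uuid4().hex)


class ToyContext:
    """Hypothetical in-memory stand-in for a Disdat data context.

    add() never overwrites: every call stores a new Bundle version.
    """

    def __init__(self):
        self._bundles = []  # all versions, in creation order

    def add(self, name, data):
        bundle = Bundle(name, data)
        self._bundles.append(bundle)
        return bundle

    def ls(self):
        """Each bundle name once, most recently created first."""
        names = []
        for bundle in reversed(self._bundles):
            if bundle.name not in names:
                names.append(bundle.name)
        return names

    def cat(self, name):
        """Data from the latest version of the named bundle."""
        for bundle in reversed(self._bundles):
            if bundle.name == name:
                return bundle.data
        raise KeyError(name)

    def versions(self, name):
        """IDs of every stored version of the named bundle, oldest first."""
        return [b.bundle_id for b in self._bundles if b.name == name]


ctx = ToyContext()
ctx.add("a", "output of A")  # placeholder value for Task A's output
ctx.add("b", 2 ** 2)         # Task B returns 2**2
print(ctx.ls())              # ['b', 'a']
print(ctx.cat("b"))          # 4

ctx.add("b", 2 ** 2)         # storing B's output again adds a second version
print(len(ctx.versions("b")))  # 2
```

The toy's design choice is append-only storage: nothing is overwritten, so older versions stay retrievable by ID while `cat` reads the newest one.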
