Simple Pipeline

Using examples from https://github.com/seanr15/disdat-examples

This section will teach you:

  • How to write simple pipelines using PipeTask classes and their pipe_requires() and pipe_run() methods

  • How to programmatically set the bundle's human name in pipe_requires()

  • How to specify dependencies (both normal and external)

  • Which Task return types Disdat supports, that is, how bundles present to pipeline tasks

Note: We didn't want to build "yet another workflow system", so we built on Luigi.

Pro Tip: It may be helpful to review how Spotify's Luigi defines pipelines: specifically its notion of workflows, the run and requires methods in Tasks, and Luigi parameters.

Setting up an example Python project

  1. Clone our examples GitHub repo (https://github.com/seanr15/disdat-examples).

  2. We'll assume you've cloned it into $CODE.

  3. Change directories into your project: cd $CODE

  4. Assuming you are in your virtual environment, install the example project: pip install -e .

A two-step pipeline:

We will look at a simple two-step pipeline (the file is available on GitHub). Here task B runs after task A. In Luigi task terms, task B requires task A.
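A minimal sketch of such a two-step pipeline might look as follows. The classes A and B use only the pipe_requires(), pipe_run(), set_bundle_name() and add_dependency() calls described in this section. The PipeTask base class and the run() helper are hypothetical stand-ins, written only so the sketch runs without Disdat installed. In real code you would subclass Disdat's own PipeTask and let Disdat execute the pipeline. The arithmetic in each task is also just an illustration.

```python
# Toy stand-in for Disdat's PipeTask so this sketch runs on its own.
# This class and run() below are hypothetical scaffolding, not Disdat code.
class PipeTask:
    def __init__(self, **params):
        self.params = params         # task parameters (unused in this example)
        self.bundle_name = None      # human name set via set_bundle_name()
        self.dependencies = {}       # arg name -> (task class, params dict)

    def set_bundle_name(self, name):
        self.bundle_name = name

    def add_dependency(self, arg_name, task_class, params):
        self.dependencies[arg_name] = (task_class, params)

    def pipe_requires(self):
        pass                         # tasks without upstream work keep this default

    def pipe_run(self, **kwargs):
        raise NotImplementedError


def run(task_class, params=None):
    """Hypothetical runner: run upstream tasks first, then pass their
    return values to pipe_run() as keyword arguments."""
    task = task_class(**(params or {}))
    task.pipe_requires()
    inputs = {arg: run(cls, p) for arg, (cls, p) in task.dependencies.items()}
    return task.pipe_run(**inputs)


class A(PipeTask):
    def pipe_requires(self):
        self.set_bundle_name('a')

    def pipe_run(self):
        return 2 + 2                 # the bundle presents as an integer


class B(PipeTask):
    def pipe_requires(self):
        self.set_bundle_name('b')
        self.add_dependency('a', A, {})   # B requires A; its output arrives as argument 'a'

    def pipe_run(self, a=None):
        return a + 1


result = run(B)
print(result)
```

The argument name given to add_dependency() ('a') is the keyword under which task B receives the output of task A in pipe_run().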

If you were writing Luigi tasks, you'd define requires(), run(), and output() for each Task. But Disdat manages outputs for you, so you no longer need to write an output() method.

Observations

  • Disdat tasks are defined as subclasses of PipeTask

  • Each Task declares the upstream Tasks that must complete before it runs. It does so by adding dependencies in pipe_requires()

  • In pipe_requires() one can:

    • Add dependencies via:

      • self.add_dependency(<arg name>, PipeTask class, params dictionary)

        • Semantic: If the output bundle from A does not exist, re-run task A

      • self.add_external_dependency(<arg name>, PipeTask class, params dictionary)

    • Set the bundle's human name via: self.set_bundle_name('b')

  • In pipe_run() one can:

    • Do whatever you want

    • Return values from which Disdat will create a bundle

    • Note: the return type of a task is called the output bundle's presentation.

    • Here all the bundles present as integer literals.

    • See more about Disdat Task Return Types.
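The semantic noted above for add_dependency() (run the upstream task only when its output bundle does not exist) can be illustrated with a toy model. This is a sketch of the idea only, not of how Disdat stores or looks up bundles. The names bundles, run_log, task_a and resolve are hypothetical.

```python
# Toy model of the add_dependency() semantic; all names here are hypothetical.
bundles = {}     # bundle human name -> stored value
run_log = []     # records which tasks actually ran


def task_a():
    run_log.append('a')
    return 4


def resolve(name, producer):
    """Return the existing bundle, or run the producing task if it is missing."""
    if name not in bundles:
        bundles[name] = producer()
    return bundles[name]


first = resolve('a', task_a)     # no bundle yet, so task A runs
second = resolve('a', task_a)    # bundle exists, so task A is not run again
print(first, second, run_log)
```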
