Simple Pipeline

Using examples from https://github.com/seanr15/disdat-examples

This section will teach you:

  • How to write simple pipelines using PipeTask classes and their pipe_requires() and pipe_run() methods

  • How to programmatically set the bundle's human name in pipe_requires()

  • How to specify dependencies (both normal and external)

  • The Task return types Disdat supports, or how bundles present to pipeline tasks

Note: We didn't want to build "yet another workflow system", so we built on Luigi.

Pro Tip: It may help to review how Spotify's Luigi defines pipelines, specifically its documentation on workflows, on the run() and requires() methods of Tasks, and on Luigi parameters.

Setting up an example Python project

  1. Clone our examples GitHub repo: https://github.com/seanr15/disdat-examples

  2. We'll assume you cloned it into $CODE.

  3. Change directories into the project: cd $CODE

  4. Assuming you are in your virtual environment, install the example project: pip install -e .

A two-step pipeline

We will look at a simple two-step pipeline (the file is in the examples repo on GitHub). Task B runs after Task A; in Luigi terms, Task B requires Task A.

If you were writing Luigi tasks, you'd define requires(), run(), and output() for each Task. But Disdat manages outputs for you, so you no longer need to write an output() method.
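For contrast, here is a toy, dependency-free model of the Luigi contract: each task lists its requirements, names its own output, and counts as complete once that output exists. ToyTask, ToyA, ToyB, build(), and the in-memory STORE dictionary are hypothetical names for illustration, not Luigi's API; real Luigi tasks subclass luigi.Task and return Targets (such as files) from output().

```python
# Toy model of Luigi's requires()/run()/output() contract.
# ToyTask, ToyA, ToyB, build() and STORE are hypothetical names for illustration;
# real Luigi tasks subclass luigi.Task and write to luigi Targets.

STORE = {}  # stands in for the file system: output name -> value


class ToyTask:
    def requires(self):
        return []  # upstream tasks that must be complete before this one runs

    def output(self):
        raise NotImplementedError  # in Luigi, the author names each output

    def run(self):
        raise NotImplementedError

    def complete(self):
        # Luigi's default rule: a task is complete if its output exists.
        return self.output() in STORE


class ToyA(ToyTask):
    def output(self):
        return "a"

    def run(self):
        STORE[self.output()] = 2


class ToyB(ToyTask):
    def requires(self):
        return [ToyA()]

    def output(self):
        return "b"

    def run(self):
        # B reads its input from wherever A wrote its output.
        STORE[self.output()] = STORE["a"] ** 2


def build(task, ran):
    """Run `task` after its requirements, skipping anything already complete."""
    if task.complete():
        return  # a complete task's requirements are not revisited
    for upstream in task.requires():
        build(upstream, ran)
    task.run()
    ran.append(type(task).__name__)


ran = []
build(ToyB(), ran)
print(STORE["b"], ran)  # 4 ['ToyA', 'ToyB']

ran_again = []
build(ToyB(), ran_again)  # both outputs now exist, so nothing runs
print(ran_again)  # []
```

Every task author here has to pick an output location by hand and read inputs from wherever upstream tasks wrote them; that bookkeeping is what Disdat takes over.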

from disdat.pipe import PipeTask

class A(PipeTask):
    def pipe_requires(self):
        self.set_bundle_name('a') # <-- Each time A runs, it produces bundle 'a'

    def pipe_run(self):
        print('-------------------')
        print('A is Running!')
        print('-------------------')
        print()

        return 2 # <-- Files, literals, flat collections, and pandas DataFrames may be returned.


class B(PipeTask):
    def pipe_requires(self):
        self.set_bundle_name('b')
        self.add_dependency('a', A, params={}) # <-- B needs A to run with no params

    def pipe_run(self, a): # <-- Upstream bundles appear as parameters
        print('-------------------')
        print('B is Running!')
        print('-------------------')
        print()

        # 'a' holds the value that A returned (the integer 2)
        return a ** 2
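To see how the two classes above fit together, here is a dependency-free imitation of the pattern. MiniPipeTask and run_pipeline() are hypothetical stand-ins, not Disdat's implementation (Disdat itself stores versioned bundles in a managed data context and decides when tasks need to rerun). The sketch only shows the core idea: the framework stores each task's return value under its bundle name and hands upstream values to pipe_run() as keyword arguments named in add_dependency(). It re-declares A and B against the toy base class and, unlike Disdat, reruns every task on each call.

```python
# Hypothetical, dependency-free imitation of the PipeTask pattern.
# MiniPipeTask, run_pipeline() and BUNDLES are illustration-only names;
# this is NOT how Disdat is implemented.

BUNDLES = {}  # bundle name -> value returned by pipe_run()


class MiniPipeTask:
    def __init__(self, **params):
        self.params = params  # kept for parity with the params dictionary
        self.bundle_name = type(self).__name__.lower()  # default name
        self.deps = {}  # argument name -> (task class, params)

    # These mirror the calls made in pipe_requires() in the example.
    def set_bundle_name(self, name):
        self.bundle_name = name

    def add_dependency(self, arg_name, task_class, params):
        self.deps[arg_name] = (task_class, params)

    def pipe_requires(self):
        pass

    def pipe_run(self, **kwargs):
        raise NotImplementedError


def run_pipeline(task_class, params=None):
    """Run upstream tasks first, then pass their bundles to pipe_run() by name."""
    task = task_class(**(params or {}))
    task.pipe_requires()
    kwargs = {}
    for arg_name, (dep_class, dep_params) in task.deps.items():
        kwargs[arg_name] = run_pipeline(dep_class, dep_params)
    # The framework, not the task author, decides where the output is stored.
    BUNDLES[task.bundle_name] = task.pipe_run(**kwargs)
    return BUNDLES[task.bundle_name]


class A(MiniPipeTask):
    def pipe_requires(self):
        self.set_bundle_name('a')

    def pipe_run(self):
        return 2


class B(MiniPipeTask):
    def pipe_requires(self):
        self.set_bundle_name('b')
        self.add_dependency('a', A, params={})

    def pipe_run(self, a):  # 'a' receives the value stored in bundle 'a'
        return a ** 2


print(run_pipeline(B))  # 4
print(BUNDLES)  # {'a': 2, 'b': 4}
```

Because the argument name in add_dependency() must match a parameter of pipe_run(), renaming one without the other would raise a TypeError in this sketch.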

Observations

  • Disdat tasks are subclasses of PipeTask

  • Each Task declares the upstream Tasks that must complete before it runs by adding dependencies in pipe_requires()

  • In pipe_requires() one can:

    • Add dependencies via:

      • self.add_dependency(<arg name>, PipeTask class, params dictionary)

        • Semantics: if the upstream bundle (here, bundle 'a' from A) does not exist, Disdat runs the upstream task to create it

      • self.add_external_dependency(<arg name>, PipeTask class, params dictionary)

        • Semantics: only look for this upstream bundle, and fail if it does not exist; the upstream task is never run. This is equivalent to a Luigi ExternalTask.

    • Set the bundle's human name via: self.set_bundle_name('b')

  • In pipe_run() one can:

    • Do whatever you want

    • Return values from which Disdat will create a bundle

    • Note: the return type of a task is called the output bundle's presentation.

    • Here all the bundles present as integer literals.

    • See more about Disdat Task Return Types.
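The difference between the two dependency kinds can be sketched as a single lookup rule. resolve_dependency(), MissingBundleError, and the `existing` dictionary are hypothetical names, not Disdat's API, and the sketch keys bundles by name only, whereas Disdat's own bookkeeping for deciding whether a bundle already exists is more detailed.

```python
# Hypothetical sketch of normal vs. external dependency semantics.
# resolve_dependency(), MissingBundleError and `existing` are
# illustration-only names, not Disdat's API.


class MissingBundleError(Exception):
    pass


def resolve_dependency(bundle_name, produce, existing, external=False):
    """Return the upstream bundle's value.

    Normal dependency (like add_dependency): reuse the bundle if it exists,
    otherwise run the upstream task (here, the `produce` callable).
    External dependency (like add_external_dependency): only look for the
    bundle and fail if it is missing, as a Luigi ExternalTask would.
    """
    if bundle_name in existing:
        return existing[bundle_name]
    if external:
        raise MissingBundleError(f"external bundle {bundle_name!r} not found")
    existing[bundle_name] = produce()
    return existing[bundle_name]


store = {}
print(resolve_dependency('a', lambda: 2, store))   # 2: A "runs", bundle 'a' is created
print(resolve_dependency('a', lambda: 99, store))  # 2: bundle 'a' exists, A is not rerun

try:
    resolve_dependency('raw', lambda: 0, store, external=True)
except MissingBundleError as err:
    print(err)  # external bundle 'raw' not found
```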
