Simple Pipeline

Using examples from https://github.com/seanr15/disdat-examples

Last updated 5 years ago

This section will teach you:

  • How to write simple pipelines using PipeTask classes and their pipe_requires() and pipe_run() methods

  • How to programmatically set the bundle's human name in pipe_requires()

  • How to specify dependencies (both normal and external)

  • Which Task return types Disdat supports, that is, how bundles present to pipeline tasks

Note: We didn't want to build "yet another workflow system", so we built on Luigi.

Pro Tip: It may be helpful to review how Spotify's Luigi defines pipelines: specifically the section on workflows, the part about run and requires in Tasks, and some information on Luigi parameters.

Setting up an example Python project

  1. Clone our examples GitHub repo (https://github.com/seanr15/disdat-examples).

  2. We'll assume you've cloned it into $CODE.

  3. Change directories into your project: cd $CODE

  4. Assuming you are in your virtual environment, install the example project: pip install -e .

A two-step pipeline:

We will look at a simple two-step pipeline (the file is in the examples repo on GitHub). Here Task B runs after Task A. In Luigi task terms, Task B requires Task A.

If you were writing Luigi tasks, you'd define requires(), run(), and output() for each Task. Disdat manages outputs for you, so you no longer need to write an output() method.

from disdat.pipe import PipeTask

class A(PipeTask):
    def pipe_requires(self):
        self.set_bundle_name('a') # <-- Each time A runs, it produces bundle 'a'

    def pipe_run(self):
        print('-------------------')
        print('A is Running!')
        print('-------------------')
        print()

        return 2 # <-- Files, literals, flat collections, and pandas DataFrames may be returned.


class B(PipeTask):
    def pipe_requires(self):
        self.set_bundle_name('b')
        self.add_dependency('a', A, params={}) # <-- B needs A to run with no params

    def pipe_run(self, a): # <-- Upstream bundles appear as parameters
        print('-------------------')
        print('B is Running!')
        print('-------------------')
        print()

        return a ** 2
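To make the data flow in the listing above concrete, here is a minimal plain-Python sketch that mimics it without importing Disdat. ToyTask and run_task are hypothetical stand-ins written for illustration only; they are not Disdat's implementation and ignore bundle storage, parameters, and re-use of existing bundles. The sketch only shows that the value returned by A arrives in B's pipe_run() under the argument name given to add_dependency().

```python
# Toy stand-in for Disdat's PipeTask. Hypothetical, for illustration only.
class ToyTask:
    def __init__(self):
        self.bundle_name = None
        self.deps = {}  # argument name -> (task class, params)

    def set_bundle_name(self, name):
        self.bundle_name = name

    def add_dependency(self, arg_name, task_cls, params):
        self.deps[arg_name] = (task_cls, params)

    def pipe_requires(self):
        pass

    def pipe_run(self, **kwargs):
        raise NotImplementedError


def run_task(task_cls, bundles):
    """Run upstream tasks first, then task_cls; record each result by bundle name."""
    task = task_cls()
    task.pipe_requires()
    # Each upstream result is passed in under its declared argument name.
    inputs = {arg: run_task(dep_cls, bundles)
              for arg, (dep_cls, _params) in task.deps.items()}
    result = task.pipe_run(**inputs)
    bundles[task.bundle_name] = result
    return result


class A(ToyTask):
    def pipe_requires(self):
        self.set_bundle_name('a')

    def pipe_run(self):
        return 2


class B(ToyTask):
    def pipe_requires(self):
        self.set_bundle_name('b')
        self.add_dependency('a', A, params={})

    def pipe_run(self, a):
        return a ** 2


bundles = {}
result = run_task(B, bundles)
print(result)   # 4
print(bundles)  # {'a': 2, 'b': 4}
```

Asking for B is enough: the runner discovers A through B's declared dependency, which is the same reason you only name the final task when running a real pipeline.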

Observations

  • Disdat tasks are defined as subclasses of PipeTask

  • Each Task declares the upstream Tasks that must complete before it runs. It does so by adding dependencies in pipe_requires()

  • In pipe_requires() one can:

    • Add dependencies via:

      • self.add_dependency(<arg name>, PipeTask class, params dictionary)

        • Semantic: If the output bundle from A does not exist, re-run task A

      • self.add_external_dependency(<arg name>, PipeTask class, params dictionary)

    • Set the bundle's human name via: self.set_bundle_name('b')

  • In pipe_run() one can:

    • Do whatever you want

    • Return values from which Disdat will create a bundle

    • Note: the return type of a task is called the output bundle's presentation.

    • Here all the bundles present as integer literals.

Semantic (for add_external_dependency): Only look for this upstream bundle, and fail if it does not exist. Equivalent to a Luigi ExternalTask.

See more about Disdat Task Return Types.
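The two dependency semantics described above can be contrasted with a small sketch. It assumes a hypothetical in-memory dictionary in place of a Disdat data context, and the helper names resolve_dependency and resolve_external_dependency are invented for illustration; Disdat's actual decision about when to re-run a task may take more into account than bundle existence.

```python
def resolve_dependency(store, name, producer):
    """Regular dependency: reuse the bundle if present, otherwise run the producer."""
    if name not in store:
        store[name] = producer()
    return store[name]


def resolve_external_dependency(store, name):
    """External dependency: only look the bundle up; never run anything."""
    if name not in store:
        raise LookupError(f"external bundle {name!r} does not exist")
    return store[name]


store = {}   # hypothetical stand-in for a data context: bundle name -> value
calls = []   # records every time the producer actually runs


def produce_a():
    calls.append('a')
    return 2


first = resolve_dependency(store, 'a', produce_a)    # bundle missing: producer runs
second = resolve_dependency(store, 'a', produce_a)   # bundle present: producer skipped

try:
    resolve_external_dependency(store, 'missing')
    external_failed = False
except LookupError:
    external_failed = True                           # missing external bundle is an error

found = resolve_external_dependency(store, 'a')      # existing bundle is simply returned
```

In this sketch the producer runs exactly once, and the external lookup never triggers any work; it either finds the bundle or fails.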
