MNIST and TensorFlow
Running the Disdat MNIST Pipeline
We've shamelessly adapted the MNIST TensorFlow example. Here we've broken the example down into three steps (Disdat tasks), all of which are in $DISDAT_HOME/examples/ex_pipelines/mnist.py.
Get MNIST data:
class GetDataGz(PipeTask)
This downloads four gzip files and stores them in a bundle called MNIST.data.gz.
Train the model:
class Train(PipeTask)
This PipeTask depends on the GetDataGz task. It gets the gzip files, builds a TensorFlow graph, and trains it. It stores the saved model in an output bundle called MNIST.trained.
Evaluate accuracy:
class Evaluate(PipeTask)
This PipeTask depends on both upstream tasks. It rebuilds the TF graph, restores the saved values, and evaluates the model. It returns a single accuracy float in its output bundle MNIST.eval.
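The three tasks can be sketched as a skeleton. This is hypothetical code, not the contents of mnist.py: a minimal stand-in replaces Disdat's real PipeTask base class so the sketch is self-contained and runnable.

```python
# Hypothetical skeleton of the three-task pipeline. The real PipeTask
# comes from Disdat; this minimal stand-in only records declared
# dependencies so the sketch runs on its own.
class PipeTask:
    def __init__(self):
        self.deps = {}  # name -> (upstream task class, params)

    def add_dependency(self, name, task_cls, params):
        # The output of task_cls will be handed to pipe_run
        # as the keyword argument `name`.
        self.deps[name] = (task_cls, params)

class GetDataGz(PipeTask):
    def pipe_requires(self):
        pass  # no upstream tasks

    def pipe_run(self):
        return {}  # would return links to the four gzip files

class Train(PipeTask):
    def pipe_requires(self):
        self.add_dependency("input_gzs", GetDataGz, params={})

    def pipe_run(self, input_gzs=None):
        return {}  # would return the saved model files

class Evaluate(PipeTask):
    def pipe_requires(self):
        self.add_dependency("input_gzs", GetDataGz, params={})
        self.add_dependency("model", Train, params={})

    def pipe_run(self, input_gzs=None, model=None):
        return 0.0  # would return the accuracy float

evaluate = Evaluate()
evaluate.pipe_requires()
print(sorted(evaluate.deps))  # ['input_gzs', 'model']
```

Applying the last task (Evaluate) resolves this dependency graph, which is why running it executes GetDataGz and Train first.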
Train the classifier
NOTE: we could run just the first step with:
dsdt apply pipelines.mnist.GetDataGz
and all three steps with:
dsdt apply pipelines.mnist.Evaluate
Inspect the training data and trained model
This produces two bundles: MNIST.data.gz and MNIST.trained.
The training data bundle presents as a dictionary, with the key signifying what kind of information the value (a file link) holds.
The trained model also presents as a dictionary, with a key save_files pointing to a list of saved output files (note that you can have directories in your bundle), and another key model_name that holds the name of the saved model.
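As a rough illustration, the two bundles might present like the dictionaries below. Only the save_files and model_name keys come from the description above; the other key names and all file paths are invented for this sketch.

```python
# Hypothetical shapes of the two bundles as dictionaries. Only the
# save_files and model_name keys are from the Disdat example; the
# other key names and all file paths are invented for illustration.
data_gz_bundle = {
    "train_images": "/path/to/train-images.gz",
    "train_labels": "/path/to/train-labels.gz",
    "test_images": "/path/to/t10k-images.gz",
    "test_labels": "/path/to/t10k-labels.gz",
}

trained_bundle = {
    # A list of saved output files (bundles may contain directories).
    "save_files": ["/path/to/model.ckpt.index",
                   "/path/to/model.ckpt.data-00000-of-00001"],
    # The name of the saved model.
    "model_name": "model.ckpt",
}
```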
Evaluate the trained model
Let's run the last stage of the pipeline. Note that, under the hood, Luigi is telling us that the GetDataGz and Train tasks are already done.
Now you've produced three bundles. Let's look at the Evaluate task's final output bundle:
Pipeline Detail
Setting up dependencies
Let's look at the Train task. Recall that a Disdat PipeTask consists of two functions: pipe_requires and pipe_run. Starting with pipe_requires():
pipe_requires: This declares the tasks that must run before this task. The statement self.add_dependency("input_gzs", GetDataGz, params={}) says that the current task needs a GetDataGz instance to run, with no parameters. It also says that Disdat should set up the output of that task as a named parameter to pipe_run called 'input_gzs'.
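A minimal self-contained sketch of that wiring, assuming a toy driver in place of Disdat's actual scheduler (the stand-in functions and file names here are invented):

```python
# Hypothetical sketch of what add_dependency("input_gzs", GetDataGz,
# params={}) promises: the upstream task's output arrives in pipe_run
# under the keyword argument 'input_gzs'. Not Disdat's real scheduler.
def get_data_gz_run():
    # Stand-in for GetDataGz.pipe_run: pretend these are the file links.
    return ["train-images.gz", "train-labels.gz",
            "t10k-images.gz", "t10k-labels.gz"]

def train_pipe_run(input_gzs):
    # Stand-in for Train.pipe_run: receives the upstream output by name.
    return {"num_inputs": len(input_gzs)}

# A toy driver wires the declared name to the keyword argument:
dependencies = {"input_gzs": get_data_gz_run}
kwargs = {name: fn() for name, fn in dependencies.items()}
result = train_pipe_run(**kwargs)
print(result)  # {'num_inputs': 4}
```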
Outputs
Next, let's look at pipe_run in more detail. Note that the TensorFlow code is just plain Python TensorFlow code. No magic. Let's focus on lines 19 and 30. They illustrate how tasks save state (via files) and how you implicitly tell Disdat to put those files in a bundle.
Bundles are designed primarily to wrap other data formats -- not re-invent them. Thus tasks typically produce one or more files as output (in whatever format they choose), and they pass the names of those files directly as return values or place them in lists, dictionaries, tuples, or simple Pandas DataFrames.
However, Disdat manages your output paths for you -- you just need to name the files, not describe outputs with a cascade of custom directory names. That is, instead of /Users/moonga/created/12_31_18/input_data/11_18_18/models/results.mdl, you only provide results.mdl.
You can do this by:
Calling self.create_output_file("results.mdl"): this returns a luigi.LocalTarget object. You can open and write to it.
Or, like line 19 above, calling self.get_output_dir(): this returns a fully-qualified path to your bundle's output directory. You can place files directly into this directory. This is handy because some libraries, like TensorFlow's Saver object, just want an output directory -- it's fragile and difficult to enumerate all the outputs ahead of time.
Note: You can always just create files wherever you wish, i.e., on your local machine or in S3, and place the fully-qualified paths in your output bundle. In this case, Disdat will copy your external files into your data context.
The benefit of using create_output_file and get_output_dir is that there is no data copying when Disdat creates the bundle.
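The pattern can be sketched with the standard library alone. This is a stand-in, not Disdat's real API: the real create_output_file returns a luigi.LocalTarget, and the managed directory lives in your data context rather than a temp dir.

```python
import os
import tempfile

# Stand-in for Disdat's managed output paths: the point is that you
# write files directly into the managed directory, so no copy is
# needed when the bundle is created.
class ManagedOutputs:
    def __init__(self):
        # Stand-in for the bundle's managed output directory.
        self._dir = tempfile.mkdtemp()

    def get_output_dir(self):
        # Fully-qualified path to the output directory; handy for
        # libraries that just want a directory to write into.
        return self._dir

    def create_output_file(self, name):
        # You name the file; the managed directory supplies the rest
        # of the path. (The real API returns a luigi.LocalTarget.)
        return os.path.join(self._dir, name)

outputs = ManagedOutputs()
path = outputs.create_output_file("results.mdl")
with open(path, "w") as f:
    f.write("model bytes")

# The file already lives in the managed directory: no copy required.
assert os.path.dirname(path) == outputs.get_output_dir()
```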
When you're done making files, you still need to return the file paths (if you want them to be in your output bundle). You can return:
The luigi.LocalTarget: Disdat knows how to interpret them.
The full path of any file, e.g., os.path.join(self.get_output_dir(), "my_results.txt")
A directory (line 30 above): Disdat will include files in a sub-directory automatically (one level deep).
In Disdat, the pipe_run function may return scalars, lists, dictionaries, tuples, NumPy ndarrays, or Pandas DataFrames (see Task Return Types). Downstream tasks will be called with the same type. Under the hood, Disdat bundles those data types, storing them as Protocol Buffers in a managed directory (typically ~/.disdat).
Pushing outputs to S3 (optional)
Finally, let's say that you're ready to share the training data, model, and results. First we commit each of our output bundles. Committing simply sets a flag that tells Disdat you like this version so much, you're willing to put it up on S3.
Now let's push each bundle up to our remote context:
Now all of your data is safely on S3. To illustrate, let's delete our local copies and pull the data back. Note that we use the -l flag to tell Disdat to download (localize) our data from S3 to the local file system.
You can see all of your bundles using dsdt ls -v.
Dockerize MNIST
Change to the examples directory, which contains the example pipeline's setup.py file. Let's create the container and push it up to AWS ECR as well:
Assuming you have set up your AWS Batch queue in the Disdat configuration file, go ahead and run the container.
Check your AWS Batch queue to see the job progress from ready to finished. Then pull the bundles and list out the committed (-c) bundles that begin with MNIST.*
How to run Disdat AWS Batch tasks:
Default: By default, Disdat will use your credentials in ~/.aws/credentials. If they are standard credentials (not session tokens), then Disdat will create a token (expiring in 43200 seconds) to give your container sufficient privileges.
AWS session token: In the case that Disdat finds a session token in your credentials file, it will submit the job with that token. If your org has a short time limit, then your jobs might fail when they access AWS resources. Typically this happens when Disdat writes your results back to S3. Bummer.
AWS IAM roles: To avoid short-lived tokens, create a role using IAM and pass it to the dsdt run command. IAM is a pain, but if you don't have an SRE to help you here, you should read up on roles.
Creating a role:
Go to IAM on the AWS Console and choose create role.
Select AWS Service as the trusted entity.
Choose Elastic Container Service as your service use case.
Choose Elastic Container Service Task.
Hit next for permissions.
Search for S3 and then choose AmazonS3FullAccess.
Hit tags, and enter some tags. I like ‘owner’ and ‘project’ at a minimum
Give your role a name and description
At the end you should see your role; copy the ARN to your clipboard.
dsdt run --job-role-arn <YOUR ROLE ARN> --backend AWSBatch . pipelines.dependent_tasks.B