# MNIST and TensorFlow

## Running the Disdat MNIST Pipeline

We've shamelessly adapted the [MNIST TensorFlow example](https://www.tensorflow.org/get_started/mnist/pros).  Here we've broken the example down into three steps (Disdat *Tasks*), all of which are in `$DISDAT_HOME/examples/ex_pipelines/mnist.py`.

1. Get MNIST data: `class GetDataGz(PipeTask)`. This task downloads four gzip files and stores them in a bundle called `MNIST.data.gz`.
2. Train the model: `class Train(PipeTask)`. This PipeTask depends on the `GetDataGz` task, gets the gzip files, builds a TensorFlow graph, and trains it. It stores the saved model in an output bundle called `MNIST.trained`.
3. Evaluate accuracy: `class Evaluate(PipeTask)`. This PipeTask depends on both upstream tasks. It rebuilds the TensorFlow graph, restores the values, and evaluates the model. It returns a single accuracy float in its output bundle `MNIST.eval`.

### Train the classifier

```bash
$dsdt apply pipelines.mnist.Train
INFO: Informed scheduler that task   DriverTask_False______48a9755ee1   has status   PENDING
INFO: Informed scheduler that task   Train__99914b932b   has status   PENDING
INFO: Informed scheduler that task   GetDataGz__99914b932b   has status   PENDING
INFO: Done scheduling tasks
[... more output ...]
Successfully downloaded train-images-idx3-ubyte.gz 9912422 bytes.
Successfully downloaded train-labels-idx1-ubyte.gz 28881 bytes.
Successfully downloaded t10k-images-idx3-ubyte.gz 1648877 bytes.
Successfully downloaded t10k-labels-idx1-ubyte.gz 4542 bytes.
[... more output ...]

End training.
===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 3 ran successfully:
    - 1 DriverTask(...)
    - 1 GetDataGz(...)
    - 1 Train(...)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====
```
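A quick aside on those task names: the `__99914b932b` suffix is Luigi's parameter hash, which (as we understand Luigi's task-id scheme) is the md5 digest of the task's JSON-serialized parameter dict truncated to ten hex characters. None of these tasks take parameters, so they all hash the empty dict. You can reproduce the suffix with the standard library:

```python
import hashlib

# Luigi hashes the JSON-serialized parameter dict and keeps the first
# 10 hex characters; with no parameters the serialized dict is "{}".
suffix = hashlib.md5("{}".encode("utf-8")).hexdigest()[:10]
print(suffix)  # 99914b932b -- matches the suffix in the log above
```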

{% hint style="warning" %}
NOTE: we could have run just the first step by:

`dsdt apply pipelines.mnist.GetDataGz`

and all three steps by:

`dsdt apply pipelines.mnist.Evaluate`
{% endhint %}

### Inspect the training data and trained model

This run produces two bundles: **MNIST.data.gz** and **MNIST.trained**.

The training data bundle *presents* as a dictionary, with each key signifying what kind of information its value (a file link) holds.

```bash
$dsdt cat MNIST.data.gz
{'train-images': ['/Users/kyocum/.disdat/context/examples/objects/10beb5e2-3c07-4a4c-a1f7-3c2b0e2407f7/train-images-idx3-ubyte.gz'], 
'train-labels': ['/Users/kyocum/.disdat/context/examples/objects/10beb5e2-3c07-4a4c-a1f7-3c2b0e2407f7/train-labels-idx1-ubyte.gz'], 
't10k-images': ['/Users/kyocum/.disdat/context/examples/objects/10beb5e2-3c07-4a4c-a1f7-3c2b0e2407f7/t10k-images-idx3-ubyte.gz'], 
't10k-labels': ['/Users/kyocum/.disdat/context/examples/objects/10beb5e2-3c07-4a4c-a1f7-3c2b0e2407f7/t10k-labels-idx1-ubyte.gz']}
```
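Those gzip files are in the classic MNIST idx format, so a downstream consumer could inspect one of these links without TensorFlow at all. A minimal sketch using only the standard library (the header layout comes from the MNIST idx spec; the synthetic label file written below is purely for illustration):

```python
import gzip
import struct
import tempfile

def read_idx_header(path):
    """Parse the magic number and dimension sizes from a gzipped MNIST
    idx file. Label files use magic 2049 (1 dim), images 2051 (3 dims)."""
    with gzip.open(path, "rb") as f:
        magic, = struct.unpack(">I", f.read(4))
        n_dims = magic & 0xFF  # low byte of the magic encodes the dim count
        dims = struct.unpack(">" + "I" * n_dims, f.read(4 * n_dims))
    return magic, dims

# Demo on a synthetic label file: magic 0x00000801, 60000 items
with tempfile.NamedTemporaryFile(suffix=".gz", delete=False) as tmp:
    path = tmp.name
with gzip.open(path, "wb") as f:
    f.write(struct.pack(">II", 0x00000801, 60000))

print(read_idx_header(path))  # (2049, (60000,))
```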

The trained model bundle also *presents* as a dictionary: the key `save_files` points to a list of saved output files (note that you can have directories in your bundle), and the key `model_name` holds the name of the saved model.

```bash
$dsdt cat MNIST.trained
{'save_files': array(['/Users/kyocum/.disdat/context/examples/objects/53f75bd7-6841-47ca-8f8b-904c5f02dada/MNIST/MNIST_tf_model.index',
       '/Users/kyocum/.disdat/context/examples/objects/53f75bd7-6841-47ca-8f8b-904c5f02dada/MNIST/checkpoint',
       '/Users/kyocum/.disdat/context/examples/objects/53f75bd7-6841-47ca-8f8b-904c5f02dada/MNIST/MNIST_tf_model.data-00000-of-00001',
       '/Users/kyocum/.disdat/context/examples/objects/53f75bd7-6841-47ca-8f8b-904c5f02dada/MNIST/MNIST_tf_model.meta'],
      dtype='<U124'), 'model_name': ['MNIST_tf_model']}
```
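The `checkpoint` entry in `save_files` is TensorFlow's checkpoint-state file, a small text file that names the most recently saved model. If a downstream consumer only needs the model name, it can recover it without importing TensorFlow. A sketch (the synthetic file below mimics what `Saver.save` writes; the helper is illustrative, not part of Disdat):

```python
import os
import re
import tempfile

def latest_checkpoint_name(checkpoint_file):
    """Extract model_checkpoint_path from a TF V1 'checkpoint' state file."""
    with open(checkpoint_file) as f:
        match = re.search(r'model_checkpoint_path:\s*"([^"]+)"', f.read())
    return match.group(1) if match else None

# Demo on a synthetic checkpoint file mirroring the bundle contents above
tmpdir = tempfile.mkdtemp()
path = os.path.join(tmpdir, "checkpoint")
with open(path, "w") as f:
    f.write('model_checkpoint_path: "MNIST_tf_model"\n')
    f.write('all_model_checkpoint_paths: "MNIST_tf_model"\n')

print(latest_checkpoint_name(path))  # MNIST_tf_model
```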

### Evaluate the trained model

Let's run the last stage of the pipeline.  Note that, under the hood, Luigi tells us that the `GetDataGz` and `Train` tasks are already done.

```bash
$dsdt apply -f pipelines.mnist.Evaluate
INFO: Informed scheduler that task   DriverTask_False______a33b532760   has status   PENDING
INFO: Informed scheduler that task   Evaluate__99914b932b   has status   PENDING
INFO: Informed scheduler that task   GetDataGz__99914b932b   has status   DONE
INFO: Informed scheduler that task   Train__99914b932b   has status   DONE
[ ... ]
===== Luigi Execution Summary =====

Scheduled 4 tasks of which:
* 2 complete ones were encountered:
    - 1 GetDataGz(...)
    - 1 Train(...)
* 2 ran successfully:
    - 1 DriverTask(...)
    - 1 Evaluate(...)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====
```

Now you've produced three bundles.  Let's look at the `Evaluate` task's final output bundle:

```bash
$dsdt cat MNIST.eval
0.9203000068664551
```

## Pipeline Detail

### Setting up dependencies

Let's look at the `Train` task.  Recall that a Disdat PipeTask consists of two functions: `pipe_requires` and `pipe_run`.  Starting with `pipe_requires()`:

```python
class Train(PipeTask):
    """ Pipe Task 2
    Train the softmax layer.
    Returns:
        (dict): all the model files in 'save_files' and the name of the dir in 'save_dir'
    """

    def pipe_requires(self):
        """ Depend on the gzip files being downloaded  """
        self.add_dependency("input_gzs", GetDataGz, {})
        self.set_bundle_name("MNIST.trained")
...
```

* `pipe_requires`: This declares the tasks that must run before this task. The statement `self.add_dependency("input_gzs", GetDataGz, {})` says that the current task needs a `GetDataGz` instance, run with no parameters. It also tells Disdat to pass that task's output to `pipe_run` as a named parameter called `input_gzs`.

### Outputs

Next, let's look at `pipe_run` in more detail.   Note that the TensorFlow code is just plain TensorFlow code.  No magic.   Let's focus on the two marked lines below: they illustrate how tasks save state (via files) and how you implicitly tell Disdat to put those files in a bundle.

```python
    def pipe_run(self, input_gzs=None):
        print("Beginning training . . . ")

        # Bring the data in as an mnist tutorial dataset
        mnist = convert_to_data_sets(input_gzs, one_hot=True)

        g = tf.Graph()
        with g.as_default():
            train_step, cross_entropy, x, y_, _ = make_model()
            saver = tf.train.Saver()

        with tf.Session(graph=g) as sess:
            sess.run(tf.global_variables_initializer())

            for _ in range(1000):
                batch = mnist.train.next_batch(100)
                train_step.run(feed_dict={x: batch[0], y_: batch[1]})

            save_dir = os.path.join(self.get_output_dir(), 'MNIST')   # <-- (1)
            saver.save(sess, os.path.join(save_dir, 'MNIST_tf_model'))

        print("End training.")

        # Note 1: When returning a dictionary, Disdat requires you to use a
        # sequence in the value.

        # Note 2: You can return file paths, directories, or luigi.LocalTarget.
        # If a directory, Disdat takes all files directly under the directory.

        return {'save_files': [save_dir], 'model_name': ['MNIST_tf_model']}   # <-- (2)
```

Bundles are designed primarily to *wrap* other data formats, not re-invent them. Thus tasks typically produce one or more files as output (in whatever format they choose) and pass the names of those files back as return values, either directly or inside lists, dictionaries, tuples, or simple Pandas DataFrames.

However, Disdat manages your output paths for you; you just need to name the files, not describe outputs with a cascade of custom directory names.   That is, instead of **`/Users/moonga/created/12_31_18/input_data/11_18_18/models/results.mdl`**, you only provide **`results.mdl`**.  You can do this by:

* Calling `self.create_output_file("results.mdl")`: this returns a `luigi.LocalTarget` object.  You can open and write to it.
* Or, as in the marked `save_dir` line above, you can call `self.get_output_dir()`:  this returns a fully qualified path to your bundle's output directory.  You can place files directly into this directory.  This is handy because some libraries, like TensorFlow's `Saver` object, just want an output directory; it's fragile and difficult to enumerate all the outputs ahead of time.

{% hint style="warning" %}
Note: You can always just create files wherever you wish, i.e., on your local machine or in S3, and place the fully qualified paths in your output bundle.   In this case, Disdat will ***copy*** your external files into your data context.

The benefit of using `create_output_file` and `get_output_dir` is that there is ***no*** data copying when Disdat creates the bundle.
{% endhint %}

When you're done making files, you still need to return the file paths (if you want them to be in your output bundle). You can return:

* The `luigi.LocalTarget`: Disdat knows how to interpret it.
* The full paths of any file, e.g., `os.path.join(self.get_output_dir(), "my_results.txt")`
* A directory (like `save_dir` in the return statement above): Disdat will automatically include the files directly under it (one level deep).

In Disdat, the `pipe_run` function [may return](/disdat-documentation/basic-concepts/bundles/untitled.md) scalars, lists, dictionaries, tuples, Numpy ndarrays, or Pandas DataFrames (see Task Return Types). Downstream tasks will be called with the same type. Under the hood, Disdat bundles those data types, storing them as Protocol Buffers in a managed directory (typically `~/.disdat`).
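To make the dictionary rule from Note 1 in the code above concrete: every value must be a sequence (a list, tuple, or ndarray), even when there is only one item. The helper below is purely hypothetical, not part of Disdat's API; it just mirrors that rule as a sanity check you could run on a return value:

```python
def validate_bundle_dict(d):
    """Hypothetical check mirroring Disdat's rule: every dictionary value
    must be a sequence (list, tuple, ndarray), never a bare scalar or
    string."""
    for key, value in d.items():
        if isinstance(value, str) or not hasattr(value, "__len__"):
            raise TypeError("value for %r must be a sequence, got %s"
                            % (key, type(value).__name__))
    return True

# The Train task's return value passes the check...
print(validate_bundle_dict({'save_files': ['/tmp/MNIST'],
                            'model_name': ['MNIST_tf_model']}))
# ...while a bare scalar value would raise:
# validate_bundle_dict({'accuracy': 0.92})  # TypeError
```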

## Pushing outputs to S3 (optional)

Finally, let's say you're ready to share the training data, model, and results. To do so, we *commit* each of our output bundles. Committing simply sets a flag telling Disdat that you like this version enough to put it up on S3.

```bash
$ dsdt commit MNIST.eval; dsdt commit MNIST.data.gz; dsdt commit MNIST.trained
```

Now let's push each bundle up to our remote context:&#x20;

```bash
$ dsdt push MNIST.eval; dsdt push MNIST.data.gz; dsdt push MNIST.trained
```

Now all of your data is safely on S3. To illustrate, let's delete our local copies and pull them back.  Note that we use the `-l` flag to tell Disdat to download (localize) our data from S3 to the local file system.

```bash
$ dsdt rm --all MNIST.*
$ dsdt pull -l MNIST.eval; dsdt pull -l MNIST.data.gz; dsdt pull -l MNIST.trained
```

You can see all of your bundles with `dsdt ls -v`.

## Dockerize MNIST

Change to the examples directory, which contains the example pipeline's `setup.py` file.  Let's create the container and push it up to AWS ECR as well:

```bash
$cd $CODE
$dsdt dockerize --push .
```

Assuming you have set up your AWS Batch queue in the Disdat configuration file, go ahead and run the container.

```bash
$dsdt run --backend AWSBatch . pipelines.mnist.Evaluate
```

Check your AWS Batch queue to see the job progress from ready to finished.  Then pull the bundles and list the committed (`-c`) bundles that begin with `MNIST.`:

```bash
$dsdt pull
$dsdt ls -vc MNIST.*
NAME                	PROC_NAME           	OWNER   	DATE              	COMMITTED	UUID                                    	TAGS
MNIST.eval          	Evaluate__99914b932b	root    	02-01-20 11:14:32 	True    	22eb7d23-ee8b-4985-87d6-fa05dc3aea53
MNIST.trained       	Train__99914b932b   	root    	02-01-20 11:14:31 	True    	3ea79fb6-9641-499f-9cc1-89ed23740e8e
MNIST.data.gz       	GetDataGz__99914b932b	root    	02-01-20 11:14:28 	True    	84d6f033-d94c-4a75-95b6-d7b4681bdad5
```


{% hint style="info" %}
How to run Disdat AWS Batch tasks:

**Default**: By default, Disdat will use your credentials in `~/.aws/credentials`. If they are standard credentials (not session tokens), Disdat will create a token (expiring in 43200 seconds) to give your container sufficient privileges.

**AWS session token**: If Disdat finds a session token in your credentials file, it will submit the job with that token. If your org enforces a short token lifetime, your jobs might fail when they access AWS resources; typically this happens when Disdat writes your results back to S3. Bummer.

**AWS IAM roles**: To avoid short-lived tokens, create a role using IAM and pass it to the `dsdt run` command.   IAM is a pain, but if you don't have an SRE to help you here, you should read up on roles.
{% endhint %}

{% hint style="info" %}
Creating a role:

* Go to IAM on the AWS Console and choose create role.
* Select `AWS Service` as the trusted entity
* Choose **`Elastic Container Service`** as your service use case
* Choose `Elastic Container Service` `Task`
* Hit next for permissions
* Search for `S3` and then choose `AmazonS3FullAccess`
* Hit tags and enter some tags; I like `owner` and `project` at a minimum
* Give your role a name and description
* At the end you should see your role, copy the ARN to your clipboard

`dsdt run --job-role-arn <YOUR ROLE ARN> --backend AWSBatch . pipelines.dependent_tasks.B`
{% endhint %}

