Python API

API documentation for modules: api.

DRAFT: Please see the latest api.py on GitHub here.

A Disdat API for creating and publishing bundles.

These calls are not thread safe. If they operate on a context, they require the user to specify the context. This is unlike the CLI, which maintains state on disk to keep track of your current context between calls. The API will not change the context you are in on the CLI side, and vice versa.

To make this work, we get a pointer to the singleton FS object, temporarily change the context (otherwise it automatically assumes the CLI's current context), and perform the operation.

Author: Kenneth Yocum

Functions

Function add

def add(local_context, bundle_name, path, tags=None)

Create a bundle named bundle_name from the given path.

If path is a directory, create the bundle with the items in the directory as a list of links. If path is a file or set of files, create the bundle as links to those files.

Note: the bundle presents as a Python list of file links, unless path is a single file, in which case the bundle presents as a single file link.

Args

local_context : str : The local context in which to create this bundle

bundle_name : str : The human name for this new bundle

path : str : The directory or file from which to create a bundle

tags : dict : The set of tags to attach to this bundle

Returns

Bundle
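For example, a minimal sketch (the context name, bundle name, and path below are hypothetical, and the local context must already exist):

    import disdat.api as api

    # Hypothetical names: the local context 'examples' must already exist,
    # and '/tmp/my_data' is a file or directory on disk.
    b = api.add('examples', 'my.data', '/tmp/my_data', tags={'source': 'demo'})
    print(b.uuid)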

Function apply

def apply(local_context, transform, output_bundle='-', input_tags=None, output_tags=None, force=False, force_all=False, params=None, output_bundle_uuid=None, central_scheduler=False, workers=1, incremental_push=False, incremental_pull=False)

Execute a Disdat pipeline natively on the local machine. Note that run() will execute a Disdat pipeline that has been dockerized (either locally or remotely on AWS Batch or AWS SageMaker).

Args

local_context : str : The name of the local context in which the pipeline will run

transform : type[disdat.pipe.PipeTask] : A reference to the Disdat Pipe class

output_bundle : str : The name of the output bundle. Defaults to <task_name>_<param_hash>

input_tags : optional tags dictionary for selecting input bundle

output_tags : optional tags dictionary to tag output bundle

force : bool : Force re-running this transform, default False

force_all : bool : Force re-running ALL transforms, default False

params : optional parameters dictionary

output_bundle_uuid : Force UUID of output bundle

central_scheduler : bool : Use a central scheduler, default False, i.e., use local scheduler

workers : int : Number of workers, default 1.

incremental_push : bool : commit and push task bundles as they complete

incremental_pull : bool : localize bundles from remote as they are required by downstream tasks

Returns

result : int : 0 success, >0 if issue
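A minimal sketch of calling apply, where MyPipeline is a hypothetical disdat.pipe.PipeTask subclass defined in your own package:

    import disdat.api as api
    from mypackage.pipelines import MyPipeline  # hypothetical pipeline class

    # Run the pipeline natively in the 'examples' context (hypothetical name).
    rc = api.apply('examples', MyPipeline,
                   params={'n': '10'},           # hypothetical task parameter
                   output_tags={'run': 'demo'},
                   workers=1)
    assert rc == 0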

Function cat

def cat(local_context, bundle_name)

Retrieve the data representation of the latest bundle with name bundle_name

Args

local_context : str : The name of the local context from which to get bundle

bundle_name : str : The human name of bundle

Returns

Object : The presentation data type with which this bundle was created.
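For example (names are hypothetical):

    import disdat.api as api

    # Returns whatever Python object the latest 'my.data' bundle was created
    # with, e.g., a DataFrame, list, dict, scalar, or file link(s).
    data = api.cat('examples', 'my.data')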

Function commit

def commit(local_context, bundle_name, tags=None, uuid=None)

Commit a bundle in this local context. This adds a special committed tag to the bundle, allowing it to be pushed to a remote. It also exempts the bundle from the "uncommitted history" limit: one can keep as many committed versions of a bundle as desired, but only N uncommitted bundles.

Args

local_context : str : The local context in which the bundle exists

bundle_name : str : The human name of the bundle to commit

tags : dict(str:str) : Optional dictionary of tags with which to find the bundle

uuid : str : UUID of the bundle to commit

Returns

None :  
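A short sketch (names are hypothetical):

    import disdat.api as api

    # Commit the latest version of 'my.data' so it can be pushed to a remote.
    api.commit('examples', 'my.data', tags={'approved': 'true'})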

Function context

def context(context_name)

Create a new context

Args

context_name : str : <remote context>/<local context> or <local context>

Returns

(int) : 0 if successfully created branch, 1 if branch exists
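For example (the context name is hypothetical):

    import disdat.api as api

    rc = api.context('examples')   # 0 if the context was created, 1 if it already exists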

Function current_context

def current_context()

Return the current context name (not object)

Function delete_context

def delete_context(context_name, remote=False, force=False)

Delete a local context. Will not delete any data if there is a remote attached.

Args

context_name : str : The name of the local context to delete.

remote : bool : Delete the remote context as well; force must be True

force : bool : Force deletion if dirty

Returns

None :  

Function dockerize

def dockerize(setup_dir, config_dir=None, build=True, push=False, sagemaker=False)

Create a docker container image using a setup.py and pkg.module.class description of the pipeline.

Note

Users set os_type and os_version in the disdat.cfg file.

os_type : The base operating system type for the Docker image

os_version : The base operating system version for the Docker image

Args

setup_dir : str : The directory that contains the setup.py holding the requirements for any pipelines

config_dir : str : The directory containing the configuration of .deb packages

build : bool : If False, just copy files into the Docker build context without building image.

push : bool : Push the container image to the repository

sagemaker : bool : Create a Docker image executable as a SageMaker container (instead of a local / AWSBatch container).

Returns

(int): 0 if success, 1 on failure
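A minimal sketch, assuming a hypothetical project directory containing a setup.py and the pipeline code:

    import disdat.api as api

    # Build the image locally; set push=True only if a container repository is configured.
    api.dockerize('/path/to/pipeline_project', build=True, push=False)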

Function get

def get(local_context, bundle_name, uuid=None, tags=None)

Retrieve the latest (by date) bundle from local context, with name, uuid, and tags.

Args

local_context : str : The name of the local context to search.

bundle_name : str : The human bundle name to find.

uuid : str : The UUID to return. Trumps bundle_name

tags : dict : The tags the bundle must have.

Returns

Bundle
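For example (names are hypothetical):

    import disdat.api as api

    b = api.get('examples', 'my.data')                # latest bundle by date
    b_exact = api.get('examples', None, uuid=b.uuid)  # the same bundle, pinned by UUID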

Function lineage

def lineage(local_context, uuid, max_depth=None)

Return lineage information for a bundle uuid in a local context. The lineage information is traversed in a breadth-first manner. Shortcut: api.lineage(<uuid>) is equivalent to api.search(uuid=<uuid>)[0].lineage

Args

local_context : str : The context in which the bundle lives.

uuid : str : The UUID of the bundle in question

max_depth : int : Maximum depth returned in search of lineage objects. Default None is unbounded.

Returns

list(hyperframe_pb2.Lineage): List of Protocol Buffer Lineage objects in BFS order
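A short sketch (names are hypothetical):

    import disdat.api as api

    b = api.get('examples', 'my.data')
    for lin in api.lineage('examples', b.uuid, max_depth=3):
        print(lin)   # protocol buffer Lineage records in BFS order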

Function ls_contexts

def ls_contexts()

Return list of contexts and their remotes

Returns

List[Tuple(str,str)]: Return a list of tuples containing <local context>, <remote context>@<remote string> :  

Function pull

def pull(local_context, bundle_name=None, uuid=None, localize=False)

Pull bundles from the remote context into this local context. If there is no remote context associated with this context, then this is a no-op. fs.pull will raise UserWarning if there is no remote context.

Args

local_context : str : The local context whose remote the bundle will be pulled from

bundle_name : str : An optional human bundle name

uuid : str : An optional bundle UUID

localize : bool : Whether to bring linked files directly into bundle directory

Returns

None :  
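For example (names are hypothetical, and the context must have a remote attached):

    import disdat.api as api

    # Pull 'my.data' from the attached remote; localize=True also downloads the linked files.
    api.pull('examples', bundle_name='my.data', localize=True)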

Function push

def push(local_context, bundle_name, tags=None, uuid=None)

Push a bundle to a remote repository.

Args

local_context : str : The local context to push to (must have a remote).

bundle_name : str : human name of the bundle to push or None (if using uuid)

tags : dict : Tags the bundle must have

uuid : str : Optional UUID of the bundle to push. UUID takes precedence over bundle name

Returns

None :  
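A short sketch (names are hypothetical); per the commit() documentation above, a bundle is committed before it can be pushed:

    import disdat.api as api

    api.commit('examples', 'my.data')
    api.push('examples', 'my.data')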

Function remote

def remote(local_context, remote_context, remote_url, force=False)

Add a remote to local_context.

Note that this local context may already have a remote bound. This means it may have references to bundles that have not been localized (file references will still be s3 paths). In that case you must set `force` to True.

Args

local_context : str : The name of the local context to which to add the remote.

remote_context : str : The name of the remote context.

remote_url : str : The S3 path that holds the contexts, e.g., s3://disdat-prod/beta/

force : bool : If this local context already has a remote, you must set force=True

Returns

None :  
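For example (the context names and S3 path are hypothetical):

    import disdat.api as api

    # Bind the local context 'examples' to a remote context stored under an S3 prefix.
    api.remote('examples', 'examples-remote', 's3://my-bucket/disdat/')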

Function rm

def rm(local_context, bundle_name=None, uuid=None, tags=None, rm_all=False, rm_old_only=False, force=False)

Delete a bundle with a certain name, uuid, or tags. By default removes the most recent bundle. Otherwise one may specify rm_all=True to remove all bundles, or rm_old_only=True to remove all bundles but the most recent.

Args

local_context : str : Local context name from which to remove the bundle

bundle_name : str : Optional human-given name for the bundle

uuid : str : Optional UUID for the bundle to remove. Trumps bundle_name argument if both given

tags : dict(str:str) : Optional dictionary of tags that must be present on the bundle to remove

rm_all : bool : Remove latest and all historical versions if given bundle_name

rm_old_only : bool : remove everything but latest if given bundle_name

force : bool : If a db-link exists and it is the latest on the remote DB, force remove. Default False.

Returns

None :  
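A short sketch (names are hypothetical):

    import disdat.api as api

    # Remove the most recent 'my.data' bundle; rm_old_only=True would instead
    # keep only the latest version and delete the older ones.
    api.rm('examples', bundle_name='my.data')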

Function run

def run(local_context, remote_context, setup_dir, pipe_cls, pipeline_args, output_bundle='-', remote=None, backend='Local', input_tags=None, output_tags=None, force=False, force_all=False, no_pull=False, no_push=False, no_push_int=False, vcpus=2, memory=4000, workers=1, no_submit=False, aws_session_token_duration=42300, job_role_arn=None)

Execute a pipeline in a container, running locally, on AWS Batch, or on AWS SageMaker.

Args

local_context : str : The name of the local context in which the pipeline will run in the container

remote_context : str : The remote context to pull / push bundles during execution

setup_dir : str : The directory that contains the setup.py holding the requirements for any pipelines

output_bundle : str : The human name of output bundle

pipe_cls : str : The pkg.module.class of the root of the pipeline DAG

pipeline_args : dict : Dictionary of the parameters of the root task

remote : str : The remote's S3 path

input_tags : dict : str:str dictionary of tags required of the input bundle

output_tags : dict : str:str dictionary of tags placed on all output bundles (including intermediates)

force : bool : Re-run the last pipe task no matter prior outputs

force_all : bool : Re-run the entire pipeline no matter prior outputs

no_pull : bool : Do not pull before execution

no_push : bool : Do not push any output bundles after task execution

no_push_int : bool : Do not push intermediate task bundles after execution

vcpus : int : Number of virtual CPUs (if backend=AWSBatch). Default 2.

memory : int : Number of MB (if backend='AWSBatch'). Default 4000.

workers : int : Number of Luigi workers. Default 1.

no_submit : bool : If True, just create the AWS Batch Job definition, but do not submit the job

aws_session_token_duration : int : Seconds lifetime of temporary token (backend='AWSBatch'). Default 42300

job_role_arn : str : AWS ARN for job execution in a batch container (backend='AWSBatch')

Returns

json : str :  
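A hypothetical sketch of running a dockerized pipeline on AWS Batch (all names, paths, and parameters are placeholders):

    import disdat.api as api

    api.run('examples', 'examples-remote', '/path/to/pipeline_project',
            'mypackage.pipelines.MyPipeline', {'n': '10'},
            backend='AWSBatch', vcpus=2, memory=4000)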

Function search

def search(local_context, search_name=None, search_tags=None, is_committed=None, find_intermediates=False, find_roots=False, before=None, after=None)

Search for bundle in a local context. Allow for searching by human name, is_committed, is intermediate or is root task output, and tags.

At this time the SQL interface in disdat.fs does not allow searching for entries without particular tags.

Args

local_context : str : The name of the local context to search.

search_name : May be None. Interpreted as a simple regex (one Kleene star)

search_tags : dict : A set of key:values the bundles must have

is_committed : bool : If None (default): ignore committed, If True return committed, If False return uncommitted

find_intermediates : bool : Results must be intermediates

find_roots : bool : Results must be final outputs

before : str : Return bundles < "12-1-2009" or "12-1-2009 12:13:42"

after : str : Return bundles >= "12-1-2009" or "12-1-2009 12:13:42"

Returns

List of API Bundle objects
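For example (names and tags are hypothetical):

    import disdat.api as api

    # Find committed bundles whose name matches 'my.*' and that carry a given tag.
    for b in api.search('examples', search_name='my.*',
                        search_tags={'stage': 'prod'}, is_committed=True):
        print(b.name, b.uuid)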

Function set_aws_profile

def set_aws_profile(aws_profile)

Function switch

def switch(context_name)

Stateful switch to a different context. This changes the current default context used by the CLI commands.

Args

context_name : str : The name of the local context to switch into.

Returns

None :  

Classes

Class Bundle

class Bundle(local_context, name, owner='')

A HyperFrameRecord stores a named list of frames (or tensors) and includes lineage, tags, and links. This is the in-Python representation; each can import / export to protocol buffers and databases (via named tuples).

Given a name and a local context, create a handle that users can work with to: a.) create files / directories / db tables (i.e., bundle links), and b.) add constants and files into the bundle.

Either create the bundle ahead of time and add items to it, or use a temporary directory and copy things into the bundle when you are done. In the second case it is easy to use one bundle object and write it to multiple contexts; the bundle should be closed or destroyed when finished.

Args

local_context : str : Where this bundle will be output or where it was sourced from.

name : str : Human name for this bundle

Ancestors (in MRO)

  • disdat.hyperframe.HyperFrameRecord

  • disdat.hyperframe.PBObject

Instance variables

Variable creation_date

Variable lineage

Variable name

Variable owner

Variable params

Return the tags that were parameters. This returns the string version of the parameters (how they were serialized into the bundle). Note that we currently use Luigi Parameter parse and serialize to go to and from strings; Luigi does so to interpret command-line arguments.

Variable processing_name

Variable tags

Variable user_tags

Return the tags that the user set. bundle.tags holds all of the tags, including the "hidden" parameter tags; this property accesses everything but the parameter tags, while bundle.params accesses everything but the user tags.

Variable uuid
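A small sketch of inspecting an existing bundle through these attributes (the context and bundle names are hypothetical):

    import disdat.api as api

    b = api.get('examples', 'my.data')
    print(b.name, b.uuid, b.creation_date)
    print(b.user_tags)   # tags the user set
    print(b.params)      # stringified parameters of the producing task
    data = b.data        # the presented data object (see Bundle.cat)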

Methods

Method add_data

def add_data(self, data)

Attach data to a bundle. The bundle must be open and not closed. One attaches one data item to a bundle (dictionary, list, tuple, scalar, or dataframe). Calling this replaces the latest item -- only the latest will be included in the bundle on close.

Note: One uses `add_data_row` or `add_data` but not both.  Adding a row after `add_data`
removes the data.   Using `add_data` after `add_data_row` removes all previously added rows.

Args

data (list|tuple|dict|scalar|pandas.DataFrame):

Returns

self :  

Method add_dependency

def add_dependency(self, bundle)

Add an upstream bundle as a dependency

Args

bundle(Bundle): Another bundle that may have been used to produce this one

Method add_tags

def add_tags(self, tags)

Add tags to the bundle. Updates if existing.

Args

tags (dict): string:string dictionary of tags to add

Returns

self :  

Method cat

def cat(self)

Return the data in the bundle. The data is already present in .data.

Method commit

def commit(self)

Shortcut version of api.commit(uuid=bundle.uuid)

Returns

self :  

Method copy_in_file

def copy_in_file(self, existing_file)

This function copies the file 'existing_file' into the output bundle. This is used when you have an existing file on disk and wish to add it to the bundle.

To use, you must record this as part of the bundle with bundle.add_data(bundle.copy_in_file("my_file"))

Args

existing_file : str : Path to an existing file

Returns

luigi.LocalTarget or luigi.s3.S3Target

Method db_table

def db_table(self, dsn, table_name, schema_name)

This is the way to allocate a db table reference one can place into a bundle. Like make_directory, make_file, or copy_in_file, this returns a target object that the user must add to their bundle.data before it is recorded in the bundle.

Args

dsn (unicode):

table_name (unicode):

schema_name (unicode):

Returns

disdat.db_link.DBLink

Method fill_from_hfr

def fill_from_hfr(self, hfr)

Given an internal hyperframe, copy out the information to this user-side Bundle object.

Assume the user has set the data_context appropriately when creating the bundle object

The Bundle object inherits from HyperFrameRecord, so we need to: a.) set the pb to point to this pb (they may both point to the same pb, but this bundle will be closed), b.) initialize internal state, and c.) set bundle-object-specific fields. Note that we do not set or clear self.data_context.

Args

hfr:

Returns

self : this bundle object with self.data containing the kind of object the user saved.

Method make_directory

def make_directory(self, dir_name)

Returns path <disdat-managed-directory>/<dir_name>. This is used if you need to hand a process an output directory and you do not have control of what it writes in that directory.

Add this path as you would add file paths to your output bundle. Disdat will incorporate all the data found in this directory into the bundle.

See Pipe.create_output_dir()

Arguments

dir_name : str : Either a fully qualified path (whose prefix is the bundle path) or the base name of a directory to appear in the bundle. Neither should end in /

Returns

str : A directory path managed by disdat

Method make_file

def make_file(self, filename)

Create a file target called "filename" that will exist in the bundle. This is used when you have data in memory and wish to write it to a file, e.g., create a parquet file.

To use, you must a.) write data into this file-like object (a 'target'), and b.) you must add this target to the bundle via bundle.add_data(bundle.make_file("my_file"))

Arguments

filename : str,list,dict : filename to create in the bundle

Returns

luigi.LocalTarget or luigi.s3.S3Target
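A sketch combining make_file and add_data, following the pattern described above. The names are hypothetical, and depending on the Disdat version the bundle may need to be explicitly opened before writing and closed afterwards (see api.py):

    import disdat.api as api

    b = api.Bundle('examples', 'my.report')   # hypothetical context / bundle names
    target = b.make_file('report.csv')        # file target managed by the bundle
    with target.open('w') as f:
        f.write('col_a,col_b\n1,2\n')
    b.add_data(target)                        # record the target in the bundle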

Method pull

def pull(self, localize=False)

Shortcut version of api.pull()

Note if localizing, then we update this bundle to reflect the possibility of new, local links

Method push

def push(self)

Shortcut version of api.push(uuid=bundle.uuid)

Returns

(disdat.api.Bundle): this bundle

Method rm

def rm(self)

Remove the bundle from the current context associated with this bundle object. Only this bundle, with this UUID, is removed. This only makes sense if the bundle is closed.

Class BundleWrapperTask

class BundleWrapperTask(*args, **kwargs)

This task allows one to create bundles that can be referred to in a Disdat pipeline through self.add_external_dependency. 1.) A user makes a bundle outside of Luigi. 2.) A Luigi pipeline wants to use the bundle. It can either a.) refer to the bundle as an argument and read it using the API (outside of Luigi dependencies), or b.) refer to the bundle using a BundleWrapperTask, in which case the Luigi Disdat pipeline uses the latest bundle with that processing_name.

Two implementation options: 1.) add an "add_bundle_dependency()" call to Disdat, which directly changes how we schedule, or 2.) add a special Luigi task (as Luigi has for external files) and use it to produce the processing_name when there isn't an actual task creating the data.

Thus this task mainly provides a way that a.) a user can create a bundle and set the processing_name, and b.) the pipeline can refer to this bundle using the same processing_name.

The parameters in this task should allow one to sufficiently identify versions of this bundle.

Note: No processing_name and No UUID

This has the same signature as luigi.Task; the constructor goes through this class and gathers the set of parameters we define.

Args

*args: **kwargs:

Ancestors (in MRO)

  • disdat.pipe.PipeTask

  • luigi.task.Task

  • disdat.pipe_base.PipeBase

Class variables

Variable name

Variable owner

Variable tags

Methods

Method bundle_inputs

def bundle_inputs(self)

Determine input bundles

Method bundle_outputs

def bundle_outputs(self)

Determine output bundles

Method human_id

def human_id(self)

The default is a shortened version of pipe_id, but here we want it to be the name that was set.

Generated by pdoc 0.7.5 (https://pdoc3.github.io).
