Python API
API documentation for modules: api.
DRAFT: Please see the latest api.py in the GitHub repository.
A Disdat API for creating and publishing bundles.
These calls are not thread safe. If they operate on a context, they require the user to specify the context. This is unlike the CLI, which maintains state on disk to keep track of your current context between calls. The API won't change the context you're in within the CLI, and vice versa.
To make this work, we get a pointer to the singleton FS object, temporarily change its context (it will automatically assume the context of the CLI), and perform our operation.
Author: Kenneth Yocum
Functions
Function add
add
def add(local_context, bundle_name, path, tags=None)
Create bundle bundle_name from the file or directory at path.
If path is a directory, create the bundle with the directory's items as a list of links. If path is a file or set of files, create the bundle as links to those files.
Note: the bundle presents as a Python list of files, unless path is a single file, in which case the bundle presents as just a single file link.
Args
local_context
: str
: The local context in which to create this bundle
bundle_name
: str
: The human name for this new bundle
path
: str
: The directory or file from which to create a bundle
tags
: dict
: The set of tags to attach to this bundle
Returns
Bundle
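Example (a minimal sketch; assumes the module is imported as disdat.api, a context named 'my-context' already exists, and the file path is a placeholder):
import disdat.api as api
# Create a bundle named 'raw-data' from a single file on disk.
# Because path is a single file, the bundle presents as a single file link.
b = api.add('my-context', 'raw-data', '/tmp/data.csv', tags={'source': 'local'})
print(b.name, b.uuid)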
Function apply
apply
def apply(local_context, transform, output_bundle='-', input_tags=None, output_tags=None, force=False, force_all=False, params=None, output_bundle_uuid=None, central_scheduler=False, workers=1, incremental_push=False, incremental_pull=False)
Execute a Disdat pipeline natively on the local machine. Note that run() will execute a Disdat pipeline that has been dockerized (either locally or remotely on AWS Batch or AWS SageMaker).
Args
local_context
: str
: The name of the local context in which the pipeline will run in the container
transform
: type[disdat.pipe.PipeTask]
: A reference to the Disdat Pipe class
output_bundle
: str
: The name of the output bundle. Defaults to <task_name>_<param_hash>
input_tags
: optional tags dictionary for selecting input bundle
output_tags
: optional tags dictionary to tag output bundle
force
: bool
: Force re-running this transform, default False
force_all
: bool
: Force re-running ALL transforms, default False
params
: optional parameters dictionary
output_bundle_uuid
: Force UUID of output bundle
central_scheduler
: bool
: Use a central scheduler, default False, i.e., use local scheduler
workers
: int
: Number of workers, default 1.
incremental_push
: bool
: commit and push task bundles as they complete
incremental_pull
: bool
: localize bundles from remote as they are required by downstream tasks
Returns
result
: int
: 0 success, >0 if issue
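Example (a hedged sketch; MyPipeline stands in for your own disdat.pipe.PipeTask subclass and is not part of this API):
import disdat.api as api
from my_pipelines import MyPipeline  # hypothetical PipeTask subclass
# Run the pipeline natively in 'my-context', forcing the root task to re-run.
rc = api.apply('my-context', MyPipeline,
               params={'n_estimators': '100'},
               output_tags={'stage': 'dev'},
               force=True)
assert rc == 0  # per the Returns above, 0 means success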
Function cat
cat
def cat(local_context, bundle_name)
Retrieve the data representation of the latest bundle with name bundle_name
Args
local_context
: str
: The name of the local context from which to get bundle
bundle_name
: str
: The human name of bundle
Returns
Object
: The presentation data type with which this bundle was created.
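Example (a short sketch; the context and bundle names are placeholders):
import disdat.api as api
# Fetch the presented data of the latest 'raw-data' bundle; depending on how the
# bundle was created this may be a file link, a list of links, a scalar, a dict,
# or a DataFrame.
data = api.cat('my-context', 'raw-data')
print(data)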
Function commit
commit
def commit(local_context, bundle_name, tags=None, uuid=None)
Commit bundle in this local context. This adds a special committed
tag to the bundle, allowing it to be pushed to a remote. This also removes the bundle from the "uncommitted history" limit. One can have as many versions of committed bundles as they wish, but only N uncommitted bundles.
Args
local_context
: str
: The local context in which the bundle exists
bundle_name
: str
: The human name of the bundle to commit
tags
: dict
: Optional dictionary of tags with which to find the bundle
uuid
: str
: UUID of the bundle to commit
Returns
None
:
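Example (a short sketch; the UUID below is a placeholder):
import disdat.api as api
# Commit the latest 'raw-data' bundle so it can be pushed to a remote.
api.commit('my-context', 'raw-data')
# Or commit a specific version by UUID.
api.commit('my-context', 'raw-data', uuid='<some-bundle-uuid>')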
Function context
context
def context(context_name)
Create a new context
Args
context_name (str): <remote context>/<local context> or <local context>
Returns
(int) : 0 if successfully created branch, 1 if branch exists
Function current_context
current_context
def current_context()
Return the current context name (not object)
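Example (a short sketch combining context(), switch() (documented below), and current_context(); the context name is a placeholder):
import disdat.api as api
# Create a local context, switch the CLI's current context to it, and confirm.
api.context('my-context')
api.switch('my-context')
print(api.current_context())  # 'my-context'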
Function delete_context
delete_context
def delete_context(context_name, remote=False, force=False)
Delete a local context. Will not delete any data if there is a remote attached.
Args
context_name
: str
: The name of the local context to delete.
remote
: bool
: Delete remote context as well, force must be True
force
: bool
: Force deletion if dirty
Returns
None
:
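Example (a short sketch; the context name is a placeholder):
import disdat.api as api
# Delete a local context; force=True removes it even if it is dirty.
api.delete_context('scratch-context', force=True)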
Function dockerize
dockerize
def dockerize(setup_dir, config_dir=None, build=True, push=False, sagemaker=False)
Create a docker container image using a setup.py and pkg.module.class description of the pipeline.
Note
Users set the os_type and os_version in the disdat.cfg file. os_type
: The base operating system type for the Docker image
os_version
: The base operating system version for the Docker image
Args
setup_dir
: str
: The directory that contains the setup.py holding the requirements for any pipelines
config_dir
: str
: The directory containing the configuration of .deb packages
build
: bool
: If False, just copy files into the Docker build context without building image.
push
: bool
: Push the container to the repository
sagemaker
: bool
: Create a Docker image executable as a SageMaker container (instead of a local / AWSBatch container).
Returns
(int): 0 if success, 1 on failure
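Example (a hedged sketch; the setup directory is a placeholder and must contain the setup.py for your pipeline package):
import disdat.api as api
# Build the pipeline image and push it to the configured repository.
rc = api.dockerize('./my_pipeline', build=True, push=True)
assert rc == 0  # per the Returns above, 0 means success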
Function get
get
def get(local_context, bundle_name, uuid=None, tags=None)
Retrieve the latest (by date) bundle from local context, with name, uuid, and tags.
Args
local_context
: str
: The name of the local context to search.
bundle_name
: str
: The human bundle name to find.
uuid
: str
: The UUID to return. Trumps bundle_name
tags
: dict
: The tags the bundle must have.
Returns
Bundle
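Example (a short sketch; names, tags, and the UUID are placeholders):
import disdat.api as api
# Latest 'raw-data' bundle carrying a particular tag.
b = api.get('my-context', 'raw-data', tags={'source': 'local'})
# Or fetch a specific version by UUID (uuid trumps bundle_name).
b = api.get('my-context', None, uuid='<some-bundle-uuid>')
print(b.name, b.uuid, b.tags)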
Function lineage
lineage
def lineage(local_context, uuid, max_depth=None)
Return lineage information from a bundle uuid in a local context. This will follow in a breadth-first manner the lineage information. Shortcut: api.lineage(<uuid>)
is equivalent to api.search(uuid=<uuid>)[0].lineage
Args
local_context
: str
: The context in which the bundle lives.
uuid
: str
: The UUID of the bundle in question
max_depth
: int
: Maximum depth returned in search of lineage objects. Default None is unbounded.
Returns
list(hyperframe_pb2.Lineage): List of Protocol Buffer Lineage objects in BFS order
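Example (a short sketch; assumes a 'raw-data' bundle exists in 'my-context'):
import disdat.api as api
# Walk the lineage of the latest 'raw-data' bundle breadth-first, two levels deep.
b = api.get('my-context', 'raw-data')
for lin in api.lineage('my-context', b.uuid, max_depth=2):
    print(lin)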
Function ls_contexts
ls_contexts
def ls_contexts()
Return list of contexts and their remotes
Returns
List[Tuple(str, str)]: Return a list of tuples containing <local context>, <remote context>@<remote string>
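Example (a short sketch):
import disdat.api as api
# Print each local context and its remote binding (if any).
for local_name, remote_name in api.ls_contexts():
    print(local_name, remote_name)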
Function pull
pull
def pull(local_context, bundle_name=None, uuid=None, localize=False)
Pull bundles from the remote context into this local context. If there is no remote context associated with this context, then this is a no-op. fs.pull will raise UserWarning if there is no remote context.
Args
local_context
: str
: The local context whose remote the bundle will be pulled from
bundle_name
: str
: An optional human bundle name
uuid
: str
: An optional bundle UUID
localize
: bool
: Whether to bring linked files directly into bundle directory
Returns
None
:
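Example (a short sketch; assumes 'my-context' has a remote attached):
import disdat.api as api
# Pull the 'raw-data' bundle from the remote and localize its linked files.
api.pull('my-context', bundle_name='raw-data', localize=True)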
Function push
push
def push(local_context, bundle_name, tags=None, uuid=None)
Push a bundle to a remote repository.
Args
local_context
: str
: The local context to push to (must have a remote).
bundle_name
: str
: human name of the bundle to push or None (if using uuid)
tags
: dict
: Tags the bundle must have
uuid
: str
: Optional UUID of the bundle to push. UUID takes precedence over bundle name
Returns
None
:
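Example (a short sketch; assumes 'my-context' has a remote attached):
import disdat.api as api
# A bundle must be committed before it can be pushed.
api.commit('my-context', 'raw-data')
api.push('my-context', 'raw-data')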
Function remote
remote
def remote(local_context, remote_context, remote_url, force=False)
Add a remote to local_context.
Note that this local context may already have a remote bound. This means that it might have references to bundles that have not been localized (file references will begin with 's3:'). In this case you must set force to True.
Args
local_context
: str
: The name of the local context to which to add the remote.
remote_context
: str
: The name of the remote context.
remote_url
: str
: The S3 path that holds the contexts, e.g., s3://disdat-prod/beta/
force
: bool
: If this local context already has a remote, you must set force=True
Returns
None
:
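Example (a short sketch; the S3 path and context names are placeholders):
import disdat.api as api
# Bind 'my-context' to a remote context stored under an S3 prefix.
api.remote('my-context', 'shared-context', 's3://my-bucket/disdat/')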
Function rm
rm
def rm(local_context, bundle_name=None, uuid=None, tags=None, rm_all=False, rm_old_only=False, force=False)
Delete a bundle with a certain name, uuid, or tags. By default removes the most recent bundle. Otherwise one may specify rm_all=True
to remove all bundles, or rm_old_only=True
to remove all bundles but the most recent.
Args
local_context
: str
: Local context name from which to remove the bundle
bundle_name
: str
: Optional human-given name for the bundle
uuid
: str
: Optional UUID for the bundle to remove. Trumps bundle_name argument if both given
tags
: dict
: Optional dictionary of tags that must be present on the bundle to remove
rm_all
: bool
: Remove latest and all historical if given bundle_name
rm_old_only
: bool
: remove everything but latest if given bundle_name
force
: bool
: If a db-link exists and it is the latest on the remote DB, force remove. Default False.
Returns
None
:
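Example (a short sketch; names are placeholders):
import disdat.api as api
# Remove only the most recent 'raw-data' bundle ...
api.rm('my-context', bundle_name='raw-data')
# ... or keep the latest and remove all older versions.
api.rm('my-context', bundle_name='raw-data', rm_old_only=True)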
Function run
run
def run(local_context, remote_context, setup_dir, pipe_cls, pipeline_args, output_bundle='-', remote=None, backend='Local', input_tags=None, output_tags=None, force=False, force_all=False, no_pull=False, no_push=False, no_push_int=False, vcpus=2, memory=4000, workers=1, no_submit=False, aws_session_token_duration=42300, job_role_arn=None)
Execute a pipeline in a container, either locally, on AWS Batch, or on AWS SageMaker, depending on the backend.
Args
local_context
: str
: The name of the local context in which the pipeline will run in the container
remote_context
: str
: The remote context to pull / push bundles during execution
setup_dir
: str
: The directory that contains the setup.py holding the requirements for any pipelines
output_bundle
: str
: The human name of output bundle
pipe_cls
: str
: The pkg.module.class of the root of the pipeline DAG
pipeline_args
: dict
: Dictionary of the parameters of the root task
remote
: str
: The remote's S3 path
input_tags
: dict
: str:str dictionary of tags required of the input bundle
output_tags
: dict
: str:str dictionary of tags placed on all output bundles (including intermediates)
force
: bool
: Re-run the last pipe task no matter prior outputs
force_all
: bool
: Re-run the entire pipeline no matter prior outputs
no_pull
: bool
: Do not pull before execution
no_push
: bool
: Do not push any output bundles after task execution
no_push_int
: bool
: Do not push intermediate task bundles after execution
vcpus
: int
: Number of virtual CPUs (if backend='AWSBatch'). Default 2.
memory
: int
: Number of MB (if backend='AWSBatch'). Default 4000.
workers
: int
: Number of Luigi workers. Default 1.
no_submit
: bool
: If True, just create the AWS Batch Job definition, but do not submit the job
aws_session_token_duration
: int
: Seconds lifetime of temporary token (backend='AWSBatch'). Default 42300
job_role_arn
: str
: AWS ARN for job execution in a batch container (backend='AWSBatch')
Returns
json (str):
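Example (a hedged sketch of a remote run on AWS Batch; the module path, context names, and job role ARN are placeholders):
import disdat.api as api
api.run('my-context', 'shared-context', './my_pipeline',
        'my_pipelines.MyPipeline', {'n_estimators': '100'},
        backend='AWSBatch', vcpus=4, memory=8000,
        job_role_arn='arn:aws:iam::123456789012:role/my-batch-role')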
Function search
search
def search(local_context, search_name=None, search_tags=None, is_committed=None, find_intermediates=False, find_roots=False, before=None, after=None)
Search for bundle in a local context. Allow for searching by human name, is_committed, is intermediate or is root task output, and tags.
At this time the SQL interface in disdat.fs does not allow searching for entries without particular tags.
Args
local_context
: str
: The name of the local context to search.
search_name
: May be None. Interpreted as a simple regex (one Kleene star)
search_tags
: dict
: A set of key:values the bundles must have
is_committed
: bool
: If None (default): ignore committed, If True return committed, If False return uncommitted
find_intermediates
: bool
: Results must be intermediates
find_roots
: bool
: Results must be final outputs
before
: str
: Return bundles < "12-1-2009" or "12-1-2009 12:13:42"
after
: str
: Return bundles >= "12-1-2009" or "12-1-2009 12:13:42"
Returns
: List of API bundle objects
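Example (a short sketch; names, tags, and dates are placeholders):
import disdat.api as api
# Committed bundles whose names match 'raw*', created on or after a date.
results = api.search('my-context',
                     search_name='raw*',
                     search_tags={'source': 'local'},
                     is_committed=True,
                     after='12-1-2009')
for b in results:
    print(b.name, b.uuid, b.creation_date)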
Function set_aws_profile
set_aws_profile
def set_aws_profile(aws_profile)
Function switch
switch
def switch(context_name)
Stateful switch to a different context. This changes the current default context used by the CLI commands.
Args
context_name
: str
: The name of the local context to switch into.
Returns
None
:
Classes
Class Bundle
Bundle
class Bundle(local_context, name, owner='')
HyperFrameRecord stores a named list of frames (or tensors). Includes lineage, tags, and links. This is the in-Python representation. Each can import / export to PBs and DBs (via named tuples).
Given a name and a local context, create a handle that users can work with to: a.) create files / directories / dbtables (i.e., Bundle Links), and b.) add constants and files into the bundle.
Either create the bundle ahead of time and add items to it, or use a temp dir and copy things into the bundle when you're done. In the second case it's easy to use a bundle object and write it to multiple contexts; the bundle should be closed or destroyed when finished.
Args
local_context
: str
: Where this bundle will be output or where it was sourced from.
name
: str
: Human name for this bundle
Ancestors (in MRO)
disdat.hyperframe.HyperFrameRecord
disdat.hyperframe.PBObject
Instance variables
Variable creation_date
Variable lineage
Variable name
Variable owner
Variable params
Return the tags that were parameters. This returns the string version of the parameters (how they were serialized into the bundle). Note that we currently use Luigi Parameter parse and serialize to convert to and from strings; Luigi does so to interpret command-line arguments.
Variable processing_name
Variable tags
Variable user_tags
Return the tags that the user set. bundle.tags holds all of the tags, including the "hidden" parameter tags; this accesses everything but the parameter tags, while bundle.params accesses everything but the user tags.
Variable uuid
Methods
Method add_data
def add_data(self, data)
Attach data to a bundle. The bundle must be open and not closed. One attaches one data item to a bundle (dictionary, list, tuple, scalar, or dataframe). Calling this replaces the latest item -- only the latest will be included in the bundle on close.
Args
data (list|tuple|dict|scalar|pandas.DataFrame
):
Returns
self
:
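Example (a hedged sketch; open() and close() are assumed from the "open and not closed" requirement above and are not documented in this listing):
import disdat.api as api
b = api.Bundle('my-context', name='report')
b.open()                                    # assumed: open the bundle for writing
b.add_tags({'stage': 'dev'})
b.add_data({'rows': 100, 'status': 'ok'})   # one data item; replaces any earlier one
b.close()                                   # assumed: data is recorded on close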
Method add_dependency
def add_dependency(self, bundle)
Add an upstream bundle as a dependency
Args
bundle(Bundle
): Another bundle that may have been used to produce this one
Method add_tags
def add_tags(self, tags)
Add tags to the bundle. Updates if existing.
Args
tags (dict): string:string dictionary
Returns
self
:
Method cat
def cat(self)
Return the data in the bundle. The data is already present in .data.
Method commit
def commit(self)
Shortcut version of api.commit(uuid=bundle.uuid)
Returns
self
:
Method copy_in_file
def copy_in_file(self, existing_file)
This function copies the file 'existing_file' into the output bundle. This is used when you have an existing file on disk and wish to add it to the bundle.
To use, you must record this as part of the bundle with bundle.add_data(bundle.copy_in_file("my_file"))
Args
existing_file
: str
: Path to an existing file
Returns
luigi.LocalTarget
or luigi.s3.S3Target
Method db_table
def db_table(self, dsn, table_name, schema_name)
This is the way to allocate a db table reference one can place into a bundle. Like make_directory, make_file, or copy_in_file, this returns a target object that the user must add to their bundle.data before it is recorded in the bundle.
Args
dsn (unicode): table_name (unicode): schema_name (unicode):
Returns
disdat.db_link.DBLink
Method fill_from_hfr
def fill_from_hfr(self, hfr)
Given an internal hyperframe, copy out the information to this user-side Bundle object.
Assume the user has set the data_context appropriately when creating the bundle object
The Bundle object inherits from HyperFrameRecord, so we need to: a.) set the pb to point to this pb (they may both point to the same pb, but this bundle will be closed), b.) init internal state, and c.) set bundle-object-specific fields. Note we do not set or clear the self.data_context.
Args
hfr:
Returns
self
: this bundle object with self.data containing the kind of object the user saved.
Method make_directory
def make_directory(self, dir_name)
Returns path <disdat-managed-directory>/<dir_name>. This is used if you need to hand a process an output directory and you do not have control of what it writes in that directory.
Add this path as you would add file paths to your output bundle. Disdat will incorporate all the data found in this directory into the bundle.
See Pipe.create_output_dir()
Arguments
dir_name
: str
: Either a FQP (prefix is the bundle path) or a basedir of a directory to appear in the bundle. Neither should end in /
Returns
str
: A directory path managed by disdat
Method make_file
def make_file(self, filename)
Create a file target called "filename" that will exist in the bundle. This is used when you have data in memory and wish to write it to a file, e.g., create a parquet file.
To use, you must a.) write data into this file-like object (a 'target'), and b.) you must add this target to the bundle via bundle.add_data(bundle.make_file("my_file"))
Arguments
filename
: str, list, dict
: filename to create in the bundle
Returns
luigi.LocalTarget
or luigi.s3.S3Target
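Example (a sketch continuing the Bundle example above; target.open('w') is the standard Luigi target interface):
# b is an open Bundle (see the add_data example above)
target = b.make_file('report.csv')
with target.open('w') as f:     # write in-memory data into the managed file
    f.write('col_a,col_b\n1,2\n')
b.add_data(target)              # the target must be added to the bundle's data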
Method pull
def pull(self, localize=False)
Shortcut version of api.pull()
Note if localizing, then we update this bundle to reflect the possibility of new, local links
Method push
def push(self)
Shortcut version of api.push(uuid=bundle.uuid)
Returns
(disdat.api.Bundle
): this bundle
Method rm
def rm(self)
Remove the bundle from the current context associated with this bundle object. Only removes the bundle with this uuid. This only makes sense if the bundle is closed.
Class BundleWrapperTask
BundleWrapperTask
class BundleWrapperTask(*args, **kwargs)
This task allows one to create bundles that can be referred to in a Disdat pipeline through a self.add_external_dependency. 1.) The user makes a bundle outside of Luigi. 2.) The Luigi pipeline wants to use the bundle, either a.) referring to the bundle as an argument and reading it using the API (outside of Luigi dependencies), or b.) referring to the bundle using a BundleWrapperTask; the Luigi Disdat pipeline uses the latest bundle with the processing_name.
Two implementation options: 1.) We add an "add_bundle_dependency()" call to Disdat. This directly changes how we schedule. 2.) We add a special Luigi task (as Luigi has for external files) and use that to produce the processing_name when there isn't an actual task creating the data.
Thus this task is mainly to provide a way that a.) A user can create a bundle and set the processing_name b.) The pipeline can refer to this bundle using the same processing_name
The parameters in this task should allow one to sufficiently identify versions of this bundle.
Note: No processing_name and No UUID
This has the same signature as luigi.Task. Go through this class and get the set of params we define
Args
args: *kwargs:
Ancestors (in MRO)
disdat.pipe.PipeTask
luigi.task.Task
disdat.pipe_base.PipeBase
Class variables
Variable name
Variable owner
Variable tags
Methods
Method bundle_inputs
def bundle_inputs(self)
Determine input bundles
Method bundle_outputs
def bundle_outputs(self)
Determine output bundles
Method human_id
def human_id(self)
Default is a shortened version of pipe_id, but here we want it to be the set name.
Generated by pdoc 0.7.5 (https://pdoc3.github.io).