Pipelines
Pipeline Examples
Examples of using the Pipeline class are listed at the bottom of this page: Examples.
Pipelines store sequences of Workbench transforms, so if you have a nightly ML workflow you can capture it as a Pipeline. Here's an example pipeline:
{
    "data_source": {
        "name": "nightly_data",
        "tags": ["solubility", "foo"],
        "s3_input": "s3://blah/blah.csv"
    },
    "feature_set": {
        "name": "nightly_features",
        "tags": ["blah", "blah"],
        "input": "nightly_data",
        "schema": "mol_descriptors_v1"
    },
    "model": {
        "name": "nightly_model",
        "tags": ["blah", "blah"],
        "features": ["col1", "col2"],
        "target": "sol",
        "input": "nightly_features"
    },
    "endpoint": {
        ...
    }
}
Pipeline: Manages the details around a Workbench Pipeline, including Execution
log = logging.getLogger('workbench')  # module attribute
Pipeline
Pipeline: Workbench Pipeline API Class

Common Usage

my_pipeline = Pipeline("aqsol_pipeline_v1")
my_pipeline.set_input("s3://workbench-public-data/comp_chem/aqsol_public_data.csv")
my_pipeline.execute_partial(["model", "endpoint"])
exit(0)

Source code in src/workbench/api/pipeline.py
__init__(name)
Pipeline Init Method
Source code in src/workbench/api/pipeline.py
__repr__()
String representation of this pipeline
Returns:
Name | Type | Description |
---|---|---|
str | str | String representation of this pipeline |
Source code in src/workbench/api/pipeline.py
delete()
Delete the Pipeline
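A minimal usage sketch (the pipeline name here is illustrative):

from workbench.api.pipeline import Pipeline

# Remove a previously published Pipeline
Pipeline("old_pipeline_v1").delete()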
details(**kwargs)
Retrieve the Pipeline Details.
Returns:
Name | Type | Description |
---|---|---|
dict | dict | A dictionary of details about the Pipeline |
execute()
Execute the entire Pipeline
Raises:
Type | Description |
---|---|
RunTimeException | If the pipeline execution fails in any way |
execute_partial(subset)
Execute a partial Pipeline
Parameters:
Name | Type | Description | Default |
---|---|---|---|
subset | list | A subset of the pipeline to execute | required |

Raises:
Type | Description |
---|---|
RunTimeException | If the pipeline execution fails in any way |
Source code in src/workbench/api/pipeline.py
health_check(**kwargs)
Retrieve the Pipeline Health Check.
Returns:
Name | Type | Description |
---|---|---|
dict | dict | A dictionary of health check details for the Pipeline |
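A minimal sketch of running a health check (the pipeline name is illustrative):

from pprint import pprint
from workbench.api.pipeline import Pipeline

# Retrieve an existing Pipeline and print its health check details
my_pipeline = Pipeline("abalone_pipeline_v1")
pprint(my_pipeline.health_check())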
report_settable_fields(pipeline={}, path='')
Recursively finds and prints keys with settable fields in a JSON-like dictionary.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline | dict | Pipeline (or sub-pipeline) to process | {} |
path | str | Current path to the key, used for nested dictionaries | '' |
Source code in src/workbench/api/pipeline.py
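A minimal sketch, assuming you pass the pipeline's own dictionary (the my_pipeline.pipeline attribute, as used in the source excerpt under Pipelines Advanced below):

from workbench.api.pipeline import Pipeline

# Print the keys in the pipeline JSON that can be set before execution
my_pipeline = Pipeline("abalone_pipeline_v1")
my_pipeline.report_settable_fields(my_pipeline.pipeline)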
set_input(input, artifact='data_source')
Set the input for the Pipeline
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input | Union[str, DataFrame] | The input for the Pipeline | required |
artifact | str | The artifact to set the input for (default: "data_source") | 'data_source' |
Source code in src/workbench/api/pipeline.py
set_training_holdouts(id_column, holdout_ids)
Set the training holdouts for the Pipeline
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id_column | str | The column name of the unique identifier | required |
holdout_ids | list[str] | The list of unique identifiers to hold out | required |
Source code in src/workbench/api/pipeline.py
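A minimal sketch, assuming an id column named "id" and a few illustrative hold-out ids:

from workbench.api.pipeline import Pipeline

# Hold out specific rows (by unique id) from model training
my_pipeline = Pipeline("abalone_pipeline_v1")
my_pipeline.set_training_holdouts("id", ["id_0", "id_1", "id_2"])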
summary(**kwargs)
Retrieve the Pipeline Summary.
Returns:
Name | Type | Description |
---|---|---|
dict | dict | A dictionary of details about the Pipeline |
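A minimal usage sketch (the pipeline name is illustrative):

from pprint import pprint
from workbench.api.pipeline import Pipeline

# Print a condensed summary of the Pipeline
pprint(Pipeline("abalone_pipeline_v1").summary())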
Examples
Make a Pipeline
Pipelines are just JSON files (see workbench/examples/pipelines/). You can copy one and modify it to fit your objects/use case, or if you already have a set of Workbench artifacts created, you can 'backtrack' from the Endpoint and have it create the Pipeline for you.
from pprint import pprint
from workbench.api.pipeline_manager import PipelineManager
# Create a PipelineManager
my_manager = PipelineManager()
# List the Pipelines
pprint(my_manager.list_pipelines())
# Create a Pipeline from an Endpoint
abalone_pipeline = my_manager.create_from_endpoint("abalone-regression-end")
# Publish the Pipeline
my_manager.publish_pipeline("abalone_pipeline_v1", abalone_pipeline)
Output
Listing Pipelines...
[{'last_modified': datetime.datetime(2024, 4, 16, 21, 10, 6, tzinfo=tzutc()),
'name': 'abalone_pipeline_v1',
'size': 445}]
from pprint import pprint
from workbench.api.pipeline import Pipeline
# Retrieve an existing Pipeline
my_pipeline = Pipeline("abalone_pipeline_v1")
pprint(my_pipeline.details())
Output
{
"name": "abalone_pipeline_v1",
"s3_path": "s3://sandbox/pipelines/abalone_pipeline_v1.json",
"pipeline": {
"data_source": {
"name": "abalone_data",
"tags": [
"abalone_data"
],
"input": "/Users/briford/work/workbench/data/abalone.csv"
},
"feature_set": {
"name": "abalone_features",
"tags": [
"abalone_features"
],
"input": "abalone_data"
},
"model": {
"name": "abalone-regression",
"tags": [
"abalone",
"regression"
],
"input": "abalone_features"
},
...
}
}
Pipeline Execution
Executing the Pipeline is obviously the most important reason for creating one. It gives you a reproducible way to capture, inspect, and run the same ML pipeline on different data (nightly runs, for example).
from workbench.api.pipeline import Pipeline
# Retrieve an existing Pipeline
my_pipeline = Pipeline("abalone_pipeline_v1")
# Execute the Pipeline
my_pipeline.execute() # Full execution
# Partial executions
my_pipeline.execute_partial(["data_source", "feature_set"])
my_pipeline.execute_partial(["model", "endpoint"])
Pipelines Advanced
As part of the flexible architecture, DataSources and FeatureSets can sometimes be created from a Pandas DataFrame. To support a DataFrame as input to a pipeline, call the set_input() method on the pipeline object. If you'd like to specify hold-out ids, you can also call set_hold_out_ids() with a list of ids.
def set_input(self, input: Union[str, pd.DataFrame], artifact: str = "data_source"):
    """Set the input for the Pipeline

    Args:
        input (Union[str, pd.DataFrame]): The input for the Pipeline
        artifact (str): The artifact to set the input for (default: "data_source")
    """
    self.pipeline[artifact]["input"] = input

def set_hold_out_ids(self, id_list: list):
    """Set the hold out ids for the Pipeline

    Args:
        id_list (list): The list of hold out ids
    """
    self.pipeline["feature_set"]["hold_out_ids"] = id_list
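A hedged usage sketch of the DataFrame workflow, assuming a local CSV file and illustrative hold-out ids:

import pandas as pd
from workbench.api.pipeline import Pipeline

# Load a DataFrame and use it (instead of an S3 path or file path) as the pipeline input
df = pd.read_csv("abalone.csv")  # illustrative local file
my_pipeline = Pipeline("abalone_pipeline_v1")
my_pipeline.set_input(df)

# Optionally hold out specific ids from training
my_pipeline.set_hold_out_ids(["id_0", "id_1", "id_2"])

# Execute the Pipeline with the DataFrame input
my_pipeline.execute()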
Running a pipeline creates and deploys a set of Workbench Artifacts: a DataSource, FeatureSet, Model, and Endpoint. These artifacts can be viewed in the SageMaker Console/Notebook interfaces or in the Workbench Dashboard UI.
Not finding a particular method?
The Workbench API Classes use the 'Core' Classes Internally, so for an extensive listing of all the methods available please take a deep dive into: Workbench Core Classes