Pipelines
Pipeline Examples
Examples of using the Pipeline classes are listed at the bottom of this page: Examples.
Pipelines store sequences of SageWorks transforms. So if you have a nightly ML workflow you can capture that as a Pipeline. Here's an example pipeline:
{
    "data_source": {
        "name": "nightly_data",
        "tags": ["solubility", "foo"],
        "s3_input": "s3://blah/blah.csv"
    },
    "feature_set": {
        "name": "nightly_features",
        "tags": ["blah", "blah"],
        "input": "nightly_data",
        "schema": "mol_descriptors_v1"
    },
    "model": {
        "name": "nightly_model",
        "tags": ["blah", "blah"],
        "features": ["col1", "col2"],
        "target": "sol",
        "input": "nightly_features"
    },
    "endpoint": {
        ...
    }
}
PipelineManager
PipelineManager: Manages SageWorks Pipelines, listing, creating, and saving them.
Common Usage
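A minimal usage sketch (the endpoint and pipeline names are taken from the Examples section below):

from pprint import pprint
from sageworks.api.pipeline_manager import PipelineManager

# Create a PipelineManager and list the published Pipelines
my_manager = PipelineManager()
pprint(my_manager.list_pipelines())

# Capture an existing Endpoint as a Pipeline and publish it
abalone_pipeline = my_manager.create_from_endpoint("abalone-regression-end")
my_manager.publish_pipeline("abalone_pipeline_v1", abalone_pipeline)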
Source code in src/sageworks/api/pipeline_manager.py
__init__()
Pipeline Init Method
Source code in src/sageworks/api/pipeline_manager.py
create_from_endpoint(endpoint_name)
Create a Pipeline from an Endpoint
Parameters:

Name | Type | Description | Default
---|---|---|---
endpoint_name | str | The name of the Endpoint | required

Returns:

Name | Type | Description
---|---|---
dict | dict | A dictionary of the Pipeline
Source code in src/sageworks/api/pipeline_manager.py
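For example, 'backtracking' from an existing Endpoint (the endpoint name is taken from the Examples section below):

from sageworks.api.pipeline_manager import PipelineManager

# Build a Pipeline dictionary by inspecting an existing Endpoint
my_manager = PipelineManager()
abalone_pipeline = my_manager.create_from_endpoint("abalone-regression-end")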
delete_pipeline(name)
Delete a Pipeline from S3
Parameters:

Name | Type | Description | Default
---|---|---|---
name | str | The name of the Pipeline to delete | required
Source code in src/sageworks/api/pipeline_manager.py
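For example (the pipeline name is taken from the Examples section below):

from sageworks.api.pipeline_manager import PipelineManager

# Remove a published Pipeline from the S3 bucket
my_manager = PipelineManager()
my_manager.delete_pipeline("abalone_pipeline_v1")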
list_pipelines()
List all the Pipelines in the S3 Bucket
Returns:

Name | Type | Description
---|---|---
list | list | A list of Pipeline names and details
Source code in src/sageworks/api/pipeline_manager.py
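For example:

from pprint import pprint
from sageworks.api.pipeline_manager import PipelineManager

# Each entry includes the Pipeline name, size, and last_modified timestamp
my_manager = PipelineManager()
pprint(my_manager.list_pipelines())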
load_pipeline_from_file(filepath)
Load a Pipeline from a local file
Parameters:

Name | Type | Description | Default
---|---|---|---
filepath | str | The path of the Pipeline to load | required

Returns:

Name | Type | Description
---|---|---
dict | dict | The Pipeline loaded from the file
Source code in src/sageworks/api/pipeline_manager.py
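For example (the filepath here is hypothetical):

from sageworks.api.pipeline_manager import PipelineManager

# Load a Pipeline dictionary from a local JSON file
my_manager = PipelineManager()
my_pipeline = my_manager.load_pipeline_from_file("pipelines/abalone_pipeline_v1.json")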
publish_pipeline(name, pipeline)
Save a Pipeline to S3
Parameters:

Name | Type | Description | Default
---|---|---|---
name | str | The name of the Pipeline | required
pipeline | dict | The Pipeline to save | required
Source code in src/sageworks/api/pipeline_manager.py
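For example (names taken from the Examples section below):

from sageworks.api.pipeline_manager import PipelineManager

# Save a Pipeline dictionary to S3 under the given name
my_manager = PipelineManager()
abalone_pipeline = my_manager.create_from_endpoint("abalone-regression-end")
my_manager.publish_pipeline("abalone_pipeline_v1", abalone_pipeline)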
publish_pipeline_from_file(filepath)
Publish a Pipeline to SageWorks from a local file
Parameters:

Name | Type | Description | Default
---|---|---|---
filepath | str | The path of the Pipeline to publish | required
Source code in src/sageworks/api/pipeline_manager.py
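For example (the filepath here is hypothetical):

from sageworks.api.pipeline_manager import PipelineManager

# Publish a local Pipeline JSON file to SageWorks
my_manager = PipelineManager()
my_manager.publish_pipeline_from_file("pipelines/abalone_pipeline_v1.json")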
save_pipeline_to_file(pipeline, filepath)
Save a Pipeline to a local file
Parameters:

Name | Type | Description | Default
---|---|---|---
pipeline | dict | The Pipeline to save | required
filepath | str | The path to save the Pipeline | required
Source code in src/sageworks/api/pipeline_manager.py
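For example (the filepath here is hypothetical):

from sageworks.api.pipeline_manager import PipelineManager

# Write a Pipeline dictionary out to a local JSON file
my_manager = PipelineManager()
abalone_pipeline = my_manager.create_from_endpoint("abalone-regression-end")
my_manager.save_pipeline_to_file(abalone_pipeline, "pipelines/abalone_pipeline_v1.json")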
Pipeline: Manages the details around a SageWorks Pipeline, including Execution
Pipeline
Pipeline: SageWorks Pipeline API Class
Common Usage
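A minimal usage sketch (the pipeline name is taken from the Examples section below):

from pprint import pprint
from sageworks.api.pipeline import Pipeline

# Retrieve a published Pipeline, inspect it, and run it
my_pipeline = Pipeline("abalone_pipeline_v1")
pprint(my_pipeline.details())
my_pipeline.execute()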
Source code in src/sageworks/api/pipeline.py
__init__(name)
Pipeline Init Method
Source code in src/sageworks/api/pipeline.py
__repr__()
String representation of this pipeline
Returns:

Name | Type | Description
---|---|---
str | str | String representation of this pipeline
Source code in src/sageworks/api/pipeline.py
delete()
execute()
Execute the entire Pipeline
Raises:

Type | Description
---|---
RunTimeException | If the pipeline execution fails in any way
execute_partial(subset)
Execute a partial Pipeline
Parameters:

Name | Type | Description | Default
---|---|---|---
subset | list | A subset of the pipeline to execute | required

Raises:

Type | Description
---|---
RunTimeException | If the pipeline execution fails in any way
Source code in src/sageworks/api/pipeline.py
report_settable_fields(pipeline={}, path='')
Recursively finds and prints keys with settable fields in a JSON-like dictionary.
Parameters:

Name | Type | Description | Default
---|---|---|---
pipeline | dict | The pipeline (or sub-pipeline) to process | {}
path | str | Current path to the key, used for nested dictionaries | ''
Source code in src/sageworks/api/pipeline.py
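For example:

from sageworks.api.pipeline import Pipeline

# Print any keys in the Pipeline that still need to be set
my_pipeline = Pipeline("abalone_pipeline_v1")
my_pipeline.report_settable_fields()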
set_input(input, artifact='data_source')
Set the input for the Pipeline
Parameters:

Name | Type | Description | Default
---|---|---|---
input | Union[str, DataFrame] | The input for the Pipeline | required
artifact | str | The artifact to set the input for (default: "data_source") | 'data_source'
Source code in src/sageworks/api/pipeline.py
set_training_holdouts(id_column, holdout_ids)
Set the training holdouts for the Pipeline

Parameters:

Name | Type | Description | Default
---|---|---|---
id_column | str | The column name of the unique identifier | required
holdout_ids | list[str] | The list of unique identifiers to hold out | required
Source code in src/sageworks/api/pipeline.py
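For example (the id column and ids here are hypothetical):

from sageworks.api.pipeline import Pipeline

# Hold out specific ids from model training
my_pipeline = Pipeline("abalone_pipeline_v1")
my_pipeline.set_training_holdouts("id", ["id_0", "id_1", "id_2"])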
Examples
Make a Pipeline
Pipelines are just JSON files (see sageworks/examples/pipelines/). You can copy one and modify it to fit your artifacts/use case, or, if you already have a set of SageWorks artifacts created, you can 'backtrack' from the Endpoint and have it create the Pipeline for you.
from pprint import pprint
from sageworks.api.pipeline_manager import PipelineManager
# Create a PipelineManager
my_manager = PipelineManager()
# List the Pipelines
pprint(my_manager.list_pipelines())
# Create a Pipeline from an Endpoint
abalone_pipeline = my_manager.create_from_endpoint("abalone-regression-end")
# Publish the Pipeline
my_manager.publish_pipeline("abalone_pipeline_v1", abalone_pipeline)
Output
Listing Pipelines...
[{'last_modified': datetime.datetime(2024, 4, 16, 21, 10, 6, tzinfo=tzutc()),
'name': 'abalone_pipeline_v1',
'size': 445}]
from pprint import pprint
from sageworks.api.pipeline import Pipeline
# Retrieve an existing Pipeline
my_pipeline = Pipeline("abalone_pipeline_v1")
pprint(my_pipeline.details())
Output
{
"name": "abalone_pipeline_v1",
"s3_path": "s3://sandbox/pipelines/abalone_pipeline_v1.json",
"pipeline": {
"data_source": {
"name": "abalone_data",
"tags": [
"abalone_data"
],
"input": "/Users/briford/work/sageworks/data/abalone.csv"
},
"feature_set": {
"name": "abalone_features",
"tags": [
"abalone_features"
],
"input": "abalone_data"
},
"model": {
"name": "abalone-regression",
"tags": [
"abalone",
"regression"
],
"input": "abalone_features"
},
...
}
}
Pipeline Execution
Executing the Pipeline is obviously the most important reason for creating one. It gives you a reproducible way to capture, inspect, and run the same ML pipeline on different data (nightly).
from sageworks.api.pipeline import Pipeline
# Retrieve an existing Pipeline
my_pipeline = Pipeline("abalone_pipeline_v1")
# Execute the Pipeline
my_pipeline.execute() # Full execution
# Partial executions
my_pipeline.execute_partial(["data_source", "feature_set"])
my_pipeline.execute_partial(["model", "endpoint"])
Pipelines Advanced
As part of the flexible architecture, DataSources or FeatureSets can sometimes be created with a Pandas DataFrame. To support a DataFrame as input to a pipeline, call the set_input() method on the pipeline object. If you'd like to hold out specific ids, you can also call set_hold_out_ids() with a list of ids.
def set_input(self, input: Union[str, pd.DataFrame], artifact: str = "data_source"):
"""Set the input for the Pipeline
Args:
input (Union[str, pd.DataFrame]): The input for the Pipeline
artifact (str): The artifact to set the input for (default: "data_source")
"""
self.pipeline[artifact]["input"] = input
def set_hold_out_ids(self, id_list: list):
"""Set the input for the Pipeline
Args:
id_list (list): The list of hold out ids
"""
self.pipeline["feature_set"]["hold_out_ids"] = id_list
Running a pipeline creates and deploys a set of SageWorks Artifacts: DataSource, FeatureSet, Model, and Endpoint. These artifacts can be viewed in the SageMaker Console/Notebook interfaces or in the SageWorks Dashboard UI.
Not finding a particular method?
The SageWorks API Classes use the 'Core' Classes internally, so for an extensive listing of all the methods available please take a deep dive into: SageWorks Core Classes