
Pipelines

Pipeline Examples

Examples of using the Pipeline classes are listed at the bottom of this page in the Examples section.

Pipelines store sequences of Workbench transforms, so if you have a nightly ML workflow you can capture it as a Pipeline. Here's an example pipeline:

nightly_sol_pipeline_v1.json
{
    "data_source": {
        "name": "nightly_data",
        "tags": ["solubility", "foo"],
        "s3_input": "s3://blah/blah.csv"
    },
    "feature_set": {
        "name": "nightly_features",
        "tags": ["blah", "blah"],
        "input": "nightly_data",
        "schema": "mol_descriptors_v1"
    },
    "model": {
        "name": "nightly_model",
        "tags": ["blah", "blah"],
        "features": ["col1", "col2"],
        "target": "sol",
        "input": "nightly_features"
    },
    "endpoint": {
        ...
    }
}
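As a quick sketch of how a definition like this gets published so it can later be retrieved by name, the snippet below assumes the JSON above is saved locally as nightly_sol_pipeline_v1.json and that PipelineManager.publish_pipeline() (shown in the Examples section below) accepts a pipeline dictionary:

import json
from workbench.api.pipeline_manager import PipelineManager

# Load the local pipeline definition (the file name here is illustrative)
with open("nightly_sol_pipeline_v1.json") as fp:
    nightly_pipeline = json.load(fp)

# Publish it so it can later be retrieved as Pipeline("nightly_sol_pipeline_v1")
PipelineManager().publish_pipeline("nightly_sol_pipeline_v1", nightly_pipeline)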

Pipeline: Manages the details around a Workbench Pipeline, including Execution

log = logging.getLogger('workbench')  # module attribute

my_pipeline = Pipeline("aqsol_pipeline_v1")
my_pipeline.set_input("s3://workbench-public-data/comp_chem/aqsol_public_data.csv")
my_pipeline.execute_partial(["model", "endpoint"])
exit(0)

Pipeline

Pipeline: Workbench Pipeline API Class

Common Usage
my_pipeline = Pipeline("name")
my_pipeline.details()
my_pipeline.execute()  # Execute entire pipeline
my_pipeline.execute_partial(["data_source", "feature_set"])
my_pipeline.execute_partial(["model", "endpoint"])
Source code in src/workbench/api/pipeline.py
class Pipeline:
    """Pipeline: Workbench Pipeline API Class

    Common Usage:
        ```python
        my_pipeline = Pipeline("name")
        my_pipeline.details()
        my_pipeline.execute()  # Execute entire pipeline
        my_pipeline.execute_partial(["data_source", "feature_set"])
        my_pipeline.execute_partial(["model", "endpoint"])
        ```
    """

    def __init__(self, name: str):
        """Pipeline Init Method"""
        self.log = logging.getLogger("workbench")
        self.uuid = name

        # Spin up a Parameter Store for Pipelines
        self.prefix = "/workbench/pipelines"
        self.params = ParameterStore()
        self.pipeline = self.params.get(f"{self.prefix}/{self.uuid}")

    def summary(self, **kwargs) -> dict:
        """Retrieve the Pipeline Summary.

        Returns:
            dict: A dictionary of details about the Pipeline
        """
        return self.pipeline

    def details(self, **kwargs) -> dict:
        """Retrieve the Pipeline Details.

        Returns:
            dict: A dictionary of details about the Pipeline
        """
        return self.pipeline

    def health_check(self, **kwargs) -> dict:
        """Retrieve the Pipeline Health Check.

        Returns:
            dict: A dictionary of health check details for the Pipeline
        """
        return {}

    def set_input(self, input: Union[str, pd.DataFrame], artifact: str = "data_source"):
        """Set the input for the Pipeline

        Args:
            input (Union[str, pd.DataFrame]): The input for the Pipeline
            artifact (str): The artifact to set the input for (default: "data_source")
        """
        self.pipeline[artifact]["input"] = input

    def set_training_holdouts(self, id_column: str, holdout_ids: list[str]):
        """Set the input for the Pipeline

        Args:
            id_column (str): The column name of the unique identifier
            holdout_ids (list[str]): The list of unique identifiers to hold out
        """
        self.pipeline["feature_set"]["id_column"] = id_column
        self.pipeline["feature_set"]["holdout_ids"] = holdout_ids

    def execute(self):
        """Execute the entire Pipeline

        Raises:
            RunTimeException: If the pipeline execution fails in any way
        """
        pipeline_executor = PipelineExecutor(self)
        pipeline_executor.execute()

    def execute_partial(self, subset: list):
        """Execute a partial Pipeline

        Args:
            subset (list): A subset of the pipeline to execute

        Raises:
            RunTimeException: If the pipeline execution fails in any way
        """
        pipeline_executor = PipelineExecutor(self)
        pipeline_executor.execute_partial(subset)

    def report_settable_fields(self, pipeline: dict = {}, path: str = "") -> None:
        """
        Recursively finds and prints keys with settable fields in a JSON-like dictionary.

        Args:
        pipeline (dict): pipeline (or sub pipeline) to process.
        path (str): Current path to the key, used for nested dictionaries.
        """
        # Grab the entire pipeline if not provided (first call)
        if not pipeline:
            self.log.important(f"Checking Pipeline: {self.uuid}...")
            pipeline = self.pipeline
        for key, value in pipeline.items():
            if isinstance(value, dict):
                # Recurse into sub-dictionary
                self.report_settable_fields(value, path + key + " -> ")
            elif isinstance(value, str) and value.startswith("<<") and value.endswith(">>"):
                # Check if required or optional
                required = "[Required]" if "required" in value else "[Optional]"
                self.log.important(f"{required} Path: {path + key}")

    def delete(self):
        """Pipeline Deletion"""
        self.log.info(f"Deleting Pipeline: {self.uuid}...")
        self.params.delete(f"{self.prefix}/{self.uuid}")

    def __repr__(self) -> str:
        """String representation of this pipeline

        Returns:
            str: String representation of this pipeline
        """
        # Class name and details
        class_name = self.__class__.__name__
        pipeline_details = json.dumps(self.pipeline, indent=4)
        return f"{class_name}({pipeline_details})"

__init__(name)

Pipeline Init Method

Source code in src/workbench/api/pipeline.py
def __init__(self, name: str):
    """Pipeline Init Method"""
    self.log = logging.getLogger("workbench")
    self.uuid = name

    # Spin up a Parameter Store for Pipelines
    self.prefix = "/workbench/pipelines"
    self.params = ParameterStore()
    self.pipeline = self.params.get(f"{self.prefix}/{self.uuid}")

__repr__()

String representation of this pipeline

Returns:

Name    Type    Description
str     str     String representation of this pipeline

Source code in src/workbench/api/pipeline.py
def __repr__(self) -> str:
    """String representation of this pipeline

    Returns:
        str: String representation of this pipeline
    """
    # Class name and details
    class_name = self.__class__.__name__
    pipeline_details = json.dumps(self.pipeline, indent=4)
    return f"{class_name}({pipeline_details})"

delete()

Pipeline Deletion

Source code in src/workbench/api/pipeline.py
def delete(self):
    """Pipeline Deletion"""
    self.log.info(f"Deleting Pipeline: {self.uuid}...")
    self.params.delete(f"{self.prefix}/{self.uuid}")
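
A quick usage sketch for delete(); the pipeline name is illustrative and the call removes the published definition from Parameter Store:

from workbench.api.pipeline import Pipeline

# Remove the published pipeline definition from Parameter Store
Pipeline("abalone_pipeline_v1").delete()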

details(**kwargs)

Retrieve the Pipeline Details.

Returns:

Name    Type    Description
dict    dict    A dictionary of details about the Pipeline

Source code in src/workbench/api/pipeline.py
def details(self, **kwargs) -> dict:
    """Retrieve the Pipeline Details.

    Returns:
        dict: A dictionary of details about the Pipeline
    """
    return self.pipeline

execute()

Execute the entire Pipeline

Raises:

Type                Description
RunTimeException    If the pipeline execution fails in any way

Source code in src/workbench/api/pipeline.py
def execute(self):
    """Execute the entire Pipeline

    Raises:
        RunTimeException: If the pipeline execution fails in any way
    """
    pipeline_executor = PipelineExecutor(self)
    pipeline_executor.execute()

execute_partial(subset)

Execute a partial Pipeline

Parameters:

Name      Type    Description                            Default
subset    list    A subset of the pipeline to execute    required

Raises:

Type                Description
RunTimeException    If the pipeline execution fails in any way

Source code in src/workbench/api/pipeline.py
def execute_partial(self, subset: list):
    """Execute a partial Pipeline

    Args:
        subset (list): A subset of the pipeline to execute

    Raises:
        RunTimeException: If the pipeline execution fails in any way
    """
    pipeline_executor = PipelineExecutor(self)
    pipeline_executor.execute_partial(subset)

health_check(**kwargs)

Retrieve the Pipeline Health Check.

Returns:

Name    Type    Description
dict    dict    A dictionary of health check details for the Pipeline

Source code in src/workbench/api/pipeline.py
def health_check(self, **kwargs) -> dict:
    """Retrieve the Pipeline Health Check.

    Returns:
        dict: A dictionary of health check details for the Pipeline
    """
    return {}

report_settable_fields(pipeline={}, path='')

Recursively finds and prints keys with settable fields in a JSON-like dictionary.

Parameters:

Name        Type    Description                                               Default
pipeline    dict    pipeline (or sub-pipeline) to process                     {}
path        str     Current path to the key, used for nested dictionaries    ''

Source code in src/workbench/api/pipeline.py
def report_settable_fields(self, pipeline: dict = {}, path: str = "") -> None:
    """
    Recursively finds and prints keys with settable fields in a JSON-like dictionary.

    Args:
    pipeline (dict): pipeline (or sub pipeline) to process.
    path (str): Current path to the key, used for nested dictionaries.
    """
    # Grab the entire pipeline if not provided (first call)
    if not pipeline:
        self.log.important(f"Checking Pipeline: {self.uuid}...")
        pipeline = self.pipeline
    for key, value in pipeline.items():
        if isinstance(value, dict):
            # Recurse into sub-dictionary
            self.report_settable_fields(value, path + key + " -> ")
        elif isinstance(value, str) and value.startswith("<<") and value.endswith(">>"):
            # Check if required or optional
            required = "[Required]" if "required" in value else "[Optional]"
            self.log.important(f"{required} Path: {path + key}")
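
A usage sketch for report_settable_fields(); it logs the path of any field whose value is a <<...>> placeholder, marking each as [Required] or [Optional] (the pipeline name is illustrative):

from workbench.api.pipeline import Pipeline

my_pipeline = Pipeline("abalone_pipeline_v1")

# Logs lines like "[Required] Path: data_source -> input" for each <<...>> placeholder
my_pipeline.report_settable_fields()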

set_input(input, artifact='data_source')

Set the input for the Pipeline

Parameters:

Name        Type                      Description                                                    Default
input       Union[str, DataFrame]     The input for the Pipeline                                     required
artifact    str                       The artifact to set the input for (default: "data_source")    'data_source'
Source code in src/workbench/api/pipeline.py
def set_input(self, input: Union[str, pd.DataFrame], artifact: str = "data_source"):
    """Set the input for the Pipeline

    Args:
        input (Union[str, pd.DataFrame]): The input for the Pipeline
        artifact (str): The artifact to set the input for (default: "data_source")
    """
    self.pipeline[artifact]["input"] = input

set_training_holdouts(id_column, holdout_ids)

Set the training holdouts for the Pipeline

Parameters:

Name           Type         Description                                    Default
id_column      str          The column name of the unique identifier      required
holdout_ids    list[str]    The list of unique identifiers to hold out    required
Source code in src/workbench/api/pipeline.py
def set_training_holdouts(self, id_column: str, holdout_ids: list[str]):
    """Set the input for the Pipeline

    Args:
        id_column (str): The column name of the unique identifier
        holdout_ids (list[str]): The list of unique identifiers to hold out
    """
    self.pipeline["feature_set"]["id_column"] = id_column
    self.pipeline["feature_set"]["holdout_ids"] = holdout_ids

summary(**kwargs)

Retrieve the Pipeline Summary.

Returns:

Name    Type    Description
dict    dict    A dictionary of details about the Pipeline

Source code in src/workbench/api/pipeline.py
def summary(self, **kwargs) -> dict:
    """Retrieve the Pipeline Summary.

    Returns:
        dict: A dictionary of details about the Pipeline
    """
    return self.pipeline

Examples

Make a Pipeline

Pipelines are just JSON files (see workbench/examples/pipelines/). You can copy one and change it to fit your objects and use case, or, if you already have a set of Workbench artifacts created, you can 'backtrack' from the Endpoint and have it create the Pipeline for you.

pipeline_manager.py
from pprint import pprint
from workbench.api.pipeline_manager import PipelineManager

# Create a PipelineManager
my_manager = PipelineManager()

# List the Pipelines
pprint(my_manager.list_pipelines())

# Create a Pipeline from an Endpoint
abalone_pipeline = my_manager.create_from_endpoint("abalone-regression-end")

# Publish the Pipeline
my_manager.publish_pipeline("abalone_pipeline_v1", abalone_pipeline)

Output

Listing Pipelines...
[{'last_modified': datetime.datetime(2024, 4, 16, 21, 10, 6, tzinfo=tzutc()),
  'name': 'abalone_pipeline_v1',
  'size': 445}]
Pipeline Details

pipeline_details.py
from pprint import pprint
from workbench.api.pipeline import Pipeline

# Retrieve an existing Pipeline
my_pipeline = Pipeline("abalone_pipeline_v1")
pprint(my_pipeline.details())

Output

{
    "name": "abalone_pipeline_v1",
    "s3_path": "s3://sandbox/pipelines/abalone_pipeline_v1.json",
    "pipeline": {
        "data_source": {
            "name": "abalone_data",
            "tags": [
                "abalone_data"
            ],
            "input": "/Users/briford/work/workbench/data/abalone.csv"
        },
        "feature_set": {
            "name": "abalone_features",
            "tags": [
                "abalone_features"
            ],
            "input": "abalone_data"
        },
        "model": {
            "name": "abalone-regression",
            "tags": [
                "abalone",
                "regression"
            ],
            "input": "abalone_features"
        },
        ...
    }
}

Pipeline Execution


Executing the Pipeline is obviously the most important reason for creating one. It gives you a reproducible way to capture, inspect, and run the same ML pipeline on different data (e.g. nightly).

pipeline_execution.py
from workbench.api.pipeline import Pipeline

# Retrieve an existing Pipeline
my_pipeline = Pipeline("abalone_pipeline_v1")

# Execute the Pipeline
my_pipeline.execute()  # Full execution

# Partial executions
my_pipeline.execute_partial(["data_source", "feature_set"])
my_pipeline.execute_partial(["model", "endpoint"])
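
Because the pipeline definition stays fixed, running it on new data is just a matter of swapping the input before executing. A minimal nightly-run sketch (the S3 path is illustrative):

from workbench.api.pipeline import Pipeline

# Same pipeline, pointed at tonight's data
my_pipeline = Pipeline("abalone_pipeline_v1")
my_pipeline.set_input("s3://my-bucket/nightly/abalone_2024_04_16.csv")
my_pipeline.execute()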

Pipelines Advanced

As part of the flexible architecture, DataSources or FeatureSets can sometimes be created from a Pandas DataFrame. To support a DataFrame as input to a pipeline, call the set_input() method on the pipeline object. If you'd like to specify hold-out ids, you can also call set_hold_out_ids() with a list of ids. A usage sketch follows the listing below.

    def set_input(self, input: Union[str, pd.DataFrame], artifact: str = "data_source"):
        """Set the input for the Pipeline

        Args:
            input (Union[str, pd.DataFrame]): The input for the Pipeline
            artifact (str): The artifact to set the input for (default: "data_source")
        """
        self.pipeline[artifact]["input"] = input

    def set_hold_out_ids(self, id_list: list):
        """Set the input for the Pipeline

        Args:
           id_list (list): The list of hold out ids
        """
        self.pipeline["feature_set"]["hold_out_ids"] = id_list

Running a pipeline creates and deploys a set of Workbench Artifacts: DataSource, FeatureSet, Model, and Endpoint. These artifacts can be viewed in the SageMaker Console/Notebook interfaces or in the Workbench Dashboard UI.

Not finding a particular method?

The Workbench API Classes use the 'Core' Classes internally, so for an extensive listing of all the methods available please take a deep dive into: Workbench Core Classes