Pipelines

Pipeline Examples

Examples of using the Pipeline classes are listed in the Examples section at the bottom of this page.

Pipelines store sequences of SageWorks transforms. If you have a nightly ML workflow, you can capture it as a Pipeline. Here's an example pipeline:

nightly_sol_pipeline_v1.json
{
    "data_source": {
        "name": "nightly_data",
        "tags": ["solubility", "foo"],
        "s3_input": "s3://blah/blah.csv"
    },
    "feature_set": {
        "name": "nightly_features",
        "tags": ["blah", "blah"],
        "input": "nightly_data",
        "schema": "mol_descriptors_v1"
    },
    "model": {
        "name": "nightly_model",
        "tags": ["blah", "blah"],
        "features": ["col1", "col2"],
        "target": "sol",
        "input": "nightly_features"
    },
    "endpoint": {
        ...
    }
}
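
In the example above, each stage names the previous stage in its input field. The following is a minimal sketch of how that wiring could be checked before a pipeline is run. The helper `check_chain` and the `STAGES` list are hypothetical and not part of SageWorks; the stage order is assumed from the example.

```python
import json

# Stage order assumed from the example pipeline (not read from SageWorks itself)
STAGES = ["data_source", "feature_set", "model", "endpoint"]


def check_chain(pipeline: dict) -> list:
    """Hypothetical helper: report stages whose 'input' does not name the upstream stage."""
    problems = []
    present = [stage for stage in STAGES if stage in pipeline]
    for upstream, downstream in zip(present, present[1:]):
        expected = pipeline[upstream].get("name")
        actual = pipeline[downstream].get("input")
        if actual != expected:
            problems.append(f"{downstream}.input is {actual!r}, expected {expected!r}")
    return problems


example = json.loads("""
{
    "data_source": {"name": "nightly_data", "s3_input": "s3://blah/blah.csv"},
    "feature_set": {"name": "nightly_features", "input": "nightly_data"},
    "model": {"name": "nightly_model", "input": "nightly_features"}
}
""")

# An empty list means every stage points at the stage before it
print(check_chain(example))
```

A check like this catches a renamed artifact early, before any AWS resources are touched.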

PipelineManager

PipelineManager: Manages SageWorks Pipelines, listing, creating, and saving them.

Common Usage
my_manager = PipelineManager()
my_manager.list_pipelines()
abalone_pipeline = my_manager.create_from_endpoint("abalone-regression-end")
my_manager.publish_pipeline("abalone_pipeline_v1", abalone_pipeline)
Source code in src/sageworks/api/pipeline_manager.py
class PipelineManager:
    """PipelineManager: Manages SageWorks Pipelines, listing, creating, and saving them.

    Common Usage:
        ```python
        my_manager = PipelineManager()
        my_manager.list_pipelines()
        abalone_pipeline = my_manager.create_from_endpoint("abalone-regression-end")
        my_manager.publish_pipeline("abalone_pipeline_v1", abalone_pipeline)
        ```
    """

    def __init__(self):
        """Pipeline Init Method"""
        self.log = logging.getLogger("sageworks")

        # Grab our SageWorks Bucket from Config
        self.cm = ConfigManager()
        self.sageworks_bucket = self.cm.get_config("SAGEWORKS_BUCKET")
        if self.sageworks_bucket is None:
            self.log = logging.getLogger("sageworks")
            self.log.critical("Could not find ENV var for SAGEWORKS_BUCKET!")
            sys.exit(1)

        # Set the S3 Path for Pipelines
        self.bucket = self.sageworks_bucket
        self.prefix = "pipelines/"
        self.pipelines_s3_path = f"s3://{self.sageworks_bucket}/pipelines/"

        # Grab a SageWorks Session (this allows us to assume the SageWorks ExecutionRole)
        self.boto3_session = AWSAccountClamp().boto3_session

        # Read all the Pipelines from this S3 path
        self.s3_client = self.boto3_session.client("s3")

    def list_pipelines(self) -> list:
        """List all the Pipelines in the S3 Bucket

        Returns:
            list: A list of Pipeline names and details
        """
        # List objects using the S3 client
        response = self.s3_client.list_objects_v2(Bucket=self.bucket, Prefix=self.prefix)

        # Check if there are objects
        if "Contents" in response:
            # Process the list of dictionaries (we only need the filename, the LastModified, and the Size)
            pipelines = [
                {
                    "name": pipeline["Key"].split("/")[-1].replace(".json", ""),
                    "last_modified": pipeline["LastModified"],
                    "size": pipeline["Size"],
                }
                for pipeline in response["Contents"]
            ]
            return pipelines
        else:
            self.log.important(f"No pipelines found at {self.pipelines_s3_path}...")
            return []

    # Create a new Pipeline from an Endpoint
    def create_from_endpoint(self, endpoint_name: str) -> dict:
        """Create a Pipeline from an Endpoint

        Args:
            endpoint_name (str): The name of the Endpoint

        Returns:
            dict: A dictionary of the Pipeline
        """
        self.log.important(f"Creating Pipeline from Endpoint: {endpoint_name}...")
        pipeline = {}
        endpoint = Endpoint(endpoint_name)
        model = Model(endpoint.get_input())
        feature_set = FeatureSet(model.get_input())
        data_source = DataSource(feature_set.get_input())
        s3_source = data_source.get_input()
        for name in ["data_source", "feature_set", "model", "endpoint"]:
            artifact = locals()[name]
            pipeline[name] = {"name": artifact.uuid, "tags": artifact.get_tags(), "input": artifact.get_input()}
            if name == "model":
                pipeline[name]["model_type"] = artifact.model_type.value
                pipeline[name]["target_column"] = artifact.target()
                pipeline[name]["feature_list"] = artifact.features()

        # Return the Pipeline
        return pipeline

    # Publish a Pipeline to SageWorks
    def publish_pipeline(self, name: str, pipeline: dict):
        """Save a Pipeline to S3

        Args:
            name (str): The name of the Pipeline
            pipeline (dict): The Pipeline to save
        """
        key = f"{self.prefix}{name}.json"
        self.log.important(f"Saving {name} to S3: {self.bucket}/{key}...")

        # Save the pipeline as an S3 JSON object
        self.s3_client.put_object(Body=json.dumps(pipeline, indent=4), Bucket=self.bucket, Key=key)

    def delete_pipeline(self, name: str):
        """Delete a Pipeline from S3

        Args:
            name (str): The name of the Pipeline to delete
        """
        key = f"{self.prefix}{name}.json"
        self.log.important(f"Deleting {name} from S3: {self.bucket}/{key}...")

        # Delete the pipeline object from S3
        self.s3_client.delete_object(Bucket=self.bucket, Key=key)

    # Save a Pipeline to a local file
    def save_pipeline_to_file(self, pipeline: dict, filepath: str):
        """Save a Pipeline to a local file

        Args:
            pipeline (dict): The Pipeline to save
            filepath (str): The path to save the Pipeline
        """

        # Sanity check the filepath
        if not filepath.endswith(".json"):
            filepath += ".json"

        # Save the pipeline as a local JSON file
        with open(filepath, "w") as fp:
            json.dump(pipeline, fp, indent=4)

    def load_pipeline_from_file(self, filepath: str) -> dict:
        """Load a Pipeline from a local file

        Args:
            filepath (str): The path of the Pipeline to load

        Returns:
            dict: The Pipeline loaded from the file
        """

        # Load a pipeline as a local JSON file
        with open(filepath, "r") as fp:
            pipeline = json.load(fp)
            return pipeline

    def publish_pipeline_from_file(self, filepath: str):
        """Publish a Pipeline to SageWorks from a local file

        Args:
            filepath (str): The path of the Pipeline to publish
        """

        # Load a pipeline as a local JSON file
        pipeline = self.load_pipeline_from_file(filepath)

        # Get the pipeline name
        pipeline_name = filepath.split("/")[-1].replace(".json", "")

        # Publish the Pipeline
        self.publish_pipeline(pipeline_name, pipeline)

__init__()

Pipeline Init Method

Source code in src/sageworks/api/pipeline_manager.py
def __init__(self):
    """Pipeline Init Method"""
    self.log = logging.getLogger("sageworks")

    # Grab our SageWorks Bucket from Config
    self.cm = ConfigManager()
    self.sageworks_bucket = self.cm.get_config("SAGEWORKS_BUCKET")
    if self.sageworks_bucket is None:
        self.log = logging.getLogger("sageworks")
        self.log.critical("Could not find ENV var for SAGEWORKS_BUCKET!")
        sys.exit(1)

    # Set the S3 Path for Pipelines
    self.bucket = self.sageworks_bucket
    self.prefix = "pipelines/"
    self.pipelines_s3_path = f"s3://{self.sageworks_bucket}/pipelines/"

    # Grab a SageWorks Session (this allows us to assume the SageWorks ExecutionRole)
    self.boto3_session = AWSAccountClamp().boto3_session

    # Read all the Pipelines from this S3 path
    self.s3_client = self.boto3_session.client("s3")

create_from_endpoint(endpoint_name)

Create a Pipeline from an Endpoint

Parameters:

Name Type Description Default
endpoint_name str

The name of the Endpoint

required

Returns:

Name Type Description
dict dict

A dictionary of the Pipeline

Source code in src/sageworks/api/pipeline_manager.py
def create_from_endpoint(self, endpoint_name: str) -> dict:
    """Create a Pipeline from an Endpoint

    Args:
        endpoint_name (str): The name of the Endpoint

    Returns:
        dict: A dictionary of the Pipeline
    """
    self.log.important(f"Creating Pipeline from Endpoint: {endpoint_name}...")
    pipeline = {}
    endpoint = Endpoint(endpoint_name)
    model = Model(endpoint.get_input())
    feature_set = FeatureSet(model.get_input())
    data_source = DataSource(feature_set.get_input())
    s3_source = data_source.get_input()
    for name in ["data_source", "feature_set", "model", "endpoint"]:
        artifact = locals()[name]
        pipeline[name] = {"name": artifact.uuid, "tags": artifact.get_tags(), "input": artifact.get_input()}
        if name == "model":
            pipeline[name]["model_type"] = artifact.model_type.value
            pipeline[name]["target_column"] = artifact.target()
            pipeline[name]["feature_list"] = artifact.features()

    # Return the Pipeline
    return pipeline
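
The method above walks backwards from the Endpoint: each artifact's input names the artifact upstream of it. The sketch below reproduces that walk with in-memory stand-ins so it can run without AWS. `StubArtifact`, `REGISTRY`, `backtrack`, and the S3 path are illustrative assumptions, not SageWorks classes or data.

```python
from dataclasses import dataclass, field


@dataclass
class StubArtifact:
    """Hypothetical stand-in for a SageWorks artifact (not the real class)."""

    uuid: str
    input_name: str
    tags: list = field(default_factory=list)

    def get_input(self) -> str:
        return self.input_name

    def get_tags(self) -> list:
        return self.tags


# Illustrative in-memory registry keyed by artifact name
REGISTRY = {
    "abalone-regression-end": StubArtifact("abalone-regression-end", "abalone-regression"),
    "abalone-regression": StubArtifact("abalone-regression", "abalone_features"),
    "abalone_features": StubArtifact("abalone_features", "abalone_data"),
    "abalone_data": StubArtifact("abalone_data", "s3://bucket/abalone.csv"),
}


def backtrack(endpoint_name: str) -> dict:
    """Walk endpoint -> model -> feature_set -> data_source, collecting each stage."""
    stages = ["endpoint", "model", "feature_set", "data_source"]
    collected = {}
    name = endpoint_name
    for stage in stages:
        artifact = REGISTRY[name]
        collected[stage] = {
            "name": artifact.uuid,
            "tags": artifact.get_tags(),
            "input": artifact.get_input(),
        }
        name = artifact.get_input()  # this stage's input names the upstream stage
    # Return the stages in execution order (data_source first)
    return {stage: collected[stage] for stage in reversed(stages)}


pipeline = backtrack("abalone-regression-end")
print(list(pipeline))
```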

delete_pipeline(name)

Delete a Pipeline from S3

Parameters:

Name Type Description Default
name str

The name of the Pipeline to delete

required
Source code in src/sageworks/api/pipeline_manager.py
def delete_pipeline(self, name: str):
    """Delete a Pipeline from S3

    Args:
        name (str): The name of the Pipeline to delete
    """
    key = f"{self.prefix}{name}.json"
    self.log.important(f"Deleting {name} from S3: {self.bucket}/{key}...")

    # Delete the pipeline object from S3
    self.s3_client.delete_object(Bucket=self.bucket, Key=key)

list_pipelines()

List all the Pipelines in the S3 Bucket

Returns:

Name Type Description
list list

A list of Pipeline names and details

Source code in src/sageworks/api/pipeline_manager.py
def list_pipelines(self) -> list:
    """List all the Pipelines in the S3 Bucket

    Returns:
        list: A list of Pipeline names and details
    """
    # List objects using the S3 client
    response = self.s3_client.list_objects_v2(Bucket=self.bucket, Prefix=self.prefix)

    # Check if there are objects
    if "Contents" in response:
        # Process the list of dictionaries (we only need the filename, the LastModified, and the Size)
        pipelines = [
            {
                "name": pipeline["Key"].split("/")[-1].replace(".json", ""),
                "last_modified": pipeline["LastModified"],
                "size": pipeline["Size"],
            }
            for pipeline in response["Contents"]
        ]
        return pipelines
    else:
        self.log.important(f"No pipelines found at {self.pipelines_s3_path}...")
        return []
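
To see what the list comprehension above produces, here is a small sketch that applies the same reduction to a hand-built dictionary shaped like an S3 `list_objects_v2` response. The function name `summarize` and the sample response are assumptions for illustration only.

```python
from datetime import datetime, timezone


def summarize(response: dict) -> list:
    """Reduce a list_objects_v2-style response to name / last_modified / size."""
    return [
        {
            "name": obj["Key"].split("/")[-1].replace(".json", ""),
            "last_modified": obj["LastModified"],
            "size": obj["Size"],
        }
        for obj in response.get("Contents", [])
    ]


# Hand-built sample shaped like an S3 response (not real bucket contents)
fake_response = {
    "Contents": [
        {
            "Key": "pipelines/abalone_pipeline_v1.json",
            "LastModified": datetime(2024, 4, 16, 21, 10, 6, tzinfo=timezone.utc),
            "Size": 445,
        }
    ]
}

print(summarize(fake_response))
print(summarize({}))  # no "Contents" key when nothing matches the prefix
```

Note that `list_objects_v2` returns at most 1,000 keys per call, so a bucket holding more pipelines than that would need pagination.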

load_pipeline_from_file(filepath)

Load a Pipeline from a local file

Parameters:

Name Type Description Default
filepath str

The path of the Pipeline to load

required

Returns:

Name Type Description
dict dict

The Pipeline loaded from the file

Source code in src/sageworks/api/pipeline_manager.py
def load_pipeline_from_file(self, filepath: str) -> dict:
    """Load a Pipeline from a local file

    Args:
        filepath (str): The path of the Pipeline to load

    Returns:
        dict: The Pipeline loaded from the file
    """

    # Load a pipeline as a local JSON file
    with open(filepath, "r") as fp:
        pipeline = json.load(fp)
        return pipeline

publish_pipeline(name, pipeline)

Save a Pipeline to S3

Parameters:

Name Type Description Default
name str

The name of the Pipeline

required
pipeline dict

The Pipeline to save

required
Source code in src/sageworks/api/pipeline_manager.py
def publish_pipeline(self, name: str, pipeline: dict):
    """Save a Pipeline to S3

    Args:
        name (str): The name of the Pipeline
        pipeline (dict): The Pipeline to save
    """
    key = f"{self.prefix}{name}.json"
    self.log.important(f"Saving {name} to S3: {self.bucket}/{key}...")

    # Save the pipeline as an S3 JSON object
    self.s3_client.put_object(Body=json.dumps(pipeline, indent=4), Bucket=self.bucket, Key=key)

publish_pipeline_from_file(filepath)

Publish a Pipeline to SageWorks from a local file

Parameters:

Name Type Description Default
filepath str

The path of the Pipeline to publish

required
Source code in src/sageworks/api/pipeline_manager.py
def publish_pipeline_from_file(self, filepath: str):
    """Publish a Pipeline to SageWorks from a local file

    Args:
        filepath (str): The path of the Pipeline to publish
    """

    # Load a pipeline as a local JSON file
    pipeline = self.load_pipeline_from_file(filepath)

    # Get the pipeline name
    pipeline_name = filepath.split("/")[-1].replace(".json", "")

    # Publish the Pipeline
    self.publish_pipeline(pipeline_name, pipeline)
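
The published name is derived from the file path, so the file name matters. The sketch below mirrors that derivation and compares it with a `pathlib` alternative; both helper names are hypothetical. The two differ only when ".json" appears more than once in the file name.

```python
from pathlib import PurePosixPath


def name_from_path(filepath: str) -> str:
    """Mirror of the derivation used above: last path segment with '.json' removed."""
    return filepath.split("/")[-1].replace(".json", "")


def name_from_stem(filepath: str) -> str:
    """Alternative (an assumption, not the library's behavior): strip only the final suffix."""
    return PurePosixPath(filepath).stem


print(name_from_path("/tmp/nightly_sol_pipeline_v1.json"))
print(name_from_path("my.json.backup.json"))
print(name_from_stem("my.json.backup.json"))
```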

save_pipeline_to_file(pipeline, filepath)

Save a Pipeline to a local file

Parameters:

Name Type Description Default
pipeline dict

The Pipeline to save

required
filepath str

The path to save the Pipeline

required
Source code in src/sageworks/api/pipeline_manager.py
def save_pipeline_to_file(self, pipeline: dict, filepath: str):
    """Save a Pipeline to a local file

    Args:
        pipeline (dict): The Pipeline to save
        filepath (str): The path to save the Pipeline
    """

    # Sanity check the filepath
    if not filepath.endswith(".json"):
        filepath += ".json"

    # Save the pipeline as a local JSON file
    with open(filepath, "w") as fp:
        json.dump(pipeline, fp, indent=4)
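
A quick way to convince yourself that saving and loading are inverse operations is a round trip through a temporary directory. The sketch below re-implements the two file methods as standalone functions (an assumption made so it runs without a PipelineManager instance); unlike the method above, the save function returns the final path for convenience.

```python
import json
import os
import tempfile


def save_pipeline_to_file(pipeline: dict, filepath: str) -> str:
    """Standalone copy of the save logic; returns the path actually written."""
    if not filepath.endswith(".json"):
        filepath += ".json"
    with open(filepath, "w") as fp:
        json.dump(pipeline, fp, indent=4)
    return filepath


def load_pipeline_from_file(filepath: str) -> dict:
    """Standalone copy of the load logic."""
    with open(filepath, "r") as fp:
        return json.load(fp)


pipeline = {"data_source": {"name": "nightly_data", "tags": ["solubility"]}}

with tempfile.TemporaryDirectory() as tmp_dir:
    # The ".json" suffix is appended automatically
    written = save_pipeline_to_file(pipeline, os.path.join(tmp_dir, "nightly_sol_pipeline_v1"))
    restored = load_pipeline_from_file(written)

print(os.path.basename(written))
print(restored == pipeline)
```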

Pipeline: Manages the details around a SageWorks Pipeline, including Execution

Pipeline

Pipeline: SageWorks Pipeline API Class

Common Usage
my_pipeline = Pipeline("name")
my_pipeline.details()
my_pipeline.execute()  # Execute entire pipeline
my_pipeline.execute_partial(["data_source", "feature_set"])
my_pipeline.execute_partial(["model", "endpoint"])
Source code in src/sageworks/api/pipeline.py
class Pipeline:
    """Pipeline: SageWorks Pipeline API Class

    Common Usage:
        ```python
        my_pipeline = Pipeline("name")
        my_pipeline.details()
        my_pipeline.execute()  # Execute entire pipeline
        my_pipeline.execute_partial(["data_source", "feature_set"])
        my_pipeline.execute_partial(["model", "endpoint"])
        ```
    """

    def __init__(self, name: str):
        """Pipeline Init Method"""
        self.log = logging.getLogger("sageworks")
        self.name = name

        # Grab our SageWorks Bucket from Config
        self.cm = ConfigManager()
        self.sageworks_bucket = self.cm.get_config("SAGEWORKS_BUCKET")
        if self.sageworks_bucket is None:
            self.log = logging.getLogger("sageworks")
            self.log.critical("Could not find ENV var for SAGEWORKS_BUCKET!")
            sys.exit(1)

        # Set the S3 Path for this Pipeline
        self.bucket = self.sageworks_bucket
        self.key = f"pipelines/{self.name}.json"
        self.s3_path = f"s3://{self.bucket}/{self.key}"

        # Grab a SageWorks Session (this allows us to assume the SageWorks ExecutionRole)
        self.boto3_session = AWSAccountClamp().boto3_session
        self.s3_client = self.boto3_session.client("s3")

        # If this S3 Path exists, load the Pipeline
        if wr.s3.does_object_exist(self.s3_path):
            self.pipeline = self._get_pipeline()
        else:
            self.log.warning(f"Pipeline {self.name} not found at {self.s3_path}")
            self.pipeline = None

    def set_input(self, input: Union[str, pd.DataFrame], artifact: str = "data_source"):
        """Set the input for the Pipeline

        Args:
            input (Union[str, pd.DataFrame]): The input for the Pipeline
            artifact (str): The artifact to set the input for (default: "data_source")
        """
        self.pipeline[artifact]["input"] = input

    def set_training_holdouts(self, id_column: str, holdout_ids: list[str]):
        """Set the training holdouts for the Pipeline

        Args:
            id_column (str): The column name of the unique identifier
            holdout_ids (list[str]): The list of unique identifiers to hold out
        """
        self.pipeline["feature_set"]["id_column"] = id_column
        self.pipeline["feature_set"]["holdout_ids"] = holdout_ids

    def execute(self):
        """Execute the entire Pipeline

        Raises:
            RunTimeException: If the pipeline execution fails in any way
        """
        pipeline_executor = PipelineExecutor(self)
        pipeline_executor.execute()

    def execute_partial(self, subset: list):
        """Execute a partial Pipeline

        Args:
            subset (list): A subset of the pipeline to execute

        Raises:
            RunTimeException: If the pipeline execution fails in any way
        """
        pipeline_executor = PipelineExecutor(self)
        pipeline_executor.execute_partial(subset)

    def report_settable_fields(self, pipeline: dict = {}, path: str = "") -> None:
        """
        Recursively finds and prints keys with settable fields in a JSON-like dictionary.

        Args:
        pipeline (dict): pipeline (or sub pipeline) to process.
        path (str): Current path to the key, used for nested dictionaries.
        """
        # Grab the entire pipeline if not provided (first call)
        if not pipeline:
            self.log.important(f"Checking Pipeline: {self.name}...")
            pipeline = self.pipeline
        for key, value in pipeline.items():
            if isinstance(value, dict):
                # Recurse into sub-dictionary
                self.report_settable_fields(value, path + key + " -> ")
            elif isinstance(value, str) and value.startswith("<<") and value.endswith(">>"):
                # Check if required or optional
                required = "[Required]" if "required" in value else "[Optional]"
                self.log.important(f"{required} Path: {path + key}")

    def delete(self):
        """Pipeline Deletion"""
        self.log.info(f"Deleting Pipeline: {self.name}...")
        wr.s3.delete_objects(self.s3_path)

    def _get_pipeline(self) -> dict:
        """Internal: Get the pipeline as a JSON object from the specified S3 bucket and key."""
        response = self.s3_client.get_object(Bucket=self.bucket, Key=self.key)
        json_object = json.loads(response["Body"].read())
        return json_object

    def __repr__(self) -> str:
        """String representation of this pipeline

        Returns:
            str: String representation of this pipeline
        """
        # Class name and details
        class_name = self.__class__.__name__
        pipeline_details = json.dumps(self.pipeline, indent=4)
        return f"{class_name}({pipeline_details})"

__init__(name)

Pipeline Init Method

Source code in src/sageworks/api/pipeline.py
def __init__(self, name: str):
    """Pipeline Init Method"""
    self.log = logging.getLogger("sageworks")
    self.name = name

    # Grab our SageWorks Bucket from Config
    self.cm = ConfigManager()
    self.sageworks_bucket = self.cm.get_config("SAGEWORKS_BUCKET")
    if self.sageworks_bucket is None:
        self.log = logging.getLogger("sageworks")
        self.log.critical("Could not find ENV var for SAGEWORKS_BUCKET!")
        sys.exit(1)

    # Set the S3 Path for this Pipeline
    self.bucket = self.sageworks_bucket
    self.key = f"pipelines/{self.name}.json"
    self.s3_path = f"s3://{self.bucket}/{self.key}"

    # Grab a SageWorks Session (this allows us to assume the SageWorks ExecutionRole)
    self.boto3_session = AWSAccountClamp().boto3_session
    self.s3_client = self.boto3_session.client("s3")

    # If this S3 Path exists, load the Pipeline
    if wr.s3.does_object_exist(self.s3_path):
        self.pipeline = self._get_pipeline()
    else:
        self.log.warning(f"Pipeline {self.name} not found at {self.s3_path}")
        self.pipeline = None

__repr__()

String representation of this pipeline

Returns:

Name Type Description
str str

String representation of this pipeline

Source code in src/sageworks/api/pipeline.py
def __repr__(self) -> str:
    """String representation of this pipeline

    Returns:
        str: String representation of this pipeline
    """
    # Class name and details
    class_name = self.__class__.__name__
    pipeline_details = json.dumps(self.pipeline, indent=4)
    return f"{class_name}({pipeline_details})"

delete()

Pipeline Deletion

Source code in src/sageworks/api/pipeline.py
def delete(self):
    """Pipeline Deletion"""
    self.log.info(f"Deleting Pipeline: {self.name}...")
    wr.s3.delete_objects(self.s3_path)

execute()

Execute the entire Pipeline

Raises:

Type Description
RunTimeException

If the pipeline execution fails in any way

Source code in src/sageworks/api/pipeline.py
def execute(self):
    """Execute the entire Pipeline

    Raises:
        RunTimeException: If the pipeline execution fails in any way
    """
    pipeline_executor = PipelineExecutor(self)
    pipeline_executor.execute()

execute_partial(subset)

Execute a partial Pipeline

Parameters:

Name Type Description Default
subset list

A subset of the pipeline to execute

required

Raises:

Type Description
RunTimeException

If the pipeline execution fails in any way

Source code in src/sageworks/api/pipeline.py
def execute_partial(self, subset: list):
    """Execute a partial Pipeline

    Args:
        subset (list): A subset of the pipeline to execute

    Raises:
        RunTimeException: If the pipeline execution fails in any way
    """
    pipeline_executor = PipelineExecutor(self)
    pipeline_executor.execute_partial(subset)

report_settable_fields(pipeline={}, path='')

Recursively finds and prints keys with settable fields in a JSON-like dictionary.

Parameters:

Name Type Description Default
pipeline dict

pipeline (or sub pipeline) to process.

{}
path str

Current path to the key, used for nested dictionaries.

''

Source code in src/sageworks/api/pipeline.py
def report_settable_fields(self, pipeline: dict = {}, path: str = "") -> None:
    """
    Recursively finds and prints keys with settable fields in a JSON-like dictionary.

    Args:
    pipeline (dict): pipeline (or sub pipeline) to process.
    path (str): Current path to the key, used for nested dictionaries.
    """
    # Grab the entire pipeline if not provided (first call)
    if not pipeline:
        self.log.important(f"Checking Pipeline: {self.name}...")
        pipeline = self.pipeline
    for key, value in pipeline.items():
        if isinstance(value, dict):
            # Recurse into sub-dictionary
            self.report_settable_fields(value, path + key + " -> ")
        elif isinstance(value, str) and value.startswith("<<") and value.endswith(">>"):
            # Check if required or optional
            required = "[Required]" if "required" in value else "[Optional]"
            self.log.important(f"{required} Path: {path + key}")
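
The recursion above treats any string wrapped in `<<` and `>>` as a settable field, and marks it required when the text contains "required". Here is a minimal sketch of the same traversal that returns the report lines instead of logging them. The function name `find_settable_fields` and the placeholder strings in `template` are illustrative assumptions.

```python
def find_settable_fields(pipeline: dict, path: str = "") -> list:
    """Collect report lines for every '<<...>>' placeholder in a nested dict."""
    found = []
    for key, value in pipeline.items():
        if isinstance(value, dict):
            # Recurse into the sub-dictionary, extending the path
            found.extend(find_settable_fields(value, path + key + " -> "))
        elif isinstance(value, str) and value.startswith("<<") and value.endswith(">>"):
            label = "[Required]" if "required" in value else "[Optional]"
            found.append(f"{label} Path: {path + key}")
    return found


# Hypothetical template; the exact placeholder wording is an assumption
template = {
    "data_source": {"name": "nightly_data", "input": "<<required: S3 path or DataFrame>>"},
    "feature_set": {"name": "nightly_features", "holdout_ids": "<<optional>>"},
}

for line in find_settable_fields(template):
    print(line)
```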

set_input(input, artifact='data_source')

Set the input for the Pipeline

Parameters:

Name Type Description Default
input Union[str, DataFrame]

The input for the Pipeline

required
artifact str

The artifact to set the input for (default: "data_source")

'data_source'
Source code in src/sageworks/api/pipeline.py
def set_input(self, input: Union[str, pd.DataFrame], artifact: str = "data_source"):
    """Set the input for the Pipeline

    Args:
        input (Union[str, pd.DataFrame]): The input for the Pipeline
        artifact (str): The artifact to set the input for (default: "data_source")
    """
    self.pipeline[artifact]["input"] = input

set_training_holdouts(id_column, holdout_ids)

Set the training holdouts for the Pipeline

Parameters:

Name Type Description Default
id_column str

The column name of the unique identifier

required
holdout_ids list[str]

The list of unique identifiers to hold out

required
Source code in src/sageworks/api/pipeline.py
def set_training_holdouts(self, id_column: str, holdout_ids: list[str]):
    """Set the training holdouts for the Pipeline

    Args:
        id_column (str): The column name of the unique identifier
        holdout_ids (list[str]): The list of unique identifiers to hold out
    """
    self.pipeline["feature_set"]["id_column"] = id_column
    self.pipeline["feature_set"]["holdout_ids"] = holdout_ids

Examples

Make a Pipeline

Pipelines are just JSON files (see sageworks/examples/pipelines/). You can copy one and adapt it to your objects and use case. Alternatively, if you already have a set of SageWorks artifacts, you can 'backtrack' from the Endpoint and have SageWorks create the Pipeline for you.

pipeline_manager.py
from pprint import pprint

from sageworks.api.pipeline_manager import PipelineManager

# Create a PipelineManager
my_manager = PipelineManager()

# List the Pipelines
pprint(my_manager.list_pipelines())

# Create a Pipeline from an Endpoint
abalone_pipeline = my_manager.create_from_endpoint("abalone-regression-end")

# Publish the Pipeline
my_manager.publish_pipeline("abalone_pipeline_v1", abalone_pipeline)

Output

Listing Pipelines...
[{'last_modified': datetime.datetime(2024, 4, 16, 21, 10, 6, tzinfo=tzutc()),
  'name': 'abalone_pipeline_v1',
  'size': 445}]
Pipeline Details

pipeline_details.py
from pprint import pprint

from sageworks.api.pipeline import Pipeline

# Retrieve an existing Pipeline
my_pipeline = Pipeline("abalone_pipeline_v1")
pprint(my_pipeline.details())

Output

{
    "name": "abalone_pipeline_v1",
    "s3_path": "s3://sandbox/pipelines/abalone_pipeline_v1.json",
    "pipeline": {
        "data_source": {
            "name": "abalone_data",
            "tags": [
                "abalone_data"
            ],
            "input": "/Users/briford/work/sageworks/data/abalone.csv"
        },
        "feature_set": {
            "name": "abalone_features",
            "tags": [
                "abalone_features"
            ],
            "input": "abalone_data"
        },
        "model": {
            "name": "abalone-regression",
            "tags": [
                "abalone",
                "regression"
            ],
            "input": "abalone_features"
        },
        ...
    }
}

Pipeline Execution

Executing a Pipeline is the main reason for creating one. It gives you a reproducible way to capture, inspect, and run the same ML pipeline on different data (for example, nightly).

pipeline_execution.py
from sageworks.api.pipeline import Pipeline

# Retrieve an existing Pipeline
my_pipeline = Pipeline("abalone_pipeline_v1")

# Execute the Pipeline
my_pipeline.execute()  # Full execution

# Partial executions
my_pipeline.execute_partial(["data_source", "feature_set"])
my_pipeline.execute_partial(["model", "endpoint"])

Pipelines Advanced

As part of the flexible architecture, DataSources or FeatureSets can sometimes be created from a Pandas DataFrame. To support a DataFrame as input to a pipeline, call the set_input() method on the pipeline object. To hold rows out of training, call set_training_holdouts() with the name of the id column and a list of ids.

    def set_input(self, input: Union[str, pd.DataFrame], artifact: str = "data_source"):
        """Set the input for the Pipeline

        Args:
            input (Union[str, pd.DataFrame]): The input for the Pipeline
            artifact (str): The artifact to set the input for (default: "data_source")
        """
        self.pipeline[artifact]["input"] = input

    def set_training_holdouts(self, id_column: str, holdout_ids: list[str]):
        """Set the training holdouts for the Pipeline

        Args:
            id_column (str): The column name of the unique identifier
            holdout_ids (list[str]): The list of unique identifiers to hold out
        """
        self.pipeline["feature_set"]["id_column"] = id_column
        self.pipeline["feature_set"]["holdout_ids"] = holdout_ids
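
Both setters only write into the pipeline dictionary; nothing runs until the pipeline is executed. The sketch below shows that effect on a plain dict, following set_input() and set_training_holdouts() as documented in the API reference above. The standalone functions, the S3 path, the id column name, and the ids are illustrative assumptions.

```python
# Plain-dict stand-in for a loaded pipeline (values are illustrative)
pipeline = {
    "data_source": {"name": "abalone_data", "input": "<<required>>"},
    "feature_set": {"name": "abalone_features", "input": "abalone_data"},
}


def set_input(pipeline: dict, input_value, artifact: str = "data_source") -> None:
    """Standalone version of the setter: overwrite the input of one stage."""
    pipeline[artifact]["input"] = input_value


def set_training_holdouts(pipeline: dict, id_column: str, holdout_ids: list) -> None:
    """Standalone version of the setter: record which ids to hold out of training."""
    pipeline["feature_set"]["id_column"] = id_column
    pipeline["feature_set"]["holdout_ids"] = holdout_ids


set_input(pipeline, "s3://my-bucket/abalone_new.csv")  # hypothetical S3 path
set_training_holdouts(pipeline, "id", ["1", "7", "42"])  # hypothetical column and ids

print(pipeline["data_source"]["input"])
print(pipeline["feature_set"]["holdout_ids"])
```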

Running a pipeline creates and deploys a set of SageWorks Artifacts: DataSource, FeatureSet, Model, and Endpoint. These artifacts can be viewed in the SageMaker Console/Notebook interfaces or in the SageWorks Dashboard UI.

Not finding a particular method?

The SageWorks API Classes use the 'Core' Classes internally, so for an extensive listing of all the available methods, see: SageWorks Core Classes