Endpoint

Endpoint Examples

Examples of using the Endpoint class are listed in the Examples section at the bottom of this page.

Endpoints manage AWS SageMaker endpoint creation, deployment, and inference. They handle model hosting, auto-scaling, data capture, and performance monitoring. The API is simple: send a DataFrame, get a DataFrame back. Workbench endpoints run on a modern ASGI stack (Uvicorn + FastAPI) and every endpoint follows this same DataFrame-in, DataFrame-out contract.

For long-running inference workloads (>60s per invocation), see AsyncEndpoint.

Workbench endpoint request flow: Uvicorn → FastAPI → Model Script
Every Workbench endpoint runs on Uvicorn + FastAPI. Any client that can make an HTTP request gets the same results.
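Because of this DataFrame-in, DataFrame-out contract, any HTTP client can talk to an endpoint. The sketch below uses only the standard library; the URL and the CSV wire format are illustrative assumptions, not the documented Workbench serialization:

```python
import csv
import io
import urllib.request


def rows_to_csv(rows):
    """Serialize a list of row dicts to CSV text (header + data rows)."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=list(rows[0].keys()))
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()


def csv_to_rows(text):
    """Parse CSV text back into a list of row dicts (values are strings)."""
    return list(csv.DictReader(io.StringIO(text)))


def invoke(url, rows):
    """POST rows as CSV to an endpoint and parse the CSV response.

    NOTE: the URL shape and 'text/csv' content type are assumptions
    for illustration; they are not the documented Workbench protocol.
    """
    req = urllib.request.Request(
        url,
        data=rows_to_csv(rows).encode(),
        headers={"Content-Type": "text/csv"},
    )
    with urllib.request.urlopen(req) as resp:
        return csv_to_rows(resp.read().decode())
```

The serialize/parse round trip is lossless for string values, which is what makes the "same results from any client" property hold.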

Endpoint: Manages AWS Endpoint creation and deployment. Endpoints are automatically set up and provisioned for deployment into AWS. Endpoints can be viewed in the AWS SageMaker interfaces or in the Workbench Dashboard UI, which provides additional model details and performance metrics.

Endpoint

Bases: EndpointCore

Endpoint: Workbench Endpoint API Class

Common Usage
my_endpoint = Endpoint(name)
my_endpoint.details()
my_endpoint.inference(eval_df)

If the underlying endpoint was deployed as async (workbench_meta["async_endpoint"]), inference() / fast_inference() transparently route through an internal async core so callers get correct behavior from a single object.

For feature endpoints (those that emit registered feature columns), use feature_list() to retrieve the column list.

Source code in src/workbench/api/endpoint.py
class Endpoint(EndpointCore):
    """Endpoint: Workbench Endpoint API Class

    Common Usage:
        ```python
        my_endpoint = Endpoint(name)
        my_endpoint.details()
        my_endpoint.inference(eval_df)
        ```

    If the underlying endpoint was deployed as async (``workbench_meta["async_endpoint"]``),
    ``inference()`` / ``fast_inference()`` transparently route through an internal
    async core so callers get correct behavior from a single object.

    For feature endpoints (those that emit registered feature columns), use
    :meth:`feature_list` to retrieve the column list.
    """

    def __init__(self, endpoint_name: str):
        super().__init__(endpoint_name)
        self._async = None
        if self.exists() and (self.workbench_meta() or {}).get("async_endpoint"):
            from workbench.core.artifacts.async_endpoint_core import AsyncEndpointCore

            self._async = AsyncEndpointCore(endpoint_name)

    def details(self, **kwargs) -> dict:
        """Endpoint Details

        Returns:
            dict: A dictionary of details about the Endpoint
        """
        return super().details(**kwargs)

    def inference(
        self,
        eval_df: pd.DataFrame,
        capture_name: str = None,
        id_column: str = None,
        drop_error_rows: bool = False,
        include_quantiles: bool = False,
    ) -> pd.DataFrame:
        """Run inference on the Endpoint using the provided DataFrame

        Args:
            eval_df (pd.DataFrame): The DataFrame to run predictions on
            capture_name (str, optional): The Name of the capture to use (default: None)
            id_column (str, optional): The name of the column to use as the ID (default: None)
            drop_error_rows (bool): Whether to drop rows with errors (default: False)
            include_quantiles (bool): Include q_* quantile columns in saved output (default: False)

        Returns:
            pd.DataFrame: The DataFrame with predictions
        """
        if self._async is not None:
            return self._async.inference(eval_df, capture_name, id_column, drop_error_rows, include_quantiles)
        return super().inference(eval_df, capture_name, id_column, drop_error_rows, include_quantiles)

    def auto_inference(self) -> pd.DataFrame:
        """Run inference on the Endpoint using the test data from the model training view

        Returns:
            pd.DataFrame: The DataFrame with predictions
        """
        return super().auto_inference()

    def full_inference(self) -> pd.DataFrame:
        """Run inference on the Endpoint using the full data from the model training view

        Returns:
            pd.DataFrame: The DataFrame with predictions
        """
        return super().full_inference()

    def ts_inference(self, date_column: str, after_date: str, exclude_ids: list = None) -> pd.DataFrame:
        """Run temporal hold-out inference on this Endpoint.

        Re-runs the temporal split on the FeatureSet data to identify holdout rows
        (those with date > after_date), then runs inference on that holdout set.

        Args:
            date_column (str): Name of the date column.
            after_date (str): Run inference on rows strictly after this date.
            exclude_ids (list): IDs to exclude from the holdout set (e.g., anomalous
                compounds from compute_sample_weights).

        Returns:
            pd.DataFrame: DataFrame with the inference results (empty if no hold-out rows)
        """
        return super().ts_inference(date_column, after_date=after_date, exclude_ids=exclude_ids)

    def fast_inference(self, eval_df: pd.DataFrame, threads: int = 4) -> pd.DataFrame:
        """Run inference on the Endpoint using the provided DataFrame

        Args:
            eval_df (pd.DataFrame): The DataFrame to run predictions on
            threads (int): The number of threads to use (default: 4)

        Returns:
            pd.DataFrame: The DataFrame with predictions

        Note:
            There's no sanity checks or error handling... just FAST Inference!
        """
        if self._async is not None:
            return self._async.fast_inference(eval_df, threads=threads)
        return super().fast_inference(eval_df, threads=threads)

    def cross_fold_inference(self, include_quantiles: bool = False) -> pd.DataFrame:
        """Pull cross-fold inference from model associated with this Endpoint

        Args:
            include_quantiles (bool): Include q_* quantile columns in saved output (default: False)

        Returns:
            pd.DataFrame: A DataFrame with cross fold predictions
        """
        return super().cross_fold_inference(include_quantiles)

    def output_columns(self) -> List[str]:
        """Return this endpoint's registered output columns.

        Works for any endpoint that emits new columns during inference:
        feature endpoints emit computed feature columns; predictor endpoints
        emit prediction / confidence / quantile columns.

        Cached at ``/workbench/endpoints/<name>/output_columns``; lazily
        populated by :func:`workbench.utils.endpoint_utils.register_output_columns`
        on first call (smoke inference) and refreshed when the endpoint is
        redeployed.
        """
        from workbench.utils.endpoint_utils import (
            lookup_cached_columns,
            output_columns_key,
            register_output_columns,
        )

        return lookup_cached_columns(self, output_columns_key(self.name), register_output_columns, "output columns")

    def input_columns(self) -> List[str]:
        """Return this endpoint's declared input columns.

        The columns the endpoint consumes during inference (e.g. ``["smiles"]``
        for a feature endpoint, or the model's training features for a
        predictor endpoint).

        Cached at ``/workbench/endpoints/<name>/input_columns``; lazily
        populated by :func:`workbench.utils.endpoint_utils.register_input_columns`
        on first call (reads ``model.features()``) and refreshed when the
        endpoint is redeployed.
        """
        from workbench.utils.endpoint_utils import (
            input_columns_key,
            lookup_cached_columns,
            register_input_columns,
        )

        return lookup_cached_columns(self, input_columns_key(self.name), register_input_columns, "input columns")

    def inference_batch_size(self) -> int:
        """Return the per-invocation batch size declared for this endpoint.

        Reads ``workbench_meta["inference_batch_size"]`` if set; otherwise
        returns the framework default — 10 for async endpoints, 100 for sync.
        """
        meta = self.workbench_meta() or {}
        if "inference_batch_size" in meta:
            return int(meta["inference_batch_size"])
        return 10 if meta.get("async_endpoint") else 100

auto_inference()

Run inference on the Endpoint using the test data from the model training view

Returns:

pd.DataFrame: The DataFrame with predictions

Source code in src/workbench/api/endpoint.py
def auto_inference(self) -> pd.DataFrame:
    """Run inference on the Endpoint using the test data from the model training view

    Returns:
        pd.DataFrame: The DataFrame with predictions
    """
    return super().auto_inference()

cross_fold_inference(include_quantiles=False)

Pull cross-fold inference from model associated with this Endpoint

Parameters:

include_quantiles (bool): Include q_* quantile columns in saved output (default: False)

Returns:

pd.DataFrame: A DataFrame with cross fold predictions

Source code in src/workbench/api/endpoint.py
def cross_fold_inference(self, include_quantiles: bool = False) -> pd.DataFrame:
    """Pull cross-fold inference from model associated with this Endpoint

    Args:
        include_quantiles (bool): Include q_* quantile columns in saved output (default: False)

    Returns:
        pd.DataFrame: A DataFrame with cross fold predictions
    """
    return super().cross_fold_inference(include_quantiles)

details(**kwargs)

Endpoint Details

Returns:

dict: A dictionary of details about the Endpoint

Source code in src/workbench/api/endpoint.py
def details(self, **kwargs) -> dict:
    """Endpoint Details

    Returns:
        dict: A dictionary of details about the Endpoint
    """
    return super().details(**kwargs)

fast_inference(eval_df, threads=4)

Run inference on the Endpoint using the provided DataFrame

Parameters:

eval_df (pd.DataFrame): The DataFrame to run predictions on (required)
threads (int): The number of threads to use (default: 4)

Returns:

pd.DataFrame: The DataFrame with predictions

Note:

There are no sanity checks or error handling... just FAST Inference!

Source code in src/workbench/api/endpoint.py
def fast_inference(self, eval_df: pd.DataFrame, threads: int = 4) -> pd.DataFrame:
    """Run inference on the Endpoint using the provided DataFrame

    Args:
        eval_df (pd.DataFrame): The DataFrame to run predictions on
        threads (int): The number of threads to use (default: 4)

    Returns:
        pd.DataFrame: The DataFrame with predictions

    Note:
        There's no sanity checks or error handling... just FAST Inference!
    """
    if self._async is not None:
        return self._async.fast_inference(eval_df, threads=threads)
    return super().fast_inference(eval_df, threads=threads)
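The chunk-and-gather pattern behind threaded inference can be sketched in plain Python. Here predict_fn is a stand-in for the real per-chunk endpoint call, not a Workbench API; only the dispatch logic is shown:

```python
from concurrent.futures import ThreadPoolExecutor


def chunked(rows, n_chunks):
    """Split rows into n_chunks roughly equal contiguous slices."""
    size = max(1, -(-len(rows) // n_chunks))  # ceiling division
    return [rows[i:i + size] for i in range(0, len(rows), size)]


def parallel_predict(rows, predict_fn, threads=4):
    """Run predict_fn over chunks concurrently, preserving row order.

    ThreadPoolExecutor.map yields results in submission order, so the
    gathered output lines up with the original row order.
    """
    with ThreadPoolExecutor(max_workers=threads) as pool:
        results = pool.map(predict_fn, chunked(rows, threads))
    return [row for chunk in results for row in chunk]
```

Threads (rather than processes) fit here because each chunk spends its time waiting on network I/O, not on CPU.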

full_inference()

Run inference on the Endpoint using the full data from the model training view

Returns:

pd.DataFrame: The DataFrame with predictions

Source code in src/workbench/api/endpoint.py
def full_inference(self) -> pd.DataFrame:
    """Run inference on the Endpoint using the full data from the model training view

    Returns:
        pd.DataFrame: The DataFrame with predictions
    """
    return super().full_inference()

inference(eval_df, capture_name=None, id_column=None, drop_error_rows=False, include_quantiles=False)

Run inference on the Endpoint using the provided DataFrame

Parameters:

eval_df (pd.DataFrame): The DataFrame to run predictions on (required)
capture_name (str, optional): The name of the capture to use (default: None)
id_column (str, optional): The name of the column to use as the ID (default: None)
drop_error_rows (bool): Whether to drop rows with errors (default: False)
include_quantiles (bool): Include q_* quantile columns in saved output (default: False)

Returns:

pd.DataFrame: The DataFrame with predictions

Source code in src/workbench/api/endpoint.py
def inference(
    self,
    eval_df: pd.DataFrame,
    capture_name: str = None,
    id_column: str = None,
    drop_error_rows: bool = False,
    include_quantiles: bool = False,
) -> pd.DataFrame:
    """Run inference on the Endpoint using the provided DataFrame

    Args:
        eval_df (pd.DataFrame): The DataFrame to run predictions on
        capture_name (str, optional): The Name of the capture to use (default: None)
        id_column (str, optional): The name of the column to use as the ID (default: None)
        drop_error_rows (bool): Whether to drop rows with errors (default: False)
        include_quantiles (bool): Include q_* quantile columns in saved output (default: False)

    Returns:
        pd.DataFrame: The DataFrame with predictions
    """
    if self._async is not None:
        return self._async.inference(eval_df, capture_name, id_column, drop_error_rows, include_quantiles)
    return super().inference(eval_df, capture_name, id_column, drop_error_rows, include_quantiles)

inference_batch_size()

Return the per-invocation batch size declared for this endpoint.

Reads workbench_meta["inference_batch_size"] if set; otherwise returns the framework default — 10 for async endpoints, 100 for sync.

Source code in src/workbench/api/endpoint.py
def inference_batch_size(self) -> int:
    """Return the per-invocation batch size declared for this endpoint.

    Reads ``workbench_meta["inference_batch_size"]`` if set; otherwise
    returns the framework default — 10 for async endpoints, 100 for sync.
    """
    meta = self.workbench_meta() or {}
    if "inference_batch_size" in meta:
        return int(meta["inference_batch_size"])
    return 10 if meta.get("async_endpoint") else 100
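Client code that feeds a large input through an endpoint can honor the declared size by splitting up front. A minimal sketch of the splitting step, where batch_size would come from endpoint.inference_batch_size():

```python
def batches(rows, batch_size):
    """Yield successive fixed-size batches from rows; the last batch
    may be shorter than batch_size."""
    for i in range(0, len(rows), batch_size):
        yield rows[i:i + batch_size]
```

For example, 25 rows against an async endpoint's default of 10 yields batches of 10, 10, and 5.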

input_columns()

Return this endpoint's declared input columns.

The columns the endpoint consumes during inference (e.g. ["smiles"] for a feature endpoint, or the model's training features for a predictor endpoint).

Cached at /workbench/endpoints/<name>/input_columns; lazily populated by workbench.utils.endpoint_utils.register_input_columns on first call (reads model.features()) and refreshed when the endpoint is redeployed.

Source code in src/workbench/api/endpoint.py
def input_columns(self) -> List[str]:
    """Return this endpoint's declared input columns.

    The columns the endpoint consumes during inference (e.g. ``["smiles"]``
    for a feature endpoint, or the model's training features for a
    predictor endpoint).

    Cached at ``/workbench/endpoints/<name>/input_columns``; lazily
    populated by :func:`workbench.utils.endpoint_utils.register_input_columns`
    on first call (reads ``model.features()``) and refreshed when the
    endpoint is redeployed.
    """
    from workbench.utils.endpoint_utils import (
        input_columns_key,
        lookup_cached_columns,
        register_input_columns,
    )

    return lookup_cached_columns(self, input_columns_key(self.name), register_input_columns, "input columns")
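One practical use of input_columns() is a pre-flight check before calling inference(). The helper below is a hypothetical sketch, not part of the Workbench API:

```python
def missing_input_columns(df_columns, required):
    """Return the required input columns absent from a DataFrame's
    columns, in the endpoint's declared order."""
    present = set(df_columns)
    return [col for col in required if col not in present]
```

Usage would look like `missing = missing_input_columns(df.columns, endpoint.input_columns())`, raising or logging before any rows are sent if the list is non-empty.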

output_columns()

Return this endpoint's registered output columns.

Works for any endpoint that emits new columns during inference: feature endpoints emit computed feature columns; predictor endpoints emit prediction / confidence / quantile columns.

Cached at /workbench/endpoints/<name>/output_columns; lazily populated by workbench.utils.endpoint_utils.register_output_columns on first call (smoke inference) and refreshed when the endpoint is redeployed.

Source code in src/workbench/api/endpoint.py
def output_columns(self) -> List[str]:
    """Return this endpoint's registered output columns.

    Works for any endpoint that emits new columns during inference:
    feature endpoints emit computed feature columns; predictor endpoints
    emit prediction / confidence / quantile columns.

    Cached at ``/workbench/endpoints/<name>/output_columns``; lazily
    populated by :func:`workbench.utils.endpoint_utils.register_output_columns`
    on first call (smoke inference) and refreshed when the endpoint is
    redeployed.
    """
    from workbench.utils.endpoint_utils import (
        lookup_cached_columns,
        output_columns_key,
        register_output_columns,
    )

    return lookup_cached_columns(self, output_columns_key(self.name), register_output_columns, "output columns")

ts_inference(date_column, after_date, exclude_ids=None)

Run temporal hold-out inference on this Endpoint.

Re-runs the temporal split on the FeatureSet data to identify holdout rows (those with date > after_date), then runs inference on that holdout set.

Parameters:

date_column (str): Name of the date column (required)
after_date (str): Run inference on rows strictly after this date (required)
exclude_ids (list): IDs to exclude from the holdout set, e.g., anomalous compounds from compute_sample_weights (default: None)

Returns:

pd.DataFrame: DataFrame with the inference results (empty if no hold-out rows)

Source code in src/workbench/api/endpoint.py
def ts_inference(self, date_column: str, after_date: str, exclude_ids: list = None) -> pd.DataFrame:
    """Run temporal hold-out inference on this Endpoint.

    Re-runs the temporal split on the FeatureSet data to identify holdout rows
    (those with date > after_date), then runs inference on that holdout set.

    Args:
        date_column (str): Name of the date column.
        after_date (str): Run inference on rows strictly after this date.
        exclude_ids (list): IDs to exclude from the holdout set (e.g., anomalous
            compounds from compute_sample_weights).

    Returns:
        pd.DataFrame: DataFrame with the inference results (empty if no hold-out rows)
    """
    return super().ts_inference(date_column, after_date=after_date, exclude_ids=exclude_ids)
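The temporal-split selection described above can be illustrated with plain row dicts. The real implementation operates on the FeatureSet data; this is only a sketch of the strict date filter and ID exclusion, with an assumed "id" column name:

```python
from datetime import date


def holdout_rows(rows, date_column, after_date, exclude_ids=None, id_column="id"):
    """Keep rows with date strictly after after_date, dropping excluded IDs.

    Dates are ISO-8601 strings (YYYY-MM-DD) for this sketch.
    """
    cutoff = date.fromisoformat(after_date)
    exclude = set(exclude_ids or [])
    return [
        row for row in rows
        if date.fromisoformat(row[date_column]) > cutoff
        and row.get(id_column) not in exclude
    ]
```

Note the comparison is strict (`>`), matching "rows strictly after this date" above, so rows dated exactly after_date stay in the training side of the split.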

Examples

Run Inference on an Endpoint

endpoint_inference.py
from workbench.api import Endpoint
from workbench.utils.endpoint_utils import get_evaluation_data

# Grab an existing Endpoint
endpoint = Endpoint("abalone-regression-end")

# Workbench has full ML Pipeline provenance, so we can backtrack the inputs,
# get a DataFrame of data (not used for training) and run inference
df = get_evaluation_data(endpoint)

# Run inference/predictions on the Endpoint
results_df = endpoint.inference(df)

# Run inference/predictions and capture the results
results_df = endpoint.inference(df, capture_name="my_capture")

# Run inference/predictions using the FeatureSet evaluation data
results_df = endpoint.auto_inference()

Output

Processing...
     class_number_of_rings  prediction
0                       13   11.477922
1                       12   12.316887
2                        8    7.612847
3                        8    9.663341
4                        9    9.075263
..                     ...         ...
839                      8    8.069856
840                     15   14.915502
841                     11   10.977605
842                     10   10.173433
843                      7    7.297976
Endpoint Details

The details() method

The details() method on the Endpoint class provides a lot of useful information. All of the Workbench classes have a details() method, so try it out!

endpoint_details.py
from workbench.api.endpoint import Endpoint
from pprint import pprint

# Get the Endpoint and print out its details
endpoint = Endpoint("abalone-regression-end")
pprint(endpoint.details())

Output

{
 'input': 'abalone-regression',
 'instance': 'Serverless (2GB/5)',
 'model_metrics':   metric_name  value
            0        RMSE  2.190
            1         MAE  1.544
            2          R2  0.504,
 'model_name': 'abalone-regression',
 'model_type': 'regressor',
 'modified': datetime.datetime(2023, 12, 29, 17, 48, 35, 115000, tzinfo=datetime.timezone.utc),
     class_number_of_rings  prediction
0                        9    8.648378
1                       11    9.717787
2                       11   10.933070
3                       10    9.899738
4                        9   10.014504
..                     ...         ...
495                     10   10.261657
496                      9   10.788254
497                     13    7.779886
498                     12   14.718514
499                     13   10.637320
 'workbench_tags': ['abalone', 'regression'],
 'status': 'InService',
 'name': 'abalone-regression-end',
 'variant': 'AllTraffic'}
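Since details() returns a plain dict, downstream code can pull out just the fields it needs. The helper below is a hypothetical convenience based on the keys shown in the output above:

```python
def endpoint_summary(details):
    """Extract a few commonly used fields from an Endpoint details() dict."""
    return {
        "name": details.get("name"),
        "model": details.get("model_name"),
        "status": details.get("status"),
        "healthy": details.get("status") == "InService",
    }
```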

Endpoint Metrics

endpoint_metrics.py
from workbench.api.endpoint import Endpoint

# Grab an existing Endpoint
endpoint = Endpoint("abalone-regression-end")

# Workbench tracks both Model performance and Endpoint Metrics
model_metrics = endpoint.details()["model_metrics"]
endpoint_metrics = endpoint.endpoint_metrics()
print(model_metrics)
print(endpoint_metrics)

Output

  metric_name  value
0        RMSE  2.190
1         MAE  1.544
2          R2  0.504

    Invocations  ModelLatency  OverheadLatency  ModelSetupTime  Invocation5XXErrors
29          0.0          0.00             0.00            0.00                  0.0
30          1.0          1.11            23.73           23.34                  0.0
31          0.0          0.00             0.00            0.00                  0.0
48          0.0          0.00             0.00            0.00                  0.0
49          5.0          0.45             9.64           23.57                  0.0
50          2.0          0.57             0.08            0.00                  0.0
51          0.0          0.00             0.00            0.00                  0.0
60          4.0          0.33             5.80           22.65                  0.0
61          1.0          1.11            23.35           23.10                  0.0
62          0.0          0.00             0.00            0.00                  0.0
...
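The metrics table above has one row per monitoring interval, so summarizing it is ordinary arithmetic. The row-dict shape below is an assumption for illustration (endpoint_metrics() actually returns a DataFrame); the column names are taken from the output above:

```python
def invocation_weighted_latency(metric_rows):
    """Average ModelLatency weighted by Invocations across intervals.

    Intervals with zero invocations contribute nothing, so idle periods
    don't drag the average toward their zero-filled latency values.
    """
    total = sum(row["Invocations"] for row in metric_rows)
    if total == 0:
        return 0.0
    weighted = sum(row["Invocations"] * row["ModelLatency"] for row in metric_rows)
    return weighted / total
```

With a real metrics DataFrame the same idea would be a one-liner over the Invocations and ModelLatency columns.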

Workbench UI

Running these few lines of code creates and deploys an AWS Endpoint. The Endpoint artifacts can be viewed in the SageMaker Console/Notebook interfaces or in the Workbench Dashboard UI. Workbench monitors the endpoint, plots invocations and latencies, and tracks error metrics.

workbench_endpoints
Workbench Dashboard: Endpoints

Not finding a particular method?

The Workbench API classes use the 'Core' classes internally, so for an extensive listing of all the available methods, take a deep dive into: Workbench Core Classes