AsyncEndpoint

AsyncEndpoint Examples

Examples of using the AsyncEndpoint class are listed in the Examples section at the bottom of this page.

AsyncEndpoint is a drop-in replacement for Endpoint that supports long-running inference (up to 15 minutes per invocation). It scales to zero when idle so you only pay for compute during active batch runs. The API is the same as Endpoint: send a DataFrame, get a DataFrame back. The async S3 round-trip is handled internally — callers don't see it.

Async endpoint flow: S3 Upload → SageMaker → Uvicorn → FastAPI → Model → S3 Result
Async endpoints add an S3 I/O layer for long-running invocations and scale to zero when idle.
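The "S3 Result" step above means the output object does not exist until SageMaker finishes the invocation, so the client must poll until it appears. A generic sketch of that polling pattern (this is an illustration, not Workbench's actual implementation):

```python
import time

def poll_for_result(fetch, timeout_s=900, interval_s=2.0):
    """Poll `fetch()` until it returns a non-None result or the timeout expires.

    Mirrors the async-endpoint pattern: the S3 output object does not exist
    until the invocation completes, so the client retries until the result
    appears (bounded here by the 15-minute invocation limit: 900 seconds).
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        result = fetch()  # e.g. attempt to read the S3 output location
        if result is not None:
            return result
        time.sleep(interval_s)
    raise TimeoutError(f"No result after {timeout_s} seconds")

# Demo with a fake fetch that succeeds on the third attempt
attempts = {"n": 0}
def fake_fetch():
    attempts["n"] += 1
    return "done" if attempts["n"] >= 3 else None

print(poll_for_result(fake_fetch, timeout_s=30, interval_s=0.01))  # prints "done"
```

Workbench hides this loop inside `inference()`, which is why the caller still gets a DataFrame back synchronously.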

AsyncEndpoint: Workbench API wrapper for async SageMaker endpoints.

Drop-in replacement for Endpoint that uses the async invocation path internally. The caller-facing API is identical — inference() returns a DataFrame synchronously.

Example
from workbench.api import AsyncEndpoint

end = AsyncEndpoint("smiles-to-3d-boltzmann-v1")
df_result = end.inference(my_df)

AsyncEndpoint

Bases: AsyncEndpointCore

Workbench AsyncEndpoint API class.

Inherits all functionality from AsyncEndpointCore. This thin wrapper exists to match the Endpoint / EndpointCore pattern used elsewhere in the Workbench API layer.

Source code in src/workbench/api/async_endpoint.py
class AsyncEndpoint(AsyncEndpointCore):
    """Workbench AsyncEndpoint API class.

    Inherits all functionality from AsyncEndpointCore. This thin wrapper
    exists to match the Endpoint / EndpointCore pattern used elsewhere
    in the Workbench API layer.
    """

    def inference(
        self,
        eval_df: pd.DataFrame,
        capture_name: str = None,
        id_column: str = None,
        drop_error_rows: bool = False,
        include_quantiles: bool = False,
    ) -> pd.DataFrame:
        """Run inference on the Endpoint using the provided DataFrame.

        Args:
            eval_df (pd.DataFrame): The DataFrame to run predictions on
            capture_name (str, optional): Name of the inference capture (default: None)
            id_column (str, optional): Name of the ID column (default: None)
            drop_error_rows (bool): Whether to drop rows with errors (default: False)
            include_quantiles (bool): Include q_* quantile columns (default: False)

        Returns:
            pd.DataFrame: The DataFrame with inference results
        """
        return super().inference(eval_df, capture_name, id_column, drop_error_rows, include_quantiles)

    def fast_inference(self, eval_df: pd.DataFrame, threads: int = 4) -> pd.DataFrame:
        """Run inference on the Endpoint (async path, threads ignored).

        Args:
            eval_df (pd.DataFrame): The DataFrame to run predictions on
            threads (int): Ignored for async endpoints (kept for API compat)

        Returns:
            pd.DataFrame: The DataFrame with predictions
        """
        return super().fast_inference(eval_df, threads=threads)

fast_inference(eval_df, threads=4)

Run inference on the Endpoint (async path, threads ignored).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| eval_df | DataFrame | The DataFrame to run predictions on | required |
| threads | int | Ignored for async endpoints (kept for API compat) | 4 |

Returns:

| Type | Description |
| --- | --- |
| DataFrame | The DataFrame with predictions |

Source code in src/workbench/api/async_endpoint.py
def fast_inference(self, eval_df: pd.DataFrame, threads: int = 4) -> pd.DataFrame:
    """Run inference on the Endpoint (async path, threads ignored).

    Args:
        eval_df (pd.DataFrame): The DataFrame to run predictions on
        threads (int): Ignored for async endpoints (kept for API compat)

    Returns:
        pd.DataFrame: The DataFrame with predictions
    """
    return super().fast_inference(eval_df, threads=threads)

inference(eval_df, capture_name=None, id_column=None, drop_error_rows=False, include_quantiles=False)

Run inference on the Endpoint using the provided DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| eval_df | DataFrame | The DataFrame to run predictions on | required |
| capture_name | str | Name of the inference capture | None |
| id_column | str | Name of the ID column | None |
| drop_error_rows | bool | Whether to drop rows with errors | False |
| include_quantiles | bool | Include q_* quantile columns | False |

Returns:

| Type | Description |
| --- | --- |
| DataFrame | The DataFrame with inference results |

Source code in src/workbench/api/async_endpoint.py
def inference(
    self,
    eval_df: pd.DataFrame,
    capture_name: str = None,
    id_column: str = None,
    drop_error_rows: bool = False,
    include_quantiles: bool = False,
) -> pd.DataFrame:
    """Run inference on the Endpoint using the provided DataFrame.

    Args:
        eval_df (pd.DataFrame): The DataFrame to run predictions on
        capture_name (str, optional): Name of the inference capture (default: None)
        id_column (str, optional): Name of the ID column (default: None)
        drop_error_rows (bool): Whether to drop rows with errors (default: False)
        include_quantiles (bool): Include q_* quantile columns (default: False)

    Returns:
        pd.DataFrame: The DataFrame with inference results
    """
    return super().inference(eval_df, capture_name, id_column, drop_error_rows, include_quantiles)
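As a rough illustration of what `drop_error_rows=True` implies, here is a plain pandas sketch; the column names and the "failed rows carry NaN" convention are hypothetical, since the real error handling is internal to Workbench:

```python
import pandas as pd

# Hypothetical results frame: rows that failed inference carry NaN predictions
results = pd.DataFrame({
    "id": [1, 2, 3],
    "prediction": [0.42, None, 0.77],
})

# drop_error_rows=True semantics (sketch): keep only rows with a valid prediction
clean = results.dropna(subset=["prediction"])
print(len(clean))  # 2
```

With `drop_error_rows=False` (the default), failed rows stay in the output so the caller can inspect or retry them.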

Examples

Run Inference on an Async Endpoint

async_endpoint_inference.py
from workbench.api import AsyncEndpoint

# Grab an existing Async Endpoint
endpoint = AsyncEndpoint("smiles-to-3d-boltzmann-v1")

# Run inference — same API as Endpoint, async S3 polling is handled internally
results_df = endpoint.inference(df)

Use with InferenceCache for Batch Processing

async_cached_inference.py
from workbench.api import AsyncEndpoint
from workbench.api.inference_cache import InferenceCache

# Wrap in InferenceCache for persistent S3-backed caching
endpoint = AsyncEndpoint("smiles-to-3d-boltzmann-v1")
cached_endpoint = InferenceCache(endpoint, cache_key_column="smiles")

# Only uncached rows are sent to the endpoint
results_df = cached_endpoint.inference(big_df)
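The "only uncached rows are sent" behavior can be sketched in plain pandas; the cache contents and column name below are made up for illustration and are not InferenceCache's actual internals:

```python
import pandas as pd

# Hypothetical cache of previously scored SMILES -> prediction
cache = {"CCO": 0.91, "c1ccccc1": 0.33}

big_df = pd.DataFrame({"smiles": ["CCO", "CCN", "c1ccccc1"]})

# Split on the cache key column: cached rows are served locally,
# only the remainder would be sent to the endpoint
cached_mask = big_df["smiles"].isin(cache)
to_send = big_df[~cached_mask]
print(list(to_send["smiles"]))  # ['CCN']
```

For large overnight batches this means a re-run after a partial failure only pays for the rows that never completed.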

Deploy an Async Endpoint from a Model

deploy_async_endpoint.py
from workbench.api import Model

model = Model("smiles-to-3d-boltzmann-v1")
end = model.to_endpoint(
    async_endpoint=True,
    instance="ml.c7i.2xlarge",
    tags=["smiles", "3d descriptors", "boltzmann"],
)

Async endpoints deploy with scale-to-zero auto-scaling: the instance spins down after ~10 minutes of idle time and cold-starts on the next request. This makes them cost-effective for overnight batch workloads.

When to Use AsyncEndpoint vs Endpoint

| | Endpoint | AsyncEndpoint |
| --- | --- | --- |
| Invocation timeout | 60 seconds | 15 minutes |
| Scaling | Fixed instance count | Scale-to-zero when idle |
| Best for | Realtime inference, low latency | Long-running batch processing |
| Cost when idle | Pays for running instance | Zero (scales down) |
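The trade-offs above reduce to a simple decision. This helper is purely illustrative (not part of the Workbench API): invocations longer than the 60-second realtime limit force the async path, and otherwise latency sensitivity decides:

```python
def pick_endpoint_class(expected_runtime_s: float, latency_sensitive: bool) -> str:
    """Illustrative decision rule for Endpoint vs. AsyncEndpoint."""
    if expected_runtime_s > 60:
        return "AsyncEndpoint"  # realtime invocations time out at 60 s
    # Short invocations: pick realtime only if cold-start latency matters
    return "Endpoint" if latency_sensitive else "AsyncEndpoint"
```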

Not finding a particular method?

The Workbench API classes use the 'Core' classes internally, so for an extensive listing of all available methods, take a deep dive into: Workbench Core Classes