AsyncEndpointCore

API Pass-Through

Endpoint automatically routes to AsyncEndpointCore when the underlying SageMaker endpoint was deployed as async (workbench_meta["async_endpoint"]). Callers use Endpoint — the async S3 round-trip is handled internally.

AsyncEndpointCore is the implementation that backs async (long-running) inference for endpoints whose model takes longer than the 60-second sync invocation cap. It supports invocations up to 60 minutes and scales to zero when idle, so you only pay for compute during active batch runs.

Async endpoint flow: S3 Upload → SageMaker → Uvicorn → FastAPI → Model → S3 Result
Async endpoints add an S3 I/O layer for long-running invocations and scale to zero when idle.

AsyncEndpointCore: Workbench Async Endpoint support.

Extends EndpointCore to support SageMaker async inference endpoints. Async endpoints accept the same model artifacts and container images as realtime endpoints, but invocations are non-blocking: input is uploaded to S3, the response is written to an S3 output location, and the caller polls for completion.

This is useful for workloads where per-invocation latency exceeds the realtime 60-second server-side timeout (e.g., Boltzmann 3D descriptor generation that can take minutes per molecule).
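Under the hood this is the standard SageMaker async-inference protocol. The sketch below shows the raw boto3 round trip for a single payload; the bucket, key, and endpoint names are placeholders rather than Workbench's actual S3 layout, and AsyncEndpointCore handles chunking, concurrency, and path resolution for you.

raw_async_invoke_sketch.py
import time

import boto3
from botocore.exceptions import ClientError

s3 = boto3.client("s3")
smr = boto3.client("sagemaker-runtime")

bucket = "my-workbench-bucket"                                # placeholder bucket
input_key = "endpoints/my-endpoint/async-input/chunk-0.csv"   # placeholder key

# 1. Upload the input payload to S3
s3.upload_file("chunk-0.csv", bucket, input_key)

# 2. Start the async invocation (returns immediately with an output location)
response = smr.invoke_endpoint_async(
    EndpointName="my-endpoint",
    InputLocation=f"s3://{bucket}/{input_key}",
    ContentType="text/csv",
    InvocationTimeoutSeconds=3600,  # async invocations can run up to 60 minutes
)
out_bucket, out_key = response["OutputLocation"].replace("s3://", "").split("/", 1)

# 3. Poll until the result object shows up, then download it
while True:
    try:
        s3.head_object(Bucket=out_bucket, Key=out_key)
        break
    except ClientError:
        time.sleep(10)
s3.download_file(out_bucket, out_key, "result-0.csv")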

The API surface is identical to EndpointCore — inference() and fast_inference() return DataFrames synchronously, hiding the async S3 round-trip from the caller.

Implementation: the protocol-level invocation lives in workbench_bridges.endpoints.async_inference; this class adds Workbench-specific concerns (workbench_meta knobs for batch sizing and concurrency, capture/monitoring, S3 path resolution).
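For a sense of scale: with the default inference_batch_size of 10, a 253-row DataFrame is split into 26 invocations of at most 10 rows each (ceiling division), and inference_max_in_flight caps how many of those invocations are outstanding at any one time.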

AsyncEndpointCore

Bases: EndpointCore

EndpointCore subclass for SageMaker async inference endpoints.

Overrides the invocation path (_predict / fast_inference) to use the async S3 upload → invoke_async → poll S3 → download pattern. All metadata, metrics, and capture logic is inherited unchanged.

Source code in src/workbench/core/artifacts/async_endpoint_core.py
class AsyncEndpointCore(EndpointCore):
    """EndpointCore subclass for SageMaker async inference endpoints.

    Overrides the invocation path (_predict / fast_inference) to use the
    async S3 upload → invoke_async → poll S3 → download pattern.  All
    metadata, metrics, and capture logic is inherited unchanged.
    """

    def __init__(self, endpoint_name: str, **kwargs):
        super().__init__(endpoint_name, **kwargs)

        # S3 paths for async I/O — these mirror the paths configured in
        # AsyncInferenceConfig during deployment.
        base = f"{self.endpoints_s3_path}/{self.name}"
        self.async_output_path = f"{base}/async-output"
        self.async_failure_path = f"{base}/async-failures"
        self.async_input_path = f"{base}/async-input"

    # -----------------------------------------------------------------
    # Override: _predict  (called by EndpointCore.inference for modeled endpoints)
    # -----------------------------------------------------------------
    def _predict(self, eval_df: pd.DataFrame, features: list[str], drop_error_rows: bool = False) -> pd.DataFrame:
        """Run async prediction on a DataFrame.

        Follows the same contract as EndpointCore._predict: accepts a
        DataFrame, returns a DataFrame with prediction/feature columns added.
        Internally uploads chunks to S3, calls invoke_async, polls for output.
        """
        if eval_df.empty:
            log.warning("Evaluation DataFrame has 0 rows.")
            return pd.DataFrame(columns=eval_df.columns)

        # Validate features
        df_columns_lower = set(col.lower() for col in eval_df.columns)
        features_lower = set(f.lower() for f in features)
        if not features_lower.issubset(df_columns_lower):
            missing = features_lower - df_columns_lower
            raise ValueError(f"DataFrame does not contain required features: {missing}")

        return self._async_batch_invoke(eval_df)

    # -----------------------------------------------------------------
    # Override: fast_inference  (called for "floating" endpoints with no model)
    # -----------------------------------------------------------------
    def fast_inference(self, eval_df: pd.DataFrame, threads: int = 4) -> pd.DataFrame:
        """Async version of fast_inference — ignores threads, uses S3 polling."""
        if eval_df.empty:
            return pd.DataFrame(columns=eval_df.columns)

        return self._async_batch_invoke(eval_df)

    # -----------------------------------------------------------------
    # Internal: delegate to the lightweight bridges client
    # -----------------------------------------------------------------
    def _async_batch_invoke(self, eval_df: pd.DataFrame) -> pd.DataFrame:
        """Delegate batch invocation to ``workbench_bridges.async_inference``.

        Reads two tunable knobs from ``workbench_meta()``:
          * ``inference_batch_size`` (default 10): rows per invocation.
          * ``inference_max_in_flight`` (default :data:`_MAX_IN_FLIGHT_CAP`):
            outstanding invocation cap for direct calls bypassing
            :class:`InferenceCache`.
        """
        meta = self.workbench_meta() or {}
        batch_size = int(meta.get("inference_batch_size", _DEFAULT_BATCH_SIZE))
        # Estimate chunks for sizing; bridges re-derives this internally too.
        n_batches = max(1, (len(eval_df) + batch_size - 1) // batch_size)
        max_in_flight = _resolve_max_in_flight(meta, n_batches=n_batches)

        return async_inference(
            endpoint_name=self.endpoint_name,
            eval_df=eval_df,
            sm_session=self.boto3_session,
            batch_size=batch_size,
            max_in_flight=max_in_flight,
            s3_bucket=self.workbench_bucket,
            s3_input_prefix=f"endpoints/{self.name}/async-input",
        )

fast_inference(eval_df, threads=4)

Async version of fast_inference — ignores threads, uses S3 polling.

Source code in src/workbench/core/artifacts/async_endpoint_core.py
def fast_inference(self, eval_df: pd.DataFrame, threads: int = 4) -> pd.DataFrame:
    """Async version of fast_inference — ignores threads, uses S3 polling."""
    if eval_df.empty:
        return pd.DataFrame(columns=eval_df.columns)

    return self._async_batch_invoke(eval_df)

Examples

The examples below use the Endpoint API class — the same interface you use for sync endpoints. Routing to AsyncEndpointCore happens automatically based on the endpoint's deploy-time metadata.
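If you want to confirm which path an endpoint will take, the deploy-time flag is visible in its metadata. The check below is illustrative only; whether the Endpoint API class proxies workbench_meta() directly is an assumption here.

check_async_routing.py
from workbench.api import Endpoint

endpoint = Endpoint("smiles-to-3d-full-v1")

# Assumption: Endpoint exposes workbench_meta() like the core artifact classes do
meta = endpoint.workbench_meta() or {}
print(meta.get("async_endpoint"))  # True -> inference() is served via AsyncEndpointCore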

Run Inference on an Async Endpoint

async_endpoint_inference.py
from workbench.api import Endpoint

# Endpoint detects async deployment and routes through AsyncEndpointCore internally
endpoint = Endpoint("smiles-to-3d-full-v1")
results_df = endpoint.inference(df)

Use with InferenceCache for Batch Processing

async_cached_inference.py
from workbench.api import Endpoint
from workbench.api.inference_cache import InferenceCache

endpoint = Endpoint("smiles-to-3d-full-v1")
cached_endpoint = InferenceCache(endpoint, cache_key_column="smiles")

# Only uncached rows are sent to the endpoint
results_df = cached_endpoint.inference(big_df)

Deploy an Async Endpoint from a Model

deploy_async_endpoint.py
from workbench.api import Model

model = Model("smiles-to-3d-full-v1")
end = model.to_endpoint(
    async_endpoint=True,
    tags=["smiles", "3d descriptors", "full"],
)
# Override the default ml.c7i.xlarge with instance="ml.c7i.2xlarge" if your
# model needs more CPU/memory per worker.

Async endpoints deploy with scale-to-zero auto-scaling — the instance spins down after ~10 minutes of idle time and cold-starts on the next request. This makes them cost-effective for overnight batch workloads.
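The scale-to-zero behavior is standard SageMaker async-endpoint auto-scaling. The sketch below shows what such a configuration looks like using the Application Auto Scaling API; it is a sketch of the general pattern, since to_endpoint(async_endpoint=True) sets this up for you and Workbench's exact policy values may differ.

async_scale_to_zero_sketch.py
import boto3

aas = boto3.client("application-autoscaling")
resource_id = "endpoint/smiles-to-3d-full-v1/variant/AllTraffic"

# Allow the endpoint variant to scale all the way down to zero instances
aas.register_scalable_target(
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    MinCapacity=0,
    MaxCapacity=1,
)

# Track the async backlog: scale out when requests queue up, scale in when idle
aas.put_scaling_policy(
    PolicyName="async-backlog-scaling",
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    PolicyType="TargetTrackingScaling",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue": 1.0,
        "CustomizedMetricSpecification": {
            "MetricName": "ApproximateBacklogSizePerInstance",
            "Namespace": "AWS/SageMaker",
            "Dimensions": [{"Name": "EndpointName", "Value": "smiles-to-3d-full-v1"}],
            "Statistic": "Average",
        },
        "ScaleInCooldown": 600,  # idle window (~10 minutes) before scaling down
        "ScaleOutCooldown": 60,
    },
)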

When to Use Async vs Sync

                     Sync Endpoint                     Async Endpoint
Invocation timeout   60 seconds                        60 minutes
Scaling              Fixed instance count              Scale-to-zero when idle
Best for             Realtime inference, low latency   Long-running batch processing
Cost when idle       Pays for running instance         Zero (scales down)
Caller code          Endpoint(name).inference(df)      Endpoint(name).inference(df) (identical)

The sync/async choice is made at deploy time via model.to_endpoint(async_endpoint=True). Caller code is identical in both cases.