
AsyncEndpointCore

API Pass-Through

Endpoint automatically routes to AsyncEndpointCore when the underlying SageMaker endpoint was deployed as async (flagged by workbench_meta["async_endpoint"]). Callers keep using Endpoint; the async S3 round-trip is handled internally.
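The routing rule amounts to a dispatch on one metadata flag. A minimal sketch, assuming the flag is a plain truthy value in a dict; the classes below are hypothetical stand-ins rather than the Workbench classes, and pick_core is an illustrative name, not part of the Workbench API.

```python
from typing import Any, Optional


# Hypothetical stand-ins for the Workbench classes; only the dispatch rule
# (the "async_endpoint" flag in workbench_meta) comes from the text above.
class EndpointCore:
    def __init__(self, endpoint_name: str):
        self.endpoint_name = endpoint_name


class AsyncEndpointCore(EndpointCore):
    """Would override the invocation path with the S3 round-trip."""


def pick_core(endpoint_name: str, workbench_meta: Optional[dict[str, Any]]) -> EndpointCore:
    """Return the core object an Endpoint-style facade would delegate to."""
    meta = workbench_meta or {}
    # Endpoints deployed as async carry a truthy "async_endpoint" flag
    core_cls = AsyncEndpointCore if meta.get("async_endpoint") else EndpointCore
    return core_cls(endpoint_name)
```

For example, pick_core("smiles-to-3d-full-v1", {"async_endpoint": True}) yields an AsyncEndpointCore, while a missing or falsy flag (or no metadata at all) falls back to EndpointCore.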

AsyncEndpointCore is the implementation that backs async (long-running) inference for endpoints whose model takes longer than the 60-second sync invocation cap. It supports invocations up to 60 minutes and scales to zero when idle, so you only pay for compute during active batch runs.

Async endpoint flow: S3 Upload → SageMaker → Uvicorn → FastAPI → Model → S3 Result
Async endpoints add an S3 I/O layer for long-running invocations and scale to zero when idle.

AsyncEndpointCore: Workbench Async Endpoint support.

Extends EndpointCore to support SageMaker async inference endpoints. Async endpoints accept the same model artifacts and container images as realtime endpoints, but invocations are non-blocking: input is uploaded to S3, the response is written to an S3 output location, and the caller polls for completion.

This is useful for workloads where per-invocation latency exceeds the realtime 60-second server-side timeout (e.g., Boltzmann 3D descriptor generation that can take minutes per molecule).
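A back-of-the-envelope check makes the cut-off concrete. This sketch ignores network, serialization, and cold-start overhead (which only make things worse); the 2-minutes-per-molecule timing is hypothetical, while the 60-second and 60-minute limits are the ones quoted in this section.

```python
import math

# Limits quoted in this section
SYNC_LIMIT_S = 60          # realtime server-side timeout
ASYNC_LIMIT_S = 60 * 60    # async invocation ceiling


def max_rows_per_invocation(seconds_per_row: float, limit_s: float) -> int:
    """How many rows fit in one invocation before hitting the limit."""
    return math.floor(limit_s / seconds_per_row)


def endpoint_kind(rows_per_invocation: int, seconds_per_row: float) -> str:
    """Classify a workload by its estimated per-invocation latency."""
    total_s = rows_per_invocation * seconds_per_row
    if total_s <= SYNC_LIMIT_S:
        return "sync"
    if total_s <= ASYNC_LIMIT_S:
        return "async"
    return "reduce batch size"


# Hypothetical workload: 2 minutes per molecule, 10 rows per invocation
print(endpoint_kind(10, 120))                      # async (1,200 s per invocation)
print(max_rows_per_invocation(120, SYNC_LIMIT_S))  # 0 (not even one row fits in 60 s)
print(max_rows_per_invocation(120, ASYNC_LIMIT_S)) # 30
```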

The API surface is identical to EndpointCore — inference() and fast_inference() return DataFrames synchronously, hiding the async S3 round-trip from the caller.
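The hidden round-trip for a single request can be sketched against boto3-style clients: the sagemaker-runtime client's invoke_endpoint_async, plus the S3 client's list_objects_v2 and get_object. This is not Workbench's async_inference implementation; the function names, the fixed-interval polling, and the exact-key existence check are assumptions for illustration. The clients are passed in so the loop can be exercised with fakes.

```python
import time
from typing import Callable
from urllib.parse import urlparse


def split_s3_uri(uri: str) -> tuple[str, str]:
    """Split 's3://bucket/key' into (bucket, key)."""
    parsed = urlparse(uri)
    return parsed.netloc, parsed.path.lstrip("/")


def s3_object_exists(s3_client, uri: str) -> bool:
    """Check for an exact key without relying on a 404 exception."""
    bucket, key = split_s3_uri(uri)
    # S3 lists keys in lexicographic order, so a key equal to the prefix
    # is always the first entry returned.
    resp = s3_client.list_objects_v2(Bucket=bucket, Prefix=key, MaxKeys=1)
    contents = resp.get("Contents", [])
    return bool(contents) and contents[0]["Key"] == key


def invoke_and_wait(
    runtime_client,
    s3_client,
    endpoint_name: str,
    input_uri: str,
    content_type: str = "text/csv",
    poll_seconds: float = 5.0,
    timeout_seconds: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bytes:
    """Submit one async request whose payload is already in S3, then block until done."""
    resp = runtime_client.invoke_endpoint_async(
        EndpointName=endpoint_name, InputLocation=input_uri, ContentType=content_type
    )
    output_uri = resp["OutputLocation"]
    failure_uri = resp.get("FailureLocation")

    waited = 0.0
    while waited < timeout_seconds:
        if s3_object_exists(s3_client, output_uri):
            bucket, key = split_s3_uri(output_uri)
            return s3_client.get_object(Bucket=bucket, Key=key)["Body"].read()
        if failure_uri and s3_object_exists(s3_client, failure_uri):
            raise RuntimeError(f"Async invocation failed; details at {failure_uri}")
        sleep(poll_seconds)
        waited += poll_seconds
    raise TimeoutError(f"No result at {output_uri} after {timeout_seconds} s")
```

With real clients this would be called as invoke_and_wait(boto3.client("sagemaker-runtime"), boto3.client("s3"), "my-async-endpoint", "s3://my-bucket/input.csv"), where the endpoint and bucket names are placeholders. Fixed-interval polling keeps the sketch short; a production client might back off between polls or use the SNS success/error notifications that async endpoints can be configured to publish.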

Implementation: the protocol-level invocation lives in workbench.endpoints.async_inference; this class adds Workbench-specific concerns (workbench_meta knobs for batch sizing and concurrency, capture/monitoring, S3 path resolution).
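The batch-sizing and concurrency knobs can be illustrated with a small driver. A minimal sketch, assuming a caller-supplied invoke_batch function that scores one chunk; the inference_batch_size key and its default of 10 come from this page, but the inference_max_in_flight default of 4 is hypothetical (the real cap is not shown), and using ThreadPoolExecutor as the in-flight limiter is this sketch's choice, not necessarily what async_inference does.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional

import pandas as pd

DEFAULT_BATCH_SIZE = 10      # documented default for inference_batch_size
DEFAULT_MAX_IN_FLIGHT = 4    # hypothetical; the real cap is not shown here


def read_knobs(meta: Optional[dict]) -> tuple[int, int]:
    """Read the two tuning knobs from a workbench_meta-style dict."""
    meta = meta or {}
    batch_size = int(meta.get("inference_batch_size", DEFAULT_BATCH_SIZE))
    max_in_flight = int(meta.get("inference_max_in_flight", DEFAULT_MAX_IN_FLIGHT))
    return batch_size, max_in_flight


def n_batches(n_rows: int, batch_size: int) -> int:
    """Ceiling division with a floor of one, like the estimate in _async_batch_invoke."""
    return max(1, (n_rows + batch_size - 1) // batch_size)


def batched_invoke(
    eval_df: pd.DataFrame,
    invoke_batch: Callable[[pd.DataFrame], pd.DataFrame],
    meta: Optional[dict] = None,
) -> pd.DataFrame:
    """Split rows into batches and keep at most max_in_flight invocations outstanding."""
    batch_size, max_in_flight = read_knobs(meta)
    chunks = [eval_df.iloc[i : i + batch_size] for i in range(0, len(eval_df), batch_size)]
    if not chunks:
        return eval_df.copy()
    # No point starting more workers than there are batches
    workers = max(1, min(max_in_flight, n_batches(len(eval_df), batch_size)))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map returns results in submission order, so row order survives
        results = list(pool.map(invoke_batch, chunks))
    return pd.concat(results)
```

Larger batches mean fewer S3 objects and invocations but a longer wait per invocation; a higher in-flight cap only helps if the endpoint has (or can scale to) enough instances to work on the queued requests.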

AsyncEndpointCore

Bases: EndpointCore

EndpointCore subclass for SageMaker async inference endpoints.

Overrides the invocation path (_predict / fast_inference) to use the async S3 upload → invoke_async → poll S3 → download pattern. All metadata, metrics, and capture logic are inherited unchanged.
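This is a template-method arrangement: the base class owns the public inference() flow and calls a protected hook, and the subclass swaps only the hook. A toy sketch with hypothetical classes, using plain lists of dicts instead of DataFrames and simulated predictions:

```python
class EndpointCoreSketch:
    """Hypothetical base class: inference() is shared, _predict() is the hook."""

    def __init__(self):
        self.captured_rows = 0

    def inference(self, rows: list[dict]) -> list[dict]:
        predictions = self._predict(rows)
        # Shared bookkeeping (metrics, capture) runs identically for every subclass
        self.captured_rows += len(predictions)
        return predictions

    def _predict(self, rows: list[dict]) -> list[dict]:
        # Sync path: a blocking request per batch (simulated)
        return [{**row, "path": "sync"} for row in rows]


class AsyncEndpointCoreSketch(EndpointCoreSketch):
    def _predict(self, rows: list[dict]) -> list[dict]:
        # Async path: would upload to S3, call invoke_async, and poll (simulated)
        return [{**row, "path": "async"} for row in rows]


core = AsyncEndpointCoreSketch()
print(core.inference([{"smiles": "CCO"}]))  # [{'smiles': 'CCO', 'path': 'async'}]
print(core.captured_rows)                    # 1
```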

Source code in src/workbench/core/artifacts/async_endpoint_core.py
class AsyncEndpointCore(EndpointCore):
    """EndpointCore subclass for SageMaker async inference endpoints.

    Overrides the invocation path (_predict / fast_inference) to use the
    async S3 upload → invoke_async → poll S3 → download pattern.  All
    metadata, metrics, and capture logic is inherited unchanged.
    """

    def __init__(self, endpoint_name: str, **kwargs):
        super().__init__(endpoint_name, **kwargs)

    # -----------------------------------------------------------------
    # Override: _predict  (called by EndpointCore.inference for modeled endpoints)
    # -----------------------------------------------------------------
    def _predict(self, eval_df: pd.DataFrame, features: list[str], drop_error_rows: bool = False) -> pd.DataFrame:
        """Run async prediction on a DataFrame.

        Follows the same contract as EndpointCore._predict: accepts a
        DataFrame, returns a DataFrame with prediction/feature columns added.
        Internally uploads chunks to S3, calls invoke_async, polls for output.
        """
        if eval_df.empty:
            log.warning("Evaluation DataFrame has 0 rows.")
            return pd.DataFrame(columns=eval_df.columns)

        # Validate features
        df_columns_lower = set(col.lower() for col in eval_df.columns)
        features_lower = set(f.lower() for f in features)
        if not features_lower.issubset(df_columns_lower):
            missing = features_lower - df_columns_lower
            raise ValueError(f"DataFrame does not contain required features: {missing}")

        return self._async_batch_invoke(eval_df)

    # -----------------------------------------------------------------
    # Override: fast_inference  (called for "floating" endpoints with no model)
    # -----------------------------------------------------------------
    def fast_inference(self, eval_df: pd.DataFrame, threads: int = 4) -> pd.DataFrame:
        """Async version of fast_inference — ignores threads, uses S3 polling."""
        if eval_df.empty:
            return pd.DataFrame(columns=eval_df.columns)

        return self._async_batch_invoke(eval_df)

    # -----------------------------------------------------------------
    # Queue management
    # -----------------------------------------------------------------
    def purge_async_queue(self) -> int:
        """Cancel all queued async invocations for this endpoint.

        Thin wrapper over :func:`workbench.endpoints.async_inference.purge_async_queue`.
        See that function for behavior, caveats, and return semantics.
        """
        return purge_async_queue(
            endpoint_name=self.name,
            s3_bucket=self.workbench_bucket,
            sm_session=self.boto3_session,
        )

    # -----------------------------------------------------------------
    # Override: auto_inference  (smoke test capped at 10 rows)
    # -----------------------------------------------------------------
    def auto_inference(self) -> pd.DataFrame:
        """Run a 10-row smoke test on this async endpoint.

        Async workloads can run at seconds-to-minutes per row, so the
        sync default of "all holdout rows (~20% of the FeatureSet)" turns
        a smoke test into a multi-minute round-trip. This override caps
        the eval set at 10 rows — enough to verify the endpoint responds
        end-to-end without paying for a full holdout pass.
        """
        from workbench.core.artifacts.model_core import ModelCore

        model = ModelCore(self.get_input())
        if not model.exists():
            self.log.error("No model found for this endpoint. Returning empty DataFrame.")
            return pd.DataFrame()

        all_df = model.training_view().pull_dataframe()
        eval_df = all_df[~all_df["training"]].head(10)

        aws_cols = ["write_time", "api_invocation_time", "is_deleted", "event_time"]
        eval_df = eval_df.drop(columns=aws_cols, errors="ignore")

        return self.inference(eval_df, "auto_inference")

    # -----------------------------------------------------------------
    # Internal: delegate to the lightweight bridges client
    # -----------------------------------------------------------------
    def _async_batch_invoke(self, eval_df: pd.DataFrame) -> pd.DataFrame:
        """Delegate batch invocation to ``workbench.endpoints.async_inference``.

        Reads two tunable knobs from ``workbench_meta()``:
          * ``inference_batch_size`` (default 10): rows per invocation.
          * ``inference_max_in_flight`` (default :data:`_MAX_IN_FLIGHT_CAP`):
            outstanding invocation cap for direct calls bypassing
            :class:`InferenceCache`.

        For MetaEndpoints (detected via ``workbench_meta["meta_endpoint_dag"]``),
        an ``instances_str_fn`` callable is passed to ``async_inference`` so
        its ``instances=`` log field renders per-child counts instead of the
        meta orchestrator's own (always-1) count. The callable composes
        :meth:`Endpoint.instance_counts` per async child.
        """
        meta = self.workbench_meta() or {}
        batch_size = int(meta.get("inference_batch_size", _DEFAULT_BATCH_SIZE))
        # Estimate chunks for sizing; bridges re-derives this internally too.
        n_batches = max(1, (len(eval_df) + batch_size - 1) // batch_size)
        max_in_flight = _resolve_max_in_flight(meta, n_batches=n_batches)

        return async_inference(
            endpoint_name=self.endpoint_name,
            eval_df=eval_df,
            sm_session=self.boto3_session,
            batch_size=batch_size,
            max_in_flight=max_in_flight,
            s3_bucket=self.workbench_bucket,
            s3_input_prefix=f"endpoints/{self.name}/async-input",
            instances_str_fn=self._meta_instances_str_fn(meta),
        )

    def _meta_instances_str_fn(self, meta: dict):
        """Build the ``instances_str_fn`` callable for a MetaEndpoint, or None.

        For non-meta endpoints, returns ``None`` so workbench-bridges renders
        its default (``endpoint_name``'s own count).

        For MetaEndpoints, returns a callable that composes
        :meth:`Endpoint.instance_counts` per async child:
        ``[child_a:2, child_b:1→3]``. Returns an empty string when the meta
        has no async children, which suppresses the ``instances=`` field.
        """
        dag_dict = meta.get("meta_endpoint_dag")
        if not dag_dict:
            return None

        async_children = [name for name, is_async in dag_dict.get("endpoint_async", {}).items() if is_async]
        if not async_children:
            return lambda: ""

        from workbench.api.endpoint import Endpoint

        def fn() -> str:
            parts = []
            for child_name in async_children:
                # Construction fetches fresh; use the cached read helper
                # to avoid a redundant refresh round-trip.
                counts = Endpoint(child_name)._read_instance_counts()
                if not counts:
                    parts.append(f"{child_name}:?")
                    continue
                c, d = counts["current"], counts["desired"]
                val = str(c) if c == d else f"{c}→{d}"
                parts.append(f"{child_name}:{val}")
            return "[" + ", ".join(parts) + "]"

        return fn
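The string format documented in _meta_instances_str_fn can be separated from the Endpoint lookups and tested on its own. A minimal sketch, assuming each child's counts arrive as a dict with the current and desired keys used in fn above, or None when unavailable; format_instance_counts is a hypothetical name.

```python
from typing import Optional


def format_instance_counts(counts_by_child: dict[str, Optional[dict[str, int]]]) -> str:
    """Render per-child instance counts like '[child_a:2, child_b:1→3]'."""
    if not counts_by_child:
        return ""  # no async children: suppress the instances= field
    parts = []
    for name, counts in counts_by_child.items():
        if not counts:
            parts.append(f"{name}:?")  # counts unavailable
            continue
        current, desired = counts["current"], counts["desired"]
        # Show "current→desired" only while a scaling change is pending
        value = str(current) if current == desired else f"{current}→{desired}"
        parts.append(f"{name}:{value}")
    return "[" + ", ".join(parts) + "]"


print(format_instance_counts({
    "child_a": {"current": 2, "desired": 2},
    "child_b": {"current": 1, "desired": 3},
}))  # [child_a:2, child_b:1→3]
```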

auto_inference()

Run a 10-row smoke test on this async endpoint.

Async workloads can take seconds to minutes per row, so the sync default of scoring all holdout rows (~20% of the FeatureSet) turns a smoke test into a multi-minute round-trip. This override caps the eval set at 10 rows, enough to verify that the endpoint responds end-to-end without paying for a full holdout pass.

fast_inference(eval_df, threads=4)

Async version of fast_inference: the threads argument is ignored, and rows go through the S3 upload and polling path instead.

purge_async_queue()

Cancel all queued async invocations for this endpoint.

Thin wrapper over workbench.endpoints.async_inference.purge_async_queue. See that function for behavior, caveats, and return semantics.

Examples

The examples below use the Endpoint API class — the same interface you use for sync endpoints. Routing to AsyncEndpointCore happens automatically based on the endpoint's deploy-time metadata.

Run Inference on an Async Endpoint

async_endpoint_inference.py
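import pandas as pd
from workbench.api import Endpoint

# Example input: SMILES strings in a "smiles" column (column name assumed)
df = pd.DataFrame({"smiles": ["CCO", "c1ccccc1O"]})

# Endpoint detects async deployment and routes through AsyncEndpointCore internally
endpoint = Endpoint("smiles-to-3d-full-v1")
results_df = endpoint.inference(df)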

Use with InferenceCache for Batch Processing

async_cached_inference.py
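import pandas as pd
from workbench.api import Endpoint
from workbench.api.inference_cache import InferenceCache

endpoint = Endpoint("smiles-to-3d-full-v1")
cached_endpoint = InferenceCache(endpoint, cache_key_column="smiles")

# Stand-in for a large batch: any DataFrame with a "smiles" column
big_df = pd.DataFrame({"smiles": ["CCO", "CCO", "c1ccccc1O"]})

# Only uncached rows are sent to the endpoint
results_df = cached_endpoint.inference(big_df)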
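One way to picture "only uncached rows are sent" is an in-memory sketch with a plain dict keyed by the cache column and a single output column. This is not InferenceCache's implementation (its storage, persistence, and output columns are not described on this page); cached_inference, infer, and the prediction column are hypothetical names.

```python
from typing import Callable

import pandas as pd


def cached_inference(
    df: pd.DataFrame,
    key_column: str,
    cache: dict,
    infer: Callable[[pd.DataFrame], pd.DataFrame],
    output_column: str = "prediction",
) -> pd.DataFrame:
    """Score only rows whose key is not cached yet, then fill every row from the cache."""
    # Deduplicate misses so repeated keys cost a single invocation
    misses = df.loc[~df[key_column].isin(list(cache))].drop_duplicates(subset=key_column)
    if not misses.empty:
        fresh = infer(misses)
        cache.update(zip(fresh[key_column], fresh[output_column]))
    out = df.copy()
    out[output_column] = out[key_column].map(cache)
    return out
```

A second call with the same keys sends nothing to infer, which is what makes caching attractive when each row costs seconds to minutes on an async endpoint.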

Deploy an Async Endpoint from a Model

deploy_async_endpoint.py
from workbench.api import Model

model = Model("smiles-to-3d-full-v1")
end = model.to_endpoint(
    async_endpoint=True,
    tags=["smiles", "3d descriptors", "full"],
)
# Override the default ml.c7i.xlarge with instance="ml.c7i.2xlarge" if your
# model needs more CPU/memory per worker.

Async endpoints deploy with scale-to-zero auto-scaling — the instance spins down after ~10 minutes of idle time and cold-starts on the next request. This makes them cost-effective for overnight batch workloads.
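Workbench's actual scaling configuration is not shown on this page. As a rough sketch of the standard SageMaker pattern for async scale-to-zero, the function below builds request parameters for Application Auto Scaling: a scalable target with MinCapacity 0 and a target-tracking policy on the ApproximateBacklogSizePerInstance metric. The defaults (at most one instance, a backlog target of 5, and a 600 s scale-in cooldown chosen to echo the ~10 minute idle window) are assumptions, and async_scaling_requests is a hypothetical name. Target tracking on a per-instance backlog cannot wake an endpoint that is already at zero instances; AWS pairs it with a step-scaling policy driven by a CloudWatch alarm on the HasBacklogWithoutCapacity metric, which is omitted here.

```python
def async_scaling_requests(
    endpoint_name: str,
    variant_name: str = "AllTraffic",
    max_instances: int = 1,
    backlog_per_instance: float = 5.0,
    scale_in_cooldown_s: int = 600,
) -> tuple[dict, dict]:
    """Build kwargs for register_scalable_target and put_scaling_policy."""
    resource_id = f"endpoint/{endpoint_name}/variant/{variant_name}"
    common = {
        "ServiceNamespace": "sagemaker",
        "ResourceId": resource_id,
        "ScalableDimension": "sagemaker:variant:DesiredInstanceCount",
    }
    # MinCapacity=0 is what permits scale-to-zero for async endpoints
    target = {**common, "MinCapacity": 0, "MaxCapacity": max_instances}
    policy = {
        **common,
        "PolicyName": f"{endpoint_name}-backlog-target-tracking",
        "PolicyType": "TargetTrackingScaling",
        "TargetTrackingScalingPolicyConfiguration": {
            # Keep queued requests per instance near TargetValue
            "TargetValue": backlog_per_instance,
            "CustomizedMetricSpecification": {
                "MetricName": "ApproximateBacklogSizePerInstance",
                "Namespace": "AWS/SageMaker",
                "Dimensions": [{"Name": "EndpointName", "Value": endpoint_name}],
                "Statistic": "Average",
            },
            "ScaleInCooldown": scale_in_cooldown_s,
        },
    }
    return target, policy
```

With boto3, the two dicts would be passed as boto3.client("application-autoscaling").register_scalable_target(**target) followed by put_scaling_policy(**policy) on the same client.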

When to Use Async vs Sync

|  | Sync Endpoint | Async Endpoint |
|---|---|---|
| Invocation timeout | 60 seconds | 60 minutes |
| Scaling | Fixed instance count | Scale-to-zero when idle |
| Best for | Realtime inference, low latency | Long-running batch processing |
| Cost when idle | Pays for running instance | Zero (scales down) |
| Caller code | Endpoint(name).inference(df) | Endpoint(name).inference(df) (identical) |

The sync/async choice is made at deploy time via model.to_endpoint(async_endpoint=True). Caller code is identical in both cases.