Endpoint automatically routes to AsyncEndpointCore when the underlying SageMaker endpoint was deployed as async (workbench_meta["async_endpoint"]). Callers use Endpoint — the async S3 round-trip is handled internally.
AsyncEndpointCore is the implementation that backs async (long-running) inference for endpoints whose model takes longer than the 60-second sync invocation cap. It supports invocations up to 60 minutes and scales to zero when idle, so you only pay for compute during active batch runs.
Async endpoints add an S3 I/O layer for long-running invocations and scale to zero when idle.
Extends EndpointCore to support SageMaker async inference endpoints.
Async endpoints accept the same model artifacts and container images as
realtime endpoints, but invocations are non-blocking: input is uploaded
to S3, the response is written to an S3 output location, and the caller
polls for completion.
This is useful for workloads where per-invocation latency exceeds the
realtime 60-second server-side timeout (e.g., Boltzmann 3D descriptor
generation that can take minutes per molecule).
The API surface is identical to EndpointCore — inference() and
fast_inference() return DataFrames synchronously, hiding the async
S3 round-trip from the caller.
Implementation: the protocol-level invocation lives in
workbench.endpoints.async_inference; this class adds
Workbench-specific concerns (workbench_meta knobs for batch sizing
and concurrency, capture/monitoring, S3 path resolution).
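The batch-sizing knob mentioned above determines how many invocations a DataFrame is split into. The following is a minimal sketch of that arithmetic only, assuming the ceiling-division estimate that appears in the class source; `chunk_bounds` and `estimated_batches` are hypothetical helper names, not part of Workbench, and the way the concurrency cap is resolved is not shown here.

```python
def estimated_batches(n_rows: int, batch_size: int) -> int:
    """Ceiling division with a floor of 1, mirroring the estimate in _async_batch_invoke."""
    return max(1, (n_rows + batch_size - 1) // batch_size)


def chunk_bounds(n_rows: int, batch_size: int) -> list[tuple[int, int]]:
    """Return (start, stop) row offsets, one pair per invocation (hypothetical helper)."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [(start, min(start + batch_size, n_rows)) for start in range(0, n_rows, batch_size)]


# 25 rows with the default batch size of 10 -> three invocations, the last one partial.
bounds = chunk_bounds(25, 10)
n_batches = estimated_batches(25, 10)
```

Note that the estimate is floored at 1 even for an empty input, while the actual slicing of an empty input yields no chunks; the class avoids that case by returning early when the DataFrame is empty.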
EndpointCore subclass for SageMaker async inference endpoints.
Overrides the invocation path (_predict / fast_inference) to use the
async S3 upload → invoke_async → poll S3 → download pattern. All
metadata, metrics, and capture logic is inherited unchanged.
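To make the upload, invoke_async, poll, download pattern concrete, here is a small sketch that runs without AWS. Everything in it is an assumption made for illustration: `FakeAsyncBackend` is an in-memory stand-in for S3 plus an async endpoint, and `poll_until_ready` and `blocking_invoke` are hypothetical names. It is not the code in workbench.endpoints.async_inference; it only shows how a non-blocking invocation can be wrapped so the caller sees a synchronous result.

```python
import time
from typing import Callable, Optional


def poll_until_ready(
    fetch: Callable[[], Optional[str]],
    timeout_s: float = 5.0,
    interval_s: float = 0.01,
) -> str:
    """Call fetch() until it returns a payload, or raise TimeoutError."""
    deadline = time.monotonic() + timeout_s
    while True:
        payload = fetch()
        if payload is not None:
            return payload
        if time.monotonic() >= deadline:
            raise TimeoutError("async output did not appear before the timeout")
        time.sleep(interval_s)


class FakeAsyncBackend:
    """In-memory stand-in for S3 and an async endpoint (hypothetical)."""

    def __init__(self, polls_before_ready: int = 3):
        self.store: dict[str, str] = {}
        self._pending: dict[str, tuple[int, str]] = {}
        self._polls_before_ready = polls_before_ready

    def upload(self, key: str, body: str) -> None:
        self.store[key] = body

    def invoke_async(self, input_key: str) -> str:
        # Returns immediately with the location where the output will appear.
        output_key = input_key.replace("async-input", "async-output")
        self._pending[output_key] = (self._polls_before_ready, input_key)
        return output_key

    def try_download(self, output_key: str) -> Optional[str]:
        # Output is "not there yet" for the first few polls.
        remaining, input_key = self._pending[output_key]
        if remaining > 0:
            self._pending[output_key] = (remaining - 1, input_key)
            return None
        return self.store[input_key].upper()  # stand-in for the model's response


def blocking_invoke(backend: FakeAsyncBackend, key: str, body: str) -> str:
    """Upload input, start the async invocation, then block until output exists."""
    backend.upload(key, body)
    output_key = backend.invoke_async(key)
    return poll_until_ready(lambda: backend.try_download(output_key))


backend = FakeAsyncBackend()
result = blocking_invoke(backend, "endpoints/demo/async-input/chunk-0.csv", "smiles\nCCO")
```

The deadline check comes after each fetch, so a result that lands right at the timeout boundary is still returned rather than discarded.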
Source code in src/workbench/core/artifacts/async_endpoint_core.py
class AsyncEndpointCore(EndpointCore):
    """EndpointCore subclass for SageMaker async inference endpoints.

    Overrides the invocation path (_predict / fast_inference) to use the
    async S3 upload → invoke_async → poll S3 → download pattern. All
    metadata, metrics, and capture logic is inherited unchanged.
    """

    def __init__(self, endpoint_name: str, **kwargs):
        super().__init__(endpoint_name, **kwargs)

    # -----------------------------------------------------------------
    # Override: _predict (called by EndpointCore.inference for modeled endpoints)
    # -----------------------------------------------------------------
    def _predict(self, eval_df: pd.DataFrame, features: list[str], drop_error_rows: bool = False) -> pd.DataFrame:
        """Run async prediction on a DataFrame.

        Follows the same contract as EndpointCore._predict: accepts a DataFrame,
        returns a DataFrame with prediction/feature columns added. Internally
        uploads chunks to S3, calls invoke_async, polls for output.
        """
        if eval_df.empty:
            log.warning("Evaluation DataFrame has 0 rows.")
            return pd.DataFrame(columns=eval_df.columns)

        # Validate features
        df_columns_lower = set(col.lower() for col in eval_df.columns)
        features_lower = set(f.lower() for f in features)
        if not features_lower.issubset(df_columns_lower):
            missing = features_lower - df_columns_lower
            raise ValueError(f"DataFrame does not contain required features: {missing}")

        return self._async_batch_invoke(eval_df)

    # -----------------------------------------------------------------
    # Override: fast_inference (called for "floating" endpoints with no model)
    # -----------------------------------------------------------------
    def fast_inference(self, eval_df: pd.DataFrame, threads: int = 4) -> pd.DataFrame:
        """Async version of fast_inference: ignores threads, uses S3 polling."""
        if eval_df.empty:
            return pd.DataFrame(columns=eval_df.columns)
        return self._async_batch_invoke(eval_df)

    # -----------------------------------------------------------------
    # Queue management
    # -----------------------------------------------------------------
    def purge_async_queue(self) -> int:
        """Cancel all queued async invocations for this endpoint.

        Thin wrapper over :func:`workbench.endpoints.async_inference.purge_async_queue`.
        See that function for behavior, caveats, and return semantics.
        """
        return purge_async_queue(
            endpoint_name=self.name,
            s3_bucket=self.workbench_bucket,
            sm_session=self.boto3_session,
        )

    # -----------------------------------------------------------------
    # Override: auto_inference (smoke test capped at 10 rows)
    # -----------------------------------------------------------------
    def auto_inference(self) -> pd.DataFrame:
        """Run a 10-row smoke test on this async endpoint.

        Async workloads can run at seconds-to-minutes per row, so the
        sync default of "all holdout rows (~20% of the FeatureSet)" turns
        a smoke test into a multi-minute round-trip. This override caps
        the eval set at 10 rows, enough to verify the endpoint responds
        end-to-end without paying for a full holdout pass.
        """
        from workbench.core.artifacts.model_core import ModelCore

        model = ModelCore(self.get_input())
        if not model.exists():
            self.log.error("No model found for this endpoint. Returning empty DataFrame.")
            return pd.DataFrame()
        all_df = model.training_view().pull_dataframe()
        eval_df = all_df[~all_df["training"]].head(10)
        aws_cols = ["write_time", "api_invocation_time", "is_deleted", "event_time"]
        eval_df = eval_df.drop(columns=aws_cols, errors="ignore")
        return self.inference(eval_df, "auto_inference")

    # -----------------------------------------------------------------
    # Internal: delegate to the lightweight bridges client
    # -----------------------------------------------------------------
    def _async_batch_invoke(self, eval_df: pd.DataFrame) -> pd.DataFrame:
        """Delegate batch invocation to ``workbench.endpoints.async_inference``.

        Reads two tunable knobs from ``workbench_meta()``:

        * ``inference_batch_size`` (default 10): rows per invocation.
        * ``inference_max_in_flight`` (default :data:`_MAX_IN_FLIGHT_CAP`):
          outstanding invocation cap for direct calls bypassing
          :class:`InferenceCache`.

        For MetaEndpoints (detected via ``workbench_meta["meta_endpoint_dag"]``),
        an ``instances_str_fn`` callable is passed to ``async_inference`` so
        its ``instances=`` log field renders per-child counts instead of the
        meta orchestrator's own (always-1) count. The callable composes
        :meth:`Endpoint.instance_counts` per async child.
        """
        meta = self.workbench_meta() or {}
        batch_size = int(meta.get("inference_batch_size", _DEFAULT_BATCH_SIZE))
        # Estimate chunks for sizing; bridges re-derives this internally too.
        n_batches = max(1, (len(eval_df) + batch_size - 1) // batch_size)
        max_in_flight = _resolve_max_in_flight(meta, n_batches=n_batches)
        return async_inference(
            endpoint_name=self.endpoint_name,
            eval_df=eval_df,
            sm_session=self.boto3_session,
            batch_size=batch_size,
            max_in_flight=max_in_flight,
            s3_bucket=self.workbench_bucket,
            s3_input_prefix=f"endpoints/{self.name}/async-input",
            instances_str_fn=self._meta_instances_str_fn(meta),
        )

    def _meta_instances_str_fn(self, meta: dict):
        """Build the ``instances_str_fn`` callable for a MetaEndpoint, or None.

        For non-meta endpoints, returns ``None`` so workbench-bridges renders
        its default (``endpoint_name``'s own count). For MetaEndpoints, returns
        a callable that composes :meth:`Endpoint.instance_counts` per async
        child: ``[child_a:2, child_b:1→3]``. Returns an empty string when the
        meta has no async children, which suppresses the ``instances=`` field.
        """
        dag_dict = meta.get("meta_endpoint_dag")
        if not dag_dict:
            return None
        async_children = [name for name, is_async in dag_dict.get("endpoint_async", {}).items() if is_async]
        if not async_children:
            return lambda: ""

        from workbench.api.endpoint import Endpoint

        def fn() -> str:
            parts = []
            for child_name in async_children:
                # Construction fetches fresh; use the cached read helper
                # to avoid a redundant refresh round-trip.
                counts = Endpoint(child_name)._read_instance_counts()
                if not counts:
                    parts.append(f"{child_name}:?")
                    continue
                c, d = counts["current"], counts["desired"]
                val = str(c) if c == d else f"{c}→{d}"
                parts.append(f"{child_name}:{val}")
            return "[" + ", ".join(parts) + "]"

        return fn
auto_inference()
Run a 10-row smoke test on this async endpoint.
Async workloads can run at seconds-to-minutes per row, so the
sync default of "all holdout rows (~20% of the FeatureSet)" turns
a smoke test into a multi-minute round-trip. This override caps
the eval set at 10 rows — enough to verify the endpoint responds
end-to-end without paying for a full holdout pass.
Source code in src/workbench/core/artifacts/async_endpoint_core.py
def auto_inference(self) -> pd.DataFrame:
    """Run a 10-row smoke test on this async endpoint.

    Async workloads can run at seconds-to-minutes per row, so the
    sync default of "all holdout rows (~20% of the FeatureSet)" turns
    a smoke test into a multi-minute round-trip. This override caps
    the eval set at 10 rows, enough to verify the endpoint responds
    end-to-end without paying for a full holdout pass.
    """
    from workbench.core.artifacts.model_core import ModelCore

    model = ModelCore(self.get_input())
    if not model.exists():
        self.log.error("No model found for this endpoint. Returning empty DataFrame.")
        return pd.DataFrame()
    all_df = model.training_view().pull_dataframe()
    eval_df = all_df[~all_df["training"]].head(10)
    aws_cols = ["write_time", "api_invocation_time", "is_deleted", "event_time"]
    eval_df = eval_df.drop(columns=aws_cols, errors="ignore")
    return self.inference(eval_df, "auto_inference")
fast_inference(eval_df,threads=4)
Async version of fast_inference — ignores threads, uses S3 polling.
Source code in src/workbench/core/artifacts/async_endpoint_core.py
def fast_inference(self, eval_df: pd.DataFrame, threads: int = 4) -> pd.DataFrame:
    """Async version of fast_inference: ignores threads, uses S3 polling."""
    if eval_df.empty:
        return pd.DataFrame(columns=eval_df.columns)
    return self._async_batch_invoke(eval_df)
purge_async_queue()
Cancel all queued async invocations for this endpoint.
Thin wrapper over workbench.endpoints.async_inference.purge_async_queue.
See that function for behavior, caveats, and return semantics.
Source code in src/workbench/core/artifacts/async_endpoint_core.py
def purge_async_queue(self) -> int:
    """Cancel all queued async invocations for this endpoint.

    Thin wrapper over :func:`workbench.endpoints.async_inference.purge_async_queue`.
    See that function for behavior, caveats, and return semantics.
    """
    return purge_async_queue(
        endpoint_name=self.name,
        s3_bucket=self.workbench_bucket,
        sm_session=self.boto3_session,
    )
Examples
The examples below use the Endpoint API class — the same interface you use for sync endpoints. Routing to AsyncEndpointCore happens automatically based on the endpoint's deploy-time metadata.
Run Inference on an Async Endpoint
async_endpoint_inference.py
from workbench.api import Endpoint

# Endpoint detects async deployment and routes through AsyncEndpointCore internally
endpoint = Endpoint("smiles-to-3d-full-v1")
results_df = endpoint.inference(df)
Use with InferenceCache for Batch Processing
async_cached_inference.py
from workbench.api import Endpoint
from workbench.api.inference_cache import InferenceCache

endpoint = Endpoint("smiles-to-3d-full-v1")
cached_endpoint = InferenceCache(endpoint, cache_key_column="smiles")

# Only uncached rows are sent to the endpoint
results_df = cached_endpoint.inference(big_df)
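The idea that only uncached rows reach the endpoint can be illustrated with a plain dictionary. This is a conceptual sketch under stated assumptions, not how InferenceCache is implemented: `cached_batch`, `fake_endpoint`, and the dict-backed cache are all hypothetical, and rows are modeled as plain dicts rather than DataFrame rows.

```python
from typing import Callable


def cached_batch(
    rows: list[dict],
    key_column: str,
    cache: dict[str, dict],
    compute: Callable[[list[dict]], list[dict]],
) -> list[dict]:
    """Return one result per input row, calling compute() only for uncached keys."""
    # Collect rows whose key is not cached yet, de-duplicated by key, order preserved.
    misses: dict[str, dict] = {}
    for row in rows:
        key = row[key_column]
        if key not in cache and key not in misses:
            misses[key] = row
    if misses:
        miss_rows = list(misses.values())
        for row, result in zip(miss_rows, compute(miss_rows)):
            cache[row[key_column]] = result
    return [cache[row[key_column]] for row in rows]


calls = []  # records what the stand-in endpoint was actually asked to compute


def fake_endpoint(batch: list[dict]) -> list[dict]:
    """Stand-in for an expensive endpoint call (hypothetical)."""
    calls.append([r["smiles"] for r in batch])
    return [{"smiles": r["smiles"], "n_chars": len(r["smiles"])} for r in batch]


cache = {"CCO": {"smiles": "CCO", "n_chars": 3}}
rows = [{"smiles": "CCO"}, {"smiles": "c1ccccc1"}, {"smiles": "CCO"}]
results = cached_batch(rows, "smiles", cache, fake_endpoint)
```

With a slow async endpoint, skipping rows that were already computed matters more than it does for sync endpoints, since each avoided row can save seconds to minutes of compute.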
Deploy an Async Endpoint from a Model
deploy_async_endpoint.py
from workbench.api import Model

model = Model("smiles-to-3d-full-v1")
end = model.to_endpoint(
    async_endpoint=True,
    tags=["smiles", "3d descriptors", "full"],
)
# Override the default ml.c7i.xlarge with instance="ml.c7i.2xlarge" if your
# model needs more CPU/memory per worker.
Async endpoints deploy with scale-to-zero auto-scaling — the instance spins down after ~10 minutes of idle time and cold-starts on the next request. This makes them cost-effective for overnight batch workloads.
When to Use Async vs Sync
|  | Sync Endpoint | Async Endpoint |
|---|---|---|
| Invocation timeout | 60 seconds | 60 minutes |
| Scaling | Fixed instance count | Scale-to-zero when idle |
| Best for | Realtime inference, low latency | Long-running batch processing |
| Cost when idle | Pays for running instance | Zero (scales down) |
| Caller code | Endpoint(name).inference(df) | Endpoint(name).inference(df) (identical) |
The sync/async choice is made at deploy time via model.to_endpoint(async_endpoint=True). Caller code is identical in both cases.
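The reason caller code can stay identical is that the choice of implementation is made once, from deploy-time metadata, behind a single facade. Below is a minimal sketch of that dispatch idea, assuming a metadata flag named `async_endpoint` as in the text. `SyncCore`, `AsyncCore`, `EndpointFacade`, and `FAKE_META` are hypothetical stand-ins, not the Workbench classes.

```python
class SyncCore:
    """Stand-in for a realtime implementation (hypothetical)."""

    def __init__(self, name: str):
        self.name = name

    def inference(self, rows: list[dict]) -> list[dict]:
        return [{**row, "path": "sync"} for row in rows]


class AsyncCore(SyncCore):
    """Stand-in for the async implementation; same method signature."""

    def inference(self, rows: list[dict]) -> list[dict]:
        return [{**row, "path": "async"} for row in rows]


# Stand-in for per-endpoint deploy-time metadata (hypothetical values).
FAKE_META = {
    "fast-model": {},
    "slow-model": {"async_endpoint": True},
}


class EndpointFacade:
    """Pick the implementation from metadata; callers never branch on it."""

    def __init__(self, name: str):
        meta = FAKE_META.get(name, {})
        core_cls = AsyncCore if meta.get("async_endpoint") else SyncCore
        self._core = core_cls(name)

    def inference(self, rows: list[dict]) -> list[dict]:
        return self._core.inference(rows)


sync_out = EndpointFacade("fast-model").inference([{"x": 1}])
async_out = EndpointFacade("slow-model").inference([{"x": 1}])
```

Both calls use the same constructor and method; only the metadata differs.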