Skip to content

EndpointCore

API Classes

Found a method here you want to use? The API Classes have method pass-through so just call the method on the Endpoint API Class and voilà it works the same.

EndpointCore: Workbench EndpointCore Class

EndpointCore

Bases: Artifact

EndpointCore: Workbench EndpointCore Class

Common Usage
my_endpoint = EndpointCore(endpoint_uuid)
prediction_df = my_endpoint.predict(test_df)
metrics = my_endpoint.regression_metrics(target_column, prediction_df)
for metric, value in metrics.items():
    print(f"{metric}: {value:0.3f}")
Source code in src/workbench/core/artifacts/endpoint_core.py
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
class EndpointCore(Artifact):
    """EndpointCore: Workbench EndpointCore Class

    Common Usage:
        ```python
        my_endpoint = EndpointCore(endpoint_uuid)
        prediction_df = my_endpoint.predict(test_df)
        metrics = my_endpoint.regression_metrics(target_column, prediction_df)
        for metric, value in metrics.items():
            print(f"{metric}: {value:0.3f}")
        ```
    """

    def __init__(self, endpoint_uuid, **kwargs):
        """EndpointCore Initialization

        Args:
            endpoint_uuid (str): Name of Endpoint in Workbench
        """

        # Make sure the endpoint_uuid is a valid name
        self.is_name_valid(endpoint_uuid, delimiter="-", lower_case=False)

        # Call SuperClass Initialization
        super().__init__(endpoint_uuid, **kwargs)

        # Grab an Cloud Metadata object and pull information for Endpoints
        self.endpoint_name = endpoint_uuid
        self.endpoint_meta = self.meta.endpoint(self.endpoint_name)

        # Sanity check that we found the endpoint
        if self.endpoint_meta is None:
            self.log.important(f"Could not find endpoint {self.uuid} within current visibility scope")
            return

        # Sanity check the Endpoint state
        if self.endpoint_meta["EndpointStatus"] == "Failed":
            self.log.critical(f"Endpoint {self.uuid} is in a failed state")
            reason = self.endpoint_meta["FailureReason"]
            self.log.critical(f"Failure Reason: {reason}")
            self.log.critical("Please delete this endpoint and re-deploy...")

        # Set the Inference, Capture, and Monitoring S3 Paths
        self.endpoint_inference_path = self.endpoints_s3_path + "/inference/" + self.uuid
        self.endpoint_data_capture_path = self.endpoints_s3_path + "/data_capture/" + self.uuid
        self.endpoint_monitoring_path = self.endpoints_s3_path + "/monitoring/" + self.uuid

        # Set the Model Name
        self.model_name = self.get_input()

        # This is for endpoint error handling later
        self.endpoint_return_columns = None

        # We temporary cache the endpoint metrics
        self.temp_storage = Cache(prefix="temp_storage", expire=300)  # 5 minutes

        # Call SuperClass Post Initialization
        super().__post_init__()

        # All done
        self.log.info(f"EndpointCore Initialized: {self.endpoint_name}")

    def refresh_meta(self):
        """Refresh the Artifact's metadata"""
        self.endpoint_meta = self.meta.endpoint(self.endpoint_name)

    def exists(self) -> bool:
        """Does the feature_set_name exist in the AWS Metadata?"""
        if self.endpoint_meta is None:
            self.log.debug(f"Endpoint {self.endpoint_name} not found in AWS Metadata")
            return False
        return True

    def health_check(self) -> list[str]:
        """Perform a health check on this model

        Returns:
            list[str]: List of health issues
        """
        if not self.ready():
            return ["needs_onboard"]

        # Call the base class health check
        health_issues = super().health_check()

        # Does this endpoint have a config?
        # Note: This is not an authoritative check, so improve later
        if self.endpoint_meta.get("ProductionVariants") is None:
            health_issues.append("no_config")

        # We're going to check for 5xx errors and no activity
        endpoint_metrics = self.endpoint_metrics()

        # Check if we have metrics
        if endpoint_metrics is None:
            health_issues.append("unknown_error")
            return health_issues

        # Check for 5xx errors
        num_errors = endpoint_metrics["Invocation5XXErrors"].sum()
        if num_errors > 5:
            health_issues.append("5xx_errors")
        elif num_errors > 0:
            health_issues.append("5xx_errors_min")
        else:
            self.remove_health_tag("5xx_errors")
            self.remove_health_tag("5xx_errors_min")

        # Check for Endpoint activity
        num_invocations = endpoint_metrics["Invocations"].sum()
        if num_invocations == 0:
            health_issues.append("no_activity")
        else:
            self.remove_health_tag("no_activity")
        return health_issues

    def is_serverless(self) -> bool:
        """Check if the current endpoint is serverless.

        Returns:
            bool: True if the endpoint is serverless, False otherwise.
        """
        return "Serverless" in self.endpoint_meta["InstanceType"]

    def add_data_capture(self):
        """Add data capture to the endpoint"""
        self.get_monitor().add_data_capture()

    def get_monitor(self):
        """Get the MonitorCore class for this endpoint"""
        from workbench.core.artifacts.monitor_core import MonitorCore

        return MonitorCore(self.endpoint_name)

    def size(self) -> float:
        """Return the size of this data in MegaBytes"""
        return 0.0

    def aws_meta(self) -> dict:
        """Get ALL the AWS metadata for this artifact"""
        return self.endpoint_meta

    def arn(self) -> str:
        """AWS ARN (Amazon Resource Name) for this artifact"""
        return self.endpoint_meta["EndpointArn"]

    def aws_url(self):
        """The AWS URL for looking at/querying this data source"""
        return f"https://{self.aws_region}.console.aws.amazon.com/athena/home"

    def created(self) -> datetime:
        """Return the datetime when this artifact was created"""
        return self.endpoint_meta["CreationTime"]

    def modified(self) -> datetime:
        """Return the datetime when this artifact was last modified"""
        return self.endpoint_meta["LastModifiedTime"]

    def hash(self) -> Optional[str]:
        """Return the hash for the internal model used by this endpoint

        Returns:
            Optional[str]: The hash for the internal model used by this endpoint
        """
        from workbench.utils.endpoint_utils import get_model_data_url  # Avoid circular import

        model_url = get_model_data_url(self.endpoint_config_name(), self.boto3_session)
        return compute_s3_object_hash(model_url, self.boto3_session)

    def endpoint_metrics(self) -> Union[pd.DataFrame, None]:
        """Return the metrics for this endpoint

        Returns:
            pd.DataFrame: DataFrame with the metrics for this endpoint (or None if no metrics)
        """

        # Do we have it cached?
        metrics_key = f"endpoint:{self.uuid}:endpoint_metrics"
        endpoint_metrics = self.temp_storage.get(metrics_key)
        if endpoint_metrics is not None:
            return endpoint_metrics

        # We don't have it cached so let's get it from CloudWatch
        if "ProductionVariants" not in self.endpoint_meta:
            return None
        self.log.important("Updating endpoint metrics...")
        variant = self.endpoint_meta["ProductionVariants"][0]["VariantName"]
        endpoint_metrics = EndpointMetrics().get_metrics(self.uuid, variant=variant)
        self.temp_storage.set(metrics_key, endpoint_metrics)
        return endpoint_metrics

    def details(self, recompute: bool = False) -> dict:
        """Additional Details about this Endpoint
        Args:
            recompute (bool): Recompute the details (default: False)
        Returns:
            dict(dict): A dictionary of details about this Endpoint
        """

        # Fill in all the details about this Endpoint
        details = self.summary()

        # Get details from our AWS Metadata
        details["status"] = self.endpoint_meta["EndpointStatus"]
        details["instance"] = self.endpoint_meta["InstanceType"]
        try:
            details["instance_count"] = self.endpoint_meta["ProductionVariants"][0]["CurrentInstanceCount"] or "-"
        except KeyError:
            details["instance_count"] = "-"
        if "ProductionVariants" in self.endpoint_meta:
            details["variant"] = self.endpoint_meta["ProductionVariants"][0]["VariantName"]
        else:
            details["variant"] = "-"

        # Add endpoint metrics from CloudWatch
        details["endpoint_metrics"] = self.endpoint_metrics()

        # Return the details
        return details

    def onboard(self, interactive: bool = False) -> bool:
        """This is a BLOCKING method that will onboard the Endpoint (make it ready)
        Args:
            interactive (bool, optional): If True, will prompt the user for information. (default: False)
        Returns:
            bool: True if the Endpoint is successfully onboarded, False otherwise
        """

        # Make sure our input is defined
        if self.get_input() == "unknown":
            if interactive:
                input_model = input("Input Model?: ")
            else:
                self.log.critical("Input Model is not defined!")
                return False
        else:
            input_model = self.get_input()

        # Now that we have the details, let's onboard the Endpoint with args
        return self.onboard_with_args(input_model)

    def onboard_with_args(self, input_model: str) -> bool:
        """Onboard the Endpoint with the given arguments

        Args:
            input_model (str): The input model for this endpoint
        Returns:
            bool: True if the Endpoint is successfully onboarded, False otherwise
        """
        # Set the status to onboarding
        self.set_status("onboarding")

        self.upsert_workbench_meta({"workbench_input": input_model})
        self.model_name = input_model

        # Remove the needs_onboard tag
        self.remove_health_tag("needs_onboard")
        self.set_status("ready")

        # Run a health check and refresh the meta
        time.sleep(2)  # Give the AWS Metadata a chance to update
        self.health_check()
        self.refresh_meta()
        self.details(recompute=True)
        return True

    def auto_inference(self, capture: bool = False) -> pd.DataFrame:
        """Run inference on the endpoint using FeatureSet data

        Args:
            capture (bool, optional): Capture the inference results and metrics (default=False)
        """

        # Sanity Check that we have a model
        model = ModelCore(self.get_input())
        if not model.exists():
            self.log.error("No model found for this endpoint. Returning empty DataFrame.")
            return pd.DataFrame()

        # Now get the FeatureSet and make sure it exists
        fs = FeatureSetCore(model.get_input())
        if not fs.exists():
            self.log.error("No FeatureSet found for this endpoint. Returning empty DataFrame.")
            return pd.DataFrame()

        # Grab the evaluation data from the FeatureSet
        table = fs.view("training").table
        eval_df = fs.query(f'SELECT * FROM "{table}" where training = FALSE')
        capture_uuid = "auto_inference" if capture else None
        return self.inference(eval_df, capture_uuid, id_column=fs.id_column)

    def inference(self, eval_df: pd.DataFrame, capture_uuid: str = None, id_column: str = None) -> pd.DataFrame:
        """Run inference and compute performance metrics with optional capture

        Args:
            eval_df (pd.DataFrame): DataFrame to run predictions on (must have superset of features)
            capture_uuid (str, optional): UUID of the inference capture (default=None)
            id_column (str, optional): Name of the ID column (default=None)

        Returns:
            pd.DataFrame: DataFrame with the inference results

        Note:
            If capture=True inference/performance metrics are written to S3 Endpoint Inference Folder
        """

        # Run predictions on the evaluation data
        prediction_df = self._predict(eval_df)
        if prediction_df.empty:
            self.log.warning("No predictions were made. Returning empty DataFrame.")
            return prediction_df

        # Get the target column
        model = ModelCore(self.model_name)
        target_column = model.target()

        # Sanity Check that the target column is present
        if target_column and (target_column not in prediction_df.columns):
            self.log.important(f"Target Column {target_column} not found in prediction_df!")
            self.log.important("In order to compute metrics, the target column must be present!")
            return prediction_df

        # Compute the standard performance metrics for this model
        model_type = model.model_type
        if model_type in [ModelType.REGRESSOR, ModelType.QUANTILE_REGRESSOR]:
            prediction_df = self.residuals(target_column, prediction_df)
            metrics = self.regression_metrics(target_column, prediction_df)
        elif model_type == ModelType.CLASSIFIER:
            metrics = self.classification_metrics(target_column, prediction_df)
        else:
            # For other model types, we don't compute metrics
            self.log.important(f"Model Type: {model_type} doesn't have metrics...")
            metrics = pd.DataFrame()

        # Print out the metrics
        if not metrics.empty:
            print(f"Performance Metrics for {self.model_name} on {self.uuid}")
            print(metrics.head())

            # Capture the inference results and metrics
            if capture_uuid is not None:
                description = capture_uuid.replace("_", " ").title()
                self._capture_inference_results(
                    capture_uuid, prediction_df, target_column, model_type, metrics, description, id_column
                )

        # Return the prediction DataFrame
        return prediction_df

    def fast_inference(self, eval_df: pd.DataFrame) -> pd.DataFrame:
        """Run inference on the Endpoint using the provided DataFrame

        Args:
            eval_df (pd.DataFrame): The DataFrame to run predictions on

        Returns:
            pd.DataFrame: The DataFrame with predictions

        Note:
            There's no sanity checks or error handling... just FAST Inference!
        """
        return fast_inference(self.uuid, eval_df, self.sm_session)

    def _predict(self, eval_df: pd.DataFrame) -> pd.DataFrame:
        """Internal: Run prediction on the given observations in the given DataFrame
        Args:
            eval_df (pd.DataFrame): DataFrame to run predictions on (must have superset of features)
        Returns:
            pd.DataFrame: Return the DataFrame with additional columns, prediction and any _proba columns
        """

        # Sanity check: Does the DataFrame have 0 rows?
        if eval_df.empty:
            self.log.warning("Evaluation DataFrame has 0 rows. No predictions to run.")
            return pd.DataFrame(columns=eval_df.columns)  # Return empty DataFrame with same structure

        # Sanity check: Does the Model have Features?
        features = ModelCore(self.model_name).features()
        if not features:
            self.log.warning("Model does not have features defined, using all columns in the DataFrame")
        else:
            # Sanity check: Does the DataFrame have the required features?
            df_columns_lower = set(col.lower() for col in eval_df.columns)
            features_lower = set(feature.lower() for feature in features)

            # Check if the features are a subset of the DataFrame columns (case-insensitive)
            if not features_lower.issubset(df_columns_lower):
                missing_features = features_lower - df_columns_lower
                raise ValueError(f"DataFrame does not contain required features: {missing_features}")

        # Create our Endpoint Predictor Class
        predictor = Predictor(
            self.endpoint_name,
            sagemaker_session=self.sm_session,
            serializer=CSVSerializer(),
            deserializer=CSVDeserializer(),
        )

        # Now split up the dataframe into 500 row chunks, send those chunks to our
        # endpoint (with error handling) and stitch all the chunks back together
        df_list = []
        for index in range(0, len(eval_df), 500):
            self.log.info("Processing...")

            # Compute partial DataFrames, add them to a list, and concatenate at the end
            partial_df = self._endpoint_error_handling(predictor, eval_df[index : index + 500])
            df_list.append(partial_df)

        # Concatenate the dataframes
        combined_df = pd.concat(df_list, ignore_index=True)

        # Convert data to numeric
        # Note: Since we're using CSV serializers numeric columns often get changed to generic 'object' types

        # Hard Conversion
        # Note: We explicitly catch exceptions for columns that cannot be converted to numeric
        converted_df = combined_df.copy()
        for column in combined_df.columns:
            try:
                converted_df[column] = pd.to_numeric(combined_df[column])
            except ValueError:
                # If a ValueError is raised, the column cannot be converted to numeric, so we keep it as is
                pass

        # Soft Conversion
        # Convert columns to the best possible dtype that supports the pd.NA missing value.
        converted_df = converted_df.convert_dtypes()

        # Report on any rows that failed
        failed_rows = converted_df[converted_df["prediction"].isna()]
        if not failed_rows.empty:
            self.log.warning(f"Rows that failed:\n{failed_rows}")

        # Convert pd.NA placeholders to pd.NA
        # Note: CSV serialization converts pd.NA to blank strings, so we have to put in placeholders
        converted_df.replace("__NA__", pd.NA, inplace=True)

        # Return the Dataframe
        return converted_df

    def _endpoint_error_handling(self, predictor, feature_df):
        """Internal: Handles errors, retries, and binary search for problematic rows."""

        # Convert DataFrame into a CSV buffer
        csv_buffer = StringIO()
        feature_df.to_csv(csv_buffer, index=False)

        try:
            # Send CSV buffer to the predictor and process results
            results = predictor.predict(csv_buffer.getvalue())
            results_df = pd.DataFrame.from_records(results[1:], columns=results[0])
            self.endpoint_return_columns = results_df.columns.tolist()
            return results_df

        except botocore.exceptions.ClientError as err:
            error_code = err.response["Error"]["Code"]

            if error_code == "ModelNotReadyException":
                self.log.error(f"Error {error_code}: {err.response.get('Message', 'No message')}")
                self.log.error("Model not ready. Sleeping and retrying...")
                time.sleep(60)
                return self._endpoint_error_handling(predictor, feature_df)

            elif error_code == "ModelError":
                self.log.warning("Model error. Bisecting the DataFrame and retrying...")

                # Base case: If there is only one row, we can't binary search further
                if len(feature_df) == 1:
                    if not self.endpoint_return_columns:
                        raise
                    return self._error_df(feature_df, self.endpoint_return_columns)

                # Binary search to find the problematic row(s)
                mid_point = len(feature_df) // 2
                first_half = self._endpoint_error_handling(predictor, feature_df.iloc[:mid_point])
                second_half = self._endpoint_error_handling(predictor, feature_df.iloc[mid_point:])
                return pd.concat([first_half, second_half], ignore_index=True)

            else:
                # Unknown ClientError, raise the exception
                self.log.critical(f"Unexpected ClientError: {err}")
                raise

        except Exception as err:
            self.log.critical(f"Unexpected general error: {err}")
            raise

    def _error_df(self, df, all_columns):
        """Internal: Method to construct an Error DataFrame (a Pandas DataFrame with one row of NaNs)"""
        # Create a new dataframe with all NaNs
        error_df = pd.DataFrame(dict(zip(all_columns, [[np.NaN]] * len(self.endpoint_return_columns))))
        # Now set the original values for the incoming dataframe
        for column in df.columns:
            error_df[column] = df[column].values
        return error_df

    def _capture_inference_results(
        self,
        capture_uuid: str,
        pred_results_df: pd.DataFrame,
        target_column: str,
        model_type: ModelType,
        metrics: pd.DataFrame,
        description: str,
        id_column: str = None,
    ):
        """Internal: Capture the inference results and metrics to S3

        Args:
            capture_uuid (str): UUID of the inference capture
            pred_results_df (pd.DataFrame): DataFrame with the prediction results
            target_column (str): Name of the target column
            model_type (ModelType): Type of the model (e.g. REGRESSOR, CLASSIFIER)
            metrics (pd.DataFrame): DataFrame with the performance metrics
            description (str): Description of the inference results
            id_column (str, optional): Name of the ID column (default=None)
        """

        # Compute a dataframe hash (just use the last 8)
        data_hash = joblib.hash(pred_results_df)[:8]

        # Metadata for the model inference
        inference_meta = {
            "name": capture_uuid,
            "data_hash": data_hash,
            "num_rows": len(pred_results_df),
            "description": description,
        }

        # Create the S3 Path for the Inference Capture
        inference_capture_path = f"{self.endpoint_inference_path}/{capture_uuid}"

        # Write the metadata dictionary and metrics to our S3 Model Inference Folder
        wr.s3.to_json(
            pd.DataFrame([inference_meta]),
            f"{inference_capture_path}/inference_meta.json",
            index=False,
        )
        self.log.info(f"Writing metrics to {inference_capture_path}/inference_metrics.csv")
        wr.s3.to_csv(metrics, f"{inference_capture_path}/inference_metrics.csv", index=False)

        # Grab the target column, prediction column, any _proba columns, and the ID column (if present)
        prediction_col = "prediction" if "prediction" in pred_results_df.columns else "predictions"
        output_columns = [target_column, prediction_col]

        # Add any _proba columns to the output columns
        output_columns += [col for col in pred_results_df.columns if col.endswith("_proba")]

        # Add any quantile columns to the output columns
        output_columns += [col for col in pred_results_df.columns if col.startswith("q_") or col.startswith("qr_")]

        # Add the ID column
        if id_column and id_column in pred_results_df.columns:
            output_columns.append(id_column)

        # Write the predictions to our S3 Model Inference Folder
        self.log.info(f"Writing predictions to {inference_capture_path}/inference_predictions.csv")
        subset_df = pred_results_df[output_columns]
        wr.s3.to_csv(subset_df, f"{inference_capture_path}/inference_predictions.csv", index=False)

        # CLASSIFIER: Write the confusion matrix to our S3 Model Inference Folder
        if model_type == ModelType.CLASSIFIER:
            conf_mtx = self.generate_confusion_matrix(target_column, pred_results_df)
            self.log.info(f"Writing confusion matrix to {inference_capture_path}/inference_cm.csv")
            # Note: Unlike other dataframes here, we want to write the index (labels) to the CSV
            wr.s3.to_csv(conf_mtx, f"{inference_capture_path}/inference_cm.csv", index=True)

        # Generate SHAP values for our Prediction Dataframe
        generate_shap_values(self.endpoint_name, model_type.value, pred_results_df, inference_capture_path)

        # Now recompute the details for our Model
        self.log.important(f"Recomputing Details for {self.model_name} to show latest Inference Results...")
        model = ModelCore(self.model_name)
        model._load_inference_metrics(capture_uuid)
        model.details(recompute=True)

        # Recompute the details so that inference model metrics are updated
        self.log.important(f"Recomputing Details for {self.uuid} to show latest Inference Results...")
        self.details(recompute=True)

    def regression_metrics(self, target_column: str, prediction_df: pd.DataFrame) -> pd.DataFrame:
        """Compute the performance metrics for this Endpoint
        Args:
            target_column (str): Name of the target column
            prediction_df (pd.DataFrame): DataFrame with the prediction results
        Returns:
            pd.DataFrame: DataFrame with the performance metrics
        """

        # Sanity Check the prediction DataFrame
        if prediction_df.empty:
            self.log.warning("No predictions were made. Returning empty DataFrame.")
            return pd.DataFrame()

        # Compute the metrics
        y_true = prediction_df[target_column]
        prediction_col = "prediction" if "prediction" in prediction_df.columns else "predictions"
        y_pred = prediction_df[prediction_col]

        mae = mean_absolute_error(y_true, y_pred)
        rmse = np.sqrt(mean_squared_error(y_true, y_pred))
        r2 = r2_score(y_true, y_pred)
        # Mean Absolute Percentage Error
        mape = np.mean(np.where(y_true != 0, np.abs((y_true - y_pred) / y_true), np.abs(y_true - y_pred))) * 100
        # Median Absolute Error
        medae = median_absolute_error(y_true, y_pred)

        # Organize and return the metrics
        metrics = {
            "MAE": round(mae, 3),
            "RMSE": round(rmse, 3),
            "R2": round(r2, 3),
            "MAPE": round(mape, 3),
            "MedAE": round(medae, 3),
            "NumRows": len(prediction_df),
        }
        return pd.DataFrame.from_records([metrics])

    def residuals(self, target_column: str, prediction_df: pd.DataFrame) -> pd.DataFrame:
        """Add the residuals to the prediction DataFrame
        Args:
            target_column (str): Name of the target column
            prediction_df (pd.DataFrame): DataFrame with the prediction results
        Returns:
            pd.DataFrame: DataFrame with two new columns called 'residuals' and 'residuals_abs'
        """

        # Compute the residuals
        y_true = prediction_df[target_column]
        prediction_col = "prediction" if "prediction" in prediction_df.columns else "predictions"
        y_pred = prediction_df[prediction_col]

        # Check for classification scenario
        if not pd.api.types.is_numeric_dtype(y_true) or not pd.api.types.is_numeric_dtype(y_pred):
            self.log.warning("Target and Prediction columns are not numeric. Computing 'diffs'...")
            prediction_df["residuals"] = (y_true != y_pred).astype(int)
            prediction_df["residuals_abs"] = prediction_df["residuals"]
        else:
            # Compute numeric residuals for regression
            prediction_df["residuals"] = y_true - y_pred
            prediction_df["residuals_abs"] = np.abs(prediction_df["residuals"])

        return prediction_df

    @staticmethod
    def validate_proba_columns(prediction_df: pd.DataFrame, class_labels: list, guessing: bool = False):
        """Ensure probability columns are correctly aligned with class labels

        Args:
            prediction_df (pd.DataFrame): DataFrame with the prediction results
            class_labels (list): List of class labels
            guessing (bool, optional): Whether we're guessing the class labels. Defaults to False.
        """
        proba_columns = [col.replace("_proba", "") for col in prediction_df.columns if col.endswith("_proba")]

        if sorted(class_labels) != sorted(proba_columns):
            if guessing:
                raise ValueError(f"_proba columns {proba_columns} != GUESSED class_labels {class_labels}!")
            else:
                raise ValueError(f"_proba columns {proba_columns} != class_labels {class_labels}!")

    def classification_metrics(self, target_column: str, prediction_df: pd.DataFrame) -> pd.DataFrame:
        """Compute the performance metrics for this Endpoint

        Args:
            target_column (str): Name of the target column
            prediction_df (pd.DataFrame): DataFrame with the prediction results

        Returns:
            pd.DataFrame: DataFrame with the performance metrics
        """
        # Get the class labels from the model
        class_labels = ModelCore(self.model_name).class_labels()
        if class_labels is None:
            self.log.warning(
                "Class labels not found in the model. Guessing class labels from the prediction DataFrame."
            )
            class_labels = prediction_df[target_column].unique().tolist()
            self.validate_proba_columns(prediction_df, class_labels, guessing=True)
        else:
            self.validate_proba_columns(prediction_df, class_labels)

        # Calculate precision, recall, fscore, and support, handling zero division
        prediction_col = "prediction" if "prediction" in prediction_df.columns else "predictions"
        scores = precision_recall_fscore_support(
            prediction_df[target_column],
            prediction_df[prediction_col],
            average=None,
            labels=class_labels,
            zero_division=0,
        )

        # Identify the probability columns and keep them as a Pandas DataFrame
        proba_columns = [f"{label}_proba" for label in class_labels]
        y_score = prediction_df[proba_columns]

        # One-hot encode the true labels using all class labels (fit with class_labels)
        encoder = OneHotEncoder(categories=[class_labels], sparse_output=False)
        y_true = encoder.fit_transform(prediction_df[[target_column]])

        # Calculate ROC AUC per label and handle exceptions for missing classes
        roc_auc_per_label = []
        for i, label in enumerate(class_labels):
            try:
                roc_auc = roc_auc_score(y_true[:, i], y_score.iloc[:, i])
            except ValueError as e:
                self.log.warning(f"ROC AUC calculation failed for label {label}.")
                self.log.warning(f"{str(e)}")
                roc_auc = 0.0
            roc_auc_per_label.append(roc_auc)

        # Put the scores into a DataFrame
        score_df = pd.DataFrame(
            {
                target_column: class_labels,
                "precision": scores[0],
                "recall": scores[1],
                "fscore": scores[2],
                "roc_auc": roc_auc_per_label,
                "support": scores[3],
            }
        )

        # Sort the target labels
        score_df = score_df.sort_values(by=[target_column], ascending=True)
        return score_df

    def generate_confusion_matrix(self, target_column: str, prediction_df: pd.DataFrame) -> pd.DataFrame:
        """Compute the confusion matrix for this Endpoint
        Args:
            target_column (str): Name of the target column
            prediction_df (pd.DataFrame): DataFrame with the prediction results
        Returns:
            pd.DataFrame: DataFrame with the confusion matrix
        """

        y_true = prediction_df[target_column]
        prediction_col = "prediction" if "prediction" in prediction_df.columns else "predictions"
        y_pred = prediction_df[prediction_col]

        # Check if our model has class labels, if not we'll use the unique labels in the prediction
        class_labels = ModelCore(self.model_name).class_labels()
        if class_labels is None:
            class_labels = sorted(list(set(y_true) | set(y_pred)))

        # Compute the confusion matrix (sklearn confusion_matrix)
        conf_mtx = confusion_matrix(y_true, y_pred, labels=class_labels)

        # Create a DataFrame
        conf_mtx_df = pd.DataFrame(conf_mtx, index=class_labels, columns=class_labels)
        conf_mtx_df.index.name = "labels"

        # Check if our model has class labels. If so make the index and columns ordered
        model_class_labels = ModelCore(self.model_name).class_labels()
        if model_class_labels:
            self.log.important("Reordering the confusion matrix based on model class labels...")
            conf_mtx_df.index = pd.Categorical(conf_mtx_df.index, categories=model_class_labels, ordered=True)
            conf_mtx_df.columns = pd.Categorical(conf_mtx_df.columns, categories=model_class_labels, ordered=True)
            conf_mtx_df = conf_mtx_df.sort_index().sort_index(axis=1)
        return conf_mtx_df

    def endpoint_config_name(self) -> str:
        # Grab the Endpoint Config Name from the AWS
        details = self.sm_client.describe_endpoint(EndpointName=self.endpoint_name)
        return details["EndpointConfigName"]

    def set_input(self, input: str, force=False):
        """Override: Set the input data for this artifact

        Args:
            input (str): Name of input for this artifact
            force (bool, optional): Force the input to be set. Defaults to False.
        Note:
            We're going to not allow this to be used for Models
        """
        if not force:
            self.log.warning(f"Endpoint {self.uuid}: Does not allow manual override of the input!")
            return

        # Okay we're going to allow this to be set
        self.log.important(f"{self.uuid}: Setting input to {input}...")
        self.log.important("Be careful with this! It breaks automatic provenance of the artifact!")
        self.upsert_workbench_meta({"workbench_input": input})

    def delete(self):
        """ "Delete an existing Endpoint: Underlying Models, Configuration, and Endpoint"""
        if not self.exists():
            self.log.warning(f"Trying to delete an Model that doesn't exist: {self.uuid}")

        # Call the Class Method to delete the FeatureSet
        EndpointCore.managed_delete(endpoint_name=self.uuid)

    @classmethod
    def managed_delete(cls, endpoint_name: str):
        """Delete the Endpoint and associated resources if it exists"""

        # Check if the endpoint exists
        try:
            endpoint_info = cls.sm_client.describe_endpoint(EndpointName=endpoint_name)
        except ClientError as e:
            if e.response["Error"]["Code"] in ["ValidationException", "ResourceNotFound"]:
                cls.log.info(f"Endpoint {endpoint_name} not found!")
                return
            raise  # Re-raise unexpected errors

        # Delete underlying models (Endpoints store/use models internally)
        cls.delete_endpoint_models(endpoint_name)

        # Get Endpoint Config Name and delete if exists
        endpoint_config_name = endpoint_info["EndpointConfigName"]
        try:
            cls.log.info(f"Deleting Endpoint Config {endpoint_config_name}...")
            cls.sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
        except ClientError:
            cls.log.info(f"Endpoint Config {endpoint_config_name} not found...")

        # Delete any monitoring schedules associated with the endpoint
        monitoring_schedules = cls.sm_client.list_monitoring_schedules(EndpointName=endpoint_name)[
            "MonitoringScheduleSummaries"
        ]
        for schedule in monitoring_schedules:
            cls.log.info(f"Deleting Monitoring Schedule {schedule['MonitoringScheduleName']}...")
            cls.sm_client.delete_monitoring_schedule(MonitoringScheduleName=schedule["MonitoringScheduleName"])

        # Delete related S3 artifacts (inference, data capture, monitoring)
        endpoint_inference_path = cls.endpoints_s3_path + "/inference/" + endpoint_name
        endpoint_data_capture_path = cls.endpoints_s3_path + "/data_capture/" + endpoint_name
        endpoint_monitoring_path = cls.endpoints_s3_path + "/monitoring/" + endpoint_name
        for s3_path in [endpoint_inference_path, endpoint_data_capture_path, endpoint_monitoring_path]:
            s3_path = f"{s3_path.rstrip('/')}/"
            objects = wr.s3.list_objects(s3_path, boto3_session=cls.boto3_session)
            if objects:
                cls.log.info(f"Deleting S3 Objects at {s3_path}...")
                wr.s3.delete_objects(objects, boto3_session=cls.boto3_session)

        # Delete any dataframes that were stored in the Dataframe Cache
        cls.log.info("Deleting Dataframe Cache...")
        cls.df_cache.delete_recursive(endpoint_name)

        # Delete the endpoint
        time.sleep(2)  # Allow AWS to catch up
        try:
            cls.log.info(f"Deleting Endpoint {endpoint_name}...")
            cls.sm_client.delete_endpoint(EndpointName=endpoint_name)
        except ClientError as e:
            cls.log.error("Error deleting endpoint.")
            raise e

        time.sleep(5)  # Final sleep for AWS to fully register deletions

    @classmethod
    def delete_endpoint_models(cls, endpoint_name: str):
        """Delete the underlying Model for an Endpoint

        Args:
            endpoint_name (str): The name of the endpoint to delete
        """

        # Grab the Endpoint Config Name from AWS
        endpoint_config_name = cls.sm_client.describe_endpoint(EndpointName=endpoint_name)["EndpointConfigName"]

        # Retrieve the Model Names from the Endpoint Config
        try:
            endpoint_config = cls.sm_client.describe_endpoint_config(EndpointConfigName=endpoint_config_name)
        except botocore.exceptions.ClientError:
            cls.log.info(f"Endpoint Config {endpoint_config_name} doesn't exist...")
            return
        model_names = [variant["ModelName"] for variant in endpoint_config["ProductionVariants"]]
        for model_name in model_names:
            cls.log.info(f"Deleting Internal Model {model_name}...")
            try:
                cls.sm_client.delete_model(ModelName=model_name)
            except botocore.exceptions.ClientError as error:
                error_code = error.response["Error"]["Code"]
                error_message = error.response["Error"]["Message"]
                if error_code == "ResourceInUse":
                    cls.log.warning(f"Model {model_name} is still in use...")
                else:
                    cls.log.warning(f"Error: {error_code} - {error_message}")

__init__(endpoint_uuid, **kwargs)

EndpointCore Initialization

Parameters:

Name Type Description Default
endpoint_uuid str

Name of Endpoint in Workbench

required
Source code in src/workbench/core/artifacts/endpoint_core.py
def __init__(self, endpoint_uuid, **kwargs):
    """EndpointCore Initialization

    Args:
        endpoint_uuid (str): Name of Endpoint in Workbench
    """

    # Make sure the endpoint_uuid is a valid name
    self.is_name_valid(endpoint_uuid, delimiter="-", lower_case=False)

    # Call SuperClass Initialization
    super().__init__(endpoint_uuid, **kwargs)

    # Grab an Cloud Metadata object and pull information for Endpoints
    self.endpoint_name = endpoint_uuid
    self.endpoint_meta = self.meta.endpoint(self.endpoint_name)

    # Sanity check that we found the endpoint
    if self.endpoint_meta is None:
        self.log.important(f"Could not find endpoint {self.uuid} within current visibility scope")
        return

    # Sanity check the Endpoint state
    if self.endpoint_meta["EndpointStatus"] == "Failed":
        self.log.critical(f"Endpoint {self.uuid} is in a failed state")
        reason = self.endpoint_meta["FailureReason"]
        self.log.critical(f"Failure Reason: {reason}")
        self.log.critical("Please delete this endpoint and re-deploy...")

    # Set the Inference, Capture, and Monitoring S3 Paths
    self.endpoint_inference_path = self.endpoints_s3_path + "/inference/" + self.uuid
    self.endpoint_data_capture_path = self.endpoints_s3_path + "/data_capture/" + self.uuid
    self.endpoint_monitoring_path = self.endpoints_s3_path + "/monitoring/" + self.uuid

    # Set the Model Name
    self.model_name = self.get_input()

    # This is for endpoint error handling later
    self.endpoint_return_columns = None

    # We temporary cache the endpoint metrics
    self.temp_storage = Cache(prefix="temp_storage", expire=300)  # 5 minutes

    # Call SuperClass Post Initialization
    super().__post_init__()

    # All done
    self.log.info(f"EndpointCore Initialized: {self.endpoint_name}")

add_data_capture()

Add data capture to the endpoint

Source code in src/workbench/core/artifacts/endpoint_core.py
def add_data_capture(self):
    """Add data capture to the endpoint"""
    self.get_monitor().add_data_capture()

arn()

AWS ARN (Amazon Resource Name) for this artifact

Source code in src/workbench/core/artifacts/endpoint_core.py
def arn(self) -> str:
    """AWS ARN (Amazon Resource Name) for this artifact"""
    return self.endpoint_meta["EndpointArn"]

auto_inference(capture=False)

Run inference on the endpoint using FeatureSet data

Parameters:

Name Type Description Default
capture bool

Capture the inference results and metrics (default=False)

False
Source code in src/workbench/core/artifacts/endpoint_core.py
def auto_inference(self, capture: bool = False) -> pd.DataFrame:
    """Run inference on the endpoint using FeatureSet data

    Args:
        capture (bool, optional): Capture the inference results and metrics (default=False)
    """

    # Sanity Check that we have a model
    model = ModelCore(self.get_input())
    if not model.exists():
        self.log.error("No model found for this endpoint. Returning empty DataFrame.")
        return pd.DataFrame()

    # Now get the FeatureSet and make sure it exists
    fs = FeatureSetCore(model.get_input())
    if not fs.exists():
        self.log.error("No FeatureSet found for this endpoint. Returning empty DataFrame.")
        return pd.DataFrame()

    # Grab the evaluation data from the FeatureSet
    table = fs.view("training").table
    eval_df = fs.query(f'SELECT * FROM "{table}" where training = FALSE')
    capture_uuid = "auto_inference" if capture else None
    return self.inference(eval_df, capture_uuid, id_column=fs.id_column)

aws_meta()

Get ALL the AWS metadata for this artifact

Source code in src/workbench/core/artifacts/endpoint_core.py
def aws_meta(self) -> dict:
    """Get ALL the AWS metadata for this artifact"""
    return self.endpoint_meta

aws_url()

The AWS URL for looking at/querying this data source

Source code in src/workbench/core/artifacts/endpoint_core.py
def aws_url(self):
    """The AWS URL for looking at/querying this data source"""
    return f"https://{self.aws_region}.console.aws.amazon.com/athena/home"

classification_metrics(target_column, prediction_df)

Compute the performance metrics for this Endpoint

Parameters:

Name Type Description Default
target_column str

Name of the target column

required
prediction_df DataFrame

DataFrame with the prediction results

required

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame with the performance metrics

Source code in src/workbench/core/artifacts/endpoint_core.py
def classification_metrics(self, target_column: str, prediction_df: pd.DataFrame) -> pd.DataFrame:
    """Compute the performance metrics for this Endpoint

    Args:
        target_column (str): Name of the target column
        prediction_df (pd.DataFrame): DataFrame with the prediction results

    Returns:
        pd.DataFrame: DataFrame with the performance metrics
    """
    # Get the class labels from the model
    class_labels = ModelCore(self.model_name).class_labels()
    if class_labels is None:
        self.log.warning(
            "Class labels not found in the model. Guessing class labels from the prediction DataFrame."
        )
        class_labels = prediction_df[target_column].unique().tolist()
        self.validate_proba_columns(prediction_df, class_labels, guessing=True)
    else:
        self.validate_proba_columns(prediction_df, class_labels)

    # Calculate precision, recall, fscore, and support, handling zero division
    prediction_col = "prediction" if "prediction" in prediction_df.columns else "predictions"
    scores = precision_recall_fscore_support(
        prediction_df[target_column],
        prediction_df[prediction_col],
        average=None,
        labels=class_labels,
        zero_division=0,
    )

    # Identify the probability columns and keep them as a Pandas DataFrame
    proba_columns = [f"{label}_proba" for label in class_labels]
    y_score = prediction_df[proba_columns]

    # One-hot encode the true labels using all class labels (fit with class_labels)
    encoder = OneHotEncoder(categories=[class_labels], sparse_output=False)
    y_true = encoder.fit_transform(prediction_df[[target_column]])

    # Calculate ROC AUC per label and handle exceptions for missing classes
    roc_auc_per_label = []
    for i, label in enumerate(class_labels):
        try:
            roc_auc = roc_auc_score(y_true[:, i], y_score.iloc[:, i])
        except ValueError as e:
            self.log.warning(f"ROC AUC calculation failed for label {label}.")
            self.log.warning(f"{str(e)}")
            roc_auc = 0.0
        roc_auc_per_label.append(roc_auc)

    # Put the scores into a DataFrame
    score_df = pd.DataFrame(
        {
            target_column: class_labels,
            "precision": scores[0],
            "recall": scores[1],
            "fscore": scores[2],
            "roc_auc": roc_auc_per_label,
            "support": scores[3],
        }
    )

    # Sort the target labels
    score_df = score_df.sort_values(by=[target_column], ascending=True)
    return score_df

created()

Return the datetime when this artifact was created

Source code in src/workbench/core/artifacts/endpoint_core.py
def created(self) -> datetime:
    """Return the datetime when this artifact was created"""
    return self.endpoint_meta["CreationTime"]

delete()

"Delete an existing Endpoint: Underlying Models, Configuration, and Endpoint

Source code in src/workbench/core/artifacts/endpoint_core.py
def delete(self):
    """ "Delete an existing Endpoint: Underlying Models, Configuration, and Endpoint"""
    if not self.exists():
        self.log.warning(f"Trying to delete an Model that doesn't exist: {self.uuid}")

    # Call the Class Method to delete the FeatureSet
    EndpointCore.managed_delete(endpoint_name=self.uuid)

delete_endpoint_models(endpoint_name) classmethod

Delete the underlying Model for an Endpoint

Parameters:

Name Type Description Default
endpoint_name str

The name of the endpoint to delete

required
Source code in src/workbench/core/artifacts/endpoint_core.py
@classmethod
def delete_endpoint_models(cls, endpoint_name: str):
    """Delete the underlying Model for an Endpoint

    Args:
        endpoint_name (str): The name of the endpoint to delete
    """

    # Grab the Endpoint Config Name from AWS
    endpoint_config_name = cls.sm_client.describe_endpoint(EndpointName=endpoint_name)["EndpointConfigName"]

    # Retrieve the Model Names from the Endpoint Config
    try:
        endpoint_config = cls.sm_client.describe_endpoint_config(EndpointConfigName=endpoint_config_name)
    except botocore.exceptions.ClientError:
        cls.log.info(f"Endpoint Config {endpoint_config_name} doesn't exist...")
        return
    model_names = [variant["ModelName"] for variant in endpoint_config["ProductionVariants"]]
    for model_name in model_names:
        cls.log.info(f"Deleting Internal Model {model_name}...")
        try:
            cls.sm_client.delete_model(ModelName=model_name)
        except botocore.exceptions.ClientError as error:
            error_code = error.response["Error"]["Code"]
            error_message = error.response["Error"]["Message"]
            if error_code == "ResourceInUse":
                cls.log.warning(f"Model {model_name} is still in use...")
            else:
                cls.log.warning(f"Error: {error_code} - {error_message}")

details(recompute=False)

Additional Details about this Endpoint Args: recompute (bool): Recompute the details (default: False) Returns: dict(dict): A dictionary of details about this Endpoint

Source code in src/workbench/core/artifacts/endpoint_core.py
def details(self, recompute: bool = False) -> dict:
    """Additional Details about this Endpoint
    Args:
        recompute (bool): Recompute the details (default: False)
    Returns:
        dict(dict): A dictionary of details about this Endpoint
    """

    # Fill in all the details about this Endpoint
    details = self.summary()

    # Get details from our AWS Metadata
    details["status"] = self.endpoint_meta["EndpointStatus"]
    details["instance"] = self.endpoint_meta["InstanceType"]
    try:
        details["instance_count"] = self.endpoint_meta["ProductionVariants"][0]["CurrentInstanceCount"] or "-"
    except KeyError:
        details["instance_count"] = "-"
    if "ProductionVariants" in self.endpoint_meta:
        details["variant"] = self.endpoint_meta["ProductionVariants"][0]["VariantName"]
    else:
        details["variant"] = "-"

    # Add endpoint metrics from CloudWatch
    details["endpoint_metrics"] = self.endpoint_metrics()

    # Return the details
    return details

endpoint_metrics()

Return the metrics for this endpoint

Returns:

Type Description
Union[DataFrame, None]

pd.DataFrame: DataFrame with the metrics for this endpoint (or None if no metrics)

Source code in src/workbench/core/artifacts/endpoint_core.py
def endpoint_metrics(self) -> Union[pd.DataFrame, None]:
    """Return the metrics for this endpoint

    Returns:
        pd.DataFrame: DataFrame with the metrics for this endpoint (or None if no metrics)
    """

    # Do we have it cached?
    metrics_key = f"endpoint:{self.uuid}:endpoint_metrics"
    endpoint_metrics = self.temp_storage.get(metrics_key)
    if endpoint_metrics is not None:
        return endpoint_metrics

    # We don't have it cached so let's get it from CloudWatch
    if "ProductionVariants" not in self.endpoint_meta:
        return None
    self.log.important("Updating endpoint metrics...")
    variant = self.endpoint_meta["ProductionVariants"][0]["VariantName"]
    endpoint_metrics = EndpointMetrics().get_metrics(self.uuid, variant=variant)
    self.temp_storage.set(metrics_key, endpoint_metrics)
    return endpoint_metrics

exists()

Does the feature_set_name exist in the AWS Metadata?

Source code in src/workbench/core/artifacts/endpoint_core.py
def exists(self) -> bool:
    """Does the feature_set_name exist in the AWS Metadata?"""
    if self.endpoint_meta is None:
        self.log.debug(f"Endpoint {self.endpoint_name} not found in AWS Metadata")
        return False
    return True

fast_inference(eval_df)

Run inference on the Endpoint using the provided DataFrame

Parameters:

Name Type Description Default
eval_df DataFrame

The DataFrame to run predictions on

required

Returns:

Type Description
DataFrame

pd.DataFrame: The DataFrame with predictions

Note

There's no sanity checks or error handling... just FAST Inference!

Source code in src/workbench/core/artifacts/endpoint_core.py
def fast_inference(self, eval_df: pd.DataFrame) -> pd.DataFrame:
    """Run inference on the Endpoint using the provided DataFrame

    Args:
        eval_df (pd.DataFrame): The DataFrame to run predictions on

    Returns:
        pd.DataFrame: The DataFrame with predictions

    Note:
        There's no sanity checks or error handling... just FAST Inference!
    """
    return fast_inference(self.uuid, eval_df, self.sm_session)

generate_confusion_matrix(target_column, prediction_df)

Compute the confusion matrix for this Endpoint Args: target_column (str): Name of the target column prediction_df (pd.DataFrame): DataFrame with the prediction results Returns: pd.DataFrame: DataFrame with the confusion matrix

Source code in src/workbench/core/artifacts/endpoint_core.py
def generate_confusion_matrix(self, target_column: str, prediction_df: pd.DataFrame) -> pd.DataFrame:
    """Compute the confusion matrix for this Endpoint
    Args:
        target_column (str): Name of the target column
        prediction_df (pd.DataFrame): DataFrame with the prediction results
    Returns:
        pd.DataFrame: DataFrame with the confusion matrix
    """

    y_true = prediction_df[target_column]
    prediction_col = "prediction" if "prediction" in prediction_df.columns else "predictions"
    y_pred = prediction_df[prediction_col]

    # Check if our model has class labels, if not we'll use the unique labels in the prediction
    class_labels = ModelCore(self.model_name).class_labels()
    if class_labels is None:
        class_labels = sorted(list(set(y_true) | set(y_pred)))

    # Compute the confusion matrix (sklearn confusion_matrix)
    conf_mtx = confusion_matrix(y_true, y_pred, labels=class_labels)

    # Create a DataFrame
    conf_mtx_df = pd.DataFrame(conf_mtx, index=class_labels, columns=class_labels)
    conf_mtx_df.index.name = "labels"

    # Check if our model has class labels. If so make the index and columns ordered
    model_class_labels = ModelCore(self.model_name).class_labels()
    if model_class_labels:
        self.log.important("Reordering the confusion matrix based on model class labels...")
        conf_mtx_df.index = pd.Categorical(conf_mtx_df.index, categories=model_class_labels, ordered=True)
        conf_mtx_df.columns = pd.Categorical(conf_mtx_df.columns, categories=model_class_labels, ordered=True)
        conf_mtx_df = conf_mtx_df.sort_index().sort_index(axis=1)
    return conf_mtx_df

get_monitor()

Get the MonitorCore class for this endpoint

Source code in src/workbench/core/artifacts/endpoint_core.py
def get_monitor(self):
    """Get the MonitorCore class for this endpoint"""
    from workbench.core.artifacts.monitor_core import MonitorCore

    return MonitorCore(self.endpoint_name)

hash()

Return the hash for the internal model used by this endpoint

Returns:

Type Description
Optional[str]

Optional[str]: The hash for the internal model used by this endpoint

Source code in src/workbench/core/artifacts/endpoint_core.py
def hash(self) -> Optional[str]:
    """Return the hash for the internal model used by this endpoint

    Returns:
        Optional[str]: The hash for the internal model used by this endpoint
    """
    from workbench.utils.endpoint_utils import get_model_data_url  # Avoid circular import

    model_url = get_model_data_url(self.endpoint_config_name(), self.boto3_session)
    return compute_s3_object_hash(model_url, self.boto3_session)

health_check()

Perform a health check on this model

Returns:

Type Description
list[str]

list[str]: List of health issues

Source code in src/workbench/core/artifacts/endpoint_core.py
def health_check(self) -> list[str]:
    """Perform a health check on this model

    Returns:
        list[str]: List of health issues
    """
    if not self.ready():
        return ["needs_onboard"]

    # Call the base class health check
    health_issues = super().health_check()

    # Does this endpoint have a config?
    # Note: This is not an authoritative check, so improve later
    if self.endpoint_meta.get("ProductionVariants") is None:
        health_issues.append("no_config")

    # We're going to check for 5xx errors and no activity
    endpoint_metrics = self.endpoint_metrics()

    # Check if we have metrics
    if endpoint_metrics is None:
        health_issues.append("unknown_error")
        return health_issues

    # Check for 5xx errors
    num_errors = endpoint_metrics["Invocation5XXErrors"].sum()
    if num_errors > 5:
        health_issues.append("5xx_errors")
    elif num_errors > 0:
        health_issues.append("5xx_errors_min")
    else:
        self.remove_health_tag("5xx_errors")
        self.remove_health_tag("5xx_errors_min")

    # Check for Endpoint activity
    num_invocations = endpoint_metrics["Invocations"].sum()
    if num_invocations == 0:
        health_issues.append("no_activity")
    else:
        self.remove_health_tag("no_activity")
    return health_issues

inference(eval_df, capture_uuid=None, id_column=None)

Run inference and compute performance metrics with optional capture

Parameters:

Name Type Description Default
eval_df DataFrame

DataFrame to run predictions on (must have superset of features)

required
capture_uuid str

UUID of the inference capture (default=None)

None
id_column str

Name of the ID column (default=None)

None

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame with the inference results

Note

If capture=True inference/performance metrics are written to S3 Endpoint Inference Folder

Source code in src/workbench/core/artifacts/endpoint_core.py
def inference(self, eval_df: pd.DataFrame, capture_uuid: str = None, id_column: str = None) -> pd.DataFrame:
    """Run inference and compute performance metrics with optional capture

    Args:
        eval_df (pd.DataFrame): DataFrame to run predictions on (must have superset of features)
        capture_uuid (str, optional): UUID of the inference capture (default=None)
        id_column (str, optional): Name of the ID column (default=None)

    Returns:
        pd.DataFrame: DataFrame with the inference results

    Note:
        If capture=True inference/performance metrics are written to S3 Endpoint Inference Folder
    """

    # Run predictions on the evaluation data
    prediction_df = self._predict(eval_df)
    if prediction_df.empty:
        self.log.warning("No predictions were made. Returning empty DataFrame.")
        return prediction_df

    # Get the target column
    model = ModelCore(self.model_name)
    target_column = model.target()

    # Sanity Check that the target column is present
    if target_column and (target_column not in prediction_df.columns):
        self.log.important(f"Target Column {target_column} not found in prediction_df!")
        self.log.important("In order to compute metrics, the target column must be present!")
        return prediction_df

    # Compute the standard performance metrics for this model
    model_type = model.model_type
    if model_type in [ModelType.REGRESSOR, ModelType.QUANTILE_REGRESSOR]:
        prediction_df = self.residuals(target_column, prediction_df)
        metrics = self.regression_metrics(target_column, prediction_df)
    elif model_type == ModelType.CLASSIFIER:
        metrics = self.classification_metrics(target_column, prediction_df)
    else:
        # For other model types, we don't compute metrics
        self.log.important(f"Model Type: {model_type} doesn't have metrics...")
        metrics = pd.DataFrame()

    # Print out the metrics
    if not metrics.empty:
        print(f"Performance Metrics for {self.model_name} on {self.uuid}")
        print(metrics.head())

        # Capture the inference results and metrics
        if capture_uuid is not None:
            description = capture_uuid.replace("_", " ").title()
            self._capture_inference_results(
                capture_uuid, prediction_df, target_column, model_type, metrics, description, id_column
            )

    # Return the prediction DataFrame
    return prediction_df

is_serverless()

Check if the current endpoint is serverless.

Returns:

Name Type Description
bool bool

True if the endpoint is serverless, False otherwise.

Source code in src/workbench/core/artifacts/endpoint_core.py
def is_serverless(self) -> bool:
    """Check if the current endpoint is serverless.

    Returns:
        bool: True if the endpoint is serverless, False otherwise.
    """
    return "Serverless" in self.endpoint_meta["InstanceType"]

managed_delete(endpoint_name) classmethod

Delete the Endpoint and associated resources if it exists

Source code in src/workbench/core/artifacts/endpoint_core.py
@classmethod
def managed_delete(cls, endpoint_name: str):
    """Delete the Endpoint and associated resources if it exists"""

    # Check if the endpoint exists
    try:
        endpoint_info = cls.sm_client.describe_endpoint(EndpointName=endpoint_name)
    except ClientError as e:
        if e.response["Error"]["Code"] in ["ValidationException", "ResourceNotFound"]:
            cls.log.info(f"Endpoint {endpoint_name} not found!")
            return
        raise  # Re-raise unexpected errors

    # Delete underlying models (Endpoints store/use models internally)
    cls.delete_endpoint_models(endpoint_name)

    # Get Endpoint Config Name and delete if exists
    endpoint_config_name = endpoint_info["EndpointConfigName"]
    try:
        cls.log.info(f"Deleting Endpoint Config {endpoint_config_name}...")
        cls.sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
    except ClientError:
        cls.log.info(f"Endpoint Config {endpoint_config_name} not found...")

    # Delete any monitoring schedules associated with the endpoint
    monitoring_schedules = cls.sm_client.list_monitoring_schedules(EndpointName=endpoint_name)[
        "MonitoringScheduleSummaries"
    ]
    for schedule in monitoring_schedules:
        cls.log.info(f"Deleting Monitoring Schedule {schedule['MonitoringScheduleName']}...")
        cls.sm_client.delete_monitoring_schedule(MonitoringScheduleName=schedule["MonitoringScheduleName"])

    # Delete related S3 artifacts (inference, data capture, monitoring)
    endpoint_inference_path = cls.endpoints_s3_path + "/inference/" + endpoint_name
    endpoint_data_capture_path = cls.endpoints_s3_path + "/data_capture/" + endpoint_name
    endpoint_monitoring_path = cls.endpoints_s3_path + "/monitoring/" + endpoint_name
    for s3_path in [endpoint_inference_path, endpoint_data_capture_path, endpoint_monitoring_path]:
        s3_path = f"{s3_path.rstrip('/')}/"
        objects = wr.s3.list_objects(s3_path, boto3_session=cls.boto3_session)
        if objects:
            cls.log.info(f"Deleting S3 Objects at {s3_path}...")
            wr.s3.delete_objects(objects, boto3_session=cls.boto3_session)

    # Delete any dataframes that were stored in the Dataframe Cache
    cls.log.info("Deleting Dataframe Cache...")
    cls.df_cache.delete_recursive(endpoint_name)

    # Delete the endpoint
    time.sleep(2)  # Allow AWS to catch up
    try:
        cls.log.info(f"Deleting Endpoint {endpoint_name}...")
        cls.sm_client.delete_endpoint(EndpointName=endpoint_name)
    except ClientError as e:
        cls.log.error("Error deleting endpoint.")
        raise e

    time.sleep(5)  # Final sleep for AWS to fully register deletions

modified()

Return the datetime when this artifact was last modified

Source code in src/workbench/core/artifacts/endpoint_core.py
def modified(self) -> datetime:
    """Return the datetime when this artifact was last modified"""
    return self.endpoint_meta["LastModifiedTime"]

onboard(interactive=False)

This is a BLOCKING method that will onboard the Endpoint (make it ready) Args: interactive (bool, optional): If True, will prompt the user for information. (default: False) Returns: bool: True if the Endpoint is successfully onboarded, False otherwise

Source code in src/workbench/core/artifacts/endpoint_core.py
def onboard(self, interactive: bool = False) -> bool:
    """This is a BLOCKING method that will onboard the Endpoint (make it ready)
    Args:
        interactive (bool, optional): If True, will prompt the user for information. (default: False)
    Returns:
        bool: True if the Endpoint is successfully onboarded, False otherwise
    """

    # Make sure our input is defined
    if self.get_input() == "unknown":
        if interactive:
            input_model = input("Input Model?: ")
        else:
            self.log.critical("Input Model is not defined!")
            return False
    else:
        input_model = self.get_input()

    # Now that we have the details, let's onboard the Endpoint with args
    return self.onboard_with_args(input_model)

onboard_with_args(input_model)

Onboard the Endpoint with the given arguments

Parameters:

Name Type Description Default
input_model str

The input model for this endpoint

required

Returns: bool: True if the Endpoint is successfully onboarded, False otherwise

Source code in src/workbench/core/artifacts/endpoint_core.py
def onboard_with_args(self, input_model: str) -> bool:
    """Onboard the Endpoint with the given arguments

    Args:
        input_model (str): The input model for this endpoint
    Returns:
        bool: True if the Endpoint is successfully onboarded, False otherwise
    """
    # Set the status to onboarding
    self.set_status("onboarding")

    self.upsert_workbench_meta({"workbench_input": input_model})
    self.model_name = input_model

    # Remove the needs_onboard tag
    self.remove_health_tag("needs_onboard")
    self.set_status("ready")

    # Run a health check and refresh the meta
    time.sleep(2)  # Give the AWS Metadata a chance to update
    self.health_check()
    self.refresh_meta()
    self.details(recompute=True)
    return True

refresh_meta()

Refresh the Artifact's metadata

Source code in src/workbench/core/artifacts/endpoint_core.py
def refresh_meta(self):
    """Refresh the Artifact's metadata"""
    self.endpoint_meta = self.meta.endpoint(self.endpoint_name)

regression_metrics(target_column, prediction_df)

Compute the performance metrics for this Endpoint Args: target_column (str): Name of the target column prediction_df (pd.DataFrame): DataFrame with the prediction results Returns: pd.DataFrame: DataFrame with the performance metrics

Source code in src/workbench/core/artifacts/endpoint_core.py
def regression_metrics(self, target_column: str, prediction_df: pd.DataFrame) -> pd.DataFrame:
    """Compute the performance metrics for this Endpoint
    Args:
        target_column (str): Name of the target column
        prediction_df (pd.DataFrame): DataFrame with the prediction results
    Returns:
        pd.DataFrame: DataFrame with the performance metrics
    """

    # Sanity Check the prediction DataFrame
    if prediction_df.empty:
        self.log.warning("No predictions were made. Returning empty DataFrame.")
        return pd.DataFrame()

    # Compute the metrics
    y_true = prediction_df[target_column]
    prediction_col = "prediction" if "prediction" in prediction_df.columns else "predictions"
    y_pred = prediction_df[prediction_col]

    mae = mean_absolute_error(y_true, y_pred)
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    r2 = r2_score(y_true, y_pred)
    # Mean Absolute Percentage Error
    mape = np.mean(np.where(y_true != 0, np.abs((y_true - y_pred) / y_true), np.abs(y_true - y_pred))) * 100
    # Median Absolute Error
    medae = median_absolute_error(y_true, y_pred)

    # Organize and return the metrics
    metrics = {
        "MAE": round(mae, 3),
        "RMSE": round(rmse, 3),
        "R2": round(r2, 3),
        "MAPE": round(mape, 3),
        "MedAE": round(medae, 3),
        "NumRows": len(prediction_df),
    }
    return pd.DataFrame.from_records([metrics])

residuals(target_column, prediction_df)

Add the residuals to the prediction DataFrame Args: target_column (str): Name of the target column prediction_df (pd.DataFrame): DataFrame with the prediction results Returns: pd.DataFrame: DataFrame with two new columns called 'residuals' and 'residuals_abs'

Source code in src/workbench/core/artifacts/endpoint_core.py
def residuals(self, target_column: str, prediction_df: pd.DataFrame) -> pd.DataFrame:
    """Add the residuals to the prediction DataFrame
    Args:
        target_column (str): Name of the target column
        prediction_df (pd.DataFrame): DataFrame with the prediction results
    Returns:
        pd.DataFrame: DataFrame with two new columns called 'residuals' and 'residuals_abs'
    """

    # Compute the residuals
    y_true = prediction_df[target_column]
    prediction_col = "prediction" if "prediction" in prediction_df.columns else "predictions"
    y_pred = prediction_df[prediction_col]

    # Check for classification scenario
    if not pd.api.types.is_numeric_dtype(y_true) or not pd.api.types.is_numeric_dtype(y_pred):
        self.log.warning("Target and Prediction columns are not numeric. Computing 'diffs'...")
        prediction_df["residuals"] = (y_true != y_pred).astype(int)
        prediction_df["residuals_abs"] = prediction_df["residuals"]
    else:
        # Compute numeric residuals for regression
        prediction_df["residuals"] = y_true - y_pred
        prediction_df["residuals_abs"] = np.abs(prediction_df["residuals"])

    return prediction_df

set_input(input, force=False)

Override: Set the input data for this artifact

Parameters:

Name Type Description Default
input str

Name of input for this artifact

required
force bool

Force the input to be set. Defaults to False.

False

Note: We're going to not allow this to be used for Models

Source code in src/workbench/core/artifacts/endpoint_core.py
def set_input(self, input: str, force=False):
    """Override: Set the input data for this artifact

    Args:
        input (str): Name of input for this artifact
        force (bool, optional): Force the input to be set. Defaults to False.
    Note:
        We're going to not allow this to be used for Models
    """
    if not force:
        self.log.warning(f"Endpoint {self.uuid}: Does not allow manual override of the input!")
        return

    # Okay we're going to allow this to be set
    self.log.important(f"{self.uuid}: Setting input to {input}...")
    self.log.important("Be careful with this! It breaks automatic provenance of the artifact!")
    self.upsert_workbench_meta({"workbench_input": input})

size()

Return the size of this data in MegaBytes

Source code in src/workbench/core/artifacts/endpoint_core.py
def size(self) -> float:
    """Return the size of this data in MegaBytes"""
    return 0.0

validate_proba_columns(prediction_df, class_labels, guessing=False) staticmethod

Ensure probability columns are correctly aligned with class labels

Parameters:

Name Type Description Default
prediction_df DataFrame

DataFrame with the prediction results

required
class_labels list

List of class labels

required
guessing bool

Whether we're guessing the class labels. Defaults to False.

False
Source code in src/workbench/core/artifacts/endpoint_core.py
@staticmethod
def validate_proba_columns(prediction_df: pd.DataFrame, class_labels: list, guessing: bool = False):
    """Ensure probability columns are correctly aligned with class labels

    Args:
        prediction_df (pd.DataFrame): DataFrame with the prediction results
        class_labels (list): List of class labels
        guessing (bool, optional): Whether we're guessing the class labels. Defaults to False.
    """
    proba_columns = [col.replace("_proba", "") for col in prediction_df.columns if col.endswith("_proba")]

    if sorted(class_labels) != sorted(proba_columns):
        if guessing:
            raise ValueError(f"_proba columns {proba_columns} != GUESSED class_labels {class_labels}!")
        else:
            raise ValueError(f"_proba columns {proba_columns} != class_labels {class_labels}!")