
Pandas Dataframe Algorithms

Pandas Dataframes

Pandas DataFrames are obviously not going to scale as well as our Spark and SQL algorithms, but for moderate-sized data these algorithms provide some nice functionality.

Pandas Dataframe Algorithms

Workbench has a growing set of algorithms and data processing tools for Pandas DataFrames. In general, these algorithms take a DataFrame as input and return a DataFrame with additional columns.

FeatureSpaceProximity: A class for neighbor lookups using KNN with optional target information.

FeatureSpaceProximity

Source code in src/workbench/algorithms/dataframe/feature_space_proximity.py
class FeatureSpaceProximity:
    def __init__(self, df: pd.DataFrame, features: list, id_column: str, target: str = None, neighbors: int = 10):
        """FeatureSpaceProximity: A class for neighbor lookups using KNN with optional target information.

        Args:
            df: Pandas DataFrame
            features: List of feature column names
            id_column: Name of the ID column
            target: Optional name of the target column to include target-based functionality (default: None)
            neighbors: Number of neighbors to use in the KNN model (default: 10)
        """
        self.log = logging.getLogger("workbench")
        self.df = df
        self.features = features
        self.id_column = id_column
        self.target = target
        self.knn_neighbors = neighbors

        # Standardize the feature values and build the KNN model
        self.log.info("Building KNN model for FeatureSpaceProximity...")
        self.scaler = StandardScaler().fit(df[features])
        scaled_features = self.scaler.transform(df[features])
        self.knn_model = NearestNeighbors(n_neighbors=neighbors, algorithm="auto").fit(scaled_features)

        # Compute Z-Scores or Consistency Scores for the target values
        if self.target and is_numeric_dtype(self.df[self.target]):
            self.log.info("Computing Z-Scores for target values...")
            self.target_z_scores()
        else:
            self.log.info("Computing target consistency scores...")
            self.target_consistency()

        # Now compute the outlier scores
        self.log.info("Computing outlier scores...")
        self.outliers()

    @classmethod
    def from_model(cls, model) -> "FeatureSpaceProximity":
        """Create a FeatureSpaceProximity instance from a Workbench model object.

        Args:
            model (Model): A Workbench model object.

        Returns:
            FeatureSpaceProximity: A new instance of the FeatureSpaceProximity class.
        """
        from workbench.api import FeatureSet

        # Extract necessary attributes from the Workbench model
        fs = FeatureSet(model.get_input())
        features = model.features()
        target = model.target()

        # Retrieve the training DataFrame from the feature set
        df = fs.view("training").pull_dataframe()

        # Create and return a new instance of FeatureSpaceProximity
        return cls(df=df, features=features, id_column=fs.id_column, target=target)

    def neighbors(self, query_id: Union[str, int], radius: float = None, include_self: bool = True) -> pd.DataFrame:
        """Return neighbors of the given query ID, either by fixed neighbors or within a radius.

        Args:
            query_id (Union[str, int]): The ID of the query point.
            radius (float): Optional radius within which neighbors are to be searched, else use fixed neighbors.
            include_self (bool): Whether to include the query ID itself in the neighbor results.

        Returns:
            pd.DataFrame: Filtered DataFrame that includes the query ID, its neighbors, and optionally target values.
        """
        if query_id not in self.df[self.id_column].values:
            self.log.warning(f"Query ID '{query_id}' not found in the DataFrame. Returning an empty DataFrame.")
            return pd.DataFrame()

        # Get a single-row DataFrame for the query ID
        query_df = self.df[self.df[self.id_column] == query_id]

        # Use the neighbors_bulk method with the appropriate radius
        neighbors_info_df = self.neighbors_bulk(query_df, radius=radius, include_self=include_self)

        # Extract the neighbor IDs and distances from the results
        neighbor_ids = neighbors_info_df["neighbor_ids"].iloc[0]
        neighbor_distances = neighbors_info_df["neighbor_distances"].iloc[0]

        # Sort neighbors by distance (ascending order)
        sorted_neighbors = sorted(zip(neighbor_ids, neighbor_distances), key=lambda x: x[1])
        sorted_ids, sorted_distances = zip(*sorted_neighbors)

        # Filter the internal DataFrame to include only the sorted neighbors
        neighbors_df = self.df[self.df[self.id_column].isin(sorted_ids)]
        neighbors_df = neighbors_df.set_index(self.id_column).reindex(sorted_ids).reset_index()
        neighbors_df["knn_distance"] = sorted_distances
        return neighbors_df

    def neighbors_bulk(self, query_df: pd.DataFrame, radius: float = None, include_self: bool = False) -> pd.DataFrame:
        """Return neighbors for each row in the given query dataframe, either by fixed neighbors or within a radius.

        Args:
            query_df: Pandas DataFrame with the same features as the training data.
            radius: Optional radius within which neighbors are to be searched, else use fixed neighbors.
            include_self: Boolean indicating whether to include the query ID in the neighbor results.

        Returns:
            pd.DataFrame: DataFrame with query ID, neighbor IDs, neighbor targets, and neighbor distances.
        """
        # Scale the query data using the same scaler as the training data
        query_scaled = self.scaler.transform(query_df[self.features])

        # Retrieve neighbors based on radius or standard neighbors
        if radius is not None:
            distances, indices = self.knn_model.radius_neighbors(query_scaled, radius=radius)
        else:
            distances, indices = self.knn_model.kneighbors(query_scaled)

        # Collect neighbor information (IDs, target values, and distances)
        query_ids = query_df[self.id_column].values
        neighbor_ids = [[self.df.iloc[idx][self.id_column] for idx in index_list] for index_list in indices]
        neighbor_targets = (
            [
                [self.df.loc[self.df[self.id_column] == neighbor, self.target].values[0] for neighbor in index_list]
                for index_list in neighbor_ids
            ]
            if self.target
            else None
        )
        neighbor_distances = [list(dist_list) for dist_list in distances]

        # Automatically remove the query ID itself from the neighbor results if include_self is False
        for i, query_id in enumerate(query_ids):
            if query_id in neighbor_ids[i] and not include_self:
                idx_to_remove = neighbor_ids[i].index(query_id)
                neighbor_ids[i].pop(idx_to_remove)
                neighbor_distances[i].pop(idx_to_remove)
                if neighbor_targets:
                    neighbor_targets[i].pop(idx_to_remove)

            # Sort neighbors by distance (ascending order)
            sorted_neighbors = sorted(zip(neighbor_ids[i], neighbor_distances[i]), key=lambda x: x[1])
            neighbor_ids[i], neighbor_distances[i] = list(zip(*sorted_neighbors)) if sorted_neighbors else ([], [])
            if neighbor_targets:
                neighbor_targets[i] = [
                    self.df.loc[self.df[self.id_column] == neighbor, self.target].values[0]
                    for neighbor in neighbor_ids[i]
                ]

        # Create and return a results DataFrame with the updated neighbor information
        result_df = pd.DataFrame(
            {
                "query_id": query_ids,
                "neighbor_ids": neighbor_ids,
                "neighbor_distances": neighbor_distances,
            }
        )

        if neighbor_targets:
            result_df["neighbor_targets"] = neighbor_targets

        return result_df

    def outliers(self) -> None:
        """Compute a unified 'outlier' score based on either 'target_z' or 'target_consistency'."""
        if "target_z" in self.df.columns:
            # Normalize Z-Scores to a 0-1 range
            self.df["outlier"] = (self.df["target_z"].abs() / (self.df["target_z"].abs().max() + 1e-6)).clip(0, 1)

        elif "target_consistency" in self.df.columns:
            # Calculate outlier score as 1 - consistency
            self.df["outlier"] = 1 - self.df["target_consistency"]

        else:
            self.log.warning("No 'target_z' or 'target_consistency' column found to compute outlier scores.")

    def target_z_scores(self) -> None:
        """Compute Z-Scores for NUMERIC target values."""
        if not self.target:
            self.log.warning("No target column defined for Z-Score computation.")
            return

        # Get the neighbors and distances for each internal observation
        distances, indices = self.knn_model.kneighbors()

        # Retrieve all neighbor target values in a single operation
        neighbor_targets = self.df[self.target].values[indices]  # Shape will be (n_samples, n_neighbors)

        # Compute the mean and std along the neighbors axis (axis=1)
        neighbor_means = neighbor_targets.mean(axis=1)
        neighbor_stds = neighbor_targets.std(axis=1, ddof=0)

        # Vectorized Z-score calculation
        current_targets = self.df[self.target].values
        z_scores = np.where(neighbor_stds == 0, 0.0, (current_targets - neighbor_means) / neighbor_stds)

        # Assign the computed Z-Scores back to the DataFrame
        self.df["target_z"] = z_scores

    def target_consistency(self) -> None:
        """Compute a Neighborhood Consistency Score for CATEGORICAL targets."""
        if not self.target:
            self.log.warning("No target column defined for neighborhood consistency computation.")
            return

        # Get the neighbors and distances for each internal observation (already excludes the query)
        distances, indices = self.knn_model.kneighbors()

        # Calculate the Neighborhood Consistency Score for each observation
        consistency_scores = []
        for idx, idx_list in enumerate(indices):
            query_target = self.df.iloc[idx][self.target]  # Get current observation's target value

            # Get the neighbors' target values
            neighbor_targets = self.df.iloc[idx_list][self.target]

            # Calculate the proportion of neighbors that have the same category as the query observation
            consistency_score = (neighbor_targets == query_target).mean()
            consistency_scores.append(consistency_score)

        # Add the 'target_consistency' column to the internal dataframe
        self.df["target_consistency"] = consistency_scores

    def get_neighbor_indices_and_distances(self):
        """Retrieve neighbor indices and distances for all points in the dataset."""
        distances, indices = self.knn_model.kneighbors()
        return indices, distances

    def target_summary(self, query_id: Union[str, int]) -> pd.DataFrame:
        """WIP: Provide a summary of target values in the neighborhood of the given query ID"""
        neighbors_df = self.neighbors(query_id, include_self=False)
        if self.target and not neighbors_df.empty:
            summary_stats = neighbors_df[self.target].describe()
            return pd.DataFrame(summary_stats).transpose()
        else:
            self.log.warning(f"No target values found for neighbors of Query ID '{query_id}'.")
            return pd.DataFrame()
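Example usage (a minimal sketch; the synthetic DataFrame below is illustrative, and the import path is assumed to mirror the source path shown above):

import pandas as pd

from workbench.algorithms.dataframe.feature_space_proximity import FeatureSpaceProximity

# A small DataFrame with an ID column, two features, and a numeric target
df = pd.DataFrame(
    {
        "id": [1, 2, 3, 4, 5],
        "feat1": [0.1, 0.2, 0.3, 2.0, 2.1],
        "feat2": [1.0, 1.1, 0.9, 5.0, 5.2],
        "price": [10.0, 11.0, 10.5, 40.0, 42.0],
    }
)

# The KNN model is fit at construction time; target z-scores and outlier
# scores are computed immediately and stored on the internal DataFrame
fsp = FeatureSpaceProximity(df, features=["feat1", "feat2"], id_column="id", target="price", neighbors=2)

# Nearest neighbors of a single ID, sorted by distance (adds a 'knn_distance' column)
print(fsp.neighbors(query_id=3))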

__init__(df, features, id_column, target=None, neighbors=10)

FeatureSpaceProximity: A class for neighbor lookups using KNN with optional target information.

Parameters:

    df (DataFrame): Pandas DataFrame (required)
    features (list): List of feature column names (required)
    id_column (str): Name of the ID column (required)
    target (str): Optional name of the target column to include target-based functionality (default: None)
    neighbors (int): Number of neighbors to use in the KNN model (default: 10)
Source code in src/workbench/algorithms/dataframe/feature_space_proximity.py
def __init__(self, df: pd.DataFrame, features: list, id_column: str, target: str = None, neighbors: int = 10):
    """FeatureSpaceProximity: A class for neighbor lookups using KNN with optional target information.

    Args:
        df: Pandas DataFrame
        features: List of feature column names
        id_column: Name of the ID column
        target: Optional name of the target column to include target-based functionality (default: None)
        neighbors: Number of neighbors to use in the KNN model (default: 10)
    """
    self.log = logging.getLogger("workbench")
    self.df = df
    self.features = features
    self.id_column = id_column
    self.target = target
    self.knn_neighbors = neighbors

    # Standardize the feature values and build the KNN model
    self.log.info("Building KNN model for FeatureSpaceProximity...")
    self.scaler = StandardScaler().fit(df[features])
    scaled_features = self.scaler.transform(df[features])
    self.knn_model = NearestNeighbors(n_neighbors=neighbors, algorithm="auto").fit(scaled_features)

    # Compute Z-Scores or Consistency Scores for the target values
    if self.target and is_numeric_dtype(self.df[self.target]):
        self.log.info("Computing Z-Scores for target values...")
        self.target_z_scores()
    else:
        self.log.info("Computing target consistency scores...")
        self.target_consistency()

    # Now compute the outlier scores
    self.log.info("Computing outlier scores...")
    self.outliers()

from_model(model) classmethod

Create a FeatureSpaceProximity instance from a Workbench model object.

Parameters:

    model (Model): A Workbench model object (required)

Returns:

    FeatureSpaceProximity: A new instance of the FeatureSpaceProximity class.

Source code in src/workbench/algorithms/dataframe/feature_space_proximity.py
@classmethod
def from_model(cls, model) -> "FeatureSpaceProximity":
    """Create a FeatureSpaceProximity instance from a Workbench model object.

    Args:
        model (Model): A Workbench model object.

    Returns:
        FeatureSpaceProximity: A new instance of the FeatureSpaceProximity class.
    """
    from workbench.api import FeatureSet

    # Extract necessary attributes from the Workbench model
    fs = FeatureSet(model.get_input())
    features = model.features()
    target = model.target()

    # Retrieve the training DataFrame from the feature set
    df = fs.view("training").pull_dataframe()

    # Create and return a new instance of FeatureSpaceProximity
    return cls(df=df, features=features, id_column=fs.id_column, target=target)
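For example (a hedged sketch; "abalone-regression" is a placeholder model name, and Model is assumed to be importable from workbench.api alongside FeatureSet):

from workbench.api import Model

model = Model("abalone-regression")  # hypothetical model name
fsp = FeatureSpaceProximity.from_model(model)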

get_neighbor_indices_and_distances()

Retrieve neighbor indices and distances for all points in the dataset.

Source code in src/workbench/algorithms/dataframe/feature_space_proximity.py
def get_neighbor_indices_and_distances(self):
    """Retrieve neighbor indices and distances for all points in the dataset."""
    distances, indices = self.knn_model.kneighbors()
    return indices, distances

neighbors(query_id, radius=None, include_self=True)

Return neighbors of the given query ID, either by fixed neighbors or within a radius.

Parameters:

    query_id (Union[str, int]): The ID of the query point (required)
    radius (float): Optional radius within which neighbors are searched; otherwise fixed neighbors are used (default: None)
    include_self (bool): Whether to include the query ID itself in the neighbor results (default: True)

Returns:

    pd.DataFrame: Filtered DataFrame that includes the query ID, its neighbors, and optionally target values.

Source code in src/workbench/algorithms/dataframe/feature_space_proximity.py
def neighbors(self, query_id: Union[str, int], radius: float = None, include_self: bool = True) -> pd.DataFrame:
    """Return neighbors of the given query ID, either by fixed neighbors or within a radius.

    Args:
        query_id (Union[str, int]): The ID of the query point.
        radius (float): Optional radius within which neighbors are to be searched, else use fixed neighbors.
        include_self (bool): Whether to include the query ID itself in the neighbor results.

    Returns:
        pd.DataFrame: Filtered DataFrame that includes the query ID, its neighbors, and optionally target values.
    """
    if query_id not in self.df[self.id_column].values:
        self.log.warning(f"Query ID '{query_id}' not found in the DataFrame. Returning an empty DataFrame.")
        return pd.DataFrame()

    # Get a single-row DataFrame for the query ID
    query_df = self.df[self.df[self.id_column] == query_id]

    # Use the neighbors_bulk method with the appropriate radius
    neighbors_info_df = self.neighbors_bulk(query_df, radius=radius, include_self=include_self)

    # Extract the neighbor IDs and distances from the results
    neighbor_ids = neighbors_info_df["neighbor_ids"].iloc[0]
    neighbor_distances = neighbors_info_df["neighbor_distances"].iloc[0]

    # Sort neighbors by distance (ascending order)
    sorted_neighbors = sorted(zip(neighbor_ids, neighbor_distances), key=lambda x: x[1])
    sorted_ids, sorted_distances = zip(*sorted_neighbors)

    # Filter the internal DataFrame to include only the sorted neighbors
    neighbors_df = self.df[self.df[self.id_column].isin(sorted_ids)]
    neighbors_df = neighbors_df.set_index(self.id_column).reindex(sorted_ids).reset_index()
    neighbors_df["knn_distance"] = sorted_distances
    return neighbors_df
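For example (continuing the synthetic fsp instance from the class example above; distances are measured in standardized feature space, since the features are scaled with StandardScaler):

# Fixed-k lookup using the KNN model's configured number of neighbors
fsp.neighbors(query_id=3)

# All neighbors within a scaled distance of 1.5, excluding the query point itself
fsp.neighbors(query_id=3, radius=1.5, include_self=False)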

neighbors_bulk(query_df, radius=None, include_self=False)

Return neighbors for each row in the given query dataframe, either by fixed neighbors or within a radius.

Parameters:

    query_df (DataFrame): Pandas DataFrame with the same features as the training data (required)
    radius (float): Optional radius within which neighbors are searched; otherwise fixed neighbors are used (default: None)
    include_self (bool): Whether to include the query ID in the neighbor results (default: False)

Returns:

    pd.DataFrame: DataFrame with query ID, neighbor IDs, neighbor targets, and neighbor distances.

Source code in src/workbench/algorithms/dataframe/feature_space_proximity.py
def neighbors_bulk(self, query_df: pd.DataFrame, radius: float = None, include_self: bool = False) -> pd.DataFrame:
    """Return neighbors for each row in the given query dataframe, either by fixed neighbors or within a radius.

    Args:
        query_df: Pandas DataFrame with the same features as the training data.
        radius: Optional radius within which neighbors are to be searched, else use fixed neighbors.
        include_self: Boolean indicating whether to include the query ID in the neighbor results.

    Returns:
        pd.DataFrame: DataFrame with query ID, neighbor IDs, neighbor targets, and neighbor distances.
    """
    # Scale the query data using the same scaler as the training data
    query_scaled = self.scaler.transform(query_df[self.features])

    # Retrieve neighbors based on radius or standard neighbors
    if radius is not None:
        distances, indices = self.knn_model.radius_neighbors(query_scaled, radius=radius)
    else:
        distances, indices = self.knn_model.kneighbors(query_scaled)

    # Collect neighbor information (IDs, target values, and distances)
    query_ids = query_df[self.id_column].values
    neighbor_ids = [[self.df.iloc[idx][self.id_column] for idx in index_list] for index_list in indices]
    neighbor_targets = (
        [
            [self.df.loc[self.df[self.id_column] == neighbor, self.target].values[0] for neighbor in index_list]
            for index_list in neighbor_ids
        ]
        if self.target
        else None
    )
    neighbor_distances = [list(dist_list) for dist_list in distances]

    # Automatically remove the query ID itself from the neighbor results if include_self is False
    for i, query_id in enumerate(query_ids):
        if query_id in neighbor_ids[i] and not include_self:
            idx_to_remove = neighbor_ids[i].index(query_id)
            neighbor_ids[i].pop(idx_to_remove)
            neighbor_distances[i].pop(idx_to_remove)
            if neighbor_targets:
                neighbor_targets[i].pop(idx_to_remove)

        # Sort neighbors by distance (ascending order)
        sorted_neighbors = sorted(zip(neighbor_ids[i], neighbor_distances[i]), key=lambda x: x[1])
        neighbor_ids[i], neighbor_distances[i] = list(zip(*sorted_neighbors)) if sorted_neighbors else ([], [])
        if neighbor_targets:
            neighbor_targets[i] = [
                self.df.loc[self.df[self.id_column] == neighbor, self.target].values[0]
                for neighbor in neighbor_ids[i]
            ]

    # Create and return a results DataFrame with the updated neighbor information
    result_df = pd.DataFrame(
        {
            "query_id": query_ids,
            "neighbor_ids": neighbor_ids,
            "neighbor_distances": neighbor_distances,
        }
    )

    if neighbor_targets:
        result_df["neighbor_targets"] = neighbor_targets

    return result_df
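For example (again using the synthetic fsp and df from the class example above; query_df needs both the feature columns and the ID column):

# One row per query, with list-valued neighbor columns
result = fsp.neighbors_bulk(df.head(2))

# Columns: 'query_id', 'neighbor_ids', 'neighbor_distances',
# plus 'neighbor_targets' when a target column was provided
print(result)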

outliers()

Compute a unified 'outlier' score based on either 'target_z' or 'target_consistency'.

Source code in src/workbench/algorithms/dataframe/feature_space_proximity.py
def outliers(self) -> None:
    """Compute a unified 'outlier' score based on either 'target_z' or 'target_consistency'."""
    if "target_z" in self.df.columns:
        # Normalize Z-Scores to a 0-1 range
        self.df["outlier"] = (self.df["target_z"].abs() / (self.df["target_z"].abs().max() + 1e-6)).clip(0, 1)

    elif "target_consistency" in self.df.columns:
        # Calculate outlier score as 1 - consistency
        self.df["outlier"] = 1 - self.df["target_consistency"]

    else:
        self.log.warning("No 'target_z' or 'target_consistency' column found to compute outlier scores.")

target_consistency()

Compute a Neighborhood Consistency Score for CATEGORICAL targets.

Source code in src/workbench/algorithms/dataframe/feature_space_proximity.py
def target_consistency(self) -> None:
    """Compute a Neighborhood Consistency Score for CATEGORICAL targets."""
    if not self.target:
        self.log.warning("No target column defined for neighborhood consistency computation.")
        return

    # Get the neighbors and distances for each internal observation (already excludes the query)
    distances, indices = self.knn_model.kneighbors()

    # Calculate the Neighborhood Consistency Score for each observation
    consistency_scores = []
    for idx, idx_list in enumerate(indices):
        query_target = self.df.iloc[idx][self.target]  # Get current observation's target value

        # Get the neighbors' target values
        neighbor_targets = self.df.iloc[idx_list][self.target]

        # Calculate the proportion of neighbors that have the same category as the query observation
        consistency_score = (neighbor_targets == query_target).mean()
        consistency_scores.append(consistency_score)

    # Add the 'target_consistency' column to the internal dataframe
    self.df["target_consistency"] = consistency_scores

target_summary(query_id)

WIP: Provide a summary of target values in the neighborhood of the given query ID

Source code in src/workbench/algorithms/dataframe/feature_space_proximity.py
def target_summary(self, query_id: Union[str, int]) -> pd.DataFrame:
    """WIP: Provide a summary of target values in the neighborhood of the given query ID"""
    neighbors_df = self.neighbors(query_id, include_self=False)
    if self.target and not neighbors_df.empty:
        summary_stats = neighbors_df[self.target].describe()
        return pd.DataFrame(summary_stats).transpose()
    else:
        self.log.warning(f"No target values found for neighbors of Query ID '{query_id}'.")
        return pd.DataFrame()

target_z_scores()

Compute Z-Scores for NUMERIC target values.

Source code in src/workbench/algorithms/dataframe/feature_space_proximity.py
def target_z_scores(self) -> None:
    """Compute Z-Scores for NUMERIC target values."""
    if not self.target:
        self.log.warning("No target column defined for Z-Score computation.")
        return

    # Get the neighbors and distances for each internal observation
    distances, indices = self.knn_model.kneighbors()

    # Retrieve all neighbor target values in a single operation
    neighbor_targets = self.df[self.target].values[indices]  # Shape will be (n_samples, n_neighbors)

    # Compute the mean and std along the neighbors axis (axis=1)
    neighbor_means = neighbor_targets.mean(axis=1)
    neighbor_stds = neighbor_targets.std(axis=1, ddof=0)

    # Vectorized Z-score calculation
    current_targets = self.df[self.target].values
    z_scores = np.where(neighbor_stds == 0, 0.0, (current_targets - neighbor_means) / neighbor_stds)

    # Assign the computed Z-Scores back to the DataFrame
    self.df["target_z"] = z_scores

ResidualsCalculator

Bases: BaseEstimator, TransformerMixin

A custom transformer for calculating residuals using cross-validation or an endpoint.

This transformer performs K-Fold cross-validation (if no endpoint is provided), or it uses the endpoint to generate predictions and compute residuals. It adds 'prediction', 'residuals', 'residuals_abs', 'prediction_100', 'residuals_100', and 'residuals_100_abs' columns to the input DataFrame.

Attributes:

    model_class (Union[RegressorMixin, XGBRegressor]): The machine learning model class used for predictions.
    n_splits (int): Number of splits for cross-validation.
    random_state (int): Random state for reproducibility.
    endpoint (Optional): The Workbench endpoint object for running inference, if provided.
Source code in src/workbench/algorithms/dataframe/residuals_calculator.py
class ResidualsCalculator(BaseEstimator, TransformerMixin):
    """
    A custom transformer for calculating residuals using cross-validation or an endpoint.

    This transformer performs K-Fold cross-validation (if no endpoint is provided), or it uses the endpoint
    to generate predictions and compute residuals. It adds 'prediction', 'residuals', 'residuals_abs',
    'prediction_100', 'residuals_100', and 'residuals_100_abs' columns to the input DataFrame.

    Attributes:
        model_class (Union[RegressorMixin, XGBRegressor]): The machine learning model class used for predictions.
        n_splits (int): Number of splits for cross-validation.
        random_state (int): Random state for reproducibility.
        endpoint (Optional): The Workbench endpoint object for running inference, if provided.
    """

    def __init__(
        self,
        endpoint: Optional[object] = None,
        reference_model_class: Union[RegressorMixin, XGBRegressor] = XGBRegressor,
    ):
        """
        Initializes the ResidualsCalculator with the specified parameters.

        Args:
            endpoint (Optional): A Workbench endpoint object to run inference, if available.
            reference_model_class (Union[RegressorMixin, XGBRegressor]): The reference model class for predictions.
        """
        self.n_splits = 5
        self.random_state = 42
        self.reference_model_class = reference_model_class  # Store the class, instantiate the model later
        self.reference_model = None  # Lazy model initialization
        self.endpoint = endpoint  # Use this endpoint for inference if provided
        self.X = None
        self.y = None
        super().__init__()

    def fit(self, X: pd.DataFrame, y: pd.Series) -> BaseEstimator:
        """
        Fits the model. If no endpoint is provided, fitting involves storing the input data
        and initializing a reference model.

        Args:
            X (pd.DataFrame): The input features.
            y (pd.Series): The target variable.

        Returns:
            self: Returns an instance of self.
        """
        self.X = X
        self.y = y

        if self.endpoint is None:
            # Only initialize the reference model if no endpoint is provided
            self.reference_model = self.reference_model_class()
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Transforms the input DataFrame by adding 'prediction', 'residuals', 'residuals_abs',
        'prediction_100', 'residuals_100', and 'residuals_100_abs' columns.

        Args:
            X (pd.DataFrame): The input features.

        Returns:
            pd.DataFrame: The transformed DataFrame with additional columns.
        """
        check_is_fitted(self, ["X", "y"])  # Ensure fit has been called

        if self.endpoint:
            # If an endpoint is provided, run inference on the full data
            result_df = self._run_inference_via_endpoint(X)
        else:
            # If no endpoint, perform cross-validation and full model fitting
            result_df = self._run_cross_validation(X)

        return result_df

    def _run_cross_validation(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Handles the cross-validation process when no endpoint is provided.

        Args:
            X (pd.DataFrame): The input features.

        Returns:
            pd.DataFrame: DataFrame with predictions and residuals from cross-validation and full model fit.
        """
        kf = KFold(n_splits=self.n_splits, shuffle=True, random_state=self.random_state)

        # Initialize pandas Series to store predictions and residuals, aligned by index
        predictions = pd.Series(index=self.y.index, dtype=np.float64)
        residuals = pd.Series(index=self.y.index, dtype=np.float64)
        residuals_abs = pd.Series(index=self.y.index, dtype=np.float64)

        # Perform cross-validation and collect predictions and residuals
        for train_index, test_index in kf.split(self.X):
            X_train, X_test = self.X.iloc[train_index], self.X.iloc[test_index]
            y_train, y_test = self.y.iloc[train_index], self.y.iloc[test_index]

            # Fit the model on the training data
            self.reference_model.fit(X_train, y_train)

            # Predict on the test data
            y_pred = self.reference_model.predict(X_test)

            # Compute residuals and absolute residuals
            residuals_fold = y_test - y_pred
            residuals_abs_fold = np.abs(residuals_fold)

            # Place the predictions and residuals in the correct positions based on index
            predictions.iloc[test_index] = y_pred
            residuals.iloc[test_index] = residuals_fold
            residuals_abs.iloc[test_index] = residuals_abs_fold

        # Train on all data and compute residuals for 100% training
        self.reference_model.fit(self.X, self.y)
        y_pred_100 = self.reference_model.predict(self.X)
        residuals_100 = self.y - y_pred_100
        residuals_100_abs = np.abs(residuals_100)

        # Create a copy of the provided DataFrame and add the new columns
        result_df = X.copy()
        result_df["prediction"] = predictions
        result_df["residuals"] = residuals
        result_df["residuals_abs"] = residuals_abs
        result_df["prediction_100"] = y_pred_100
        result_df["residuals_100"] = residuals_100
        result_df["residuals_100_abs"] = residuals_100_abs
        result_df[self.y.name] = self.y  # Add the target column back

        return result_df

    def _run_inference_via_endpoint(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Handles the inference process when an endpoint is provided.

        Args:
            X (pd.DataFrame): The input features.

        Returns:
            pd.DataFrame: DataFrame with predictions and residuals from the endpoint.
        """
        # Run inference on all data using the endpoint (include the target column)
        X = X.copy()
        X.loc[:, self.y.name] = self.y
        results_df = self.endpoint.inference(X)
        predictions = results_df["prediction"]

        # Compute residuals and residuals_abs based on the endpoint's predictions
        residuals = self.y - predictions
        residuals_abs = np.abs(residuals)

        # To maintain consistency, populate both 'prediction' and 'prediction_100' with the same values
        result_df = X.copy()
        result_df["prediction"] = predictions
        result_df["residuals"] = residuals
        result_df["residuals_abs"] = residuals_abs
        result_df["prediction_100"] = predictions
        result_df["residuals_100"] = residuals
        result_df["residuals_100_abs"] = residuals_abs

        return result_df
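Example usage (a minimal sketch of the cross-validation path with no endpoint; the synthetic data is illustrative, and the import path is assumed to mirror the source path shown above):

import numpy as np
import pandas as pd

from workbench.algorithms.dataframe.residuals_calculator import ResidualsCalculator

# Synthetic regression data
rng = np.random.default_rng(42)
X = pd.DataFrame({"feat1": rng.normal(size=100), "feat2": rng.normal(size=100)})
y = pd.Series(2.0 * X["feat1"] - X["feat2"] + rng.normal(scale=0.1, size=100), name="target")

# With no endpoint, transform() runs 5-fold cross-validation with XGBRegressor
calc = ResidualsCalculator()
result_df = calc.fit(X, y).transform(X)
print(result_df[["prediction", "residuals", "residuals_abs", "prediction_100"]].head())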

__init__(endpoint=None, reference_model_class=XGBRegressor)

Initializes the ResidualsCalculator with the specified parameters.

Parameters:

    endpoint (Optional): A Workbench endpoint object to run inference, if available (default: None)
    reference_model_class (Union[RegressorMixin, XGBRegressor]): The reference model class for predictions (default: XGBRegressor)
Source code in src/workbench/algorithms/dataframe/residuals_calculator.py
def __init__(
    self,
    endpoint: Optional[object] = None,
    reference_model_class: Union[RegressorMixin, XGBRegressor] = XGBRegressor,
):
    """
    Initializes the ResidualsCalculator with the specified parameters.

    Args:
        endpoint (Optional): A Workbench endpoint object to run inference, if available.
        reference_model_class (Union[RegressorMixin, XGBRegressor]): The reference model class for predictions.
    """
    self.n_splits = 5
    self.random_state = 42
    self.reference_model_class = reference_model_class  # Store the class, instantiate the model later
    self.reference_model = None  # Lazy model initialization
    self.endpoint = endpoint  # Use this endpoint for inference if provided
    self.X = None
    self.y = None
    super().__init__()

fit(X, y)

Fits the model. If no endpoint is provided, fitting involves storing the input data and initializing a reference model.

Parameters:

    X (DataFrame): The input features (required)
    y (Series): The target variable (required)

Returns:

    self (BaseEstimator): Returns an instance of self.

Source code in src/workbench/algorithms/dataframe/residuals_calculator.py
def fit(self, X: pd.DataFrame, y: pd.Series) -> BaseEstimator:
    """
    Fits the model. If no endpoint is provided, fitting involves storing the input data
    and initializing a reference model.

    Args:
        X (pd.DataFrame): The input features.
        y (pd.Series): The target variable.

    Returns:
        self: Returns an instance of self.
    """
    self.X = X
    self.y = y

    if self.endpoint is None:
        # Only initialize the reference model if no endpoint is provided
        self.reference_model = self.reference_model_class()
    return self

transform(X)

Transforms the input DataFrame by adding 'prediction', 'residuals', 'residuals_abs', 'prediction_100', 'residuals_100', and 'residuals_100_abs' columns.

Parameters:

    X (DataFrame): The input features (required)

Returns:

    pd.DataFrame: The transformed DataFrame with additional columns.

Source code in src/workbench/algorithms/dataframe/residuals_calculator.py
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
    """
    Transforms the input DataFrame by adding 'prediction', 'residuals', 'residuals_abs',
    'prediction_100', 'residuals_100', and 'residuals_100_abs' columns.

    Args:
        X (pd.DataFrame): The input features.

    Returns:
        pd.DataFrame: The transformed DataFrame with additional columns.
    """
    check_is_fitted(self, ["X", "y"])  # Ensure fit has been called

    if self.endpoint:
        # If an endpoint is provided, run inference on the full data
        result_df = self._run_inference_via_endpoint(X)
    else:
        # If no endpoint, perform cross-validation and full model fitting
        result_df = self._run_cross_validation(X)

    return result_df

DimensionalityReduction: Perform Dimensionality Reduction on a DataFrame

DimensionalityReduction

Source code in src/workbench/algorithms/dataframe/dimensionality_reduction.py
class DimensionalityReduction:
    def __init__(self):
        """DimensionalityReduction:  Perform Dimensionality Reduction on a DataFrame"""
        self.log = logging.getLogger("workbench")
        self.projection_model = None
        self.features = None

    def fit_transform(self, df: pd.DataFrame, features: list = None, projection: str = "TSNE") -> pd.DataFrame:
        """Fit and Transform the DataFrame
        Args:
            df: Pandas DataFrame
            features: List of feature column names (default: None)
            projection: The projection model to use (TSNE, MDS, or PCA; default: TSNE)
        Returns:
            Pandas DataFrame with new columns x and y
        """

        # If no features are given, identify all numeric columns
        if features is None:
            features = [x for x in df.select_dtypes(include="number").columns.tolist() if not x.endswith("id")]
            # Also drop group_count if it exists
            features = [x for x in features if x != "group_count"]
            self.log.info("No features given, auto identifying numeric columns...")
            self.log.info(f"{features}")
        self.features = features

        # Sanity checks
        if not all(column in df.columns for column in self.features):
            self.log.critical("Some features are missing in the DataFrame")
            return df
        if len(self.features) < 2:
            self.log.critical("At least two features are required")
            return df
        if df.empty:
            self.log.critical("DataFrame is empty")
            return df

        # Most projection models will fail if there are any NaNs in the data
        # So we'll fill NaNs with the mean value for that column
        for col in self.features:
            df[col] = df[col].fillna(df[col].mean())

        # Normalize the features
        scaler = StandardScaler()
        normalized_data = scaler.fit_transform(df[self.features])
        df[self.features] = normalized_data

        # Project the multidimensional features onto an x,y plane
        self.log.info("Projecting features onto an x,y plane...")

        # Perform the projection
        if projection == "TSNE":
            # Perplexity is a hyperparameter that controls the number of neighbors used to compute the manifold
            # The number of neighbors should be less than the number of samples
            perplexity = min(40, len(df) - 1)
            self.log.info(f"Perplexity: {perplexity}")
            self.projection_model = TSNE(perplexity=perplexity)
        elif projection == "MDS":
            self.projection_model = MDS(n_components=2, random_state=0)
        elif projection == "PCA":
            self.projection_model = PCA(n_components=2)

        # Fit the projection model
        # Hack PCA + TSNE to work together
        projection = self.projection_model.fit_transform(df[self.features])

        # Put the projection results back into the given DataFrame
        df["x"] = projection[:, 0]  # Projection X Column
        df["y"] = projection[:, 1]  # Projection Y Column

        # Jitter the data to resolve coincident points
        # df = self.resolve_coincident_points(df)

        # Return the DataFrame with the new columns
        return df

    @staticmethod
    def resolve_coincident_points(df: pd.DataFrame):
        """Resolve coincident points in a DataFrame
        Args:
            df(pd.DataFrame): The DataFrame to resolve coincident points in
        Returns:
            pd.DataFrame: The DataFrame with resolved coincident points
        """
        # Adding Jitter to the projection
        x_scale = (df["x"].max() - df["x"].min()) * 0.1
        y_scale = (df["y"].max() - df["y"].min()) * 0.1
        df["x"] += np.random.normal(-x_scale, +x_scale, len(df))
        df["y"] += np.random.normal(-y_scale, +y_scale, len(df))
        return df
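Example usage (a short sketch using the PCA path, since the test() function below exercises TSNE; the data is illustrative, and the import path is assumed to mirror the source path shown above). Note that fit_transform fills NaNs and normalizes the feature columns in place on the DataFrame you pass in:

import pandas as pd

from workbench.algorithms.dataframe.dimensionality_reduction import DimensionalityReduction

data_df = pd.DataFrame(
    {
        "feat1": [1.0, 2.0, 3.0, 4.0, 5.0],
        "feat2": [5.0, 4.0, 3.0, 2.0, 1.0],
        "feat3": [0.5, 0.9, 0.4, 0.8, 0.3],
    }
)

dr = DimensionalityReduction()
df_xy = dr.fit_transform(data_df, features=["feat1", "feat2", "feat3"], projection="PCA")
print(df_xy[["x", "y"]])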

__init__()

DimensionalityReduction: Perform Dimensionality Reduction on a DataFrame

Source code in src/workbench/algorithms/dataframe/dimensionality_reduction.py
def __init__(self):
    """DimensionalityReduction:  Perform Dimensionality Reduction on a DataFrame"""
    self.log = logging.getLogger("workbench")
    self.projection_model = None
    self.features = None

fit_transform(df, features=None, projection='TSNE')

Fit and Transform the DataFrame

Parameters:

    df: Pandas DataFrame
    features: List of feature column names (default: None)
    projection: The projection model to use (TSNE, MDS, or PCA; default: TSNE)

Returns:

    Pandas DataFrame with new columns x and y

Source code in src/workbench/algorithms/dataframe/dimensionality_reduction.py
def fit_transform(self, df: pd.DataFrame, features: list = None, projection: str = "TSNE") -> pd.DataFrame:
    """Fit and Transform the DataFrame
    Args:
        df: Pandas DataFrame
        features: List of feature column names (default: None)
        projection: The projection model to use (TSNE, MDS, or PCA; default: TSNE)
    Returns:
        Pandas DataFrame with new columns x and y
    """

    # If no features are given, identify all numeric columns
    if features is None:
        features = [x for x in df.select_dtypes(include="number").columns.tolist() if not x.endswith("id")]
        # Also drop group_count if it exists
        features = [x for x in features if x != "group_count"]
        self.log.info("No features given, auto identifying numeric columns...")
        self.log.info(f"{features}")
    self.features = features

    # Sanity checks
    if not all(column in df.columns for column in self.features):
        self.log.critical("Some features are missing in the DataFrame")
        return df
    if len(self.features) < 2:
        self.log.critical("At least two features are required")
        return df
    if df.empty:
        self.log.critical("DataFrame is empty")
        return df

    # Most projection models will fail if there are any NaNs in the data
    # So we'll fill NaNs with the mean value for that column
    for col in self.features:
        df[col] = df[col].fillna(df[col].mean())

    # Normalize the features
    scaler = StandardScaler()
    normalized_data = scaler.fit_transform(df[self.features])
    df[self.features] = normalized_data

    # Project the multidimensional features onto an x,y plane
    self.log.info("Projecting features onto an x,y plane...")

    # Perform the projection
    if projection == "TSNE":
        # Perplexity is a hyperparameter that controls the number of neighbors used to compute the manifold
        # The number of neighbors should be less than the number of samples
        perplexity = min(40, len(df) - 1)
        self.log.info(f"Perplexity: {perplexity}")
        self.projection_model = TSNE(perplexity=perplexity)
    elif projection == "MDS":
        self.projection_model = MDS(n_components=2, random_state=0)
    elif projection == "PCA":
        self.projection_model = PCA(n_components=2)

    # Fit the projection model
    # Hack PCA + TSNE to work together
    projection = self.projection_model.fit_transform(df[self.features])

    # Put the projection results back into the given DataFrame
    df["x"] = projection[:, 0]  # Projection X Column
    df["y"] = projection[:, 1]  # Projection Y Column

    # Jitter the data to resolve coincident points
    # df = self.resolve_coincident_points(df)

    # Return the DataFrame with the new columns
    return df

resolve_coincident_points(df) staticmethod

Resolve coincident points in a DataFrame

Parameters:

    df (pd.DataFrame): The DataFrame to resolve coincident points in

Returns:

    pd.DataFrame: The DataFrame with resolved coincident points

Source code in src/workbench/algorithms/dataframe/dimensionality_reduction.py
@staticmethod
def resolve_coincident_points(df: pd.DataFrame):
    """Resolve coincident points in a DataFrame
    Args:
        df(pd.DataFrame): The DataFrame to resolve coincident points in
    Returns:
        pd.DataFrame: The DataFrame with resolved coincident points
    """
    # Adding Jitter to the projection
    x_scale = (df["x"].max() - df["x"].min()) * 0.1
    y_scale = (df["y"].max() - df["y"].min()) * 0.1
    df["x"] += np.random.normal(-x_scale, +x_scale, len(df))
    df["y"] += np.random.normal(-y_scale, +y_scale, len(df))
    return df

test()

Test for the Dimensionality Reduction Class

Source code in src/workbench/algorithms/dataframe/dimensionality_reduction.py
def test():
    """Test for the Dimensionality Reduction Class"""
    # Set some pandas options
    pd.set_option("display.max_columns", None)
    pd.set_option("display.width", 1000)

    # Make some fake data
    data = {
        "ID": [
            "id_0",
            "id_0",
            "id_2",
            "id_3",
            "id_4",
            "id_5",
            "id_6",
            "id_7",
            "id_8",
            "id_9",
        ],
        "feat1": [1.0, 1.0, 1.1, 3.0, 4.0, 1.0, 1.0, 1.1, 3.0, 4.0],
        "feat2": [1.0, 1.0, 1.1, 3.0, 4.0, 1.0, 1.0, 1.1, 3.0, 4.0],
        "feat3": [0.1, 0.1, 0.2, 1.6, 2.5, 0.1, 0.1, 0.2, 1.6, 2.5],
        "price": [31, 60, 62, 40, 20, 31, 61, 60, 40, 20],
    }
    data_df = pd.DataFrame(data)
    features = ["feat1", "feat2", "feat3"]

    # Create the class and run the dimensionality reduction
    projection = DimensionalityReduction()
    new_df = projection.fit_transform(data_df, features=features, projection="TSNE")

    # Check that the x and y columns were added
    assert "x" in new_df.columns
    assert "y" in new_df.columns

    # Output the DataFrame
    print(new_df)

Questions?

The SuperCowPowers team is happy to answer any questions you may have about AWS and Workbench. Please contact us at workbench@supercowpowers.com or chat us up on Discord.