Pandas Dataframe Algorithms

Pandas Dataframes

Pandas dataframes are not going to scale as well as our Spark and SQL Algorithms, but for moderately sized data these algorithms provide some nice functionality.

Workbench has a growing set of algorithms and data processing tools for Pandas dataframes. In general, these algorithms take a dataframe as input and return a dataframe with additional columns.
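The examples on this page assume the classes can be imported from the module paths shown in each "Source code in ..." note; a minimal sketch of the imports (verify against your installed workbench package, which may also re-export these classes elsewhere):

```python
# Import paths inferred from the source file locations shown on this page --
# adjust if your workbench version exposes these classes elsewhere.
from workbench.algorithms.dataframe.feature_space_proximity import FeatureSpaceProximity
from workbench.algorithms.dataframe.fingerprint_proximity import FingerprintProximity
from workbench.algorithms.dataframe.residuals_calculator import ResidualsCalculator
from workbench.algorithms.dataframe.dimensionality_reduction import DimensionalityReduction
```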

FeatureSpaceProximity

Bases: Proximity

Source code in src/workbench/algorithms/dataframe/feature_space_proximity.py
class FeatureSpaceProximity(Proximity):
    def __init__(
        self, df: pd.DataFrame, id_column: str, features: List[str], target: str = None, n_neighbors: int = 10
    ) -> None:
        """
        Initialize the FeatureSpaceProximity class.

        Args:
            df (pd.DataFrame): DataFrame containing feature data.
            id_column (str): Name of the column used as an identifier.
            features (List[str]): List of feature column names to be used for neighbor computations.
            target (str): Optional name of the target column.
            n_neighbors (int): Number of neighbors to compute.
        """
        if not features:
            raise ValueError("The 'features' list must be defined and contain at least one feature.")
        super().__init__(df, id_column=id_column, features=features, target=target, n_neighbors=n_neighbors)

    def _prepare_data(self) -> None:
        """
        Prepare the feature matrix by scaling numeric features.
        """
        # Scale features for better distance computation
        scaler = StandardScaler()
        self.X = scaler.fit_transform(self.data[self.features].values)
        self.nn = NearestNeighbors(n_neighbors=self.n_neighbors + 1).fit(self.X)

    @classmethod
    def from_model(cls, model) -> "FeatureSpaceProximity":
        """Create a FeatureSpaceProximity instance from a Workbench model object.

        Args:
            model (Model): A Workbench model object.

        Returns:
            FeatureSpaceProximity: A new instance of the FeatureSpaceProximity class.
        """
        from workbench.api import FeatureSet

        # Extract necessary attributes from the Workbench model
        fs = FeatureSet(model.get_input())
        features = model.features()
        target = model.target()

        # Retrieve the training DataFrame from the feature set
        df = fs.view("training").pull_dataframe()

        # Create and return a new instance of FeatureSpaceProximity
        return cls(df=df, id_column=fs.id_column, features=features, target=target)

__init__(df, id_column, features, target=None, n_neighbors=10)

Initialize the FeatureSpaceProximity class.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | pd.DataFrame | DataFrame containing feature data. | required |
| id_column | str | Name of the column used as an identifier. | required |
| features | List[str] | List of feature column names to be used for neighbor computations. | required |
| target | str | Optional name of the target column. | None |
| n_neighbors | int | Number of neighbors to compute. | 10 |

Source code in src/workbench/algorithms/dataframe/feature_space_proximity.py
def __init__(
    self, df: pd.DataFrame, id_column: str, features: List[str], target: str = None, n_neighbors: int = 10
) -> None:
    """
    Initialize the FeatureSpaceProximity class.

    Args:
        df (pd.DataFrame): DataFrame containing feature data.
        id_column (str): Name of the column used as an identifier.
        features (List[str]): List of feature column names to be used for neighbor computations.
        target (str): Optional name of the target column.
        n_neighbors (int): Number of neighbors to compute.
    """
    if not features:
        raise ValueError("The 'features' list must be defined and contain at least one feature.")
    super().__init__(df, id_column=id_column, features=features, target=target, n_neighbors=n_neighbors)

from_model(model) classmethod

Create a FeatureSpaceProximity instance from a Workbench model object.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| model | Model | A Workbench model object. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| FeatureSpaceProximity | FeatureSpaceProximity | A new instance of the FeatureSpaceProximity class. |

Source code in src/workbench/algorithms/dataframe/feature_space_proximity.py
@classmethod
def from_model(cls, model) -> "FeatureSpaceProximity":
    """Create a FeatureSpaceProximity instance from a Workbench model object.

    Args:
        model (Model): A Workbench model object.

    Returns:
        FeatureSpaceProximity: A new instance of the FeatureSpaceProximity class.
    """
    from workbench.api import FeatureSet

    # Extract necessary attributes from the Workbench model
    fs = FeatureSet(model.get_input())
    features = model.features()
    target = model.target()

    # Retrieve the training DataFrame from the feature set
    df = fs.view("training").pull_dataframe()

    # Create and return a new instance of FeatureSpaceProximity
    return cls(df=df, id_column=fs.id_column, features=features, target=target)
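A minimal usage sketch on synthetic data. Note that neighbors() and the columns of its result come from the Proximity base class, which is not shown on this page, so treat the query API below as an assumption based on the FingerprintProximity methods further down:

```python
import pandas as pd
from workbench.algorithms.dataframe.feature_space_proximity import FeatureSpaceProximity

# Small synthetic feature DataFrame
df = pd.DataFrame(
    {
        "id": ["a", "b", "c", "d"],
        "feat1": [1.0, 1.1, 3.0, 4.0],
        "feat2": [0.1, 0.2, 1.6, 2.5],
    }
)

# Build the proximity index on the scaled feature space
prox = FeatureSpaceProximity(df, id_column="id", features=["feat1", "feat2"], n_neighbors=2)

# Neighbors of a single point (signature assumed, inherited from Proximity)
print(prox.neighbors(query_id="a"))

# Or build directly from a Workbench model (Model import from workbench.api assumed)
# from workbench.api import Model
# prox = FeatureSpaceProximity.from_model(Model("my-regression-model"))
```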

FingerprintProximity

Bases: Proximity

Source code in src/workbench/algorithms/dataframe/fingerprint_proximity.py
class FingerprintProximity(Proximity):
    def __init__(
        self, df: pd.DataFrame, fingerprint_column: str, id_column: Union[int, str], n_neighbors: int = 10
    ) -> None:
        """
        Initialize the FingerprintProximity class.

        Args:
            df (pd.DataFrame): DataFrame containing fingerprints and other features.
            fingerprint_column (str): Name of the column containing fingerprints.
            id_column (Union[int, str]): Name of the column used as an identifier.
            n_neighbors (int): Number of neighbors to compute.
        """
        self.fingerprint_column = fingerprint_column
        super().__init__(df, id_column=id_column, n_neighbors=n_neighbors)

    def _prepare_data(self) -> None:
        """
        Prepare the DataFrame by converting fingerprints into a binary feature matrix.
        """
        # Convert the fingerprint strings to binary arrays
        log.info("Converting fingerprints to binary feature matrix...")
        fingerprint_bits = self.data[self.fingerprint_column].apply(
            lambda fp: np.array([int(bit) for bit in fp], dtype=np.bool_)
        )
        self.X = np.vstack(fingerprint_bits)

        # Use Jaccard similarity for binary fingerprints
        log.info("Computing NearestNeighbors with Jaccard metric...")
        self.nn = NearestNeighbors(metric="jaccard", n_neighbors=self.n_neighbors + 1).fit(self.X)

    def get_edge_weight(self, row: pd.Series) -> float:
        """
        Compute edge weight using similarity for fingerprints.
        """
        return row["similarity"]

    def neighbors(
        self, query_id: Union[int, str], similarity: float = None, include_self: bool = False
    ) -> pd.DataFrame:
        """
        Return neighbors of the given query ID, either a fixed number of neighbors
        or all neighbors above a similarity threshold.

        Args:
            query_id (Union[int, str]): The ID of the query point.
            similarity (float): Optional similarity threshold above which neighbors are to be included.
            include_self (bool): Whether to include the query ID itself in the neighbor results.

        Returns:
            pd.DataFrame: Filtered DataFrame that includes the query ID, its neighbors, and their similarities.
        """
        # Convert similarity to a radius (1 - similarity)
        radius = 1 - similarity if similarity is not None else None
        neighbors_df = super().neighbors(query_id=query_id, radius=radius, include_self=include_self)

        # Convert distances to Tanimoto similarities
        if "distance" in neighbors_df.columns:
            neighbors_df["similarity"] = 1 - neighbors_df["distance"]
            neighbors_df = neighbors_df.drop(columns=["distance"])

        return neighbors_df

    def all_neighbors(self, include_self: bool = False) -> pd.DataFrame:
        """
        Compute nearest neighbors for all rows in the dataset.

        Args:
            include_self (bool): Whether to include self-loops in the results.

        Returns:
            pd.DataFrame: A DataFrame of neighbors and their Tanimoto similarities.
        """
        all_neighbors_df = super().all_neighbors(include_self=include_self)

        # Convert distances to Tanimoto similarities
        if "distance" in all_neighbors_df.columns:
            all_neighbors_df["similarity"] = 1 - all_neighbors_df["distance"]
            all_neighbors_df = all_neighbors_df.drop(columns=["distance"])

        return all_neighbors_df
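For binary fingerprints, the Tanimoto similarity is the same quantity as the Jaccard index, which is why the class can fit scikit-learn's NearestNeighbors with metric="jaccard" and recover similarities as one minus the returned distances:

$$
T(A, B) = \frac{|A \cap B|}{|A| + |B| - |A \cap B|} = J(A, B),
\qquad d_{\text{jaccard}}(A, B) = 1 - J(A, B)
$$

where |A| and |B| count the on-bits in each fingerprint and |A ∩ B| counts the shared on-bits.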

__init__(df, fingerprint_column, id_column, n_neighbors=10)

Initialize the FingerprintProximity class.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | pd.DataFrame | DataFrame containing fingerprints and other features. | required |
| fingerprint_column | str | Name of the column containing fingerprints. | required |
| id_column | Union[int, str] | Name of the column used as an identifier. | required |
| n_neighbors | int | Number of neighbors to compute. | 10 |

Source code in src/workbench/algorithms/dataframe/fingerprint_proximity.py
def __init__(
    self, df: pd.DataFrame, fingerprint_column: str, id_column: Union[int, str], n_neighbors: int = 10
) -> None:
    """
    Initialize the FingerprintProximity class.

    Args:
        df (pd.DataFrame): DataFrame containing fingerprints and other features.
        fingerprint_column (str): Name of the column containing fingerprints.
        id_column (Union[int, str]): Name of the column used as an identifier.
        n_neighbors (int): Number of neighbors to compute.
    """
    self.fingerprint_column = fingerprint_column
    super().__init__(df, id_column=id_column, n_neighbors=n_neighbors)

all_neighbors(include_self=False)

Compute nearest neighbors for all rows in the dataset.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| include_self | bool | Whether to include self-loops in the results. | False |

Returns:

| Type | Description |
| --- | --- |
| pd.DataFrame | A DataFrame of neighbors and their Tanimoto similarities. |

Source code in src/workbench/algorithms/dataframe/fingerprint_proximity.py
def all_neighbors(self, include_self: bool = False) -> pd.DataFrame:
    """
    Compute nearest neighbors for all rows in the dataset.

    Args:
        include_self (bool): Whether to include self-loops in the results.

    Returns:
        pd.DataFrame: A DataFrame of neighbors and their Tanimoto similarities.
    """
    all_neighbors_df = super().all_neighbors(include_self=include_self)

    # Convert distances to Tanimoto similarities
    if "distance" in all_neighbors_df.columns:
        all_neighbors_df["similarity"] = 1 - all_neighbors_df["distance"]
        all_neighbors_df = all_neighbors_df.drop(columns=["distance"])

    return all_neighbors_df

get_edge_weight(row)

Compute edge weight using similarity for fingerprints.

Source code in src/workbench/algorithms/dataframe/fingerprint_proximity.py
def get_edge_weight(self, row: pd.Series) -> float:
    """
    Compute edge weight using similarity for fingerprints.
    """
    return row["similarity"]

neighbors(query_id, similarity=None, include_self=False)

Return neighbors of the given query ID, either a fixed number of neighbors or all neighbors above a similarity threshold.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| query_id | Union[int, str] | The ID of the query point. | required |
| similarity | float | Optional similarity threshold above which neighbors are to be included. | None |
| include_self | bool | Whether to include the query ID itself in the neighbor results. | False |

Returns:

| Type | Description |
| --- | --- |
| pd.DataFrame | Filtered DataFrame that includes the query ID, its neighbors, and their similarities. |

Source code in src/workbench/algorithms/dataframe/fingerprint_proximity.py
def neighbors(
    self, query_id: Union[int, str], similarity: float = None, include_self: bool = False
) -> pd.DataFrame:
    """
    Return neighbors of the given query ID, either a fixed number of neighbors
    or all neighbors above a similarity threshold.

    Args:
        query_id (Union[int, str]): The ID of the query point.
        similarity (float): Optional similarity threshold above which neighbors are to be included.
        include_self (bool): Whether to include the query ID itself in the neighbor results.

    Returns:
        pd.DataFrame: Filtered DataFrame that includes the query ID, its neighbors, and their similarities.
    """
    # Convert similarity to a radius (1 - similarity)
    radius = 1 - similarity if similarity is not None else None
    neighbors_df = super().neighbors(query_id=query_id, radius=radius, include_self=include_self)

    # Convert distances to Tanimoto similarities
    if "distance" in neighbors_df.columns:
        neighbors_df["similarity"] = 1 - neighbors_df["distance"]
        neighbors_df = neighbors_df.drop(columns=["distance"])

    return neighbors_df
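A minimal usage sketch with synthetic bit-string fingerprints (each character is one bit):

```python
import pandas as pd
from workbench.algorithms.dataframe.fingerprint_proximity import FingerprintProximity

# Synthetic fingerprints as fixed-length bit strings
df = pd.DataFrame(
    {
        "id": ["mol_1", "mol_2", "mol_3"],
        "fingerprint": ["10110010", "10110000", "01001101"],
    }
)

prox = FingerprintProximity(df, fingerprint_column="fingerprint", id_column="id", n_neighbors=2)

# Fixed number of nearest neighbors for one molecule
print(prox.neighbors(query_id="mol_1"))

# Or all neighbors above a Tanimoto similarity threshold
print(prox.neighbors(query_id="mol_1", similarity=0.5))
```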

ResidualsCalculator

Bases: BaseEstimator, TransformerMixin

A custom transformer for calculating residuals using cross-validation or an endpoint.

If no endpoint is provided, this transformer performs K-Fold cross-validation; otherwise it uses the endpoint to generate predictions and compute residuals. It adds 'prediction', 'residuals', 'residuals_abs', 'prediction_100', 'residuals_100', and 'residuals_100_abs' columns to the input DataFrame.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| model_class | Union[RegressorMixin, XGBRegressor] | The machine learning model class used for predictions. |
| n_splits | int | Number of splits for cross-validation. |
| random_state | int | Random state for reproducibility. |
| endpoint | Optional | The Workbench endpoint object for running inference, if provided. |

Source code in src/workbench/algorithms/dataframe/residuals_calculator.py
class ResidualsCalculator(BaseEstimator, TransformerMixin):
    """
    A custom transformer for calculating residuals using cross-validation or an endpoint.

    This transformer performs K-Fold cross-validation (if no endpoint is provided), or it uses the endpoint
    to generate predictions and compute residuals. It adds 'prediction', 'residuals', 'residuals_abs',
    'prediction_100', 'residuals_100', and 'residuals_100_abs' columns to the input DataFrame.

    Attributes:
        model_class (Union[RegressorMixin, XGBRegressor]): The machine learning model class used for predictions.
        n_splits (int): Number of splits for cross-validation.
        random_state (int): Random state for reproducibility.
        endpoint (Optional): The Workbench endpoint object for running inference, if provided.
    """

    def __init__(
        self,
        endpoint: Optional[object] = None,
        reference_model_class: Union[RegressorMixin, XGBRegressor] = XGBRegressor,
    ):
        """
        Initializes the ResidualsCalculator with the specified parameters.

        Args:
            endpoint (Optional): A Workbench endpoint object to run inference, if available.
            reference_model_class (Union[RegressorMixin, XGBRegressor]): The reference model class for predictions.
        """
        self.n_splits = 5
        self.random_state = 42
        self.reference_model_class = reference_model_class  # Store the class, instantiate the model later
        self.reference_model = None  # Lazy model initialization
        self.endpoint = endpoint  # Use this endpoint for inference if provided
        self.X = None
        self.y = None
        super().__init__()

    def fit(self, X: pd.DataFrame, y: pd.Series) -> BaseEstimator:
        """
        Fits the model. If no endpoint is provided, fitting involves storing the input data
        and initializing a reference model.

        Args:
            X (pd.DataFrame): The input features.
            y (pd.Series): The target variable.

        Returns:
            self: Returns an instance of self.
        """
        self.X = X
        self.y = y

        if self.endpoint is None:
            # Only initialize the reference model if no endpoint is provided
            self.reference_model = self.reference_model_class()
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Transforms the input DataFrame by adding 'prediction', 'residuals', 'residuals_abs',
        'prediction_100', 'residuals_100', and 'residuals_100_abs' columns.

        Args:
            X (pd.DataFrame): The input features.

        Returns:
            pd.DataFrame: The transformed DataFrame with additional columns.
        """
        check_is_fitted(self, ["X", "y"])  # Ensure fit has been called

        if self.endpoint:
            # If an endpoint is provided, run inference on the full data
            result_df = self._run_inference_via_endpoint(X)
        else:
            # If no endpoint, perform cross-validation and full model fitting
            result_df = self._run_cross_validation(X)

        return result_df

    def _run_cross_validation(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Handles the cross-validation process when no endpoint is provided.

        Args:
            X (pd.DataFrame): The input features.

        Returns:
            pd.DataFrame: DataFrame with predictions and residuals from cross-validation and full model fit.
        """
        kf = KFold(n_splits=self.n_splits, shuffle=True, random_state=self.random_state)

        # Initialize pandas Series to store predictions and residuals, aligned by index
        predictions = pd.Series(index=self.y.index, dtype=np.float64)
        residuals = pd.Series(index=self.y.index, dtype=np.float64)
        residuals_abs = pd.Series(index=self.y.index, dtype=np.float64)

        # Perform cross-validation and collect predictions and residuals
        for train_index, test_index in kf.split(self.X):
            X_train, X_test = self.X.iloc[train_index], self.X.iloc[test_index]
            y_train, y_test = self.y.iloc[train_index], self.y.iloc[test_index]

            # Fit the model on the training data
            self.reference_model.fit(X_train, y_train)

            # Predict on the test data
            y_pred = self.reference_model.predict(X_test)

            # Compute residuals and absolute residuals
            residuals_fold = y_test - y_pred
            residuals_abs_fold = np.abs(residuals_fold)

            # Place the predictions and residuals in the correct positions based on index
            predictions.iloc[test_index] = y_pred
            residuals.iloc[test_index] = residuals_fold
            residuals_abs.iloc[test_index] = residuals_abs_fold

        # Train on all data and compute residuals for 100% training
        self.reference_model.fit(self.X, self.y)
        y_pred_100 = self.reference_model.predict(self.X)
        residuals_100 = self.y - y_pred_100
        residuals_100_abs = np.abs(residuals_100)

        # Create a copy of the provided DataFrame and add the new columns
        result_df = X.copy()
        result_df["prediction"] = predictions
        result_df["residuals"] = residuals
        result_df["residuals_abs"] = residuals_abs
        result_df["prediction_100"] = y_pred_100
        result_df["residuals_100"] = residuals_100
        result_df["residuals_100_abs"] = residuals_100_abs
        result_df[self.y.name] = self.y  # Add the target column back

        return result_df

    def _run_inference_via_endpoint(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Handles the inference process when an endpoint is provided.

        Args:
            X (pd.DataFrame): The input features.

        Returns:
            pd.DataFrame: DataFrame with predictions and residuals from the endpoint.
        """
        # Run inference on all data using the endpoint (include the target column)
        X = X.copy()
        X.loc[:, self.y.name] = self.y
        results_df = self.endpoint.inference(X)
        predictions = results_df["prediction"]

        # Compute residuals and residuals_abs based on the endpoint's predictions
        residuals = self.y - predictions
        residuals_abs = np.abs(residuals)

        # To maintain consistency, populate both 'prediction' and 'prediction_100' with the same values
        result_df = X.copy()
        result_df["prediction"] = predictions
        result_df["residuals"] = residuals
        result_df["residuals_abs"] = residuals_abs
        result_df["prediction_100"] = predictions
        result_df["residuals_100"] = residuals
        result_df["residuals_100_abs"] = residuals_abs

        return result_df

__init__(endpoint=None, reference_model_class=XGBRegressor)

Initializes the ResidualsCalculator with the specified parameters.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| endpoint | Optional | A Workbench endpoint object to run inference, if available. | None |
| reference_model_class | Union[RegressorMixin, XGBRegressor] | The reference model class for predictions. | XGBRegressor |

Source code in src/workbench/algorithms/dataframe/residuals_calculator.py
def __init__(
    self,
    endpoint: Optional[object] = None,
    reference_model_class: Union[RegressorMixin, XGBRegressor] = XGBRegressor,
):
    """
    Initializes the ResidualsCalculator with the specified parameters.

    Args:
        endpoint (Optional): A Workbench endpoint object to run inference, if available.
        reference_model_class (Union[RegressorMixin, XGBRegressor]): The reference model class for predictions.
    """
    self.n_splits = 5
    self.random_state = 42
    self.reference_model_class = reference_model_class  # Store the class, instantiate the model later
    self.reference_model = None  # Lazy model initialization
    self.endpoint = endpoint  # Use this endpoint for inference if provided
    self.X = None
    self.y = None
    super().__init__()

fit(X, y)

Fits the model. If no endpoint is provided, fitting involves storing the input data and initializing a reference model.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| X | pd.DataFrame | The input features. | required |
| y | pd.Series | The target variable. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| self | BaseEstimator | Returns an instance of self. |

Source code in src/workbench/algorithms/dataframe/residuals_calculator.py
def fit(self, X: pd.DataFrame, y: pd.Series) -> BaseEstimator:
    """
    Fits the model. If no endpoint is provided, fitting involves storing the input data
    and initializing a reference model.

    Args:
        X (pd.DataFrame): The input features.
        y (pd.Series): The target variable.

    Returns:
        self: Returns an instance of self.
    """
    self.X = X
    self.y = y

    if self.endpoint is None:
        # Only initialize the reference model if no endpoint is provided
        self.reference_model = self.reference_model_class()
    return self

transform(X)

Transforms the input DataFrame by adding 'prediction', 'residuals', 'residuals_abs', 'prediction_100', 'residuals_100', and 'residuals_100_abs' columns.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| X | pd.DataFrame | The input features. | required |

Returns:

| Type | Description |
| --- | --- |
| pd.DataFrame | The transformed DataFrame with additional columns. |

Source code in src/workbench/algorithms/dataframe/residuals_calculator.py
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
    """
    Transforms the input DataFrame by adding 'prediction', 'residuals', 'residuals_abs',
    'prediction_100', 'residuals_100', and 'residuals_100_abs' columns.

    Args:
        X (pd.DataFrame): The input features.

    Returns:
        pd.DataFrame: The transformed DataFrame with additional columns.
    """
    check_is_fitted(self, ["X", "y"])  # Ensure fit has been called

    if self.endpoint:
        # If an endpoint is provided, run inference on the full data
        result_df = self._run_inference_via_endpoint(X)
    else:
        # If no endpoint, perform cross-validation and full model fitting
        result_df = self._run_cross_validation(X)

    return result_df
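A minimal usage sketch of the cross-validation path (no endpoint) on synthetic regression data; xgboost must be installed, since XGBRegressor is the default reference model:

```python
import numpy as np
import pandas as pd
from workbench.algorithms.dataframe.residuals_calculator import ResidualsCalculator

# Synthetic regression data; the target Series needs a name, since it is
# added back to the result DataFrame under that name
rng = np.random.default_rng(42)
X = pd.DataFrame({"feat1": rng.normal(size=100), "feat2": rng.normal(size=100)})
y = pd.Series(2.0 * X["feat1"] + rng.normal(scale=0.1, size=100), name="target")

# No endpoint: residuals come from 5-fold cross-validation plus a 100% training fit
calc = ResidualsCalculator()
result_df = calc.fit(X, y).transform(X)

print(result_df[["prediction", "residuals", "residuals_abs", "prediction_100"]].head())
```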

DimensionalityReduction: Perform Dimensionality Reduction on a DataFrame

DimensionalityReduction

Source code in src/workbench/algorithms/dataframe/dimensionality_reduction.py
class DimensionalityReduction:
    def __init__(self):
        """DimensionalityReduction:  Perform Dimensionality Reduction on a DataFrame"""
        self.log = logging.getLogger("workbench")
        self.projection_model = None
        self.features = None

    def fit_transform(self, df: pd.DataFrame, features: list = None, projection: str = "TSNE") -> pd.DataFrame:
        """Fit and Transform the DataFrame
        Args:
            df: Pandas DataFrame
            features: List of feature column names (default: None)
            projection: The projection model to use (TSNE, MDS, or PCA; default: TSNE)
        Returns:
            Pandas DataFrame with new columns x and y
        """

        # If no features are given, identify all numeric columns
        if features is None:
            features = [x for x in df.select_dtypes(include="number").columns.tolist() if not x.endswith("id")]
            # Also drop group_count if it exists
            features = [x for x in features if x != "group_count"]
            self.log.info("No features given, auto identifying numeric columns...")
            self.log.info(f"{features}")
        self.features = features

        # Sanity checks
        if not all(column in df.columns for column in self.features):
            self.log.critical("Some features are missing in the DataFrame")
            return df
        if len(self.features) < 2:
            self.log.critical("At least two features are required")
            return df
        if df.empty:
            self.log.critical("DataFrame is empty")
            return df

        # Most projection models will fail if there are any NaNs in the data
        # So we'll fill NaNs with the mean value for that column
        for col in df[self.features].columns:
            df[col] = df[col].fillna(df[col].mean())

        # Normalize the features
        scaler = StandardScaler()
        normalized_data = scaler.fit_transform(df[self.features])
        df[self.features] = normalized_data

        # Project the multidimensional features onto an x,y plane
        self.log.info("Projecting features onto an x,y plane...")

        # Perform the projection
        if projection == "TSNE":
            # Perplexity is a hyperparameter that controls the number of neighbors used to compute the manifold
            # The number of neighbors should be less than the number of samples
            perplexity = min(40, len(df) - 1)
            self.log.info(f"Perplexity: {perplexity}")
            self.projection_model = TSNE(perplexity=perplexity)
        elif projection == "MDS":
            self.projection_model = MDS(n_components=2, random_state=0)
        elif projection == "PCA":
            self.projection_model = PCA(n_components=2)

        # Fit the projection model
        # Hack PCA + TSNE to work together
        projection = self.projection_model.fit_transform(df[self.features])

        # Put the projection results back into the given DataFrame
        df["x"] = projection[:, 0]  # Projection X Column
        df["y"] = projection[:, 1]  # Projection Y Column

        # Jitter the data to resolve coincident points
        # df = self.resolve_coincident_points(df)

        # Return the DataFrame with the new columns
        return df

    @staticmethod
    def resolve_coincident_points(df: pd.DataFrame):
        """Resolve coincident points in a DataFrame
        Args:
            df(pd.DataFrame): The DataFrame to resolve coincident points in
        Returns:
            pd.DataFrame: The DataFrame with resolved coincident points
        """
        # Adding Jitter to the projection
        x_scale = (df["x"].max() - df["x"].min()) * 0.1
        y_scale = (df["y"].max() - df["y"].min()) * 0.1
        df["x"] += np.random.normal(-x_scale, +x_scale, len(df))
        df["y"] += np.random.normal(-y_scale, +y_scale, len(df))
        return df

__init__()

DimensionalityReduction: Perform Dimensionality Reduction on a DataFrame

Source code in src/workbench/algorithms/dataframe/dimensionality_reduction.py
def __init__(self):
    """DimensionalityReduction:  Perform Dimensionality Reduction on a DataFrame"""
    self.log = logging.getLogger("workbench")
    self.projection_model = None
    self.features = None

fit_transform(df, features=None, projection='TSNE')

Fit and transform the DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | pd.DataFrame | Pandas DataFrame | required |
| features | list | List of feature column names | None |
| projection | str | The projection model to use (TSNE, MDS, or PCA) | TSNE |

Returns:

| Type | Description |
| --- | --- |
| pd.DataFrame | Pandas DataFrame with new columns x and y |

Source code in src/workbench/algorithms/dataframe/dimensionality_reduction.py
def fit_transform(self, df: pd.DataFrame, features: list = None, projection: str = "TSNE") -> pd.DataFrame:
    """Fit and Transform the DataFrame
    Args:
        df: Pandas DataFrame
        features: List of feature column names (default: None)
        projection: The projection model to use (TSNE, MDS, or PCA; default: TSNE)
    Returns:
        Pandas DataFrame with new columns x and y
    """

    # If no features are given, identify all numeric columns
    if features is None:
        features = [x for x in df.select_dtypes(include="number").columns.tolist() if not x.endswith("id")]
        # Also drop group_count if it exists
        features = [x for x in features if x != "group_count"]
        self.log.info("No features given, auto identifying numeric columns...")
        self.log.info(f"{features}")
    self.features = features

    # Sanity checks
    if not all(column in df.columns for column in self.features):
        self.log.critical("Some features are missing in the DataFrame")
        return df
    if len(self.features) < 2:
        self.log.critical("At least two features are required")
        return df
    if df.empty:
        self.log.critical("DataFrame is empty")
        return df

    # Most projection models will fail if there are any NaNs in the data
    # So we'll fill NaNs with the mean value for that column
    for col in df[self.features].columns:
        df[col] = df[col].fillna(df[col].mean())

    # Normalize the features
    scaler = StandardScaler()
    normalized_data = scaler.fit_transform(df[self.features])
    df[self.features] = normalized_data

    # Project the multidimensional features onto an x,y plane
    self.log.info("Projecting features onto an x,y plane...")

    # Perform the projection
    if projection == "TSNE":
        # Perplexity is a hyperparameter that controls the number of neighbors used to compute the manifold
        # The number of neighbors should be less than the number of samples
        perplexity = min(40, len(df) - 1)
        self.log.info(f"Perplexity: {perplexity}")
        self.projection_model = TSNE(perplexity=perplexity)
    elif projection == "MDS":
        self.projection_model = MDS(n_components=2, random_state=0)
    elif projection == "PCA":
        self.projection_model = PCA(n_components=2)

    # Fit the projection model
    # Hack PCA + TSNE to work together
    projection = self.projection_model.fit_transform(df[self.features])

    # Put the projection results back into the given DataFrame
    df["x"] = projection[:, 0]  # Projection X Column
    df["y"] = projection[:, 1]  # Projection Y Column

    # Jitter the data to resolve coincident points
    # df = self.resolve_coincident_points(df)

    # Return the DataFrame with the new columns
    return df

resolve_coincident_points(df) staticmethod

Resolve coincident points in a DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | pd.DataFrame | The DataFrame to resolve coincident points in | required |

Returns:

| Type | Description |
| --- | --- |
| pd.DataFrame | The DataFrame with resolved coincident points |

Source code in src/workbench/algorithms/dataframe/dimensionality_reduction.py
@staticmethod
def resolve_coincident_points(df: pd.DataFrame):
    """Resolve coincident points in a DataFrame
    Args:
        df(pd.DataFrame): The DataFrame to resolve coincident points in
    Returns:
        pd.DataFrame: The DataFrame with resolved coincident points
    """
    # Adding Jitter to the projection
    x_scale = (df["x"].max() - df["x"].min()) * 0.1
    y_scale = (df["y"].max() - df["y"].min()) * 0.1
    df["x"] += np.random.normal(-x_scale, +x_scale, len(df))
    df["y"] += np.random.normal(-y_scale, +y_scale, len(df))
    return df

test()

Test for the Dimensionality Reduction Class

Source code in src/workbench/algorithms/dataframe/dimensionality_reduction.py
def test():
    """Test for the Dimensionality Reduction Class"""
    # Set some pandas options
    pd.set_option("display.max_columns", None)
    pd.set_option("display.width", 1000)

    # Make some fake data
    data = {
        "ID": [
            "id_0",
            "id_0",
            "id_2",
            "id_3",
            "id_4",
            "id_5",
            "id_6",
            "id_7",
            "id_8",
            "id_9",
        ],
        "feat1": [1.0, 1.0, 1.1, 3.0, 4.0, 1.0, 1.0, 1.1, 3.0, 4.0],
        "feat2": [1.0, 1.0, 1.1, 3.0, 4.0, 1.0, 1.0, 1.1, 3.0, 4.0],
        "feat3": [0.1, 0.1, 0.2, 1.6, 2.5, 0.1, 0.1, 0.2, 1.6, 2.5],
        "price": [31, 60, 62, 40, 20, 31, 61, 60, 40, 20],
    }
    data_df = pd.DataFrame(data)
    features = ["feat1", "feat2", "feat3"]

    # Create the class and run the dimensionality reduction
    projection = DimensionalityReduction()
    new_df = projection.fit_transform(data_df, features=features, projection="TSNE")

    # Check that the x and y columns were added
    assert "x" in new_df.columns
    assert "y" in new_df.columns

    # Output the DataFrame
    print(new_df)

Questions?

The SuperCowPowers team is happy to answer any questions you may have about AWS and Workbench. Please contact us at workbench@supercowpowers.com or chat us up on Discord.