Pandas Dataframe Algorithms
Pandas Dataframes
Pandas dataframes will not scale as well as our Spark and SQL algorithms, but for moderately sized data these algorithms provide some useful functionality.
Pandas Dataframe Algorithms
Workbench has a growing set of algorithms and data processing tools for Pandas Dataframes. In general, these algorithms take a dataframe as input and give you back a dataframe with additional columns.
FeatureSpaceProximity
Bases: Proximity
Source code in src/workbench/algorithms/dataframe/feature_space_proximity.py
class FeatureSpaceProximity(Proximity):
    """Proximity subclass whose data is pulled from a Workbench model's inference view."""

    def __init__(self, model: Model, n_neighbors: int = 10) -> None:
        """
        Initialize the FeatureSpaceProximity class.

        Args:
            model (Model): A Workbench model object.
            n_neighbors (int): Passed through to the parent constructor as `n_neighbors` (default: 10).
        """
        # Feature and target names come straight from the model
        feature_names = model.features()
        target_name = model.target()

        # Wrap the model's input in a FeatureSet
        feature_set = FeatureSet(model.get_input())

        # Inference view name: "inf_" + the model uuid with dashes swapped for underscores
        inference_view = f"inf_{model.uuid.replace('-', '_')}"

        if inference_view not in feature_set.views():
            # No such view yet: create an inference view for the model and pull its data
            self.df = InferenceView.create(model).pull_dataframe()
        else:
            # The view already exists: pull its data directly
            self.df = feature_set.view(inference_view).pull_dataframe()

        # Call the parent class constructor
        super().__init__(
            self.df,
            id_column=feature_set.id_column,
            features=feature_names,
            target=target_name,
            n_neighbors=n_neighbors,
        )

        # Project the data to 2D using the model's features
        self.df = Projection2D().fit_transform(self.df, features=feature_names)
|
__init__(model, n_neighbors=10)
Initialize the FeatureSpaceProximity class.
Parameters:
Name |
Type |
Description |
Default |
model
|
Model
|
A Workbench model object.
|
required
|
Source code in src/workbench/algorithms/dataframe/feature_space_proximity.py
def __init__(self, model: Model, n_neighbors: int = 10) -> None:
    """
    Initialize the FeatureSpaceProximity class.

    Args:
        model (Model): A Workbench model object.
        n_neighbors (int): Passed through to the parent constructor as `n_neighbors` (default: 10).
    """
    # Feature and target names come straight from the model
    feature_names = model.features()
    target_name = model.target()

    # Wrap the model's input in a FeatureSet
    feature_set = FeatureSet(model.get_input())

    # Inference view name: "inf_" + the model uuid with dashes swapped for underscores
    inference_view = f"inf_{model.uuid.replace('-', '_')}"

    if inference_view not in feature_set.views():
        # No such view yet: create an inference view for the model and pull its data
        self.df = InferenceView.create(model).pull_dataframe()
    else:
        # The view already exists: pull its data directly
        self.df = feature_set.view(inference_view).pull_dataframe()

    # Call the parent class constructor
    super().__init__(
        self.df,
        id_column=feature_set.id_column,
        features=feature_names,
        target=target_name,
        n_neighbors=n_neighbors,
    )

    # Project the data to 2D using the model's features
    self.df = Projection2D().fit_transform(self.df, features=feature_names)
|
FingerprintProximity
Bases: Proximity
Source code in src/workbench/algorithms/dataframe/fingerprint_proximity.py
class FingerprintProximity(Proximity):
    """Proximity subclass that builds a binary matrix from a fingerprint column and reports similarities."""

    def __init__(
        self, df: pd.DataFrame, fingerprint_column: str, id_column: Union[int, str], n_neighbors: int = 10
    ) -> None:
        """
        Initialize the FingerprintProximity class.

        Args:
            df (pd.DataFrame): DataFrame containing fingerprints and other features.
            fingerprint_column (str): Name of the column containing fingerprints.
            id_column (Union[int, str]): Name of the column used as an identifier.
            n_neighbors (int): Number of neighbors to compute.
        """
        # Stored before the parent constructor runs.
        # NOTE(review): presumably the parent calls _prepare_data(), which reads this attribute - confirm.
        self.fingerprint_column = fingerprint_column
        super().__init__(df, id_column=id_column, n_neighbors=n_neighbors)

    def _prepare_data(self) -> None:
        """
        Turn the fingerprint column into a boolean matrix and fit a Jaccard nearest-neighbors model.

        Sets `self.X` (stacked boolean rows) and `self.nn` (the fitted NearestNeighbors instance).
        """
        log.info("Converting fingerprints to binary feature matrix...")

        def to_bits(fp):
            # One boolean per element of the fingerprint, via int() on each element
            return np.array([int(bit) for bit in fp], dtype=np.bool_)

        bit_rows = self.df[self.fingerprint_column].apply(to_bits)
        self.X = np.vstack(bit_rows)

        # Jaccard metric for the binary fingerprints
        log.info("Computing NearestNeighbors with Jaccard metric...")
        # NOTE(review): the "+ 1" presumably leaves room for the query point itself - confirm against Proximity
        knn = NearestNeighbors(metric="jaccard", n_neighbors=self.n_neighbors + 1)
        self.nn = knn.fit(self.X)

    def get_edge_weight(self, row: pd.Series) -> float:
        """
        Return the edge weight for a row, which is its "similarity" value.
        """
        weight = row["similarity"]
        return weight

    def neighbors(
        self, query_id: Union[int, str], similarity: float = None, include_self: bool = False
    ) -> pd.DataFrame:
        """
        Return neighbors of the given query ID, either by fixed neighbors or above a similarity threshold.

        Args:
            query_id (Union[int, str]): The ID of the query point.
            similarity (float): Optional similarity threshold above which neighbors are to be included.
            include_self (bool): Whether to include the query ID itself in the neighbor results.

        Returns:
            pd.DataFrame: The parent's neighbor results with "distance" replaced by "similarity" (1 - distance).
        """
        # The parent works with a radius, so a similarity threshold maps to 1 - similarity
        radius = None if similarity is None else 1 - similarity
        result = super().neighbors(query_id=query_id, radius=radius, include_self=include_self)

        # Nothing to convert when the parent returned no distance column
        if "distance" not in result.columns:
            return result

        # Swap the distance column for a Tanimoto similarity column
        result["similarity"] = 1 - result["distance"]
        return result.drop(columns=["distance"])

    def all_neighbors(self, include_self: bool = False) -> pd.DataFrame:
        """
        Compute nearest neighbors for all rows in the dataset.

        Args:
            include_self (bool): Whether to include self-loops in the results.

        Returns:
            pd.DataFrame: The parent's results with "distance" replaced by "similarity" (1 - distance).
        """
        result = super().all_neighbors(include_self=include_self)

        # Nothing to convert when the parent returned no distance column
        if "distance" not in result.columns:
            return result

        # Swap the distance column for a Tanimoto similarity column
        result["similarity"] = 1 - result["distance"]
        return result.drop(columns=["distance"])
|
__init__(df, fingerprint_column, id_column, n_neighbors=10)
Initialize the FingerprintProximity class.
Parameters:
Name |
Type |
Description |
Default |
df
|
DataFrame
|
DataFrame containing fingerprints and other features.
|
required
|
fingerprint_column
|
str
|
Name of the column containing fingerprints.
|
required
|
id_column
|
Union[int, str]
|
Name of the column used as an identifier.
|
required
|
n_neighbors
|
int
|
Number of neighbors to compute.
|
10
|
Source code in src/workbench/algorithms/dataframe/fingerprint_proximity.py
def __init__(
    self, df: pd.DataFrame, fingerprint_column: str, id_column: Union[int, str], n_neighbors: int = 10
) -> None:
    """
    Initialize the FingerprintProximity class.

    Args:
        df (pd.DataFrame): DataFrame containing fingerprints and other features.
        fingerprint_column (str): Name of the column containing fingerprints.
        id_column (Union[int, str]): Name of the column used as an identifier.
        n_neighbors (int): Number of neighbors to compute.
    """
    # Stored before the parent constructor runs.
    # NOTE(review): presumably the parent calls _prepare_data(), which reads this attribute - confirm.
    self.fingerprint_column = fingerprint_column
    super().__init__(df, id_column=id_column, n_neighbors=n_neighbors)
|
_prepare_data()
Prepare the DataFrame by converting fingerprints into a binary feature matrix.
Source code in src/workbench/algorithms/dataframe/fingerprint_proximity.py
def _prepare_data(self) -> None:
    """
    Turn the fingerprint column into a boolean matrix and fit a Jaccard nearest-neighbors model.

    Sets `self.X` (stacked boolean rows) and `self.nn` (the fitted NearestNeighbors instance).
    """
    log.info("Converting fingerprints to binary feature matrix...")

    def to_bits(fp):
        # One boolean per element of the fingerprint, via int() on each element
        return np.array([int(bit) for bit in fp], dtype=np.bool_)

    bit_rows = self.df[self.fingerprint_column].apply(to_bits)
    self.X = np.vstack(bit_rows)

    # Jaccard metric for the binary fingerprints
    log.info("Computing NearestNeighbors with Jaccard metric...")
    # NOTE(review): the "+ 1" presumably leaves room for the query point itself - confirm against Proximity
    knn = NearestNeighbors(metric="jaccard", n_neighbors=self.n_neighbors + 1)
    self.nn = knn.fit(self.X)
|
all_neighbors(include_self=False)
Compute nearest neighbors for all rows in the dataset.
Parameters:
Name |
Type |
Description |
Default |
include_self
|
bool
|
Whether to include self-loops in the results.
|
False
|
Returns:
Type |
Description |
DataFrame
|
pd.DataFrame: A DataFrame of neighbors and their Tanimoto similarities.
|
Source code in src/workbench/algorithms/dataframe/fingerprint_proximity.py
def all_neighbors(self, include_self: bool = False) -> pd.DataFrame:
    """
    Compute nearest neighbors for all rows in the dataset.

    Args:
        include_self (bool): Whether to include self-loops in the results.

    Returns:
        pd.DataFrame: The parent's results with "distance" replaced by "similarity" (1 - distance).
    """
    result = super().all_neighbors(include_self=include_self)

    # Nothing to convert when the parent returned no distance column
    if "distance" not in result.columns:
        return result

    # Swap the distance column for a Tanimoto similarity column
    result["similarity"] = 1 - result["distance"]
    return result.drop(columns=["distance"])
|
get_edge_weight(row)
Compute edge weight using similarity for fingerprints.
Source code in src/workbench/algorithms/dataframe/fingerprint_proximity.py
def get_edge_weight(self, row: pd.Series) -> float:
    """
    Return the edge weight for a row, which is its "similarity" value.
    """
    weight = row["similarity"]
    return weight
|
neighbors(query_id, similarity=None, include_self=False)
Return neighbors of the given query ID, either by fixed neighbors or above a similarity threshold.
Parameters:
Name |
Type |
Description |
Default |
query_id
|
Union[int, str]
|
The ID of the query point.
|
required
|
similarity
|
float
|
Optional similarity threshold above which neighbors are to be included.
|
None
|
include_self
|
bool
|
Whether to include the query ID itself in the neighbor results.
|
False
|
Returns:
Type |
Description |
DataFrame
|
pd.DataFrame: Filtered DataFrame that includes the query ID, its neighbors, and their similarities.
|
Source code in src/workbench/algorithms/dataframe/fingerprint_proximity.py
def neighbors(
    self, query_id: Union[int, str], similarity: float = None, include_self: bool = False
) -> pd.DataFrame:
    """
    Return neighbors of the given query ID, either by fixed neighbors or above a similarity threshold.

    Args:
        query_id (Union[int, str]): The ID of the query point.
        similarity (float): Optional similarity threshold above which neighbors are to be included.
        include_self (bool): Whether to include the query ID itself in the neighbor results.

    Returns:
        pd.DataFrame: The parent's neighbor results with "distance" replaced by "similarity" (1 - distance).
    """
    # The parent works with a radius, so a similarity threshold maps to 1 - similarity
    radius = None if similarity is None else 1 - similarity
    result = super().neighbors(query_id=query_id, radius=radius, include_self=include_self)

    # Nothing to convert when the parent returned no distance column
    if "distance" not in result.columns:
        return result

    # Swap the distance column for a Tanimoto similarity column
    result["similarity"] = 1 - result["distance"]
    return result.drop(columns=["distance"])
|
Projection2D
Perform Dimensionality Reduction on a DataFrame using TSNE, MDS, PCA, or UMAP.
Source code in src/workbench/algorithms/dataframe/projection_2d.py
class Projection2D:
    """Perform Dimensionality Reduction on a DataFrame using TSNE, MDS, PCA, or UMAP."""

    def __init__(self):
        """Initialize the Projection2D class."""
        self.log = logging.getLogger("workbench")
        # Holds the most recently selected projection model (set by fit_transform)
        self.projection_model = None

    def fit_transform(self, input_df: pd.DataFrame, features: list = None, projection: str = "UMAP") -> pd.DataFrame:
        """Fit and transform a DataFrame using the selected dimensionality reduction method.

        This method creates a copy of the input DataFrame, processes the specified features
        for normalization and projection, and returns a new DataFrame with added 'x' and 'y' columns
        containing the projected 2D coordinates.

        Args:
            input_df (pd.DataFrame): The DataFrame containing features to project.
            features (list, optional): List of feature column names. If None, numeric columns whose
                label does not end in "id" are auto-selected.
            projection (str, optional): The projection to use ('UMAP', 'TSNE', 'MDS' or 'PCA'). Default 'UMAP'.

        Returns:
            pd.DataFrame: A new DataFrame (a copy of input_df) with added 'x' and 'y' columns. When fewer
                than two features are available or the DataFrame is empty, the copy is returned
                without 'x'/'y' columns.
        """
        # Create a copy of the input DataFrame
        df = input_df.copy()

        # Auto-identify numeric features if none are provided
        if features is None:
            # str() keeps non-string column labels (e.g. integers) from raising AttributeError on .endswith()
            features = [col for col in df.select_dtypes(include="number").columns if not str(col).endswith("id")]
            self.log.info(f"Auto-identified numeric features: {features}")

        if len(features) < 2 or df.empty:
            self.log.critical("At least two numeric features are required, and DataFrame must not be empty.")
            return df

        # Fill missing values with each column's mean, then standardize
        X = df[features]
        X = X.apply(lambda col: col.fillna(col.mean()))
        X_scaled = StandardScaler().fit_transform(X)

        # Select the projection method (df is passed so the TSNE perplexity can be derived from its length)
        self.projection_model = self._get_projection_model(projection, df)

        # Apply the projection on the normalized data
        projection_result = self.projection_model.fit_transform(X_scaled)
        df[["x", "y"]] = projection_result

        # Add jitter to the projected coordinates and return the new DataFrame
        return self.resolve_coincident_points(df)

    def _get_projection_model(self, projection: str, df: pd.DataFrame):
        """Select and return the appropriate projection model.

        Args:
            projection (str): The projection method ('TSNE', 'MDS', 'PCA', or 'UMAP').
            df (pd.DataFrame): The DataFrame being transformed (used for computing perplexity).

        Returns:
            A dimensionality reduction model instance. TSNE is used as the fallback when the
            method is not recognized or UMAP is requested but not available.
        """
        # Perplexity shared by the explicit TSNE choice and the TSNE fallback
        tsne_perplexity = min(40, len(df) - 1)

        if projection == "TSNE":
            self.log.info(f"Projection: TSNE with perplexity {tsne_perplexity}")
            model = TSNE(perplexity=tsne_perplexity)
        elif projection == "MDS":
            self.log.info("Projection: MDS")
            model = MDS(n_components=2, random_state=0)
        elif projection == "PCA":
            self.log.info("Projection: PCA")
            model = PCA(n_components=2)
        elif projection == "UMAP" and UMAP_AVAILABLE:
            self.log.info("Projection: UMAP")
            model = umap.UMAP(n_components=2)
        else:
            self.log.warning(
                f"Projection method '{projection}' not recognized or UMAP not available. Falling back to TSNE."
            )
            model = TSNE(perplexity=tsne_perplexity)
        return model

    @staticmethod
    def resolve_coincident_points(df: pd.DataFrame) -> pd.DataFrame:
        """Resolve coincident points in a DataFrame by adding jitter.

        Gaussian noise with a standard deviation of 0.5% of each axis' range is added to
        every 'x' and 'y' value. The DataFrame is modified in place and also returned.

        Args:
            df (pd.DataFrame): The DataFrame containing x and y projection coordinates.

        Returns:
            pd.DataFrame: The same DataFrame with jittered 'x' and 'y' columns.
        """
        n_rows = len(df)
        for axis in ("x", "y"):
            # Noise scale: 0.5% of the axis' value range
            spread = df[axis].max() - df[axis].min()
            df[axis] += np.random.normal(0, spread * 0.005, n_rows)
        return df
|
__init__()
Initialize the Projection2D class.
Source code in src/workbench/algorithms/dataframe/projection_2d.py
def __init__(self):
    """Set up the "workbench" logger and an empty projection-model slot."""
    # No projection model has been selected yet
    self.projection_model = None
    self.log = logging.getLogger("workbench")
|
_get_projection_model(projection, df)
Select and return the appropriate projection model.
Parameters:
Name |
Type |
Description |
Default |
projection
|
str
|
The projection method ('TSNE', 'MDS', 'PCA', or 'UMAP').
|
required
|
df
|
DataFrame
|
The DataFrame being transformed (used for computing perplexity).
|
required
|
Returns:
Type |
Description |
|
A dimensionality reduction model instance.
|
Source code in src/workbench/algorithms/dataframe/projection_2d.py
def _get_projection_model(self, projection: str, df: pd.DataFrame):
    """Select and return the appropriate projection model.

    Args:
        projection (str): The projection method ('TSNE', 'MDS', 'PCA', or 'UMAP').
        df (pd.DataFrame): The DataFrame being transformed (used for computing perplexity).

    Returns:
        A dimensionality reduction model instance. TSNE is used as the fallback when the
        method is not recognized or UMAP is requested but not available.
    """
    # Perplexity shared by the explicit TSNE choice and the TSNE fallback
    tsne_perplexity = min(40, len(df) - 1)

    if projection == "TSNE":
        self.log.info(f"Projection: TSNE with perplexity {tsne_perplexity}")
        model = TSNE(perplexity=tsne_perplexity)
    elif projection == "MDS":
        self.log.info("Projection: MDS")
        model = MDS(n_components=2, random_state=0)
    elif projection == "PCA":
        self.log.info("Projection: PCA")
        model = PCA(n_components=2)
    elif projection == "UMAP" and UMAP_AVAILABLE:
        self.log.info("Projection: UMAP")
        model = umap.UMAP(n_components=2)
    else:
        self.log.warning(
            f"Projection method '{projection}' not recognized or UMAP not available. Falling back to TSNE."
        )
        model = TSNE(perplexity=tsne_perplexity)
    return model
|
fit_transform(input_df, features=None, projection='UMAP')
Fit and transform a DataFrame using the selected dimensionality reduction method.
This method creates a copy of the input DataFrame, processes the specified features
for normalization and projection, and returns a new DataFrame with added 'x' and 'y' columns
containing the projected 2D coordinates.
Parameters:
Name |
Type |
Description |
Default |
input_df
|
DataFrame
|
The DataFrame containing features to project.
|
required
|
features
|
list
|
List of feature column names. If None, numeric columns are auto-selected.
|
None
|
projection
|
str
|
The projection to use ('UMAP', 'TSNE', 'MDS' or 'PCA'). Default 'UMAP'.
|
'UMAP'
|
Returns:
Type |
Description |
DataFrame
|
pd.DataFrame: A new DataFrame (a copy of input_df) with added 'x' and 'y' columns.
|
Source code in src/workbench/algorithms/dataframe/projection_2d.py
def fit_transform(self, input_df: pd.DataFrame, features: list = None, projection: str = "UMAP") -> pd.DataFrame:
    """Fit and transform a DataFrame using the selected dimensionality reduction method.

    This method creates a copy of the input DataFrame, processes the specified features
    for normalization and projection, and returns a new DataFrame with added 'x' and 'y' columns
    containing the projected 2D coordinates.

    Args:
        input_df (pd.DataFrame): The DataFrame containing features to project.
        features (list, optional): List of feature column names. If None, numeric columns whose
            label does not end in "id" are auto-selected.
        projection (str, optional): The projection to use ('UMAP', 'TSNE', 'MDS' or 'PCA'). Default 'UMAP'.

    Returns:
        pd.DataFrame: A new DataFrame (a copy of input_df) with added 'x' and 'y' columns. When fewer
            than two features are available or the DataFrame is empty, the copy is returned
            without 'x'/'y' columns.
    """
    # Create a copy of the input DataFrame
    df = input_df.copy()

    # Auto-identify numeric features if none are provided
    if features is None:
        # str() keeps non-string column labels (e.g. integers) from raising AttributeError on .endswith()
        features = [col for col in df.select_dtypes(include="number").columns if not str(col).endswith("id")]
        self.log.info(f"Auto-identified numeric features: {features}")

    if len(features) < 2 or df.empty:
        self.log.critical("At least two numeric features are required, and DataFrame must not be empty.")
        return df

    # Fill missing values with each column's mean, then standardize
    X = df[features]
    X = X.apply(lambda col: col.fillna(col.mean()))
    X_scaled = StandardScaler().fit_transform(X)

    # Select the projection method (df is passed so the TSNE perplexity can be derived from its length)
    self.projection_model = self._get_projection_model(projection, df)

    # Apply the projection on the normalized data
    projection_result = self.projection_model.fit_transform(X_scaled)
    df[["x", "y"]] = projection_result

    # Add jitter to the projected coordinates and return the new DataFrame
    return self.resolve_coincident_points(df)
|
resolve_coincident_points(df)
staticmethod
Resolve coincident points in a DataFrame by adding jitter.
Parameters:
Name |
Type |
Description |
Default |
df
|
DataFrame
|
The DataFrame containing x and y projection coordinates.
|
required
|
Returns:
Type |
Description |
DataFrame
|
pd.DataFrame: The DataFrame with resolved coincident points.
|
Source code in src/workbench/algorithms/dataframe/projection_2d.py
| @staticmethod
def resolve_coincident_points(df: pd.DataFrame) -> pd.DataFrame:
"""Resolve coincident points in a DataFrame by adding jitter.
Args:
df (pd.DataFrame): The DataFrame containing x and y projection coordinates.
Returns:
pd.DataFrame: The DataFrame with resolved coincident points.
"""
jitter_x = (df["x"].max() - df["x"].min()) * 0.005
jitter_y = (df["y"].max() - df["y"].min()) * 0.005
df["x"] += np.random.normal(0, jitter_x, len(df))
df["y"] += np.random.normal(0, jitter_y, len(df))
return df
|
Questions?

The SuperCowPowers team is happy to answer any questions you may have about AWS and Workbench. Please contact us at workbench@supercowpowers.com or chat with us on Discord.