Graph Algorithms
Graph Algorithms
WIP: These classes are under active development, and both their API and functionality are subject to change over time.
Graph Algorithms
Docs TBD
ProximityGraph
Build a NetworkX graph using the Proximity class.
Source code in src/workbench/algorithms/graph/light/proximity_graph.py
class ProximityGraph:
    """Build a NetworkX graph using the Proximity class.

    Each row of the Proximity dataframe becomes a node (keyed by the Proximity
    id column, with the row's columns as node attributes). Edges connect a node
    to its nearest neighbors and carry a "weight" attribute.
    """

    def __init__(self):
        """Create an empty ProximityGraph.

        No graph is held until build_graph() or load_graph() is called. A
        GraphStore handle is created for loading and storing graphs.
        """
        self._nx_graph = None
        self.graph_store = GraphStore()

    def build_graph(self, prox: Proximity, n_neighbors: int = 5, min_edges: int = 2, min_weight: float = 0.8) -> None:
        """Build a NetworkX graph from a Proximity instance.

        Args:
            prox (Proximity): An instance of a Proximity class.
            n_neighbors (int): Number of neighbors to retrieve per node (default: 5).
            min_edges (int): Minimum edges per node (default: 2). Negative values are treated as 0.
            min_weight (float): Weight threshold for additional edges beyond min_edges (default: 0.8).
        """
        node_df = prox.df
        id_column = prox.id_column

        # Handle duplicate IDs *before* querying neighbors: otherwise a duplicated ID
        # yields repeated neighbor rows, and the repeated rows can use up that node's
        # min_edges slots with the very same edge.
        if not node_df[id_column].is_unique:
            log.warning(f"Column '{id_column}' contains duplicate values. Using first occurrence.")
            node_df = node_df.drop_duplicates(subset=[id_column], keep="first")

        # Get all neighbor pairs
        log.info("Retrieving all neighbors...")
        all_ids = node_df[id_column].tolist()
        neighbors_df = prox.neighbors(all_ids, n_neighbors=n_neighbors, include_self=False)

        # Build the graph and add nodes (each node carries its dataframe row as attributes)
        log.info("Adding nodes to the proximity graph...")
        self._nx_graph = nx.Graph()
        self._nx_graph.add_nodes_from(node_df.set_index(id_column, drop=False).to_dict("index").items())

        # Work on a copy so the frame returned by prox.neighbors() is not mutated
        neighbors_df = neighbors_df.copy()

        # Compute edge weights (handle both distance-based and similarity-based proximity)
        if "similarity" in neighbors_df.columns:
            neighbors_df["weight"] = neighbors_df["similarity"]
        else:
            max_distance = neighbors_df["distance"].max()
            if max_distance > 0:
                # Scale distances by the largest one so the closest neighbors get the highest weight
                neighbors_df["weight"] = 1.0 - neighbors_df["distance"] / max_distance
            else:
                # All distances are zero (or there are no rows / max is NaN): use a constant weight
                neighbors_df["weight"] = 1.0

        # Add edges: guarantee min_edges per node, plus any above min_weight
        log.info("Adding edges to the graph...")
        guaranteed = max(min_edges, 0)  # a negative value would produce surprising iloc slices
        for source_id, group in neighbors_df.groupby(id_column):
            sorted_group = group.sort_values("weight", ascending=False)
            n_top = min(len(sorted_group), guaranteed)
            top_edges = sorted_group.iloc[:n_top]
            extra_edges = sorted_group.iloc[n_top:]
            extra_edges = extra_edges[extra_edges["weight"] > min_weight]
            edges = pd.concat([top_edges, extra_edges])
            # Iterate the two columns directly: iterrows() builds a Series per row and
            # can upcast ids (e.g. int -> float) when the frame has mixed numeric dtypes.
            self._nx_graph.add_edges_from(
                (source_id, neighbor_id, {"weight": weight})
                for neighbor_id, weight in zip(edges["neighbor_id"], edges["weight"])
            )

    @property
    def nx_graph(self) -> nx.Graph:
        """Get the NetworkX graph object.

        Returns:
            nx.Graph: The NetworkX graph object (None until a graph has been built or loaded).
        """
        return self._nx_graph

    def load_graph(self, graph_path: str):
        """Load a graph from the GraphStore.

        Args:
            graph_path (str): The path to the graph in GraphStore.
        """
        self._nx_graph = self.graph_store.get(graph_path)

    def store_graph(self, graph_path: str):
        """Store the graph in the GraphStore.

        Args:
            graph_path (str): The path to store the graph in GraphStore.
        """
        self.graph_store.upsert(graph_path, self._nx_graph)

    def get_neighborhood(self, node_id, radius: int = 1) -> nx.Graph:
        """Get a subgraph containing nodes within a given number of hops around a node.

        Args:
            node_id: The ID of the center node.
            radius (int): Number of hops (default: 1).

        Returns:
            nx.Graph: A subgraph containing the neighborhood.

        Raises:
            ValueError: If no graph has been built/loaded, or node_id is not in the graph.
        """
        # Without this guard, `node_id not in None` fails with an opaque TypeError
        if self._nx_graph is None:
            raise ValueError("No graph available. Call build_graph() or load_graph() first.")
        if node_id not in self._nx_graph:
            raise ValueError(f"Node ID '{node_id}' not found in the graph.")
        return nx.ego_graph(self._nx_graph, node_id, radius=radius)
|
nx_graph
property
Get the NetworkX graph object.
Returns:
| Type |
Description |
Graph
|
nx.Graph: The NetworkX graph object.
|
__init__()
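Initialize an empty ProximityGraph: no graph is held until build_graph() or load_graph() is called, and a GraphStore handle is created for loading and storing graphs.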
Source code in src/workbench/algorithms/graph/light/proximity_graph.py
def __init__(self):
    """Create an empty ProximityGraph.

    A GraphStore handle is created for loading and storing graphs; the graph
    itself stays unset (None) until one is built or loaded.
    """
    store = GraphStore()
    self.graph_store = store
    self._nx_graph = None
|
build_graph(prox, n_neighbors=5, min_edges=2, min_weight=0.8)
Build a NetworkX graph from a Proximity instance.
Parameters:
| Name |
Type |
Description |
Default |
prox
|
Proximity
|
An instance of a Proximity class.
|
required
|
n_neighbors
|
int
|
Number of neighbors to retrieve per node (default: 5).
|
5
|
min_edges
|
int
|
Minimum edges per node (default: 2).
|
2
|
min_weight
|
float
|
Weight threshold for additional edges beyond min_edges (default: 0.8).
|
0.8
|
Source code in src/workbench/algorithms/graph/light/proximity_graph.py
def build_graph(self, prox: Proximity, n_neighbors: int = 5, min_edges: int = 2, min_weight: float = 0.8) -> None:
    """Build a NetworkX graph from a Proximity instance.

    Args:
        prox (Proximity): An instance of a Proximity class.
        n_neighbors (int): Number of neighbors to retrieve per node (default: 5).
        min_edges (int): Minimum edges per node (default: 2). Negative values are treated as 0.
        min_weight (float): Weight threshold for additional edges beyond min_edges (default: 0.8).
    """
    node_df = prox.df
    id_column = prox.id_column

    # Handle duplicate IDs *before* querying neighbors: otherwise a duplicated ID
    # yields repeated neighbor rows, and the repeated rows can use up that node's
    # min_edges slots with the very same edge.
    if not node_df[id_column].is_unique:
        log.warning(f"Column '{id_column}' contains duplicate values. Using first occurrence.")
        node_df = node_df.drop_duplicates(subset=[id_column], keep="first")

    # Get all neighbor pairs
    log.info("Retrieving all neighbors...")
    all_ids = node_df[id_column].tolist()
    neighbors_df = prox.neighbors(all_ids, n_neighbors=n_neighbors, include_self=False)

    # Build the graph and add nodes (each node carries its dataframe row as attributes)
    log.info("Adding nodes to the proximity graph...")
    self._nx_graph = nx.Graph()
    self._nx_graph.add_nodes_from(node_df.set_index(id_column, drop=False).to_dict("index").items())

    # Work on a copy so the frame returned by prox.neighbors() is not mutated
    neighbors_df = neighbors_df.copy()

    # Compute edge weights (handle both distance-based and similarity-based proximity)
    if "similarity" in neighbors_df.columns:
        neighbors_df["weight"] = neighbors_df["similarity"]
    else:
        max_distance = neighbors_df["distance"].max()
        if max_distance > 0:
            # Scale distances by the largest one so the closest neighbors get the highest weight
            neighbors_df["weight"] = 1.0 - neighbors_df["distance"] / max_distance
        else:
            # All distances are zero (or there are no rows / max is NaN): use a constant weight
            neighbors_df["weight"] = 1.0

    # Add edges: guarantee min_edges per node, plus any above min_weight
    log.info("Adding edges to the graph...")
    guaranteed = max(min_edges, 0)  # a negative value would produce surprising iloc slices
    for source_id, group in neighbors_df.groupby(id_column):
        sorted_group = group.sort_values("weight", ascending=False)
        n_top = min(len(sorted_group), guaranteed)
        top_edges = sorted_group.iloc[:n_top]
        extra_edges = sorted_group.iloc[n_top:]
        extra_edges = extra_edges[extra_edges["weight"] > min_weight]
        edges = pd.concat([top_edges, extra_edges])
        # Iterate the two columns directly: iterrows() builds a Series per row and
        # can upcast ids (e.g. int -> float) when the frame has mixed numeric dtypes.
        self._nx_graph.add_edges_from(
            (source_id, neighbor_id, {"weight": weight})
            for neighbor_id, weight in zip(edges["neighbor_id"], edges["weight"])
        )
|
get_neighborhood(node_id, radius=1)
Get a subgraph containing nodes within a given number of hops around a node.
Parameters:
| Name |
Type |
Description |
Default |
node_id
|
|
The ID of the center node.
|
required
|
radius
|
int
|
Number of hops (default: 1).
|
1
|
Returns:
| Type |
Description |
Graph
|
nx.Graph: A subgraph containing the neighborhood.
|
Source code in src/workbench/algorithms/graph/light/proximity_graph.py
def get_neighborhood(self, node_id, radius: int = 1) -> nx.Graph:
    """Get a subgraph containing nodes within a given number of hops around a node.

    Args:
        node_id: The ID of the center node.
        radius (int): Number of hops (default: 1).

    Returns:
        nx.Graph: A subgraph containing the neighborhood.

    Raises:
        ValueError: If no graph has been built/loaded, or node_id is not in the graph.
    """
    # Without this guard, `node_id not in None` fails with an opaque TypeError
    if self._nx_graph is None:
        raise ValueError("No graph available. Call build_graph() or load_graph() first.")
    if node_id not in self._nx_graph:
        raise ValueError(f"Node ID '{node_id}' not found in the graph.")
    return nx.ego_graph(self._nx_graph, node_id, radius=radius)
|
load_graph(graph_path)
Load a graph from the GraphStore.
Parameters:
| Name |
Type |
Description |
Default |
graph_path
|
str
|
The path to the graph in GraphStore.
|
required
|
Source code in src/workbench/algorithms/graph/light/proximity_graph.py
def load_graph(self, graph_path: str):
    """Fetch a graph from the GraphStore and make it the current graph.

    Args:
        graph_path (str): The path to the graph in GraphStore.
    """
    loaded_graph = self.graph_store.get(graph_path)
    self._nx_graph = loaded_graph
|
store_graph(graph_path)
Store the graph in the GraphStore.
Parameters:
| Name |
Type |
Description |
Default |
graph_path
|
str
|
The path to store the graph in GraphStore.
|
required
|
Source code in src/workbench/algorithms/graph/light/proximity_graph.py
def store_graph(self, graph_path: str):
    """Write the currently held graph to the GraphStore (insert or update).

    Args:
        graph_path (str): The path to store the graph in GraphStore.
    """
    store = self.graph_store
    store.upsert(graph_path, self._nx_graph)
|
Questions?

The SuperCowPowers team is happy to answer any questions you may have about AWS and Workbench. Please contact us at workbench@supercowpowers.com or chat with us on Discord.