Skip to content

Graph Algorithms

Graph Algorithms

WIP: These classes are under active development and are subject to change in both API and functionality over time.

Graph Algorithms

Docs TBD

ProximityGraph

Build a NetworkX graph using the Proximity class.

Source code in src/workbench/algorithms/graph/light/proximity_graph.py
class ProximityGraph:
    """Build, persist, and query a NetworkX graph derived from a Proximity instance.

    The graph lives in ``_nx_graph``, which stays ``None`` until ``build_graph``
    or ``load_graph`` has been called, and is exposed via the ``nx_graph`` property.
    """

    def __init__(self):
        """Create an instance with no graph yet and a GraphStore for persistence."""
        self._nx_graph = None
        self.graph_store = GraphStore()

    def build_graph(self, prox: Proximity, n_neighbors: int = 5, min_edges: int = 2, min_weight: float = 0.8) -> None:
        """Build a NetworkX graph from a Proximity instance.

        Every row of ``prox.df`` becomes a node (keyed by ``prox.id_column``, with the
        row's columns as node attributes). For each source node, its ``min_edges``
        highest-weight neighbors are always connected; remaining neighbors are connected
        only when their weight is strictly greater than ``min_weight``.

        Edge weights come from the ``similarity`` column of the neighbors table when it
        exists; otherwise they are ``1 - distance / max(distance)``.

        Args:
            prox (Proximity): An instance of a Proximity class.
            n_neighbors (int): Number of neighbors to retrieve per node (default: 5).
            min_edges (int): Minimum edges per node (default: 2).
            min_weight (float): Weight threshold for additional edges beyond min_edges (default: 0.8).
        """
        node_df = prox.df
        id_column = prox.id_column

        # Get all neighbor pairs
        log.info("Retrieving all neighbors...")
        all_ids = node_df[id_column].tolist()
        neighbors_df = prox.neighbors(all_ids, n_neighbors=n_neighbors, include_self=False)

        # Build the graph and add nodes
        log.info("Adding nodes to the proximity graph...")
        self._nx_graph = nx.Graph()

        # Handle duplicate IDs
        if not node_df[id_column].is_unique:
            log.warning(f"Column '{id_column}' contains duplicate values. Using first occurrence.")
            node_df = node_df.drop_duplicates(subset=[id_column], keep="first")

        self._nx_graph.add_nodes_from(node_df.set_index(id_column, drop=False).to_dict("index").items())

        # Compute edge weights (handle both distance-based and similarity-based proximity)
        if "similarity" in neighbors_df.columns:
            weights = neighbors_df["similarity"]
        else:
            max_distance = neighbors_df["distance"].max()
            if max_distance > 0:
                weights = 1.0 - neighbors_df["distance"] / max_distance
            else:
                # All distances are zero (or there are no rows): avoid dividing by zero
                weights = 1.0

        # assign() returns a new frame, so the table handed back by prox.neighbors()
        # is not modified in place
        neighbors_df = neighbors_df.assign(weight=weights)

        # Add edges: guarantee min_edges per node, plus any above min_weight
        log.info("Adding edges to the graph...")
        for source_id, group in neighbors_df.groupby(id_column):
            sorted_group = group.sort_values("weight", ascending=False)
            n_top = min(len(sorted_group), min_edges)
            top_edges = sorted_group.iloc[:n_top]
            extra_edges = sorted_group.iloc[n_top:]
            extra_edges = extra_edges[extra_edges["weight"] > min_weight]
            edges = pd.concat([top_edges, extra_edges])

            # Iterate the two needed columns directly: iterrows() builds a Series per
            # row (slow) and can upcast values when the columns have mixed dtypes
            self._nx_graph.add_edges_from(
                (source_id, neighbor_id, {"weight": weight})
                for neighbor_id, weight in zip(edges["neighbor_id"], edges["weight"])
            )

    @property
    def nx_graph(self) -> nx.Graph:
        """Get the NetworkX graph object.

        Returns:
            nx.Graph: The NetworkX graph object (``None`` if no graph has been built or loaded).
        """
        return self._nx_graph

    def load_graph(self, graph_path: str):
        """Load a graph from the GraphStore, replacing any graph currently held.

        Args:
            graph_path (str): The path to the graph in GraphStore.
        """
        self._nx_graph = self.graph_store.get(graph_path)

    def store_graph(self, graph_path: str):
        """Store the graph in the GraphStore.

        Args:
            graph_path (str): The path to store the graph in GraphStore.
        """
        self.graph_store.upsert(graph_path, self._nx_graph)

    def get_neighborhood(self, node_id, radius: int = 1) -> nx.Graph:
        """Get a subgraph containing nodes within a given number of hops around a node.

        Args:
            node_id: The ID of the center node.
            radius (int): Number of hops (default: 1).

        Returns:
            nx.Graph: A subgraph containing the neighborhood.

        Raises:
            ValueError: If no graph has been built or loaded yet, or if ``node_id``
                is not a node of the graph.
        """
        # Without this guard the membership test below fails on None with an
        # unhelpful "argument of type 'NoneType' is not iterable" TypeError
        if self._nx_graph is None:
            raise ValueError("No graph available: call build_graph() or load_graph() first.")
        if node_id not in self._nx_graph:
            raise ValueError(f"Node ID '{node_id}' not found in the graph.")
        return nx.ego_graph(self._nx_graph, node_id, radius=radius)

nx_graph property

Get the NetworkX graph object.

Returns:

Type Description
Graph

nx.Graph: The NetworkX graph object.

__init__()

Build a NetworkX graph using the Proximity class.

Source code in src/workbench/algorithms/graph/light/proximity_graph.py
def __init__(self):
    """Set up an instance that holds no graph yet, plus a GraphStore for persistence."""
    # Backing store used by load_graph() and store_graph()
    store = GraphStore()

    # No graph exists until build_graph() or load_graph() is called
    self._nx_graph = None
    self.graph_store = store

build_graph(prox, n_neighbors=5, min_edges=2, min_weight=0.8)

Build a NetworkX graph from a Proximity instance.

Parameters:

Name Type Description Default
prox Proximity

An instance of a Proximity class.

required
n_neighbors int

Number of neighbors to retrieve per node (default: 5).

5
min_edges int

Minimum edges per node (default: 2).

2
min_weight float

Weight threshold for additional edges beyond min_edges (default: 0.8).

0.8
Source code in src/workbench/algorithms/graph/light/proximity_graph.py
def build_graph(self, prox: Proximity, n_neighbors: int = 5, min_edges: int = 2, min_weight: float = 0.8) -> None:
    """Build a NetworkX graph from a Proximity instance.

    Every row of ``prox.df`` becomes a node (keyed by ``prox.id_column``, with the
    row's columns as node attributes). For each source node, its ``min_edges``
    highest-weight neighbors are always connected; remaining neighbors are connected
    only when their weight is strictly greater than ``min_weight``.

    Edge weights come from the ``similarity`` column of the neighbors table when it
    exists; otherwise they are ``1 - distance / max(distance)``.

    Args:
        prox (Proximity): An instance of a Proximity class.
        n_neighbors (int): Number of neighbors to retrieve per node (default: 5).
        min_edges (int): Minimum edges per node (default: 2).
        min_weight (float): Weight threshold for additional edges beyond min_edges (default: 0.8).
    """
    node_df = prox.df
    id_column = prox.id_column

    # Get all neighbor pairs
    log.info("Retrieving all neighbors...")
    all_ids = node_df[id_column].tolist()
    neighbors_df = prox.neighbors(all_ids, n_neighbors=n_neighbors, include_self=False)

    # Build the graph and add nodes
    log.info("Adding nodes to the proximity graph...")
    self._nx_graph = nx.Graph()

    # Handle duplicate IDs
    if not node_df[id_column].is_unique:
        log.warning(f"Column '{id_column}' contains duplicate values. Using first occurrence.")
        node_df = node_df.drop_duplicates(subset=[id_column], keep="first")

    self._nx_graph.add_nodes_from(node_df.set_index(id_column, drop=False).to_dict("index").items())

    # Compute edge weights (handle both distance-based and similarity-based proximity)
    if "similarity" in neighbors_df.columns:
        weights = neighbors_df["similarity"]
    else:
        max_distance = neighbors_df["distance"].max()
        if max_distance > 0:
            weights = 1.0 - neighbors_df["distance"] / max_distance
        else:
            # All distances are zero (or there are no rows): avoid dividing by zero
            weights = 1.0

    # assign() returns a new frame, so the table handed back by prox.neighbors()
    # is not modified in place
    neighbors_df = neighbors_df.assign(weight=weights)

    # Add edges: guarantee min_edges per node, plus any above min_weight
    log.info("Adding edges to the graph...")
    for source_id, group in neighbors_df.groupby(id_column):
        sorted_group = group.sort_values("weight", ascending=False)
        n_top = min(len(sorted_group), min_edges)
        top_edges = sorted_group.iloc[:n_top]
        extra_edges = sorted_group.iloc[n_top:]
        extra_edges = extra_edges[extra_edges["weight"] > min_weight]
        edges = pd.concat([top_edges, extra_edges])

        # Iterate the two needed columns directly: iterrows() builds a Series per
        # row (slow) and can upcast values when the columns have mixed dtypes
        self._nx_graph.add_edges_from(
            (source_id, neighbor_id, {"weight": weight})
            for neighbor_id, weight in zip(edges["neighbor_id"], edges["weight"])
        )

get_neighborhood(node_id, radius=1)

Get a subgraph containing nodes within a given number of hops around a node.

Parameters:

Name Type Description Default
node_id

The ID of the center node.

required
radius int

Number of hops (default: 1).

1

Returns:

Type Description
Graph

nx.Graph: A subgraph containing the neighborhood.

Source code in src/workbench/algorithms/graph/light/proximity_graph.py
def get_neighborhood(self, node_id, radius: int = 1) -> nx.Graph:
    """Get a subgraph containing nodes within a given number of hops around a node.

    Args:
        node_id: The ID of the center node.
        radius (int): Number of hops (default: 1).

    Returns:
        nx.Graph: A subgraph containing the neighborhood.

    Raises:
        ValueError: If no graph has been built or loaded yet, or if ``node_id``
            is not a node of the graph.
    """
    # Without this guard the membership test below fails on None with an
    # unhelpful "argument of type 'NoneType' is not iterable" TypeError
    if self._nx_graph is None:
        raise ValueError("No graph available: call build_graph() or load_graph() first.")
    if node_id not in self._nx_graph:
        raise ValueError(f"Node ID '{node_id}' not found in the graph.")
    return nx.ego_graph(self._nx_graph, node_id, radius=radius)

load_graph(graph_path)

Load a graph from the GraphStore.

Parameters:

Name Type Description Default
graph_path str

The path to the graph in GraphStore.

required
Source code in src/workbench/algorithms/graph/light/proximity_graph.py
def load_graph(self, graph_path: str):
    """Fetch a graph from the GraphStore and make it the current graph.

    Whatever ``graph_store.get`` returns for ``graph_path`` replaces the graph
    currently held by this instance.

    Args:
        graph_path (str): The path to the graph in GraphStore.
    """
    fetched_graph = self.graph_store.get(graph_path)
    self._nx_graph = fetched_graph

store_graph(graph_path)

Store the graph in the GraphStore.

Parameters:

Name Type Description Default
graph_path str

The path to store the graph in GraphStore.

required
Source code in src/workbench/algorithms/graph/light/proximity_graph.py
def store_graph(self, graph_path: str):
    """Write the currently held graph to the GraphStore.

    The graph is passed to ``graph_store.upsert`` under ``graph_path``.

    Args:
        graph_path (str): The path to store the graph in GraphStore.
    """
    current_graph = self._nx_graph
    self.graph_store.upsert(graph_path, current_graph)

Questions?

The SuperCowPowers team is happy to answer any questions you may have about AWS and Workbench. Please contact us at workbench@supercowpowers.com or chat us up on Discord.