SQL Algorithms

One of the main benefits of SQL Algorithms is that the 'heavy lifting' is all done on the SQL database, so if you have large datasets this is the place for you.

  • SQL: SQL queries that provide a wide range of functionality:

    • Outliers
    • Descriptive Stats
    • Correlations
    • and More

SQL based Outliers: Compute outliers for all the columns in a DataSource using SQL
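
As a quick orientation, here is a minimal usage sketch for the Outliers class below. The DataSource import path and the DataSource name are assumptions; adjust them to your own SageWorks setup.

from sageworks.api.data_source import DataSource          # assumed import path
from sageworks.algorithms.sql.outliers import Outliers

# Grab an existing DataSource (the name here is just an example)
ds = DataSource("abalone_data")

# Compute IQR-based outliers (scale=1.5 by default); the heavy query runs on the SQL database
outlier_df = Outliers().compute_outliers(ds)
print(outlier_df.head())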

Outliers

Outliers: Class to compute outliers for all the columns in a DataSource using SQL

Source code in src/sageworks/algorithms/sql/outliers.py
class Outliers:
    """Outliers: Class to compute outliers for all the columns in a DataSource using SQL"""

    def __init__(self):
        """SQLOutliers Initialization"""
        self.outlier_group = "unknown"

    def compute_outliers(
        self, data_source: DataSourceAbstract, scale: float = 1.5, use_stddev: bool = False
    ) -> pd.DataFrame:
        """Compute outliers for all the numeric columns in a DataSource
        Args:
            data_source(DataSource): The DataSource that we're computing outliers on
            scale (float): The scale to use for either the IQR or stddev outlier calculation (default: 1.5)
            use_stddev (bool): Option to use the standard deviation for the outlier calculation (default: False)
        Returns:
            pd.DataFrame: A DataFrame of outliers for this DataSource
        Notes:
            Uses the IQR * 1.5 (~= 2.5 Sigma) (use 1.7 for ~= 3 Sigma)
            The scale parameter can be adjusted to change the IQR multiplier
        """

        # Note: If use_stddev is True, then the scale parameter needs to be adjusted
        if use_stddev and scale == 1.5:  # If the default scale is used, adjust it
            scale = 2.5

        # Compute the numeric outliers
        outlier_df = self._numeric_outliers(data_source, scale, use_stddev)

        # If there are no outliers, return a DataFrame with the computation columns but no rows
        if outlier_df is None:
            columns = data_source.view("computation").columns
            return pd.DataFrame(columns=columns + ["outlier_group"])

        # Get the top N outliers for each outlier group
        outlier_df = self.get_top_n_outliers(outlier_df)

        # Make sure the dataframe isn't too big, if it's too big sample it down
        if len(outlier_df) > 300:
            log.important(f"Outliers DataFrame is too large {len(outlier_df)}, sampling down to 300 rows")
            outlier_df = outlier_df.sample(300)

        # Sort by outlier_group and reset the index
        outlier_df = outlier_df.sort_values("outlier_group").reset_index(drop=True)

        # Shorten any long string values
        outlier_df = shorten_values(outlier_df)
        return outlier_df

    def _numeric_outliers(self, data_source: DataSourceAbstract, scale: float, use_stddev=False) -> pd.DataFrame:
        """Internal method to compute outliers for all numeric columns
        Args:
            data_source(DataSource): The DataSource that we're computing outliers on
            scale (float): The scale to use for the IQR outlier calculation
            use_stddev (bool): Option to use the standard deviation for the outlier calculation (default: False)
        Returns:
            pd.DataFrame: A DataFrame of all the outliers combined
        """

        # Grab the column stats and descriptive stats for this DataSource
        column_stats = data_source.column_stats()
        descriptive_stats = data_source.descriptive_stats()

        # If there are no numeric columns, return None
        if not descriptive_stats:
            log.warning("No numeric columns found in the current computation view of the DataSource")
            log.warning("If the data source was created from a DataFrame, ensure that the DataFrame was properly typed")
            log.warning("Recommendation: Properly type the DataFrame and recreate the SageWorks artifact")
            return None

        # Get the column names and types from the DataSource
        column_details = data_source.view("computation").column_details()

        # For every column in the data_source that is numeric get the outliers
        # This loop computes the columns, lower bounds, and upper bounds for the SQL query
        log.info("Computing Outliers for numeric columns...")
        numeric = ["tinyint", "smallint", "int", "bigint", "float", "double", "decimal"]
        columns = []
        lower_bounds = []
        upper_bounds = []
        for column, data_type in column_details.items():
            if data_type in numeric:
                # Skip columns that just have one value (or are all nans)
                if column_stats[column]["unique"] <= 1:
                    log.info(f"Skipping unary column {column} with value {descriptive_stats[column]['min']}")
                    continue

                # Skip columns that are 'binary' columns
                if column_stats[column]["unique"] == 2:
                    log.info(f"Skipping binary column {column}")
                    continue

                # Do they want to use the stddev instead of IQR?
                if use_stddev:
                    mean = descriptive_stats[column]["mean"]
                    stddev = descriptive_stats[column]["stddev"]
                    lower_bound = mean - (stddev * scale)
                    upper_bound = mean + (stddev * scale)

                # Compute the IQR for this column
                else:
                    iqr = descriptive_stats[column]["q3"] - descriptive_stats[column]["q1"]
                    lower_bound = descriptive_stats[column]["q1"] - (iqr * scale)
                    upper_bound = descriptive_stats[column]["q3"] + (iqr * scale)

                # Add the column, lower bound, and upper bound to the lists
                columns.append(column)
                lower_bounds.append(lower_bound)
                upper_bounds.append(upper_bound)

        # Compute the SQL query
        query = self._multi_column_outlier_query(data_source, columns, lower_bounds, upper_bounds)
        outlier_df = data_source.query(query)

        # Label the outlier groups
        outlier_df = self._label_outlier_groups(outlier_df, columns, lower_bounds, upper_bounds)
        return outlier_df

    @staticmethod
    def _multi_column_outlier_query(
        data_source: DataSourceAbstract, columns: list, lower_bounds: list, upper_bounds: list
    ) -> str:
        """Internal method to compute outliers for multiple columns
        Args:
            data_source(DataSource): The DataSource that we're computing outliers on
            columns(list): The columns to compute outliers on
            lower_bounds(list): The lower bounds for outliers
            upper_bounds(list): The upper bounds for outliers
        Returns:
            str: A SQL query to compute outliers for multiple columns
        """
        # Grab the DataSource computation table name
        table = data_source.view("computation").table

        # Get the column names and types from the DataSource
        column_details = data_source.view("computation").column_details()
        sql_columns = ", ".join([f'"{col}"' for col in column_details.keys()])

        query = f"SELECT {sql_columns} FROM {table} WHERE "
        for col, lb, ub in zip(columns, lower_bounds, upper_bounds):
            query += f"({col} < {lb} OR {col} > {ub}) OR "
        query = query[:-4]

        # Add a limit just in case
        query += " LIMIT 5000"
        return query

    @staticmethod
    def _label_outlier_groups(
        outlier_df: pd.DataFrame, columns: list, lower_bounds: list, upper_bounds: list
    ) -> pd.DataFrame:
        """Internal method to label outliers by group.
        Args:
            outlier_df(pd.DataFrame): The DataFrame of outliers
            columns(list): The columns for which to compute outliers
            lower_bounds(list): The lower bounds for each column
            upper_bounds(list): The upper bounds for each column
        Returns:
            pd.DataFrame: A DataFrame with an added 'outlier_group' column, indicating the type of outlier.
        """

        column_outlier_dfs = []
        for col, lb, ub in zip(columns, lower_bounds, upper_bounds):
            mask_low = outlier_df[col] < lb
            mask_high = outlier_df[col] > ub

            low_df = outlier_df[mask_low].copy()
            low_df["outlier_group"] = f"{col}_low"

            high_df = outlier_df[mask_high].copy()
            high_df["outlier_group"] = f"{col}_high"

            column_outlier_dfs.extend([low_df, high_df])

        # If there are no outliers, return the original DataFrame with an empty 'outlier_group' column
        if not column_outlier_dfs:
            log.critical("No outliers found in the data source.. probably something is wrong")
            return outlier_df.assign(outlier_group="")

        # Concatenate the DataFrames and return
        return pd.concat(column_outlier_dfs, ignore_index=True)

    @staticmethod
    def get_top_n_outliers(outlier_df: pd.DataFrame, n: int = 10) -> pd.DataFrame:
        """Function to retrieve the top N highest and lowest outliers for each outlier group.

        Args:
            outlier_df (pd.DataFrame): The DataFrame of outliers with 'outlier_group' column
            n (int): Number of top outliers to retrieve for each group, defaults to 10

        Returns:
            pd.DataFrame: A DataFrame containing the top N outliers for each outlier group
        """

        def get_extreme_values(group: pd.DataFrame) -> pd.DataFrame:
            """Helper function to get the top N extreme values from a group."""
            col, extreme_type = group.name.rsplit("_", 1)
            if extreme_type == "low":
                return group.nsmallest(n, col)
            else:
                return group.nlargest(n, col)

        # Group by 'outlier_group' and apply the helper function, explicitly selecting columns
        top_outliers = outlier_df.groupby("outlier_group", group_keys=False).apply(
            get_extreme_values, include_groups=True
        )
        return top_outliers.reset_index(drop=True)
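
To make the generated SQL concrete, here is a standalone sketch that mirrors what _multi_column_outlier_query builds; the table name, column names, and bounds below are hypothetical.

# Mirrors the query construction in _multi_column_outlier_query (hypothetical inputs)
columns = ["length", "weight"]
lower_bounds = [0.05, 0.2]
upper_bounds = [0.9, 2.5]
select_cols = ", ".join(f'"{c}"' for c in ["id", "length", "weight"])

query = f"SELECT {select_cols} FROM my_computation_table WHERE "
for col, lb, ub in zip(columns, lower_bounds, upper_bounds):
    query += f"({col} < {lb} OR {col} > {ub}) OR "
query = query[:-4] + " LIMIT 5000"   # strip the trailing ' OR ' and cap the row count

print(query)
# SELECT "id", "length", "weight" FROM my_computation_table
#   WHERE (length < 0.05 OR length > 0.9) OR (weight < 0.2 OR weight > 2.5) LIMIT 5000
# (printed as one line, wrapped here for readability)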

__init__()

SQLOutliers Initialization

Source code in src/sageworks/algorithms/sql/outliers.py
def __init__(self):
    """SQLOutliers Initialization"""
    self.outlier_group = "unknown"

compute_outliers(data_source, scale=1.5, use_stddev=False)

Compute outliers for all the numeric columns in a DataSource

Args:
    data_source (DataSource): The DataSource that we're computing outliers on
    scale (float): The scale to use for either the IQR or stddev outlier calculation (default: 1.5)
    use_stddev (bool): Option to use the standard deviation for the outlier calculation (default: False)

Returns:
    pd.DataFrame: A DataFrame of outliers for this DataSource

Notes:
    Uses the IQR * 1.5 (~= 2.5 Sigma) (use 1.7 for ~= 3 Sigma)
    The scale parameter can be adjusted to change the IQR multiplier

Source code in src/sageworks/algorithms/sql/outliers.py
def compute_outliers(
    self, data_source: DataSourceAbstract, scale: float = 1.5, use_stddev: bool = False
) -> pd.DataFrame:
    """Compute outliers for all the numeric columns in a DataSource
    Args:
        data_source(DataSource): The DataSource that we're computing outliers on
        scale (float): The scale to use for either the IQR or stddev outlier calculation (default: 1.5)
        use_stddev (bool): Option to use the standard deviation for the outlier calculation (default: False)
    Returns:
        pd.DataFrame: A DataFrame of outliers for this DataSource
    Notes:
        Uses the IQR * 1.5 (~= 2.5 Sigma) (use 1.7 for ~= 3 Sigma)
        The scale parameter can be adjusted to change the IQR multiplier
    """

    # Note: If use_stddev is True, then the scale parameter needs to be adjusted
    if use_stddev and scale == 1.5:  # If the default scale is used, adjust it
        scale = 2.5

    # Compute the numeric outliers
    outlier_df = self._numeric_outliers(data_source, scale, use_stddev)

    # If there are no outliers, return a DataFrame with the computation columns but no rows
    if outlier_df is None:
        columns = data_source.view("computation").columns
        return pd.DataFrame(columns=columns + ["outlier_group"])

    # Get the top N outliers for each outlier group
    outlier_df = self.get_top_n_outliers(outlier_df)

    # Make sure the dataframe isn't too big, if it's too big sample it down
    if len(outlier_df) > 300:
        log.important(f"Outliers DataFrame is too large {len(outlier_df)}, sampling down to 300 rows")
        outlier_df = outlier_df.sample(300)

    # Sort by outlier_group and reset the index
    outlier_df = outlier_df.sort_values("outlier_group").reset_index(drop=True)

    # Shorten any long string values
    outlier_df = shorten_values(outlier_df)
    return outlier_df
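
As a quick illustration of how the bounds are derived for a single column, here is the arithmetic for both variants; the stats values are made up.

# Hypothetical descriptive stats for one numeric column
q1, q3, mean, stddev = 2.0, 6.0, 4.0, 2.0
scale = 1.5           # default IQR multiplier
stddev_scale = 2.5    # default multiplier when use_stddev=True

# IQR-based bounds (the default path)
iqr = q3 - q1                                           # 4.0
iqr_bounds = (q1 - iqr * scale, q3 + iqr * scale)       # (-4.0, 12.0)

# Stddev-based bounds (use_stddev=True)
std_bounds = (mean - stddev * stddev_scale, mean + stddev * stddev_scale)   # (-1.0, 9.0)

Rows falling outside these bounds for any column end up in the outlier DataFrame, labeled '<column>_low' or '<column>_high'.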

get_top_n_outliers(outlier_df, n=10) staticmethod

Function to retrieve the top N highest and lowest outliers for each outlier group.

Parameters:

| Name       | Type      | Description                                            | Default  |
|------------|-----------|--------------------------------------------------------|----------|
| outlier_df | DataFrame | The DataFrame of outliers with 'outlier_group' column  | required |
| n          | int       | Number of top outliers to retrieve for each group      | 10       |

Returns:

| Type      | Description                                                              |
|-----------|--------------------------------------------------------------------------|
| DataFrame | pd.DataFrame: A DataFrame containing the top N outliers for each outlier group |

Source code in src/sageworks/algorithms/sql/outliers.py
@staticmethod
def get_top_n_outliers(outlier_df: pd.DataFrame, n: int = 10) -> pd.DataFrame:
    """Function to retrieve the top N highest and lowest outliers for each outlier group.

    Args:
        outlier_df (pd.DataFrame): The DataFrame of outliers with 'outlier_group' column
        n (int): Number of top outliers to retrieve for each group, defaults to 10

    Returns:
        pd.DataFrame: A DataFrame containing the top N outliers for each outlier group
    """

    def get_extreme_values(group: pd.DataFrame) -> pd.DataFrame:
        """Helper function to get the top N extreme values from a group."""
        col, extreme_type = group.name.rsplit("_", 1)
        if extreme_type == "low":
            return group.nsmallest(n, col)
        else:
            return group.nlargest(n, col)

    # Group by 'outlier_group' and apply the helper function, explicitly selecting columns
    top_outliers = outlier_df.groupby("outlier_group", group_keys=False).apply(
        get_extreme_values, include_groups=True
    )
    return top_outliers.reset_index(drop=True)
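
Because get_top_n_outliers is a static method, it can be exercised directly on a hand-built DataFrame. A minimal sketch (the data and import path are assumptions, and it relies on a pandas version that supports include_groups in GroupBy.apply):

import pandas as pd
from sageworks.algorithms.sql.outliers import Outliers   # assumed import path

# A tiny, made-up outlier DataFrame with 'outlier_group' labels already assigned
df = pd.DataFrame({
    "length": [0.01, 0.02, 0.03, 0.95, 0.99],
    "outlier_group": ["length_low", "length_low", "length_low", "length_high", "length_high"],
})

# Keep only the 2 most extreme rows per group
top = Outliers.get_top_n_outliers(df, n=2)
print(top)   # the two smallest 'length_low' rows and the two largest 'length_high' rows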

SQL based Descriptive Stats: Compute Descriptive Stats for all the numeric columns in a DataSource using SQL

descriptive_stats(data_source)

Compute Descriptive Stats for all the numeric columns in a DataSource

Args:
    data_source (DataSource): The DataSource that we're computing descriptive stats on

Returns:
    dict(dict): A dictionary of descriptive stats for each column in this format
        {'col1': {'min': 0, 'q1': 1, 'median': 2, 'q3': 3, 'max': 4, 'mean': 2.5, 'stddev': 1.5},
         'col2': ...}

Source code in src/sageworks/algorithms/sql/descriptive_stats.py
def descriptive_stats(data_source: DataSourceAbstract) -> dict[dict]:
    """Compute Descriptive Stats for all the numeric columns in a DataSource
    Args:
        data_source(DataSource): The DataSource that we're computing descriptive stats on
    Returns:
        dict(dict): A dictionary of descriptive stats for each column in this format
             {'col1': {'min': 0, 'q1': 1, 'median': 2, 'q3': 3, 'max': 4, 'mean': 2.5, 'stddev': 1.5},
              'col2': ...}
    """
    # Grab the DataSource computation view table name
    table = data_source.view("computation").table

    # Figure out which columns are numeric
    num_type = ["double", "float", "int", "bigint", "smallint", "tinyint"]
    details = data_source.view("computation").column_details()
    numeric = [column for column, data_type in details.items() if data_type in num_type]

    # Sanity Check for numeric columns
    if len(numeric) == 0:
        log.warning("No numeric columns found in the current computation view of the DataSource")
        log.warning("If the data source was created from a DataFrame, ensure that the DataFrame was properly typed")
        log.warning("Recommendation: Properly type the DataFrame and recreate the SageWorks artifact")
        return {}

    # Build the query
    query = descriptive_stats_query(numeric, table)

    # Run the query
    log.debug(query)
    result_df = data_source.query(query)

    # Process the results
    # Note: The result_df is a DataFrame with a single row and a column for each stat metric
    stats_dict = result_df.to_dict(orient="index")[0]

    # Convert the dictionary to a nested dictionary
    # Note: The keys are in the format column___stat (e.g. 'col1___min')
    nested_descriptive_stats = defaultdict(dict)
    for key, value in stats_dict.items():
        col1, col2 = key.split("___")
        nested_descriptive_stats[col1][col2] = value

    # Return the nested dictionary
    return dict(nested_descriptive_stats)
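
A minimal usage sketch for descriptive_stats, assuming an existing SageWorks DataSource (the DataSource import path, DataSource name, and column name are assumptions):

from sageworks.api.data_source import DataSource                      # assumed import path
from sageworks.algorithms.sql.descriptive_stats import descriptive_stats

ds = DataSource("abalone_data")       # example DataSource name
stats = descriptive_stats(ds)         # a single SQL query computes stats for every numeric column
print(stats["length"]["q1"], stats["length"]["stddev"])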

descriptive_stats_query(columns, table_name)

Build a query to compute the descriptive stats for all columns in a table

Args:
    columns (list(str)): The columns to compute descriptive stats on
    table_name (str): The table to compute descriptive stats on

Returns:
    str: The SQL query to compute descriptive stats

Source code in src/sageworks/algorithms/sql/descriptive_stats.py
def descriptive_stats_query(columns: list[str], table_name: str) -> str:
    """Build a query to compute the descriptive stats for all columns in a table
    Args:
        columns(list(str)): The columns to compute descriptive stats on
        table_name(str): The table to compute descriptive stats on
    Returns:
        str: The SQL query to compute descriptive stats
    """
    query = f"SELECT <<column_descriptive_stats>> FROM {table_name}"
    column_descriptive_stats = ""
    for c in columns:
        column_descriptive_stats += (
            f'min("{c}") AS "{c}___min", '
            f'approx_percentile("{c}", 0.25) AS "{c}___q1", '
            f'approx_percentile("{c}", 0.5) AS "{c}___median", '
            f'approx_percentile("{c}", 0.75) AS "{c}___q3", '
            f'max("{c}") AS "{c}___max", '
            f'avg("{c}") AS "{c}___mean", '
            f'stddev("{c}") AS "{c}___stddev", '
        )
    query = query.replace("<<column_descriptive_stats>>", column_descriptive_stats[:-2])

    # Return the query
    return query
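
For example, calling the query builder with a single column produces a query along these lines (the column and table names are hypothetical; the functions are the Athena/Presto-style ones used above):

from sageworks.algorithms.sql.descriptive_stats import descriptive_stats_query

print(descriptive_stats_query(["price"], "my_table"))
# SELECT min("price") AS "price___min", approx_percentile("price", 0.25) AS "price___q1",
#        approx_percentile("price", 0.5) AS "price___median", approx_percentile("price", 0.75) AS "price___q3",
#        max("price") AS "price___max", avg("price") AS "price___mean", stddev("price") AS "price___stddev"
# FROM my_table   (printed as one line)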

SQL based Correlations: Compute Correlations for all the numeric columns in a DataSource using SQL

correlation_query(columns, table_name)

Build a query to compute the correlations between columns in a table

Args:
    columns (list(str)): The columns to compute correlations on
    table_name (str): The table to compute correlations on

Returns:
    str: The SQL query to compute correlations

Source code in src/sageworks/algorithms/sql/correlations.py
def correlation_query(columns: list[str], table_name: str) -> str:
    """Build a query to compute the correlations between columns in a table
    Args:
        columns(list(str)): The columns to compute correlations on
        table_name(str): The table to compute correlations on
    Returns:
        str: The SQL query to compute correlations
    """
    query = f"SELECT <<cross_correlations>> FROM {table_name}"
    cross_correlations = ""
    for c in columns:
        for d in columns:
            if c != d:
                cross_correlations += f'corr("{c}", "{d}") AS "{c}__{d}", '
    query = query.replace("<<cross_correlations>>", cross_correlations[:-2])

    # Return the query
    return query
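
As an illustration, two hypothetical columns produce a query with both pairwise directions (the two values are identical, but generating both keeps the nested-dictionary construction in correlations() simple):

from sageworks.algorithms.sql.correlations import correlation_query

print(correlation_query(["length", "weight"], "my_table"))
# SELECT corr("length", "weight") AS "length__weight", corr("weight", "length") AS "weight__length" FROM my_table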

correlations(data_source)

Compute Correlations for all the numeric columns in a DataSource

Args:
    data_source (DataSource): The DataSource that we're computing correlations on

Returns:
    dict(dict): A dictionary of correlations for each column in this format
        {'col1': {'col2': 0.5, 'col3': 0.9, 'col4': 0.4, ...},
         'col2': {'col1': 0.5, 'col3': 0.8, 'col4': 0.3, ...}}

Source code in src/sageworks/algorithms/sql/correlations.py
def correlations(data_source: DataSourceAbstract) -> dict[dict]:
    """Compute Correlations for all the numeric columns in a DataSource
    Args:
        data_source(DataSource): The DataSource that we're computing correlations on
    Returns:
        dict(dict): A dictionary of correlations for each column in this format
             {'col1': {'col2': 0.5, 'col3': 0.9, 'col4': 0.4, ...},
              'col2': {'col1': 0.5, 'col3': 0.8, 'col4': 0.3, ...}}
    """
    data_source.log.info("Computing Correlations for numeric columns...")

    # Figure out which columns are numeric
    num_type = ["double", "float", "int", "bigint", "smallint", "tinyint"]
    details = data_source.view("computation").column_details()

    # Get the numeric columns
    numeric = [column for column, data_type in details.items() if data_type in num_type]

    # We need at least two numeric columns to compute correlations
    if len(numeric) < 2:
        return {}

    # Grab the DataSource computation table name
    table = data_source.view("computation").table

    # Build the query
    query = correlation_query(numeric, table)

    # Run the query
    log.debug(query)
    result_df = data_source.query(query)

    # Drop any columns that have NaNs
    result_df = result_df.dropna(axis=1)

    # Process the results
    # Note: The result_df is a DataFrame with a single row and a column for each pairwise correlation
    correlation_dict = result_df.to_dict(orient="index")[0]

    # Convert the dictionary to a nested dictionary
    # Note: The keys are in the format col1__col2
    nested_corr = defaultdict(dict)
    for key, value in correlation_dict.items():
        col1, col2 = key.split("__")
        nested_corr[col1][col2] = value

    # Sort the nested dictionaries
    sorted_dict = {}
    for key, sub_dict in nested_corr.items():
        sorted_dict[key] = {k: v for k, v in sorted(sub_dict.items(), key=lambda item: item[1], reverse=True)}
    return sorted_dict
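
And a minimal usage sketch for the correlations function, again assuming an existing DataSource (the import path and names are assumptions):

from sageworks.api.data_source import DataSource            # assumed import path
from sageworks.algorithms.sql.correlations import correlations

ds = DataSource("abalone_data")     # example DataSource name
corr = correlations(ds)
print(corr["length"])               # e.g. {'other_col': 0.9, ...}, sorted from highest to lowest correlation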

Questions?

The SuperCowPowers team is happy to answer any questions you may have about AWS and SageWorks. Please contact us at sageworks@supercowpowers.com or chat with us on Discord.