Skip to content

DataSource Abstract

API Classes

Found a method here you want to use? The API Classes have method pass-through, so just call the method on the DataSource API Class and voilà — it works the same.

The DataSource Abstract class is a base/abstract class that defines the API implemented by all the child classes (currently just AthenaSource, but later RDSSource, FutureThing, etc.).

DataSourceAbstract: Abstract Base Class for all data sources (S3: CSV, JSONL, Parquet, RDS, etc)

DataSourceAbstract

Bases: Artifact

Source code in src/workbench/core/artifacts/data_source_abstract.py
class DataSourceAbstract(Artifact):
    """Abstract Base Class for all data sources (S3: CSV, JSONL, Parquet, RDS, etc).

    Defines the API implemented by all concrete data source child classes
    (currently just AthenaSource). Subclasses must implement every
    @abstractmethod stub below.
    """

    def __init__(self, data_uuid: str, database: str = "workbench", **kwargs):
        """DataSourceAbstract: Abstract Base Class for all data sources

        Args:
            data_uuid (str): The UUID for this Data Source
            database (str): The database to use for this Data Source (default: workbench)
        """

        # Call superclass init
        super().__init__(data_uuid, **kwargs)

        # Set up our instance attributes
        self._database = database
        self._table_name = data_uuid  # base table name defaults to the UUID

    def __post_init__(self):
        # Call superclass post_init
        super().__post_init__()

    @deprecated(version="0.9")
    def get_database(self) -> str:
        """Get the database for this Data Source (deprecated: use the `database` property)"""
        return self._database

    @property
    def database(self) -> str:
        """Get the database for this Data Source"""
        return self._database

    @property
    def table(self) -> str:
        """Get the base table name for this Data Source"""
        return self._table_name

    @abstractmethod
    def num_rows(self) -> int:
        """Return the number of rows for this Data Source"""
        pass

    @abstractmethod
    def num_columns(self) -> int:
        """Return the number of columns for this Data Source"""
        pass

    @property
    @abstractmethod
    def columns(self) -> list[str]:
        """Return the column names for this Data Source"""
        pass

    @property
    @abstractmethod
    def column_types(self) -> list[str]:
        """Return the column types for this Data Source"""
        pass

    def column_details(self) -> dict:
        """Return the column details for this Data Source

        Returns:
            dict: Mapping of column name -> column type for this Data Source
        """
        return dict(zip(self.columns, self.column_types))

    def views(self) -> list[str]:
        """Return the views for this Data Source"""
        # Local import avoids a circular import at module load time
        from workbench.core.views.view_utils import list_views

        return list_views(self)

    def view(self, view_name: str) -> "View":
        """Return a View object for a specific view

        Args:
            view_name (str): The name of the view to return
        Returns:
            View: The View object for the specified view
        """
        from workbench.core.views import View

        return View(self, view_name)

    def set_display_columns(self, diplay_columns: list[str]):
        """Set the display columns for this Data Source

        Args:
            diplay_columns (list[str]): The display columns for this Data Source

        Note:
            The parameter name misspelling ("diplay") is preserved for
            keyword-argument backward compatibility.
        """
        # Check mismatch of display columns to computation columns
        c_view = self.view("computation")
        computation_columns = c_view.columns
        mismatch_columns = [col for col in diplay_columns if col not in computation_columns]
        if mismatch_columns:
            self.log.monitor(f"Display View/Computation mismatch: {mismatch_columns}")

        self.log.important(f"Setting Display Columns...{diplay_columns}")
        from workbench.core.views import DisplayView

        # Create a NEW display view
        DisplayView.create(self, source_table=c_view.table, column_list=diplay_columns)

    def set_computation_columns(self, computation_columns: list[str], recompute_stats: bool = True):
        """Set the computation columns for this Data Source

        Args:
            computation_columns (list[str]): The computation columns for this Data Source
            recompute_stats (bool): Recomputes all the stats for this Data Source (default: True)
        """
        self.log.important(f"Setting Computation Columns...{computation_columns}")
        from workbench.core.views import ComputationView

        # Create a NEW computation view
        ComputationView.create(self, column_list=computation_columns)
        if recompute_stats:
            self.recompute_stats()  # BLOCKING: recomputes sample/stats/outliers

    def _create_display_view(self):
        """Internal: Create the Display View for this DataSource"""
        from workbench.core.views import View

        View(self, "display")

    @abstractmethod
    def query(self, query: str) -> pd.DataFrame:
        """Query the DataSourceAbstract

        Args:
            query (str): The SQL query to execute
        Returns:
            pd.DataFrame: The query results
        """
        pass

    @abstractmethod
    def execute_statement(self, query: str):
        """Execute an SQL statement that doesn't return a result

        Args:
            query (str): The SQL statement to execute
        """
        pass

    @abstractmethod
    def sample(self) -> pd.DataFrame:
        """Return a sample DataFrame from this DataSourceAbstract

        Returns:
            pd.DataFrame: A sample DataFrame from this DataSource
        """
        pass

    @abstractmethod
    def descriptive_stats(self, recompute: bool = False) -> dict[str, dict]:
        """Compute Descriptive Stats for all the numeric columns in a DataSource

        Args:
            recompute (bool): Recompute the descriptive stats (default: False)
        Returns:
            dict[str, dict]: A dictionary of descriptive stats for each column in the form
                 {'col1': {'min': 0, 'q1': 1, 'median': 2, 'q3': 3, 'max': 4},
                  'col2': ...}
        """
        pass

    @abstractmethod
    def outliers(self, scale: float = 1.5) -> pd.DataFrame:
        """Return a DataFrame of outliers from this DataSource

        Args:
            scale (float): The scale to use for the IQR (default: 1.5)

        Returns:
            pd.DataFrame: A DataFrame of outliers from this DataSource

        Notes:
            Uses the IQR * 1.5 (~= 2.5 Sigma) method to compute outliers
            The scale parameter can be adjusted to change the IQR multiplier
        """
        pass

    @abstractmethod
    def smart_sample(self) -> pd.DataFrame:
        """Get a SMART sample dataframe from this DataSource

        Returns:
            pd.DataFrame: A combined DataFrame of sample data + outliers
        """
        pass

    @abstractmethod
    def value_counts(self, recompute: bool = False) -> dict[str, dict]:
        """Compute 'value_counts' for all the string columns in a DataSource

        Args:
            recompute (bool): Recompute the value counts (default: False)
        Returns:
            dict[str, dict]: A dictionary of value counts for each column in the form
                 {'col1': {'value_1': X, 'value_2': Y, 'value_3': Z,...},
                  'col2': ...}
        """
        pass

    @abstractmethod
    def column_stats(self, recompute: bool = False) -> dict[str, dict]:
        """Compute Column Stats for all the columns in a DataSource

        Args:
            recompute (bool): Recompute the column stats (default: False)
        Returns:
            dict[str, dict]: A dictionary of stats for each column in this format
            NB: String columns will NOT have num_zeros and descriptive stats
             {'col1': {'dtype': 'string', 'unique': 4321, 'nulls': 12},
              'col2': {'dtype': 'int', 'unique': 4321, 'nulls': 12, 'num_zeros': 100, 'descriptive_stats': {...}},
              ...}
        """
        pass

    @abstractmethod
    def correlations(self, recompute: bool = False) -> dict[str, dict]:
        """Compute Correlations for all the numeric columns in a DataSource

        Args:
            recompute (bool): Recompute the column stats (default: False)

        Returns:
            dict[str, dict]: A dictionary of correlations for each column in this format
                 {'col1': {'col2': 0.5, 'col3': 0.9, 'col4': 0.4, ...},
                  'col2': {'col1': 0.5, 'col3': 0.8, 'col4': 0.3, ...}}
        """
        pass

    def details(self, recompute: bool = False) -> dict:
        """Additional Details about this DataSourceAbstract Artifact

        Args:
            recompute (bool): Recompute the details (default: False). Accepted
                so that onboard()'s details(recompute=True) call is valid on the
                base class; this base implementation always computes fresh details.

        Returns:
            dict: Summary plus num_rows, num_columns, and column_details
        """
        details = self.summary()
        details["num_rows"] = self.num_rows()
        details["num_columns"] = self.num_columns()
        details["column_details"] = self.column_details()
        return details

    def expected_meta(self) -> list[str]:
        """DataSources have quite a bit of expected Metadata for EDA displays"""

        # For DataSources, we expect to see the following metadata
        expected_meta = [
            # FIXME: Revisit this
            # "workbench_details",
            "workbench_descriptive_stats",
            "workbench_value_counts",
            "workbench_correlations",
            "workbench_column_stats",
        ]
        return expected_meta

    def ready(self) -> bool:
        """Is the DataSource ready?"""

        # Check if the Artifact is ready
        if not super().ready():
            return False

        # If we don't have a smart_sample we're probably not ready
        if not self.df_cache.check(f"{self.uuid}/smart_sample"):
            self.log.warning(f"DataSource {self.uuid} not ready...")
            return False

        # Okay so we have sample, outliers, and smart_sample so we are ready
        return True

    def onboard(self) -> bool:
        """This is a BLOCKING method that will onboard the data source (make it ready)

        Returns:
            bool: True if the DataSource was onboarded successfully
        """
        self.log.important(f"Onboarding {self.uuid}...")
        self.set_status("onboarding")
        self.remove_health_tag("needs_onboard")

        # Make sure our display view actually exists
        self.view("display").ensure_exists()

        # Recompute the stats
        self.recompute_stats()

        # Run a health check and refresh the meta
        time.sleep(2)  # Give the AWS Metadata a chance to update
        self.health_check()
        self.refresh_meta()
        self.details(recompute=True)
        self.set_status("ready")
        return True

    def recompute_stats(self) -> bool:
        """This is a BLOCKING method that will recompute the stats for the data source

        Returns:
            bool: True if the DataSource stats were recomputed successfully
        """
        self.log.important(f"Recomputing Stats {self.uuid}...")

        # Make sure our computation view actually exists
        self.view("computation").ensure_exists()

        # Compute the sample, column stats, outliers, and smart_sample
        self.df_cache.delete(f"{self.uuid}/sample")
        self.sample()
        self.column_stats(recompute=True)
        self.refresh_meta()  # Refresh the meta since outliers needs descriptive_stats and value_counts
        self.df_cache.delete(f"{self.uuid}/outliers")
        self.outliers()
        self.df_cache.delete(f"{self.uuid}/smart_sample")
        self.smart_sample()
        return True

column_types abstractmethod property

Return the column types for this Data Source

columns abstractmethod property

Return the column names for this Data Source

database property

Get the database for this Data Source

table property

Get the base table name for this Data Source

__init__(data_uuid, database='workbench', **kwargs)

DataSourceAbstract: Abstract Base Class for all data sources Args: data_uuid(str): The UUID for this Data Source database(str): The database to use for this Data Source (default: workbench)

Source code in src/workbench/core/artifacts/data_source_abstract.py
def __init__(self, data_uuid: str, database: str = "workbench", **kwargs):
    """DataSourceAbstract: Abstract Base Class for all data sources

    Args:
        data_uuid (str): The UUID for this Data Source
        database (str): The database to use for this Data Source (default: workbench)
    """

    # Call superclass init
    super().__init__(data_uuid, **kwargs)

    # Set up our instance attributes
    self._database = database
    self._table_name = data_uuid  # base table name defaults to the UUID

column_details()

Return the column details for this Data Source

Returns:

Name Type Description
dict dict

The column details for this Data Source

Source code in src/workbench/core/artifacts/data_source_abstract.py
def column_details(self) -> dict:
    """Return the column details for this Data Source

    Returns:
        dict: Mapping of column name -> column type for this Data Source
    """
    # Pair each column name with its corresponding type
    return {name: col_type for name, col_type in zip(self.columns, self.column_types)}

column_stats(recompute=False) abstractmethod

Compute Column Stats for all the columns in a DataSource Args: recompute (bool): Recompute the column stats (default: False) Returns: dict(dict): A dictionary of stats for each column this format NB: String columns will NOT have num_zeros and descriptive stats {'col1': {'dtype': 'string', 'unique': 4321, 'nulls': 12}, 'col2': {'dtype': 'int', 'unique': 4321, 'nulls': 12, 'num_zeros': 100, 'descriptive_stats': {...}}, ...}

Source code in src/workbench/core/artifacts/data_source_abstract.py
@abstractmethod
def column_stats(self, recompute: bool = False) -> dict[str, dict]:
    """Compute Column Stats for all the columns in a DataSource

    Args:
        recompute (bool): Recompute the column stats (default: False)
    Returns:
        dict[str, dict]: A dictionary of stats for each column in this format
        NB: String columns will NOT have num_zeros and descriptive stats
         {'col1': {'dtype': 'string', 'unique': 4321, 'nulls': 12},
          'col2': {'dtype': 'int', 'unique': 4321, 'nulls': 12, 'num_zeros': 100, 'descriptive_stats': {...}},
          ...}
    """
    pass

correlations(recompute=False) abstractmethod

Compute Correlations for all the numeric columns in a DataSource

Parameters:

Name Type Description Default
recompute bool

Recompute the column stats (default: False)

False

Returns:

Name Type Description
dict dict

A dictionary of correlations for each column in this format {'col1': {'col2': 0.5, 'col3': 0.9, 'col4': 0.4, ...}, 'col2': {'col1': 0.5, 'col3': 0.8, 'col4': 0.3, ...}}

Source code in src/workbench/core/artifacts/data_source_abstract.py
@abstractmethod
def correlations(self, recompute: bool = False) -> dict[str, dict]:
    """Compute Correlations for all the numeric columns in a DataSource

    Args:
        recompute (bool): Recompute the correlations (default: False)

    Returns:
        dict[str, dict]: A dictionary of correlations for each column in this format
             {'col1': {'col2': 0.5, 'col3': 0.9, 'col4': 0.4, ...},
              'col2': {'col1': 0.5, 'col3': 0.8, 'col4': 0.3, ...}}
    """
    pass

descriptive_stats(recompute=False) abstractmethod

Compute Descriptive Stats for all the numeric columns in a DataSource Args: recompute (bool): Recompute the descriptive stats (default: False) Returns: dict(dict): A dictionary of descriptive stats for each column in the form {'col1': {'min': 0, 'q1': 1, 'median': 2, 'q3': 3, 'max': 4}, 'col2': ...}

Source code in src/workbench/core/artifacts/data_source_abstract.py
@abstractmethod
def descriptive_stats(self, recompute: bool = False) -> dict[str, dict]:
    """Compute Descriptive Stats for all the numeric columns in a DataSource

    Args:
        recompute (bool): Recompute the descriptive stats (default: False)
    Returns:
        dict[str, dict]: A dictionary of descriptive stats for each column in the form
             {'col1': {'min': 0, 'q1': 1, 'median': 2, 'q3': 3, 'max': 4},
              'col2': ...}
    """
    pass

details()

Additional Details about this DataSourceAbstract Artifact

Source code in src/workbench/core/artifacts/data_source_abstract.py
def details(self, recompute: bool = False) -> dict:
    """Additional Details about this DataSourceAbstract Artifact

    Args:
        recompute (bool): Recompute the details (default: False). Accepted so
            that onboard()'s details(recompute=True) call is valid on the base
            class; this base implementation always computes fresh details.

    Returns:
        dict: Summary plus num_rows, num_columns, and column_details
    """
    details = self.summary()
    details["num_rows"] = self.num_rows()
    details["num_columns"] = self.num_columns()
    details["column_details"] = self.column_details()
    return details

execute_statement(query) abstractmethod

Execute an SQL statement that doesn't return a result Args: query(str): The SQL statement to execute

Source code in src/workbench/core/artifacts/data_source_abstract.py
@abstractmethod
def execute_statement(self, query: str):
    """Execute an SQL statement that doesn't return a result (DDL/DML)

    Args:
        query (str): The SQL statement to execute
    """
    pass

expected_meta()

DataSources have quite a bit of expected Metadata for EDA displays

Source code in src/workbench/core/artifacts/data_source_abstract.py
def expected_meta(self) -> list[str]:
    """DataSources have quite a bit of expected Metadata for EDA displays"""

    # These metadata keys are expected on every DataSource
    # FIXME: Revisit this ("workbench_details" is currently excluded)
    return [
        "workbench_descriptive_stats",
        "workbench_value_counts",
        "workbench_correlations",
        "workbench_column_stats",
    ]

get_database()

Get the database for this Data Source

Source code in src/workbench/core/artifacts/data_source_abstract.py
@deprecated(version="0.9")
def get_database(self) -> str:
    """Get the database for this Data Source (deprecated since 0.9: use the `database` property)"""
    return self._database

num_columns() abstractmethod

Return the number of columns for this Data Source

Source code in src/workbench/core/artifacts/data_source_abstract.py
@abstractmethod
def num_columns(self) -> int:
    """Return the number of columns for this Data Source

    Returns:
        int: The number of columns
    """
    pass

num_rows() abstractmethod

Return the number of rows for this Data Source

Source code in src/workbench/core/artifacts/data_source_abstract.py
@abstractmethod
def num_rows(self) -> int:
    """Return the number of rows for this Data Source

    Returns:
        int: The number of rows
    """
    pass

onboard()

This is a BLOCKING method that will onboard the data source (make it ready)

Returns:

Name Type Description
bool bool

True if the DataSource was onboarded successfully

Source code in src/workbench/core/artifacts/data_source_abstract.py
def onboard(self) -> bool:
    """This is a BLOCKING method that will onboard the data source (make it ready)

    Returns:
        bool: True if the DataSource was onboarded successfully
    """
    self.log.important(f"Onboarding {self.uuid}...")
    self.set_status("onboarding")
    self.remove_health_tag("needs_onboard")

    # Make sure our display view actually exists
    self.view("display").ensure_exists()

    # Recompute the stats
    self.recompute_stats()

    # Run a health check and refresh the meta
    time.sleep(2)  # Give the AWS Metadata a chance to update
    self.health_check()
    self.refresh_meta()
    # NOTE(review): details() is called with recompute=True here, but the base
    # details() signature takes no recompute argument — confirm subclasses
    # override details() with a recompute keyword, or add it to the base class
    self.details(recompute=True)
    self.set_status("ready")
    return True

outliers(scale=1.5) abstractmethod

Return a DataFrame of outliers from this DataSource

Parameters:

Name Type Description Default
scale float

The scale to use for the IQR (default: 1.5)

1.5

Returns:

Type Description
DataFrame

pd.DataFrame: A DataFrame of outliers from this DataSource

Notes

Uses the IQR * 1.5 (~= 2.5 Sigma) method to compute outliers The scale parameter can be adjusted to change the IQR multiplier

Source code in src/workbench/core/artifacts/data_source_abstract.py
@abstractmethod
def outliers(self, scale: float = 1.5) -> pd.DataFrame:
    """Return a DataFrame of outliers from this DataSource

    Args:
        scale (float): The scale to use for the IQR (default: 1.5)

    Returns:
        pd.DataFrame: A DataFrame of outliers from this DataSource

    Notes:
        Uses the IQR * 1.5 (~= 2.5 Sigma) method to compute outliers
        The scale parameter can be adjusted to change the IQR multiplier
    """
    pass

query(query) abstractmethod

Query the DataSourceAbstract Args: query(str): The SQL query to execute

Source code in src/workbench/core/artifacts/data_source_abstract.py
@abstractmethod
def query(self, query: str) -> pd.DataFrame:
    """Query the DataSourceAbstract

    Args:
        query (str): The SQL query to execute
    Returns:
        pd.DataFrame: The query results
    """
    pass

ready()

Is the DataSource ready?

Source code in src/workbench/core/artifacts/data_source_abstract.py
def ready(self) -> bool:
    """Is the DataSource ready?

    Returns:
        bool: True if the Artifact is ready AND a cached smart_sample exists
    """

    # Check if the Artifact is ready
    if not super().ready():
        return False

    # If we don't have a smart_sample we're probably not ready
    # (smart_sample is the last thing recompute_stats() produces)
    if not self.df_cache.check(f"{self.uuid}/smart_sample"):
        self.log.warning(f"DataSource {self.uuid} not ready...")
        return False

    # Okay so we have sample, outliers, and smart_sample so we are ready
    return True

recompute_stats()

This is a BLOCKING method that will recompute the stats for the data source

Returns:

Name Type Description
bool bool

True if the DataSource stats were recomputed successfully

Source code in src/workbench/core/artifacts/data_source_abstract.py
def recompute_stats(self) -> bool:
    """This is a BLOCKING method that will recompute the stats for the data source

    Returns:
        bool: True if the DataSource stats were recomputed successfully
    """
    self.log.important(f"Recomputing Stats {self.uuid}...")

    # Make sure our computation view actually exists
    self.view("computation").ensure_exists()

    # Compute the sample, column stats, outliers, and smart_sample
    # (cached entries are deleted first so each call recomputes fresh data)
    self.df_cache.delete(f"{self.uuid}/sample")
    self.sample()
    self.column_stats(recompute=True)
    self.refresh_meta()  # Refresh the meta since outliers needs descriptive_stats and value_counts
    self.df_cache.delete(f"{self.uuid}/outliers")
    self.outliers()
    self.df_cache.delete(f"{self.uuid}/smart_sample")
    self.smart_sample()
    return True

sample() abstractmethod

Return a sample DataFrame from this DataSourceAbstract

Returns:

Type Description
DataFrame

pd.DataFrame: A sample DataFrame from this DataSource

Source code in src/workbench/core/artifacts/data_source_abstract.py
@abstractmethod
def sample(self) -> pd.DataFrame:
    """Return a sample DataFrame from this DataSourceAbstract

    Returns:
        pd.DataFrame: A sample DataFrame from this DataSource
    """
    pass

set_computation_columns(computation_columns, recompute_stats=True)

Set the computation columns for this Data Source

Parameters:

Name Type Description Default
computation_columns list[str]

The computation columns for this Data Source

required
recompute_stats bool

Recomputes all the stats for this Data Source (default: True)

True
Source code in src/workbench/core/artifacts/data_source_abstract.py
def set_computation_columns(self, computation_columns: list[str], recompute_stats: bool = True):
    """Set the computation columns for this Data Source

    Args:
        computation_columns (list[str]): The computation columns for this Data Source
        recompute_stats (bool): Recomputes all the stats for this Data Source (default: True)
    """
    self.log.important(f"Setting Computation Columns...{computation_columns}")
    # Local import avoids a circular import at module load time
    from workbench.core.views import ComputationView

    # Create a NEW computation view
    ComputationView.create(self, column_list=computation_columns)
    if recompute_stats:
        self.recompute_stats()  # BLOCKING: recomputes sample/stats/outliers

set_display_columns(diplay_columns)

Set the display columns for this Data Source

Parameters:

Name Type Description Default
diplay_columns list[str]

The display columns for this Data Source

required
Source code in src/workbench/core/artifacts/data_source_abstract.py
def set_display_columns(self, diplay_columns: list[str]):
    """Set the display columns for this Data Source

    Args:
        diplay_columns (list[str]): The display columns for this Data Source

    Note:
        The parameter name misspelling ("diplay") is preserved for
        keyword-argument backward compatibility.
    """
    # Check mismatch of display columns to computation columns
    c_view = self.view("computation")
    computation_columns = c_view.columns
    mismatch_columns = [col for col in diplay_columns if col not in computation_columns]
    if mismatch_columns:
        self.log.monitor(f"Display View/Computation mismatch: {mismatch_columns}")

    self.log.important(f"Setting Display Columns...{diplay_columns}")
    from workbench.core.views import DisplayView

    # Create a NEW display view
    DisplayView.create(self, source_table=c_view.table, column_list=diplay_columns)

smart_sample() abstractmethod

Get a SMART sample dataframe from this DataSource Returns: pd.DataFrame: A combined DataFrame of sample data + outliers

Source code in src/workbench/core/artifacts/data_source_abstract.py
@abstractmethod
def smart_sample(self) -> pd.DataFrame:
    """Get a SMART sample dataframe from this DataSource

    Returns:
        pd.DataFrame: A combined DataFrame of sample data + outliers
    """
    pass

value_counts(recompute=False) abstractmethod

Compute 'value_counts' for all the string columns in a DataSource Args: recompute (bool): Recompute the value counts (default: False) Returns: dict(dict): A dictionary of value counts for each column in the form {'col1': {'value_1': X, 'value_2': Y, 'value_3': Z,...}, 'col2': ...}

Source code in src/workbench/core/artifacts/data_source_abstract.py
@abstractmethod
def value_counts(self, recompute: bool = False) -> dict[str, dict]:
    """Compute 'value_counts' for all the string columns in a DataSource

    Args:
        recompute (bool): Recompute the value counts (default: False)
    Returns:
        dict[str, dict]: A dictionary of value counts for each column in the form
             {'col1': {'value_1': X, 'value_2': Y, 'value_3': Z,...},
              'col2': ...}
    """
    pass

view(view_name)

Return a View object for a specific view Args: view_name (str): The name of the view to return Returns: View: The View object for the specified view

Source code in src/workbench/core/artifacts/data_source_abstract.py
def view(self, view_name: str) -> "View":
    """Return a View object for a specific view

    Args:
        view_name (str): The name of the view to return
    Returns:
        View: The View object for the specified view
    """
    from workbench.core.views import View

    return View(self, view_name)

views()

Return the views for this Data Source

Source code in src/workbench/core/artifacts/data_source_abstract.py
def views(self) -> list[str]:
    """Return the views for this Data Source

    Returns:
        list[str]: The names of the views for this Data Source
    """
    # Local import avoids a circular import at module load time
    from workbench.core.views.view_utils import list_views

    return list_views(self)