Skip to content

DataSource

DataSource Examples

Examples of using the DataSource class are in the Examples section at the bottom of this page. DataSource can read data from many different sources: S3 data, local files, and Pandas DataFrames.

DataSource: Manages AWS Data Catalog creation and management. DataSources are set up so that they can easily be queried with AWS Athena. All DataSources are run through a full set of Exploratory Data Analysis (EDA) techniques (data quality, distributions, stats, outliers, etc.). DataSources can be viewed and explored within the Workbench Dashboard UI.

DataSource

Bases: AthenaSource

DataSource: Workbench DataSource API Class

Common Usage
my_data = DataSource(name_of_source)
my_data.details()
my_features = my_data.to_features("my_features")
Source code in src/workbench/api/data_source.py
class DataSource(AthenaSource):
    """DataSource: Workbench DataSource API Class

    Common Usage:
        ```python
        my_data = DataSource(name_of_source)
        my_data.details()
        my_features = my_data.to_features("my_features")
        ```
    """

    def __init__(self, source: Union[str, pd.DataFrame], name: str = None, tags: list = None, **kwargs):
        """
        Initializes a new DataSource object.

        Args:
            source (Union[str, pd.DataFrame]): Source of data (existing name, filepath, S3 path, or a Pandas DataFrame)
            name (str): The name of the data source (must be lowercase). If not specified, a name will be generated
            tags (list[str]): A list of tags associated with the data source. If not specified tags will be generated.
            **kwargs: Passed through to the superclass constructor.

        Raises:
            ValueError: If no name is given and the generated name is "dataframe".
        """

        # Ensure the ds_name is valid
        # NOTE(review): the result of is_name_valid() is discarded here, whereas to_features()
        # checks it and bails out; confirm whether an invalid name should stop construction.
        if name:
            Artifact.is_name_valid(name)

        # If the data source name wasn't given, generate it
        else:
            name = extract_data_source_basename(source)
            name = Artifact.generate_valid_name(name)

            # Sanity check for dataframe sources
            # NOTE(review): self.log is used before super().__init__() runs; presumably it is a
            # class-level attribute, confirm against the base class.
            if name == "dataframe":
                msg = "Set the 'name' argument in the constructor: DataSource(df, name='my_data')"
                self.log.critical(msg)
                raise ValueError(msg)

        # Set the tags and load the source
        tags = [name] if tags is None else tags
        self._load_source(source, name, tags)

        # Call superclass init (after the source has been loaded)
        super().__init__(name, **kwargs)

    def details(self, **kwargs) -> dict:
        """DataSource Details

        Args:
            **kwargs: Passed through to the superclass details() call.

        Returns:
            dict: A dictionary of details about the DataSource
        """
        return super().details(**kwargs)

    def query(self, query: str) -> pd.DataFrame:
        """Query the AthenaSource

        Args:
            query (str): The query to run against the DataSource

        Returns:
            pd.DataFrame: The results of the query
        """
        return super().query(query)

    def pull_dataframe(self, include_aws_columns=False) -> pd.DataFrame:
        """Return a DataFrame of ALL the data from this DataSource

        Args:
            include_aws_columns (bool): Include the AWS columns in the DataFrame (default: False)

        Returns:
            pd.DataFrame: A DataFrame of ALL the data from this DataSource

        Note:
            Obviously this is not recommended for large datasets :)
        """

        # Get the table associated with the data
        self.log.info(f"Pulling all data from {self.uuid}...")
        table = super().table
        query = f'SELECT * FROM "{table}"'
        df = self.query(query)

        # Drop any columns generated from AWS (errors="ignore" skips any that are absent)
        if not include_aws_columns:
            aws_cols = ["write_time", "api_invocation_time", "is_deleted", "event_time"]
            df = df.drop(columns=aws_cols, errors="ignore")
        return df

    def to_features(
        self,
        name: str,
        id_column: str = None,
        tags: list = None,
        event_time_column: str = None,
        one_hot_columns: list = None,
    ) -> Union[FeatureSet, None]:
        """
        Convert the DataSource to a FeatureSet

        Args:
            name (str): Set the name for feature set (must be lowercase).
            id_column (str, optional): The ID column (if not specified, an 'auto_id' will be generated).
            tags (list, optional): Set the tags for the feature set. If not specified tags will be generated
            event_time_column (str, optional): Set the event time for feature set. If not specified will be generated
            one_hot_columns (list, optional): Set the columns to be one-hot encoded. (default: None)

        Returns:
            FeatureSet: The FeatureSet created from the DataSource (or None if the FeatureSet isn't created)
        """

        # Ensure the feature_set_name is valid
        if not Artifact.is_name_valid(name):
            self.log.critical(f"Invalid FeatureSet name: {name}, not creating FeatureSet!")
            return None

        # Set the Tags
        tags = [name] if tags is None else tags

        # Transform the DataSource to a FeatureSet
        data_to_features = DataToFeaturesLight(self.uuid, name)
        data_to_features.set_output_tags(tags)
        data_to_features.transform(
            id_column=id_column,
            event_time_column=event_time_column,
            one_hot_columns=one_hot_columns,
        )

        # Return the FeatureSet (which will now be up-to-date)
        return FeatureSet(name)

    def _load_source(self, source: Union[str, pd.DataFrame], name: str, tags: list):
        """Load the source of the data

        A DataFrame, an S3 path ("s3://..."), or a path to a local file is run through the
        matching loader. Any other value triggers no load here.

        Args:
            source (Union[str, pd.DataFrame]): The data source (DataFrame, S3 path, file path, or other)
            name (str): The name passed to the loader
            tags (list): The tags set on the loader output
        """
        self.log.info(f"Loading source: {source}...")

        # Pandas DataFrame Source
        if isinstance(source, pd.DataFrame):
            my_loader = PandasToData(name)
            my_loader.set_input(source)
            my_loader.set_output_tags(tags)
            my_loader.transform()

            # Done: a DataFrame is not a path, so don't stringify the whole frame
            # just to probe it with the S3/file checks below
            return

        # Anything else is treated as a path-like string
        source = source if isinstance(source, str) else str(source)

        # S3 Source
        if source.startswith("s3://"):
            my_loader = S3ToDataSourceLight(source, name)
            my_loader.set_output_tags(tags)
            my_loader.transform()

        # File Source
        elif os.path.isfile(source):
            my_loader = CSVToDataSource(source, name)
            my_loader.set_output_tags(tags)
            my_loader.transform()

__init__(source, name=None, tags=None, **kwargs)

Initializes a new DataSource object.

Parameters:

Name Type Description Default
source Union[str, DataFrame]

Source of data (existing name, filepath, S3 path, or a Pandas DataFrame)

required
name str

The name of the data source (must be lowercase). If not specified, a name will be generated

None
tags list[str]

A list of tags associated with the data source. If not specified tags will be generated.

None
Source code in src/workbench/api/data_source.py
def __init__(self, source: Union[str, pd.DataFrame], name: str = None, tags: list = None, **kwargs):
    """
    Build a DataSource from an existing name, a file path, an S3 path, or a DataFrame.

    Args:
        source (Union[str, pd.DataFrame]): Source of data (existing name, filepath, S3 path, or a Pandas DataFrame)
        name (str): The name of the data source (must be lowercase). When omitted, a name is generated
        tags (list[str]): Tags for the data source. When omitted, the name is used as the only tag.
        **kwargs: Forwarded to the superclass constructor.

    Raises:
        ValueError: If no name is given and the generated name is "dataframe".
    """

    if not name:
        # No explicit name: derive one from the source, then normalize it
        basename = extract_data_source_basename(source)
        name = Artifact.generate_valid_name(basename)

        # A generated name of "dataframe" means the caller has to supply a real one
        if name == "dataframe":
            msg = "Set the 'name' argument in the constructor: DataSource(df, name='my_data')"
            self.log.critical(msg)
            raise ValueError(msg)
    else:
        # Explicit name: run it through the validity check
        Artifact.is_name_valid(name)

    # Fall back to the name as the single tag, then load the source
    if tags is None:
        tags = [name]
    self._load_source(source, name, tags)

    # The superclass init runs last, once the source has been loaded
    super().__init__(name, **kwargs)

details(**kwargs)

DataSource Details

Returns:

Name Type Description
dict dict

A dictionary of details about the DataSource

Source code in src/workbench/api/data_source.py
def details(self, **kwargs) -> dict:
    """Fetch the details for this DataSource

    Args:
        **kwargs: Forwarded unchanged to the superclass details() call.

    Returns:
        dict: A dictionary of details about the DataSource
    """
    source_details = super().details(**kwargs)
    return source_details

pull_dataframe(include_aws_columns=False)

Return a DataFrame of ALL the data from this DataSource

Parameters:

Name Type Description Default
include_aws_columns bool

Include the AWS columns in the DataFrame (default: False)

False

Returns:

Type Description
DataFrame

pd.DataFrame: A DataFrame of ALL the data from this DataSource

Note

Obviously this is not recommended for large datasets :)

Source code in src/workbench/api/data_source.py
def pull_dataframe(self, include_aws_columns=False) -> pd.DataFrame:
    """Pull every row of this DataSource into a DataFrame

    Args:
        include_aws_columns (bool): Keep the AWS columns in the DataFrame (default: False)

    Returns:
        pd.DataFrame: A DataFrame holding ALL the data from this DataSource

    Note:
        This selects the entire table, so it is not recommended for large datasets.
    """

    # Select everything from the table behind this DataSource
    self.log.info(f"Pulling all data from {self.uuid}...")
    table = super().table
    all_rows = self.query(f'SELECT * FROM "{table}"')

    # Caller asked for the AWS columns: hand the result back untouched
    if include_aws_columns:
        return all_rows

    # Otherwise strip the AWS columns; any that are absent are skipped
    return all_rows.drop(
        columns=["write_time", "api_invocation_time", "is_deleted", "event_time"],
        errors="ignore",
    )

query(query)

Query the AthenaSource

Parameters:

Name Type Description Default
query str

The query to run against the DataSource

required

Returns:

Type Description
DataFrame

pd.DataFrame: The results of the query

Source code in src/workbench/api/data_source.py
def query(self, query: str) -> pd.DataFrame:
    """Run a query against this DataSource

    Args:
        query (str): The query to run; it is handed to the superclass query() call

    Returns:
        pd.DataFrame: The results of the query
    """
    results = super().query(query)
    return results

to_features(name, id_column=None, tags=None, event_time_column=None, one_hot_columns=None)

Convert the DataSource to a FeatureSet

Parameters:

Name Type Description Default
name str

Set the name for feature set (must be lowercase).

required
id_column str

The ID column (if not specified, an 'auto_id' will be generated).

None
tags list

Set the tags for the feature set. If not specified tags will be generated

None
event_time_column str

Set the event time for feature set. If not specified will be generated

None
one_hot_columns list

Set the columns to be one-hot encoded. (default: None)

None

Returns:

Name Type Description
FeatureSet Union[FeatureSet, None]

The FeatureSet created from the DataSource (or None if the FeatureSet isn't created)

Source code in src/workbench/api/data_source.py
def to_features(
    self,
    name: str,
    id_column: str = None,
    tags: list = None,
    event_time_column: str = None,
    one_hot_columns: list = None,
) -> Union[FeatureSet, None]:
    """
    Create a FeatureSet from this DataSource

    Args:
        name (str): Name for the feature set (must be lowercase).
        id_column (str, optional): The ID column (if not specified, an 'auto_id' will be generated).
        tags (list, optional): Tags for the feature set. When omitted, the name is used as the only tag.
        event_time_column (str, optional): Event time column for the feature set. If not specified will be generated
        one_hot_columns (list, optional): Columns to be one-hot encoded. (default: None)

    Returns:
        FeatureSet: The FeatureSet created from the DataSource (or None if the name is invalid)
    """

    # Guard clause: refuse to build anything under an invalid name
    name_ok = Artifact.is_name_valid(name)
    if not name_ok:
        self.log.critical(f"Invalid FeatureSet name: {name}, not creating FeatureSet!")
        return None

    # Fall back to the name as the single tag
    if tags is None:
        tags = [name]

    # Run the DataSource -> FeatureSet transform
    transformer = DataToFeaturesLight(self.uuid, name)
    transformer.set_output_tags(tags)
    transform_args = {
        "id_column": id_column,
        "event_time_column": event_time_column,
        "one_hot_columns": one_hot_columns,
    }
    transformer.transform(**transform_args)

    # Hand back a FeatureSet object for the name that was just transformed
    return FeatureSet(name)

Examples

All of the Workbench Examples are in the Workbench Repository under the examples/ directory. For a full code listing of any example, please visit our Workbench Examples.

Create a DataSource from an S3 Path or File Path

datasource_from_s3.py
from workbench.api.data_source import DataSource

# Point at a CSV on S3; a local file path can be used instead
# source_path = "/full/path/to/local/file.csv"
source_path = "s3://workbench-public-data/common/abalone.csv"

# Build the DataSource from that path and print its details
my_data = DataSource(source_path)
data_details = my_data.details()
print(data_details)

Create a DataSource from a Pandas Dataframe

datasource_from_df.py
from workbench.utils.test_data_generator import TestDataGenerator
from workbench.api.data_source import DataSource

# Generate a DataFrame of person data to use as the source
df = TestDataGenerator().person_data()

# Build a named DataSource from the DataFrame and print its details
test_data = DataSource(df, name="test_data")
data_details = test_data.details()
print(data_details)

Query a DataSource

All Workbench DataSources use AWS Athena, so any query that you can make with Athena is accessible through the DataSource API.

datasource_query.py
from workbench.api.data_source import DataSource

# Grab a DataSource
my_data = DataSource("abalone_data")

# Run each query through the Athena backend and print the first rows of its result
athena_queries = (
    "select * from abalone_data where height > .3",
    "select * from abalone_data where class_number_of_rings < 3",
)
for sql in athena_queries:
    df = my_data.query(sql)
    print(df.head())

Output

  sex  length  diameter  height  whole_weight  shucked_weight  viscera_weight  shell_weight  class_number_of_rings
0   M   0.705     0.565   0.515         2.210          1.1075          0.4865        0.5120                     10
1   F   0.455     0.355   1.130         0.594          0.3320          0.1160        0.1335                      8

  sex  length  diameter  height  whole_weight  shucked_weight  viscera_weight  shell_weight  class_number_of_rings
0   I   0.075     0.055   0.010         0.002          0.0010          0.0005        0.0015                      1
1   I   0.150     0.100   0.025         0.015          0.0045          0.0040        0.0050                      2

Create a FeatureSet from a DataSource

datasource_to_featureset.py
from workbench.api.data_source import DataSource

# Grab the DataSource, then convert it to a FeatureSet
test_data = DataSource("test_data")
my_features = test_data.to_features("test_features")

# Print the details of the resulting FeatureSet
feature_details = my_features.details()
print(feature_details)

Workbench UI

Whenever a DataSource is created, Workbench performs a comprehensive set of Exploratory Data Analysis techniques on your data, pushes the results into AWS, and provides a detailed web visualization of the results.

workbench_new_light
Workbench Dashboard: DataSources

Not Finding a particular method?

The Workbench API Classes use the 'Core' Classes internally, so for an extensive listing of all the available methods, please take a deep dive into the Workbench Core Classes.