
Pandas Transforms

API Classes

The API Classes will often provide helpful methods that give you a DataFrame (data_source.query() for instance), so always check out the API Classes first.

These Transforms give you the ultimate in customization and flexibility when creating AWS Machine Learning Pipelines. Grab a Pandas DataFrame from a DataSource or FeatureSet, process it however your use case requires, and simply create another Workbench DataSource or FeatureSet from the resulting DataFrame. A minimal end-to-end sketch follows the list of options below.

Lots of Options:

Not for Large Data

Pandas Transforms can't handle large datasets (> 4 gigabytes). For transforms on large data, see our Heavy Transforms.

  • S3 --> DF --> DataSource
  • DataSource --> DF --> DataSource
  • DataSource --> DF --> FeatureSet
  • Get Creative!
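
Here's a minimal end-to-end sketch of the DataSource --> DF --> FeatureSet pattern. It assumes a DataSource named "abalone_data" already exists, the import paths simply mirror the source file paths shown in the listings below, and the column names are purely illustrative.

from workbench.core.transforms.pandas_transforms.data_to_pandas import DataToPandas
from workbench.core.transforms.pandas_transforms.pandas_to_features import PandasToFeatures

# Pull the DataSource into a Pandas DataFrame (optionally filtered with SQL)
data_to_df = DataToPandas("abalone_data")
data_to_df.transform(query="SELECT * FROM abalone_data WHERE rings > 5")
df = data_to_df.get_output()

# Do whatever Pandas processing the use case needs (hypothetical derived feature)
df["weight_ratio"] = df["shucked_weight"] / df["whole_weight"]

# Publish the result as a new Workbench FeatureSet
to_features = PandasToFeatures("abalone_features")
to_features.set_output_tags(["abalone", "example"])
to_features.set_input(df, id_column="id")
to_features.transform()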

Welcome to the Workbench Pandas Transform Classes

These classes provide low-level APIs for using Pandas DataFrames

  • DataToPandas: Pull a dataframe from a Workbench DataSource
  • FeaturesToPandas: Pull a dataframe from a Workbench FeatureSet
  • PandasToData: Create a Workbench DataSource using a Pandas DataFrame as the source
  • PandasToFeatures: Create a Workbench FeatureSet using a Pandas DataFrame as the source
  • PandasToFeaturesChunked: Create a Workbench FeatureSet using a Chunked/Streaming Pandas DataFrame as the source

DataToPandas

Bases: Transform

DataToPandas: Class to transform a Data Source into a Pandas DataFrame

Common Usage
data_to_df = DataToPandas(data_source_uuid)
data_to_df.transform(query=<optional SQL query to filter/process data>)
data_to_df.transform(max_rows=<optional max rows to sample>)
my_df = data_to_df.get_output()

Note: query is the best way to use this class, so use it :)
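
A concrete (hypothetical) example, assuming a DataSource named "abalone_data" with columns length, diameter, and rings:

data_to_df = DataToPandas("abalone_data")
data_to_df.transform(query="SELECT length, diameter, rings FROM abalone_data WHERE rings > 5")
my_df = data_to_df.get_output()
print(my_df.shape)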
Source code in src/workbench/core/transforms/pandas_transforms/data_to_pandas.py
class DataToPandas(Transform):
    """DataToPandas: Class to transform a Data Source into a Pandas DataFrame

    Common Usage:
        ```python
        data_to_df = DataToPandas(data_source_uuid)
        data_to_df.transform(query=<optional SQL query to filter/process data>)
        data_to_df.transform(max_rows=<optional max rows to sample>)
        my_df = data_to_df.get_output()

        Note: query is the best way to use this class, so use it :)
        ```
    """

    def __init__(self, input_uuid: str):
        """DataToPandas Initialization"""

        # Call superclass init
        super().__init__(input_uuid, "DataFrame")

        # Set up all my instance attributes
        self.input_type = TransformInput.DATA_SOURCE
        self.output_type = TransformOutput.PANDAS_DF
        self.output_df = None

    def transform_impl(self, query: str = None, max_rows=100000):
        """Convert the DataSource into a Pandas DataFrame
        Args:
            query(str): The query to run against the DataSource (default: None)
            max_rows(int): The maximum number of rows to return (default: 100000)
        """

        # Grab the Input (Data Source)
        input_data = DataSourceFactory(self.input_uuid)
        if not input_data.exists():
            self.log.critical(f"Data Check on {self.input_uuid} failed!")
            return

        # If a query is provided, that overrides the queries below
        if query:
            self.log.info(f"Querying {self.input_uuid} with {query}...")
            self.output_df = input_data.query(query)
            return

        # If the data source has more rows than max_rows, do a sample query
        num_rows = input_data.num_rows()
        if num_rows > max_rows:
            percentage = round(max_rows * 100.0 / num_rows)
            self.log.important(f"DataSource has {num_rows} rows.. sampling down to {max_rows}...")
            query = f"SELECT * FROM {self.input_uuid} TABLESAMPLE BERNOULLI({percentage})"
        else:
            query = f"SELECT * FROM {self.input_uuid}"

        # Mark the transform as complete and set the output DataFrame
        self.output_df = input_data.query(query)

    def post_transform(self, **kwargs):
        """Post-Transform: Any checks on the Pandas DataFrame that need to be done"""
        self.log.info("Post-Transform: Checking Pandas DataFrame...")
        self.log.info(f"DataFrame Shape: {self.output_df.shape}")

    def get_output(self) -> pd.DataFrame:
        """Get the DataFrame Output from this Transform"""
        return self.output_df

__init__(input_uuid)

DataToPandas Initialization

Source code in src/workbench/core/transforms/pandas_transforms/data_to_pandas.py
def __init__(self, input_uuid: str):
    """DataToPandas Initialization"""

    # Call superclass init
    super().__init__(input_uuid, "DataFrame")

    # Set up all my instance attributes
    self.input_type = TransformInput.DATA_SOURCE
    self.output_type = TransformOutput.PANDAS_DF
    self.output_df = None

get_output()

Get the DataFrame Output from this Transform

Source code in src/workbench/core/transforms/pandas_transforms/data_to_pandas.py
def get_output(self) -> pd.DataFrame:
    """Get the DataFrame Output from this Transform"""
    return self.output_df

post_transform(**kwargs)

Post-Transform: Any checks on the Pandas DataFrame that need to be done

Source code in src/workbench/core/transforms/pandas_transforms/data_to_pandas.py
def post_transform(self, **kwargs):
    """Post-Transform: Any checks on the Pandas DataFrame that need to be done"""
    self.log.info("Post-Transform: Checking Pandas DataFrame...")
    self.log.info(f"DataFrame Shape: {self.output_df.shape}")

transform_impl(query=None, max_rows=100000)

Convert the DataSource into a Pandas DataFrame

Parameters:

  • query (str): The query to run against the DataSource (default: None)
  • max_rows (int): The maximum number of rows to return (default: 100000)

Source code in src/workbench/core/transforms/pandas_transforms/data_to_pandas.py
def transform_impl(self, query: str = None, max_rows=100000):
    """Convert the DataSource into a Pandas DataFrame
    Args:
        query(str): The query to run against the DataSource (default: None)
        max_rows(int): The maximum number of rows to return (default: 100000)
    """

    # Grab the Input (Data Source)
    input_data = DataSourceFactory(self.input_uuid)
    if not input_data.exists():
        self.log.critical(f"Data Check on {self.input_uuid} failed!")
        return

    # If a query is provided, that overrides the queries below
    if query:
        self.log.info(f"Querying {self.input_uuid} with {query}...")
        self.output_df = input_data.query(query)
        return

    # If the data source has more rows than max_rows, do a sample query
    num_rows = input_data.num_rows()
    if num_rows > max_rows:
        percentage = round(max_rows * 100.0 / num_rows)
        self.log.important(f"DataSource has {num_rows} rows.. sampling down to {max_rows}...")
        query = f"SELECT * FROM {self.input_uuid} TABLESAMPLE BERNOULLI({percentage})"
    else:
        query = f"SELECT * FROM {self.input_uuid}"

    # Mark the transform as complete and set the output DataFrame
    self.output_df = input_data.query(query)

FeaturesToPandas

Bases: Transform

FeaturesToPandas: Class to transform a FeatureSet into a Pandas DataFrame

Common Usage
feature_to_df = FeaturesToPandas(feature_set_uuid)
feature_to_df.transform(max_rows=<optional max rows to sample>)
my_df = feature_to_df.get_output()
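
For example, sampling a (hypothetical) FeatureSet named "abalone_features" down to at most 1,000 rows:

feature_to_df = FeaturesToPandas("abalone_features")
feature_to_df.transform(max_rows=1000)
my_df = feature_to_df.get_output()
print(my_df.shape)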
Source code in src/workbench/core/transforms/pandas_transforms/features_to_pandas.py
class FeaturesToPandas(Transform):
    """FeaturesToPandas: Class to transform a FeatureSet into a Pandas DataFrame

    Common Usage:
        ```python
        feature_to_df = FeaturesToPandas(feature_set_uuid)
        feature_to_df.transform(max_rows=<optional max rows to sample>)
        my_df = feature_to_df.get_output()
        ```
    """

    def __init__(self, feature_set_name: str):
        """FeaturesToPandas Initialization"""

        # Call superclass init
        super().__init__(input_uuid=feature_set_name, output_uuid="DataFrame")

        # Set up all my instance attributes
        self.input_type = TransformInput.FEATURE_SET
        self.output_type = TransformOutput.PANDAS_DF
        self.output_df = None
        self.transform_run = False

    def transform_impl(self, max_rows=100000):
        """Convert the FeatureSet into a Pandas DataFrame"""

        # Grab the Input (Feature Set)
        input_data = FeatureSetCore(self.input_uuid)
        if not input_data.exists():
            self.log.critical(f"Feature Set Check on {self.input_uuid} failed!")
            return

        # Grab the table for this Feature Set
        table = input_data.athena_table

        # Get the list of columns (and subtract metadata columns that might get added)
        columns = input_data.columns
        filter_columns = ["write_time", "api_invocation_time", "is_deleted"]
        columns = ", ".join([x for x in columns if x not in filter_columns])

        # Get the number of rows in the Feature Set
        num_rows = input_data.num_rows()

        # If the data source has more rows than max_rows, do a sample query
        if num_rows > max_rows:
            percentage = round(max_rows * 100.0 / num_rows)
            self.log.important(f"DataSource has {num_rows} rows.. sampling down to {max_rows}...")
            query = f'SELECT {columns} FROM "{table}" TABLESAMPLE BERNOULLI({percentage})'
        else:
            query = f'SELECT {columns} FROM "{table}"'

        # Mark the transform as complete and set the output DataFrame
        self.transform_run = True
        self.output_df = input_data.query(query)

    def post_transform(self, **kwargs):
        """Post-Transform: Any checks on the Pandas DataFrame that need to be done"""
        self.log.info("Post-Transform: Checking Pandas DataFrame...")
        self.log.info(f"DataFrame Shape: {self.output_df.shape}")

    def get_output(self) -> pd.DataFrame:
        """Get the DataFrame Output from this Transform"""
        if not self.transform_run:
            self.transform()
        return self.output_df

__init__(feature_set_name)

FeaturesToPandas Initialization

Source code in src/workbench/core/transforms/pandas_transforms/features_to_pandas.py
def __init__(self, feature_set_name: str):
    """FeaturesToPandas Initialization"""

    # Call superclass init
    super().__init__(input_uuid=feature_set_name, output_uuid="DataFrame")

    # Set up all my instance attributes
    self.input_type = TransformInput.FEATURE_SET
    self.output_type = TransformOutput.PANDAS_DF
    self.output_df = None
    self.transform_run = False

get_output()

Get the DataFrame Output from this Transform

Source code in src/workbench/core/transforms/pandas_transforms/features_to_pandas.py
def get_output(self) -> pd.DataFrame:
    """Get the DataFrame Output from this Transform"""
    if not self.transform_run:
        self.transform()
    return self.output_df

post_transform(**kwargs)

Post-Transform: Any checks on the Pandas DataFrame that need to be done

Source code in src/workbench/core/transforms/pandas_transforms/features_to_pandas.py
def post_transform(self, **kwargs):
    """Post-Transform: Any checks on the Pandas DataFrame that need to be done"""
    self.log.info("Post-Transform: Checking Pandas DataFrame...")
    self.log.info(f"DataFrame Shape: {self.output_df.shape}")

transform_impl(max_rows=100000)

Convert the FeatureSet into a Pandas DataFrame

Source code in src/workbench/core/transforms/pandas_transforms/features_to_pandas.py
def transform_impl(self, max_rows=100000):
    """Convert the FeatureSet into a Pandas DataFrame"""

    # Grab the Input (Feature Set)
    input_data = FeatureSetCore(self.input_uuid)
    if not input_data.exists():
        self.log.critical(f"Feature Set Check on {self.input_uuid} failed!")
        return

    # Grab the table for this Feature Set
    table = input_data.athena_table

    # Get the list of columns (and subtract metadata columns that might get added)
    columns = input_data.columns
    filter_columns = ["write_time", "api_invocation_time", "is_deleted"]
    columns = ", ".join([x for x in columns if x not in filter_columns])

    # Get the number of rows in the Feature Set
    num_rows = input_data.num_rows()

    # If the data source has more rows than max_rows, do a sample query
    if num_rows > max_rows:
        percentage = round(max_rows * 100.0 / num_rows)
        self.log.important(f"DataSource has {num_rows} rows.. sampling down to {max_rows}...")
        query = f'SELECT {columns} FROM "{table}" TABLESAMPLE BERNOULLI({percentage})'
    else:
        query = f'SELECT {columns} FROM "{table}"'

    # Mark the transform as complete and set the output DataFrame
    self.transform_run = True
    self.output_df = input_data.query(query)

PandasToData

Bases: Transform

PandasToData: Class to publish a Pandas DataFrame as a DataSource

Common Usage
df_to_data = PandasToData(output_uuid)
df_to_data.set_output_tags(["test", "small"])
df_to_data.set_input(test_df)
df_to_data.transform()
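
For example, publishing a small hand-built DataFrame as a DataSource named "test_data" (all names here are illustrative):

import pandas as pd

test_df = pd.DataFrame({"id": [1, 2, 3], "value": [0.1, 0.2, 0.3]})
df_to_data = PandasToData("test_data")
df_to_data.set_output_tags(["test", "small"])
df_to_data.set_input(test_df)
df_to_data.transform()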
Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_data.py
class PandasToData(Transform):
    """PandasToData: Class to publish a Pandas DataFrame as a DataSource

    Common Usage:
        ```python
        df_to_data = PandasToData(output_uuid)
        df_to_data.set_output_tags(["test", "small"])
        df_to_data.set_input(test_df)
        df_to_data.transform()
        ```
    """

    def __init__(self, output_uuid: str, output_format: str = "parquet", catalog_db: str = "workbench"):
        """PandasToData Initialization
        Args:
            output_uuid (str): The UUID of the DataSource to create
            output_format (str): The file format to store the S3 object data in (default: "parquet")
            catalog_db (str): The AWS Data Catalog Database to use (default: "workbench")
        """

        # Make sure the output_uuid is a valid name/id
        Artifact.is_name_valid(output_uuid)

        # Call superclass init
        super().__init__("DataFrame", output_uuid, catalog_db)

        # Set up all my instance attributes
        self.input_type = TransformInput.PANDAS_DF
        self.output_type = TransformOutput.DATA_SOURCE
        self.output_df = None

        # Give a message that Parquet is best in most cases
        if output_format != "parquet":
            self.log.warning("Parquet format works the best in most cases please consider using it")
        self.output_format = output_format

    def set_input(self, input_df: pd.DataFrame):
        """Set the DataFrame Input for this Transform"""
        self.output_df = input_df.copy()

    def delete_existing(self):
        # Delete the existing DataSource if it exists
        self.log.info(f"Deleting the {self.output_uuid} DataSource...")
        AthenaSource.managed_delete(self.output_uuid)
        time.sleep(1)

    def convert_object_to_string(self, df: pd.DataFrame) -> pd.DataFrame:
        """Try to automatically convert object columns to string columns"""
        for c in df.columns[df.dtypes == "object"]:  # Look at the object columns
            try:
                df[c] = df[c].astype("string")
                df[c] = df[c].str.replace("'", '"')  # This is for nested JSON
            except (ParserError, ValueError, TypeError):
                self.log.info(f"Column {c} could not be converted to string...")
        return df

    def convert_object_to_datetime(self, df: pd.DataFrame) -> pd.DataFrame:
        """Try to automatically convert object columns to datetime or string columns"""
        for c in df.columns[df.dtypes == "object"]:  # Look at the object columns
            try:
                df[c] = pd.to_datetime(df[c])
            except (ParserError, ValueError, TypeError):
                self.log.debug(f"Column {c} could not be converted to datetime...")
        return df

    @staticmethod
    def convert_datetime_columns(df: pd.DataFrame) -> pd.DataFrame:
        """Convert datetime columns to ISO-8601 string"""
        datetime_type = ["datetime", "datetime64", "datetime64[ns]", "datetimetz"]
        for c in df.select_dtypes(include=datetime_type).columns:
            df[c] = df[c].map(datetime_to_iso8601)
            df[c] = df[c].astype(pd.StringDtype())
        return df

    def pre_transform(self, **kwargs):
        """Pre-Transform: Delete the existing DataSource if it exists"""
        self.delete_existing()

    def transform_impl(self, overwrite: bool = True):
        """Convert the Pandas DataFrame into Parquet Format in the Workbench S3 Bucket, and
        store the information about the data to the AWS Data Catalog workbench database

        Args:
            overwrite (bool): Overwrite the existing data in the Workbench S3 Bucket (default: True)
        """
        self.log.info(f"DataFrame to Workbench DataSource: {self.output_uuid}...")

        # Set up our metadata storage
        workbench_meta = {"workbench_tags": self.output_tags}
        workbench_meta.update(self.output_meta)

        # Create the Output Parquet file S3 Storage Path
        s3_storage_path = f"{self.data_sources_s3_path}/{self.output_uuid}"

        # Convert column names to lowercase; Athena will not work with uppercase column names
        if str(self.output_df.columns) != str(self.output_df.columns.str.lower()):
            for c in self.output_df.columns:
                if c != c.lower():
                    self.log.important(f"Column name {c} converted to lowercase: {c.lower()}")
            self.output_df.columns = self.output_df.columns.str.lower()

        # Convert Object Columns to String
        self.output_df = self.convert_object_to_string(self.output_df)

        # Note: Both of these conversions may not be necessary, so we're leaving them commented out
        """
        # Convert Object Columns to Datetime
        self.output_df = self.convert_object_to_datetime(self.output_df)

        # Now convert datetime columns to ISO-8601 string
        # self.output_df = self.convert_datetime_columns(self.output_df)
        """

        # Write out the DataFrame to AWS Data Catalog in either Parquet or JSONL format
        description = f"Workbench data source: {self.output_uuid}"
        glue_table_settings = {"description": description, "parameters": workbench_meta}
        if self.output_format == "parquet":
            wr.s3.to_parquet(
                self.output_df,
                path=s3_storage_path,
                dataset=True,
                mode="overwrite",
                database=self.data_catalog_db,
                table=self.output_uuid,
                filename_prefix=f"{self.output_uuid}_",
                boto3_session=self.boto3_session,
                partition_cols=None,
                glue_table_settings=glue_table_settings,
                sanitize_columns=False,
            )  # FIXME: Have some logic around partition columns

        # Note: In general Parquet works well for most use cases. We recommend using Parquet
        #       You can use JSON_EXTRACT on a Parquet string field, and it works great.
        elif self.output_format == "jsonl":
            self.log.warning("We recommend using Parquet format for most use cases")
            self.log.warning("If you have a use case that requires JSONL please contact Workbench support")
            self.log.warning("We'd like to understand what functionality JSONL is providing that isn't already")
            self.log.warning("provided with Parquet and JSON_EXTRACT() for your Athena Queries")
            wr.s3.to_json(
                self.output_df,
                path=s3_storage_path,
                orient="records",
                lines=True,
                date_format="iso",
                dataset=True,
                mode="overwrite",
                database=self.data_catalog_db,
                table=self.output_uuid,
                filename_prefix=f"{self.output_uuid}_",
                boto3_session=self.boto3_session,
                partition_cols=None,
                glue_table_settings=glue_table_settings,
            )
        else:
            raise ValueError(f"Unsupported file format: {self.output_format}")

    def post_transform(self, **kwargs):
        """Post-Transform: Calling onboard() fnr the DataSource"""
        self.log.info("Post-Transform: Calling onboard() for the DataSource...")

        # Onboard the DataSource
        output_data_source = DataSourceFactory(self.output_uuid)
        output_data_source.onboard()

__init__(output_uuid, output_format='parquet', catalog_db='workbench')

PandasToData Initialization

Parameters:

  • output_uuid (str): The UUID of the DataSource to create
  • output_format (str): The file format to store the S3 object data in (default: "parquet")
  • catalog_db (str): The AWS Data Catalog Database to use (default: "workbench")

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_data.py
def __init__(self, output_uuid: str, output_format: str = "parquet", catalog_db: str = "workbench"):
    """PandasToData Initialization
    Args:
        output_uuid (str): The UUID of the DataSource to create
        output_format (str): The file format to store the S3 object data in (default: "parquet")
        catalog_db (str): The AWS Data Catalog Database to use (default: "workbench")
    """

    # Make sure the output_uuid is a valid name/id
    Artifact.is_name_valid(output_uuid)

    # Call superclass init
    super().__init__("DataFrame", output_uuid, catalog_db)

    # Set up all my instance attributes
    self.input_type = TransformInput.PANDAS_DF
    self.output_type = TransformOutput.DATA_SOURCE
    self.output_df = None

    # Give a message that Parquet is best in most cases
    if output_format != "parquet":
        self.log.warning("Parquet format works the best in most cases please consider using it")
    self.output_format = output_format

convert_datetime_columns(df) staticmethod

Convert datetime columns to ISO-8601 string

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_data.py
@staticmethod
def convert_datetime_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Convert datetime columns to ISO-8601 string"""
    datetime_type = ["datetime", "datetime64", "datetime64[ns]", "datetimetz"]
    for c in df.select_dtypes(include=datetime_type).columns:
        df[c] = df[c].map(datetime_to_iso8601)
        df[c] = df[c].astype(pd.StringDtype())
    return df

convert_object_to_datetime(df)

Try to automatically convert object columns to datetime or string columns

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_data.py
def convert_object_to_datetime(self, df: pd.DataFrame) -> pd.DataFrame:
    """Try to automatically convert object columns to datetime or string columns"""
    for c in df.columns[df.dtypes == "object"]:  # Look at the object columns
        try:
            df[c] = pd.to_datetime(df[c])
        except (ParserError, ValueError, TypeError):
            self.log.debug(f"Column {c} could not be converted to datetime...")
    return df

convert_object_to_string(df)

Try to automatically convert object columns to string columns

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_data.py
def convert_object_to_string(self, df: pd.DataFrame) -> pd.DataFrame:
    """Try to automatically convert object columns to string columns"""
    for c in df.columns[df.dtypes == "object"]:  # Look at the object columns
        try:
            df[c] = df[c].astype("string")
            df[c] = df[c].str.replace("'", '"')  # This is for nested JSON
        except (ParserError, ValueError, TypeError):
            self.log.info(f"Column {c} could not be converted to string...")
    return df

post_transform(**kwargs)

Post-Transform: Calling onboard() for the DataSource

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_data.py
def post_transform(self, **kwargs):
    """Post-Transform: Calling onboard() fnr the DataSource"""
    self.log.info("Post-Transform: Calling onboard() for the DataSource...")

    # Onboard the DataSource
    output_data_source = DataSourceFactory(self.output_uuid)
    output_data_source.onboard()

pre_transform(**kwargs)

Pre-Transform: Delete the existing DataSource if it exists

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_data.py
def pre_transform(self, **kwargs):
    """Pre-Transform: Delete the existing DataSource if it exists"""
    self.delete_existing()

set_input(input_df)

Set the DataFrame Input for this Transform

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_data.py
def set_input(self, input_df: pd.DataFrame):
    """Set the DataFrame Input for this Transform"""
    self.output_df = input_df.copy()

transform_impl(overwrite=True)

Convert the Pandas DataFrame into Parquet Format in the Workbench S3 Bucket, and store the information about the data to the AWS Data Catalog workbench database

Parameters:

  • overwrite (bool): Overwrite the existing data in the Workbench S3 Bucket (default: True)
Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_data.py
def transform_impl(self, overwrite: bool = True):
    """Convert the Pandas DataFrame into Parquet Format in the Workbench S3 Bucket, and
    store the information about the data to the AWS Data Catalog workbench database

    Args:
        overwrite (bool): Overwrite the existing data in the Workbench S3 Bucket (default: True)
    """
    self.log.info(f"DataFrame to Workbench DataSource: {self.output_uuid}...")

    # Set up our metadata storage
    workbench_meta = {"workbench_tags": self.output_tags}
    workbench_meta.update(self.output_meta)

    # Create the Output Parquet file S3 Storage Path
    s3_storage_path = f"{self.data_sources_s3_path}/{self.output_uuid}"

    # Convert column names to lowercase; Athena will not work with uppercase column names
    if str(self.output_df.columns) != str(self.output_df.columns.str.lower()):
        for c in self.output_df.columns:
            if c != c.lower():
                self.log.important(f"Column name {c} converted to lowercase: {c.lower()}")
        self.output_df.columns = self.output_df.columns.str.lower()

    # Convert Object Columns to String
    self.output_df = self.convert_object_to_string(self.output_df)

    # Note: Both of these conversions may not be necessary, so we're leaving them commented out
    """
    # Convert Object Columns to Datetime
    self.output_df = self.convert_object_to_datetime(self.output_df)

    # Now convert datetime columns to ISO-8601 string
    # self.output_df = self.convert_datetime_columns(self.output_df)
    """

    # Write out the DataFrame to AWS Data Catalog in either Parquet or JSONL format
    description = f"Workbench data source: {self.output_uuid}"
    glue_table_settings = {"description": description, "parameters": workbench_meta}
    if self.output_format == "parquet":
        wr.s3.to_parquet(
            self.output_df,
            path=s3_storage_path,
            dataset=True,
            mode="overwrite",
            database=self.data_catalog_db,
            table=self.output_uuid,
            filename_prefix=f"{self.output_uuid}_",
            boto3_session=self.boto3_session,
            partition_cols=None,
            glue_table_settings=glue_table_settings,
            sanitize_columns=False,
        )  # FIXME: Have some logic around partition columns

    # Note: In general Parquet works well for most use cases. We recommend using Parquet
    #       You can use JSON_EXTRACT on a Parquet string field, and it works great.
    elif self.output_format == "jsonl":
        self.log.warning("We recommend using Parquet format for most use cases")
        self.log.warning("If you have a use case that requires JSONL please contact Workbench support")
        self.log.warning("We'd like to understand what functionality JSONL is providing that isn't already")
        self.log.warning("provided with Parquet and JSON_EXTRACT() for your Athena Queries")
        wr.s3.to_json(
            self.output_df,
            path=s3_storage_path,
            orient="records",
            lines=True,
            date_format="iso",
            dataset=True,
            mode="overwrite",
            database=self.data_catalog_db,
            table=self.output_uuid,
            filename_prefix=f"{self.output_uuid}_",
            boto3_session=self.boto3_session,
            partition_cols=None,
            glue_table_settings=glue_table_settings,
        )
    else:
        raise ValueError(f"Unsupported file format: {self.output_format}")

PandasToFeatures

Bases: Transform

PandasToFeatures: Class to publish a Pandas DataFrame into a FeatureSet

Common Usage
to_features = PandasToFeatures(output_uuid)
to_features.set_output_tags(["my", "awesome", "data"])
to_features.set_input(df, id_column="my_id")
to_features.transform()
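
For example, with an explicit id column, an event time column, and one-hot encoding of a categorical column (the DataFrame and column names are illustrative):

import pandas as pd

df = pd.DataFrame(
    {
        "my_id": [1, 2, 3],
        "date": pd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-03"]),
        "color": ["red", "green", "blue"],
        "value": [0.1, 0.2, 0.3],
    }
)
to_features = PandasToFeatures("test_features")
to_features.set_output_tags(["test", "small"])
to_features.set_input(df, id_column="my_id", event_time_column="date", one_hot_columns=["color"])
to_features.transform()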
Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
class PandasToFeatures(Transform):
    """PandasToFeatures: Class to publish a Pandas DataFrame into a FeatureSet

    Common Usage:
        ```python
        to_features = PandasToFeatures(output_uuid)
        to_features.set_output_tags(["my", "awesome", "data"])
        to_features.set_input(df, id_column="my_id")
        to_features.transform()
        ```
    """

    def __init__(self, output_uuid: str):
        """PandasToFeatures Initialization

        Args:
            output_uuid (str): The UUID of the FeatureSet to create
        """

        # Make sure the output_uuid is a valid name
        Artifact.is_name_valid(output_uuid)

        # Call superclass init
        super().__init__("DataFrame", output_uuid)

        # Set up all my instance attributes
        self.input_type = TransformInput.PANDAS_DF
        self.output_type = TransformOutput.FEATURE_SET
        self.id_column = None
        self.event_time_column = None
        self.one_hot_columns = []
        self.categorical_dtypes = {}  # Used for streaming/chunking
        self.output_df = None
        self.table_format = TableFormatEnum.ICEBERG
        self.incoming_hold_out_ids = None

        # These will be set in the transform method
        self.output_feature_group = None
        self.output_feature_set = None
        self.expected_rows = 0

    def set_input(self, input_df: pd.DataFrame, id_column=None, event_time_column=None, one_hot_columns=None):
        """Set the Input DataFrame for this Transform

        Args:
            input_df (pd.DataFrame): The input DataFrame.
            id_column (str, optional): The ID column (if not specified, an 'auto_id' will be generated).
            event_time_column (str, optional): The name of the event time column (default: None).
            one_hot_columns (list, optional): The list of columns to one-hot encode (default: None).
        """
        self.id_column = id_column
        self.event_time_column = event_time_column
        self.output_df = input_df.copy()
        self.one_hot_columns = one_hot_columns or []

        # Now Prepare the DataFrame for its journey into an AWS FeatureGroup
        self.prep_dataframe()

    def delete_existing(self):
        # Delete the existing FeatureSet if it exists
        self.log.info(f"Deleting the {self.output_uuid} FeatureSet...")
        FeatureSetCore.managed_delete(self.output_uuid)
        time.sleep(1)

    def _ensure_id_column(self):
        """Internal: AWS Feature Store requires an Id field"""
        if self.id_column is None:
            self.log.warning("Generating an 'auto_id' column, we highly recommended setting the 'id_column'")
            self.output_df["auto_id"] = self.output_df.index
            self.id_column = "auto_id"
            return
        if self.id_column not in self.output_df.columns:
            error_msg = f"Id column {self.id_column} not found in the DataFrame"
            self.log.critical(error_msg)
            raise ValueError(error_msg)

    def _ensure_event_time(self):
        """Internal: AWS Feature Store requires an event_time field for all data stored"""
        if self.event_time_column is None or self.event_time_column not in self.output_df.columns:
            self.log.info("Generating an event_time column before FeatureSet Creation...")
            self.event_time_column = "event_time"
            self.output_df[self.event_time_column] = pd.Timestamp("now", tz="UTC")

        # The event_time_column is defined, so we need to make sure it's in ISO-8601 string format
        # Note: AWS Feature Store only accepts a particular ISO-8601 format, not ALL ISO-8601 formats
        time_column = self.output_df[self.event_time_column]

        # Check if the event_time_column is of type object or string convert it to DateTime
        if time_column.dtypes == "object" or time_column.dtypes.name == "string":
            self.log.info(f"Converting {self.event_time_column} to DateTime...")
            time_column = pd.to_datetime(time_column)

        # Let's make sure it's the right type for the Feature Store
        if pd.api.types.is_datetime64_any_dtype(time_column):
            self.log.info(f"Converting {self.event_time_column} to ISOFormat Date String before FeatureSet Creation...")

            # Convert the datetime DType to ISO-8601 string
            # TableFormat=ICEBERG does not support alternate formats for event_time field, it only supports String type.
            time_column = time_column.map(datetime_to_iso8601)
            self.output_df[self.event_time_column] = time_column.astype("string")

    def _convert_objs_to_string(self):
        """Internal: AWS Feature Store doesn't know how to store object dtypes, so convert to String"""
        for col in self.output_df:
            if pd.api.types.is_object_dtype(self.output_df[col].dtype):
                self.output_df[col] = self.output_df[col].astype(pd.StringDtype())

    def process_column_name(self, column: str, shorten: bool = False) -> str:
        """Call various methods to make sure the column is ready for Feature Store
        Args:
            column (str): The column name to process
            shorten (bool): Should we shorten the column name? (default: False)
        """
        self.log.debug(f"Processing column {column}...")

        # Make sure the column name is valid
        column = self.sanitize_column_name(column)

        # Make sure the column name isn't too long
        if shorten:
            column = self.shorten_column_name(column)

        return column

    def shorten_column_name(self, name, max_length=20):
        if len(name) <= max_length:
            return name

        # Start building the new name from the end
        parts = name.split("_")[::-1]
        new_name = ""
        for part in parts:
            if len(new_name) + len(part) + 1 <= max_length:  # +1 for the underscore
                new_name = f"{part}_{new_name}" if new_name else part
            else:
                break

        # If new_name is empty, just use the last part of the original name
        if not new_name:
            new_name = parts[0]

        self.log.info(f"Shortening {name} to {new_name}")
        return new_name

    def sanitize_column_name(self, name):
        # Remove all invalid characters
        sanitized = re.sub("[^a-zA-Z0-9-_]", "_", name)
        sanitized = re.sub("_+", "_", sanitized)
        sanitized = sanitized.strip("_")

        # Log the change if the name was altered
        if sanitized != name:
            self.log.info(f"Sanitizing {name} to {sanitized}")

        return sanitized

    def one_hot_encode(self, df, one_hot_columns: list) -> pd.DataFrame:
        """One Hot Encoding for Categorical Columns with additional column name management

        Args:
            df (pd.DataFrame): The DataFrame to process
            one_hot_columns (list): The list of columns to one-hot encode

        Returns:
            pd.DataFrame: The DataFrame with one-hot encoded columns
        """

        # Grab the current list of columns
        current_columns = list(df.columns)

        # Now convert the list of columns into Categorical and then One-Hot Encode
        self.convert_columns_to_categorical(one_hot_columns)
        self.log.important(f"One-Hot encoding columns: {one_hot_columns}")
        df = pd.get_dummies(df, columns=one_hot_columns)

        # Compute the new columns generated by get_dummies
        new_columns = list(set(df.columns) - set(current_columns))
        self.log.important(f"New columns generated: {new_columns}")

        # Convert new columns to int32
        df[new_columns] = df[new_columns].astype("int32")

        # For the new columns we're going to shorten the names
        renamed_columns = {col: self.process_column_name(col) for col in new_columns}

        # Rename the columns in the DataFrame
        df.rename(columns=renamed_columns, inplace=True)

        return df

    # Helper Methods
    def convert_columns_to_categorical(self, columns: list):
        """Convert column to Categorical type"""
        for feature in columns:
            if feature not in [self.event_time_column, self.id_column]:
                unique_values = self.output_df[feature].nunique()
                if 1 < unique_values < 10:
                    self.log.important(f"Converting column {feature} to categorical (unique {unique_values})")
                    self.output_df[feature] = self.output_df[feature].astype("category")
                else:
                    self.log.warning(f"Column {feature} too many unique values {unique_values} skipping...")

    def manual_categorical_converter(self):
        """Used for Streaming: Convert object and string types to Categorical

        Note:
            This method is used for streaming/chunking. You can set the
            categorical_dtypes attribute to a dictionary of column names and
            their respective categorical types.
        """
        for column, cat_d_type in self.categorical_dtypes.items():
            self.output_df[column] = self.output_df[column].astype(cat_d_type)

    @staticmethod
    def convert_column_types(df: pd.DataFrame) -> pd.DataFrame:
        """Convert the types of the DataFrame to the correct types for the Feature Store"""
        for column in list(df.select_dtypes(include="bool").columns):
            df[column] = df[column].astype("int32")
        for column in list(df.select_dtypes(include="category").columns):
            df[column] = df[column].astype("str")

        # Select all columns that are of datetime dtype and convert them to ISO-8601 strings
        for column in [col for col in df.columns if pd.api.types.is_datetime64_any_dtype(df[col])]:
            df[column] = df[column].map(datetime_to_iso8601).astype("string")

        """FIXME Not sure we need these conversions
        for column in list(df.select_dtypes(include="object").columns):
            df[column] = df[column].astype("string")
        for column in list(df.select_dtypes(include=[pd.Int64Dtype]).columns):
            df[column] = df[column].astype("int64")
        for column in list(df.select_dtypes(include=[pd.Float64Dtype]).columns):
            df[column] = df[column].astype("float64")
        """
        return df

    def prep_dataframe(self):
        """Prep the DataFrame for Feature Store Creation"""
        self.log.info("Prep the output_df (cat_convert, convert types, and lowercase columns)...")

        # Remove any columns generated from AWS
        aws_cols = ["write_time", "api_invocation_time", "is_deleted", "event_time"]
        self.output_df = self.output_df.drop(columns=aws_cols, errors="ignore")

        # If one-hot columns are provided then one-hot encode them
        if self.one_hot_columns:
            self.output_df = self.one_hot_encode(self.output_df, self.one_hot_columns)

        # Convert column names to lowercase; Athena will not work with uppercase column names
        if str(self.output_df.columns) != str(self.output_df.columns.str.lower()):
            for c in self.output_df.columns:
                if c != c.lower():
                    self.log.important(f"Column name {c} converted to lowercase: {c.lower()}")
            self.output_df.columns = self.output_df.columns.str.lower()

        # Make sure we have the required id and event_time columns
        self._ensure_id_column()
        self._ensure_event_time()

        # Check for a training column (Workbench uses dynamic training columns)
        if "training" in self.output_df.columns:
            self.log.important(
                """Training column detected: Since FeatureSets are read-only, Workbench creates a training view
                that can be dynamically changed. We'll use this training column to create a training view."""
            )
            self.incoming_hold_out_ids = self.output_df[~self.output_df["training"]][self.id_column].tolist()
            self.output_df = self.output_df.drop(columns=["training"])

        # We need to convert some of our column types to the correct types
        # Feature Store only supports these data types:
        # - Integral
        # - Fractional
        # - String (timestamp/datetime types need to be converted to string)
        self.output_df = self.convert_column_types(self.output_df)

    def create_feature_group(self):
        """Create a Feature Group, load our Feature Definitions, and wait for it to be ready"""

        # Create a Feature Group and load our Feature Definitions
        my_feature_group = FeatureGroup(name=self.output_uuid, sagemaker_session=self.sm_session)
        my_feature_group.load_feature_definitions(data_frame=self.output_df)

        # Create the Output S3 Storage Path for this Feature Set
        s3_storage_path = f"{self.feature_sets_s3_path}/{self.output_uuid}"

        # Get the metadata/tags to push into AWS
        aws_tags = self.get_aws_tags()

        # Create the Feature Group
        my_feature_group.create(
            s3_uri=s3_storage_path,
            record_identifier_name=self.id_column,
            event_time_feature_name=self.event_time_column,
            role_arn=self.workbench_role_arn,
            enable_online_store=True,
            table_format=self.table_format,
            tags=aws_tags,
        )

        # Ensure/wait for the feature group to be created
        self.ensure_feature_group_created(my_feature_group)
        return my_feature_group

    def pre_transform(self, **kwargs):
        """Pre-Transform: Delete any existing FeatureSet and Create the Feature Group"""
        self.delete_existing()
        self.output_feature_group = self.create_feature_group()

    def transform_impl(self):
        """Transform Implementation: Ingest the data into the Feature Group"""

        # Now we actually push the data into the Feature Group (called ingestion)
        self.log.important(f"Ingesting rows into Feature Group {self.output_uuid}...")
        ingest_manager = self.output_feature_group.ingest(self.output_df, max_workers=8, max_processes=2, wait=False)
        try:
            ingest_manager.wait()
        except IngestionError as exc:
            self.log.warning(f"Some rows had an ingesting error: {exc}")

        # Report on any rows that failed to ingest
        if ingest_manager.failed_rows:
            self.log.warning(f"Number of Failed Rows: {len(ingest_manager.failed_rows)}")

            # FIXME: This may or may not give us the correct rows
            # If any index is greater than the number of rows, then the index needs
            # to be converted to a relative index in our current output_df
            df_rows = len(self.output_df)
            relative_indexes = [idx - df_rows if idx >= df_rows else idx for idx in ingest_manager.failed_rows]
            failed_data = self.output_df.iloc[relative_indexes]
            for idx, row in failed_data.iterrows():
                self.log.warning(f"Failed Row {idx}: {row.to_dict()}")

        # Keep track of the number of rows we expect to be ingested
        self.expected_rows += len(self.output_df) - len(ingest_manager.failed_rows)
        self.log.info(f"Added rows: {len(self.output_df)}")
        self.log.info(f"Failed rows: {len(ingest_manager.failed_rows)}")
        self.log.info(f"Total rows ingested: {self.expected_rows}")

        # We often need to wait a bit for AWS to fully register the new Feature Group
        self.log.important(f"Waiting for AWS to register the new Feature Group {self.output_uuid}...")
        time.sleep(30)

    def post_transform(self, **kwargs):
        """Post-Transform: Populating Offline Storage and onboard()"""
        self.log.info("Post-Transform: Populating Offline Storage and onboard()...")

        # Feature Group Ingestion takes a while, so we need to wait for it to finish
        self.output_feature_set = FeatureSetCore(self.output_uuid)
        self.log.important("Waiting for AWS Feature Group Offline storage to be ready...")
        self.log.important("This will often take 10-20 minutes...go have coffee or lunch :)")
        self.output_feature_set.set_status("initializing")
        self.wait_for_rows(self.expected_rows)

        # Call the FeatureSet onboard method to compute a bunch of EDA stuff
        self.output_feature_set.onboard()

        # Set Hold Out Ids (if we got them during creation)
        if self.incoming_hold_out_ids:
            self.output_feature_set.set_training_holdouts(self.id_column, self.incoming_hold_out_ids)

    def ensure_feature_group_created(self, feature_group):
        status = feature_group.describe().get("FeatureGroupStatus")
        while status == "Creating":
            self.log.debug("FeatureSet being Created...")
            time.sleep(5)
            status = feature_group.describe().get("FeatureGroupStatus")
        if status == "Created":
            self.log.info(f"FeatureSet {feature_group.name} successfully created")
        else:
            self.log.critical(f"FeatureSet {feature_group.name} creation failed with status: {status}")

    def wait_for_rows(self, expected_rows: int):
        """Wait for AWS Feature Group to fully populate the Offline Storage"""
        rows = self.output_feature_set.num_rows()

        # Wait for the rows to be populated
        self.log.info(f"Waiting for AWS Feature Group {self.output_uuid} Offline Storage...")
        max_retry = 20
        num_retry = 0
        sleep_time = 30
        while rows < expected_rows and num_retry < max_retry:
            num_retry += 1
            time.sleep(sleep_time)
            rows = self.output_feature_set.num_rows()
            self.log.info(f"Offline Storage {self.output_uuid}: {rows} rows out of {expected_rows}")
        if rows == expected_rows:
            self.log.important(f"Success: Reached Expected Rows ({rows} rows)...")
        else:
            msg = f"Did not reach expected rows ({rows}/{expected_rows})...(probably AWS lag)"
            self.log.warning(msg)
            self.log.monitor(msg)

__init__(output_uuid)

PandasToFeatures Initialization

Parameters:

  • output_uuid (str): The UUID of the FeatureSet to create (required)
Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
def __init__(self, output_uuid: str):
    """PandasToFeatures Initialization

    Args:
        output_uuid (str): The UUID of the FeatureSet to create
    """

    # Make sure the output_uuid is a valid name
    Artifact.is_name_valid(output_uuid)

    # Call superclass init
    super().__init__("DataFrame", output_uuid)

    # Set up all my instance attributes
    self.input_type = TransformInput.PANDAS_DF
    self.output_type = TransformOutput.FEATURE_SET
    self.id_column = None
    self.event_time_column = None
    self.one_hot_columns = []
    self.categorical_dtypes = {}  # Used for streaming/chunking
    self.output_df = None
    self.table_format = TableFormatEnum.ICEBERG
    self.incoming_hold_out_ids = None

    # These will be set in the transform method
    self.output_feature_group = None
    self.output_feature_set = None
    self.expected_rows = 0

convert_column_types(df) staticmethod

Convert the types of the DataFrame to the correct types for the Feature Store

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
@staticmethod
def convert_column_types(df: pd.DataFrame) -> pd.DataFrame:
    """Convert the types of the DataFrame to the correct types for the Feature Store"""
    for column in list(df.select_dtypes(include="bool").columns):
        df[column] = df[column].astype("int32")
    for column in list(df.select_dtypes(include="category").columns):
        df[column] = df[column].astype("str")

    # Select all columns that are of datetime dtype and convert them to ISO-8601 strings
    for column in [col for col in df.columns if pd.api.types.is_datetime64_any_dtype(df[col])]:
        df[column] = df[column].map(datetime_to_iso8601).astype("string")

    """FIXME Not sure we need these conversions
    for column in list(df.select_dtypes(include="object").columns):
        df[column] = df[column].astype("string")
    for column in list(df.select_dtypes(include=[pd.Int64Dtype]).columns):
        df[column] = df[column].astype("int64")
    for column in list(df.select_dtypes(include=[pd.Float64Dtype]).columns):
        df[column] = df[column].astype("float64")
    """
    return df

convert_columns_to_categorical(columns)

Convert column to Categorical type

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
def convert_columns_to_categorical(self, columns: list):
    """Convert column to Categorical type"""
    for feature in columns:
        if feature not in [self.event_time_column, self.id_column]:
            unique_values = self.output_df[feature].nunique()
            if 1 < unique_values < 10:
                self.log.important(f"Converting column {feature} to categorical (unique {unique_values})")
                self.output_df[feature] = self.output_df[feature].astype("category")
            else:
                self.log.warning(f"Column {feature} too many unique values {unique_values} skipping...")

create_feature_group()

Create a Feature Group, load our Feature Definitions, and wait for it to be ready

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
def create_feature_group(self):
    """Create a Feature Group, load our Feature Definitions, and wait for it to be ready"""

    # Create a Feature Group and load our Feature Definitions
    my_feature_group = FeatureGroup(name=self.output_uuid, sagemaker_session=self.sm_session)
    my_feature_group.load_feature_definitions(data_frame=self.output_df)

    # Create the Output S3 Storage Path for this Feature Set
    s3_storage_path = f"{self.feature_sets_s3_path}/{self.output_uuid}"

    # Get the metadata/tags to push into AWS
    aws_tags = self.get_aws_tags()

    # Create the Feature Group
    my_feature_group.create(
        s3_uri=s3_storage_path,
        record_identifier_name=self.id_column,
        event_time_feature_name=self.event_time_column,
        role_arn=self.workbench_role_arn,
        enable_online_store=True,
        table_format=self.table_format,
        tags=aws_tags,
    )

    # Ensure/wait for the feature group to be created
    self.ensure_feature_group_created(my_feature_group)
    return my_feature_group

manual_categorical_converter()

Used for Streaming: Convert object and string types to Categorical

Note

This method is used for streaming/chunking. You can set the categorical_dtypes attribute to a dictionary of column names and their respective categorical types.

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
def manual_categorical_converter(self):
    """Used for Streaming: Convert object and string types to Categorical

    Note:
        This method is used for streaming/chunking. You can set the
        categorical_dtypes attribute to a dictionary of column names and
        their respective categorical types.
    """
    for column, cat_d_type in self.categorical_dtypes.items():
        self.output_df[column] = self.output_df[column].astype(cat_d_type)
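
A minimal sketch of the categorical_dtypes attribute mentioned in the note above (the FeatureSet name, column names, and categories are illustrative; the chunked/streaming workflow itself is what PandasToFeaturesChunked is for):

import pandas as pd

to_features = PandasToFeatures("chunked_features")
# Fix the category sets up front so every chunk encodes to the same categories
to_features.categorical_dtypes = {
    "color": pd.CategoricalDtype(categories=["red", "green", "blue"]),
    "size": pd.CategoricalDtype(categories=["small", "medium", "large"]),
}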

one_hot_encode(df, one_hot_columns)

One Hot Encoding for Categorical Columns with additional column name management

Parameters:

  • df (pd.DataFrame): The DataFrame to process (required)
  • one_hot_columns (list): The list of columns to one-hot encode (required)

Returns:

  • pd.DataFrame: The DataFrame with one-hot encoded columns

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
def one_hot_encode(self, df, one_hot_columns: list) -> pd.DataFrame:
    """One Hot Encoding for Categorical Columns with additional column name management

    Args:
        df (pd.DataFrame): The DataFrame to process
        one_hot_columns (list): The list of columns to one-hot encode

    Returns:
        pd.DataFrame: The DataFrame with one-hot encoded columns
    """

    # Grab the current list of columns
    current_columns = list(df.columns)

    # Now convert the list of columns into Categorical and then One-Hot Encode
    self.convert_columns_to_categorical(one_hot_columns)
    self.log.important(f"One-Hot encoding columns: {one_hot_columns}")
    df = pd.get_dummies(df, columns=one_hot_columns)

    # Compute the new columns generated by get_dummies
    new_columns = list(set(df.columns) - set(current_columns))
    self.log.important(f"New columns generated: {new_columns}")

    # Convert new columns to int32
    df[new_columns] = df[new_columns].astype("int32")

    # For the new columns we're going to shorten the names
    renamed_columns = {col: self.process_column_name(col) for col in new_columns}

    # Rename the columns in the DataFrame
    df.rename(columns=renamed_columns, inplace=True)

    return df
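
For context, a standalone pandas illustration (made-up data) of the dummy column names that pd.get_dummies generates; these are the new columns that process_column_name() then sanitizes and, if requested, shortens:

import pandas as pd

df = pd.DataFrame({"color": ["red", "green", "blue"]})
df["color"] = df["color"].astype("category")
encoded = pd.get_dummies(df, columns=["color"]).astype("int32")
print(list(encoded.columns))  # ['color_blue', 'color_green', 'color_red']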

post_transform(**kwargs)

Post-Transform: Populating Offline Storage and onboard()

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
def post_transform(self, **kwargs):
    """Post-Transform: Populating Offline Storage and onboard()"""
    self.log.info("Post-Transform: Populating Offline Storage and onboard()...")

    # Feature Group Ingestion takes a while, so we need to wait for it to finish
    self.output_feature_set = FeatureSetCore(self.output_uuid)
    self.log.important("Waiting for AWS Feature Group Offline storage to be ready...")
    self.log.important("This will often take 10-20 minutes...go have coffee or lunch :)")
    self.output_feature_set.set_status("initializing")
    self.wait_for_rows(self.expected_rows)

    # Call the FeatureSet onboard method to compute a bunch of EDA stuff
    self.output_feature_set.onboard()

    # Set Hold Out Ids (if we got them during creation)
    if self.incoming_hold_out_ids:
        self.output_feature_set.set_training_holdouts(self.id_column, self.incoming_hold_out_ids)

pre_transform(**kwargs)

Pre-Transform: Delete any existing FeatureSet and Create the Feature Group

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
def pre_transform(self, **kwargs):
    """Pre-Transform: Delete any existing FeatureSet and Create the Feature Group"""
    self.delete_existing()
    self.output_feature_group = self.create_feature_group()

prep_dataframe()

Prep the DataFrame for Feature Store Creation

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
def prep_dataframe(self):
    """Prep the DataFrame for Feature Store Creation"""
    self.log.info("Prep the output_df (cat_convert, convert types, and lowercase columns)...")

    # Remove any columns generated from AWS
    aws_cols = ["write_time", "api_invocation_time", "is_deleted", "event_time"]
    self.output_df = self.output_df.drop(columns=aws_cols, errors="ignore")

    # If one-hot columns are provided then one-hot encode them
    if self.one_hot_columns:
        self.output_df = self.one_hot_encode(self.output_df, self.one_hot_columns)

    # Convert column names to lowercase; Athena will not work with uppercase column names
    if str(self.output_df.columns) != str(self.output_df.columns.str.lower()):
        for c in self.output_df.columns:
            if c != c.lower():
                self.log.important(f"Column name {c} converted to lowercase: {c.lower()}")
        self.output_df.columns = self.output_df.columns.str.lower()

    # Make sure we have the required id and event_time columns
    self._ensure_id_column()
    self._ensure_event_time()

    # Check for a training column (Workbench uses dynamic training columns)
    if "training" in self.output_df.columns:
        self.log.important(
            """Training column detected: Since FeatureSets are read-only, Workbench creates a training view
            that can be dynamically changed. We'll use this training column to create a training view."""
        )
        self.incoming_hold_out_ids = self.output_df[~self.output_df["training"]][self.id_column].tolist()
        self.output_df = self.output_df.drop(columns=["training"])

    # We need to convert some of our column types to the correct types
    # Feature Store only supports these data types:
    # - Integral
    # - Fractional
    # - String (timestamp/datetime types need to be converted to string)
    self.output_df = self.convert_column_types(self.output_df)
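
To make the training-column convention concrete: rows where a boolean training column is False become the hold-out IDs, and the column itself is dropped before ingestion. A minimal pandas-only sketch (the "id" and "training" column names follow the code above; the data is made up):

```python
import pandas as pd

df = pd.DataFrame({"id": [1, 2, 3, 4],
                   "feature": [0.1, 0.2, 0.3, 0.4],
                   "training": [True, True, False, False]})

hold_out_ids = df[~df["training"]]["id"].tolist()   # -> [3, 4]
df = df.drop(columns=["training"])                  # FeatureSets are read-only; the training view is created later
```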

process_column_name(column, shorten=False)

Call various methods to make sure the column is ready for Feature Store

Args:
    column (str): The column name to process
    shorten (bool): Should we shorten the column name? (default: False)

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
def process_column_name(self, column: str, shorten: bool = False) -> str:
    """Call various methods to make sure the column is ready for Feature Store
    Args:
        column (str): The column name to process
        shorten (bool): Should we shorten the column name? (default: False)
    """
    self.log.debug(f"Processing column {column}...")

    # Make sure the column name is valid
    column = self.sanitize_column_name(column)

    # Make sure the column name isn't too long
    if shorten:
        column = self.shorten_column_name(column)

    return column

set_input(input_df, id_column=None, event_time_column=None, one_hot_columns=None)

Set the Input DataFrame for this Transform

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| input_df | DataFrame | The input DataFrame. | required |
| id_column | str | The ID column (if not specified, an 'auto_id' will be generated). | None |
| event_time_column | str | The name of the event time column (default: None). | None |
| one_hot_columns | list | The list of columns to one-hot encode (default: None). | None |
Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
def set_input(self, input_df: pd.DataFrame, id_column=None, event_time_column=None, one_hot_columns=None):
    """Set the Input DataFrame for this Transform

    Args:
        input_df (pd.DataFrame): The input DataFrame.
        id_column (str, optional): The ID column (if not specified, an 'auto_id' will be generated).
        event_time_column (str, optional): The name of the event time column (default: None).
        one_hot_columns (list, optional): The list of columns to one-hot encode (default: None).
    """
    self.id_column = id_column
    self.event_time_column = event_time_column
    self.output_df = input_df.copy()
    self.one_hot_columns = one_hot_columns or []

    # Now Prepare the DataFrame for its journey into an AWS FeatureGroup
    self.prep_dataframe()
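
Putting the parameters together, a hedged usage sketch (the uuid "my_features", the column names, and the import path are assumptions inferred from the source paths and usage blocks in this document):

```python
import pandas as pd
from workbench.core.transforms.pandas_transforms.pandas_to_features import PandasToFeatures

df = pd.DataFrame({"compound_id": [1, 2], "assay": ["A", "B"], "value": [1.2, 3.4]})

to_features = PandasToFeatures("my_features")
to_features.set_output_tags(["example", "public"])
to_features.set_input(df, id_column="compound_id", one_hot_columns=["assay"])
to_features.transform()   # runs the pre/transform/post steps shown in this section
```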

transform_impl()

Transform Implementation: Ingest the data into the Feature Group

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
def transform_impl(self):
    """Transform Implementation: Ingest the data into the Feature Group"""

    # Now we actually push the data into the Feature Group (called ingestion)
    self.log.important(f"Ingesting rows into Feature Group {self.output_uuid}...")
    ingest_manager = self.output_feature_group.ingest(self.output_df, max_workers=8, max_processes=2, wait=False)
    try:
        ingest_manager.wait()
    except IngestionError as exc:
        self.log.warning(f"Some rows had an ingesting error: {exc}")

    # Report on any rows that failed to ingest
    if ingest_manager.failed_rows:
        self.log.warning(f"Number of Failed Rows: {len(ingest_manager.failed_rows)}")

        # FIXME: This may or may not give us the correct rows
        # If any index is greater than the number of rows, then the index needs
        # to be converted to a relative index in our current output_df
        df_rows = len(self.output_df)
        relative_indexes = [idx - df_rows if idx >= df_rows else idx for idx in ingest_manager.failed_rows]
        failed_data = self.output_df.iloc[relative_indexes]
        for idx, row in failed_data.iterrows():
            self.log.warning(f"Failed Row {idx}: {row.to_dict()}")

    # Keep track of the number of rows we expect to be ingested
    self.expected_rows += len(self.output_df) - len(ingest_manager.failed_rows)
    self.log.info(f"Added rows: {len(self.output_df)}")
    self.log.info(f"Failed rows: {len(ingest_manager.failed_rows)}")
    self.log.info(f"Total rows ingested: {self.expected_rows}")

    # We often need to wait a bit for AWS to fully register the new Feature Group
    self.log.important(f"Waiting for AWS to register the new Feature Group {self.output_uuid}...")
    time.sleep(30)
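
Note: with wait=False the SageMaker ingest() call returns an ingestion manager without blocking, so the explicit ingest_manager.wait() above is what actually waits for the worker processes to finish (and raises IngestionError if any rows fail); failed_rows then lists the row indexes that were not ingested.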

wait_for_rows(expected_rows)

Wait for AWS Feature Group to fully populate the Offline Storage

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
def wait_for_rows(self, expected_rows: int):
    """Wait for AWS Feature Group to fully populate the Offline Storage"""
    rows = self.output_feature_set.num_rows()

    # Wait for the rows to be populated
    self.log.info(f"Waiting for AWS Feature Group {self.output_uuid} Offline Storage...")
    max_retry = 20
    num_retry = 0
    sleep_time = 30
    while rows < expected_rows and num_retry < max_retry:
        num_retry += 1
        time.sleep(sleep_time)
        rows = self.output_feature_set.num_rows()
        self.log.info(f"Offline Storage {self.output_uuid}: {rows} rows out of {expected_rows}")
    if rows == expected_rows:
        self.log.important(f"Success: Reached Expected Rows ({rows} rows)...")
    else:
        msg = f"Did not reach expected rows ({rows}/{expected_rows})...(probably AWS lag)"
        self.log.warning(msg)
        self.log.monitor(msg)
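
With the defaults above (max_retry = 20, sleep_time = 30 seconds), this method polls the Offline Storage for at most about 10 minutes before logging a warning and returning.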

PandasToFeaturesChunked

Bases: Transform

PandasToFeaturesChunked: Class to manage a bunch of chunked Pandas DataFrames into a FeatureSet

Common Usage
to_features = PandasToFeaturesChunked(output_uuid, id_column="id"/None, event_time_column="date"/None)
to_features.set_output_tags(["abalone", "public", "whatever"])
cat_column_info = {"sex": ["M", "F", "I"]}
to_features.set_categorical_info(cat_column_info)
to_features.add_chunk(df)
to_features.add_chunk(df)
...
to_features.finalize()
Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features_chunked.py
class PandasToFeaturesChunked(Transform):
    """PandasToFeaturesChunked:  Class to manage a bunch of chunked Pandas DataFrames into a FeatureSet

    Common Usage:
        ```python
        to_features = PandasToFeaturesChunked(output_uuid, id_column="id"/None, event_time_column="date"/None)
        to_features.set_output_tags(["abalone", "public", "whatever"])
        cat_column_info = {"sex": ["M", "F", "I"]}
        to_features.set_categorical_info(cat_column_info)
        to_features.add_chunk(df)
        to_features.add_chunk(df)
        ...
        to_features.finalize()
        ```
    """

    def __init__(self, output_uuid: str, id_column=None, event_time_column=None):
        """PandasToFeaturesChunked Initialization"""

        # Make sure the output_uuid is a valid name
        Artifact.is_name_valid(output_uuid)

        # Call superclass init
        super().__init__("DataFrame", output_uuid)

        # Set up all my instance attributes
        self.id_column = id_column
        self.event_time_column = event_time_column
        self.first_chunk = None
        self.pandas_to_features = PandasToFeatures(output_uuid)

    def set_categorical_info(self, cat_column_info: dict[str, list[str]]):
        """Set the Categorical Columns
        Args:
            cat_column_info (dict[str, list[str]]): Dictionary of categorical columns and their possible values
        """

        # Create the CategoricalDtypes
        cat_d_types = {}
        for col, vals in cat_column_info.items():
            cat_d_types[col] = CategoricalDtype(categories=vals)

        # Now set the CategoricalDtypes on our underlying PandasToFeatures
        self.pandas_to_features.categorical_dtypes = cat_d_types

    def add_chunk(self, chunk_df: pd.DataFrame):
        """Add a Chunk of Data to the FeatureSet"""

        # Is this the first chunk? If so we need to run the pre_transform
        if self.first_chunk is None:
            self.log.info(f"Adding first chunk {chunk_df.shape}...")
            self.first_chunk = chunk_df
            self.pandas_to_features.set_input(chunk_df, self.id_column, self.event_time_column)
            self.pandas_to_features.pre_transform()
            self.pandas_to_features.transform_impl()
        else:
            self.log.info(f"Adding chunk {chunk_df.shape}...")
            self.pandas_to_features.set_input(chunk_df, self.id_column, self.event_time_column)
            self.pandas_to_features.transform_impl()

    def pre_transform(self, **kwargs):
        """Pre-Transform: Create the Feature Group with Chunked Data"""

        # Loading data into a Feature Group takes a while, so set status to loading
        FeatureSetCore(self.output_uuid).set_status("loading")

    def transform_impl(self):
        """Required implementation of the Transform interface"""
        self.log.warning("PandasToFeaturesChunked.transform_impl() called.  This is a no-op.")

    def post_transform(self, **kwargs):
        """Post-Transform: Any Post Transform Steps"""
        self.pandas_to_features.post_transform()
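
A hedged end-to-end sketch of chunked ingestion, following the Common Usage block above (the file name, chunk size, and column names are assumptions; the import path is inferred from the source path):

```python
import pandas as pd
from workbench.core.transforms.pandas_transforms.pandas_to_features_chunked import PandasToFeaturesChunked

to_features = PandasToFeaturesChunked("abalone_features", id_column="id", event_time_column="date")
to_features.set_output_tags(["abalone", "public"])
to_features.set_categorical_info({"sex": ["M", "F", "I"]})

# Stream a large CSV in 100k-row chunks so the full file never has to fit in memory
for chunk_df in pd.read_csv("large_abalone.csv", chunksize=100_000):
    to_features.add_chunk(chunk_df)

to_features.finalize()
```

The first add_chunk() call creates the Feature Group (via pre_transform); subsequent calls only ingest.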

__init__(output_uuid, id_column=None, event_time_column=None)

PandasToFeaturesChunked Initialization

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features_chunked.py
def __init__(self, output_uuid: str, id_column=None, event_time_column=None):
    """PandasToFeaturesChunked Initialization"""

    # Make sure the output_uuid is a valid name
    Artifact.is_name_valid(output_uuid)

    # Call superclass init
    super().__init__("DataFrame", output_uuid)

    # Set up all my instance attributes
    self.id_column = id_column
    self.event_time_column = event_time_column
    self.first_chunk = None
    self.pandas_to_features = PandasToFeatures(output_uuid)

add_chunk(chunk_df)

Add a Chunk of Data to the FeatureSet

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features_chunked.py
def add_chunk(self, chunk_df: pd.DataFrame):
    """Add a Chunk of Data to the FeatureSet"""

    # Is this the first chunk? If so we need to run the pre_transform
    if self.first_chunk is None:
        self.log.info(f"Adding first chunk {chunk_df.shape}...")
        self.first_chunk = chunk_df
        self.pandas_to_features.set_input(chunk_df, self.id_column, self.event_time_column)
        self.pandas_to_features.pre_transform()
        self.pandas_to_features.transform_impl()
    else:
        self.log.info(f"Adding chunk {chunk_df.shape}...")
        self.pandas_to_features.set_input(chunk_df, self.id_column, self.event_time_column)
        self.pandas_to_features.transform_impl()

post_transform(**kwargs)

Post-Transform: Any Post Transform Steps

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features_chunked.py
def post_transform(self, **kwargs):
    """Post-Transform: Any Post Transform Steps"""
    self.pandas_to_features.post_transform()

pre_transform(**kwargs)

Pre-Transform: Create the Feature Group with Chunked Data

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features_chunked.py
def pre_transform(self, **kwargs):
    """Pre-Transform: Create the Feature Group with Chunked Data"""

    # Loading data into a Feature Group takes a while, so set status to loading
    FeatureSetCore(self.output_uuid).set_status("loading")

set_categorical_info(cat_column_info)

Set the Categorical Columns

Args:
    cat_column_info (dict[str, list[str]]): Dictionary of categorical columns and their possible values

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features_chunked.py
def set_categorical_info(self, cat_column_info: dict[str, list[str]]):
    """Set the Categorical Columns
    Args:
        cat_column_info (dict[str, list[str]]): Dictionary of categorical columns and their possible values
    """

    # Create the CategoricalDtypes
    cat_d_types = {}
    for col, vals in cat_column_info.items():
        cat_d_types[col] = CategoricalDtype(categories=vals)

    # Now set the CategoricalDtypes on our underlying PandasToFeatures
    self.pandas_to_features.categorical_dtypes = cat_d_types
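
Declaring the full category list up front matters because an individual chunk may not contain every value; a fixed CategoricalDtype keeps categorical handling consistent across chunks. A minimal pandas-only sketch (independent of how PandasToFeatures uses categorical_dtypes internally):

```python
import pandas as pd
from pandas import CategoricalDtype

sex_dtype = CategoricalDtype(categories=["M", "F", "I"])

# This chunk has no "I" rows, but the fixed dtype still yields a sex_I indicator column
chunk = pd.DataFrame({"sex": ["M", "F", "M"]})
chunk["sex"] = chunk["sex"].astype(sex_dtype)
pd.get_dummies(chunk, columns=["sex"]).columns.tolist()   # ['sex_M', 'sex_F', 'sex_I']
```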

transform_impl()

Required implementation of the Transform interface

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features_chunked.py
def transform_impl(self):
    """Required implementation of the Transform interface"""
    self.log.warning("PandasToFeaturesChunked.transform_impl() called.  This is a no-op.")