
Pandas Transforms

API Classes

The API Classes often provide helpful methods that return a DataFrame (data_source.query(), for instance), so always check the API Classes first.

These Transforms give you the ultimate in customization and flexibility when creating AWS Machine Learning Pipelines. Grab a Pandas DataFrame from a DataSource or FeatureSet, process it in whatever way suits your use case, and simply create another Workbench DataSource or FeatureSet from the resulting DataFrame (a short end-to-end sketch follows the list of options below).

Lots of Options:

Not for Large Data

Pandas Transforms can't handle large datasets (> 4 GB). For transforms on large data, see our Heavy Transforms.

  • S3 --> DF --> DataSource
  • DataSource --> DF --> DataSource
  • DataSource --> DF --> FeatureSet
  • Get Creative!
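
A minimal end-to-end sketch of the DataSource --> DF --> FeatureSet path. The import paths are assumed from the source file locations shown on this page, and the DataSource/FeatureSet names and column names are made up for illustration:

# Assumed import paths (based on the source file locations shown below)
from workbench.core.transforms.pandas_transforms.data_to_pandas import DataToPandas
from workbench.core.transforms.pandas_transforms.pandas_to_features import PandasToFeatures

# DataSource --> DF
data_to_df = DataToPandas("abalone_data")  # hypothetical DataSource name
data_to_df.transform(query="SELECT * FROM abalone_data WHERE rings > 5")  # hypothetical table/column
df = data_to_df.get_output()

# Process the DataFrame any way you like (hypothetical columns)
df["weight_ratio"] = df["shucked_weight"] / df["whole_weight"]

# DF --> FeatureSet
to_features = PandasToFeatures("abalone_features")  # hypothetical FeatureSet name
to_features.set_output_tags(["abalone", "example"])
to_features.set_input(df, id_column="id")  # hypothetical id column
to_features.transform()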

Welcome to the Workbench Pandas Transform Classes

These classes provide low-level APIs for using Pandas DataFrames

  • DataToPandas: Pull a dataframe from a Workbench DataSource
  • FeaturesToPandas: Pull a dataframe from a Workbench FeatureSet
  • PandasToData: Create a Workbench DataSource using a Pandas DataFrame as the source
  • PandasToFeatures: Create a Workbench FeatureSet using a Pandas DataFrame as the source
  • PandasToFeaturesChunked: Create a Workbench FeatureSet using a Chunked/Streaming Pandas DataFrame as the source

DataToPandas

Bases: Transform

DataToPandas: Class to transform a Data Source into a Pandas DataFrame

Common Usage
data_to_df = DataToPandas(data_source_name)
data_to_df.transform(query=<optional SQL query to filter/process data>)
data_to_df.transform(max_rows=<optional max rows to sample>)
my_df = data_to_df.get_output()

Note: query is the best way to use this class, so use it :)
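
For example, a filtered pull from a hypothetical DataSource named "abalone_data" (the table and column names are made up):

data_to_df = DataToPandas("abalone_data")
data_to_df.transform(query="SELECT length, diameter, rings FROM abalone_data WHERE rings > 5")
my_df = data_to_df.get_output()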
Source code in src/workbench/core/transforms/pandas_transforms/data_to_pandas.py
class DataToPandas(Transform):
    """DataToPandas: Class to transform a Data Source into a Pandas DataFrame

    Common Usage:
        ```python
        data_to_df = DataToPandas(data_source_name)
        data_to_df.transform(query=<optional SQL query to filter/process data>)
        data_to_df.transform(max_rows=<optional max rows to sample>)
        my_df = data_to_df.get_output()

        Note: query is the best way to use this class, so use it :)
        ```
    """

    def __init__(self, input_name: str):
        """DataToPandas Initialization"""

        # Call superclass init
        super().__init__(input_name, "DataFrame")

        # Set up all my instance attributes
        self.input_type = TransformInput.DATA_SOURCE
        self.output_type = TransformOutput.PANDAS_DF
        self.output_df = None

    def transform_impl(self, query: str = None, max_rows=100000):
        """Convert the DataSource into a Pandas DataFrame
        Args:
            query(str): The query to run against the DataSource (default: None)
            max_rows(int): The maximum number of rows to return (default: 100000)
        """

        # Grab the Input (Data Source)
        input_data = DataSourceFactory(self.input_name)
        if not input_data.exists():
            self.log.critical(f"Data Check on {self.input_name} failed!")
            return

        # If a query is provided, that overrides the queries below
        if query:
            self.log.info(f"Querying {self.input_name} with {query}...")
            self.output_df = input_data.query(query)
            return

        # If the data source has more rows than max_rows, do a sample query
        num_rows = input_data.num_rows()
        if num_rows > max_rows:
            percentage = round(max_rows * 100.0 / num_rows)
            self.log.important(f"DataSource has {num_rows} rows.. sampling down to {max_rows}...")
            query = f"SELECT * FROM {self.input_name} TABLESAMPLE BERNOULLI({percentage})"
        else:
            query = f"SELECT * FROM {self.input_name}"

        # Mark the transform as complete and set the output DataFrame
        self.output_df = input_data.query(query)

    def post_transform(self, **kwargs):
        """Post-Transform: Any checks on the Pandas DataFrame that need to be done"""
        self.log.info("Post-Transform: Checking Pandas DataFrame...")
        self.log.info(f"DataFrame Shape: {self.output_df.shape}")

    def get_output(self) -> pd.DataFrame:
        """Get the DataFrame Output from this Transform"""
        return self.output_df

__init__(input_name)

DataToPandas Initialization

Source code in src/workbench/core/transforms/pandas_transforms/data_to_pandas.py
def __init__(self, input_name: str):
    """DataToPandas Initialization"""

    # Call superclass init
    super().__init__(input_name, "DataFrame")

    # Set up all my instance attributes
    self.input_type = TransformInput.DATA_SOURCE
    self.output_type = TransformOutput.PANDAS_DF
    self.output_df = None

get_output()

Get the DataFrame Output from this Transform

Source code in src/workbench/core/transforms/pandas_transforms/data_to_pandas.py
def get_output(self) -> pd.DataFrame:
    """Get the DataFrame Output from this Transform"""
    return self.output_df

post_transform(**kwargs)

Post-Transform: Any checks on the Pandas DataFrame that need to be done

Source code in src/workbench/core/transforms/pandas_transforms/data_to_pandas.py
def post_transform(self, **kwargs):
    """Post-Transform: Any checks on the Pandas DataFrame that need to be done"""
    self.log.info("Post-Transform: Checking Pandas DataFrame...")
    self.log.info(f"DataFrame Shape: {self.output_df.shape}")

transform_impl(query=None, max_rows=100000)

Convert the DataSource into a Pandas DataFrame

Parameters:

  • query (str): The query to run against the DataSource (default: None)
  • max_rows (int): The maximum number of rows to return (default: 100000)

Source code in src/workbench/core/transforms/pandas_transforms/data_to_pandas.py
def transform_impl(self, query: str = None, max_rows=100000):
    """Convert the DataSource into a Pandas DataFrame
    Args:
        query(str): The query to run against the DataSource (default: None)
        max_rows(int): The maximum number of rows to return (default: 100000)
    """

    # Grab the Input (Data Source)
    input_data = DataSourceFactory(self.input_name)
    if not input_data.exists():
        self.log.critical(f"Data Check on {self.input_name} failed!")
        return

    # If a query is provided, that overrides the queries below
    if query:
        self.log.info(f"Querying {self.input_name} with {query}...")
        self.output_df = input_data.query(query)
        return

    # If the data source has more rows than max_rows, do a sample query
    num_rows = input_data.num_rows()
    if num_rows > max_rows:
        percentage = round(max_rows * 100.0 / num_rows)
        self.log.important(f"DataSource has {num_rows} rows.. sampling down to {max_rows}...")
        query = f"SELECT * FROM {self.input_name} TABLESAMPLE BERNOULLI({percentage})"
    else:
        query = f"SELECT * FROM {self.input_name}"

    # Mark the transform as complete and set the output DataFrame
    self.output_df = input_data.query(query)

FeaturesToPandas

Bases: Transform

FeaturesToPandas: Class to transform a FeatureSet into a Pandas DataFrame

Common Usage
feature_to_df = FeaturesToPandas(feature_set_name)
feature_to_df.transform(max_rows=<optional max rows to sample>)
my_df = feature_to_df.get_output()
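
For example, sampling a hypothetical FeatureSet down to 10,000 rows:

feature_to_df = FeaturesToPandas("abalone_features")
feature_to_df.transform(max_rows=10000)
my_df = feature_to_df.get_output()

Note that get_output() will run the transform automatically if it hasn't been run yet (see the source below).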
Source code in src/workbench/core/transforms/pandas_transforms/features_to_pandas.py
class FeaturesToPandas(Transform):
    """FeaturesToPandas: Class to transform a FeatureSet into a Pandas DataFrame

    Common Usage:
        ```python
        feature_to_df = FeaturesToPandas(feature_set_name)
        feature_to_df.transform(max_rows=<optional max rows to sample>)
        my_df = feature_to_df.get_output()
        ```
    """

    def __init__(self, feature_set_name: str):
        """FeaturesToPandas Initialization"""

        # Call superclass init
        super().__init__(input_name=feature_set_name, output_name="DataFrame")

        # Set up all my instance attributes
        self.input_type = TransformInput.FEATURE_SET
        self.output_type = TransformOutput.PANDAS_DF
        self.output_df = None
        self.transform_run = False

    def transform_impl(self, max_rows=100000):
        """Convert the FeatureSet into a Pandas DataFrame"""

        # Grab the Input (Feature Set)
        input_data = FeatureSetCore(self.input_name)
        if not input_data.exists():
            self.log.critical(f"Feature Set Check on {self.input_name} failed!")
            return

        # Grab the table for this Feature Set
        table = input_data.athena_table

        # Get the list of columns (and subtract metadata columns that might get added)
        columns = input_data.columns
        filter_columns = ["write_time", "api_invocation_time", "is_deleted"]
        columns = ", ".join([x for x in columns if x not in filter_columns])

        # Get the number of rows in the Feature Set
        num_rows = input_data.num_rows()

        # If the data source has more rows than max_rows, do a sample query
        if num_rows > max_rows:
            percentage = round(max_rows * 100.0 / num_rows)
            self.log.important(f"DataSource has {num_rows} rows.. sampling down to {max_rows}...")
            query = f'SELECT {columns} FROM "{table}" TABLESAMPLE BERNOULLI({percentage})'
        else:
            query = f'SELECT {columns} FROM "{table}"'

        # Mark the transform as complete and set the output DataFrame
        self.transform_run = True
        self.output_df = input_data.query(query)

    def post_transform(self, **kwargs):
        """Post-Transform: Any checks on the Pandas DataFrame that need to be done"""
        self.log.info("Post-Transform: Checking Pandas DataFrame...")
        self.log.info(f"DataFrame Shape: {self.output_df.shape}")

    def get_output(self) -> pd.DataFrame:
        """Get the DataFrame Output from this Transform"""
        if not self.transform_run:
            self.transform()
        return self.output_df

__init__(feature_set_name)

FeaturesToPandas Initialization

Source code in src/workbench/core/transforms/pandas_transforms/features_to_pandas.py
def __init__(self, feature_set_name: str):
    """FeaturesToPandas Initialization"""

    # Call superclass init
    super().__init__(input_name=feature_set_name, output_name="DataFrame")

    # Set up all my instance attributes
    self.input_type = TransformInput.FEATURE_SET
    self.output_type = TransformOutput.PANDAS_DF
    self.output_df = None
    self.transform_run = False

get_output()

Get the DataFrame Output from this Transform

Source code in src/workbench/core/transforms/pandas_transforms/features_to_pandas.py
def get_output(self) -> pd.DataFrame:
    """Get the DataFrame Output from this Transform"""
    if not self.transform_run:
        self.transform()
    return self.output_df

post_transform(**kwargs)

Post-Transform: Any checks on the Pandas DataFrame that need to be done

Source code in src/workbench/core/transforms/pandas_transforms/features_to_pandas.py
def post_transform(self, **kwargs):
    """Post-Transform: Any checks on the Pandas DataFrame that need to be done"""
    self.log.info("Post-Transform: Checking Pandas DataFrame...")
    self.log.info(f"DataFrame Shape: {self.output_df.shape}")

transform_impl(max_rows=100000)

Convert the FeatureSet into a Pandas DataFrame

Source code in src/workbench/core/transforms/pandas_transforms/features_to_pandas.py
def transform_impl(self, max_rows=100000):
    """Convert the FeatureSet into a Pandas DataFrame"""

    # Grab the Input (Feature Set)
    input_data = FeatureSetCore(self.input_name)
    if not input_data.exists():
        self.log.critical(f"Feature Set Check on {self.input_name} failed!")
        return

    # Grab the table for this Feature Set
    table = input_data.athena_table

    # Get the list of columns (and subtract metadata columns that might get added)
    columns = input_data.columns
    filter_columns = ["write_time", "api_invocation_time", "is_deleted"]
    columns = ", ".join([x for x in columns if x not in filter_columns])

    # Get the number of rows in the Feature Set
    num_rows = input_data.num_rows()

    # If the data source has more rows than max_rows, do a sample query
    if num_rows > max_rows:
        percentage = round(max_rows * 100.0 / num_rows)
        self.log.important(f"DataSource has {num_rows} rows.. sampling down to {max_rows}...")
        query = f'SELECT {columns} FROM "{table}" TABLESAMPLE BERNOULLI({percentage})'
    else:
        query = f'SELECT {columns} FROM "{table}"'

    # Mark the transform as complete and set the output DataFrame
    self.transform_run = True
    self.output_df = input_data.query(query)

PandasToData

Bases: Transform

PandasToData: Class to publish a Pandas DataFrame as a DataSource

Common Usage
df_to_data = PandasToData(output_name)
df_to_data.set_output_tags(["test", "small"])
df_to_data.set_input(test_df)
df_to_data.transform()
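
A concrete sketch (the DataFrame contents and the output name "test_data" are made up for illustration; the import path is assumed from the source file location below):

import pandas as pd
from workbench.core.transforms.pandas_transforms.pandas_to_data import PandasToData  # assumed import path

test_df = pd.DataFrame({"id": [1, 2, 3], "value": [0.1, 0.2, 0.3]})

df_to_data = PandasToData("test_data")
df_to_data.set_output_tags(["test", "small"])
df_to_data.set_input(test_df)
df_to_data.transform()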
Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_data.py
class PandasToData(Transform):
    """PandasToData: Class to publish a Pandas DataFrame as a DataSource

    Common Usage:
        ```python
        df_to_data = PandasToData(output_name)
        df_to_data.set_output_tags(["test", "small"])
        df_to_data.set_input(test_df)
        df_to_data.transform()
        ```
    """

    def __init__(self, output_name: str, output_format: str = "parquet", catalog_db: str = "workbench"):
        """PandasToData Initialization
        Args:
            output_name (str): The Name of the DataSource to create
            output_format (str): The file format to store the S3 object data in (default: "parquet")
            catalog_db (str): The AWS Data Catalog Database to use (default: "workbench")
        """

        # Make sure the output_name is a valid name/id
        Artifact.is_name_valid(output_name)

        # Call superclass init
        super().__init__("DataFrame", output_name, catalog_db)

        # Set up all my instance attributes
        self.input_type = TransformInput.PANDAS_DF
        self.output_type = TransformOutput.DATA_SOURCE
        self.output_df = None

        # Give a message that Parquet is best in most cases
        if output_format != "parquet":
            self.log.warning("Parquet format works the best in most cases please consider using it")
        self.output_format = output_format

    def set_input(self, input_df: pd.DataFrame):
        """Set the DataFrame Input for this Transform"""
        self.output_df = input_df.copy()

    def delete_existing(self):
        # Delete the existing FeatureSet if it exists
        self.log.info(f"Deleting the {self.output_name} DataSource...")
        AthenaSource.managed_delete(self.output_name)
        time.sleep(1)

    def convert_object_to_string(self, df: pd.DataFrame) -> pd.DataFrame:
        """Try to automatically convert object columns to string columns"""
        for c in df.columns[df.dtypes == "object"]:  # Look at the object columns
            try:
                df[c] = df[c].astype("string")
                df[c] = df[c].str.replace("'", '"')  # This is for nested JSON
            except (ParserError, ValueError, TypeError):
                self.log.info(f"Column {c} could not be converted to string...")
        return df

    def convert_object_to_datetime(self, df: pd.DataFrame) -> pd.DataFrame:
        """Try to automatically convert object columns to datetime or string columns"""
        for c in df.columns[df.dtypes == "object"]:  # Look at the object columns
            try:
                df[c] = pd.to_datetime(df[c])
            except (ParserError, ValueError, TypeError):
                self.log.debug(f"Column {c} could not be converted to datetime...")
        return df

    @staticmethod
    def convert_datetime_columns(df: pd.DataFrame) -> pd.DataFrame:
        """Convert datetime columns to ISO-8601 string"""
        datetime_type = ["datetime", "datetime64", "datetime64[ns]", "datetimetz"]
        for c in df.select_dtypes(include=datetime_type).columns:
            df[c] = df[c].map(datetime_to_iso8601)
            df[c] = df[c].astype(pd.StringDtype())
        return df

    def pre_transform(self, **kwargs):
        """Pre-Transform: Delete the existing DataSource if it exists"""
        self.delete_existing()

    def transform_impl(self, overwrite: bool = True):
        """Convert the Pandas DataFrame into Parquet Format in the Workbench S3 Bucket, and
        store the information about the data to the AWS Data Catalog workbench database

        Args:
            overwrite (bool): Overwrite the existing data in the Workbench S3 Bucket (default: True)
        """
        self.log.info(f"DataFrame to Workbench DataSource: {self.output_name}...")

        # Set up our metadata storage
        workbench_meta = {"workbench_tags": self.output_tags}
        workbench_meta.update(self.output_meta)

        # Create the Output Parquet file S3 Storage Path
        s3_storage_path = f"{self.data_sources_s3_path}/{self.output_name}"

        # Convert column names to lowercase, Athena will not work with uppercase column names
        if str(self.output_df.columns) != str(self.output_df.columns.str.lower()):
            for c in self.output_df.columns:
                if c != c.lower():
                    self.log.important(f"Column name {c} converted to lowercase: {c.lower()}")
            self.output_df.columns = self.output_df.columns.str.lower()

        # Convert Object Columns to String
        self.output_df = self.convert_object_to_string(self.output_df)

        # Note: Both of these conversions may not be necessary, so we're leaving them commented out
        """
        # Convert Object Columns to Datetime
        self.output_df = self.convert_object_to_datetime(self.output_df)

        # Now convert datetime columns to ISO-8601 string
        # self.output_df = self.convert_datetime_columns(self.output_df)
        """

        # Write out the DataFrame to AWS Data Catalog in either Parquet or JSONL format
        description = f"Workbench data source: {self.output_name}"
        glue_table_settings = {"description": description, "parameters": workbench_meta}
        if self.output_format == "parquet":
            wr.s3.to_parquet(
                self.output_df,
                path=s3_storage_path,
                dataset=True,
                mode="overwrite",
                database=self.data_catalog_db,
                table=self.output_name,
                filename_prefix=f"{self.output_name}_",
                boto3_session=self.boto3_session,
                partition_cols=None,
                glue_table_settings=glue_table_settings,
                sanitize_columns=False,
            )  # FIXME: Have some logic around partition columns

        # Note: In general Parquet works well for most use cases. We recommend using Parquet
        #       You can use JSON_EXTRACT on Parquet string fields, and it works great.
        elif self.output_format == "jsonl":
            self.log.warning("We recommend using Parquet format for most use cases")
            self.log.warning("If you have a use case that requires JSONL please contact Workbench support")
            self.log.warning("We'd like to understand what functionality JSONL is providing that isn't already")
            self.log.warning("provided with Parquet and JSON_EXTRACT() for your Athena Queries")
            wr.s3.to_json(
                self.output_df,
                path=s3_storage_path,
                orient="records",
                lines=True,
                date_format="iso",
                dataset=True,
                mode="overwrite",
                database=self.data_catalog_db,
                table=self.output_name,
                filename_prefix=f"{self.output_name}_",
                boto3_session=self.boto3_session,
                partition_cols=None,
                glue_table_settings=glue_table_settings,
            )
        else:
            raise ValueError(f"Unsupported file format: {self.output_format}")

    def post_transform(self, **kwargs):
        """Post-Transform: Calling onboard() fnr the DataSource"""
        self.log.info("Post-Transform: Calling onboard() for the DataSource...")

        # Onboard the DataSource
        output_data_source = DataSourceFactory(self.output_name)
        output_data_source.onboard()

__init__(output_name, output_format='parquet', catalog_db='workbench')

PandasToData Initialization

Parameters:

  • output_name (str): The Name of the DataSource to create
  • output_format (str): The file format to store the S3 object data in (default: "parquet")
  • catalog_db (str): The AWS Data Catalog Database to use (default: "workbench")

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_data.py
def __init__(self, output_name: str, output_format: str = "parquet", catalog_db: str = "workbench"):
    """PandasToData Initialization
    Args:
        output_name (str): The Name of the DataSource to create
        output_format (str): The file format to store the S3 object data in (default: "parquet")
        catalog_db (str): The AWS Data Catalog Database to use (default: "workbench")
    """

    # Make sure the output_name is a valid name/id
    Artifact.is_name_valid(output_name)

    # Call superclass init
    super().__init__("DataFrame", output_name, catalog_db)

    # Set up all my instance attributes
    self.input_type = TransformInput.PANDAS_DF
    self.output_type = TransformOutput.DATA_SOURCE
    self.output_df = None

    # Give a message that Parquet is best in most cases
    if output_format != "parquet":
        self.log.warning("Parquet format works the best in most cases please consider using it")
    self.output_format = output_format

convert_datetime_columns(df) staticmethod

Convert datetime columns to ISO-8601 string

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_data.py
@staticmethod
def convert_datetime_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Convert datetime columns to ISO-8601 string"""
    datetime_type = ["datetime", "datetime64", "datetime64[ns]", "datetimetz"]
    for c in df.select_dtypes(include=datetime_type).columns:
        df[c] = df[c].map(datetime_to_iso8601)
        df[c] = df[c].astype(pd.StringDtype())
    return df

convert_object_to_datetime(df)

Try to automatically convert object columns to datetime or string columns

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_data.py
def convert_object_to_datetime(self, df: pd.DataFrame) -> pd.DataFrame:
    """Try to automatically convert object columns to datetime or string columns"""
    for c in df.columns[df.dtypes == "object"]:  # Look at the object columns
        try:
            df[c] = pd.to_datetime(df[c])
        except (ParserError, ValueError, TypeError):
            self.log.debug(f"Column {c} could not be converted to datetime...")
    return df

convert_object_to_string(df)

Try to automatically convert object columns to string columns

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_data.py
def convert_object_to_string(self, df: pd.DataFrame) -> pd.DataFrame:
    """Try to automatically convert object columns to string columns"""
    for c in df.columns[df.dtypes == "object"]:  # Look at the object columns
        try:
            df[c] = df[c].astype("string")
            df[c] = df[c].str.replace("'", '"')  # This is for nested JSON
        except (ParserError, ValueError, TypeError):
            self.log.info(f"Column {c} could not be converted to string...")
    return df

post_transform(**kwargs)

Post-Transform: Calling onboard() for the DataSource

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_data.py
def post_transform(self, **kwargs):
    """Post-Transform: Calling onboard() fnr the DataSource"""
    self.log.info("Post-Transform: Calling onboard() for the DataSource...")

    # Onboard the DataSource
    output_data_source = DataSourceFactory(self.output_name)
    output_data_source.onboard()

pre_transform(**kwargs)

Pre-Transform: Delete the existing DataSource if it exists

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_data.py
def pre_transform(self, **kwargs):
    """Pre-Transform: Delete the existing DataSource if it exists"""
    self.delete_existing()

set_input(input_df)

Set the DataFrame Input for this Transform

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_data.py
def set_input(self, input_df: pd.DataFrame):
    """Set the DataFrame Input for this Transform"""
    self.output_df = input_df.copy()

transform_impl(overwrite=True)

Convert the Pandas DataFrame into Parquet Format in the Workbench S3 Bucket, and store the information about the data to the AWS Data Catalog workbench database

Parameters:

  • overwrite (bool): Overwrite the existing data in the Workbench S3 Bucket (default: True)
Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_data.py
def transform_impl(self, overwrite: bool = True):
    """Convert the Pandas DataFrame into Parquet Format in the Workbench S3 Bucket, and
    store the information about the data to the AWS Data Catalog workbench database

    Args:
        overwrite (bool): Overwrite the existing data in the Workbench S3 Bucket (default: True)
    """
    self.log.info(f"DataFrame to Workbench DataSource: {self.output_name}...")

    # Set up our metadata storage
    workbench_meta = {"workbench_tags": self.output_tags}
    workbench_meta.update(self.output_meta)

    # Create the Output Parquet file S3 Storage Path
    s3_storage_path = f"{self.data_sources_s3_path}/{self.output_name}"

    # Convert column names to lowercase, Athena will not work with uppercase column names
    if str(self.output_df.columns) != str(self.output_df.columns.str.lower()):
        for c in self.output_df.columns:
            if c != c.lower():
                self.log.important(f"Column name {c} converted to lowercase: {c.lower()}")
        self.output_df.columns = self.output_df.columns.str.lower()

    # Convert Object Columns to String
    self.output_df = self.convert_object_to_string(self.output_df)

    # Note: Both of these conversions may not be necessary, so we're leaving them commented out
    """
    # Convert Object Columns to Datetime
    self.output_df = self.convert_object_to_datetime(self.output_df)

    # Now convert datetime columns to ISO-8601 string
    # self.output_df = self.convert_datetime_columns(self.output_df)
    """

    # Write out the DataFrame to AWS Data Catalog in either Parquet or JSONL format
    description = f"Workbench data source: {self.output_name}"
    glue_table_settings = {"description": description, "parameters": workbench_meta}
    if self.output_format == "parquet":
        wr.s3.to_parquet(
            self.output_df,
            path=s3_storage_path,
            dataset=True,
            mode="overwrite",
            database=self.data_catalog_db,
            table=self.output_name,
            filename_prefix=f"{self.output_name}_",
            boto3_session=self.boto3_session,
            partition_cols=None,
            glue_table_settings=glue_table_settings,
            sanitize_columns=False,
        )  # FIXME: Have some logic around partition columns

    # Note: In general Parquet works well for most use cases. We recommend using Parquet
    #       You can use JSON_EXTRACT on Parquet string fields, and it works great.
    elif self.output_format == "jsonl":
        self.log.warning("We recommend using Parquet format for most use cases")
        self.log.warning("If you have a use case that requires JSONL please contact Workbench support")
        self.log.warning("We'd like to understand what functionality JSONL is providing that isn't already")
        self.log.warning("provided with Parquet and JSON_EXTRACT() for your Athena Queries")
        wr.s3.to_json(
            self.output_df,
            path=s3_storage_path,
            orient="records",
            lines=True,
            date_format="iso",
            dataset=True,
            mode="overwrite",
            database=self.data_catalog_db,
            table=self.output_name,
            filename_prefix=f"{self.output_name}_",
            boto3_session=self.boto3_session,
            partition_cols=None,
            glue_table_settings=glue_table_settings,
        )
    else:
        raise ValueError(f"Unsupported file format: {self.output_format}")

PandasToFeatures

Bases: Transform

PandasToFeatures: Class to publish a Pandas DataFrame into a FeatureSet

Common Usage
to_features = PandasToFeatures(output_name)
to_features.set_output_tags(["my", "awesome", "data"])
to_features.set_input(df, id_column="my_id")
to_features.transform()
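
A concrete sketch using the optional arguments of set_input(); the DataFrame and names are made up, the import path is assumed from the source file location below, and the full set of optional arguments (event_time_column, one_hot_columns) is shown in the source:

import pandas as pd
from workbench.core.transforms.pandas_transforms.pandas_to_features import PandasToFeatures  # assumed import path

df = pd.DataFrame(
    {
        "my_id": [1, 2, 3, 4],
        "color": ["red", "green", "blue", "red"],
        "value": [0.1, 0.2, 0.3, 0.4],
    }
)

to_features = PandasToFeatures("my_features")
to_features.set_output_tags(["my", "awesome", "data"])
to_features.set_input(df, id_column="my_id", one_hot_columns=["color"])
to_features.transform()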
Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
class PandasToFeatures(Transform):
    """PandasToFeatures: Class to publish a Pandas DataFrame into a FeatureSet

    Common Usage:
        ```python
        to_features = PandasToFeatures(output_name)
        to_features.set_output_tags(["my", "awesome", "data"])
        to_features.set_input(df, id_column="my_id")
        to_features.transform()
        ```
    """

    def __init__(self, output_name: str):
        """PandasToFeatures Initialization

        Args:
            output_name (str): The Name of the FeatureSet to create
        """

        # Make sure the output_name is a valid name
        Artifact.is_name_valid(output_name)

        # Call superclass init
        super().__init__("DataFrame", output_name)

        # Set up all my instance attributes
        self.input_type = TransformInput.PANDAS_DF
        self.output_type = TransformOutput.FEATURE_SET
        self.id_column = None
        self.event_time_column = None
        self.one_hot_columns = []
        self.categorical_dtypes = {}  # Used for streaming/chunking
        self.output_df = None
        self.table_format = TableFormatEnum.ICEBERG
        self.incoming_hold_out_ids = None

        # These will be set in the transform method
        self.output_feature_group = None
        self.output_feature_set = None
        self.expected_rows = 0

    def set_input(self, input_df: pd.DataFrame, id_column=None, event_time_column=None, one_hot_columns=None):
        """Set the Input DataFrame for this Transform

        Args:
            input_df (pd.DataFrame): The input DataFrame.
            id_column (str, optional): The ID column (if not specified, an 'auto_id' will be generated).
            event_time_column (str, optional): The name of the event time column (default: None).
            one_hot_columns (list, optional): The list of columns to one-hot encode (default: None).
        """
        self.id_column = id_column
        self.event_time_column = event_time_column
        self.output_df = input_df.copy()
        self.one_hot_columns = one_hot_columns or []

        # Now Prepare the DataFrame for its journey into an AWS FeatureGroup
        self.prep_dataframe()

    def delete_existing(self):
        # Delete the existing FeatureSet if it exists
        self.log.info(f"Deleting the {self.output_name} FeatureSet...")
        FeatureSetCore.managed_delete(self.output_name)
        time.sleep(1)

    def _ensure_id_column(self):
        """Internal: AWS Feature Store requires an Id field"""
        if self.id_column is None:
            self.log.warning("Generating an 'auto_id' column, we highly recommended setting the 'id_column'")
            self.output_df["auto_id"] = self.output_df.index
            self.id_column = "auto_id"
            return
        if self.id_column not in self.output_df.columns:
            error_msg = f"Id column {self.id_column} not found in the DataFrame"
            self.log.critical(error_msg)
            raise ValueError(error_msg)

    def _ensure_event_time(self):
        """Internal: AWS Feature Store requires an event_time field for all data stored"""
        if self.event_time_column is None or self.event_time_column not in self.output_df.columns:
            self.log.info("Generating an event_time column before FeatureSet Creation...")
            self.event_time_column = "event_time"
            self.output_df[self.event_time_column] = pd.Timestamp("now", tz="UTC")

        # The event_time_column is defined, so we need to make sure it's in ISO-8601 string format
        # Note: AWS Feature Store only supports a particular ISO-8601 format, not ALL ISO-8601 formats
        time_column = self.output_df[self.event_time_column]

        # Check if the event_time_column is of type object or string convert it to DateTime
        if time_column.dtypes == "object" or time_column.dtypes.name == "string":
            self.log.info(f"Converting {self.event_time_column} to DateTime...")
            time_column = pd.to_datetime(time_column)

        # Let's make sure it's the right type for Feature Store
        if pd.api.types.is_datetime64_any_dtype(time_column):
            self.log.info(f"Converting {self.event_time_column} to ISOFormat Date String before FeatureSet Creation...")

            # Convert the datetime DType to ISO-8601 string
            # TableFormat=ICEBERG does not support alternate formats for event_time field, it only supports String type.
            time_column = time_column.map(datetime_to_iso8601)
            self.output_df[self.event_time_column] = time_column.astype("string")

    def _convert_objs_to_string(self):
        """Internal: AWS Feature Store doesn't know how to store object dtypes, so convert to String"""
        for col in self.output_df:
            if pd.api.types.is_object_dtype(self.output_df[col].dtype):
                self.output_df[col] = self.output_df[col].astype(pd.StringDtype())

    def process_column_name(self, column: str, shorten: bool = False) -> str:
        """Call various methods to make sure the column is ready for Feature Store
        Args:
            column (str): The column name to process
            shorten (bool): Should we shorten the column name? (default: False)
        """
        self.log.debug(f"Processing column {column}...")

        # Make sure the column name is valid
        column = self.sanitize_column_name(column)

        # Make sure the column name isn't too long
        if shorten:
            column = self.shorten_column_name(column)

        return column

    def shorten_column_name(self, name, max_length=20):
        if len(name) <= max_length:
            return name

        # Start building the new name from the end
        parts = name.split("_")[::-1]
        new_name = ""
        for part in parts:
            if len(new_name) + len(part) + 1 <= max_length:  # +1 for the underscore
                new_name = f"{part}_{new_name}" if new_name else part
            else:
                break

        # If new_name is empty, just use the last part of the original name
        if not new_name:
            new_name = parts[0]

        self.log.info(f"Shortening {name} to {new_name}")
        return new_name

    def sanitize_column_name(self, name):
        # Remove all invalid characters
        sanitized = re.sub("[^a-zA-Z0-9-_]", "_", name)
        sanitized = re.sub("_+", "_", sanitized)
        sanitized = sanitized.strip("_")

        # Log the change if the name was altered
        if sanitized != name:
            self.log.info(f"Sanitizing {name} to {sanitized}")

        return sanitized

    def one_hot_encode(self, df, one_hot_columns: list) -> pd.DataFrame:
        """One Hot Encoding for Categorical Columns with additional column name management

        Args:
            df (pd.DataFrame): The DataFrame to process
            one_hot_columns (list): The list of columns to one-hot encode

        Returns:
            pd.DataFrame: The DataFrame with one-hot encoded columns
        """

        # Grab the current list of columns
        current_columns = list(df.columns)

        # Now convert the list of columns into Categorical and then One-Hot Encode
        self.convert_columns_to_categorical(one_hot_columns)
        self.log.important(f"One-Hot encoding columns: {one_hot_columns}")
        df = pd.get_dummies(df, columns=one_hot_columns)

        # Compute the new columns generated by get_dummies
        new_columns = list(set(df.columns) - set(current_columns))
        self.log.important(f"New columns generated: {new_columns}")

        # Convert new columns to int32
        df[new_columns] = df[new_columns].astype("int32")

        # For the new columns we're going to shorten the names
        renamed_columns = {col: self.process_column_name(col) for col in new_columns}

        # Rename the columns in the DataFrame
        df.rename(columns=renamed_columns, inplace=True)

        return df

    # Helper Methods
    def convert_columns_to_categorical(self, columns: list):
        """Convert column to Categorical type"""
        for feature in columns:
            if feature not in [self.event_time_column, self.id_column]:
                unique_values = self.output_df[feature].nunique()
                if 1 < unique_values < 10:
                    self.log.important(f"Converting column {feature} to categorical (unique {unique_values})")
                    self.output_df[feature] = self.output_df[feature].astype("category")
                else:
                    self.log.warning(f"Column {feature} too many unique values {unique_values} skipping...")

    def manual_categorical_converter(self):
        """Used for Streaming: Convert object and string types to Categorical

        Note:
            This method is used for streaming/chunking. You can set the
            categorical_dtypes attribute to a dictionary of column names and
            their respective categorical types.
        """
        for column, cat_d_type in self.categorical_dtypes.items():
            self.output_df[column] = self.output_df[column].astype(cat_d_type)

    @staticmethod
    def convert_column_types(df: pd.DataFrame) -> pd.DataFrame:
        """Convert the types of the DataFrame to the correct types for the Feature Store"""
        for column in list(df.select_dtypes(include="bool").columns):
            df[column] = df[column].astype("int32")
        for column in list(df.select_dtypes(include="category").columns):
            df[column] = df[column].astype("str")

        # Select all columns that are of datetime dtype and convert them to ISO-8601 strings
        for column in [col for col in df.columns if pd.api.types.is_datetime64_any_dtype(df[col])]:
            df[column] = df[column].map(datetime_to_iso8601).astype("string")

        """FIXME Not sure we need these conversions
        for column in list(df.select_dtypes(include="object").columns):
            df[column] = df[column].astype("string")
        for column in list(df.select_dtypes(include=[pd.Int64Dtype]).columns):
            df[column] = df[column].astype("int64")
        for column in list(df.select_dtypes(include=[pd.Float64Dtype]).columns):
            df[column] = df[column].astype("float64")
        """
        return df

    def prep_dataframe(self):
        """Prep the DataFrame for Feature Store Creation"""
        self.log.info("Prep the output_df (cat_convert, convert types, and lowercase columns)...")

        # Remove any columns generated from AWS
        aws_cols = ["write_time", "api_invocation_time", "is_deleted", "event_time"]
        self.output_df = self.output_df.drop(columns=aws_cols, errors="ignore")

        # If one-hot columns are provided then one-hot encode them
        if self.one_hot_columns:
            self.output_df = self.one_hot_encode(self.output_df, self.one_hot_columns)

        # Convert column names to lowercase, Athena will not work with uppercase column names
        if str(self.output_df.columns) != str(self.output_df.columns.str.lower()):
            for c in self.output_df.columns:
                if c != c.lower():
                    self.log.important(f"Column name {c} converted to lowercase: {c.lower()}")
            self.output_df.columns = self.output_df.columns.str.lower()

        # Check for duplicate column names in the dataframe
        if len(self.output_df.columns) != len(set(self.output_df.columns)):
            self.log.critical("Duplicate column names detected in the DataFrame")
            duplicates = self.output_df.columns[self.output_df.columns.duplicated()].tolist()
            self.log.critical(f"Duplicated columns: {duplicates}")
            raise ValueError("Duplicate column names detected in the DataFrame")

        # Make sure we have the required id and event_time columns
        self._ensure_id_column()
        self._ensure_event_time()

        # Check for a training column (Workbench uses dynamic training columns)
        if "training" in self.output_df.columns:
            self.log.important(
                """Training column detected: Since FeatureSets are read-only, Workbench creates a training view
                that can be dynamically changed. We'll use this training column to create a training view."""
            )
            self.incoming_hold_out_ids = self.output_df[~self.output_df["training"]][self.id_column].tolist()
            self.output_df = self.output_df.drop(columns=["training"])

        # We need to convert some of our column types to the correct types
        # Feature Store only supports these data types:
        # - Integral
        # - Fractional
        # - String (timestamp/datetime types need to be converted to string)
        self.output_df = self.convert_column_types(self.output_df)

    def create_feature_group(self):
        """Create a Feature Group, load our Feature Definitions, and wait for it to be ready"""

        # Create a Feature Group and load our Feature Definitions
        my_feature_group = FeatureGroup(name=self.output_name, sagemaker_session=self.sm_session)
        my_feature_group.load_feature_definitions(data_frame=self.output_df)

        # Create the Output S3 Storage Path for this Feature Set
        s3_storage_path = f"{self.feature_sets_s3_path}/{self.output_name}"

        # Get the metadata/tags to push into AWS
        aws_tags = self.get_aws_tags()

        # Create the Feature Group
        my_feature_group.create(
            s3_uri=s3_storage_path,
            record_identifier_name=self.id_column,
            event_time_feature_name=self.event_time_column,
            role_arn=self.workbench_role_arn,
            enable_online_store=True,
            table_format=self.table_format,
            tags=aws_tags,
        )

        # Ensure/wait for the feature group to be created
        self.ensure_feature_group_created(my_feature_group)
        return my_feature_group

    def pre_transform(self, **kwargs):
        """Pre-Transform: Delete any existing FeatureSet and Create the Feature Group"""
        self.delete_existing()
        self.output_feature_group = self.create_feature_group()

    def transform_impl(self):
        """Transform Implementation: Ingest the data into the Feature Group"""

        # Now we actually push the data into the Feature Group (called ingestion)
        self.log.important(f"Ingesting rows into Feature Group {self.output_name}...")
        ingest_manager = self.output_feature_group.ingest(self.output_df, max_workers=8, max_processes=4, wait=False)
        try:
            ingest_manager.wait()
        except IngestionError as exc:
            self.log.warning(f"Some rows had an ingesting error: {exc}")

        # Report on any rows that failed to ingest
        if ingest_manager.failed_rows:
            self.log.warning(f"Number of Failed Rows: {len(ingest_manager.failed_rows)}")

            # Log failed row details
            failed_data = self.output_df.iloc[ingest_manager.failed_rows]
            for idx, row in failed_data.iterrows():
                self.log.warning(f"Failed Row {idx}: {row.to_dict()}")

        # Keep track of the number of rows we expect to be ingested
        self.expected_rows += len(self.output_df) - len(ingest_manager.failed_rows)
        self.log.info(f"Added rows: {len(self.output_df)}")
        self.log.info(f"Failed rows: {len(ingest_manager.failed_rows)}")
        self.log.info(f"Total rows ingested: {self.expected_rows}")

        # We often need to wait a bit for AWS to fully register the new Feature Group
        self.log.important(f"Waiting for AWS to register the new Feature Group {self.output_name}...")
        time.sleep(30)

    def post_transform(self, **kwargs):
        """Post-Transform: Populating Offline Storage and onboard()"""
        self.log.info("Post-Transform: Populating Offline Storage and onboard()...")

        # Feature Group Ingestion takes a while, so we need to wait for it to finish
        self.output_feature_set = FeatureSetCore(self.output_name)
        self.log.important("Waiting for AWS Feature Group Offline storage to be ready...")
        self.log.important("This will often take 10-20 minutes...go have coffee or lunch :)")
        self.output_feature_set.set_status("initializing")
        self.wait_for_rows(self.expected_rows)

        # Call the FeatureSet onboard method to compute a bunch of EDA stuff
        self.output_feature_set.onboard()

        # Set Hold Out Ids (if we got them during creation)
        if self.incoming_hold_out_ids:
            self.output_feature_set.set_training_holdouts(self.id_column, self.incoming_hold_out_ids)

    def ensure_feature_group_created(self, feature_group):
        status = feature_group.describe().get("FeatureGroupStatus")
        while status == "Creating":
            self.log.debug("FeatureSet being Created…")
            time.sleep(5)
            status = feature_group.describe().get("FeatureGroupStatus")

        if status == "Created":
            self.log.info(f"FeatureSet {feature_group.name} successfully created")
        else:
            # Get the detailed failure reason
            description = feature_group.describe()
            failure_reason = description.get("FailureReason", "No failure reason provided")
            self.log.critical(f"FeatureSet {feature_group.name} creation failed with status: {status}")
            self.log.critical(f"Failure reason: {failure_reason}")

    def wait_for_rows(self, expected_rows: int):
        """Wait for AWS Feature Group to fully populate the Offline Storage"""
        rows = self.output_feature_set.num_rows()

        # Wait for the rows to be populated
        self.log.info(f"Waiting for AWS Feature Group {self.output_name} Offline Storage...")
        max_retry = 20
        num_retry = 0
        sleep_time = 30
        while rows < expected_rows and num_retry < max_retry:
            num_retry += 1
            time.sleep(sleep_time)
            rows = self.output_feature_set.num_rows()
            self.log.info(f"Checking Offline Storage {self.output_name}: {rows}/{expected_rows} rows")
        if rows == expected_rows:
            self.log.important(f"Success: Reached Expected Rows ({rows} rows)...")
        else:
            msg = f"Did not reach expected rows ({rows}/{expected_rows})...(probably AWS lag)"
            self.log.warning(msg)
            self.log.monitor(msg)

__init__(output_name)

PandasToFeatures Initialization

Parameters:

  • output_name (str): The Name of the FeatureSet to create (required)
Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
def __init__(self, output_name: str):
    """PandasToFeatures Initialization

    Args:
        output_name (str): The Name of the FeatureSet to create
    """

    # Make sure the output_name is a valid name
    Artifact.is_name_valid(output_name)

    # Call superclass init
    super().__init__("DataFrame", output_name)

    # Set up all my instance attributes
    self.input_type = TransformInput.PANDAS_DF
    self.output_type = TransformOutput.FEATURE_SET
    self.id_column = None
    self.event_time_column = None
    self.one_hot_columns = []
    self.categorical_dtypes = {}  # Used for streaming/chunking
    self.output_df = None
    self.table_format = TableFormatEnum.ICEBERG
    self.incoming_hold_out_ids = None

    # These will be set in the transform method
    self.output_feature_group = None
    self.output_feature_set = None
    self.expected_rows = 0

convert_column_types(df) staticmethod

Convert the types of the DataFrame to the correct types for the Feature Store

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
@staticmethod
def convert_column_types(df: pd.DataFrame) -> pd.DataFrame:
    """Convert the types of the DataFrame to the correct types for the Feature Store"""
    for column in list(df.select_dtypes(include="bool").columns):
        df[column] = df[column].astype("int32")
    for column in list(df.select_dtypes(include="category").columns):
        df[column] = df[column].astype("str")

    # Select all columns that are of datetime dtype and convert them to ISO-8601 strings
    for column in [col for col in df.columns if pd.api.types.is_datetime64_any_dtype(df[col])]:
        df[column] = df[column].map(datetime_to_iso8601).astype("string")

    """FIXME Not sure we need these conversions
    for column in list(df.select_dtypes(include="object").columns):
        df[column] = df[column].astype("string")
    for column in list(df.select_dtypes(include=[pd.Int64Dtype]).columns):
        df[column] = df[column].astype("int64")
    for column in list(df.select_dtypes(include=[pd.Float64Dtype]).columns):
        df[column] = df[column].astype("float64")
    """
    return df

convert_columns_to_categorical(columns)

Convert column to Categorical type

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
def convert_columns_to_categorical(self, columns: list):
    """Convert column to Categorical type"""
    for feature in columns:
        if feature not in [self.event_time_column, self.id_column]:
            unique_values = self.output_df[feature].nunique()
            if 1 < unique_values < 10:
                self.log.important(f"Converting column {feature} to categorical (unique {unique_values})")
                self.output_df[feature] = self.output_df[feature].astype("category")
            else:
                self.log.warning(f"Column {feature} too many unique values {unique_values} skipping...")

create_feature_group()

Create a Feature Group, load our Feature Definitions, and wait for it to be ready

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
def create_feature_group(self):
    """Create a Feature Group, load our Feature Definitions, and wait for it to be ready"""

    # Create a Feature Group and load our Feature Definitions
    my_feature_group = FeatureGroup(name=self.output_name, sagemaker_session=self.sm_session)
    my_feature_group.load_feature_definitions(data_frame=self.output_df)

    # Create the Output S3 Storage Path for this Feature Set
    s3_storage_path = f"{self.feature_sets_s3_path}/{self.output_name}"

    # Get the metadata/tags to push into AWS
    aws_tags = self.get_aws_tags()

    # Create the Feature Group
    my_feature_group.create(
        s3_uri=s3_storage_path,
        record_identifier_name=self.id_column,
        event_time_feature_name=self.event_time_column,
        role_arn=self.workbench_role_arn,
        enable_online_store=True,
        table_format=self.table_format,
        tags=aws_tags,
    )

    # Ensure/wait for the feature group to be created
    self.ensure_feature_group_created(my_feature_group)
    return my_feature_group

manual_categorical_converter()

Used for Streaming: Convert object and string types to Categorical

Note

This method is used for streaming/chunking. You can set the categorical_dtypes attribute to a dictionary of column names and their respective categorical types.

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
def manual_categorical_converter(self):
    """Used for Streaming: Convert object and string types to Categorical

    Note:
        This method is used for streaming/chunking. You can set the
        categorical_dtypes attribute to a dictionary of column names and
        their respective categorical types.
    """
    for column, cat_d_type in self.categorical_dtypes.items():
        self.output_df[column] = self.output_df[column].astype(cat_d_type)
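
A sketch of how the categorical_dtypes attribute might be set when streaming/chunking (the column name and categories are assumptions for illustration, not from the source):

import pandas as pd

to_features = PandasToFeatures("my_features")  # hypothetical FeatureSet name
to_features.categorical_dtypes = {
    "color": pd.CategoricalDtype(categories=["red", "green", "blue"])
}
# manual_categorical_converter() will then apply these dtypes to each chunk's output_df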

one_hot_encode(df, one_hot_columns)

One Hot Encoding for Categorical Columns with additional column name management

Parameters:

  • df (pd.DataFrame): The DataFrame to process (required)
  • one_hot_columns (list): The list of columns to one-hot encode (required)

Returns:

  • pd.DataFrame: The DataFrame with one-hot encoded columns

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
def one_hot_encode(self, df, one_hot_columns: list) -> pd.DataFrame:
    """One Hot Encoding for Categorical Columns with additional column name management

    Args:
        df (pd.DataFrame): The DataFrame to process
        one_hot_columns (list): The list of columns to one-hot encode

    Returns:
        pd.DataFrame: The DataFrame with one-hot encoded columns
    """

    # Grab the current list of columns
    current_columns = list(df.columns)

    # Now convert the list of columns into Categorical and then One-Hot Encode
    self.convert_columns_to_categorical(one_hot_columns)
    self.log.important(f"One-Hot encoding columns: {one_hot_columns}")
    df = pd.get_dummies(df, columns=one_hot_columns)

    # Compute the new columns generated by get_dummies
    new_columns = list(set(df.columns) - set(current_columns))
    self.log.important(f"New columns generated: {new_columns}")

    # Convert new columns to int32
    df[new_columns] = df[new_columns].astype("int32")

    # For the new columns we're going to shorten the names
    renamed_columns = {col: self.process_column_name(col) for col in new_columns}

    # Rename the columns in the DataFrame
    df.rename(columns=renamed_columns, inplace=True)

    return df

post_transform(**kwargs)

Post-Transform: Populating Offline Storage and onboard()

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
def post_transform(self, **kwargs):
    """Post-Transform: Populating Offline Storage and onboard()"""
    self.log.info("Post-Transform: Populating Offline Storage and onboard()...")

    # Feature Group Ingestion takes a while, so we need to wait for it to finish
    self.output_feature_set = FeatureSetCore(self.output_name)
    self.log.important("Waiting for AWS Feature Group Offline storage to be ready...")
    self.log.important("This will often take 10-20 minutes...go have coffee or lunch :)")
    self.output_feature_set.set_status("initializing")
    self.wait_for_rows(self.expected_rows)

    # Call the FeatureSet onboard method to compute a bunch of EDA stuff
    self.output_feature_set.onboard()

    # Set Hold Out Ids (if we got them during creation)
    if self.incoming_hold_out_ids:
        self.output_feature_set.set_training_holdouts(self.id_column, self.incoming_hold_out_ids)
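
Illustrative only: once `post_transform()` has finished, the onboarded FeatureSet is available on the transform instance, so given a completed `PandasToFeatures` transform named `to_features` you could inspect it directly:

```python
# output_feature_set is the FeatureSetCore created in post_transform()
fs = to_features.output_feature_set
print(fs.num_rows())  # offline-storage row count, the same value wait_for_rows() polls
```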

pre_transform(**kwargs)

Pre-Transform: Delete any existing FeatureSet and Create the Feature Group

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
def pre_transform(self, **kwargs):
    """Pre-Transform: Delete any existing FeatureSet and Create the Feature Group"""
    self.delete_existing()
    self.output_feature_group = self.create_feature_group()

prep_dataframe()

Prep the DataFrame for Feature Store Creation

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
def prep_dataframe(self):
    """Prep the DataFrame for Feature Store Creation"""
    self.log.info("Prep the output_df (cat_convert, convert types, and lowercase columns)...")

    # Remove any columns generated from AWS
    aws_cols = ["write_time", "api_invocation_time", "is_deleted", "event_time"]
    self.output_df = self.output_df.drop(columns=aws_cols, errors="ignore")

    # If one-hot columns are provided then one-hot encode them
    if self.one_hot_columns:
        self.output_df = self.one_hot_encode(self.output_df, self.one_hot_columns)

    # Convert columns names to lowercase, Athena will not work with uppercase column names
    if str(self.output_df.columns) != str(self.output_df.columns.str.lower()):
        for c in self.output_df.columns:
            if c != c.lower():
                self.log.important(f"Column name {c} converted to lowercase: {c.lower()}")
        self.output_df.columns = self.output_df.columns.str.lower()

    # Check for duplicate column names in the dataframe
    if len(self.output_df.columns) != len(set(self.output_df.columns)):
        self.log.critical("Duplicate column names detected in the DataFrame")
        duplicates = self.output_df.columns[self.output_df.columns.duplicated()].tolist()
        self.log.critical(f"Duplicated columns: {duplicates}")
        raise ValueError("Duplicate column names detected in the DataFrame")

    # Make sure we have the required id and event_time columns
    self._ensure_id_column()
    self._ensure_event_time()

    # Check for a training column (Workbench uses dynamic training columns)
    if "training" in self.output_df.columns:
        self.log.important(
            """Training column detected: Since FeatureSets are read-only, Workbench creates a training view
            that can be dynamically changed. We'll use this training column to create a training view."""
        )
        self.incoming_hold_out_ids = self.output_df[~self.output_df["training"]][self.id_column].tolist()
        self.output_df = self.output_df.drop(columns=["training"])

    # We need to convert some of our column types to the correct types
    # Feature Store only supports these data types:
    # - Integral
    # - Fractional
    # - String (timestamp/datetime types need to be converted to string)
    self.output_df = self.convert_column_types(self.output_df)
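
The handling of a "training" column is easiest to see with a tiny example. This sketch (made-up data) mirrors the logic above:

```python
import pandas as pd

# A boolean "training" column marks the split
df = pd.DataFrame(
    {
        "id": [1, 2, 3, 4],
        "feature": [0.1, 0.2, 0.3, 0.4],
        "training": [True, True, False, False],
    }
)

# prep_dataframe() records the ids where training is False as hold-outs...
hold_out_ids = df[~df["training"]]["id"].tolist()  # -> [3, 4]

# ...then drops the column: FeatureSets are read-only, so the split is applied
# afterwards via set_training_holdouts() on a dynamically updatable training view.
df = df.drop(columns=["training"])
```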

process_column_name(column, shorten=False)

Call various methods to make sure the column is ready for the Feature Store.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `column` | `str` | The column name to process | *required* |
| `shorten` | `bool` | Should we shorten the column name? | `False` |

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
def process_column_name(self, column: str, shorten: bool = False) -> str:
    """Call various methods to make sure the column is ready for Feature Store
    Args:
        column (str): The column name to process
        shorten (bool): Should we shorten the column name? (default: False)
    """
    self.log.debug(f"Processing column {column}...")

    # Make sure the column name is valid
    column = self.sanitize_column_name(column)

    # Make sure the column name isn't too long
    if shorten:
        column = self.shorten_column_name(column)

    return column

set_input(input_df, id_column=None, event_time_column=None, one_hot_columns=None)

Set the Input DataFrame for this Transform

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input_df` | `DataFrame` | The input DataFrame | *required* |
| `id_column` | `str` | The ID column (if not specified, an 'auto_id' will be generated) | `None` |
| `event_time_column` | `str` | The name of the event time column | `None` |
| `one_hot_columns` | `list` | The list of columns to one-hot encode | `None` |

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
def set_input(self, input_df: pd.DataFrame, id_column=None, event_time_column=None, one_hot_columns=None):
    """Set the Input DataFrame for this Transform

    Args:
        input_df (pd.DataFrame): The input DataFrame.
        id_column (str, optional): The ID column (if not specified, an 'auto_id' will be generated).
        event_time_column (str, optional): The name of the event time column (default: None).
        one_hot_columns (list, optional): The list of columns to one-hot encode (default: None).
    """
    self.id_column = id_column
    self.event_time_column = event_time_column
    self.output_df = input_df.copy()
    self.one_hot_columns = one_hot_columns or []

    # Now Prepare the DataFrame for its journey into an AWS FeatureGroup
    self.prep_dataframe()
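
Putting `set_input()` in context, here is a minimal end-to-end sketch. The data and FeatureSet name are made up, the import path is inferred from the source location shown above, and `transform()` is assumed to drive the pre_transform/transform_impl/post_transform steps documented on this page:

```python
import pandas as pd
from workbench.core.transforms.pandas_transforms.pandas_to_features import PandasToFeatures

# Made-up input data
df = pd.DataFrame(
    {
        "id": [1, 2, 3],
        "sex": ["M", "F", "I"],
        "length": [0.45, 0.53, 0.33],
        "date": pd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-03"]),
    }
)

to_features = PandasToFeatures("abalone_features")  # hypothetical output name
to_features.set_input(df, id_column="id", event_time_column="date", one_hot_columns=["sex"])
to_features.transform()  # assumed to run pre_transform(), transform_impl(), post_transform()
```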

transform_impl()

Transform Implementation: Ingest the data into the Feature Group

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
def transform_impl(self):
    """Transform Implementation: Ingest the data into the Feature Group"""

    # Now we actually push the data into the Feature Group (called ingestion)
    self.log.important(f"Ingesting rows into Feature Group {self.output_name}...")
    ingest_manager = self.output_feature_group.ingest(self.output_df, max_workers=8, max_processes=4, wait=False)
    try:
        ingest_manager.wait()
    except IngestionError as exc:
        self.log.warning(f"Some rows had an ingesting error: {exc}")

    # Report on any rows that failed to ingest
    if ingest_manager.failed_rows:
        self.log.warning(f"Number of Failed Rows: {len(ingest_manager.failed_rows)}")

        # Log failed row details
        failed_data = self.output_df.iloc[ingest_manager.failed_rows]
        for idx, row in failed_data.iterrows():
            self.log.warning(f"Failed Row {idx}: {row.to_dict()}")

    # Keep track of the number of rows we expect to be ingested
    self.expected_rows += len(self.output_df) - len(ingest_manager.failed_rows)
    self.log.info(f"Added rows: {len(self.output_df)}")
    self.log.info(f"Failed rows: {len(ingest_manager.failed_rows)}")
    self.log.info(f"Total rows ingested: {self.expected_rows}")

    # We often need to wait a bit for AWS to fully register the new Feature Group
    self.log.important(f"Waiting for AWS to register the new Feature Group {self.output_name}...")
    time.sleep(30)

wait_for_rows(expected_rows)

Wait for AWS Feature Group to fully populate the Offline Storage

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features.py
def wait_for_rows(self, expected_rows: int):
    """Wait for AWS Feature Group to fully populate the Offline Storage"""
    rows = self.output_feature_set.num_rows()

    # Wait for the rows to be populated
    self.log.info(f"Waiting for AWS Feature Group {self.output_name} Offline Storage...")
    max_retry = 20
    num_retry = 0
    sleep_time = 30
    while rows < expected_rows and num_retry < max_retry:
        num_retry += 1
        time.sleep(sleep_time)
        rows = self.output_feature_set.num_rows()
        self.log.info(f"Checking Offline Storage {self.output_name}: {rows}/{expected_rows} rows")
    if rows == expected_rows:
        self.log.important(f"Success: Reached Expected Rows ({rows} rows)...")
    else:
        msg = f"Did not reach expected rows ({rows}/{expected_rows})...(probably AWS lag)"
        self.log.warning(msg)
        self.log.monitor(msg)

PandasToFeaturesChunked

Bases: Transform

PandasToFeaturesChunked: Class to build a FeatureSet incrementally from a series of chunked Pandas DataFrames

Common Usage
to_features = PandasToFeaturesChunked(output_name, id_column="id"/None, event_time_column="date"/None)
to_features.set_output_tags(["abalone", "public", "whatever"])
cat_column_info = {"sex": ["M", "F", "I"]}
to_features.set_categorical_info(cat_column_info)
to_features.add_chunk(df)
to_features.add_chunk(df)
...
to_features.finalize()
Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features_chunked.py
class PandasToFeaturesChunked(Transform):
    """PandasToFeaturesChunked:  Class to manage a bunch of chunked Pandas DataFrames into a FeatureSet

    Common Usage:
        ```python
        to_features = PandasToFeaturesChunked(output_name, id_column="id"/None, event_time_column="date"/None)
        to_features.set_output_tags(["abalone", "public", "whatever"])
        cat_column_info = {"sex": ["M", "F", "I"]}
        to_features.set_categorical_info(cat_column_info)
        to_features.add_chunk(df)
        to_features.add_chunk(df)
        ...
        to_features.finalize()
        ```
    """

    def __init__(self, output_name: str, id_column=None, event_time_column=None):
        """PandasToFeaturesChunked Initialization"""

        # Make sure the output_name is a valid name
        Artifact.is_name_valid(output_name)

        # Call superclass init
        super().__init__("DataFrame", output_name)

        # Set up all my instance attributes
        self.id_column = id_column
        self.event_time_column = event_time_column
        self.first_chunk = None
        self.pandas_to_features = PandasToFeatures(output_name)

    def set_categorical_info(self, cat_column_info: dict[list[str]]):
        """Set the Categorical Columns
        Args:
            cat_column_info (dict[list[str]]): Dictionary of categorical columns and their possible values
        """

        # Create the CategoricalDtypes
        cat_d_types = {}
        for col, vals in cat_column_info.items():
            cat_d_types[col] = CategoricalDtype(categories=vals)

        # Now set the CategoricalDtypes on our underlying PandasToFeatures
        self.pandas_to_features.categorical_dtypes = cat_d_types

    def add_chunk(self, chunk_df: pd.DataFrame):
        """Add a Chunk of Data to the FeatureSet"""

        # Is this the first chunk? If so we need to run the pre_transform
        if self.first_chunk is None:
            self.log.info(f"Adding first chunk {chunk_df.shape}...")
            self.first_chunk = chunk_df
            self.pandas_to_features.set_input(chunk_df, self.id_column, self.event_time_column)
            self.pandas_to_features.pre_transform()
            self.pandas_to_features.transform_impl()
        else:
            self.log.info(f"Adding chunk {chunk_df.shape}...")
            self.pandas_to_features.set_input(chunk_df, self.id_column, self.event_time_column)
            self.pandas_to_features.transform_impl()

    def pre_transform(self, **kwargs):
        """Pre-Transform: Create the Feature Group with Chunked Data"""

        # Loading data into a Feature Group takes a while, so set status to loading
        FeatureSetCore(self.output_name).set_status("loading")

    def transform_impl(self):
        """Required implementation of the Transform interface"""
        self.log.warning("PandasToFeaturesChunked.transform_impl() called.  This is a no-op.")

    def post_transform(self, **kwargs):
        """Post-Transform: Any Post Transform Steps"""
        self.pandas_to_features.post_transform()
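
A short sketch of the chunked pattern, following the Common Usage docstring above. The file path, tags, and column names are made up; `set_output_tags()` and `finalize()` are used exactly as shown in that docstring, and the import path is inferred from the source location above:

```python
import pandas as pd
from workbench.core.transforms.pandas_transforms.pandas_to_features_chunked import PandasToFeaturesChunked

# Declare the categories up front so every chunk is encoded consistently,
# then stream the chunks in.
to_features = PandasToFeaturesChunked("abalone_features", id_column="id", event_time_column="date")
to_features.set_output_tags(["abalone", "chunked"])
to_features.set_categorical_info({"sex": ["M", "F", "I"]})

for chunk_df in pd.read_csv("abalone.csv", chunksize=10_000):
    to_features.add_chunk(chunk_df)

to_features.finalize()  # as shown in the Common Usage docstring above
```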

__init__(output_name, id_column=None, event_time_column=None)

PandasToFeaturesChunked Initialization

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features_chunked.py
def __init__(self, output_name: str, id_column=None, event_time_column=None):
    """PandasToFeaturesChunked Initialization"""

    # Make sure the output_name is a valid name
    Artifact.is_name_valid(output_name)

    # Call superclass init
    super().__init__("DataFrame", output_name)

    # Set up all my instance attributes
    self.id_column = id_column
    self.event_time_column = event_time_column
    self.first_chunk = None
    self.pandas_to_features = PandasToFeatures(output_name)

add_chunk(chunk_df)

Add a Chunk of Data to the FeatureSet

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features_chunked.py
def add_chunk(self, chunk_df: pd.DataFrame):
    """Add a Chunk of Data to the FeatureSet"""

    # Is this the first chunk? If so we need to run the pre_transform
    if self.first_chunk is None:
        self.log.info(f"Adding first chunk {chunk_df.shape}...")
        self.first_chunk = chunk_df
        self.pandas_to_features.set_input(chunk_df, self.id_column, self.event_time_column)
        self.pandas_to_features.pre_transform()
        self.pandas_to_features.transform_impl()
    else:
        self.log.info(f"Adding chunk {chunk_df.shape}...")
        self.pandas_to_features.set_input(chunk_df, self.id_column, self.event_time_column)
        self.pandas_to_features.transform_impl()

post_transform(**kwargs)

Post-Transform: Any Post Transform Steps

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features_chunked.py
def post_transform(self, **kwargs):
    """Post-Transform: Any Post Transform Steps"""
    self.pandas_to_features.post_transform()

pre_transform(**kwargs)

Pre-Transform: Create the Feature Group with Chunked Data

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features_chunked.py
def pre_transform(self, **kwargs):
    """Pre-Transform: Create the Feature Group with Chunked Data"""

    # Loading data into a Feature Group takes a while, so set status to loading
    FeatureSetCore(self.output_name).set_status("loading")

set_categorical_info(cat_column_info)

Set the Categorical Columns.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `cat_column_info` | `dict[list[str]]` | Dictionary of categorical columns and their possible values | *required* |

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features_chunked.py
def set_categorical_info(self, cat_column_info: dict[list[str]]):
    """Set the Categorical Columns
    Args:
        cat_column_info (dict[list[str]]): Dictionary of categorical columns and their possible values
    """

    # Create the CategoricalDtypes
    cat_d_types = {}
    for col, vals in cat_column_info.items():
        cat_d_types[col] = CategoricalDtype(categories=vals)

    # Now set the CategoricalDtypes on our underlying PandasToFeatures
    self.pandas_to_features.categorical_dtypes = cat_d_types

transform_impl()

Required implementation of the Transform interface

Source code in src/workbench/core/transforms/pandas_transforms/pandas_to_features_chunked.py
def transform_impl(self):
    """Required implementation of the Transform interface"""
    self.log.warning("PandasToFeaturesChunked.transform_impl() called.  This is a no-op.")