
Pandas Transforms

API Classes

The API Classes will often provide helpful methods that give you a DataFrame (data_source.query() for instance), so always check out the API Classes first.

These Transforms give you the ultimate in customization and flexibility when creating AWS Machine Learning Pipelines. Grab a Pandas DataFrame from a DataSource or FeatureSet, process it in whatever way fits your use case, and simply create another SageWorks DataSource or FeatureSet from the resulting DataFrame (a short sketch of this pattern follows the options list below).

Lots of Options:

Not for Large Data

Pandas Transforms can't handle large datasets (> 4 gigabytes). For transforms on large data, see our Heavy Transforms.

  • S3 --> DF --> DataSource
  • DataSource --> DF --> DataSource
  • DataSource --> DF --> FeatureSet
  • Get Creative!
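
Here is a minimal sketch of the DataSource --> DF --> FeatureSet pattern. The artifact names ("abalone_data", "abalone_features"), the column names, and the derived feature are hypothetical, and the import paths are assumed from the source file paths shown later on this page:

from sageworks.core.transforms.pandas_transforms.data_to_pandas import DataToPandas
from sageworks.core.transforms.pandas_transforms.pandas_to_features import PandasToFeatures

# DataSource --> DF: pull the data into Pandas
data_to_df = DataToPandas("abalone_data")
data_to_df.transform()
df = data_to_df.get_output()

# Process the DataFrame however your use case requires (hypothetical derived feature)
df["volume"] = df["length"] * df["diameter"] * df["height"]

# DF --> FeatureSet: publish the result as a new SageWorks FeatureSet
to_features = PandasToFeatures("abalone_features")
to_features.set_output_tags(["abalone", "public"])
to_features.set_input(df, id_column="id")
to_features.transform()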

Welcome to the SageWorks Pandas Transform Classes

These classes provide low-level APIs for using Pandas DataFrames

  • DataToPandas: Pull a dataframe from a SageWorks DataSource
  • FeaturesToPandas: Pull a dataframe from a SageWorks FeatureSet
  • PandasToData: Create a SageWorks DataSource using a Pandas DataFrame as the source
  • PandasToFeatures: Create a SageWorks FeatureSet using a Pandas DataFrame as the source
  • PandasToFeaturesChunked: Create a SageWorks FeatureSet using a Chunked/Streaming Pandas DataFrame as the source
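
If you want to use these classes directly, the import paths below are assumed from the "Source code in ..." file paths shown throughout this page:

from sageworks.core.transforms.pandas_transforms.data_to_pandas import DataToPandas
from sageworks.core.transforms.pandas_transforms.features_to_pandas import FeaturesToPandas
from sageworks.core.transforms.pandas_transforms.pandas_to_data import PandasToData
from sageworks.core.transforms.pandas_transforms.pandas_to_features import PandasToFeatures
from sageworks.core.transforms.pandas_transforms.pandas_to_features_chunked import PandasToFeaturesChunked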

DataToPandas

Bases: Transform

DataToPandas: Class to transform a Data Source into a Pandas DataFrame

Common Usage
data_to_df = DataToPandas(data_source_uuid)
data_to_df.transform(query=<optional SQL query to filter/process data>)
data_to_df.transform(max_rows=<optional max rows to sample>)
my_df = data_to_df.get_output()

Note: query is the best way to use this class, so use it :)
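
For example, a minimal sketch with a concrete SQL query (the DataSource name "abalone_data" and the filter are hypothetical; the sampling queries in transform_impl below use the DataSource uuid as the table name):

from sageworks.core.transforms.pandas_transforms.data_to_pandas import DataToPandas

# Pull a filtered DataFrame out of an existing DataSource
data_to_df = DataToPandas("abalone_data")
data_to_df.transform(query="SELECT * FROM abalone_data WHERE rings > 10")
my_df = data_to_df.get_output()
print(my_df.shape)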
Source code in src/sageworks/core/transforms/pandas_transforms/data_to_pandas.py
class DataToPandas(Transform):
    """DataToPandas: Class to transform a Data Source into a Pandas DataFrame

    Common Usage:
        ```
        data_to_df = DataToPandas(data_source_uuid)
        data_to_df.transform(query=<optional SQL query to filter/process data>)
        data_to_df.transform(max_rows=<optional max rows to sample>)
        my_df = data_to_df.get_output()

        Note: query is the best way to use this class, so use it :)
        ```
    """

    def __init__(self, input_uuid: str):
        """DataToPandas Initialization"""

        # Call superclass init
        super().__init__(input_uuid, "DataFrame")

        # Set up all my instance attributes
        self.input_type = TransformInput.DATA_SOURCE
        self.output_type = TransformOutput.PANDAS_DF
        self.output_df = None

    def transform_impl(self, query: str = None, max_rows=100000):
        """Convert the DataSource into a Pandas DataFrame
        Args:
            query(str): The query to run against the DataSource (default: None)
            max_rows(int): The maximum number of rows to return (default: 100000)
        """

        # Grab the Input (Data Source)
        input_data = DataSourceFactory(self.input_uuid)
        if not input_data.exists():
            self.log.critical(f"Data Check on {self.input_uuid} failed!")
            return

        # If a query is provided, that overrides the queries below
        if query:
            self.log.info(f"Querying {self.input_uuid} with {query}...")
            self.output_df = input_data.query(query)
            return

        # If the data source has more rows than max_rows, do a sample query
        num_rows = input_data.num_rows()
        if num_rows > max_rows:
            percentage = round(max_rows * 100.0 / num_rows)
            self.log.important(f"DataSource has {num_rows} rows.. sampling down to {max_rows}...")
            query = f"SELECT * FROM {self.input_uuid} TABLESAMPLE BERNOULLI({percentage})"
        else:
            query = f"SELECT * FROM {self.input_uuid}"

        # Mark the transform as complete and set the output DataFrame
        self.output_df = input_data.query(query)

    def post_transform(self, **kwargs):
        """Post-Transform: Any checks on the Pandas DataFrame that need to be done"""
        self.log.info("Post-Transform: Checking Pandas DataFrame...")
        self.log.info(f"DataFrame Shape: {self.output_df.shape}")

    def get_output(self) -> pd.DataFrame:
        """Get the DataFrame Output from this Transform"""
        return self.output_df

__init__(input_uuid)

DataToPandas Initialization

Source code in src/sageworks/core/transforms/pandas_transforms/data_to_pandas.py
def __init__(self, input_uuid: str):
    """DataToPandas Initialization"""

    # Call superclass init
    super().__init__(input_uuid, "DataFrame")

    # Set up all my instance attributes
    self.input_type = TransformInput.DATA_SOURCE
    self.output_type = TransformOutput.PANDAS_DF
    self.output_df = None

get_output()

Get the DataFrame Output from this Transform

Source code in src/sageworks/core/transforms/pandas_transforms/data_to_pandas.py
def get_output(self) -> pd.DataFrame:
    """Get the DataFrame Output from this Transform"""
    return self.output_df

post_transform(**kwargs)

Post-Transform: Any checks on the Pandas DataFrame that need to be done

Source code in src/sageworks/core/transforms/pandas_transforms/data_to_pandas.py
def post_transform(self, **kwargs):
    """Post-Transform: Any checks on the Pandas DataFrame that need to be done"""
    self.log.info("Post-Transform: Checking Pandas DataFrame...")
    self.log.info(f"DataFrame Shape: {self.output_df.shape}")

transform_impl(query=None, max_rows=100000)

Convert the DataSource into a Pandas DataFrame

Args:

  • query (str): The query to run against the DataSource (default: None)
  • max_rows (int): The maximum number of rows to return (default: 100000)

Source code in src/sageworks/core/transforms/pandas_transforms/data_to_pandas.py
def transform_impl(self, query: str = None, max_rows=100000):
    """Convert the DataSource into a Pandas DataFrame
    Args:
        query(str): The query to run against the DataSource (default: None)
        max_rows(int): The maximum number of rows to return (default: 100000)
    """

    # Grab the Input (Data Source)
    input_data = DataSourceFactory(self.input_uuid)
    if not input_data.exists():
        self.log.critical(f"Data Check on {self.input_uuid} failed!")
        return

    # If a query is provided, that overrides the queries below
    if query:
        self.log.info(f"Querying {self.input_uuid} with {query}...")
        self.output_df = input_data.query(query)
        return

    # If the data source has more rows than max_rows, do a sample query
    num_rows = input_data.num_rows()
    if num_rows > max_rows:
        percentage = round(max_rows * 100.0 / num_rows)
        self.log.important(f"DataSource has {num_rows} rows.. sampling down to {max_rows}...")
        query = f"SELECT * FROM {self.input_uuid} TABLESAMPLE BERNOULLI({percentage})"
    else:
        query = f"SELECT * FROM {self.input_uuid}"

    # Mark the transform as complete and set the output DataFrame
    self.output_df = input_data.query(query)

FeaturesToPandas

Bases: Transform

FeaturesToPandas: Class to transform a FeatureSet into a Pandas DataFrame

Common Usage
feature_to_df = FeaturesToPandas(feature_set_uuid)
feature_to_df.transform(max_rows=<optional max rows to sample>)
my_df = feature_to_df.get_output()
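
A minimal sketch (the FeatureSet name "abalone_features" is hypothetical; the import path is assumed from the source file shown below):

from sageworks.core.transforms.pandas_transforms.features_to_pandas import FeaturesToPandas

feature_to_df = FeaturesToPandas("abalone_features")
feature_to_df.transform(max_rows=1000)  # samples down to roughly 1000 rows (sampling is approximate)
my_df = feature_to_df.get_output()
print(my_df.shape)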
Source code in src/sageworks/core/transforms/pandas_transforms/features_to_pandas.py
class FeaturesToPandas(Transform):
    """FeaturesToPandas: Class to transform a FeatureSet into a Pandas DataFrame

    Common Usage:
        ```
        feature_to_df = FeaturesToPandas(feature_set_uuid)
        feature_to_df.transform(max_rows=<optional max rows to sample>)
        my_df = feature_to_df.get_output()
        ```
    """

    def __init__(self, feature_set_name: str):
        """FeaturesToPandas Initialization"""

        # Call superclass init
        super().__init__(input_uuid=feature_set_name, output_uuid="DataFrame")

        # Set up all my instance attributes
        self.input_type = TransformInput.FEATURE_SET
        self.output_type = TransformOutput.PANDAS_DF
        self.output_df = None
        self.transform_run = False

    def transform_impl(self, max_rows=100000):
        """Convert the FeatureSet into a Pandas DataFrame"""

        # Grab the Input (Feature Set)
        input_data = FeatureSetCore(self.input_uuid)
        if not input_data.exists():
            self.log.critical(f"Feature Set Check on {self.input_uuid} failed!")
            return

        # Grab the table for this Feature Set
        table = input_data.athena_table

        # Get the list of columns (and subtract metadata columns that might get added)
        columns = input_data.columns
        filter_columns = ["write_time", "api_invocation_time", "is_deleted"]
        columns = ", ".join([x for x in columns if x not in filter_columns])

        # Get the number of rows in the Feature Set
        num_rows = input_data.num_rows()

        # If the feature set has more rows than max_rows, do a sample query
        if num_rows > max_rows:
            percentage = round(max_rows * 100.0 / num_rows)
            self.log.important(f"DataSource has {num_rows} rows.. sampling down to {max_rows}...")
            query = f'SELECT {columns} FROM "{table}" TABLESAMPLE BERNOULLI({percentage})'
        else:
            query = f'SELECT {columns} FROM "{table}"'

        # Mark the transform as complete and set the output DataFrame
        self.transform_run = True
        self.output_df = input_data.query(query)

    def post_transform(self, **kwargs):
        """Post-Transform: Any checks on the Pandas DataFrame that need to be done"""
        self.log.info("Post-Transform: Checking Pandas DataFrame...")
        self.log.info(f"DataFrame Shape: {self.output_df.shape}")

    def get_output(self) -> pd.DataFrame:
        """Get the DataFrame Output from this Transform"""
        if not self.transform_run:
            self.transform()
        return self.output_df

__init__(feature_set_name)

FeaturesToPandas Initialization

Source code in src/sageworks/core/transforms/pandas_transforms/features_to_pandas.py
def __init__(self, feature_set_name: str):
    """FeaturesToPandas Initialization"""

    # Call superclass init
    super().__init__(input_uuid=feature_set_name, output_uuid="DataFrame")

    # Set up all my instance attributes
    self.input_type = TransformInput.FEATURE_SET
    self.output_type = TransformOutput.PANDAS_DF
    self.output_df = None
    self.transform_run = False

get_output()

Get the DataFrame Output from this Transform

Source code in src/sageworks/core/transforms/pandas_transforms/features_to_pandas.py
def get_output(self) -> pd.DataFrame:
    """Get the DataFrame Output from this Transform"""
    if not self.transform_run:
        self.transform()
    return self.output_df

post_transform(**kwargs)

Post-Transform: Any checks on the Pandas DataFrame that need to be done

Source code in src/sageworks/core/transforms/pandas_transforms/features_to_pandas.py
def post_transform(self, **kwargs):
    """Post-Transform: Any checks on the Pandas DataFrame that need to be done"""
    self.log.info("Post-Transform: Checking Pandas DataFrame...")
    self.log.info(f"DataFrame Shape: {self.output_df.shape}")

transform_impl(max_rows=100000)

Convert the FeatureSet into a Pandas DataFrame

Source code in src/sageworks/core/transforms/pandas_transforms/features_to_pandas.py
def transform_impl(self, max_rows=100000):
    """Convert the FeatureSet into a Pandas DataFrame"""

    # Grab the Input (Feature Set)
    input_data = FeatureSetCore(self.input_uuid)
    if not input_data.exists():
        self.log.critical(f"Feature Set Check on {self.input_uuid} failed!")
        return

    # Grab the table for this Feature Set
    table = input_data.athena_table

    # Get the list of columns (and subtract metadata columns that might get added)
    columns = input_data.columns
    filter_columns = ["write_time", "api_invocation_time", "is_deleted"]
    columns = ", ".join([x for x in columns if x not in filter_columns])

    # Get the number of rows in the Feature Set
    num_rows = input_data.num_rows()

    # If the feature set has more rows than max_rows, do a sample query
    if num_rows > max_rows:
        percentage = round(max_rows * 100.0 / num_rows)
        self.log.important(f"DataSource has {num_rows} rows.. sampling down to {max_rows}...")
        query = f'SELECT {columns} FROM "{table}" TABLESAMPLE BERNOULLI({percentage})'
    else:
        query = f'SELECT {columns} FROM "{table}"'

    # Mark the transform as complete and set the output DataFrame
    self.transform_run = True
    self.output_df = input_data.query(query)

PandasToData

Bases: Transform

PandasToData: Class to publish a Pandas DataFrame as a DataSource

Common Usage
df_to_data = PandasToData(output_uuid)
df_to_data.set_output_tags(["test", "small"])
df_to_data.set_input(test_df)
df_to_data.transform()
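
A minimal end-to-end sketch (the DataFrame contents and the output name "test_data" are hypothetical; the import path is assumed from the source file shown below):

import pandas as pd
from sageworks.core.transforms.pandas_transforms.pandas_to_data import PandasToData

# Publish a small DataFrame as a SageWorks DataSource (Parquet is the default output format)
test_df = pd.DataFrame({"id": [1, 2, 3], "value": [0.1, 0.2, 0.3]})
df_to_data = PandasToData("test_data")
df_to_data.set_output_tags(["test", "small"])
df_to_data.set_input(test_df)
df_to_data.transform()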
Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_data.py
class PandasToData(Transform):
    """PandasToData: Class to publish a Pandas DataFrame as a DataSource

    Common Usage:
        ```
        df_to_data = PandasToData(output_uuid)
        df_to_data.set_output_tags(["test", "small"])
        df_to_data.set_input(test_df)
        df_to_data.transform()
        ```
    """

    def __init__(self, output_uuid: str, output_format: str = "parquet"):
        """PandasToData Initialization
        Args:
            output_uuid (str): The UUID of the DataSource to create
            output_format (str): The file format to store the S3 object data in (default: "parquet")
        """

        # Make sure the output_uuid is a valid name/id
        Artifact.ensure_valid_name(output_uuid)

        # Call superclass init
        super().__init__("DataFrame", output_uuid)

        # Set up all my instance attributes
        self.input_type = TransformInput.PANDAS_DF
        self.output_type = TransformOutput.DATA_SOURCE
        self.output_df = None

        # Give a message that Parquet is best in most cases
        if output_format != "parquet":
            self.log.warning("Parquet format works the best in most cases please consider using it")
        self.output_format = output_format

    def set_input(self, input_df: pd.DataFrame):
        """Set the DataFrame Input for this Transform"""
        self.output_df = input_df.copy()

    def convert_object_to_string(self, df: pd.DataFrame) -> pd.DataFrame:
        """Try to automatically convert object columns to string columns"""
        for c in df.columns[df.dtypes == "object"]:  # Look at the object columns
            try:
                df[c] = df[c].astype("string")
                df[c] = df[c].str.replace("'", '"')  # This is for nested JSON
            except (ParserError, ValueError, TypeError):
                self.log.info(f"Column {c} could not be converted to string...")
        return df

    def convert_object_to_datetime(self, df: pd.DataFrame) -> pd.DataFrame:
        """Try to automatically convert object columns to datetime or string columns"""
        for c in df.columns[df.dtypes == "object"]:  # Look at the object columns
            try:
                df[c] = pd.to_datetime(df[c])
            except (ParserError, ValueError, TypeError):
                self.log.debug(f"Column {c} could not be converted to datetime...")
        return df

    @staticmethod
    def convert_datetime_columns(df: pd.DataFrame) -> pd.DataFrame:
        """Convert datetime columns to ISO-8601 string"""
        datetime_type = ["datetime", "datetime64", "datetime64[ns]", "datetimetz"]
        for c in df.select_dtypes(include=datetime_type).columns:
            df[c] = df[c].map(datetime_to_iso8601)
            df[c] = df[c].astype(pd.StringDtype())
        return df

    def transform_impl(self, overwrite: bool = True, **kwargs):
        """Convert the Pandas DataFrame into Parquet Format in the SageWorks S3 Bucket, and
        store the information about the data to the AWS Data Catalog sageworks database

        Args:
            overwrite (bool): Overwrite the existing data in the SageWorks S3 Bucket
        """
        self.log.info(f"DataFrame to SageWorks DataSource: {self.output_uuid}...")

        # Set up our metadata storage
        sageworks_meta = {"sageworks_tags": self.output_tags}
        sageworks_meta.update(self.output_meta)

        # Create the Output Parquet file S3 Storage Path
        s3_storage_path = f"{self.data_sources_s3_path}/{self.output_uuid}"

        # Convert column names to lowercase; Athena will not work with uppercase column names
        if str(self.output_df.columns) != str(self.output_df.columns.str.lower()):
            for c in self.output_df.columns:
                if c != c.lower():
                    self.log.important(f"Column name {c} converted to lowercase: {c.lower()}")
            self.output_df.columns = self.output_df.columns.str.lower()

        # Convert Object Columns to String
        self.output_df = self.convert_object_to_string(self.output_df)

        # Note: Both of these conversions may not be necessary, so we're leaving them commented out
        """
        # Convert Object Columns to Datetime
        self.output_df = self.convert_object_to_datetime(self.output_df)

        # Now convert datetime columns to ISO-8601 string
        # self.output_df = self.convert_datetime_columns(self.output_df)
        """

        # Write out the DataFrame to AWS Data Catalog in either Parquet or JSONL format
        description = f"SageWorks data source: {self.output_uuid}"
        glue_table_settings = {"description": description, "parameters": sageworks_meta}
        if self.output_format == "parquet":
            wr.s3.to_parquet(
                self.output_df,
                path=s3_storage_path,
                dataset=True,
                mode="overwrite",
                database=self.data_catalog_db,
                table=self.output_uuid,
                filename_prefix=f"{self.output_uuid}_",
                boto3_session=self.boto3_session,
                partition_cols=None,
                glue_table_settings=glue_table_settings,
                sanitize_columns=False,
            )  # FIXME: Have some logic around partition columns

        # Note: In general Parquet works well for most use cases. We recommend using Parquet.
        #       You can use JSON_EXTRACT on Parquet string fields, and it works great.
        elif self.output_format == "jsonl":
            self.log.warning("We recommend using Parquet format for most use cases")
            self.log.warning("If you have a use case that requires JSONL please contact SageWorks support")
            self.log.warning("We'd like to understand what functionality JSONL is providing that isn't already")
            self.log.warning("provided with Parquet and JSON_EXTRACT() for your Athena Queries")
            wr.s3.to_json(
                self.output_df,
                path=s3_storage_path,
                orient="records",
                lines=True,
                date_format="iso",
                dataset=True,
                mode="overwrite",
                database=self.data_catalog_db,
                table=self.output_uuid,
                filename_prefix=f"{self.output_uuid}_",
                boto3_session=self.boto3_session,
                partition_cols=None,
                glue_table_settings=glue_table_settings,
            )
        else:
            raise ValueError(f"Unsupported file format: {self.output_format}")

    def post_transform(self, **kwargs):
        """Post-Transform: Calling onboard() fnr the DataSource"""
        self.log.info("Post-Transform: Calling onboard() for the DataSource...")

        # Onboard the DataSource
        output_data_source = DataSourceFactory(self.output_uuid, force_refresh=True)
        output_data_source.onboard()

__init__(output_uuid, output_format='parquet')

PandasToData Initialization

Args:

  • output_uuid (str): The UUID of the DataSource to create
  • output_format (str): The file format to store the S3 object data in (default: "parquet")

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_data.py
def __init__(self, output_uuid: str, output_format: str = "parquet"):
    """PandasToData Initialization
    Args:
        output_uuid (str): The UUID of the DataSource to create
        output_format (str): The file format to store the S3 object data in (default: "parquet")
    """

    # Make sure the output_uuid is a valid name/id
    Artifact.ensure_valid_name(output_uuid)

    # Call superclass init
    super().__init__("DataFrame", output_uuid)

    # Set up all my instance attributes
    self.input_type = TransformInput.PANDAS_DF
    self.output_type = TransformOutput.DATA_SOURCE
    self.output_df = None

    # Give a message that Parquet is best in most cases
    if output_format != "parquet":
        self.log.warning("Parquet format works the best in most cases please consider using it")
    self.output_format = output_format

convert_datetime_columns(df) staticmethod

Convert datetime columns to ISO-8601 string

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_data.py
@staticmethod
def convert_datetime_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Convert datetime columns to ISO-8601 string"""
    datetime_type = ["datetime", "datetime64", "datetime64[ns]", "datetimetz"]
    for c in df.select_dtypes(include=datetime_type).columns:
        df[c] = df[c].map(datetime_to_iso8601)
        df[c] = df[c].astype(pd.StringDtype())
    return df

convert_object_to_datetime(df)

Try to automatically convert object columns to datetime or string columns

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_data.py
def convert_object_to_datetime(self, df: pd.DataFrame) -> pd.DataFrame:
    """Try to automatically convert object columns to datetime or string columns"""
    for c in df.columns[df.dtypes == "object"]:  # Look at the object columns
        try:
            df[c] = pd.to_datetime(df[c])
        except (ParserError, ValueError, TypeError):
            self.log.debug(f"Column {c} could not be converted to datetime...")
    return df

convert_object_to_string(df)

Try to automatically convert object columns to string columns

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_data.py
def convert_object_to_string(self, df: pd.DataFrame) -> pd.DataFrame:
    """Try to automatically convert object columns to string columns"""
    for c in df.columns[df.dtypes == "object"]:  # Look at the object columns
        try:
            df[c] = df[c].astype("string")
            df[c] = df[c].str.replace("'", '"')  # This is for nested JSON
        except (ParserError, ValueError, TypeError):
            self.log.info(f"Column {c} could not be converted to string...")
    return df

post_transform(**kwargs)

Post-Transform: Calling onboard() for the DataSource

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_data.py
def post_transform(self, **kwargs):
    """Post-Transform: Calling onboard() fnr the DataSource"""
    self.log.info("Post-Transform: Calling onboard() for the DataSource...")

    # Onboard the DataSource
    output_data_source = DataSourceFactory(self.output_uuid, force_refresh=True)
    output_data_source.onboard()

set_input(input_df)

Set the DataFrame Input for this Transform

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_data.py
def set_input(self, input_df: pd.DataFrame):
    """Set the DataFrame Input for this Transform"""
    self.output_df = input_df.copy()

transform_impl(overwrite=True, **kwargs)

Convert the Pandas DataFrame into Parquet Format in the SageWorks S3 Bucket, and store the information about the data to the AWS Data Catalog sageworks database

Parameters:

  • overwrite (bool): Overwrite the existing data in the SageWorks S3 Bucket (default: True)
Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_data.py
def transform_impl(self, overwrite: bool = True, **kwargs):
    """Convert the Pandas DataFrame into Parquet Format in the SageWorks S3 Bucket, and
    store the information about the data to the AWS Data Catalog sageworks database

    Args:
        overwrite (bool): Overwrite the existing data in the SageWorks S3 Bucket
    """
    self.log.info(f"DataFrame to SageWorks DataSource: {self.output_uuid}...")

    # Set up our metadata storage
    sageworks_meta = {"sageworks_tags": self.output_tags}
    sageworks_meta.update(self.output_meta)

    # Create the Output Parquet file S3 Storage Path
    s3_storage_path = f"{self.data_sources_s3_path}/{self.output_uuid}"

    # Convert column names to lowercase; Athena will not work with uppercase column names
    if str(self.output_df.columns) != str(self.output_df.columns.str.lower()):
        for c in self.output_df.columns:
            if c != c.lower():
                self.log.important(f"Column name {c} converted to lowercase: {c.lower()}")
        self.output_df.columns = self.output_df.columns.str.lower()

    # Convert Object Columns to String
    self.output_df = self.convert_object_to_string(self.output_df)

    # Note: Both of these conversions may not be necessary, so we're leaving them commented out
    """
    # Convert Object Columns to Datetime
    self.output_df = self.convert_object_to_datetime(self.output_df)

    # Now convert datetime columns to ISO-8601 string
    # self.output_df = self.convert_datetime_columns(self.output_df)
    """

    # Write out the DataFrame to AWS Data Catalog in either Parquet or JSONL format
    description = f"SageWorks data source: {self.output_uuid}"
    glue_table_settings = {"description": description, "parameters": sageworks_meta}
    if self.output_format == "parquet":
        wr.s3.to_parquet(
            self.output_df,
            path=s3_storage_path,
            dataset=True,
            mode="overwrite",
            database=self.data_catalog_db,
            table=self.output_uuid,
            filename_prefix=f"{self.output_uuid}_",
            boto3_session=self.boto3_session,
            partition_cols=None,
            glue_table_settings=glue_table_settings,
            sanitize_columns=False,
        )  # FIXME: Have some logic around partition columns

    # Note: In general Parquet works well for most use cases. We recommend using Parquet.
    #       You can use JSON_EXTRACT on Parquet string fields, and it works great.
    elif self.output_format == "jsonl":
        self.log.warning("We recommend using Parquet format for most use cases")
        self.log.warning("If you have a use case that requires JSONL please contact SageWorks support")
        self.log.warning("We'd like to understand what functionality JSONL is providing that isn't already")
        self.log.warning("provided with Parquet and JSON_EXTRACT() for your Athena Queries")
        wr.s3.to_json(
            self.output_df,
            path=s3_storage_path,
            orient="records",
            lines=True,
            date_format="iso",
            dataset=True,
            mode="overwrite",
            database=self.data_catalog_db,
            table=self.output_uuid,
            filename_prefix=f"{self.output_uuid}_",
            boto3_session=self.boto3_session,
            partition_cols=None,
            glue_table_settings=glue_table_settings,
        )
    else:
        raise ValueError(f"Unsupported file format: {self.output_format}")

PandasToFeatures

Bases: Transform

PandasToFeatures: Class to publish a Pandas DataFrame into a FeatureSet

Common Usage
to_features = PandasToFeatures(output_uuid)
to_features.set_output_tags(["abalone", "public", "whatever"])
to_features.set_input(df, id_column="id"/None, event_time_column="date"/None)
to_features.transform()
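
A minimal end-to-end sketch (the DataFrame contents and the output name "test_features" are hypothetical; the import path is assumed from the source file shown below):

import pandas as pd
from sageworks.core.transforms.pandas_transforms.pandas_to_features import PandasToFeatures

# Publish a small DataFrame as a SageWorks FeatureSet
df = pd.DataFrame({"id": [1, 2, 3], "length": [0.5, 0.6, 0.7], "rings": [10, 12, 8]})
to_features = PandasToFeatures("test_features")
to_features.set_output_tags(["test", "small"])
to_features.set_input(df, target_column="rings", id_column="id")  # event_time is auto-generated if omitted
to_features.transform()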
Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_features.py
class PandasToFeatures(Transform):
    """PandasToFeatures: Class to publish a Pandas DataFrame into a FeatureSet

    Common Usage:
        ```
        to_features = PandasToFeatures(output_uuid)
        to_features.set_output_tags(["abalone", "public", "whatever"])
        to_features.set_input(df, id_column="id"/None, event_time_column="date"/None)
        to_features.transform()
        ```
    """

    def __init__(self, output_uuid: str, auto_one_hot=False):
        """PandasToFeatures Initialization
        Args:
            output_uuid (str): The UUID of the FeatureSet to create
            auto_one_hot (bool): Should we automatically one-hot encode categorical columns?
        """

        # Make sure the output_uuid is a valid name
        Artifact.ensure_valid_name(output_uuid)

        # Call superclass init
        super().__init__("DataFrame", output_uuid)

        # Set up all my instance attributes
        self.input_type = TransformInput.PANDAS_DF
        self.output_type = TransformOutput.FEATURE_SET
        self.target_column = None
        self.id_column = None
        self.event_time_column = None
        self.auto_one_hot = auto_one_hot
        self.categorical_dtypes = {}
        self.output_df = None
        self.table_format = TableFormatEnum.ICEBERG

        # Delete the existing FeatureSet if it exists
        self.delete_existing()

        # These will be set in the transform method
        self.output_feature_group = None
        self.output_feature_set = None
        self.expected_rows = 0

    def set_input(self, input_df: pd.DataFrame, target_column=None, id_column=None, event_time_column=None):
        """Set the Input DataFrame for this Transform
        Args:
            input_df (pd.DataFrame): The input DataFrame
            target_column (str): The name of the target column (default: None)
            id_column (str): The name of the id column (default: None)
            event_time_column (str): The name of the event_time column (default: None)
        """
        self.target_column = target_column
        self.id_column = id_column
        self.event_time_column = event_time_column
        self.output_df = input_df.copy()

        # Now Prepare the DataFrame for its journey into an AWS FeatureGroup
        self.prep_dataframe()

    def delete_existing(self):
        # Delete the existing FeatureSet if it exists
        try:
            delete_fs = FeatureSetCore(self.output_uuid)
            if delete_fs.exists():
                self.log.info(f"Deleting the {self.output_uuid} FeatureSet...")
                delete_fs.delete()
                time.sleep(1)
        except ClientError as exc:
            self.log.info(f"FeatureSet {self.output_uuid} doesn't exist...")
            self.log.info(exc)

    def _ensure_id_column(self):
        """Internal: AWS Feature Store requires an Id field for all data store"""
        if self.id_column is None or self.id_column not in self.output_df.columns:
            if "id" not in self.output_df.columns:
                self.log.info("Generating an id column before FeatureSet Creation...")
                self.output_df["id"] = self.output_df.index
            self.id_column = "id"

    def _ensure_event_time(self):
        """Internal: AWS Feature Store requires an event_time field for all data stored"""
        if self.event_time_column is None or self.event_time_column not in self.output_df.columns:
            self.log.info("Generating an event_time column before FeatureSet Creation...")
            self.event_time_column = "event_time"
            self.output_df[self.event_time_column] = pd.Timestamp("now", tz="UTC")

        # The event_time_column is defined, so we need to make sure it's in ISO-8601 string format
        # Note: AWS Feature Store only supports a particular ISO-8601 format, not ALL ISO-8601 formats
        time_column = self.output_df[self.event_time_column]

        # Check if the event_time_column is of type object or string and convert it to DateTime
        if time_column.dtypes == "object" or time_column.dtypes.name == "string":
            self.log.info(f"Converting {self.event_time_column} to DateTime...")
            time_column = pd.to_datetime(time_column)

        # Let's make sure it's the right type for Feature Store
        if pd.api.types.is_datetime64_any_dtype(time_column):
            self.log.info(f"Converting {self.event_time_column} to ISOFormat Date String before FeatureSet Creation...")

            # Convert the datetime DType to ISO-8601 string
            # TableFormat=ICEBERG does not support alternate formats for event_time field, it only supports String type.
            time_column = time_column.map(datetime_to_iso8601)
            self.output_df[self.event_time_column] = time_column.astype("string")

    def _convert_objs_to_string(self):
        """Internal: AWS Feature Store doesn't know how to store object dtypes, so convert to String"""
        for col in self.output_df:
            if pd.api.types.is_object_dtype(self.output_df[col].dtype):
                self.output_df[col] = self.output_df[col].astype(pd.StringDtype())

    def process_column_name(self, column: str, shorten: bool = False) -> str:
        """Call various methods to make sure the column is ready for Feature Store
        Args:
            column (str): The column name to process
            shorten (bool): Should we shorten the column name? (default: False)
        """
        self.log.debug(f"Processing column {column}...")

        # Make sure the column name is valid
        column = self.sanitize_column_name(column)

        # Make sure the column name isn't too long
        if shorten:
            column = self.shorten_column_name(column)

        return column

    def shorten_column_name(self, name, max_length=20):
        if len(name) <= max_length:
            return name

        # Start building the new name from the end
        parts = name.split("_")[::-1]
        new_name = ""
        for part in parts:
            if len(new_name) + len(part) + 1 <= max_length:  # +1 for the underscore
                new_name = f"{part}_{new_name}" if new_name else part
            else:
                break

        # If new_name is empty, just use the last part of the original name
        if not new_name:
            new_name = parts[0]

        self.log.info(f"Shortening {name} to {new_name}")
        return new_name

    def sanitize_column_name(self, name):
        # Remove all invalid characters
        sanitized = re.sub("[^a-zA-Z0-9-_]", "_", name)
        sanitized = re.sub("_+", "_", sanitized)
        sanitized = sanitized.strip("_")

        # Log the change if the name was altered
        if sanitized != name:
            self.log.info(f"Sanitizing {name} to {sanitized}")

        return sanitized

    def one_hot_encoding(self, df, categorical_columns: list) -> pd.DataFrame:
        """One Hot Encoding for Categorical Columns with additional column name management"""

        # Now convert Categorical Types to One Hot Encoding
        current_columns = list(df.columns)
        df = pd.get_dummies(df, columns=categorical_columns)

        # Compute the new columns generated by get_dummies
        new_columns = list(set(df.columns) - set(current_columns))

        # Convert new columns to int32
        df[new_columns] = df[new_columns].astype("int32")

        # For the new columns we're going to shorten the names
        renamed_columns = {col: self.process_column_name(col) for col in new_columns}

        # Rename the columns in the DataFrame
        df.rename(columns=renamed_columns, inplace=True)

        return df

    # Helper Methods
    def auto_convert_columns_to_categorical(self):
        """Convert object and string types to Categorical"""
        categorical_columns = []
        for feature, dtype in self.output_df.dtypes.items():
            if dtype in ["object", "string", "category"] and feature not in [
                self.event_time_column,
                self.id_column,
                self.target_column,
            ]:
                unique_values = self.output_df[feature].nunique()
                if 1 < unique_values < 6:
                    self.log.important(f"Converting column {feature} to categorical (unique {unique_values})")
                    self.output_df[feature] = self.output_df[feature].astype("category")
                    categorical_columns.append(feature)

        # Now run one hot encoding on categorical columns
        self.output_df = self.one_hot_encoding(self.output_df, categorical_columns)

    def manual_categorical_converter(self):
        """Convert object and string types to Categorical

        Note:
            This method is used for streaming/chunking. You can set the
            categorical_dtypes attribute to a dictionary of column names and
            their respective categorical types.
        """
        for column, cat_d_type in self.categorical_dtypes.items():
            self.output_df[column] = self.output_df[column].astype(cat_d_type)

        # Now convert Categorical Types to One Hot Encoding
        categorical_columns = list(self.categorical_dtypes.keys())
        self.output_df = self.one_hot_encoding(self.output_df, categorical_columns)

    @staticmethod
    def convert_column_types(df: pd.DataFrame) -> pd.DataFrame:
        """Convert the types of the DataFrame to the correct types for the Feature Store"""
        for column in list(df.select_dtypes(include="bool").columns):
            df[column] = df[column].astype("int32")
        for column in list(df.select_dtypes(include="category").columns):
            df[column] = df[column].astype("str")

        # Select all columns that are of datetime dtype and convert them to ISO-8601 strings
        for column in [col for col in df.columns if pd.api.types.is_datetime64_any_dtype(df[col])]:
            df[column] = df[column].map(datetime_to_iso8601).astype("string")

        """FIXME Not sure we need these conversions
        for column in list(df.select_dtypes(include="object").columns):
            df[column] = df[column].astype("string")
        for column in list(df.select_dtypes(include=[pd.Int64Dtype]).columns):
            df[column] = df[column].astype("int64")
        for column in list(df.select_dtypes(include=[pd.Float64Dtype]).columns):
            df[column] = df[column].astype("float64")
        """
        return df

    def prep_dataframe(self):
        """Prep the DataFrame for Feature Store Creation"""
        self.log.info("Prep the output_df (cat_convert, convert types, and lowercase columns)...")

        # Remove any columns generated from AWS
        aws_cols = ["write_time", "api_invocation_time", "is_deleted", "event_time"]
        self.output_df = self.output_df.drop(columns=aws_cols, errors="ignore")

        # Convert object and string types to Categorical
        if self.auto_one_hot:
            self.auto_convert_columns_to_categorical()
        else:
            self.manual_categorical_converter()

        # Convert column names to lowercase; Athena will not work with uppercase column names
        if str(self.output_df.columns) != str(self.output_df.columns.str.lower()):
            for c in self.output_df.columns:
                if c != c.lower():
                    self.log.important(f"Column name {c} converted to lowercase: {c.lower()}")
            self.output_df.columns = self.output_df.columns.str.lower()

        # Make sure we have the required id and event_time columns
        self._ensure_id_column()
        self._ensure_event_time()

        # We need to convert some of our column types to the correct types
        # Feature Store only supports these data types:
        # - Integral
        # - Fractional
        # - String (timestamp/datetime types need to be converted to string)
        self.output_df = self.convert_column_types(self.output_df)

    def create_feature_group(self):
        """Create a Feature Group, load our Feature Definitions, and wait for it to be ready"""

        # Create a Feature Group and load our Feature Definitions
        my_feature_group = FeatureGroup(name=self.output_uuid, sagemaker_session=self.sm_session)
        my_feature_group.load_feature_definitions(data_frame=self.output_df)

        # Create the Output S3 Storage Path for this Feature Set
        s3_storage_path = f"{self.feature_sets_s3_path}/{self.output_uuid}"

        # Get the metadata/tags to push into AWS
        aws_tags = self.get_aws_tags()

        # Create the Feature Group
        my_feature_group.create(
            s3_uri=s3_storage_path,
            record_identifier_name=self.id_column,
            event_time_feature_name=self.event_time_column,
            role_arn=self.sageworks_role_arn,
            enable_online_store=True,
            table_format=self.table_format,
            tags=aws_tags,
        )

        # Ensure/wait for the feature group to be created
        self.ensure_feature_group_created(my_feature_group)
        return my_feature_group

    def pre_transform(self, **kwargs):
        """Pre-Transform: Create the Feature Group"""
        self.output_feature_group = self.create_feature_group()

    def transform_impl(self):
        """Transform Implementation: Ingest the data into the Feature Group"""

        # Now we actually push the data into the Feature Group (called ingestion)
        self.log.important("Ingesting rows into Feature Group...")
        ingest_manager = self.output_feature_group.ingest(self.output_df, max_processes=8, wait=False)
        try:
            ingest_manager.wait()
        except IngestionError as exc:
            self.log.warning(f"Some rows had an ingesting error: {exc}")

        # Report on any rows that failed to ingest
        if ingest_manager.failed_rows:
            self.log.warning(f"Number of Failed Rows: {len(ingest_manager.failed_rows)}")

            # FIXME: This may or may not give us the correct rows
            # If any index is greater than the number of rows, then the index needs
            # to be converted to a relative index in our current output_df
            df_rows = len(self.output_df)
            relative_indexes = [idx - df_rows if idx >= df_rows else idx for idx in ingest_manager.failed_rows]
            failed_data = self.output_df.iloc[relative_indexes]
            for idx, row in failed_data.iterrows():
                self.log.warning(f"Failed Row {idx}: {row.to_dict()}")

        # Keep track of the number of rows we expect to be ingested
        self.expected_rows += len(self.output_df) - len(ingest_manager.failed_rows)
        self.log.info(f"Added rows: {len(self.output_df)}")
        self.log.info(f"Failed rows: {len(ingest_manager.failed_rows)}")
        self.log.info(f"Total rows to be ingested: {self.expected_rows}")

    def post_transform(self, **kwargs):
        """Post-Transform: Populating Offline Storage and onboard()"""
        self.log.info("Post-Transform: Populating Offline Storage and onboard()...")

        # Feature Group Ingestion takes a while, so we need to wait for it to finish
        self.output_feature_set = FeatureSetCore(self.output_uuid, force_refresh=True)
        self.log.important("Waiting for AWS Feature Group Offline storage to be ready...")
        self.log.important("This will often take 10-20 minutes...go have coffee or lunch :)")
        self.output_feature_set.set_status("initializing")
        self.wait_for_rows(self.expected_rows)

        # Call the FeatureSet onboard method to compute a bunch of EDA stuff
        self.output_feature_set.onboard()

    def ensure_feature_group_created(self, feature_group):
        status = feature_group.describe().get("FeatureGroupStatus")
        while status == "Creating":
            self.log.debug("FeatureSet being Created...")
            time.sleep(5)
            status = feature_group.describe().get("FeatureGroupStatus")
        self.log.info(f"FeatureSet {feature_group.name} successfully created")

    def wait_for_rows(self, expected_rows: int):
        """Wait for AWS Feature Group to fully populate the Offline Storage"""
        rows = self.output_feature_set.num_rows()

        # Wait for the rows to be populated
        self.log.info(f"Waiting for AWS Feature Group {self.output_uuid} Offline Storage...")
        not_all_rows_retry = 5
        while rows < expected_rows and not_all_rows_retry > 0:
            sleep_time = 5 if rows else 60
            not_all_rows_retry -= 1 if rows else 0
            time.sleep(sleep_time)
            rows = self.output_feature_set.num_rows()
            self.log.info(f"Offline Storage {self.output_uuid}: {rows} rows out of {expected_rows}")
        if rows == expected_rows:
            self.log.important(f"Success: Reached Expected Rows ({rows} rows)...")
        else:
            msg = f"Did not reach expected rows ({rows}/{expected_rows})...(probably AWS lag)"
            self.log.warning(msg)
            self.log.monitor(msg)

__init__(output_uuid, auto_one_hot=False)

PandasToFeatures Initialization

Args:

  • output_uuid (str): The UUID of the FeatureSet to create
  • auto_one_hot (bool): Should we automatically one-hot encode categorical columns? (default: False)

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_features.py
def __init__(self, output_uuid: str, auto_one_hot=False):
    """PandasToFeatures Initialization
    Args:
        output_uuid (str): The UUID of the FeatureSet to create
        auto_one_hot (bool): Should we automatically one-hot encode categorical columns?
    """

    # Make sure the output_uuid is a valid name
    Artifact.ensure_valid_name(output_uuid)

    # Call superclass init
    super().__init__("DataFrame", output_uuid)

    # Set up all my instance attributes
    self.input_type = TransformInput.PANDAS_DF
    self.output_type = TransformOutput.FEATURE_SET
    self.target_column = None
    self.id_column = None
    self.event_time_column = None
    self.auto_one_hot = auto_one_hot
    self.categorical_dtypes = {}
    self.output_df = None
    self.table_format = TableFormatEnum.ICEBERG

    # Delete the existing FeatureSet if it exists
    self.delete_existing()

    # These will be set in the transform method
    self.output_feature_group = None
    self.output_feature_set = None
    self.expected_rows = 0

auto_convert_columns_to_categorical()

Convert object and string types to Categorical

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_features.py
def auto_convert_columns_to_categorical(self):
    """Convert object and string types to Categorical"""
    categorical_columns = []
    for feature, dtype in self.output_df.dtypes.items():
        if dtype in ["object", "string", "category"] and feature not in [
            self.event_time_column,
            self.id_column,
            self.target_column,
        ]:
            unique_values = self.output_df[feature].nunique()
            if 1 < unique_values < 6:
                self.log.important(f"Converting column {feature} to categorical (unique {unique_values})")
                self.output_df[feature] = self.output_df[feature].astype("category")
                categorical_columns.append(feature)

    # Now run one hot encoding on categorical columns
    self.output_df = self.one_hot_encoding(self.output_df, categorical_columns)

convert_column_types(df) staticmethod

Convert the types of the DataFrame to the correct types for the Feature Store

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_features.py
@staticmethod
def convert_column_types(df: pd.DataFrame) -> pd.DataFrame:
    """Convert the types of the DataFrame to the correct types for the Feature Store"""
    for column in list(df.select_dtypes(include="bool").columns):
        df[column] = df[column].astype("int32")
    for column in list(df.select_dtypes(include="category").columns):
        df[column] = df[column].astype("str")

    # Select all columns that are of datetime dtype and convert them to ISO-8601 strings
    for column in [col for col in df.columns if pd.api.types.is_datetime64_any_dtype(df[col])]:
        df[column] = df[column].map(datetime_to_iso8601).astype("string")

    """FIXME Not sure we need these conversions
    for column in list(df.select_dtypes(include="object").columns):
        df[column] = df[column].astype("string")
    for column in list(df.select_dtypes(include=[pd.Int64Dtype]).columns):
        df[column] = df[column].astype("int64")
    for column in list(df.select_dtypes(include=[pd.Float64Dtype]).columns):
        df[column] = df[column].astype("float64")
    """
    return df

create_feature_group()

Create a Feature Group, load our Feature Definitions, and wait for it to be ready

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_features.py
def create_feature_group(self):
    """Create a Feature Group, load our Feature Definitions, and wait for it to be ready"""

    # Create a Feature Group and load our Feature Definitions
    my_feature_group = FeatureGroup(name=self.output_uuid, sagemaker_session=self.sm_session)
    my_feature_group.load_feature_definitions(data_frame=self.output_df)

    # Create the Output S3 Storage Path for this Feature Set
    s3_storage_path = f"{self.feature_sets_s3_path}/{self.output_uuid}"

    # Get the metadata/tags to push into AWS
    aws_tags = self.get_aws_tags()

    # Create the Feature Group
    my_feature_group.create(
        s3_uri=s3_storage_path,
        record_identifier_name=self.id_column,
        event_time_feature_name=self.event_time_column,
        role_arn=self.sageworks_role_arn,
        enable_online_store=True,
        table_format=self.table_format,
        tags=aws_tags,
    )

    # Ensure/wait for the feature group to be created
    self.ensure_feature_group_created(my_feature_group)
    return my_feature_group

manual_categorical_converter()

Convert object and string types to Categorical

Note

This method is used for streaming/chunking. You can set the categorical_dtypes attribute to a dictionary of column names and their respective categorical types.

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_features.py
def manual_categorical_converter(self):
    """Convert object and string types to Categorical

    Note:
        This method is used for streaming/chunking. You can set the
        categorical_dtypes attribute to a dictionary of column names and
        their respective categorical types.
    """
    for column, cat_d_type in self.categorical_dtypes.items():
        self.output_df[column] = self.output_df[column].astype(cat_d_type)

    # Now convert Categorical Types to One Hot Encoding
    categorical_columns = list(self.categorical_dtypes.keys())
    self.output_df = self.one_hot_encoding(self.output_df, categorical_columns)

one_hot_encoding(df, categorical_columns)

One Hot Encoding for Categorical Columns with additional column name management

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_features.py
def one_hot_encoding(self, df, categorical_columns: list) -> pd.DataFrame:
    """One Hot Encoding for Categorical Columns with additional column name management"""

    # Now convert Categorical Types to One Hot Encoding
    current_columns = list(df.columns)
    df = pd.get_dummies(df, columns=categorical_columns)

    # Compute the new columns generated by get_dummies
    new_columns = list(set(df.columns) - set(current_columns))

    # Convert new columns to int32
    df[new_columns] = df[new_columns].astype("int32")

    # For the new columns we're going to shorten the names
    renamed_columns = {col: self.process_column_name(col) for col in new_columns}

    # Rename the columns in the DataFrame
    df.rename(columns=renamed_columns, inplace=True)

    return df

post_transform(**kwargs)

Post-Transform: Populating Offline Storage and onboard()

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_features.py
def post_transform(self, **kwargs):
    """Post-Transform: Populating Offline Storage and onboard()"""
    self.log.info("Post-Transform: Populating Offline Storage and onboard()...")

    # Feature Group Ingestion takes a while, so we need to wait for it to finish
    self.output_feature_set = FeatureSetCore(self.output_uuid, force_refresh=True)
    self.log.important("Waiting for AWS Feature Group Offline storage to be ready...")
    self.log.important("This will often take 10-20 minutes...go have coffee or lunch :)")
    self.output_feature_set.set_status("initializing")
    self.wait_for_rows(self.expected_rows)

    # Call the FeatureSet onboard method to compute a bunch of EDA stuff
    self.output_feature_set.onboard()

pre_transform(**kwargs)

Pre-Transform: Create the Feature Group

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_features.py
def pre_transform(self, **kwargs):
    """Pre-Transform: Create the Feature Group"""
    self.output_feature_group = self.create_feature_group()

prep_dataframe()

Prep the DataFrame for Feature Store Creation

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_features.py
def prep_dataframe(self):
    """Prep the DataFrame for Feature Store Creation"""
    self.log.info("Prep the output_df (cat_convert, convert types, and lowercase columns)...")

    # Remove any columns generated from AWS
    aws_cols = ["write_time", "api_invocation_time", "is_deleted", "event_time"]
    self.output_df = self.output_df.drop(columns=aws_cols, errors="ignore")

    # Convert object and string types to Categorical
    if self.auto_one_hot:
        self.auto_convert_columns_to_categorical()
    else:
        self.manual_categorical_converter()

    # Convert column names to lowercase; Athena will not work with uppercase column names
    if str(self.output_df.columns) != str(self.output_df.columns.str.lower()):
        for c in self.output_df.columns:
            if c != c.lower():
                self.log.important(f"Column name {c} converted to lowercase: {c.lower()}")
        self.output_df.columns = self.output_df.columns.str.lower()

    # Make sure we have the required id and event_time columns
    self._ensure_id_column()
    self._ensure_event_time()

    # We need to convert some of our column types to the correct types
    # Feature Store only supports these data types:
    # - Integral
    # - Fractional
    # - String (timestamp/datetime types need to be converted to string)
    self.output_df = self.convert_column_types(self.output_df)

process_column_name(column, shorten=False)

Call various methods to make sure the column is ready for Feature Store

Args:

  • column (str): The column name to process
  • shorten (bool): Should we shorten the column name? (default: False)

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_features.py
def process_column_name(self, column: str, shorten: bool = False) -> str:
    """Call various methods to make sure the column is ready for Feature Store
    Args:
        column (str): The column name to process
        shorten (bool): Should we shorten the column name? (default: False)
    """
    self.log.debug(f"Processing column {column}...")

    # Make sure the column name is valid
    column = self.sanitize_column_name(column)

    # Make sure the column name isn't too long
    if shorten:
        column = self.shorten_column_name(column)

    return column

set_input(input_df, target_column=None, id_column=None, event_time_column=None)

Set the Input DataFrame for this Transform

Args:

  • input_df (pd.DataFrame): The input DataFrame
  • target_column (str): The name of the target column (default: None)
  • id_column (str): The name of the id column (default: None)
  • event_time_column (str): The name of the event_time column (default: None)

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_features.py
def set_input(self, input_df: pd.DataFrame, target_column=None, id_column=None, event_time_column=None):
    """Set the Input DataFrame for this Transform
    Args:
        input_df (pd.DataFrame): The input DataFrame
        target_column (str): The name of the target column (default: None)
        id_column (str): The name of the id column (default: None)
        event_time_column (str): The name of the event_time column (default: None)
    """
    self.target_column = target_column
    self.id_column = id_column
    self.event_time_column = event_time_column
    self.output_df = input_df.copy()

    # Now Prepare the DataFrame for its journey into an AWS FeatureGroup
    self.prep_dataframe()

transform_impl()

Transform Implementation: Ingest the data into the Feature Group

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_features.py
def transform_impl(self):
    """Transform Implementation: Ingest the data into the Feature Group"""

    # Now we actually push the data into the Feature Group (called ingestion)
    self.log.important("Ingesting rows into Feature Group...")
    ingest_manager = self.output_feature_group.ingest(self.output_df, max_processes=8, wait=False)
    try:
        ingest_manager.wait()
    except IngestionError as exc:
        self.log.warning(f"Some rows had an ingesting error: {exc}")

    # Report on any rows that failed to ingest
    if ingest_manager.failed_rows:
        self.log.warning(f"Number of Failed Rows: {len(ingest_manager.failed_rows)}")

        # FIXME: This may or may not give us the correct rows
        # If any index is greater than the number of rows, then the index needs
        # to be converted to a relative index in our current output_df
        df_rows = len(self.output_df)
        relative_indexes = [idx - df_rows if idx >= df_rows else idx for idx in ingest_manager.failed_rows]
        failed_data = self.output_df.iloc[relative_indexes]
        for idx, row in failed_data.iterrows():
            self.log.warning(f"Failed Row {idx}: {row.to_dict()}")

    # Keep track of the number of rows we expect to be ingested
    self.expected_rows += len(self.output_df) - len(ingest_manager.failed_rows)
    self.log.info(f"Added rows: {len(self.output_df)}")
    self.log.info(f"Failed rows: {len(ingest_manager.failed_rows)}")
    self.log.info(f"Total rows to be ingested: {self.expected_rows}")

wait_for_rows(expected_rows)

Wait for AWS Feature Group to fully populate the Offline Storage

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_features.py
def wait_for_rows(self, expected_rows: int):
    """Wait for AWS Feature Group to fully populate the Offline Storage"""
    rows = self.output_feature_set.num_rows()

    # Wait for the rows to be populated
    self.log.info(f"Waiting for AWS Feature Group {self.output_uuid} Offline Storage...")
    not_all_rows_retry = 5
    while rows < expected_rows and not_all_rows_retry > 0:
        sleep_time = 5 if rows else 60
        not_all_rows_retry -= 1 if rows else 0
        time.sleep(sleep_time)
        rows = self.output_feature_set.num_rows()
        self.log.info(f"Offline Storage {self.output_uuid}: {rows} rows out of {expected_rows}")
    if rows == expected_rows:
        self.log.important(f"Success: Reached Expected Rows ({rows} rows)...")
    else:
        msg = f"Did not reach expected rows ({rows}/{expected_rows})...(probably AWS lag)"
        self.log.warning(msg)
        self.log.monitor(msg)
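
The loop above polls num_rows() on the output FeatureSet, sleeping a full minute while the Offline Storage is still empty and dropping to short 5-second retries once rows start appearing. A usage sketch, assuming to_features is a PandasToFeatures instance whose transform has already ingested its data:

# expected_rows is the running tally kept by transform_impl()
to_features.wait_for_rows(to_features.expected_rows)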

PandasToFeaturesChunked

Bases: Transform

PandasToFeaturesChunked: Class to incrementally load a series of chunked Pandas DataFrames into a FeatureSet

Common Usage
to_features = PandasToFeaturesChunked(output_uuid, id_column="id"/None, event_time_column="date"/None)
to_features.set_output_tags(["abalone", "public", "whatever"])
cat_column_info = {"sex": ["M", "F", "I"]}
to_features.set_categorical_info(cat_column_info)
to_features.add_chunk(df)
to_features.add_chunk(df)
...
to_features.finalize()
Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_features_chunked.py
class PandasToFeaturesChunked(Transform):
    """PandasToFeaturesChunked:  Class to manage a bunch of chunked Pandas DataFrames into a FeatureSet

    Common Usage:
        ```
        to_features = PandasToFeaturesChunked(output_uuid, id_column="id"/None, event_time_column="date"/None)
        to_features.set_output_tags(["abalone", "public", "whatever"])
        cat_column_info = {"sex": ["M", "F", "I"]}
        to_features.set_categorical_info(cat_column_info)
        to_features.add_chunk(df)
        to_features.add_chunk(df)
        ...
        to_features.finalize()
        ```
    """

    def __init__(self, output_uuid: str, id_column=None, event_time_column=None):
        """PandasToFeaturesChunked Initialization"""

        # Make sure the output_uuid is a valid name
        Artifact.ensure_valid_name(output_uuid)

        # Call superclass init
        super().__init__("DataFrame", output_uuid)

        # Set up all my instance attributes
        self.id_column = id_column
        self.event_time_column = event_time_column
        self.first_chunk = None
        self.pandas_to_features = PandasToFeatures(output_uuid, auto_one_hot=False)

    def set_categorical_info(self, cat_column_info: dict[list[str]]):
        """Set the Categorical Columns
        Args:
            cat_column_info (dict[list[str]]): Dictionary of categorical columns and their possible values
        """

        # Create the CategoricalDtypes
        cat_d_types = {}
        for col, vals in cat_column_info.items():
            cat_d_types[col] = CategoricalDtype(categories=vals)

        # Now set the CategoricalDtypes on our underlying PandasToFeatures
        self.pandas_to_features.categorical_dtypes = cat_d_types

    def add_chunk(self, chunk_df: pd.DataFrame):
        """Add a Chunk of Data to the FeatureSet"""

        # Is this the first chunk? If so we need to run the pre_transform
        if self.first_chunk is None:
            self.log.info(f"Adding first chunk {chunk_df.shape}...")
            self.first_chunk = chunk_df
            self.pandas_to_features.set_input(chunk_df, id_column=self.id_column, event_time_column=self.event_time_column)
            self.pandas_to_features.pre_transform()
            self.pandas_to_features.transform_impl()
        else:
            self.log.info(f"Adding chunk {chunk_df.shape}...")
            self.pandas_to_features.set_input(chunk_df, id_column=self.id_column, event_time_column=self.event_time_column)
            self.pandas_to_features.transform_impl()

    def pre_transform(self, **kwargs):
        """Pre-Transform: Create the Feature Group with Chunked Data"""

        # Loading data into a Feature Group takes a while, so set status to loading
        FeatureSetCore(self.output_uuid).set_status("loading")

    def transform_impl(self):
        """Required implementation of the Transform interface"""
        self.log.warning("PandasToFeaturesChunked.transform_impl() called.  This is a no-op.")

    def post_transform(self, **kwargs):
        """Post-Transform: Any Post Transform Steps"""
        self.pandas_to_features.post_transform()
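
Building on the Common Usage above, a typical pattern is to stream a large file through pandas and hand each chunk to add_chunk(). This sketch assumes a local abalone.csv with id, date, and sex columns, a configured AWS/SageWorks environment, and the import path from the source location shown above:

import pandas as pd
from sageworks.core.transforms.pandas_transforms.pandas_to_features_chunked import PandasToFeaturesChunked

to_features = PandasToFeaturesChunked("abalone_features", id_column="id", event_time_column="date")
to_features.set_output_tags(["abalone", "public"])

# Pre-declare the categorical values so every chunk is encoded the same way
to_features.set_categorical_info({"sex": ["M", "F", "I"]})

# Stream the source file in 100k-row chunks (path and chunksize are illustrative)
for chunk_df in pd.read_csv("abalone.csv", chunksize=100_000):
    to_features.add_chunk(chunk_df)

to_features.finalize()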

__init__(output_uuid, id_column=None, event_time_column=None)

PandasToFeaturesChunked Initialization

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_features_chunked.py
def __init__(self, output_uuid: str, id_column=None, event_time_column=None):
    """PandasToFeaturesChunked Initialization"""

    # Make sure the output_uuid is a valid name
    Artifact.ensure_valid_name(output_uuid)

    # Call superclass init
    super().__init__("DataFrame", output_uuid)

    # Set up all my instance attributes
    self.id_column = id_column
    self.event_time_column = event_time_column
    self.first_chunk = None
    self.pandas_to_features = PandasToFeatures(output_uuid, auto_one_hot=False)

add_chunk(chunk_df)

Add a Chunk of Data to the FeatureSet

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_features_chunked.py
def add_chunk(self, chunk_df: pd.DataFrame):
    """Add a Chunk of Data to the FeatureSet"""

    # Is this the first chunk? If so we need to run the pre_transform
    if self.first_chunk is None:
        self.log.info(f"Adding first chunk {chunk_df.shape}...")
        self.first_chunk = chunk_df
        self.pandas_to_features.set_input(chunk_df, id_column=self.id_column, event_time_column=self.event_time_column)
        self.pandas_to_features.pre_transform()
        self.pandas_to_features.transform_impl()
    else:
        self.log.info(f"Adding chunk {chunk_df.shape}...")
        self.pandas_to_features.set_input(chunk_df, id_column=self.id_column, event_time_column=self.event_time_column)
        self.pandas_to_features.transform_impl()

post_transform(**kwargs)

Post-Transform: Any Post Transform Steps

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_features_chunked.py
def post_transform(self, **kwargs):
    """Post-Transform: Any Post Transform Steps"""
    self.pandas_to_features.post_transform()

pre_transform(**kwargs)

Pre-Transform: Create the Feature Group with Chunked Data

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_features_chunked.py
def pre_transform(self, **kwargs):
    """Pre-Transform: Create the Feature Group with Chunked Data"""

    # Loading data into a Feature Group takes a while, so set status to loading
    FeatureSetCore(self.output_uuid).set_status("loading")

set_categorical_info(cat_column_info)

Set the Categorical Columns

Args:
- cat_column_info (dict[list[str]]): Dictionary of categorical columns and their possible values

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_features_chunked.py
def set_categorical_info(self, cat_column_info: dict[list[str]]):
    """Set the Categorical Columns
    Args:
        cat_column_info (dict[list[str]]): Dictionary of categorical columns and their possible values
    """

    # Create the CategoricalDtypes
    cat_d_types = {}
    for col, vals in cat_column_info.items():
        cat_d_types[col] = CategoricalDtype(categories=vals)

    # Now set the CategoricalDtypes on our underlying PandasToFeatures
    self.pandas_to_features.categorical_dtypes = cat_d_types
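
The likely motivation for pre-declaring categories is that an individual chunk may not contain every value; pinning a CategoricalDtype keeps the encoded columns identical from chunk to chunk. A standalone pandas illustration using the sex column from the abalone example:

import pandas as pd
from pandas import CategoricalDtype

sex_dtype = CategoricalDtype(categories=["M", "F", "I"])

# A chunk that happens to contain only two of the three known categories
chunk = pd.DataFrame({"sex": ["M", "F", "M"]})
encoded = pd.get_dummies(chunk["sex"].astype(sex_dtype))

# All three columns appear, even though 'I' is absent from this chunk
print(encoded.columns.tolist())  # ['M', 'F', 'I']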

transform_impl()

Required implementation of the Transform interface

Source code in src/sageworks/core/transforms/pandas_transforms/pandas_to_features_chunked.py
def transform_impl(self):
    """Required implementation of the Transform interface"""
    self.log.warning("PandasToFeaturesChunked.transform_impl() called.  This is a no-op.")