
SageWorks DataFrame Storage

Examples

Examples of using the DataFrame Storage class are listed at the bottom of this page: Examples.

Why DataFrame Storage?

Great question, there are a couple of reasons. The first is that the Parameter Store in AWS has a 4KB limit, so it simply can't hold any kind of 'real' data. The second is that DataFrames are created and used throughout data engineering, data science, and ML pipeline construction, so storing named DataFrames in an accessible location where your ML team can inspect and reuse them comes in super handy.

Efficient Storage

All DataFrames are stored in the Parquet format with Snappy compression. Parquet is a columnar storage format that efficiently handles large datasets, and Snappy compression reduces file size while maintaining fast read/write speeds.
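
For reference, here's a minimal sketch of what a Snappy-compressed Parquet round trip looks like with plain pandas (this assumes pyarrow is installed and uses a local file path purely for illustration; DFStore does the equivalent against S3 for you):

import pandas as pd

# Illustrative DataFrame and local path (DFStore writes to S3 instead)
df = pd.DataFrame({"A": [1, 2], "B": [3, 4]})

# Write Parquet with Snappy compression (the default codec for the pyarrow engine)
df.to_parquet("my_data.parquet", engine="pyarrow", compression="snappy")

# Reading it back preserves the column types
print(pd.read_parquet("my_data.parquet"))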


DFStore

DFStore: Fast/efficient storage of DataFrames using AWS S3/Parquet/Snappy

Common Usage
from sageworks.api.df_store import DFStore
import pandas as pd

df_store = DFStore()

# List Data
df_store.list()

# Add DataFrame
df = pd.DataFrame({"A": [1, 2], "B": [3, 4]})
df_store.add("my_data", df)

# Retrieve DataFrame
df = df_store.get("my_data")
print(df)

# Delete Data
df_store.delete("my_data")
Source code in src/sageworks/api/df_store.py
class DFStore:
    """DFStore: Fast/efficient storage of DataFrames using AWS S3/Parquet/Snappy

    Common Usage:
        ```python
        df_store = DFStore()

        # List Data
        df_store.list()

        # Add DataFrame
        df = pd.DataFrame({"A": [1, 2], "B": [3, 4]})
        df_store.add("my_data", df)

        # Retrieve DataFrame
        df = df_store.get("my_data")
        print(df)

        # Delete Data
        df_store.delete("my_data")
        ```
    """

    def __init__(self):
        """DFStore Init Method"""
        self.log = logging.getLogger("sageworks")
        self.prefix = "df_store/"

        # Initialize a SageWorks Session and retrieve the S3 bucket from ConfigManager
        config = ConfigManager()
        self.sageworks_bucket = config.get_config("SAGEWORKS_BUCKET")

        # Grab a SageWorks Session (this allows us to assume the SageWorks ExecutionRole)
        self.boto3_session = AWSAccountClamp().boto3_session

        # Create an S3 client for listing objects under the df_store prefix
        self.s3_client = self.boto3_session.client("s3")

    def summary(self) -> pd.DataFrame:
        """Return a nicely formatted summary of object names, sizes (in MB), and modified dates."""
        df = self.details()

        # Create a formatted DataFrame
        formatted_df = pd.DataFrame(
            {
                "name": df["name"],
                "size (MB)": (df["size"] / (1024 * 1024)).round(2),  # Convert size to MB
                "modified": pd.to_datetime(df["modified"]).dt.strftime("%Y-%m-%d %H:%M:%S"),  # Format date
            }
        )
        return formatted_df

    def details(self) -> pd.DataFrame:
        """Return a DataFrame with detailed metadata for all objects in the data_store prefix."""
        try:
            response = self.s3_client.list_objects_v2(Bucket=self.sageworks_bucket, Prefix=self.prefix)
            if "Contents" not in response:
                return pd.DataFrame(columns=["name", "s3_file", "size", "modified"])

            # Collect details for each object
            data = []
            for obj in response["Contents"]:
                full_key = obj["Key"]

                # Recover the name: swap the prefix for a leading slash and drop '.parquet' and anything after it
                name = full_key.replace(f"{self.prefix}", "/").split(".parquet")[0]
                s3_file = f"s3://{self.sageworks_bucket}/{full_key}"
                size = obj["Size"]
                modified = obj["LastModified"]
                data.append([name, s3_file, size, modified])

            # Create and return DataFrame
            df = pd.DataFrame(data, columns=["name", "s3_file", "size", "modified"])
            return df

        except Exception as e:
            self.log.error(f"Failed to get object details: {e}")
            return pd.DataFrame(columns=["name", "s3_file", "size", "created", "modified"])

    def get(self, name: str) -> pd.DataFrame:
        """Retrieve a DataFrame from the AWS S3.

        Args:
            name (str): The name of the data to retrieve.

        Returns:
            pd.DataFrame: The retrieved DataFrame.
        """
        s3_uri = self._generate_s3_uri(name)
        try:
            df = wr.s3.read_parquet(s3_uri)
            return df
        except ClientError:
            self.log.warning(f"Data '{name}' not found in S3.")
            return pd.DataFrame()  # Return an empty DataFrame if not found

    def add(self, name: str, data: Union[pd.DataFrame, pd.Series]):
        """Add or update a DataFrame or Series in the AWS S3.

        Args:
            name (str): The name of the data.
            data (Union[pd.DataFrame, pd.Series]): The data to be stored.
        """
        # Check if the data is a Pandas Series, convert it to a DataFrame
        if isinstance(data, pd.Series):
            data = data.to_frame()

        # Ensure data is a DataFrame
        if not isinstance(data, pd.DataFrame):
            raise ValueError("Only Pandas DataFrame or Series objects are supported.")

        s3_uri = self._generate_s3_uri(name)
        try:
            wr.s3.to_parquet(df=data, path=s3_uri, dataset=True, mode="overwrite")
            self.log.info(f"Data '{name}' added/updated successfully in S3.")
        except Exception as e:
            self.log.critical(f"Failed to add/update data '{name}': {e}")
            raise

    def delete(self, name: str):
        """Delete a DataFrame from the AWS S3.

        Args:
            name (str): The name of the data to delete.
        """
        s3_uri = self._generate_s3_uri(name)

        # Check if the folder (prefix) exists in S3
        if not wr.s3.list_objects(s3_uri):
            self.log.warning(f"Data '{name}' does not exist in S3. Cannot delete.")
            return

        # Delete the data from S3
        try:
            wr.s3.delete_objects(s3_uri)
            self.log.info(f"Data '{name}' deleted successfully from S3.")
        except Exception as e:
            self.log.error(f"Failed to delete data '{name}': {e}")

    def _generate_s3_uri(self, name: str) -> str:
        """Generate the S3 URI for the given name."""
        s3_path = f"{self.sageworks_bucket}/{self.prefix}{name}.parquet"
        s3_path = s3_path.replace("//", "/")
        s3_uri = f"s3://{s3_path}"
        return s3_uri

    def __repr__(self):
        """Return a string representation of the DFStore object."""
        # Use the summary() method and format it to align columns for printing
        summary_df = self.summary()

        # Dynamically compute the max length of the 'name' column and add 2 spaces of padding
        max_name_len = summary_df["name"].str.len().max() + 2
        summary_df["name"] = summary_df["name"].str.ljust(max_name_len)

        # Format the size column as '<value> MB' with two decimal places
        summary_df["size (MB)"] = summary_df["size (MB)"].apply(lambda x: f"{x:.2f} MB")

        # Enclose the modified date in parentheses, with a leading space as a separator
        summary_df["modified"] = summary_df["modified"].apply(lambda x: f" ({x})")

        # Convert the DataFrame to a string, remove headers, and return
        return summary_df.to_string(index=False, header=False)

__init__()

DFStore Init Method

Source code in src/sageworks/api/df_store.py
def __init__(self):
    """DFStore Init Method"""
    self.log = logging.getLogger("sageworks")
    self.prefix = "df_store/"

    # Initialize a SageWorks Session and retrieve the S3 bucket from ConfigManager
    config = ConfigManager()
    self.sageworks_bucket = config.get_config("SAGEWORKS_BUCKET")

    # Grab a SageWorks Session (this allows us to assume the SageWorks ExecutionRole)
    self.boto3_session = AWSAccountClamp().boto3_session

    # Create an S3 client for listing objects under the df_store prefix
    self.s3_client = self.boto3_session.client("s3")

__repr__()

Return a string representation of the DFStore object.

Source code in src/sageworks/api/df_store.py
def __repr__(self):
    """Return a string representation of the DFStore object."""
    # Use the summary() method and format it to align columns for printing
    summary_df = self.summary()

    # Dynamically compute the max length of the 'name' column and add 2 spaces of padding
    max_name_len = summary_df["name"].str.len().max() + 2
    summary_df["name"] = summary_df["name"].str.ljust(max_name_len)

    # Format the size column as '<value> MB' with two decimal places
    summary_df["size (MB)"] = summary_df["size (MB)"].apply(lambda x: f"{x:.2f} MB")

    # Enclose the modified date in parentheses, with a leading space as a separator
    summary_df["modified"] = summary_df["modified"].apply(lambda x: f" ({x})")

    # Convert the DataFrame to a string, remove headers, and return
    return summary_df.to_string(index=False, header=False)

add(name, data)

Add or update a DataFrame or Series in AWS S3.

Parameters:

name (str): The name of the data. Required.
data (Union[DataFrame, Series]): The data to be stored. Required.
Source code in src/sageworks/api/df_store.py
def add(self, name: str, data: Union[pd.DataFrame, pd.Series]):
    """Add or update a DataFrame or Series in the AWS S3.

    Args:
        name (str): The name of the data.
        data (Union[pd.DataFrame, pd.Series]): The data to be stored.
    """
    # Check if the data is a Pandas Series, convert it to a DataFrame
    if isinstance(data, pd.Series):
        data = data.to_frame()

    # Ensure data is a DataFrame
    if not isinstance(data, pd.DataFrame):
        raise ValueError("Only Pandas DataFrame or Series objects are supported.")

    s3_uri = self._generate_s3_uri(name)
    try:
        wr.s3.to_parquet(df=data, path=s3_uri, dataset=True, mode="overwrite")
        self.log.info(f"Data '{name}' added/updated successfully in S3.")
    except Exception as e:
        self.log.critical(f"Failed to add/update data '{name}': {e}")
        raise
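
Since add() also accepts a Pandas Series (it is converted to a one-column DataFrame before writing), here is a small sketch of that path; the "ml/scores" name is just a hypothetical example:

from sageworks.api.df_store import DFStore
import pandas as pd

df_store = DFStore()

# A Series is converted to a one-column DataFrame before being written to S3
scores = pd.Series([0.1, 0.2, 0.3], name="scores")
df_store.add("ml/scores", scores)

# Retrieved back as a one-column DataFrame
print(df_store.get("ml/scores"))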

delete(name)

Delete a DataFrame from AWS S3.

Parameters:

name (str): The name of the data to delete. Required.
Source code in src/sageworks/api/df_store.py
def delete(self, name: str):
    """Delete a DataFrame from the AWS S3.

    Args:
        name (str): The name of the data to delete.
    """
    s3_uri = self._generate_s3_uri(name)

    # Check if the folder (prefix) exists in S3
    if not wr.s3.list_objects(s3_uri):
        self.log.warning(f"Data '{name}' does not exist in S3. Cannot delete.")
        return

    # Delete the data from S3
    try:
        wr.s3.delete_objects(s3_uri)
        self.log.info(f"Data '{name}' deleted successfully from S3.")
    except Exception as e:
        self.log.error(f"Failed to delete data '{name}': {e}")

details()

Return a DataFrame with detailed metadata for all objects in the df_store prefix.

Source code in src/sageworks/api/df_store.py
def details(self) -> pd.DataFrame:
    """Return a DataFrame with detailed metadata for all objects in the data_store prefix."""
    try:
        response = self.s3_client.list_objects_v2(Bucket=self.sageworks_bucket, Prefix=self.prefix)
        if "Contents" not in response:
            return pd.DataFrame(columns=["name", "s3_file", "size", "modified"])

        # Collect details for each object
        data = []
        for obj in response["Contents"]:
            full_key = obj["Key"]

            # Recover the name: swap the prefix for a leading slash and drop '.parquet' and anything after it
            name = full_key.replace(f"{self.prefix}", "/").split(".parquet")[0]
            s3_file = f"s3://{self.sageworks_bucket}/{full_key}"
            size = obj["Size"]
            modified = obj["LastModified"]
            data.append([name, s3_file, size, modified])

        # Create and return DataFrame
        df = pd.DataFrame(data, columns=["name", "s3_file", "size", "modified"])
        return df

    except Exception as e:
        self.log.error(f"Failed to get object details: {e}")
        return pd.DataFrame(columns=["name", "s3_file", "size", "created", "modified"])

get(name)

Retrieve a DataFrame from AWS S3.

Parameters:

name (str): The name of the data to retrieve. Required.

Returns:

pd.DataFrame: The retrieved DataFrame.

Source code in src/sageworks/api/df_store.py
def get(self, name: str) -> pd.DataFrame:
    """Retrieve a DataFrame from the AWS S3.

    Args:
        name (str): The name of the data to retrieve.

    Returns:
        pd.DataFrame: The retrieved DataFrame.
    """
    s3_uri = self._generate_s3_uri(name)
    try:
        df = wr.s3.read_parquet(s3_uri)
        return df
    except ClientError:
        self.log.warning(f"Data '{name}' not found in S3.")
        return pd.DataFrame()  # Return an empty DataFrame if not found
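
One usage note: get() returns an empty DataFrame rather than raising when nothing is stored under the given name, so checking .empty is a simple way to detect a miss (a quick sketch, with a hypothetical name):

from sageworks.api.df_store import DFStore

df_store = DFStore()

# get() returns an empty DataFrame when the name isn't found
df = df_store.get("does/not/exist")
if df.empty:
    print("No DataFrame stored under that name")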

summary()

Return a nicely formatted summary of object names, sizes (in MB), and modified dates.

Source code in src/sageworks/api/df_store.py
def summary(self) -> pd.DataFrame:
    """Return a nicely formatted summary of object names, sizes (in MB), and modified dates."""
    df = self.details()

    # Create a formatted DataFrame
    formatted_df = pd.DataFrame(
        {
            "name": df["name"],
            "size (MB)": (df["size"] / (1024 * 1024)).round(2),  # Convert size to MB
            "modified": pd.to_datetime(df["modified"]).dt.strftime("%Y-%m-%d %H:%M:%S"),  # Format date
        }
    )
    return formatted_df

Examples

These examples show how to use the DFStore() class to list, add, retrieve, and delete DataFrames in AWS storage.

SageWorks REPL

If you'd like to experiment with listing, adding, and retrieving DataFrames with the DFStore() class, you can spin up the SageWorks REPL and test out all the methods. Try it out: SageWorks REPL

Using DataFrame Store
from sageworks.api.df_store import DFStore
import pandas as pd

df_store = DFStore()

# List DataFrames
df_store.list()

Out[1]:
ml/confustion_matrix  (0.002MB/2024-09-23 16:44:48)
ml/hold_out_ids  (0.094MB/2024-09-23 16:57:01)
ml/my_awesome_df  (0.002MB/2024-09-23 16:43:30)
ml/shap_values  (0.019MB/2024-09-23 16:57:21)

# Add a DataFrame
df = pd.DataFrame({"A": [1]*1000, "B": [3]*1000})
df_store.add("test/test_df", df)

# List DataFrames (we can just use the REPR)
df_store

Out[2]:
ml/confustion_matrix  (0.002MB/2024-09-23 16:44:48)
ml/hold_out_ids  (0.094MB/2024-09-23 16:57:01)
ml/my_awesome_df  (0.002MB/2024-09-23 16:43:30)
ml/shap_values  (0.019MB/2024-09-23 16:57:21)
test/test_df  (0.002MB/2024-09-23 16:59:27)

# Retrieve dataframes
return_df = df_store.get("test/test_df")
return_df.head()

Out[3]:
   A  B
0  1  3
1  1  3
2  1  3
3  1  3
4  1  3

# Delete dataframes
df_store.delete("test/test_df")
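
Beyond list/add/get/delete, the summary() and details() methods shown above are handy for inspecting what's stored. A quick sketch (the columns come from the class; the actual rows depend on what's in your bucket):

# Formatted overview: name, size (MB), modified
df_store.summary()

# Full metadata, including the underlying S3 file locations
details_df = df_store.details()
details_df[["name", "s3_file", "size", "modified"]]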

Compressed Storage is Automatic

As noted above, all DataFrames are stored in the Parquet format with Snappy compression, so you get efficient columnar storage and fast read/write speeds without any additional configuration.