Workbench DataFrame Storage

Examples

Examples of using the DFStore class are listed at the bottom of this page: Examples.

Why DataFrame Storage?

Great question, there are a couple of reasons. The first is that the AWS Parameter Store has a 4KB limit, so it won't support any kind of 'real data'. The second is that DataFrames are commonly used throughout data engineering, data science, and ML pipeline construction. Storing named DataFrames in an accessible location that your ML team can inspect and use comes in super handy.

Efficient Storage

All DataFrames are stored in the Parquet format using Snappy compression. Parquet is a columnar storage format that efficiently handles large datasets, and Snappy compression reduces file size while maintaining fast read/write speeds.
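
To get a feel for what that means, here's a minimal local sketch (not the DFStore internals) that round-trips a DataFrame through a Snappy-compressed Parquet file using pandas and pyarrow; DFStore does the equivalent reads and writes against S3.

import pandas as pd

# Write a small DataFrame as Parquet with Snappy compression
# (Snappy is the default codec for pandas/pyarrow; shown explicitly here)
df = pd.DataFrame({"A": [1, 2], "B": [3, 4]})
df.to_parquet("my_data.parquet", compression="snappy")

# Reading it back returns an identical DataFrame
round_trip = pd.read_parquet("my_data.parquet")
print(round_trip)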

DFStore: Fast/efficient storage of DataFrames using AWS S3/Parquet/Snappy

DFStore

Bases: AWSDFStore

DFStore: Fast/efficient storage of DataFrames using AWS S3/Parquet/Snappy

Common Usage
df_store = DFStore()

# List Data
df_store.list()

# Add DataFrame
df = pd.DataFrame({"A": [1, 2], "B": [3, 4]})
df_store.upsert("/test/my_data", df)

# Retrieve DataFrame
df = df_store.get("/test/my_data")
print(df)

# Delete Data
df_store.delete("/test/my_data")
Source code in src/workbench/api/df_store.py
class DFStore(AWSDFStore):
    """DFStore: Fast/efficient storage of DataFrames using AWS S3/Parquet/Snappy

    Common Usage:
        ```python
        df_store = DFStore()

        # List Data
        df_store.list()

        # Add DataFrame
        df = pd.DataFrame({"A": [1, 2], "B": [3, 4]})
        df_store.upsert("/test/my_data", df)

        # Retrieve DataFrame
        df = df_store.get("/test/my_data")
        print(df)

        # Delete Data
        df_store.delete("/test/my_data")
        ```
    """

    def __init__(self, path_prefix: Union[str, None] = None):
        """DFStore Init Method

        Args:
            path_prefix (Union[str, None], optional): Add a path prefix to storage locations (Defaults to None)
        """
        self.log = logging.getLogger("workbench")

        # Initialize the SuperClass
        super().__init__(path_prefix=path_prefix)

    def list(self, include_cache: bool = False) -> list:
        """List all the objects in the data_store prefix.

        Args:
            include_cache (bool, optional): Include cache objects in the list (Defaults to False).

        Returns:
            list: A list of all the objects in the data_store prefix.
        """
        return super().list(include_cache=include_cache)

    def summary(self, include_cache: bool = False) -> pd.DataFrame:
        """Return a nicely formatted summary of object locations, sizes (in MB), and modified dates.

        Args:
            include_cache (bool, optional): Include cache objects in the summary (Defaults to False).

        Returns:
            pd.DataFrame: A formatted DataFrame with the summary details.
        """
        return super().summary(include_cache=include_cache)

    def details(self, include_cache: bool = False) -> pd.DataFrame:
        """Return a DataFrame with detailed metadata for all objects in the data_store prefix.

        Args:
            include_cache (bool, optional): Include cache objects in the details (Defaults to False).

        Returns:
            pd.DataFrame: A DataFrame with detailed metadata for all objects in the data_store prefix.
        """
        return super().details(include_cache=include_cache)

    def check(self, location: str) -> bool:
        """Check if a DataFrame exists at the specified location

        Args:
            location (str): The location of the data to check.

        Returns:
            bool: True if the data exists, False otherwise.
        """
        return super().check(location)

    def get(self, location: str) -> Union[pd.DataFrame, None]:
        """Retrieve a DataFrame from AWS S3.

        Args:
            location (str): The location of the data to retrieve.

        Returns:
            pd.DataFrame: The retrieved DataFrame or None if not found.
        """
        _df = super().get(location)
        if _df is None:
            self.log.error(f"Dataframe not found at location: {location}")
        return _df

    def upsert(self, location: str, data: Union[pd.DataFrame, pd.Series]):
        """Insert or update a DataFrame or Series in the AWS S3.

        Args:
            location (str): The location of the data.
            data (Union[pd.DataFrame, pd.Series]): The data to be stored.
        """
        super().upsert(location, data)

    def last_modified(self, location: str) -> Union[datetime, None]:
        """Get the last modified date of the DataFrame at the specified location.

        Args:
            location (str): The location of the data to check.

        Returns:
            Union[datetime, None]: The last modified date of the DataFrame or None if not found.
        """
        return super().last_modified(location)

    def delete(self, location: str):
        """Delete a DataFrame from the AWS S3.

        Args:
            location (str): The location of the data to delete.
        """
        super().delete(location)

__init__(path_prefix=None)

DFStore Init Method

Parameters:

Name          Type               Description                                                  Default
path_prefix   Union[str, None]   Add a path prefix to storage locations (Defaults to None)   None
Source code in src/workbench/api/df_store.py
def __init__(self, path_prefix: Union[str, None] = None):
    """DFStore Init Method

    Args:
        path_prefix (Union[str, None], optional): Add a path prefix to storage locations (Defaults to None)
    """
    self.log = logging.getLogger("workbench")

    # Initialize the SuperClass
    super().__init__(path_prefix=path_prefix)
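
A quick usage sketch for the constructor; the "/projects/alpha" prefix below is just an illustrative value, not something Workbench defines.

from workbench.api.df_store import DFStore

# Store everything under an illustrative path prefix
df_store = DFStore(path_prefix="/projects/alpha")

# Default behavior: no prefix
df_store = DFStore()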

check(location)

Check if a DataFrame exists at the specified location

Parameters:

Name       Type   Description                          Default
location   str    The location of the data to check.   required

Returns:

Name   Type   Description
bool   bool   True if the data exists, False otherwise.

Source code in src/workbench/api/df_store.py
def check(self, location: str) -> bool:
    """Check if a DataFrame exists at the specified location

    Args:
        location (str): The location of the data to check.

    Returns:
        bool: True if the data exists, False otherwise.
    """
    return super().check(location)
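
For example, check() pairs naturally with get() to avoid the error log for missing data; the "/test/my_data" location below matches the Common Usage example above.

from workbench.api.df_store import DFStore

df_store = DFStore()
if df_store.check("/test/my_data"):
    df = df_store.get("/test/my_data")
else:
    print("No DataFrame stored at /test/my_data")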

delete(location)

Delete a DataFrame from the AWS S3.

Parameters:

Name       Type   Description                           Default
location   str    The location of the data to delete.   required
Source code in src/workbench/api/df_store.py
def delete(self, location: str):
    """Delete a DataFrame from the AWS S3.

    Args:
        location (str): The location of the data to delete.
    """
    super().delete(location)
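
A short usage sketch, reusing the "/test/my_data" location from the Common Usage example; pairing delete() with check() confirms the object is gone.

from workbench.api.df_store import DFStore

df_store = DFStore()
df_store.delete("/test/my_data")          # remove the stored DataFrame
print(df_store.check("/test/my_data"))    # expected: False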

details(include_cache=False)

Return a DataFrame with detailed metadata for all objects in the data_store prefix.

Parameters:

Name            Type   Description                                                  Default
include_cache   bool   Include cache objects in the details (Defaults to False).   False

Returns:

Type        Description
DataFrame   A DataFrame with detailed metadata for all objects in the data_store prefix.

Source code in src/workbench/api/df_store.py
def details(self, include_cache: bool = False) -> pd.DataFrame:
    """Return a DataFrame with detailed metadata for all objects in the data_store prefix.

    Args:
        include_cache (bool, optional): Include cache objects in the details (Defaults to False).

    Returns:
        pd.DataFrame: A DataFrame with detailed metadata for all objects in the data_store prefix.
    """
    return super().details(include_cache=include_cache)
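
A short usage sketch; details() returns a DataFrame of metadata for the stored objects, which is handy for quick audits of what your team has stored.

from workbench.api.df_store import DFStore

df_store = DFStore()
details_df = df_store.details()    # metadata for all stored objects
print(details_df)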

get(location)

Retrieve a DataFrame from AWS S3.

Parameters:

Name       Type   Description                             Default
location   str    The location of the data to retrieve.   required

Returns:

Type                     Description
Union[DataFrame, None]   The retrieved DataFrame or None if not found.

Source code in src/workbench/api/df_store.py
def get(self, location: str) -> Union[pd.DataFrame, None]:
    """Retrieve a DataFrame from AWS S3.

    Args:
        location (str): The location of the data to retrieve.

    Returns:
        pd.DataFrame: The retrieved DataFrame or None if not found.
    """
    _df = super().get(location)
    if _df is None:
        self.log.error(f"Dataframe not found at location: {location}")
    return _df
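
Since get() returns None for a missing location (and logs an error), a small guard keeps downstream code safe; the location below is the one from the Common Usage example.

from workbench.api.df_store import DFStore

df_store = DFStore()
df = df_store.get("/test/my_data")
if df is None:
    print("Nothing stored at /test/my_data")
else:
    print(df.head())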

last_modified(location)

Get the last modified date of the DataFrame at the specified location.

Parameters:

Name       Type   Description                          Default
location   str    The location of the data to check.   required

Returns:

Type                    Description
Union[datetime, None]   The last modified date of the DataFrame or None if not found.

Source code in src/workbench/api/df_store.py
def last_modified(self, location: str) -> Union[datetime, None]:
    """Get the last modified date of the DataFrame at the specified location.

    Args:
        location (str): The location of the data to check.

    Returns:
        Union[datetime, None]: The last modified date of the DataFrame or None if not found.
    """
    return super().last_modified(location)
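
A short usage sketch; last_modified() returns a datetime (or None), so you can format it however you like. The location matches the Common Usage example.

from workbench.api.df_store import DFStore

df_store = DFStore()
when = df_store.last_modified("/test/my_data")
if when is not None:
    print(f"Last updated: {when.isoformat()}")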

list(include_cache=False)

List all the objects in the data_store prefix.

Parameters:

Name            Type   Description                                               Default
include_cache   bool   Include cache objects in the list (Defaults to False).   False

Returns:

Name   Type   Description
list   list   A list of all the objects in the data_store prefix.

Source code in src/workbench/api/df_store.py
def list(self, include_cache: bool = False) -> list:
    """List all the objects in the data_store prefix.

    Args:
        include_cache (bool, optional): Include cache objects in the list (Defaults to False).

    Returns:
        list: A list of all the objects in the data_store prefix.
    """
    return super().list(include_cache=include_cache)
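
A short usage sketch showing the include_cache flag; cache objects are hidden by default.

from workbench.api.df_store import DFStore

df_store = DFStore()
print(df_store.list())                       # user objects only
print(df_store.list(include_cache=True))     # include cache objects as well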

summary(include_cache=False)

Return a nicely formatted summary of object locations, sizes (in MB), and modified dates.

Parameters:

Name            Type   Description                                                  Default
include_cache   bool   Include cache objects in the summary (Defaults to False).   False

Returns:

Type        Description
DataFrame   A formatted DataFrame with the summary details.

Source code in src/workbench/api/df_store.py
def summary(self, include_cache: bool = False) -> pd.DataFrame:
    """Return a nicely formatted summary of object locations, sizes (in MB), and modified dates.

    Args:
        include_cache (bool, optional): Include cache objects in the summary (Defaults to False).

    Returns:
        pd.DataFrame: A formatted DataFrame with the summary details.
    """
    return super().summary(include_cache=include_cache)
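
A short usage sketch; summary() gives a formatted view of locations, sizes (in MB), and modified dates.

from workbench.api.df_store import DFStore

df_store = DFStore()
print(df_store.summary())    # locations, sizes (MB), and modified dates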

upsert(location, data)

Insert or update a DataFrame or Series in the AWS S3.

Parameters:

Name       Type                        Description                  Default
location   str                         The location of the data.   required
data       Union[DataFrame, Series]    The data to be stored.      required
Source code in src/workbench/api/df_store.py
def upsert(self, location: str, data: Union[pd.DataFrame, pd.Series]):
    """Insert or update a DataFrame or Series in the AWS S3.

    Args:
        location (str): The location of the data.
        data (Union[pd.DataFrame, pd.Series]): The data to be stored.
    """
    super().upsert(location, data)
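
A short usage sketch; upsert() accepts a pandas Series as well as a DataFrame. The "/test/my_scores" location is illustrative.

import pandas as pd
from workbench.api.df_store import DFStore

df_store = DFStore()

# Store a Series (upsert also accepts a DataFrame, as shown in Common Usage)
scores = pd.Series([0.1, 0.2, 0.3], name="scores")
df_store.upsert("/test/my_scores", scores)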

Examples

These examples show how to use the DFStore() class to list, add, and retrieve DataFrames from AWS storage.

Workbench REPL

If you'd like to experiment with listing, adding, and retrieving DataFrames with the DFStore() class, you can spin up the Workbench REPL, use the class, and test out all the methods. Try it out! Workbench REPL

Using DataFrame Store
from workbench.api.df_store import DFStore
import pandas as pd

df_store = DFStore()

# List DataFrames
df_store.list()

Out[1]:
ml/confustion_matrix  (0.002MB/2024-09-23 16:44:48)
ml/hold_out_ids  (0.094MB/2024-09-23 16:57:01)
ml/my_awesome_df  (0.002MB/2024-09-23 16:43:30)
ml/shap_values  (0.019MB/2024-09-23 16:57:21)

# Add a DataFrame
df = pd.DataFrame({"A": [1]*1000, "B": [3]*1000})
df_store.upsert("test/test_df", df)

# List DataFrames (we can just use the REPR)
df_store

Out[2]:
ml/confustion_matrix  (0.002MB/2024-09-23 16:44:48)
ml/hold_out_ids  (0.094MB/2024-09-23 16:57:01)
ml/my_awesome_df  (0.002MB/2024-09-23 16:43:30)
ml/shap_values  (0.019MB/2024-09-23 16:57:21)
test/test_df  (0.002MB/2024-09-23 16:59:27)

# Retrieve dataframes
return_df = df_store.get("test/test_df")
return_df.head()

Out[3]:
   A  B
0  1  3
1  1  3
2  1  3
3  1  3
4  1  3

# Delete dataframes
df_store.delete("test/test_df")

Compressed Storage is Automatic

All DataFrames are stored in the Parquet format using Snappy compression. Parquet is a columnar storage format that efficiently handles large datasets, and Snappy compression reduces file size while maintaining fast read/write speeds.