Skip to content

Workbench DataFrame Storage

Why DataFrame Storage?

Great question, there's a couple of reasons. The first is that the Parameter Store in AWS has a 4KB limit, so that won't support any kind of 'real data'. The second reason is that DataFrames are commonly used as part of the data engineering, science, and ML pipeline construction process. Providing storage of named DataFrames in an accessible location that can be inspected and used by your ML Team comes in super handy.

Efficient Storage

All DataFrames are stored in the Parquet format using 'snappy' storage. Parquet is a columnar storage format that efficiently handles large datasets, and using Snappy compression reduces file size while maintaining fast read/write speeds.

Examples

These example show how to use the DFStore() class to list, add, and get dataframes from AWS Storage.

Workbench REPL

If you'd like to experiment with listing, adding, and getting dataframe with the DFStore() class, you can spin up the Workbench REPL, use the class and test out all the methods. Try it out! Workbench REPL

Using DataFrame Store
from workbench.api import DFStore
df_store = DFStore()

# List DataFrames
df_store().list()

Out[1]:
ml/confustion_matrix  (0.002MB/2024-09-23 16:44:48)
ml/hold_out_ids  (0.094MB/2024-09-23 16:57:01)
ml/my_awesome_df  (0.002MB/2024-09-23 16:43:30)
ml/shap_values  (0.019MB/2024-09-23 16:57:21)

# Add a DataFrame
df = pd.DataFrame({"A": [1]*1000, "B": [3]*1000})
df_store.upsert("test/test_df", df)

# List DataFrames (we can just use the REPR)
df_store

Out[2]:
ml/confustion_matrix  (0.002MB/2024-09-23 16:44:48)
ml/hold_out_ids  (0.094MB/2024-09-23 16:57:01)
ml/my_awesome_df  (0.002MB/2024-09-23 16:43:30)
ml/shap_values  (0.019MB/2024-09-23 16:57:21)
test/test_df  (0.002MB/2024-09-23 16:59:27)

# Retrieve dataframes
return_df = df_store.get("test/test_df")
return_df.head()

Out[3]:
   A  B
0  1  3
1  1  3
2  1  3
3  1  3
4  1  3

# Delete dataframes
df_store.delete("test/test_df")

Compressed Storage is Automatic

All DataFrames are stored in the Parquet format using 'snappy' storage. Parquet is a columnar storage format that efficiently handles large datasets, and using Snappy compression reduces file size while maintaining fast read/write speeds.

DFStore: Fast/efficient storage of DataFrames using AWS S3/Parquet/Snappy

DFStore

Bases: DFStoreCore

DFStore: Fast/efficient storage of DataFrames using AWS S3/Parquet/Snappy

Orchestration-side wrapper around the endpoint-safe :class:DFStoreCore. Pulls the workbench bucket from :class:ConfigManager and a refreshable boto3 session from :class:AWSAccountClamp — what you almost always want for interactive / long-running workbench code. Endpoint-side code should use :class:workbench.endpoints.df_store.DFStore instead, which auto- discovers s3_bucket and boto3_session from the container env.

Common Usage:

python df_store = DFStore() # List Data df_store.list() # Add DataFrame df = pd.DataFrame({"A": [1, 2], "B": [3, 4]}) df_store.upsert("/test/my_data", df) # Retrieve DataFrame df = df_store.get("/test/my_data") print(df) # Delete Data df_store.delete("/test/my_data")

Source code in src/workbench/api/df_store.py
class DFStore(DFStoreCore):
    """DFStore: Fast/efficient storage of DataFrames using AWS S3/Parquet/Snappy

    Orchestration-side wrapper around the endpoint-safe :class:`DFStoreCore`.
    Pulls the workbench bucket from :class:`ConfigManager` and a refreshable
    boto3 session from :class:`AWSAccountClamp` — what you almost always want
    for interactive / long-running workbench code. Endpoint-side code should
    use :class:`workbench.endpoints.df_store.DFStore` instead, which auto-
    discovers ``s3_bucket`` and ``boto3_session`` from the container env.

        Common Usage:
    ```python
            df_store = DFStore()

            # List Data
            df_store.list()

            # Add DataFrame
            df = pd.DataFrame({"A": [1, 2], "B": [3, 4]})
            df_store.upsert("/test/my_data", df)

            # Retrieve DataFrame
            df = df_store.get("/test/my_data")
            print(df)

            # Delete Data
            df_store.delete("/test/my_data")
    ```
    """

    def __init__(self, path_prefix: Union[str, None] = None):
        """DFStore Init Method

        Args:
            path_prefix (Union[str, None], optional): Add a path prefix to storage locations (Defaults to None)
        """
        bucket = ConfigManager().get_config("WORKBENCH_BUCKET")
        session = AWSAccountClamp().boto3_session
        super().__init__(path_prefix=path_prefix, s3_bucket=bucket, boto3_session=session)
        self.log = logging.getLogger("workbench")

__init__(path_prefix=None)

DFStore Init Method

Parameters:

Name Type Description Default
path_prefix Union[str, None]

Add a path prefix to storage locations (Defaults to None)

None
Source code in src/workbench/api/df_store.py
def __init__(self, path_prefix: Union[str, None] = None):
    """DFStore Init Method

    Args:
        path_prefix (Union[str, None], optional): Add a path prefix to storage locations (Defaults to None)
    """
    bucket = ConfigManager().get_config("WORKBENCH_BUCKET")
    session = AWSAccountClamp().boto3_session
    super().__init__(path_prefix=path_prefix, s3_bucket=bucket, boto3_session=session)
    self.log = logging.getLogger("workbench")