SageWorks DataFrame Storage
Examples
Examples of using the Parameter Storage class are listed at the bottom of this page Examples.
Why DataFrame Storage?
Great question, there's a couple of reasons. The first is that the Parameter Store in AWS has a 4KB limit, so that won't support any kind of 'real data'. The second reason is that DataFrames are commonly used as part of the data engineering, science, and ML pipeline construction process. Providing storage of named DataFrames in an accessible location that can be inspected and used by your ML Team comes in super handy.
Efficient Storage
All DataFrames are stored in the Parquet format using 'snappy' storage. Parquet is a columnar storage format that efficiently handles large datasets, and using Snappy compression reduces file size while maintaining fast read/write speeds.
DFStore: Fast/efficient storage of DataFrames using AWS S3/Parquet/Snappy
DFStore
DFStore: Fast/efficient storage of DataFrames using AWS S3/Parquet/Snappy
Common Usage
df_store = DFStore()
# List Data
df_store.list()
# Add DataFrame
df = pd.DataFrame({"A": [1, 2], "B": [3, 4]})
df_store.add("my_data", df)
# Retrieve DataFrame
df = df_store.get("my_data")
print(df)
# Delete Data
df_store.delete("my_data")
Source code in src/sageworks/api/df_store.py
| class DFStore:
"""DFStore: Fast/efficient storage of DataFrames using AWS S3/Parquet/Snappy
Common Usage:
```python
df_store = DFStore()
# List Data
df_store.list()
# Add DataFrame
df = pd.DataFrame({"A": [1, 2], "B": [3, 4]})
df_store.add("my_data", df)
# Retrieve DataFrame
df = df_store.get("my_data")
print(df)
# Delete Data
df_store.delete("my_data")
```
"""
def __init__(self):
"""DFStore Init Method"""
self.log = logging.getLogger("sageworks")
self.prefix = "df_store/"
# Initialize a SageWorks Session and retrieve the S3 bucket from ConfigManager
config = ConfigManager()
self.sageworks_bucket = config.get_config("SAGEWORKS_BUCKET")
# Grab a SageWorks Session (this allows us to assume the SageWorks ExecutionRole)
self.boto3_session = AWSAccountClamp().boto3_session
# Read all the Pipelines from this S3 path
self.s3_client = self.boto3_session.client("s3")
def summary(self) -> pd.DataFrame:
"""Return a nicely formatted summary of object names, sizes (in MB), and modified dates."""
df = self.details()
# Create a formatted DataFrame
formatted_df = pd.DataFrame(
{
"name": df["name"],
"size (MB)": (df["size"] / (1024 * 1024)).round(2), # Convert size to MB
"modified": pd.to_datetime(df["modified"]).dt.strftime("%Y-%m-%d %H:%M:%S"), # Format date
}
)
return formatted_df
def details(self) -> pd.DataFrame:
"""Return a DataFrame with detailed metadata for all objects in the data_store prefix."""
try:
response = self.s3_client.list_objects_v2(Bucket=self.sageworks_bucket, Prefix=self.prefix)
if "Contents" not in response:
return pd.DataFrame(columns=["name", "s3_file", "size", "modified"])
# Collect details for each object
data = []
for obj in response["Contents"]:
full_key = obj["Key"]
# Reverse logic: Strip the bucket/prefix in the front and .parquet in the end
name = full_key.replace(f"{self.prefix}", "/").split(".parquet")[0]
s3_file = f"s3://{self.sageworks_bucket}/{full_key}"
size = obj["Size"]
modified = obj["LastModified"]
data.append([name, s3_file, size, modified])
# Create and return DataFrame
df = pd.DataFrame(data, columns=["name", "s3_file", "size", "modified"])
return df
except Exception as e:
self.log.error(f"Failed to get object details: {e}")
return pd.DataFrame(columns=["name", "s3_file", "size", "created", "modified"])
def get(self, name: str) -> pd.DataFrame:
"""Retrieve a DataFrame from the AWS S3.
Args:
name (str): The name of the data to retrieve.
Returns:
pd.DataFrame: The retrieved DataFrame.
"""
s3_uri = self._generate_s3_uri(name)
try:
df = wr.s3.read_parquet(s3_uri)
return df
except ClientError:
self.log.warning(f"Data '{name}' not found in S3.")
return pd.DataFrame() # Return an empty DataFrame if not found
def add(self, name: str, data: Union[pd.DataFrame, pd.Series]):
"""Add or update a DataFrame or Series in the AWS S3.
Args:
name (str): The name of the data.
data (Union[pd.DataFrame, pd.Series]): The data to be stored.
"""
# Check if the data is a Pandas Series, convert it to a DataFrame
if isinstance(data, pd.Series):
data = data.to_frame()
# Ensure data is a DataFrame
if not isinstance(data, pd.DataFrame):
raise ValueError("Only Pandas DataFrame or Series objects are supported.")
s3_uri = self._generate_s3_uri(name)
try:
wr.s3.to_parquet(df=data, path=s3_uri, dataset=True, mode="overwrite")
self.log.info(f"Data '{name}' added/updated successfully in S3.")
except Exception as e:
self.log.critical(f"Failed to add/update data '{name}': {e}")
raise
def delete(self, name: str):
"""Delete a DataFrame from the AWS S3.
Args:
name (str): The name of the data to delete.
"""
s3_uri = self._generate_s3_uri(name)
# Check if the folder (prefix) exists in S3
if not wr.s3.list_objects(s3_uri):
self.log.warning(f"Data '{name}' does not exist in S3. Cannot delete.")
return
# Delete the data from S3
try:
wr.s3.delete_objects(s3_uri)
self.log.info(f"Data '{name}' deleted successfully from S3.")
except Exception as e:
self.log.error(f"Failed to delete data '{name}': {e}")
def _generate_s3_uri(self, name: str) -> str:
"""Generate the S3 URI for the given name."""
s3_path = f"{self.sageworks_bucket}/{self.prefix}{name}.parquet"
s3_path = s3_path.replace("//", "/")
s3_uri = f"s3://{s3_path}"
return s3_uri
def __repr__(self):
"""Return a string representation of the DFStore object."""
# Use the summary() method and format it to align columns for printing
summary_df = self.summary()
# Dynamically compute the max length of the 'name' column and add 5 spaces for padding
max_name_len = summary_df["name"].str.len().max() + 2
summary_df["name"] = summary_df["name"].str.ljust(max_name_len)
# Format the size column to include (MB) and ensure 3 spaces between size and date
summary_df["size (MB)"] = summary_df["size (MB)"].apply(lambda x: f"{x:.2f} MB")
# Enclose the modified date in parentheses and ensure 3 spaces between size and date
summary_df["modified"] = summary_df["modified"].apply(lambda x: f" ({x})")
# Convert the DataFrame to a string, remove headers, and return
return summary_df.to_string(index=False, header=False)
|
__init__()
DFStore Init Method
Source code in src/sageworks/api/df_store.py
| def __init__(self):
"""DFStore Init Method"""
self.log = logging.getLogger("sageworks")
self.prefix = "df_store/"
# Initialize a SageWorks Session and retrieve the S3 bucket from ConfigManager
config = ConfigManager()
self.sageworks_bucket = config.get_config("SAGEWORKS_BUCKET")
# Grab a SageWorks Session (this allows us to assume the SageWorks ExecutionRole)
self.boto3_session = AWSAccountClamp().boto3_session
# Read all the Pipelines from this S3 path
self.s3_client = self.boto3_session.client("s3")
|
__repr__()
Return a string representation of the DFStore object.
Source code in src/sageworks/api/df_store.py
| def __repr__(self):
"""Return a string representation of the DFStore object."""
# Use the summary() method and format it to align columns for printing
summary_df = self.summary()
# Dynamically compute the max length of the 'name' column and add 5 spaces for padding
max_name_len = summary_df["name"].str.len().max() + 2
summary_df["name"] = summary_df["name"].str.ljust(max_name_len)
# Format the size column to include (MB) and ensure 3 spaces between size and date
summary_df["size (MB)"] = summary_df["size (MB)"].apply(lambda x: f"{x:.2f} MB")
# Enclose the modified date in parentheses and ensure 3 spaces between size and date
summary_df["modified"] = summary_df["modified"].apply(lambda x: f" ({x})")
# Convert the DataFrame to a string, remove headers, and return
return summary_df.to_string(index=False, header=False)
|
add(name, data)
Add or update a DataFrame or Series in the AWS S3.
Parameters:
Name |
Type |
Description |
Default |
name
|
str
|
|
required
|
data
|
Union[DataFrame, Series]
|
|
required
|
Source code in src/sageworks/api/df_store.py
| def add(self, name: str, data: Union[pd.DataFrame, pd.Series]):
"""Add or update a DataFrame or Series in the AWS S3.
Args:
name (str): The name of the data.
data (Union[pd.DataFrame, pd.Series]): The data to be stored.
"""
# Check if the data is a Pandas Series, convert it to a DataFrame
if isinstance(data, pd.Series):
data = data.to_frame()
# Ensure data is a DataFrame
if not isinstance(data, pd.DataFrame):
raise ValueError("Only Pandas DataFrame or Series objects are supported.")
s3_uri = self._generate_s3_uri(name)
try:
wr.s3.to_parquet(df=data, path=s3_uri, dataset=True, mode="overwrite")
self.log.info(f"Data '{name}' added/updated successfully in S3.")
except Exception as e:
self.log.critical(f"Failed to add/update data '{name}': {e}")
raise
|
delete(name)
Delete a DataFrame from the AWS S3.
Parameters:
Name |
Type |
Description |
Default |
name
|
str
|
The name of the data to delete.
|
required
|
Source code in src/sageworks/api/df_store.py
| def delete(self, name: str):
"""Delete a DataFrame from the AWS S3.
Args:
name (str): The name of the data to delete.
"""
s3_uri = self._generate_s3_uri(name)
# Check if the folder (prefix) exists in S3
if not wr.s3.list_objects(s3_uri):
self.log.warning(f"Data '{name}' does not exist in S3. Cannot delete.")
return
# Delete the data from S3
try:
wr.s3.delete_objects(s3_uri)
self.log.info(f"Data '{name}' deleted successfully from S3.")
except Exception as e:
self.log.error(f"Failed to delete data '{name}': {e}")
|
details()
Return a DataFrame with detailed metadata for all objects in the data_store prefix.
Source code in src/sageworks/api/df_store.py
| def details(self) -> pd.DataFrame:
"""Return a DataFrame with detailed metadata for all objects in the data_store prefix."""
try:
response = self.s3_client.list_objects_v2(Bucket=self.sageworks_bucket, Prefix=self.prefix)
if "Contents" not in response:
return pd.DataFrame(columns=["name", "s3_file", "size", "modified"])
# Collect details for each object
data = []
for obj in response["Contents"]:
full_key = obj["Key"]
# Reverse logic: Strip the bucket/prefix in the front and .parquet in the end
name = full_key.replace(f"{self.prefix}", "/").split(".parquet")[0]
s3_file = f"s3://{self.sageworks_bucket}/{full_key}"
size = obj["Size"]
modified = obj["LastModified"]
data.append([name, s3_file, size, modified])
# Create and return DataFrame
df = pd.DataFrame(data, columns=["name", "s3_file", "size", "modified"])
return df
except Exception as e:
self.log.error(f"Failed to get object details: {e}")
return pd.DataFrame(columns=["name", "s3_file", "size", "created", "modified"])
|
get(name)
Retrieve a DataFrame from the AWS S3.
Parameters:
Name |
Type |
Description |
Default |
name
|
str
|
The name of the data to retrieve.
|
required
|
Returns:
Type |
Description |
DataFrame
|
pd.DataFrame: The retrieved DataFrame.
|
Source code in src/sageworks/api/df_store.py
| def get(self, name: str) -> pd.DataFrame:
"""Retrieve a DataFrame from the AWS S3.
Args:
name (str): The name of the data to retrieve.
Returns:
pd.DataFrame: The retrieved DataFrame.
"""
s3_uri = self._generate_s3_uri(name)
try:
df = wr.s3.read_parquet(s3_uri)
return df
except ClientError:
self.log.warning(f"Data '{name}' not found in S3.")
return pd.DataFrame() # Return an empty DataFrame if not found
|
summary()
Return a nicely formatted summary of object names, sizes (in MB), and modified dates.
Source code in src/sageworks/api/df_store.py
| def summary(self) -> pd.DataFrame:
"""Return a nicely formatted summary of object names, sizes (in MB), and modified dates."""
df = self.details()
# Create a formatted DataFrame
formatted_df = pd.DataFrame(
{
"name": df["name"],
"size (MB)": (df["size"] / (1024 * 1024)).round(2), # Convert size to MB
"modified": pd.to_datetime(df["modified"]).dt.strftime("%Y-%m-%d %H:%M:%S"), # Format date
}
)
return formatted_df
|
Examples
These example show how to use the DFStore()
class to list, add, and get dataframes from AWS Storage.
SageWorks REPL
If you'd like to experiment with listing, adding, and getting dataframe with the DFStore()
class, you can spin up the SageWorks REPL, use the class and test out all the methods. Try it out! SageWorks REPL
Using DataFrame Storefrom sageworks.api.df_store import DFStore
df_store = DFStore()
# List DataFrames
df_store().list()
Out[1]:
ml/confustion_matrix (0.002MB/2024-09-23 16:44:48)
ml/hold_out_ids (0.094MB/2024-09-23 16:57:01)
ml/my_awesome_df (0.002MB/2024-09-23 16:43:30)
ml/shap_values (0.019MB/2024-09-23 16:57:21)
# Add a DataFrame
df = pd.DataFrame({"A": [1]*1000, "B": [3]*1000})
df_store.add("test/test_df", df)
# List DataFrames (we can just use the REPR)
df_store
Out[2]:
ml/confustion_matrix (0.002MB/2024-09-23 16:44:48)
ml/hold_out_ids (0.094MB/2024-09-23 16:57:01)
ml/my_awesome_df (0.002MB/2024-09-23 16:43:30)
ml/shap_values (0.019MB/2024-09-23 16:57:21)
test/test_df (0.002MB/2024-09-23 16:59:27)
# Retrieve dataframes
return_df = df_store.get("test/test_df")
return_df.head()
Out[3]:
A B
0 1 3
1 1 3
2 1 3
3 1 3
4 1 3
# Delete dataframes
df_store.delete("test/test_df")
Compressed Storage is Automatic
All DataFrames are stored in the Parquet format using 'snappy' storage. Parquet is a columnar storage format that efficiently handles large datasets, and using Snappy compression reduces file size while maintaining fast read/write speeds.