Welcome to the SageWorks DataLoaders Light Classes
These classes provide low-level APIs for loading smaller datasets into AWS services; a short end-to-end sketch follows the class list below.
- CSVToDataSource: Loads local CSV data into a DataSource
- JSONToDataSource: Loads local JSON data into a DataSource
- S3ToDataSourceLight: Loads S3 data into a DataSource
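For orientation, here is a minimal end-to-end sketch based on the Common Usage snippets further down. The import path is inferred from the source file locations shown on this page, and the file path and DataSource name are placeholders.

```python
# Minimal sketch: load a local CSV file into a SageWorks DataSource.
# Import path inferred from the source location shown below (adjust if your install differs).
from sageworks.core.transforms.data_loaders.light.csv_to_data_source import CSVToDataSource

csv_to_data = CSVToDataSource("abalone.csv", "abalone_data")  # placeholder file and DataSource name
csv_to_data.set_output_tags(["abalone", "csv"])
csv_to_data.transform()
```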
CSVToDataSource

Bases: Transform
CSVToDataSource: Class to move local CSV Files into a SageWorks DataSource
Common Usage
```python
csv_to_data = CSVToDataSource(csv_file_path, data_uuid)
csv_to_data.set_output_tags(["abalone", "csv", "whatever"])
csv_to_data.transform()
```
Source code in src/sageworks/core/transforms/data_loaders/light/csv_to_data_source.py
````python
class CSVToDataSource(Transform):
    """CSVToDataSource: Class to move local CSV Files into a SageWorks DataSource

    Common Usage:
        ```python
        csv_to_data = CSVToDataSource(csv_file_path, data_uuid)
        csv_to_data.set_output_tags(["abalone", "csv", "whatever"])
        csv_to_data.transform()
        ```
    """

    def __init__(self, csv_file_path: str, data_uuid: str):
        """CSVToDataSource: Class to move local CSV Files into a SageWorks DataSource

        Args:
            csv_file_path (str): The path to the CSV file to be transformed
            data_uuid (str): The UUID of the SageWorks DataSource to be created
        """
        # Call superclass init
        super().__init__(csv_file_path, data_uuid)

        # Set up all my instance attributes
        self.input_type = TransformInput.LOCAL_FILE
        self.output_type = TransformOutput.DATA_SOURCE

    def transform_impl(self, overwrite: bool = True):
        """Convert the local CSV file into Parquet Format in the SageWorks Data Sources Bucket, and
        store the information about the data to the AWS Data Catalog sageworks database
        """
        # Report the transformation initiation
        csv_file = os.path.basename(self.input_uuid)
        self.log.info(f"Starting {csv_file} --> DataSource: {self.output_uuid}...")

        # Read in the Local CSV as a Pandas DataFrame
        df = pd.read_csv(self.input_uuid, low_memory=False)
        df = convert_object_columns(df)

        # Use the SageWorks Pandas to Data Source class
        pandas_to_data = PandasToData(self.output_uuid)
        pandas_to_data.set_input(df)
        pandas_to_data.set_output_tags(self.output_tags)
        pandas_to_data.add_output_meta(self.output_meta)
        pandas_to_data.transform()

        # Report the transformation results
        self.log.info(f"{csv_file} --> DataSource: {self.output_uuid} Complete!")

    def post_transform(self, **kwargs):
        """Post-Transform"""
        self.log.info("Post-Transform: S3 to DataSource...")
````
__init__(csv_file_path, data_uuid)

CSVToDataSource: Class to move local CSV Files into a SageWorks DataSource

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `csv_file_path` | `str` | The path to the CSV file to be transformed | *required* |
| `data_uuid` | `str` | The UUID of the SageWorks DataSource to be created | *required* |
Source code in src/sageworks/core/transforms/data_loaders/light/csv_to_data_source.py
```python
def __init__(self, csv_file_path: str, data_uuid: str):
    """CSVToDataSource: Class to move local CSV Files into a SageWorks DataSource

    Args:
        csv_file_path (str): The path to the CSV file to be transformed
        data_uuid (str): The UUID of the SageWorks DataSource to be created
    """
    # Call superclass init
    super().__init__(csv_file_path, data_uuid)

    # Set up all my instance attributes
    self.input_type = TransformInput.LOCAL_FILE
    self.output_type = TransformOutput.DATA_SOURCE
```
post_transform(**kwargs)
Post-Transform
Source code in src/sageworks/core/transforms/data_loaders/light/csv_to_data_source.py
```python
def post_transform(self, **kwargs):
    """Post-Transform"""
    self.log.info("Post-Transform: S3 to DataSource...")
```
transform_impl(overwrite=True)

Convert the local CSV file into Parquet format in the SageWorks Data Sources bucket, and store the information about the data in the AWS Data Catalog sageworks database.
Source code in src/sageworks/core/transforms/data_loaders/light/csv_to_data_source.py
```python
def transform_impl(self, overwrite: bool = True):
    """Convert the local CSV file into Parquet Format in the SageWorks Data Sources Bucket, and
    store the information about the data to the AWS Data Catalog sageworks database
    """
    # Report the transformation initiation
    csv_file = os.path.basename(self.input_uuid)
    self.log.info(f"Starting {csv_file} --> DataSource: {self.output_uuid}...")

    # Read in the Local CSV as a Pandas DataFrame
    df = pd.read_csv(self.input_uuid, low_memory=False)
    df = convert_object_columns(df)

    # Use the SageWorks Pandas to Data Source class
    pandas_to_data = PandasToData(self.output_uuid)
    pandas_to_data.set_input(df)
    pandas_to_data.set_output_tags(self.output_tags)
    pandas_to_data.add_output_meta(self.output_meta)
    pandas_to_data.transform()

    # Report the transformation results
    self.log.info(f"{csv_file} --> DataSource: {self.output_uuid} Complete!")
```
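As the source above shows, transform_impl simply reads the CSV into a Pandas DataFrame and delegates the heavy lifting to PandasToData. If you already have a DataFrame in memory, a sketch along the same lines would be the following; the PandasToData import path is an assumption (it is not shown on this page) and the DataFrame is a toy example.

```python
import pandas as pd

# Assumed import path for the PandasToData transform used above (not shown on this page)
from sageworks.core.transforms.pandas_transforms.pandas_to_data import PandasToData

# A toy in-memory DataFrame standing in for the one read from CSV
df = pd.DataFrame({"length": [0.455, 0.350], "rings": [15, 7]})

# Mirrors the call sequence that CSVToDataSource.transform_impl() uses internally
pandas_to_data = PandasToData("abalone_data")  # placeholder DataSource name
pandas_to_data.set_input(df)
pandas_to_data.set_output_tags(["abalone", "dataframe"])
pandas_to_data.transform()
```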
JSONToDataSource

Bases: Transform
JSONToDataSource: Class to move local JSON Files into a SageWorks DataSource
Common Usage
```python
json_to_data = JSONToDataSource(json_file_path, data_uuid)
json_to_data.set_output_tags(["abalone", "json", "whatever"])
json_to_data.transform()
```
Source code in src/sageworks/core/transforms/data_loaders/light/json_to_data_source.py
````python
class JSONToDataSource(Transform):
    """JSONToDataSource: Class to move local JSON Files into a SageWorks DataSource

    Common Usage:
        ```python
        json_to_data = JSONToDataSource(json_file_path, data_uuid)
        json_to_data.set_output_tags(["abalone", "json", "whatever"])
        json_to_data.transform()
        ```
    """

    def __init__(self, json_file_path: str, data_uuid: str):
        """JSONToDataSource: Class to move local JSON Files into a SageWorks DataSource

        Args:
            json_file_path (str): The path to the JSON file to be transformed
            data_uuid (str): The UUID of the SageWorks DataSource to be created
        """
        # Call superclass init
        super().__init__(json_file_path, data_uuid)

        # Set up all my instance attributes
        self.input_type = TransformInput.LOCAL_FILE
        self.output_type = TransformOutput.DATA_SOURCE

    def transform_impl(self, overwrite: bool = True):
        """Convert the local JSON file into Parquet Format in the SageWorks Data Sources Bucket, and
        store the information about the data to the AWS Data Catalog sageworks database
        """
        # Report the transformation initiation
        json_file = os.path.basename(self.input_uuid)
        self.log.info(f"Starting {json_file} --> DataSource: {self.output_uuid}...")

        # Read in the Local JSON as a Pandas DataFrame
        df = pd.read_json(self.input_uuid, lines=True)

        # Use the SageWorks Pandas to Data Source class
        pandas_to_data = PandasToData(self.output_uuid)
        pandas_to_data.set_input(df)
        pandas_to_data.set_output_tags(self.output_tags)
        pandas_to_data.add_output_meta(self.output_meta)
        pandas_to_data.transform()

        # Report the transformation results
        self.log.info(f"{json_file} --> DataSource: {self.output_uuid} Complete!")

    def post_transform(self, **kwargs):
        """Post-Transform"""
        self.log.info("Post-Transform: S3 to DataSource...")
````
__init__(json_file_path, data_uuid)

JSONToDataSource: Class to move local JSON Files into a SageWorks DataSource

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `json_file_path` | `str` | The path to the JSON file to be transformed | *required* |
| `data_uuid` | `str` | The UUID of the SageWorks DataSource to be created | *required* |
Source code in src/sageworks/core/transforms/data_loaders/light/json_to_data_source.py
```python
def __init__(self, json_file_path: str, data_uuid: str):
    """JSONToDataSource: Class to move local JSON Files into a SageWorks DataSource

    Args:
        json_file_path (str): The path to the JSON file to be transformed
        data_uuid (str): The UUID of the SageWorks DataSource to be created
    """
    # Call superclass init
    super().__init__(json_file_path, data_uuid)

    # Set up all my instance attributes
    self.input_type = TransformInput.LOCAL_FILE
    self.output_type = TransformOutput.DATA_SOURCE
```
post_transform(**kwargs)
Post-Transform
Source code in src/sageworks/core/transforms/data_loaders/light/json_to_data_source.py
```python
def post_transform(self, **kwargs):
    """Post-Transform"""
    self.log.info("Post-Transform: S3 to DataSource...")
```
transform_impl(overwrite=True)

Convert the local JSON file into Parquet format in the SageWorks Data Sources bucket, and store the information about the data in the AWS Data Catalog sageworks database.
Source code in src/sageworks/core/transforms/data_loaders/light/json_to_data_source.py
```python
def transform_impl(self, overwrite: bool = True):
    """Convert the local JSON file into Parquet Format in the SageWorks Data Sources Bucket, and
    store the information about the data to the AWS Data Catalog sageworks database
    """
    # Report the transformation initiation
    json_file = os.path.basename(self.input_uuid)
    self.log.info(f"Starting {json_file} --> DataSource: {self.output_uuid}...")

    # Read in the Local JSON as a Pandas DataFrame
    df = pd.read_json(self.input_uuid, lines=True)

    # Use the SageWorks Pandas to Data Source class
    pandas_to_data = PandasToData(self.output_uuid)
    pandas_to_data.set_input(df)
    pandas_to_data.set_output_tags(self.output_tags)
    pandas_to_data.add_output_meta(self.output_meta)
    pandas_to_data.transform()

    # Report the transformation results
    self.log.info(f"{json_file} --> DataSource: {self.output_uuid} Complete!")
```
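One thing worth noting from the source above: the file is read with pd.read_json(..., lines=True), so the input is expected to be JSON Lines (one JSON object per line), not a single top-level JSON array. A small sketch with hypothetical file and DataSource names:

```python
# JSON Lines input: one record per line, which is what pd.read_json(lines=True) expects, e.g.
#   {"length": 0.455, "rings": 15}
#   {"length": 0.350, "rings": 7}

# Import path inferred from the source location shown above (adjust if your install differs)
from sageworks.core.transforms.data_loaders.light.json_to_data_source import JSONToDataSource

json_to_data = JSONToDataSource("abalone.jsonl", "abalone_json_data")  # placeholder names
json_to_data.set_output_tags(["abalone", "json"])
json_to_data.transform()
```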
S3ToDataSourceLight

Bases: Transform
S3ToDataSourceLight: Class to move LIGHT S3 Files into a SageWorks DataSource
Common Usage
```python
s3_to_data = S3ToDataSourceLight(s3_path, data_uuid, datatype="csv")  # or datatype="json"
s3_to_data.set_output_tags(["abalone", "whatever"])
s3_to_data.transform()
```
Source code in src/sageworks/core/transforms/data_loaders/light/s3_to_data_source_light.py
````python
class S3ToDataSourceLight(Transform):
    """S3ToDataSourceLight: Class to move LIGHT S3 Files into a SageWorks DataSource

    Common Usage:
        ```python
        s3_to_data = S3ToDataSourceLight(s3_path, data_uuid, datatype="csv/json")
        s3_to_data.set_output_tags(["abalone", "whatever"])
        s3_to_data.transform()
        ```
    """

    def __init__(self, s3_path: str, data_uuid: str, datatype: str = "csv"):
        """S3ToDataSourceLight Initialization

        Args:
            s3_path (str): The S3 Path to the file to be transformed
            data_uuid (str): The UUID of the SageWorks DataSource to be created
            datatype (str): The datatype of the file to be transformed (defaults to "csv")
        """
        # Call superclass init
        super().__init__(s3_path, data_uuid)

        # Set up all my instance attributes
        self.input_type = TransformInput.S3_OBJECT
        self.output_type = TransformOutput.DATA_SOURCE
        self.datatype = datatype

    def input_size_mb(self) -> int:
        """Get the size of the input S3 object in MBytes"""
        size_in_bytes = wr.s3.size_objects(self.input_uuid, boto3_session=self.boto3_session)[self.input_uuid]
        size_in_mb = round(size_in_bytes / 1_000_000)
        return size_in_mb

    def transform_impl(self, overwrite: bool = True):
        """Convert the S3 CSV data into Parquet Format in the SageWorks Data Sources Bucket, and
        store the information about the data to the AWS Data Catalog sageworks database
        """
        # Sanity Check for S3 Object size
        object_megabytes = self.input_size_mb()
        if object_megabytes > 100:
            self.log.error(f"S3 Object too big ({object_megabytes} MBytes): Use the S3ToDataSourceHeavy class!")
            return

        # Read in the S3 CSV as a Pandas DataFrame
        if self.datatype == "csv":
            df = wr.s3.read_csv(self.input_uuid, low_memory=False, boto3_session=self.boto3_session)
        else:
            df = wr.s3.read_json(self.input_uuid, lines=True, boto3_session=self.boto3_session)

        # Temporary hack to limit the number of columns in the dataframe
        if len(df.columns) > 40:
            self.log.warning(f"{self.input_uuid} Too Many Columns! Talk to SageWorks Support...")

        # Convert object columns before sending to SageWorks Data Source
        df = convert_object_columns(df)

        # Use the SageWorks Pandas to Data Source class
        pandas_to_data = PandasToData(self.output_uuid)
        pandas_to_data.set_input(df)
        pandas_to_data.set_output_tags(self.output_tags)
        pandas_to_data.add_output_meta(self.output_meta)
        pandas_to_data.transform()

        # Report the transformation results
        self.log.info(f"{self.input_uuid} --> DataSource: {self.output_uuid} Complete!")

    def post_transform(self, **kwargs):
        """Post-Transform"""
        self.log.info("Post-Transform: S3 to DataSource...")
````
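For a concrete instance of the Common Usage above: when the S3 object is JSON Lines rather than CSV, pass datatype="json" so transform_impl takes the wr.s3.read_json(..., lines=True) branch. A minimal sketch with placeholder names; the import path is inferred from the source location shown above.

```python
# Import path inferred from the source file location on this page (adjust if your install differs)
from sageworks.core.transforms.data_loaders.light.s3_to_data_source_light import S3ToDataSourceLight

# Placeholder S3 path and DataSource name; the object must be JSON Lines for datatype="json"
s3_to_data = S3ToDataSourceLight("s3://my-bucket/incoming/abalone.jsonl", "abalone_data", datatype="json")
s3_to_data.set_output_tags(["abalone", "json"])
s3_to_data.transform()
```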
__init__(s3_path, data_uuid, datatype='csv')

S3ToDataSourceLight Initialization

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `s3_path` | `str` | The S3 Path to the file to be transformed | *required* |
| `data_uuid` | `str` | The UUID of the SageWorks DataSource to be created | *required* |
| `datatype` | `str` | The datatype of the file to be transformed (defaults to "csv") | `'csv'` |
Source code in src/sageworks/core/transforms/data_loaders/light/s3_to_data_source_light.py
```python
def __init__(self, s3_path: str, data_uuid: str, datatype: str = "csv"):
    """S3ToDataSourceLight Initialization

    Args:
        s3_path (str): The S3 Path to the file to be transformed
        data_uuid (str): The UUID of the SageWorks DataSource to be created
        datatype (str): The datatype of the file to be transformed (defaults to "csv")
    """
    # Call superclass init
    super().__init__(s3_path, data_uuid)

    # Set up all my instance attributes
    self.input_type = TransformInput.S3_OBJECT
    self.output_type = TransformOutput.DATA_SOURCE
    self.datatype = datatype
```
input_size_mb()

Get the size of the input S3 object in MBytes.
Source code in src/sageworks/core/transforms/data_loaders/light/s3_to_data_source_light.py
```python
def input_size_mb(self) -> int:
    """Get the size of the input S3 object in MBytes"""
    size_in_bytes = wr.s3.size_objects(self.input_uuid, boto3_session=self.boto3_session)[self.input_uuid]
    size_in_mb = round(size_in_bytes / 1_000_000)
    return size_in_mb
```
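transform_impl (below) uses this size check to refuse objects larger than 100 MB and points you at the S3ToDataSourceHeavy class instead. If you want to pre-check an object yourself before constructing the loader, a sketch mirroring the same awswrangler call (with a placeholder S3 path and the default boto3 session) might look like this:

```python
import awswrangler as wr

s3_path = "s3://my-bucket/incoming/abalone.csv"  # placeholder S3 path

# Same awswrangler call that input_size_mb() makes; returns {path: size_in_bytes}
size_in_bytes = wr.s3.size_objects(s3_path)[s3_path]
size_in_mb = round(size_in_bytes / 1_000_000)

if size_in_mb > 100:
    print(f"{size_in_mb} MB: too big for S3ToDataSourceLight, use S3ToDataSourceHeavy")
else:
    print(f"{size_in_mb} MB: fine for the light loader")
```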
post_transform(**kwargs)
Post-Transform
Source code in src/sageworks/core/transforms/data_loaders/light/s3_to_data_source_light.py
```python
def post_transform(self, **kwargs):
    """Post-Transform"""
    self.log.info("Post-Transform: S3 to DataSource...")
```
transform_impl(overwrite=True)

Convert the S3 CSV (or JSON) data into Parquet format in the SageWorks Data Sources bucket, and store the information about the data in the AWS Data Catalog sageworks database.
Source code in src/sageworks/core/transforms/data_loaders/light/s3_to_data_source_light.py
```python
def transform_impl(self, overwrite: bool = True):
    """Convert the S3 CSV data into Parquet Format in the SageWorks Data Sources Bucket, and
    store the information about the data to the AWS Data Catalog sageworks database
    """
    # Sanity Check for S3 Object size
    object_megabytes = self.input_size_mb()
    if object_megabytes > 100:
        self.log.error(f"S3 Object too big ({object_megabytes} MBytes): Use the S3ToDataSourceHeavy class!")
        return

    # Read in the S3 CSV as a Pandas DataFrame
    if self.datatype == "csv":
        df = wr.s3.read_csv(self.input_uuid, low_memory=False, boto3_session=self.boto3_session)
    else:
        df = wr.s3.read_json(self.input_uuid, lines=True, boto3_session=self.boto3_session)

    # Temporary hack to limit the number of columns in the dataframe
    if len(df.columns) > 40:
        self.log.warning(f"{self.input_uuid} Too Many Columns! Talk to SageWorks Support...")

    # Convert object columns before sending to SageWorks Data Source
    df = convert_object_columns(df)

    # Use the SageWorks Pandas to Data Source class
    pandas_to_data = PandasToData(self.output_uuid)
    pandas_to_data.set_input(df)
    pandas_to_data.set_output_tags(self.output_tags)
    pandas_to_data.add_output_meta(self.output_meta)
    pandas_to_data.transform()

    # Report the transformation results
    self.log.info(f"{self.input_uuid} --> DataSource: {self.output_uuid} Complete!")
```