Skip to content

DataLoaders Light

API Classes

For most users the API Classes will provide all the general functionality to create a full AWS ML Pipeline

These DataLoader Classes are intended to load smaller datasets into AWS. If you have large data, please see DataLoaders Heavy

Welcome to the Workbench DataLoaders Light Classes

These classes provide low-level APIs for loading smaller data into AWS services

  • CSVToDataSource: Loads local CSV data into a DataSource
  • JSONToDataSource: Loads local JSON data into a DataSource
  • S3ToDataSourceLight: Loads S3 data into a DataSource

CSVToDataSource

Bases: Transform

CSVToDataSource: Class to move local CSV Files into a Workbench DataSource

Common Usage
csv_to_data = CSVToDataSource(csv_file_path, data_uuid)
csv_to_data.set_output_tags(["abalone", "csv", "whatever"])
csv_to_data.transform()
Source code in src/workbench/core/transforms/data_loaders/light/csv_to_data_source.py
class CSVToDataSource(Transform):
    """CSVToDataSource: Class to move local CSV Files into a Workbench DataSource

    Common Usage:
        ```python
        csv_to_data = CSVToDataSource(csv_file_path, data_uuid)
        csv_to_data.set_output_tags(["abalone", "csv", "whatever"])
        csv_to_data.transform()
        ```
    """

    def __init__(self, csv_file_path: str, data_uuid: str):
        """CSVToDataSource: Class to move local CSV Files into a Workbench DataSource

        Args:
            csv_file_path (str): The path to the CSV file to be transformed
            data_uuid (str): The UUID of the Workbench DataSource to be created
        """

        # Call superclass init (the path and UUID are read back later as
        # self.input_uuid / self.output_uuid, presumably set by Transform)
        super().__init__(csv_file_path, data_uuid)

        # Set up all my instance attributes
        self.input_type = TransformInput.LOCAL_FILE
        self.output_type = TransformOutput.DATA_SOURCE

    def transform_impl(self, overwrite: bool = True):
        """Convert the local CSV file into Parquet Format in the Workbench Data Sources Bucket, and
        store the information about the data to the AWS Data Catalog workbench database

        Args:
            overwrite (bool): Not referenced by this method (defaults to True)
        """

        # Report the transformation initiation (log only the file name, not the full path)
        csv_file = os.path.basename(self.input_uuid)
        self.log.info(f"Starting {csv_file} -->  DataSource: {self.output_uuid}...")

        # Read in the Local CSV as a Pandas DataFrame
        df = pd.read_csv(self.input_uuid, low_memory=False)
        df = convert_object_columns(df)

        # Use the Workbench Pandas to Data Source class, forwarding our tags and metadata
        pandas_to_data = PandasToData(self.output_uuid)
        pandas_to_data.set_input(df)
        pandas_to_data.set_output_tags(self.output_tags)
        pandas_to_data.add_output_meta(self.output_meta)
        pandas_to_data.transform()

        # Report the transformation results
        self.log.info(f"{csv_file} -->  DataSource: {self.output_uuid} Complete!")

    def post_transform(self, **kwargs):
        """Post-Transform: only logs that the post-transform step was reached

        Args:
            **kwargs: Not referenced by this method
        """
        # The message names this loader (CSV); it previously said "S3" (copy/paste slip)
        self.log.info("Post-Transform: CSV to DataSource...")

__init__(csv_file_path, data_uuid)

CSVToDataSource: Class to move local CSV Files into a Workbench DataSource

Parameters:

Name Type Description Default
csv_file_path str

The path to the CSV file to be transformed

required
data_uuid str

The UUID of the Workbench DataSource to be created

required
Source code in src/workbench/core/transforms/data_loaders/light/csv_to_data_source.py
def __init__(self, csv_file_path: str, data_uuid: str):
    """Set up a transform that loads one local CSV file into a Workbench DataSource

    Args:
        csv_file_path (str): Path of the local CSV file to be loaded
        data_uuid (str): UUID of the Workbench DataSource that will be created
    """

    # Hand the file path and the DataSource UUID to the Transform base class
    super().__init__(csv_file_path, data_uuid)

    # Declare what this transform consumes and what it produces
    self.input_type = TransformInput.LOCAL_FILE
    self.output_type = TransformOutput.DATA_SOURCE

post_transform(**kwargs)

Post-Transform

Source code in src/workbench/core/transforms/data_loaders/light/csv_to_data_source.py
def post_transform(self, **kwargs):
    """Post-Transform: only logs that the post-transform step was reached

    Args:
        **kwargs: Not referenced by this method
    """
    # The message names this loader (CSV); it previously said "S3" (copy/paste slip)
    self.log.info("Post-Transform: CSV to DataSource...")

transform_impl(overwrite=True)

Convert the local CSV file into Parquet Format in the Workbench Data Sources Bucket, and store the information about the data to the AWS Data Catalog workbench database

Source code in src/workbench/core/transforms/data_loaders/light/csv_to_data_source.py
def transform_impl(self, overwrite: bool = True):
    """Load the local CSV file into a DataFrame and push it through PandasToData, which
    creates the Workbench DataSource identified by self.output_uuid

    Args:
        overwrite (bool): Not referenced by this method (defaults to True)
    """

    # Announce the start, using only the file name portion of the input path
    csv_file = os.path.basename(self.input_uuid)
    self.log.info(f"Starting {csv_file} -->  DataSource: {self.output_uuid}...")

    # Load the CSV and run the object-column conversion in one step
    frame = convert_object_columns(pd.read_csv(self.input_uuid, low_memory=False))

    # Delegate the actual DataSource creation, forwarding our tags and metadata
    loader = PandasToData(self.output_uuid)
    loader.set_input(frame)
    loader.set_output_tags(self.output_tags)
    loader.add_output_meta(self.output_meta)
    loader.transform()

    # Announce completion
    self.log.info(f"{csv_file} -->  DataSource: {self.output_uuid} Complete!")

JSONToDataSource

Bases: Transform

JSONToDataSource: Class to move local JSON Files into a Workbench DataSource

Common Usage
json_to_data = JSONToDataSource(json_file_path, data_uuid)
json_to_data.set_output_tags(["abalone", "json", "whatever"])
json_to_data.transform()
Source code in src/workbench/core/transforms/data_loaders/light/json_to_data_source.py
class JSONToDataSource(Transform):
    """JSONToDataSource: Class to move local JSON Files into a Workbench DataSource

    Common Usage:
        ```python
        json_to_data = JSONToDataSource(json_file_path, data_uuid)
        json_to_data.set_output_tags(["abalone", "json", "whatever"])
        json_to_data.transform()
        ```
    """

    def __init__(self, json_file_path: str, data_uuid: str):
        """JSONToDataSource: Class to move local JSON Files into a Workbench DataSource

        Args:
            json_file_path (str): The path to the JSON file to be transformed
            data_uuid (str): The UUID of the Workbench DataSource to be created
        """

        # Call superclass init (the path and UUID are read back later as
        # self.input_uuid / self.output_uuid, presumably set by Transform)
        super().__init__(json_file_path, data_uuid)

        # Set up all my instance attributes
        self.input_type = TransformInput.LOCAL_FILE
        self.output_type = TransformOutput.DATA_SOURCE

    def transform_impl(self, overwrite: bool = True):
        """Convert the local JSON file into Parquet Format in the Workbench Data Sources Bucket, and
        store the information about the data to the AWS Data Catalog workbench database

        The file is read with lines=True, i.e. one JSON object per line.

        Args:
            overwrite (bool): Not referenced by this method (defaults to True)
        """

        # Report the transformation initiation (log only the file name, not the full path)
        json_file = os.path.basename(self.input_uuid)
        self.log.info(f"Starting {json_file} -->  DataSource: {self.output_uuid}...")

        # Read in the Local JSON as a Pandas DataFrame
        # NOTE(review): unlike the CSV and S3 loaders, convert_object_columns() is not
        # applied here -- confirm whether that difference is intentional
        df = pd.read_json(self.input_uuid, lines=True)

        # Use the Workbench Pandas to Data Source class, forwarding our tags and metadata
        pandas_to_data = PandasToData(self.output_uuid)
        pandas_to_data.set_input(df)
        pandas_to_data.set_output_tags(self.output_tags)
        pandas_to_data.add_output_meta(self.output_meta)
        pandas_to_data.transform()

        # Report the transformation results
        self.log.info(f"{json_file} -->  DataSource: {self.output_uuid} Complete!")

    def post_transform(self, **kwargs):
        """Post-Transform: only logs that the post-transform step was reached

        Args:
            **kwargs: Not referenced by this method
        """
        # The message names this loader (JSON); it previously said "S3" (copy/paste slip)
        self.log.info("Post-Transform: JSON to DataSource...")

__init__(json_file_path, data_uuid)

JSONToDataSource: Class to move local JSON Files into a Workbench DataSource

Parameters:

Name Type Description Default
json_file_path str

The path to the JSON file to be transformed

required
data_uuid str

The UUID of the Workbench DataSource to be created

required
Source code in src/workbench/core/transforms/data_loaders/light/json_to_data_source.py
def __init__(self, json_file_path: str, data_uuid: str):
    """Set up a transform that loads one local JSON file into a Workbench DataSource

    Args:
        json_file_path (str): Path of the local JSON file to be loaded
        data_uuid (str): UUID of the Workbench DataSource that will be created
    """

    # Hand the file path and the DataSource UUID to the Transform base class
    super().__init__(json_file_path, data_uuid)

    # Declare what this transform consumes and what it produces
    self.input_type = TransformInput.LOCAL_FILE
    self.output_type = TransformOutput.DATA_SOURCE

post_transform(**kwargs)

Post-Transform

Source code in src/workbench/core/transforms/data_loaders/light/json_to_data_source.py
def post_transform(self, **kwargs):
    """Post-Transform: only logs that the post-transform step was reached

    Args:
        **kwargs: Not referenced by this method
    """
    # The message names this loader (JSON); it previously said "S3" (copy/paste slip)
    self.log.info("Post-Transform: JSON to DataSource...")

transform_impl(overwrite=True)

Convert the local JSON file into Parquet Format in the Workbench Data Sources Bucket, and store the information about the data to the AWS Data Catalog workbench database

Source code in src/workbench/core/transforms/data_loaders/light/json_to_data_source.py
def transform_impl(self, overwrite: bool = True):
    """Load the local JSON file (one JSON object per line) into a DataFrame and push it
    through PandasToData, which creates the Workbench DataSource identified by self.output_uuid

    Args:
        overwrite (bool): Not referenced by this method (defaults to True)
    """

    # Announce the start, using only the file name portion of the input path
    json_file = os.path.basename(self.input_uuid)
    self.log.info(f"Starting {json_file} -->  DataSource: {self.output_uuid}...")

    # Parse the line-delimited JSON file
    frame = pd.read_json(self.input_uuid, lines=True)

    # Delegate the actual DataSource creation, forwarding our tags and metadata
    loader = PandasToData(self.output_uuid)
    loader.set_input(frame)
    loader.set_output_tags(self.output_tags)
    loader.add_output_meta(self.output_meta)
    loader.transform()

    # Announce completion
    self.log.info(f"{json_file} -->  DataSource: {self.output_uuid} Complete!")

S3ToDataSourceLight

Bases: Transform

S3ToDataSourceLight: Class to move LIGHT S3 Files into a Workbench DataSource

Common Usage
s3_to_data = S3ToDataSourceLight(s3_path, data_uuid, datatype="csv/json")
s3_to_data.set_output_tags(["abalone", "whatever"])
s3_to_data.transform()
Source code in src/workbench/core/transforms/data_loaders/light/s3_to_data_source_light.py
class S3ToDataSourceLight(Transform):
    """S3ToDataSourceLight: Class to move LIGHT S3 Files into a Workbench DataSource

    Common Usage:
        ```python
        s3_to_data = S3ToDataSourceLight(s3_path, data_uuid, datatype="csv")  # or datatype="json"
        s3_to_data.set_output_tags(["abalone", "whatever"])
        s3_to_data.transform()
        ```
    """

    # S3 objects larger than this many MBytes are rejected by transform_impl()
    MAX_INPUT_SIZE_MB = 100

    # DataFrames with more columns than this trigger a warning in transform_impl()
    MAX_COLUMNS_WARNING = 40

    def __init__(self, s3_path: str, data_uuid: str, datatype: str = "csv"):
        """S3ToDataSourceLight Initialization

        Args:
            s3_path (str): The S3 Path to the file to be transformed
            data_uuid (str): The UUID of the Workbench DataSource to be created
            datatype (str): The datatype of the file to be transformed: "csv" or "json"
                (defaults to "csv"). Any other value is read as JSON lines.
        """

        # Call superclass init (the path and UUID are read back later as
        # self.input_uuid / self.output_uuid, presumably set by Transform)
        super().__init__(s3_path, data_uuid)

        # Set up all my instance attributes
        self.input_type = TransformInput.S3_OBJECT
        self.output_type = TransformOutput.DATA_SOURCE
        self.datatype = datatype

    def input_size_mb(self) -> int:
        """Get the size of the input S3 object in MBytes (bytes / 1,000,000, rounded to an int)"""
        size_in_bytes = wr.s3.size_objects(self.input_uuid, boto3_session=self.boto3_session)[self.input_uuid]
        size_in_mb = round(size_in_bytes / 1_000_000)
        return size_in_mb

    def transform_impl(self, overwrite: bool = True):
        """Convert the S3 CSV or JSON data into Parquet Format in the Workbench Data Sources Bucket, and
        store the information about the data to the AWS Data Catalog workbench database

        Objects larger than MAX_INPUT_SIZE_MB are not loaded: an error is logged and the
        method returns without creating the DataSource.

        Args:
            overwrite (bool): Not referenced by this method (defaults to True)
        """

        # Sanity Check for S3 Object size
        object_megabytes = self.input_size_mb()
        if object_megabytes > self.MAX_INPUT_SIZE_MB:
            self.log.error(f"S3 Object too big ({object_megabytes} MBytes): Use the S3ToDataSourceHeavy class!")
            return

        # Read in the S3 object as a Pandas DataFrame
        if self.datatype == "csv":
            df = wr.s3.read_csv(self.input_uuid, low_memory=False, boto3_session=self.boto3_session)
        else:
            # Anything that is not "csv" has always been read as JSON lines; keep that
            # behavior but make an unexpected datatype visible in the logs
            if self.datatype != "json":
                self.log.warning(f"Unrecognized datatype '{self.datatype}': reading {self.input_uuid} as JSON...")
            df = wr.s3.read_json(self.input_uuid, lines=True, boto3_session=self.boto3_session)

        # Wide-table check: this only warns, the DataFrame is NOT truncated
        if len(df.columns) > self.MAX_COLUMNS_WARNING:
            self.log.warning(f"{self.input_uuid} Too Many Columns! Talk to Workbench Support...")

        # Convert object columns before sending to Workbench Data Source
        df = convert_object_columns(df)

        # Use the Workbench Pandas to Data Source class, forwarding our tags and metadata
        pandas_to_data = PandasToData(self.output_uuid)
        pandas_to_data.set_input(df)
        pandas_to_data.set_output_tags(self.output_tags)
        pandas_to_data.add_output_meta(self.output_meta)
        pandas_to_data.transform()

        # Report the transformation results
        self.log.info(f"{self.input_uuid} -->  DataSource: {self.output_uuid} Complete!")

    def post_transform(self, **kwargs):
        """Post-Transform: only logs that the post-transform step was reached

        Args:
            **kwargs: Not referenced by this method
        """
        self.log.info("Post-Transform: S3 to DataSource...")

__init__(s3_path, data_uuid, datatype='csv')

S3ToDataSourceLight Initialization

Parameters:

Name Type Description Default
s3_path str

The S3 Path to the file to be transformed

required
data_uuid str

The UUID of the Workbench DataSource to be created

required
datatype str

The datatype of the file to be transformed (defaults to "csv")

'csv'
Source code in src/workbench/core/transforms/data_loaders/light/s3_to_data_source_light.py
def __init__(self, s3_path: str, data_uuid: str, datatype: str = "csv"):
    """Set up a transform that loads one small S3 object into a Workbench DataSource

    Args:
        s3_path (str): S3 path of the object to be loaded
        data_uuid (str): UUID of the Workbench DataSource that will be created
        datatype (str): Format of the S3 object (defaults to "csv")
    """

    # Hand the S3 path and the DataSource UUID to the Transform base class
    super().__init__(s3_path, data_uuid)

    # Declare what this transform consumes and what it produces
    self.input_type = TransformInput.S3_OBJECT
    self.output_type = TransformOutput.DATA_SOURCE

    # Remember the file format; transform_impl() picks the reader from it
    self.datatype = datatype

input_size_mb()

Get the size of the input S3 object in MBytes

Source code in src/workbench/core/transforms/data_loaders/light/s3_to_data_source_light.py
def input_size_mb(self) -> int:
    """Return the size of the input S3 object in MBytes (bytes / 1,000,000, rounded to an int)"""
    # size_objects() is indexed by path here, so pull out the entry for our input
    sizes_by_path = wr.s3.size_objects(self.input_uuid, boto3_session=self.boto3_session)
    byte_count = sizes_by_path[self.input_uuid]
    return round(byte_count / 1_000_000)

post_transform(**kwargs)

Post-Transform

Source code in src/workbench/core/transforms/data_loaders/light/s3_to_data_source_light.py
def post_transform(self, **kwargs):
    """Post-Transform: only logs that the post-transform step was reached

    Args:
        **kwargs: Not referenced by this method
    """
    message = "Post-Transform: S3 to DataSource..."
    self.log.info(message)

transform_impl(overwrite=True)

Convert the S3 CSV or JSON data into Parquet Format in the Workbench Data Sources Bucket, and store the information about the data to the AWS Data Catalog workbench database

Source code in src/workbench/core/transforms/data_loaders/light/s3_to_data_source_light.py
def transform_impl(self, overwrite: bool = True):
    """Convert the S3 CSV or JSON data into Parquet Format in the Workbench Data Sources Bucket, and
    store the information about the data to the AWS Data Catalog workbench database

    Objects larger than 100 MBytes are not loaded: an error is logged and the method
    returns without creating the DataSource.

    Args:
        overwrite (bool): Not referenced by this method (defaults to True)
    """

    # Sanity Check for S3 Object size
    object_megabytes = self.input_size_mb()
    if object_megabytes > 100:
        self.log.error(f"S3 Object too big ({object_megabytes} MBytes): Use the S3ToDataSourceHeavy class!")
        return

    # Read in the S3 object as a Pandas DataFrame
    if self.datatype == "csv":
        df = wr.s3.read_csv(self.input_uuid, low_memory=False, boto3_session=self.boto3_session)
    else:
        # Anything that is not "csv" has always been read as JSON lines; keep that
        # behavior but make an unexpected datatype visible in the logs
        if self.datatype != "json":
            self.log.warning(f"Unrecognized datatype '{self.datatype}': reading {self.input_uuid} as JSON...")
        df = wr.s3.read_json(self.input_uuid, lines=True, boto3_session=self.boto3_session)

    # Wide-table check: this only warns, the DataFrame is NOT truncated
    if len(df.columns) > 40:
        self.log.warning(f"{self.input_uuid} Too Many Columns! Talk to Workbench Support...")

    # Convert object columns before sending to Workbench Data Source
    df = convert_object_columns(df)

    # Use the Workbench Pandas to Data Source class, forwarding our tags and metadata
    pandas_to_data = PandasToData(self.output_uuid)
    pandas_to_data.set_input(df)
    pandas_to_data.set_output_tags(self.output_tags)
    pandas_to_data.add_output_meta(self.output_meta)
    pandas_to_data.transform()

    # Report the transformation results
    self.log.info(f"{self.input_uuid} -->  DataSource: {self.output_uuid} Complete!")