Skip to content

CachedMeta

CachedMeta Examples

Examples of using the CachedMeta class are listed in the Examples section at the bottom of this page.

CachedMeta: A class that provides caching for the Meta() class

CachedMeta

Bases: CloudMeta

CachedMeta: Singleton class for caching metadata functionality.

Common Usage
from workbench.cached.cached_meta import CachedMeta
meta = CachedMeta()

# Get the AWS Account Info
meta.account()
meta.config()

# These are 'list' methods
meta.etl_jobs()
meta.data_sources()
meta.feature_sets(details=True/False)
meta.models(details=True/False)
meta.endpoints()
meta.views()

# These are 'describe' methods
meta.data_source("abalone_data")
meta.feature_set("abalone_features")
meta.model("abalone-regression")
meta.endpoint("abalone-endpoint")
Source code in src/workbench/cached/cached_meta.py
class CachedMeta(CloudMeta):
    """CachedMeta: Singleton class for caching metadata functionality.

    Common Usage:
       ```python
       from workbench.cached.cached_meta import CachedMeta
       meta = CachedMeta()

       # Get the AWS Account Info
       meta.account()
       meta.config()

       # These are 'list' methods
       meta.etl_jobs()
       meta.data_sources()
       meta.feature_sets(details=True/False)
       meta.models(details=True/False)
       meta.endpoints()
       meta.views()

       # These are 'describe' methods
       meta.data_source("abalone_data")
       meta.feature_set("abalone_features")
       meta.model("abalone-regression")
       meta.endpoint("abalone-endpoint")
       ```
    """

    _instance = None  # Class attribute to hold the singleton instance

    def __new__(cls, *args, **kwargs):
        # Singleton: create the one instance lazily, then always hand it back
        if cls._instance is None:
            cls._instance = super(CachedMeta, cls).__new__(cls)
        return cls._instance

    def __init__(self):
        """CachedMeta Initialization"""
        # __init__ runs on every CachedMeta() call (since __new__ returns the
        # singleton), so skip setup if this instance was already initialized
        if getattr(self, "_initialized", False):
            return  # Prevent reinitialization

        self.log = logging.getLogger("workbench")
        self.log.important("Initializing CachedMeta...")
        super().__init__()

        # Create both our Meta Cache and Fresh Cache (tracks if data is stale)
        self.meta_cache = WorkbenchCache(prefix="meta")
        self.fresh_cache = WorkbenchCache(prefix="meta_fresh", expire=90)  # 90-second expiration

        # Create a ThreadPoolExecutor for refreshing stale data
        self.thread_pool = ThreadPoolExecutor(max_workers=5)

        # Mark the instance as initialized
        self._initialized = True

    def check(self):
        """Check if our underlying caches are working

        Returns:
            bool: True if the meta cache backend is reachable
        """
        return self.meta_cache.check()

    def list_meta_cache(self):
        """List the current Meta Cache

        Returns:
            list: The keys currently stored in the meta cache
        """
        return self.meta_cache.list_keys()

    def clear_meta_cache(self):
        """Clear the current Meta Cache"""
        self.meta_cache.clear()

    @cache_result
    def account(self) -> dict:
        """Cloud Platform Account Info

        Returns:
            dict: Cloud Platform Account Info
        """
        return super().account()

    @cache_result
    def config(self) -> dict:
        """Return the current Workbench Configuration

        Returns:
            dict: The current Workbench Configuration
        """
        return super().config()

    @cache_result
    def incoming_data(self) -> pd.DataFrame:
        """Get summary data about data in the incoming raw data

        Returns:
            pd.DataFrame: A summary of the incoming raw data
        """
        return super().incoming_data()

    @cache_result
    def etl_jobs(self) -> pd.DataFrame:
        """Get summary data about Extract, Transform, Load (ETL) Jobs

        Returns:
            pd.DataFrame: A summary of the ETL Jobs deployed in the Cloud Platform
        """
        return super().etl_jobs()

    @cache_result
    def data_sources(self) -> pd.DataFrame:
        """Get a summary of the Data Sources deployed in the Cloud Platform

        Returns:
            pd.DataFrame: A summary of the Data Sources deployed in the Cloud Platform
        """
        return super().data_sources()

    @cache_result
    def views(self, database: str = "workbench") -> pd.DataFrame:
        """Get a summary of all the Views, for the given database, in AWS

        Args:
            database (str, optional): Glue database. Defaults to 'workbench'.

        Returns:
            pd.DataFrame: A summary of all the Views, for the given database, in AWS
        """
        return super().views(database=database)

    @cache_result
    def feature_sets(self, details: bool = False) -> pd.DataFrame:
        """Get a summary of the Feature Sets deployed in the Cloud Platform

        Args:
            details (bool, optional): Include detailed information. Defaults to False.

        Returns:
            pd.DataFrame: A summary of the Feature Sets deployed in the Cloud Platform
        """
        return super().feature_sets(details=details)

    @cache_result
    def models(self, details: bool = False) -> pd.DataFrame:
        """Get a summary of the Models deployed in the Cloud Platform

        Args:
            details (bool, optional): Include detailed information. Defaults to False.

        Returns:
            pd.DataFrame: A summary of the Models deployed in the Cloud Platform
        """
        return super().models(details=details)

    @cache_result
    def endpoints(self) -> pd.DataFrame:
        """Get a summary of the Endpoints deployed in the Cloud Platform

        Returns:
            pd.DataFrame: A summary of the Endpoints in the Cloud Platform
        """
        return super().endpoints()

    @cache_result
    def glue_job(self, job_name: str) -> Union[dict, None]:
        """Get the details of a specific Glue Job

        Args:
            job_name (str): The name of the Glue Job

        Returns:
            dict: The details of the Glue Job (None if not found)
        """
        return super().glue_job(job_name=job_name)

    @cache_result
    def data_source(self, data_source_name: str, database: str = "workbench") -> Union[dict, None]:
        """Get the details of a specific Data Source

        Args:
            data_source_name (str): The name of the Data Source
            database (str, optional): The Glue database. Defaults to 'workbench'.

        Returns:
            dict: The details of the Data Source (None if not found)
        """
        return super().data_source(data_source_name=data_source_name, database=database)

    @cache_result
    def feature_set(self, feature_set_name: str) -> Union[dict, None]:
        """Get the details of a specific Feature Set

        Args:
            feature_set_name (str): The name of the Feature Set

        Returns:
            dict: The details of the Feature Set (None if not found)
        """
        return super().feature_set(feature_set_name=feature_set_name)

    @cache_result
    def model(self, model_name: str) -> Union[dict, None]:
        """Get the details of a specific Model

        Args:
            model_name (str): The name of the Model

        Returns:
            dict: The details of the Model (None if not found)
        """
        return super().model(model_name=model_name)

    @cache_result
    def endpoint(self, endpoint_name: str) -> Union[dict, None]:
        """Get the details of a specific Endpoint

        Args:
            endpoint_name (str): The name of the Endpoint

        Returns:
            dict: The details of the Endpoint (None if not found)
        """
        return super().endpoint(endpoint_name=endpoint_name)

    def _refresh_data_in_background(self, cache_key, method, *args, **kwargs):
        """Background task to refresh AWS metadata.

        Args:
            cache_key (str): The meta_cache key to update
            method (callable): The undecorated Meta method to invoke
        """
        # Exceptions raised inside ThreadPoolExecutor tasks are captured in the
        # Future (which nothing inspects), so log them explicitly here instead
        # of letting refresh failures disappear silently.
        try:
            result = method(self, *args, **kwargs)
            self.meta_cache.set(cache_key, result)
            self.log.debug(f"Updated Metadata for {cache_key}")
        except Exception as e:
            self.log.error(f"Background refresh failed for {cache_key}: {e}")

    @staticmethod
    def _flatten_redis_key(method, *args, **kwargs):
        """Flatten the args and kwargs into a single string"""
        # kwargs are sorted so keyword order doesn't produce different keys
        arg_str = "_".join(str(arg) for arg in args)
        kwarg_str = "_".join(f"{k}_{v}" for k, v in sorted(kwargs.items()))
        return f"{method.__name__}_{arg_str}_{kwarg_str}".replace(" ", "").replace("'", "")

    def __del__(self):
        """Destructor to shut down the thread pool gracefully."""
        # close() is defensive, so this is safe even if __init__ never completed
        self.close()

    def close(self):
        """Explicitly close the thread pool, if needed."""
        # getattr guard: __del__ can run on a partially-initialized instance
        # (e.g. super().__init__() raised before thread_pool was assigned)
        thread_pool = getattr(self, "thread_pool", None)
        if thread_pool:
            self.log.important("Shutting down the ThreadPoolExecutor...")
            try:
                thread_pool.shutdown(wait=True)  # Gracefully shutdown
            except RuntimeError as e:
                self.log.error(f"Error during thread pool shutdown: {e}")
            finally:
                self.thread_pool = None

    def __repr__(self):
        return f"CachedMeta()\n\t{repr(self.meta_cache)}\n\t{super().__repr__()}"

__del__()

Destructor to shut down the thread pool gracefully.

Source code in src/workbench/cached/cached_meta.py
def __del__(self):
    """Destructor to shut down the thread pool gracefully."""
    # Delegate to close() so all teardown logic lives in one place.
    self.close()

__init__()

CachedMeta Initialization

Source code in src/workbench/cached/cached_meta.py
def __init__(self):
    """CachedMeta Initialization"""
    # __init__ runs on every CachedMeta() call because __new__ returns a
    # singleton, so skip setup if this instance was already initialized.
    if hasattr(self, "_initialized") and self._initialized:
        return  # Prevent reinitialization

    self.log = logging.getLogger("workbench")
    self.log.important("Initializing CachedMeta...")
    super().__init__()

    # Create both our Meta Cache and Fresh Cache (tracks if data is stale)
    self.meta_cache = WorkbenchCache(prefix="meta")
    self.fresh_cache = WorkbenchCache(prefix="meta_fresh", expire=90)  # 90-second expiration

    # Create a ThreadPoolExecutor for refreshing stale data
    self.thread_pool = ThreadPoolExecutor(max_workers=5)

    # Mark the instance as initialized
    self._initialized = True

account()

Cloud Platform Account Info

Returns:

Name Type Description
dict dict

Cloud Platform Account Info

Source code in src/workbench/cached/cached_meta.py
@cache_result
def account(self) -> dict:
    """Cloud Platform Account Info

    Returns:
        dict: Cloud Platform Account Info
    """
    # Delegates to CloudMeta; @cache_result serves the cached copy when fresh.
    return super().account()

check()

Check if our underlying caches are working

Source code in src/workbench/cached/cached_meta.py
def check(self):
    """Check if our underlying caches are working"""
    # A working meta_cache implies the shared cache backend is reachable.
    return self.meta_cache.check()

clear_meta_cache()

Clear the current Meta Cache

Source code in src/workbench/cached/cached_meta.py
def clear_meta_cache(self):
    """Clear the current Meta Cache"""
    # Note: only clears meta_cache; fresh_cache entries expire on their own.
    self.meta_cache.clear()

close()

Explicitly close the thread pool, if needed.

Source code in src/workbench/cached/cached_meta.py
def close(self):
    """Explicitly close the thread pool, if needed."""
    if self.thread_pool:
        self.log.important("Shutting down the ThreadPoolExecutor...")
        try:
            self.thread_pool.shutdown(wait=True)  # Gracefully shutdown
        except RuntimeError as e:
            self.log.error(f"Error during thread pool shutdown: {e}")
        finally:
            # Set to None so a second close() (e.g. via __del__) is a no-op.
            self.thread_pool = None

config()

Return the current Workbench Configuration

Returns:

Name Type Description
dict dict

The current Workbench Configuration

Source code in src/workbench/cached/cached_meta.py
@cache_result
def config(self) -> dict:
    """Return the current Workbench Configuration

    Returns:
        dict: The current Workbench Configuration
    """
    # Delegates to CloudMeta; @cache_result serves the cached copy when fresh.
    return super().config()

data_source(data_source_name, database='workbench')

Get the details of a specific Data Source

Parameters:

Name Type Description Default
data_source_name str

The name of the Data Source

required
database str

The Glue database. Defaults to 'workbench'.

'workbench'

Returns:

Name Type Description
dict Union[dict, None]

The details of the Data Source (None if not found)

Source code in src/workbench/cached/cached_meta.py
@cache_result
def data_source(self, data_source_name: str, database: str = "workbench") -> Union[dict, None]:
    """Get the details of a specific Data Source

    Args:
        data_source_name (str): The name of the Data Source
        database (str, optional): The Glue database. Defaults to 'workbench'.

    Returns:
        dict: The details of the Data Source (None if not found)
    """
    # Keyword-only delegation keeps the cache key stable across call styles.
    return super().data_source(data_source_name=data_source_name, database=database)

data_sources()

Get a summary of the Data Sources deployed in the Cloud Platform

Returns:

Type Description
DataFrame

pd.DataFrame: A summary of the Data Sources deployed in the Cloud Platform

Source code in src/workbench/cached/cached_meta.py
@cache_result
def data_sources(self) -> pd.DataFrame:
    """Get a summary of the Data Sources deployed in the Cloud Platform

    Returns:
        pd.DataFrame: A summary of the Data Sources deployed in the Cloud Platform
    """
    # Delegates to CloudMeta; @cache_result serves the cached copy when fresh.
    return super().data_sources()

endpoint(endpoint_name)

Get the details of a specific Endpoint

Parameters:

Name Type Description Default
endpoint_name str

The name of the Endpoint

required

Returns:

Name Type Description
dict Union[dict, None]

The details of the Endpoint (None if not found)

Source code in src/workbench/cached/cached_meta.py
@cache_result
def endpoint(self, endpoint_name: str) -> Union[dict, None]:
    """Get the details of a specific Endpoint

    Args:
        endpoint_name (str): The name of the Endpoint

    Returns:
        dict: The details of the Endpoint (None if not found)
    """
    # Delegates to CloudMeta; @cache_result serves the cached copy when fresh.
    return super().endpoint(endpoint_name=endpoint_name)

endpoints()

Get a summary of the Endpoints deployed in the Cloud Platform

Returns:

Type Description
DataFrame

pd.DataFrame: A summary of the Endpoints in the Cloud Platform

Source code in src/workbench/cached/cached_meta.py
@cache_result
def endpoints(self) -> pd.DataFrame:
    """Get a summary of the Endpoints deployed in the Cloud Platform

    Returns:
        pd.DataFrame: A summary of the Endpoints in the Cloud Platform
    """
    # Delegates to CloudMeta; @cache_result serves the cached copy when fresh.
    return super().endpoints()

etl_jobs()

Get summary data about Extract, Transform, Load (ETL) Jobs

Returns:

Type Description
DataFrame

pd.DataFrame: A summary of the ETL Jobs deployed in the Cloud Platform

Source code in src/workbench/cached/cached_meta.py
@cache_result
def etl_jobs(self) -> pd.DataFrame:
    """Get summary data about Extract, Transform, Load (ETL) Jobs

    Returns:
        pd.DataFrame: A summary of the ETL Jobs deployed in the Cloud Platform
    """
    # Delegates to CloudMeta; @cache_result serves the cached copy when fresh.
    return super().etl_jobs()

feature_set(feature_set_name)

Get the details of a specific Feature Set

Parameters:

Name Type Description Default
feature_set_name str

The name of the Feature Set

required

Returns:

Name Type Description
dict Union[dict, None]

The details of the Feature Set (None if not found)

Source code in src/workbench/cached/cached_meta.py
@cache_result
def feature_set(self, feature_set_name: str) -> Union[dict, None]:
    """Get the details of a specific Feature Set

    Args:
        feature_set_name (str): The name of the Feature Set

    Returns:
        dict: The details of the Feature Set (None if not found)
    """
    # Delegates to CloudMeta; @cache_result serves the cached copy when fresh.
    return super().feature_set(feature_set_name=feature_set_name)

feature_sets(details=False)

Get a summary of the Feature Sets deployed in the Cloud Platform

Parameters:

Name Type Description Default
details bool

Include detailed information. Defaults to False.

False

Returns:

Type Description
DataFrame

pd.DataFrame: A summary of the Feature Sets deployed in the Cloud Platform

Source code in src/workbench/cached/cached_meta.py
@cache_result
def feature_sets(self, details: bool = False) -> pd.DataFrame:
    """Get a summary of the Feature Sets deployed in the Cloud Platform

    Args:
        details (bool, optional): Include detailed information. Defaults to False.

    Returns:
        pd.DataFrame: A summary of the Feature Sets deployed in the Cloud Platform
    """
    # details=True and details=False produce distinct cache keys/entries.
    return super().feature_sets(details=details)

glue_job(job_name)

Get the details of a specific Glue Job

Parameters:

Name Type Description Default
job_name str

The name of the Glue Job

required

Returns:

Name Type Description
dict Union[dict, None]

The details of the Glue Job (None if not found)

Source code in src/workbench/cached/cached_meta.py
@cache_result
def glue_job(self, job_name: str) -> Union[dict, None]:
    """Get the details of a specific Glue Job

    Args:
        job_name (str): The name of the Glue Job

    Returns:
        dict: The details of the Glue Job (None if not found)
    """
    # Delegates to CloudMeta; @cache_result serves the cached copy when fresh.
    return super().glue_job(job_name=job_name)

incoming_data()

Get summary data about data in the incoming raw data

Returns:

Type Description
DataFrame

pd.DataFrame: A summary of the incoming raw data

Source code in src/workbench/cached/cached_meta.py
@cache_result
def incoming_data(self) -> pd.DataFrame:
    """Get summary data about data in the incoming raw data

    Returns:
        pd.DataFrame: A summary of the incoming raw data
    """
    # Delegates to CloudMeta; @cache_result serves the cached copy when fresh.
    return super().incoming_data()

list_meta_cache()

List the current Meta Cache

Source code in src/workbench/cached/cached_meta.py
def list_meta_cache(self):
    """List the current Meta Cache"""
    # Returns the cache keys only, not the cached values themselves.
    return self.meta_cache.list_keys()

model(model_name)

Get the details of a specific Model

Parameters:

Name Type Description Default
model_name str

The name of the Model

required

Returns:

Name Type Description
dict Union[dict, None]

The details of the Model (None if not found)

Source code in src/workbench/cached/cached_meta.py
@cache_result
def model(self, model_name: str) -> Union[dict, None]:
    """Get the details of a specific Model

    Args:
        model_name (str): The name of the Model

    Returns:
        dict: The details of the Model (None if not found)
    """
    # Delegates to CloudMeta; @cache_result serves the cached copy when fresh.
    return super().model(model_name=model_name)

models(details=False)

Get a summary of the Models deployed in the Cloud Platform

Parameters:

Name Type Description Default
details bool

Include detailed information. Defaults to False.

False

Returns:

Type Description
DataFrame

pd.DataFrame: A summary of the Models deployed in the Cloud Platform

Source code in src/workbench/cached/cached_meta.py
@cache_result
def models(self, details: bool = False) -> pd.DataFrame:
    """Get a summary of the Models deployed in the Cloud Platform

    Args:
        details (bool, optional): Include detailed information. Defaults to False.

    Returns:
        pd.DataFrame: A summary of the Models deployed in the Cloud Platform
    """
    # details=True and details=False produce distinct cache keys/entries.
    return super().models(details=details)

views(database='workbench')

Get a summary of all the Views, for the given database, in AWS

Parameters:

Name Type Description Default
database str

Glue database. Defaults to 'workbench'.

'workbench'

Returns:

Type Description
DataFrame

pd.DataFrame: A summary of all the Views, for the given database, in AWS

Source code in src/workbench/cached/cached_meta.py
@cache_result
def views(self, database: str = "workbench") -> pd.DataFrame:
    """Get a summary of all the Views, for the given database, in AWS

    Args:
        database (str, optional): Glue database. Defaults to 'workbench'.

    Returns:
        pd.DataFrame: A summary of all the Views, for the given database, in AWS
    """
    # Each database value gets its own cache entry.
    return super().views(database=database)

cache_result(method)

Decorator to cache method results in meta_cache

Source code in src/workbench/cached/cached_meta.py
def cache_result(method):
    """Decorator to cache method results in meta_cache

    Serves cached (possibly stale) data immediately and refreshes stale
    entries asynchronously via the instance's thread pool. Falls back to a
    blocking call when no cached value exists yet.
    """

    @wraps(method)
    def wrapper(self, *args, **kwargs):
        # Create a unique cache key based on the method name and arguments
        cache_key = CachedMeta._flatten_redis_key(method, *args, **kwargs)

        # Check for fresh data, spawn thread to refresh if stale
        if WorkbenchCache.refresh_enabled and self.fresh_cache.get(cache_key) is None:
            self.log.debug(f"Async: Metadata for {cache_key} refresh thread started...")
            # Mark as refreshed first so concurrent callers don't all spawn refreshes
            self.fresh_cache.set(cache_key, True)

            # Spawn a thread to refresh data without blocking.
            # Guard: close() sets thread_pool to None, and submitting to a
            # closed/absent pool would raise instead of serving cached data.
            if self.thread_pool is not None:
                self.thread_pool.submit(self._refresh_data_in_background, cache_key, method, *args, **kwargs)

        # Return data (fresh or stale) if available
        cached_value = self.meta_cache.get(cache_key)
        if cached_value is not None:
            return cached_value

        # Fall back to calling the method if no cached data found
        self.log.important(f"Blocking: Getting Metadata for {cache_key}")
        result = method(self, *args, **kwargs)
        self.meta_cache.set(cache_key, result)
        return result

    return wrapper

Examples

These examples show how to use the CachedMeta() class to pull lists of artifacts from AWS: DataSources, FeatureSets, Models, Endpoints and more. If you're building a web interface plugin, the CachedMeta class is a great place to start.

Workbench REPL

If you'd like to see exactly what data/details you get back from the CachedMeta() class, you can spin up the Workbench REPL, use the class and test out all the methods. Try it out! Workbench REPL

Using Workbench REPL
meta = CachedMeta()
model_df = meta.models()
model_df
               Model Group   Health Owner  ...             Input     Status                Description
0      wine-classification  healthy     -  ...     wine_features  Completed  Wine Classification Model
1  abalone-regression-full  healthy     -  ...  abalone_features  Completed   Abalone Regression Model
2       abalone-regression  healthy     -  ...  abalone_features  Completed   Abalone Regression Model

[3 rows x 10 columns]

List the Models in AWS

# Example: list the Models in AWS, then fetch details for each one.
from pprint import pprint  # was used below but never imported

from workbench.cached.cached_meta import CachedMeta

# Create our CachedMeta Class and get a list of our Models.
# Use a lowercase variable name so we don't shadow the CachedMeta class itself.
meta = CachedMeta()
model_df = meta.models()

print(f"Number of Models: {len(model_df)}")
print(model_df)

# Get more detailed data on the Models
model_names = model_df["Model Group"].tolist()
for name in model_names:
    pprint(meta.model(name))

Output

Number of Models: 3
               Model Group   Health Owner  ...             Input     Status                Description
0      wine-classification  healthy     -  ...     wine_features  Completed  Wine Classification Model
1  abalone-regression-full  healthy     -  ...  abalone_features  Completed   Abalone Regression Model
2       abalone-regression  healthy     -  ...  abalone_features  Completed   Abalone Regression Model

[3 rows x 10 columns]
wine-classification
abalone-regression-full
abalone-regression

Getting Model Performance Metrics

# Example: pull performance metrics for the first few Models.
from workbench.cached.cached_meta import CachedMeta

# Create our CachedMeta Class and get a list of our Models.
# Use a lowercase variable name so we don't shadow the CachedMeta class itself.
meta = CachedMeta()
model_df = meta.models()

print(f"Number of Models: {len(model_df)}")
print(model_df)

# Get more detailed data on the Models
model_names = model_df["Model Group"].tolist()
for name in model_names[:5]:
    model_details = meta.model(name)
    print(f"\n\nModel: {name}")
    # NOTE(review): key restored from "workbench_CachedMeta" — the page's
    # examples were damaged by a meta->CachedMeta find-replace; verify the
    # actual key name against the workbench source.
    performance_metrics = model_details["workbench_meta"]["workbench_inference_metrics"]
    print(f"\tPerformance Metrics: {performance_metrics}")

Output

wine-classification
    ARN: arn:aws:sagemaker:us-west-2:507740646243:model-package-group/wine-classification
    Description: Wine Classification Model
    Tags: wine::classification
    Performance Metrics:
        [{'wine_class': 'TypeA', 'precision': 1.0, 'recall': 1.0, 'fscore': 1.0, 'roc_auc': 1.0, 'support': 12}, {'wine_class': 'TypeB', 'precision': 1.0, 'recall': 1.0, 'fscore': 1.0, 'roc_auc': 1.0, 'support': 14}, {'wine_class': 'TypeC', 'precision': 1.0, 'recall': 1.0, 'fscore': 1.0, 'roc_auc': 1.0, 'support': 9}]

abalone-regression
    ARN: arn:aws:sagemaker:us-west-2:507740646243:model-package-group/abalone-regression
    Description: Abalone Regression Model
    Tags: abalone::regression
    Performance Metrics:
        [{'MAE': 1.64, 'RMSE': 2.246, 'R2': 0.502, 'MAPE': 16.393, 'MedAE': 1.209, 'NumRows': 834}]

List the Endpoints in AWS

# Example: list the Endpoints in AWS, then fetch details for each one.
from pprint import pprint
from workbench.cached.cached_meta import CachedMeta

# Create our CachedMeta Class and get a list of our Endpoints.
# Use a lowercase variable name so we don't shadow the CachedMeta class itself.
meta = CachedMeta()
endpoint_df = meta.endpoints()
print(f"Number of Endpoints: {len(endpoint_df)}")
print(endpoint_df)

# Get more detailed data on the Endpoints
endpoint_names = endpoint_df["Name"].tolist()
for name in endpoint_names:
    pprint(meta.endpoint(name))

Output

Number of Endpoints: 2
                      Name   Health            Instance           Created  ...     Status     Variant Capture Samp(%)
0  wine-classification-end  healthy  Serverless (2GB/5)  2024-03-23 23:09  ...  InService  AllTraffic   False       -
1   abalone-regression-end  healthy  Serverless (2GB/5)  2024-03-23 21:11  ...  InService  AllTraffic   False       -

[2 rows x 10 columns]
wine-classification-end
<lots of details about endpoints>

Not Finding some particular AWS Data?

The Workbench CachedMeta API Class also has (details=True) arguments, so make sure to check those out.