Source code for simple_aws_redshift.redshift_serverless.model
# -*- coding: utf-8 -*-
"""
Data models for AWS Redshift Serverless resources.
Ref:
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift-serverless.html
"""
import typing as T
import datetime
import dataclasses
from func_args.api import T_KWARGS, REQ
from iterproxy import IterProxy
from ..model import Base
if T.TYPE_CHECKING: # pragma: no cover
from mypy_boto3_redshift_serverless.literals import (
NamespaceStatusType,
WorkgroupStatusType,
)
from mypy_boto3_redshift_serverless.type_defs import (
NamespaceTypeDef,
WorkgroupTypeDef,
)
[docs]
@dataclasses.dataclass
class RedshiftServerlessNamespace(Base):
    """
    Redshift Serverless Namespace object.

    Thin, read-only wrapper around the raw namespace dictionary. Every
    accessor reads its key with ``dict.get``, so a key that is absent from
    ``raw_data`` yields ``None`` instead of raising.

    Ref:

    - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift-serverless/client/get_namespace.html
    - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift-serverless/client/list_namespaces.html
    """

    # Raw namespace dictionary (presumably the payload returned by the APIs
    # referenced above). ``REQ`` is the func_args sentinel used as the
    # default; NOTE(review): presumably it marks the field as required and
    # is validated by ``Base`` -- confirm, ``Base`` is not visible here.
    raw_data: "NamespaceTypeDef" = dataclasses.field(default=REQ)

    @property
    def admin_password_secret_arn(self) -> T.Optional[str]:
        """Value of ``adminPasswordSecretArn``, or ``None`` if absent."""
        # Was annotated ``T.Union[str]`` (i.e. plain ``str``), which hid the
        # fact that ``.get`` may return ``None``.
        return self.raw_data.get("adminPasswordSecretArn")

    @property
    def admin_password_secret_kms_key_id(self) -> T.Optional[str]:
        """Value of ``adminPasswordSecretKmsKeyId``, or ``None`` if absent."""
        return self.raw_data.get("adminPasswordSecretKmsKeyId")

    @property
    def admin_username(self) -> T.Optional[str]:
        """Value of ``adminUsername``, or ``None`` if absent."""
        return self.raw_data.get("adminUsername")

    @property
    def creation_date(self) -> T.Optional[datetime.datetime]:
        """Value of ``creationDate``, or ``None`` if absent."""
        return self.raw_data.get("creationDate")

    @property
    def db_name(self) -> T.Optional[str]:
        """Value of ``dbName``, or ``None`` if absent."""
        return self.raw_data.get("dbName")

    @property
    def default_iam_role_arn(self) -> T.Optional[str]:
        """Value of ``defaultIamRoleArn``, or ``None`` if absent."""
        return self.raw_data.get("defaultIamRoleArn")

    @property
    def iam_roles(self) -> T.Optional[T.List[str]]:
        """Value of ``iamRoles``, or ``None`` if absent."""
        return self.raw_data.get("iamRoles")

    @property
    def kms_key_id(self) -> T.Optional[str]:
        """Value of ``kmsKeyId``, or ``None`` if absent."""
        return self.raw_data.get("kmsKeyId")

    @property
    def log_exports(self) -> T.Optional[T.List[str]]:
        """Value of ``logExports``, or ``None`` if absent."""
        return self.raw_data.get("logExports")

    @property
    def namespace_arn(self) -> T.Optional[str]:
        """Value of ``namespaceArn``, or ``None`` if absent."""
        return self.raw_data.get("namespaceArn")

    @property
    def namespace_id(self) -> T.Optional[str]:
        """Value of ``namespaceId``, or ``None`` if absent."""
        return self.raw_data.get("namespaceId")

    @property
    def namespace_name(self) -> T.Optional[str]:
        """Value of ``namespaceName``, or ``None`` if absent."""
        return self.raw_data.get("namespaceName")

    @property
    def status(self) -> T.Optional["NamespaceStatusType"]:
        """Value of ``status``, or ``None`` if absent."""
        return self.raw_data.get("status")

    @property
    def core_data(self) -> T_KWARGS:
        """
        Condensed view of the namespace: name, id, ARN, status and
        creation date, keyed by the corresponding property names.
        """
        return {
            "namespace_name": self.namespace_name,
            "namespace_id": self.namespace_id,
            "namespace_arn": self.namespace_arn,
            "status": self.status,
            "creation_date": self.creation_date,
        }

    @property
    def is_available(self) -> bool:
        """``True`` when :attr:`status` equals ``"AVAILABLE"``."""
        return self.status == "AVAILABLE"

    @property
    def is_modifying(self) -> bool:
        """``True`` when :attr:`status` equals ``"MODIFYING"``."""
        return self.status == "MODIFYING"

    @property
    def is_deleting(self) -> bool:
        """``True`` when :attr:`status` equals ``"DELETING"``."""
        return self.status == "DELETING"
[docs]
@dataclasses.dataclass
class RedshiftServerlessWorkgroup(Base):
    """
    Redshift Serverless Workgroup object.

    Thin, read-only wrapper around the raw workgroup dictionary. Every
    accessor reads its key with ``dict.get``, so a key that is absent from
    ``raw_data`` yields ``None`` instead of raising.

    Ref:

    - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift-serverless/client/get_workgroup.html
    - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift-serverless/client/list_workgroups.html
    """

    # Raw workgroup dictionary (presumably the payload returned by the APIs
    # referenced above). ``REQ`` is the func_args sentinel used as the
    # default; NOTE(review): presumably it marks the field as required and
    # is validated by ``Base`` -- confirm, ``Base`` is not visible here.
    raw_data: "WorkgroupTypeDef" = dataclasses.field(default=REQ)

    @property
    def base_capacity(self) -> T.Optional[int]:
        """Value of ``baseCapacity``, or ``None`` if absent."""
        return self.raw_data.get("baseCapacity")

    @property
    def config_parameters(self) -> T.Optional[T.List[T.Dict[str, str]]]:
        """Value of ``configParameters``, or ``None`` if absent."""
        return self.raw_data.get("configParameters")

    @property
    def creation_date(self) -> T.Optional[datetime.datetime]:
        """Value of ``creationDate``, or ``None`` if absent."""
        return self.raw_data.get("creationDate")

    @property
    def cross_account_vpcs(self) -> T.Optional[T.List[str]]:
        """Value of ``crossAccountVpcs``, or ``None`` if absent."""
        return self.raw_data.get("crossAccountVpcs")

    @property
    def custom_domain_certificate_arn(self) -> T.Optional[str]:
        """Value of ``customDomainCertificateArn``, or ``None`` if absent."""
        return self.raw_data.get("customDomainCertificateArn")

    @property
    def custom_domain_certificate_expiry_time(self) -> T.Optional[datetime.datetime]:
        """Value of ``customDomainCertificateExpiryTime``, or ``None`` if absent."""
        return self.raw_data.get("customDomainCertificateExpiryTime")

    @property
    def custom_domain_name(self) -> T.Optional[str]:
        """Value of ``customDomainName``, or ``None`` if absent."""
        return self.raw_data.get("customDomainName")

    @property
    def endpoint(self) -> T.Optional[T.Dict[str, T.Any]]:
        """Value of ``endpoint`` (a nested dict), or ``None`` if absent."""
        return self.raw_data.get("endpoint")

    @property
    def enhanced_vpc_routing(self) -> T.Optional[bool]:
        """Value of ``enhancedVpcRouting``, or ``None`` if absent."""
        return self.raw_data.get("enhancedVpcRouting")

    @property
    def ip_address_type(self) -> T.Optional[str]:
        """Value of ``ipAddressType``, or ``None`` if absent."""
        return self.raw_data.get("ipAddressType")

    @property
    def max_capacity(self) -> T.Optional[int]:
        """Value of ``maxCapacity``, or ``None`` if absent."""
        return self.raw_data.get("maxCapacity")

    @property
    def namespace_name(self) -> T.Optional[str]:
        """Value of ``namespaceName``, or ``None`` if absent."""
        return self.raw_data.get("namespaceName")

    @property
    def port(self) -> T.Optional[int]:
        """Value of the top-level ``port`` key, or ``None`` if absent."""
        return self.raw_data.get("port")

    @property
    def publicly_accessible(self) -> T.Optional[bool]:
        """Value of ``publiclyAccessible``, or ``None`` if absent."""
        return self.raw_data.get("publiclyAccessible")

    @property
    def security_group_ids(self) -> T.Optional[T.List[str]]:
        """Value of ``securityGroupIds``, or ``None`` if absent."""
        return self.raw_data.get("securityGroupIds")

    @property
    def status(self) -> T.Optional["WorkgroupStatusType"]:
        """Value of ``status``, or ``None`` if absent."""
        return self.raw_data.get("status")

    @property
    def subnet_ids(self) -> T.Optional[T.List[str]]:
        """Value of ``subnetIds``, or ``None`` if absent."""
        return self.raw_data.get("subnetIds")

    @property
    def workgroup_arn(self) -> T.Optional[str]:
        """Value of ``workgroupArn``, or ``None`` if absent."""
        return self.raw_data.get("workgroupArn")

    @property
    def workgroup_id(self) -> T.Optional[str]:
        """Value of ``workgroupId``, or ``None`` if absent."""
        return self.raw_data.get("workgroupId")

    @property
    def workgroup_name(self) -> T.Optional[str]:
        """Value of ``workgroupName``, or ``None`` if absent."""
        return self.raw_data.get("workgroupName")

    @property
    def core_data(self) -> T_KWARGS:
        """
        Condensed view of the workgroup: name, id, ARN, status, owning
        namespace name and creation date, keyed by the property names.
        """
        return {
            "workgroup_name": self.workgroup_name,
            "workgroup_id": self.workgroup_id,
            "workgroup_arn": self.workgroup_arn,
            "status": self.status,
            "namespace_name": self.namespace_name,
            "creation_date": self.creation_date,
        }

    @property
    def is_available(self) -> bool:
        """``True`` when :attr:`status` equals ``"AVAILABLE"``."""
        return self.status == "AVAILABLE"

    @property
    def is_creating(self) -> bool:
        """``True`` when :attr:`status` equals ``"CREATING"``."""
        return self.status == "CREATING"

    @property
    def is_modifying(self) -> bool:
        """``True`` when :attr:`status` equals ``"MODIFYING"``."""
        return self.status == "MODIFYING"

    @property
    def is_deleting(self) -> bool:
        """``True`` when :attr:`status` equals ``"DELETING"``."""
        return self.status == "DELETING"

    @property
    def endpoint_address(self) -> T.Optional[str]:
        """
        Value of ``endpoint["address"]``, or ``None`` when the workgroup has
        no ``endpoint`` entry or the endpoint has no ``address`` key.
        """
        # ``endpoint`` is Optional; subscripting it directly used to raise
        # TypeError on None, unlike every other accessor in this class.
        endpoint = self.endpoint
        if endpoint is None:
            return None
        return endpoint.get("address")

    @property
    def endpoint_port(self) -> T.Optional[int]:
        """
        Value of ``endpoint["port"]``, or ``None`` when the workgroup has
        no ``endpoint`` entry or the endpoint has no ``port`` key.
        """
        # Same None-guard as ``endpoint_address``.
        endpoint = self.endpoint
        if endpoint is None:
            return None
        return endpoint.get("port")
[docs]
class RedshiftServerlessNamespaceIterProxy(IterProxy[RedshiftServerlessNamespace]):
    """
    :class:`IterProxy` parameterized with :class:`RedshiftServerlessNamespace`
    as its item type.

    The class body defines nothing of its own; all behavior is inherited
    from ``IterProxy``.
    """
[docs]
class RedshiftServerlessWorkgroupIterProxy(IterProxy[RedshiftServerlessWorkgroup]):
    """
    :class:`IterProxy` parameterized with :class:`RedshiftServerlessWorkgroup`
    as its item type.

    The class body defines nothing of its own; all behavior is inherited
    from ``IterProxy``.
    """