Source code for simple_aws_redshift.redshift_data_api.client

# -*- coding: utf-8 -*-

"""
Improve the original redshift data api boto3 API.

Ref:

- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift-data.html
"""

import typing as T
import warnings
import dataclasses

import botocore.exceptions
from func_args.api import REQ, OPT, remove_optional, BaseModel, BaseFrozenModel
from ..vendor.waiter import Waiter

from .model import (
    DescribeStatementResponse,
    GetStatementResultResponse,
    GetStatementResultResponseIterProxy,
    VirtualDataFrame,
    ConsolidatedStatementResult,
)

if T.TYPE_CHECKING:  # pragma: no cover
    from mypy_boto3_redshift_data.client import RedshiftDataAPIServiceClient
    from mypy_boto3_redshift_data.literals import ResultFormatStringType
    from mypy_boto3_redshift_data.type_defs import (
        ExecuteStatementOutputTypeDef,
        GetStatementResultResponseTypeDef,
    )


[docs] def get_statement_result( redshift_data_api_client: "RedshiftDataAPIServiceClient", id: str, max_items: int = 1000, ) -> GetStatementResultResponseIterProxy: """ Retrieves the result of a SQL statement execution using the Redshift Data API. This function automatically paginates through all result pages and returns an iterator proxy that yields GetStatementResultResponse objects for each page. :param redshift_data_api_client: boto3.client("redshift-data") object :param id: The identifier of the SQL statement to retrieve results for :param max_items: Maximum number of items to return across all pages :return: :class:`~simple_aws_redshift.redshift_data_api.model.GetStatementResultResponseIterProxy` Reference: - get_statement_result: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift-data/client/get_statement_result.html - GetStatementResult: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift-data/paginator/GetStatementResult.html .. note:: get_statement_result can only return results that are in JSON format! for CSV, please use get_statement_result_v2. """ def func(): paginator = redshift_data_api_client.get_paginator("get_statement_result") response_iterator = paginator.paginate( Id=id, PaginationConfig=dict( MaxItems=max_items, ), ) get_statement_result_response: GetStatementResultResponseTypeDef for get_statement_result_response in response_iterator: statement_result = GetStatementResultResponse( raw_data=get_statement_result_response ) yield statement_result return GetStatementResultResponseIterProxy(func())
# TODO: depracate this
[docs] @dataclasses.dataclass class RunSqlResult(BaseModel): """ Result of running a SQL statement using the Redshift Data API. :param execute_statement_response: Response from the `execute_statement` API call. :param describe_statement_response: Response from the `describe_statement` API call. """ # fmt: off execute_statement_response: "ExecuteStatementOutputTypeDef" = dataclasses.field(default=REQ) describe_statement_response: "DescribeStatementResponse" = dataclasses.field(default=REQ) # fmt: on @property def execution_id(self) -> str: """ Get the execution ID of the SQL statement. This ID can be used to retrieve the results of the SQL execution using the `get_statement_result` API. """ warnings.warn( "RunSqlResult is deprecated, use `SqlCommand` instead.", category=DeprecationWarning, ) return self.execute_statement_response["Id"]
# TODO: depracate this
[docs] def run_sql( redshift_data_api_client: "RedshiftDataAPIServiceClient", sql: str, client_token: str = OPT, cluster_identifier: str = OPT, database: str = OPT, db_user: str = OPT, parameters: dict[str, T.Any] = OPT, result_format: "ResultFormatStringType" = OPT, secret_arn: str = OPT, session_id: str = OPT, session_keep_alive_seconds: int = OPT, statement_name: str = OPT, with_event: bool = OPT, workgroup_name: str = OPT, delay: int = 1, timeout: int = 10, verbose: bool = False, raises_on_error: bool = True, ): """ Run redshift SQL statement using Data API and get the results. It will run ``execute_statement`` API to run the SQL asynchronously, then do a long polling to check the status of the SQL execution using``describe_statement`` API. Once the SQL execution is finished, it will run ``get_statement_result`` API to get the result. In other words, this function is a human-friendly wrapper of the Data API. :param redshift_data_api_client: boto3.client("redshift-data") object :param sql: SQL statement you want to execute. :param client_token: Unique identifier for the request to ensure idempotency. :param cluster_identifier: cluster identifier. this is for Redshift provisioned cluster only. :param database: database name. :param db_user: database user name. :param parameters: Parameters for the SQL statement. :param result_format: Format of the result set (JSON or CSV). :param secret_arn: ARN of the secret containing database credentials. :param session_id: Database session identifier. :param session_keep_alive_seconds: Number of seconds to keep the session alive. :param statement_name: statement name. a human-friendly name you want to give to this SQL statement. :param with_event: Whether to send an event to Amazon EventBridge. :param workgroup_name: workgroup name. this is for Redshift serverless only. :param delay: how many seconds to wait between each long polling. :param timeout: how many seconds to wait before timeout. :param verbose: whether to print verbose output during polling. :param raises_on_error: whether to raise an exception when the SQL execution fails. Reference: - execute_statement: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift-data/client/execute_statement.html - describe_statement: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift-data/client/describe_statement.html - get_statement_result: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift-data/client/get_statement_result.html :return: :class:`RunSqlResult` object. """ warnings.warn( "run_sql is deprecated, use `SqlCommand` instead.", category=DeprecationWarning, ) # --- execute_statement # process arguments kwargs = dict( Sql=sql, ClientToken=client_token, ClusterIdentifier=cluster_identifier, Database=database, DbUser=db_user, Parameters=parameters, ResultFormat=result_format, SecretArn=secret_arn, SessionId=session_id, SessionKeepAliveSeconds=session_keep_alive_seconds, StatementName=statement_name, WithEvent=with_event, WorkgroupName=workgroup_name, ) execute_statement_response = redshift_data_api_client.execute_statement( **remove_optional(**kwargs) ) id = execute_statement_response["Id"] # --- describe_statement # wait for the status to reach FINISHED describe_statement_response = None for _ in Waiter(delays=delay, timeout=timeout, verbose=verbose): try: response = redshift_data_api_client.describe_statement(Id=id) describe_statement_response = DescribeStatementResponse(raw_data=response) # 'SUBMITTED'|'PICKED'|'STARTED'|'FINISHED'|'ABORTED'|'FAILED'|'ALL' status = describe_statement_response.status if status == "FINISHED": break # still pending elif status in ["SUBMITTED", "PICKED", "STARTED"]: continue # raise exception when failed elif status == "FAILED": if raises_on_error: # pragma: no cover raise RuntimeError( "FAILED! error: {}".format(describe_statement_response.error) ) else: break elif status == "ABORTED": if raises_on_error: # pragma: no cover raise RuntimeError("ABORTED!") else: break else: # pragma: no cover raise NotImplementedError except botocore.exceptions.ClientError as e: if e.response["Error"]["Code"] == "ResourceNotFoundException": continue else: # pragma: no cover raise e run_sql_result = RunSqlResult( execute_statement_response=execute_statement_response, describe_statement_response=describe_statement_response, ) return run_sql_result
[docs] class SqlCommandKeyEnum: """ Keys for internal data storage in SqlCommand. """ execute_statement_response = "execute_statement_response" describe_statement_response = "describe_statement_response" get_statement_result_iterproxy = "get_statement_result_iterproxy" consolidated_statement_result = "consolidated_statement_result"
[docs] @dataclasses.dataclass(frozen=True) class SqlCommand(BaseFrozenModel): """ Command pattern class that encapsulates everything needed to execute a Redshift SQL statement using the Data API and retrieve its results. Usage: - Use run() for automatic execution of the complete workflow - Or call methods in sequence for more control: execute_statement() → wait_until_finished() → get_statement_result() → get_consolidated_result() Note: The iterproxy returned by get_statement_result() can only be consumed once. If you need to iterate multiple times, call get_consolidated_result() instead. :param redshift_data_api_client: boto3.client("redshift-data") object :param sql: SQL statement you want to execute. :param client_token: Unique identifier for the request to ensure idempotency. :param cluster_identifier: cluster identifier. this is for Redshift provisioned cluster only. :param database: database name. :param db_user: database user name. :param parameters: Parameters for the SQL statement. :param result_format: Format of the result set (JSON or CSV). :param secret_arn: ARN of the secret containing database credentials. :param session_id: Database session identifier. :param session_keep_alive_seconds: Number of seconds to keep the session alive. :param statement_name: statement name. a human-friendly name you want to give to this SQL statement. :param with_event: Whether to send an event to Amazon EventBridge. :param workgroup_name: workgroup name. this is for Redshift serverless only. :param delay: how many seconds to wait between each long polling. :param timeout: how many seconds to wait before timeout. :param verbose: whether to print verbose output during polling. :param raises_on_error: whether to raise an exception when the SQL execution fails. :param _data: internal data storage. Reference: - execute_statement: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift-data/client/execute_statement.html - describe_statement: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift-data/client/describe_statement.html - get_statement_result: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift-data/client/get_statement_result.html """ redshift_data_api_client: "RedshiftDataAPIServiceClient" = dataclasses.field( default=REQ ) sql: str = dataclasses.field(default=REQ) client_token: str = dataclasses.field(default=OPT) cluster_identifier: str = dataclasses.field(default=OPT) database: str = dataclasses.field(default=OPT) db_user: str = dataclasses.field(default=OPT) parameters: dict[str, T.Any] = dataclasses.field(default=OPT) result_format: "ResultFormatStringType" = dataclasses.field(default="JSON") secret_arn: str = dataclasses.field(default=OPT) session_id: str = dataclasses.field(default=OPT) session_keep_alive_seconds: int = dataclasses.field(default=OPT) statement_name: str = dataclasses.field(default=OPT) with_event: bool = dataclasses.field(default=OPT) workgroup_name: str = dataclasses.field(default=OPT) delay: int = dataclasses.field(default=1) timeout: int = dataclasses.field(default=10) verbose: bool = dataclasses.field(default=False) raises_on_error: bool = dataclasses.field(default=True) max_rows: int = dataclasses.field(default=1000) _data: dict[str, T.Any] = dataclasses.field(default_factory=dict) def _get_attr( self, name: str, methods: list[T.Callable] ): try: return self._data[name] except KeyError: classname = self.__class__.__name__ parts = [ f"{classname}.{method.__name__}()" for method in methods ] msg = " or ".join(parts) error_msg = f"{classname}.{name} doesn't exists, call {msg} first!" raise AttributeError(error_msg) @property def execute_statement_response(self) -> "ExecuteStatementOutputTypeDef": """ Interface to get the response from the `execute_statement` API call. """ return self._get_attr( name=SqlCommandKeyEnum.execute_statement_response, methods=[ self.execute_statement, self.run, ], ) @property def describe_statement_response(self) -> "DescribeStatementResponse": """ Interface to get the statement execution status and metadata. """ return self._get_attr( name=SqlCommandKeyEnum.execute_statement_response, methods=[ self.wait_until_finished, self.run, ], ) @property def get_statement_result_iterproxy(self) -> "GetStatementResultResponseIterProxy": """ Interface to get the statement execution results as an iterator proxy (consumable only once). """ return self._get_attr( name=SqlCommandKeyEnum.get_statement_result_iterproxy, methods=[ self.get_statement_result, self.run, ], ) @property def result(self) -> "ConsolidatedStatementResult": """ Interface to get consolidated results after running the full workflow. """ return self._get_attr( name=SqlCommandKeyEnum.consolidated_statement_result, methods=[ self.get_consolidated_result, self.run, ], ) @property def statement_id(self) -> str: """ Get the statement ID of the SQL statement. This ID can be used to retrieve the results of the SQL execution using the `get_statement_result` API. """ return self.execute_statement_response["Id"]
[docs] def execute_statement(self) -> "ExecuteStatementOutputTypeDef": """ Run ``execute_statement`` API to run the SQL asynchronously. :return: Response from the `execute_statement` API call. """ # process arguments kwargs = dict( Sql=self.sql, ClientToken=self.client_token, ClusterIdentifier=self.cluster_identifier, Database=self.database, DbUser=self.db_user, Parameters=self.parameters, ResultFormat=self.result_format, SecretArn=self.secret_arn, SessionId=self.session_id, SessionKeepAliveSeconds=self.session_keep_alive_seconds, StatementName=self.statement_name, WithEvent=self.with_event, WorkgroupName=self.workgroup_name, ) execute_statement_response = self.redshift_data_api_client.execute_statement( **remove_optional(**kwargs) ) self._data[SqlCommandKeyEnum.execute_statement_response] = ( execute_statement_response ) return execute_statement_response
[docs] def wait_until_finished(self) -> "DescribeStatementResponse": """ Poll the statement status until it reaches a final state (FINISHED, FAILED, or ABORTED). """ describe_statement_response = None for _ in Waiter( delays=self.delay, timeout=self.timeout, verbose=self.verbose, ): try: response = self.redshift_data_api_client.describe_statement( Id=self.statement_id ) describe_statement_response = DescribeStatementResponse( raw_data=response, ) # 'SUBMITTED'|'PICKED'|'STARTED'|'FINISHED'|'ABORTED'|'FAILED'|'ALL' status = describe_statement_response.status if status == "FINISHED": break # still pending elif status in ["SUBMITTED", "PICKED", "STARTED"]: continue # raise exception when failed elif status == "FAILED": if self.raises_on_error: # pragma: no cover raise RuntimeError( "FAILED! error: {}".format( describe_statement_response.error ) ) else: break elif status == "ABORTED": if self.raises_on_error: # pragma: no cover raise RuntimeError("ABORTED!") else: break else: # pragma: no cover raise NotImplementedError except botocore.exceptions.ClientError as e: if e.response["Error"]["Code"] == "ResourceNotFoundException": continue else: # pragma: no cover raise e self._data[SqlCommandKeyEnum.describe_statement_response] = ( describe_statement_response ) return describe_statement_response
[docs] def get_statement_result(self) -> "GetStatementResultResponseIterProxy": """ Retrieve the statement execution results as an iterator proxy (consumable only once). """ iterproxy = get_statement_result( redshift_data_api_client=self.redshift_data_api_client, id=self.statement_id, max_items=self.max_rows, ) self._data[SqlCommandKeyEnum.get_statement_result_iterproxy] = iterproxy return iterproxy
[docs] def get_consolidated_result_v1(self) -> "ConsolidatedStatementResult": """Consolidate JSON format results into a single result object.""" result = ConsolidatedStatementResult( response_list=self.get_statement_result_iterproxy.all(), ) self._data[SqlCommandKeyEnum.consolidated_statement_result] = result return result
[docs] def get_consolidated_result_v2( self, ) -> "ConsolidatedStatementResult": # pragma: no cover """Consolidate CSV format results into a single result object.""" raise NotImplementedError
[docs] def get_consolidated_result(self) -> "ConsolidatedStatementResult": """ Get consolidated results in the appropriate format based on result_format setting. """ if self.result_format == "JSON": return self.get_consolidated_result_v1() else: # pragma: no cover return self.get_consolidated_result_v2()
[docs] def run(self): """ Execute the complete SQL workflow: execute → wait → get results → consolidate. """ self.execute_statement() self.wait_until_finished() self.get_statement_result() self.get_consolidated_result() return self
[docs] def reset(self): """ Reset the internal state to allow re-execution of the SQL command. """ self._data.clear()