Pythonic Redshift API¶
The simple_aws_redshift library provides a Pythonic interface for working with AWS Redshift resources. It simplifies the original boto3 API by providing intuitive data models, property-based access patterns, and comprehensive type hints.
Quick Start¶
Import the library and access all public APIs:
[1]:
import simple_aws_redshift.api as rs
[2]:
import boto3
# Create boto3 clients
redshift_client = boto3.client("redshift")
redshift_serverless_client = boto3.client("redshift-serverless")
redshift_data_client = boto3.client("redshift-data")
Working with Redshift Clusters¶
List All Clusters¶
[3]:
clusters = rs.redshift.list_redshift_clusters(redshift_client)
for cluster in clusters:
    print(f"{cluster.cluster_identifier = }")
    print(f"{cluster.endpoint_address = }")
    print(f"{cluster.endpoint_port = }")
Get Specific Cluster¶
[4]:
cluster = rs.redshift.get_redshift_cluster(
    redshift_client,
    cluster_identifier="my-cluster",
)
print(f"{cluster.cluster_identifier = }")
print(f"{cluster.endpoint_address = }")
print(f"{cluster.endpoint_port = }")
print(f"{cluster.is_available = }")
print(f"{cluster.is_creating = }")
print(f"{cluster.is_deleting = }")
print(f"{cluster.is_modifying = }")
print(f"{cluster.is_paused = }")
print(f"{cluster.is_rebooting = }")
print(f"{cluster.is_resizing = }")
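The `is_*` attributes above read as boolean views over the cluster's status string. Here is a minimal sketch of how such property-based access could be layered over one raw entry of a boto3 `describe_clusters` response. `ClusterSketch` and `from_dict` are hypothetical names used only for illustration, not the library's actual implementation, and the sample dictionary is made up.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class ClusterSketch:
    """Hypothetical wrapper around one item of describe_clusters()["Clusters"]."""

    cluster_identifier: str
    cluster_status: str
    endpoint_address: Optional[str] = None
    endpoint_port: Optional[int] = None

    @classmethod
    def from_dict(cls, dct: dict[str, Any]) -> "ClusterSketch":
        # The endpoint can be missing, e.g. while the cluster is being created.
        endpoint = dct.get("Endpoint") or {}
        return cls(
            cluster_identifier=dct["ClusterIdentifier"],
            cluster_status=dct["ClusterStatus"],
            endpoint_address=endpoint.get("Address"),
            endpoint_port=endpoint.get("Port"),
        )

    @property
    def is_available(self) -> bool:
        return self.cluster_status == "available"

    @property
    def is_paused(self) -> bool:
        return self.cluster_status == "paused"


# Made-up sample data shaped like a boto3 response entry.
sample_cluster = {
    "ClusterIdentifier": "my-cluster",
    "ClusterStatus": "available",
    "Endpoint": {"Address": "my-cluster.example.com", "Port": 5439},
}
cluster_sketch = ClusterSketch.from_dict(sample_cluster)
print(f"{cluster_sketch.is_available = }")
print(f"{cluster_sketch.is_paused = }")
```

Exposing status checks as properties keeps calling code free of string comparisons against status values.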
Working with Redshift Serverless¶
List All Namespaces¶
[ ]:
# List all Redshift Serverless namespaces
namespaces = rs.redshift_serverless.list_namespaces(redshift_serverless_client)
# Iterate through namespaces
for namespace in namespaces:
    print(f"{namespace.namespace_name = }")
    print(f"{namespace.status = }")
    print(f"{namespace.is_available = }")
Get Specific Namespace¶
[ ]:
namespace = rs.redshift_serverless.get_namespace(
    redshift_serverless_client,
    namespace_name="simple-aws-redshift-dev",
)
print(f"{namespace.namespace_name = }")
print(f"{namespace.db_name = }")
print(f"{namespace.status = }")
print(f"{namespace.is_available = }")
Delete Namespace¶
[ ]:
deleted_namespace = rs.redshift_serverless.delete_namespace(
    redshift_serverless_client,
    namespace_name="invalid-namespace",
)
List All Workgroups¶
[ ]:
# List all Redshift Serverless workgroups
workgroups = rs.redshift_serverless.list_workgroups(redshift_serverless_client)
# Iterate through workgroups
for workgroup in workgroups:
    print(f"{workgroup.workgroup_name = }")
    print(f"{workgroup.status = }")
    print(f"{workgroup.namespace_name = }")
    print(f"{workgroup.is_available = }")
Get Specific Workgroup¶
[ ]:
workgroup = rs.redshift_serverless.get_workgroup(
    redshift_serverless_client,
    workgroup_name="simple-aws-redshift-dev",
)
print(f"{workgroup.workgroup_name = }")
print(f"{workgroup.status = }")
print(f"{workgroup.namespace_name = }")
print(f"{workgroup.is_available = }")
Delete Workgroup¶
[ ]:
deleted_workgroup = rs.redshift_serverless.delete_workgroup(
    redshift_serverless_client,
    workgroup_name="invalid-workgroup",
)
Working with Redshift Database Connection¶
Create Redshift Serverless Connection Parameters¶
[ ]:
connection_params = rs.RedshiftServerlessConnectionParams.new(
    redshift_serverless_client=redshift_serverless_client,
    namespace_name="simple-aws-redshift-dev",
    workgroup_name="simple-aws-redshift-dev",
)
Use Redshift Connector¶
The Redshift connector is an AWS-maintained DB API 2.0 driver specifically designed for AWS Redshift databases. While it provides a reliable and official connection interface, it lacks advanced features like ORM capabilities, relationship mapping, and query builders. This means developers must write most database operations using raw SQL commands, which can be verbose and require more manual effort for complex database interactions.
[ ]:
redshift_connection = connection_params.get_connection(timeout=5)
cursor = redshift_connection.cursor()
sql = "SELECT 1;"
cursor.execute(sql)
rows = cursor.fetchall()
print(rows)
print("Redshift connection is working!")
redshift_connection.close()
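With a DB API 2.0 driver, the connection should be closed even if a query raises. The sketch below shows one way to do that with `contextlib.closing`. It uses the standard library's `sqlite3` module purely as a stand-in so that it runs without a Redshift endpoint, on the assumption that the object returned by `get_connection()` offers the same `cursor()` and `close()` methods used in the cell above.

```python
import sqlite3
from contextlib import closing

# sqlite3 is only a stand-in here; it follows the same DB API 2.0 flow
# (connect -> cursor -> execute -> fetchall -> close) as the cell above.
with closing(sqlite3.connect(":memory:")) as demo_connection:
    with closing(demo_connection.cursor()) as demo_cursor:
        demo_cursor.execute("SELECT 1;")
        demo_rows = demo_cursor.fetchall()

# Both the cursor and the connection are closed at this point,
# whether or not execute() raised.
print(demo_rows)
```

The same two nested `with closing(...)` blocks would wrap the Redshift connection and cursor in place of the explicit `close()` call.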
Create SQLAlchemy Engine¶
SQLAlchemy is the de facto standard for working with relational databases in Python, providing ORM capabilities and a database abstraction layer. Because AWS Redshift is built on top of PostgreSQL, it remains compatible with most PostgreSQL APIs and can leverage existing PostgreSQL tooling. However, the official sqlalchemy-redshift dialect has not been maintained since April 2023, after its core AWS developer left, and its older versions force the use of SQLAlchemy < 2.0.0, which conflicts with the modern SQLAlchemy 2.0.0 standard. We therefore built our own thin dialect layer to make SQLAlchemy work with Redshift using the latest standards.
Limitations:
Table Creation Limitation: Since we have not implemented a full Redshift dialect, it’s not possible to use SQLAlchemy to create tables. Redshift tables have special attributes such as Distribution Keys and Sort Keys, which SQLAlchemy does not understand. Therefore, you cannot use SQLAlchemy’s metadata-based create_all() functionality. You must write raw CREATE TABLE statements manually. However, once the tables are created, you can still use SQLAlchemy’s ORM and Table objects to write queries, which remains an elegant experience.
Metadata Reflection Issue: Because SQLAlchemy assumes the backend is standard PostgreSQL, using the MetaData.reflect() function to introspect and reconstruct database DDL objects in memory doesn’t work. Redshift’s PostgreSQL-compatible schema differs from the official PostgreSQL standard, and the reflection functionality cannot accurately interpret those differences.
Special Syntax Support: For Redshift-specific syntax such as COPY (for example, loading from S3) and UNLOAD, you must write raw SQL, because there is no ORM-level abstraction. However, you can still use SQLAlchemy’s transaction context manager to manage the transactional scope of these operations.
Data Type Limitation: Redshift-specific data types are not directly supported in the ORM. You need to either use generic data types or handle special types through raw SQL statements.
[ ]:
import sqlalchemy as sa
sqlalchemy_engine = connection_params.get_engine()
with sqlalchemy_engine.connect() as conn:
    sql = "SELECT 1;"
    rows = conn.execute(sa.text(sql)).fetchall()
    print(rows)
    print("Sqlalchemy engine is working!")
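The table creation limitation means that DDL carrying Redshift attributes has to be written as raw SQL. As a rough illustration, the hypothetical helper below (`build_create_table_sql` is not part of the library) renders a `CREATE TABLE` statement with `DISTKEY` and `SORTKEY` clauses as a plain string. The table and column names are made up, and the helper does no identifier quoting, so it is not suitable for untrusted input.

```python
from typing import Sequence


def build_create_table_sql(
    table: str,
    columns: dict[str, str],
    dist_key: str,
    sort_keys: Sequence[str],
) -> str:
    """Hypothetical helper: render a Redshift CREATE TABLE statement as raw SQL."""
    column_lines = ",\n".join(
        f"    {name} {sql_type}" for name, sql_type in columns.items()
    )
    return (
        f"CREATE TABLE {table} (\n"
        f"{column_lines}\n"
        f")\n"
        f"DISTKEY ({dist_key})\n"
        f"SORTKEY ({', '.join(sort_keys)});"
    )


# Made-up table definition for illustration only.
ddl = build_create_table_sql(
    table="events",
    columns={"event_id": "BIGINT", "event_time": "TIMESTAMP"},
    dist_key="event_id",
    sort_keys=["event_time"],
)
print(ddl)
```

A string like this could then presumably be passed through `sa.text(...)` and executed on a connection from the engine, in the same way as the `SELECT 1;` statement above.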
Working with Redshift Data API¶
Run SQL Queries¶
The underlying Redshift Data API execute_statement method is asynchronous: it returns immediately, before the query has finished. SqlCommand submits the statement, waits for it to finish, and exposes the result in a DataFrame-compatible format.
[3]:
# Run a SQL query and wait for it to finish
sql_cmd = rs.redshift_data_api.SqlCommand(
    redshift_data_api_client=redshift_data_client,
    sql="SELECT 1 as value;",
    workgroup_name="simple-aws-redshift-dev",
    database="dev",
    delay=1,
    timeout=10,
    verbose=True,
    raises_on_error=True,
)
print("")
sql_cmd.run()
sql_cmd.result.vdf.show()
start waiter, polling every 1 seconds, timeout in 10 seconds.
on 1 th attempt, elapsed 1 seconds, remain 9 seconds ...(1, 1)
+---------+
| value |
|---------|
| 1 |
+---------+
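The log output above suggests a polling waiter driven by the `delay` and `timeout` arguments. A minimal sketch of that general pattern follows. It is not the library's implementation: `wait_for_statement` and `fake_describe` are hypothetical names, and the fake function stands in for a call such as `redshift_data_client.describe_statement(Id=...)` so that the example runs offline.

```python
import time
from typing import Callable

FAILURE_STATES = {"FAILED", "ABORTED"}


def wait_for_statement(
    describe: Callable[[str], dict],
    statement_id: str,
    delay: float = 1.0,
    timeout: float = 10.0,
) -> dict:
    """Poll ``describe`` until the statement finishes, fails, or times out."""
    deadline = time.monotonic() + timeout
    while True:
        response = describe(statement_id)
        status = response["Status"]
        if status == "FINISHED":
            return response
        if status in FAILURE_STATES:
            raise RuntimeError(f"statement {statement_id} ended with {status}")
        # Give up if the next poll would land after the deadline.
        if time.monotonic() + delay > deadline:
            raise TimeoutError(f"statement {statement_id} is still {status}")
        time.sleep(delay)


# Fake describe function (assumption for this sketch): it reports
# SUBMITTED, then STARTED, then FINISHED on successive calls.
_fake_statuses = iter(["SUBMITTED", "STARTED", "FINISHED"])


def fake_describe(statement_id: str) -> dict:
    return {"Id": statement_id, "Status": next(_fake_statuses)}


final_response = wait_for_statement(fake_describe, "stmt-1", delay=0.01, timeout=1.0)
print(final_response["Status"])
```

Passing the describe call in as a function keeps the waiting logic testable without AWS credentials.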
[6]:
from rich import print as rprint
rprint(sql_cmd.result.vdf.columns)
rprint(sql_cmd.result.vdf.rows)
['value']
[(1,)]
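For context on where a columns-and-rows shape like this comes from: the Data API's `get_statement_result` response describes columns under `ColumnMetadata` and encodes each cell as a small typed dictionary under `Records`. The sketch below flattens such a response into the same kind of structure. `records_to_rows` is a hypothetical helper, not the library's code; the sample response is made up, and pagination via `NextToken` is deliberately ignored.

```python
from typing import Any


def records_to_rows(result: dict[str, Any]) -> tuple[list[str], list[tuple]]:
    """Flatten a GetStatementResult-style response into (columns, rows)."""
    columns = [meta["name"] for meta in result["ColumnMetadata"]]
    rows = []
    for record in result["Records"]:
        values = []
        for field in record:
            # Each field is a one-key dict such as {"longValue": 1};
            # SQL NULL is encoded as {"isNull": True}.
            if field.get("isNull"):
                values.append(None)
            else:
                values.append(next(iter(field.values())))
        rows.append(tuple(values))
    return columns, rows


# Made-up response fragment shaped like the Data API output.
sample_result = {
    "ColumnMetadata": [{"name": "value"}, {"name": "label"}],
    "Records": [
        [{"longValue": 1}, {"stringValue": "a"}],
        [{"longValue": 2}, {"isNull": True}],
    ],
}
sample_columns, sample_rows = records_to_rows(sample_result)
print(sample_columns)
print(sample_rows)
```

A list of column names plus a list of row tuples can be handed to most DataFrame constructors directly.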