Monday, November 3, 2025

Interfacing Composer with AWS S3 and Redshift

1. Set up AWS Connections in Composer


You'll need to configure AWS credentials in Google Cloud Composer so that Airflow can authenticate and access both S3 and Redshift.


A. Create IAM User in AWS


Create an IAM user with programmatic access and sufficient permissions for S3 and Redshift.


Attach the following policies (or create custom policies):


AmazonS3FullAccess for S3 access.


AmazonRedshiftDataFullAccess for Redshift access (for querying).


You can also create custom policies depending on the level of access needed.


B. Set up AWS Connection in Composer


In your Google Cloud Composer environment:


Go to the Airflow UI.


Navigate to Admin > Connections.


Create a new connection with the following settings:


Connection Type: Amazon Web Services


Connection ID: aws_default (or any name you prefer)


Access Key: AWS access key ID (from IAM user).


Secret Key: AWS secret access key (from IAM user).


Region: Choose the AWS region of your resources (e.g., us-east-1).


Extra: You can leave this empty, but if you're using temporary credentials or assuming a role, you’ll need to add appropriate configurations here (e.g., {"role_arn": "arn:aws:iam::account_id:role/role_name"}).
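
Once the connection is saved, it's worth verifying it before building real tasks. Below is a minimal sketch (the bucket name is a placeholder, not part of any official setup procedure) that you could run from a PythonOperator to confirm the aws_default connection can reach your bucket:

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def check_s3_access():
    # S3Hook reads the credentials stored in the aws_default connection
    hook = S3Hook(aws_conn_id='aws_default')
    # check_for_bucket returns True/False; 'your-s3-bucket-name' is a placeholder
    if not hook.check_for_bucket('your-s3-bucket-name'):
        raise ValueError('Bucket not reachable - check IAM permissions and region')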


2. Interface with AWS S3 in Composer


Google Cloud Composer uses the Airflow operators and hooks from the Amazon provider package to interact with AWS S3. The most common choices are the LocalFilesystemToS3Operator for uploads and the S3Hook for programmatic access (downloads, existence checks, listing).


Example: Upload a file to S3

from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator

upload_to_s3 = LocalFilesystemToS3Operator(
    task_id='upload_file_to_s3',
    filename='/path/to/local/file.txt',  # local file to upload
    dest_bucket='your-s3-bucket-name',   # target S3 bucket
    dest_key='path/in/bucket/file.txt',  # file path (key) in the S3 bucket
    aws_conn_id='aws_default',           # connection ID (defined in Airflow UI)
    verify=False,                        # set to False only if SSL verification is not needed
)
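
If you need more control than the operator offers (for example, generating the file inside the task before uploading it), S3Hook can be used directly from a PythonOperator. This is a sketch reusing the same placeholder paths and connection ID as above:

from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def upload_with_hook():
    hook = S3Hook(aws_conn_id='aws_default')
    # load_file uploads a local file to the given key; replace=True overwrites it
    hook.load_file(
        filename='/path/to/local/file.txt',
        key='path/in/bucket/file.txt',
        bucket_name='your-s3-bucket-name',
        replace=True,
    )

upload_with_hook_task = PythonOperator(
    task_id='upload_file_with_hook',
    python_callable=upload_with_hook,
)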


Example: Download a file from S3

The Amazon provider does not ship a dedicated S3-to-local-filesystem operator, so the usual pattern is to call S3Hook.download_file from a PythonOperator:

from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def download_from_s3():
    hook = S3Hook(aws_conn_id='aws_default')  # connection ID (defined in Airflow UI)
    # download_file fetches the object and returns the local path it was saved to
    return hook.download_file(
        key='path/in/bucket/file.txt',      # file path in the S3 bucket
        bucket_name='your-s3-bucket-name',
        local_path='/path/to/local/dir',    # local directory to download into
    )

download_from_s3_task = PythonOperator(
    task_id='download_file_from_s3',
    python_callable=download_from_s3,
)


3. Interface with Amazon Redshift in Composer


To query Amazon Redshift using Airflow, you'll use the RedshiftSQLOperator or PostgresOperator (since Redshift is based on PostgreSQL).


A. Create a Redshift Connection in Composer


Go to Airflow UI.


Navigate to Admin > Connections.


Create a new connection with the following settings:


Connection Type: Postgres


Connection ID: redshift_default (or any name you prefer)


Host: Redshift cluster endpoint (e.g., redshift-cluster-1.abc123xyz.us-west-2.redshift.amazonaws.com).


Schema: The database name you want to access.


Login: Redshift username.


Password: Redshift password.


Port: Default is 5439 (unless your Redshift cluster is configured differently).
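
Before writing real queries against this connection, you can sanity-check it with a trivial statement. A minimal sketch, assuming the redshift_default connection ID defined above, which you could call from a PythonOperator:

from airflow.providers.postgres.hooks.postgres import PostgresHook

def check_redshift_connection():
    hook = PostgresHook(postgres_conn_id='redshift_default')
    # get_first runs the query and returns the first row, here (1,)
    result = hook.get_first('SELECT 1;')
    print(f'Redshift reachable, got: {result}')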


B. Example: Query Redshift using the PostgresOperator

from airflow.providers.postgres.operators.postgres import PostgresOperator

redshift_query = PostgresOperator(
    task_id='redshift_query',
    postgres_conn_id='redshift_default',  # connection ID to Redshift
    sql="""
    SELECT * FROM your_table LIMIT 10;
    """,
)
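
If you prefer the Redshift-specific operator mentioned above, the equivalent task is sketched below. Note that RedshiftSQLOperator (from the Amazon provider package) expects a connection of type Amazon Redshift rather than Postgres, so this assumes you have created such a connection under the same ID:

from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator

redshift_sql_query = RedshiftSQLOperator(
    task_id='redshift_sql_query',
    redshift_conn_id='redshift_default',  # must point to an Amazon Redshift connection
    sql='SELECT * FROM your_table LIMIT 10;',
)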


C. Example: Loading Data from S3 into Redshift


You can also use Redshift's COPY command to load data from S3 into a Redshift table by executing it as a SQL statement:


from airflow.providers.postgres.operators.postgres import PostgresOperator

copy_s3_to_redshift = PostgresOperator(
    task_id='copy_data_from_s3_to_redshift',
    postgres_conn_id='redshift_default',
    sql="""
    COPY your_table
    FROM 's3://your-bucket-name/path/in/bucket/file.csv'
    IAM_ROLE 'arn:aws:iam::your-account-id:role/your-redshift-role'
    CSV DELIMITER ',' IGNOREHEADER 1;
    """,
)



Make sure that the IAM role you associate with the Redshift cluster has the necessary permissions to read from the S3 bucket.
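
The Amazon provider also ships a transfer operator that builds and runs this COPY statement for you. A sketch, assuming the same placeholder bucket, table, and connection IDs used above (the schema name is an assumption):

from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

s3_to_redshift = S3ToRedshiftOperator(
    task_id='copy_s3_to_redshift_operator',
    schema='public',                      # target schema (assumed here)
    table='your_table',
    s3_bucket='your-bucket-name',
    s3_key='path/in/bucket/file.csv',
    copy_options=['CSV', 'IGNOREHEADER 1'],
    aws_conn_id='aws_default',
    redshift_conn_id='redshift_default',
)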


4. Managing Workflow with Airflow DAGs


Once you have the operators set up, you can create an Airflow DAG to orchestrate the tasks. Here’s an example:


from airflow import DAG
from datetime import datetime
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
from airflow.providers.postgres.operators.postgres import PostgresOperator

dag = DAG(
    's3_redshift_integration',
    description='S3 to Redshift and vice versa',
    schedule_interval='@daily',
    start_date=datetime(2023, 11, 1),
    catchup=False,
)

upload_task = LocalFilesystemToS3Operator(
    task_id='upload_file_to_s3',
    filename='/path/to/local/file.csv',
    dest_bucket='your-bucket-name',
    dest_key='path/in/bucket/file.csv',
    aws_conn_id='aws_default',
    dag=dag,
)

redshift_query_task = PostgresOperator(
    task_id='redshift_query',
    postgres_conn_id='redshift_default',
    sql='SELECT * FROM your_table LIMIT 10;',
    dag=dag,
)


upload_task >> redshift_query_task  # Set task dependencies


5. Monitoring and Troubleshooting


After setting up the DAGs and tasks, make sure to:


Monitor logs in the Airflow UI for each task.


Use XCom to pass data between tasks if needed.


Make sure your IAM roles and policies are correctly set up to avoid access issues.


Set proper error handling and retries for any failed tasks (a retry configuration is sketched below).
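
As a minimal sketch of that last point, retries can be configured once through default_args when the DAG is created; the values below are illustrative, not recommendations:

from datetime import datetime, timedelta
from airflow import DAG

default_args = {
    'retries': 2,                         # retry each failed task twice
    'retry_delay': timedelta(minutes=5),  # wait five minutes between attempts
    'email_on_failure': False,            # flip to True if email alerting is configured
}

dag = DAG(
    's3_redshift_integration',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2023, 11, 1),
    catchup=False,
)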


Summary


In summary:


Set up AWS credentials in Google Cloud Composer (Airflow).


Use the appropriate Airflow operators for S3 and Redshift (LocalFilesystemToS3Operator, PostgresOperator or RedshiftSQLOperator).


Manage your workflow with Airflow DAGs, combining the tasks of uploading/downloading from S3 and querying/loading data from/to Redshift.


Ensure all IAM roles and permissions are set correctly for cross-service access.
