1. Set up AWS Connections in Composer
You'll need to configure AWS credentials in Google Cloud Composer so that Airflow can authenticate and access both S3 and Redshift.
A. Create IAM User in AWS
Create an IAM user with programmatic access and sufficient permissions for S3 and Redshift.
Attach the following policies (or create custom policies):
AmazonS3FullAccess for S3 access.
AmazonRedshiftDataFullAccess if you plan to query through the Redshift Data API (direct SQL connections, as used below, authenticate with Redshift database credentials instead).
You can also create custom, least-privilege policies depending on the level of access needed; a scoped-down S3 example follows.
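For reference, a scoped-down policy restricted to a single bucket might look like the sketch below; the bucket name is a placeholder, and the JSON printed by json.dumps can be pasted into the IAM console when creating the custom policy.

import json

# Hypothetical least-privilege policy limited to one bucket (names are placeholders).
s3_read_write_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:ListBucket"],
            "Resource": "arn:aws:s3:::your-s3-bucket-name",
        },
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject"],
            "Resource": "arn:aws:s3:::your-s3-bucket-name/*",
        },
    ],
}

print(json.dumps(s3_read_write_policy, indent=2))  # paste this JSON into the IAM console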
B. Set up AWS Connection in Composer
In your Google Cloud Composer environment:
Go to the Airflow UI.
Navigate to Admin > Connections.
Create a new connection with the following settings:
Connection Type: Amazon Web Services
Connection ID: aws_default (or any name you prefer)
AWS Access Key ID: the access key ID of the IAM user (depending on the provider version, this maps to the connection's Login field).
AWS Secret Access Key: the secret access key of the IAM user (maps to the Password field).
Region / Extra: most provider versions take the region through the Extra field, e.g. {"region_name": "us-east-1"}. If you're using temporary credentials or assuming a role, add those settings here as well (e.g., {"region_name": "us-east-1", "role_arn": "arn:aws:iam::account_id:role/role_name"}). A quick way to sanity-check the finished connection is shown below.
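Once the connection is saved, you can verify it from a short task before wiring up real workloads. A minimal sketch, assuming the aws_default connection ID and a placeholder bucket name:

from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def _check_aws_connection():
    hook = S3Hook(aws_conn_id='aws_default')
    # check_for_bucket() returns True if the credentials can see the bucket
    if not hook.check_for_bucket('your-s3-bucket-name'):
        raise ValueError('Bucket not found or credentials lack access')

check_aws_connection = PythonOperator(
    task_id='check_aws_connection',
    python_callable=_check_aws_connection,
)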
2. Interface with AWS S3 in Composer
Google Cloud Composer interacts with AWS S3 through the Airflow Amazon provider's operators and hooks. The most common choices are the transfer operators (such as LocalFilesystemToS3Operator) for moving files and the lower-level S3Hook for custom upload and download logic.
Example: Upload a file to S3
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator

upload_to_s3 = LocalFilesystemToS3Operator(
    task_id='upload_file_to_s3',
    filename='/path/to/local/file.txt',   # local file to upload
    dest_bucket='your-s3-bucket-name',    # destination S3 bucket
    dest_key='path/in/bucket/file.txt',   # object key in the S3 bucket
    aws_conn_id='aws_default',            # connection ID (defined in Airflow UI)
    replace=True,                         # overwrite the object if it already exists
)

Note that "local" means local to the Airflow worker; in Cloud Composer the environment's GCS bucket is mounted on the workers (for example, the bucket's data/ folder appears as /home/airflow/gcs/data), which is the usual place to stage such files.
Example: Download a file from S3
The Amazon provider has no dedicated S3-to-local-filesystem operator, so a common pattern is to call S3Hook from a PythonOperator:

from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def _download_from_s3():
    # get_key() returns a boto3 object; download_file() writes it to the worker's filesystem
    s3_object = S3Hook(aws_conn_id='aws_default').get_key(
        key='path/in/bucket/file.txt', bucket_name='your-s3-bucket-name')
    s3_object.download_file('/path/to/local/file.txt')

download_from_s3 = PythonOperator(
    task_id='download_file_from_s3',
    python_callable=_download_from_s3,
)
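If a downstream task should only run once a file has landed in S3, the Amazon provider's S3KeySensor can poll for the object. A minimal sketch with placeholder bucket and key; the import path assumes a reasonably recent provider version:

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_file = S3KeySensor(
    task_id='wait_for_file_in_s3',
    bucket_name='your-s3-bucket-name',
    bucket_key='path/in/bucket/file.txt',  # object key to wait for
    aws_conn_id='aws_default',
    poke_interval=60,   # check every 60 seconds
    timeout=60 * 60,    # give up after one hour
)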
3. Interface with Amazon Redshift in Composer
To query Amazon Redshift from Airflow, you can use either the RedshiftSQLOperator (from the Amazon provider) or the PostgresOperator, since Redshift speaks the PostgreSQL wire protocol.
A. Create a Redshift Connection in Composer
Go to Airflow UI.
Navigate to Admin > Connections.
Create a new connection with the following settings:
Connection Type: Postgres
Connection ID: redshift_default (or any name you prefer)
Host: Redshift cluster endpoint (e.g., redshift-cluster-1.abc123xyz.us-west-2.redshift.amazonaws.com).
Schema: The Redshift database name you want to access (Airflow's Postgres connection uses the Schema field for the database).
Login: Redshift username.
Password: Redshift password.
Port: Default is 5439 (unless your Redshift cluster is configured differently).
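Before building real queries, you can verify that the connection resolves and authenticates with a tiny test task. A minimal sketch using PostgresHook against the redshift_default connection:

from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

def _check_redshift_connection():
    hook = PostgresHook(postgres_conn_id='redshift_default')
    # get_first() runs the statement and returns the first row
    if hook.get_first('SELECT 1;') is None:
        raise ValueError('Could not run a test query against Redshift')

check_redshift_connection = PythonOperator(
    task_id='check_redshift_connection',
    python_callable=_check_redshift_connection,
)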
B. Example: Query Redshift using the PostgresOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

redshift_query = PostgresOperator(
    task_id='redshift_query',
    postgres_conn_id='redshift_default',  # connection ID to Redshift
    sql="""
        SELECT * FROM your_table LIMIT 10;
    """,
)
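If you'd rather use the Amazon provider's native operator mentioned above, RedshiftSQLOperator works similarly; note that it is designed for a connection of type Amazon Redshift (referenced through redshift_conn_id) rather than a Postgres one, and its exact import path can vary between provider versions. A minimal sketch:

from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator

redshift_query_native = RedshiftSQLOperator(
    task_id='redshift_query_native',
    redshift_conn_id='redshift_default',  # an "Amazon Redshift" type connection
    sql='SELECT * FROM your_table LIMIT 10;',
)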
C. Example: Loading Data from S3 into Redshift
You can also use Redshift's COPY command to load data from S3 into a Redshift table; the COPY statement is just SQL, so it can run through the same connection.
from airflow.providers.postgres.operators.postgres import PostgresOperator

copy_s3_to_redshift = PostgresOperator(
    task_id='copy_data_from_s3_to_redshift',
    postgres_conn_id='redshift_default',
    sql="""
        COPY your_table
        FROM 's3://your-bucket-name/path/in/bucket/file.csv'
        IAM_ROLE 'arn:aws:iam::your-account-id:role/your-redshift-role'
        CSV DELIMITER ',' IGNOREHEADER 1;
    """,
)
Make sure that the IAM role you associate with the Redshift cluster has the necessary permissions to read from the S3 bucket.
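Alternatively, the Amazon provider ships an S3ToRedshiftOperator that assembles the COPY statement for you from the S3 and Redshift connections. A minimal sketch using the same placeholder names as above (depending on the provider version, redshift_conn_id may refer to a Postgres-type or an Amazon Redshift-type connection):

from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

s3_to_redshift = S3ToRedshiftOperator(
    task_id='s3_to_redshift',
    schema='public',                       # target schema (placeholder)
    table='your_table',
    s3_bucket='your-bucket-name',
    s3_key='path/in/bucket/file.csv',
    redshift_conn_id='redshift_default',
    aws_conn_id='aws_default',
    copy_options=['CSV', 'IGNOREHEADER 1'],
)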
4. Managing Workflow with Airflow DAGs
Once you have the operators set up, you can create an Airflow DAG to orchestrate the tasks. Here’s an example:
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
from airflow.providers.postgres.operators.postgres import PostgresOperator

dag = DAG(
    's3_redshift_integration',
    description='S3 to Redshift and vice versa',
    schedule_interval='@daily',
    start_date=datetime(2023, 11, 1),
    catchup=False,
)

upload_task = LocalFilesystemToS3Operator(
    task_id='upload_file_to_s3',
    filename='/path/to/local/file.csv',
    dest_bucket='your-bucket-name',
    dest_key='path/in/bucket/file.csv',
    aws_conn_id='aws_default',
    dag=dag,
)

redshift_query_task = PostgresOperator(
    task_id='redshift_query',
    postgres_conn_id='redshift_default',
    sql='SELECT * FROM your_table LIMIT 10;',
    dag=dag,
)

upload_task >> redshift_query_task  # Set task dependencies
5. Monitoring and Troubleshooting
After setting up the DAGs and tasks, make sure to:
Monitor logs in the Airflow UI for each task.
Use XCom to pass data between tasks if needed.
Make sure your IAM roles and policies are correctly set up to avoid access issues.
Set up proper error handling and retries for failed tasks (see the sketch below).
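For example, retries and failure notifications can be configured once through default_args when the DAG is created. A minimal sketch; the owner, retry settings, and alert address are placeholders, and e-mail alerts additionally require SMTP/SendGrid to be configured for the Composer environment:

from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner': 'data-team',                 # placeholder owner
    'retries': 2,                         # retry each failed task twice
    'retry_delay': timedelta(minutes=5),  # wait five minutes between attempts
    'email': ['alerts@example.com'],      # placeholder alert address
    'email_on_failure': True,             # requires e-mail to be set up in Composer
}

dag = DAG(
    's3_redshift_integration',
    default_args=default_args,            # applied to every task in the DAG
    schedule_interval='@daily',
    start_date=datetime(2023, 11, 1),
    catchup=False,
)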
Summary
In summary:
Set up AWS credentials in Google Cloud Composer (Airflow).
Use the appropriate Airflow operators for S3 and Redshift (LocalFilesystemToS3Operator, PostgresOperator or RedshiftSQLOperator).
Manage your workflow with Airflow DAGs, combining the tasks of uploading/downloading from S3 and querying/loading data from/to Redshift.
Ensure all IAM roles and permissions are set correctly for cross-service access.