Automated Data Pipeline Triggers via AWS Transfer Events
To automate data pipeline triggers via AWS Transfer events, you'll use AWS Transfer Family, a fully managed service for secure file exchange with third parties, and connect it to other AWS services for data processing.
AWS Transfer Family lets you set up a server for the SSH File Transfer Protocol (SFTP), FTP over SSL/TLS (FTPS), or plain File Transfer Protocol (FTP). It can emit event notifications through Amazon EventBridge (formerly CloudWatch Events) when files are uploaded; those events can in turn trigger processing pipelines in services like AWS Lambda or AWS Step Functions, or fan out to targets such as Amazon SNS.
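The core of that wiring is an EventBridge event pattern. Here is a minimal sketch; the source, detail-type, and detail values mirror the rule used in the full program below and should be verified against the current Transfer Family event reference for your setup:

import json

# Sketch of an EventBridge pattern matching successful Transfer Family uploads.
# These field values are taken from the rule used later in this article and
# should be checked against the Transfer Family event documentation.
upload_event_pattern = json.dumps({
    "source": ["aws.transfer"],
    "detail-type": ["Transfer State Change"],
    "detail": {
        "event": ["Upload"],
        "severity": ["SUCCESS"]
    }
})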
Here's how you'd set up an automated data pipeline with AWS Transfer Family that triggers an AWS Lambda function upon file upload events. This program creates the following resources:
- AWS Transfer Family Server: An SFTP (or FTPS/FTP) endpoint for securely exchanging files.
- Amazon S3 Bucket: The storage location for the transferred files.
- AWS IAM Role: To grant the server permissions to access the S3 bucket.
- S3 Access Policy: Grants the role the S3 permissions needed to list the bucket and read and write objects.
- AWS Lambda Function: Processes the file after it is uploaded.
- Amazon CloudWatch Event Rule: Triggers the Lambda function upon file upload events.
- Amazon CloudWatch Event Target: Connects the event rule to the Lambda function.
Below is the Pulumi program written in Python, which you can use to create this setup:
import json

import pulumi
import pulumi_aws as aws

# Create an S3 bucket to store uploaded files
s3_bucket = aws.s3.Bucket("s3Bucket")

# IAM role that allows the AWS Transfer Family server to access the S3 bucket
transfer_role = aws.iam.Role("transferRole",
    assume_role_policy="""{
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "transfer.amazonaws.com"},
                "Action": "sts:AssumeRole"
            }
        ]
    }"""
)

# Attach a policy to the role that grants access to the S3 bucket
s3_access_policy = aws.iam.RolePolicy("s3AccessPolicy",
    role=transfer_role.id,
    policy=s3_bucket.arn.apply(lambda arn: f"""{{
        "Version": "2012-10-17",
        "Statement": [
            {{
                "Effect": "Allow",
                "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
                "Resource": "{arn}"
            }},
            {{
                "Effect": "Allow",
                "Action": ["s3:PutObject", "s3:GetObject", "s3:GetObjectVersion"],
                "Resource": "{arn}/*"
            }}
        ]
    }}""")
)

# Create an AWS Transfer Family server
transfer_server = aws.transfer.Server("transferServer",
    protocols=["SFTP"],  # Can also include "FTP" or "FTPS", depending on your requirements
    identity_provider_type="SERVICE_MANAGED",
    logging_role=transfer_role.arn
)

# Set up the AWS Lambda function to process the uploaded file
lambda_role = aws.iam.Role("lambdaRole",
    assume_role_policy="""{
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "lambda.amazonaws.com"},
                "Action": "sts:AssumeRole"
            }
        ]
    }"""
)

# Attach the necessary logging policy to the Lambda role
lambda_policy = aws.iam.RolePolicy("lambdaPolicy",
    role=lambda_role.id,
    policy="""{
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "logs:*",
                "Resource": "arn:aws:logs:*:*:*"
            }
        ]
    }"""
)

# Define the Lambda function
lambda_function = aws.lambda_.Function("lambdaFunction",
    code=pulumi.AssetArchive({
        ".": pulumi.FileArchive("./lambda")  # The path to your Lambda function code
    }),
    role=lambda_role.arn,
    handler="index.handler",  # The entry point into your Lambda function code
    runtime="python3.12"      # This should match the runtime your code is written for
)

# Create a CloudWatch (EventBridge) rule that triggers on successful uploads.
# Note: event_pattern must be a JSON string, so the pattern is serialized
# with json.dumps rather than passed as a dict.
s3_event_rule = aws.cloudwatch.EventRule("s3EventRule",
    event_pattern=json.dumps({
        "source": ["aws.transfer"],
        "detail-type": ["Transfer State Change"],
        "detail": {
            "event": ["Upload"],
            "severity": ["SUCCESS"]
        }
    })
)

# Connect the CloudWatch Event Rule to the Lambda function
cloudwatch_event_target = aws.cloudwatch.EventTarget("cloudwatchEventTarget",
    rule=s3_event_rule.name,
    arn=lambda_function.arn
)

# Allow the CloudWatch Event to invoke the Lambda function
lambda_permission = aws.lambda_.Permission("lambdaPermission",
    action="lambda:InvokeFunction",
    function=lambda_function.name,
    principal="events.amazonaws.com",
    source_arn=s3_event_rule.arn
)

# Export the S3 bucket name and Transfer server endpoint
pulumi.export("s3_bucket_name", s3_bucket.bucket)
pulumi.export("transfer_server_endpoint", transfer_server.endpoint)
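The program packages whatever lives under ./lambda. As a minimal sketch of that code, assuming the handler only logs the incoming event (any real processing logic is up to you), ./lambda/index.py might look like:

import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def handler(event, context):
    # Log the full EventBridge event so the Transfer Family payload can be
    # inspected in CloudWatch Logs; the exact shape of event["detail"]
    # depends on the event type delivered.
    logger.info("Received event: %s", json.dumps(event))
    detail = event.get("detail", {})
    # Placeholder for real pipeline logic (e.g., reading the uploaded object
    # from S3 and kicking off downstream processing).
    return {"status": "processed", "detail_keys": list(detail.keys())}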
A few notes on the Pulumi program above:
- Replace ./lambda with the path to your Lambda function code.
- Ensure that the runtime specified for lambda_function matches the runtime your Lambda code is written for.
- The event_pattern specified in s3_event_rule defines the criteria for triggering the Lambda function. As written, it responds only to successful file uploads to the AWS Transfer Family server; a sketch of a narrower, per-server pattern follows below.
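If the account runs more than one Transfer server, you may want the rule scoped to a single server. The following sketch extends the program above; note that the "serverId" detail key is an assumption, so inspect a real delivered event (for example, via the logging handler shown earlier) to confirm the exact field name before relying on this filter:

import json
import pulumi_aws as aws

# Hedged sketch: scope the rule to the one Transfer server created above.
# The "serverId" detail field name is an assumption and should be verified
# against an actual event payload.
scoped_event_rule = aws.cloudwatch.EventRule("scopedS3EventRule",
    event_pattern=transfer_server.id.apply(lambda server_id: json.dumps({
        "source": ["aws.transfer"],
        "detail-type": ["Transfer State Change"],
        "detail": {
            "serverId": [server_id],
            "event": ["Upload"],
            "severity": ["SUCCESS"]
        }
    }))
)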
After deploying this Pulumi program, files uploaded to the Transfer Family server are stored in the S3 bucket and trigger your Lambda function for further processing. This setup forms the basis of an automated data pipeline that reacts to file-transfer events. Note, however, that the program creates no Transfer users: with a SERVICE_MANAGED identity provider, you must also create at least one user whose home directory maps into the bucket before anyone can upload, as sketched below.
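As a minimal sketch of that missing piece, assuming service-managed authentication and an SSH public key you already have (the user name and key body are placeholders, and transfer_role is reused for brevity, though a dedicated, narrower per-user role is generally preferable):

# Hedged sketch: a service-managed Transfer user whose home directory maps
# to the S3 bucket created above. "pipeline-user" and the key body are
# hypothetical placeholders.
transfer_user = aws.transfer.User("transferUser",
    server_id=transfer_server.id,
    user_name="pipeline-user",
    role=transfer_role.arn,
    home_directory=s3_bucket.bucket.apply(lambda name: f"/{name}")
)

# Register the user's public SSH key so they can authenticate over SFTP
ssh_key = aws.transfer.SshKey("transferUserKey",
    server_id=transfer_server.id,
    user_name=transfer_user.user_name,
    body="ssh-rsa AAAA..."  # Replace with a real public key
)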