Amazon S3 Source
Introduction
The Amazon S3 Source is a Vanus Connector that retrieves S3 events from a specific bucket and transforms them into CloudEvents, following the CloudEvents Adapter specification.
This connector lets you specify an SQS queue to receive S3 event notification messages. If you don't specify one, the connector creates an SQS queue automatically.
An original S3 event looks like:
{
"Records": [
{
"eventVersion": "2.1",
"eventSource": "aws:s3",
"awsRegion": "us-west-2",
"eventTime": "1970-01-01T00:00:00.000Z",
"eventName": "ObjectCreated:Put",
"userIdentity": {
"principalId": "AIDAJDPLRKLG7UEXAMPLE"
},
"requestParameters": {
"sourceIPAddress": "127.0.0.1"
},
"responseElements": {
"x-amz-request-id": "C3D13FE58DE4C810",
"x-amz-id-2": "FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD"
},
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "testConfigRule",
"bucket": {
"name": "mybucket",
"ownerIdentity": {
"principalId": "A3NL1KOZZKExample"
},
"arn": "arn:aws:s3:::mybucket"
},
"object": {
"key": "HappyFace.jpg",
"size": 1024,
"eTag": "d41d8cd98f00b204e9800998ecf8427e",
"versionId": "096fKKXTRTtl3on89fVO.nfljtsv6qko",
"sequencer": "0055AED6DCD90281E5"
}
}
}
]
}
which is converted to:
{
"id": "C3D13FE58DE4C810.FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD",
"source": "aws:s3.us-west-2.mybucket",
"specversion": "1.0",
"type": "com.amazonaws.s3.ObjectCreated:Put",
"datacontenttype": "application/json",
"subject": "HappyFace.jpg",
"time": "1970-01-01T00:00:00.000Z",
"data": {
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "testConfigRule",
"bucket": {
"name": "mybucket",
"ownerIdentity": {
"principalId": "A3NL1KOZZKExample"
},
"arn": "arn:aws:s3:::mybucket"
},
"object": {
"key": "HappyFace.jpg",
"size": 1024,
"eTag": "d41d8cd98f00b204e9800998ecf8427e",
"versionId": "096fKKXTRTtl3on89fVO.nfljtsv6qko",
"sequencer": "0055AED6DCD90281E5"
}
}
}
}
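The mapping between the two JSON documents above can be sketched in a few lines of Python. This is an illustration inferred only from the sample input and output shown here, not the connector's actual implementation; the function name `s3_record_to_cloudevent` is hypothetical, and the `specversion` value follows the CloudEvents specification's version string.

```python
import json


def s3_record_to_cloudevent(record: dict) -> dict:
    """Map one entry of an S3 notification's "Records" list to a CloudEvent dict.

    The field mapping is inferred from the sample events in this document.
    """
    response = record["responseElements"]
    bucket_name = record["s3"]["bucket"]["name"]
    return {
        # id: request id and extended request id joined by a dot
        "id": f'{response["x-amz-request-id"]}.{response["x-amz-id-2"]}',
        # source: event source, region and bucket name joined by dots
        "source": f'{record["eventSource"]}.{record["awsRegion"]}.{bucket_name}',
        # "1.0" is the version string defined by the CloudEvents specification
        "specversion": "1.0",
        "type": f'com.amazonaws.s3.{record["eventName"]}',
        "datacontenttype": "application/json",
        "subject": record["s3"]["object"]["key"],
        "time": record["eventTime"],
        # only the "s3" part of the record is carried as event data
        "data": {"s3": record["s3"]},
    }


# A trimmed version of the sample record shown above.
sample_record = {
    "eventSource": "aws:s3",
    "awsRegion": "us-west-2",
    "eventTime": "1970-01-01T00:00:00.000Z",
    "eventName": "ObjectCreated:Put",
    "responseElements": {
        "x-amz-request-id": "C3D13FE58DE4C810",
        "x-amz-id-2": "FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD",
    },
    "s3": {
        "bucket": {"name": "mybucket", "arn": "arn:aws:s3:::mybucket"},
        "object": {"key": "HappyFace.jpg", "size": 1024},
    },
}

event = s3_record_to_cloudevent(sample_record)
print(json.dumps(event, indent=2))
```

Each record in the `Records` array would yield one CloudEvent under this mapping.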
Quick Start
This section shows how the Amazon S3 Source converts an S3 event into a CloudEvent.
Prerequisites
- Have a container runtime (e.g., Docker).
- Have an AWS S3 bucket.
- Have an AWS IAM Access Key.
- Grant the IAM user the following AWS permissions:
  - s3:PutBucketNotification
  - sqs:ListQueues
  - sqs:GetQueueUrl
  - sqs:ReceiveMessage
  - sqs:GetQueueAttributes
  - sqs:CreateQueue
  - sqs:SetQueueAttributes
  - sqs:DeleteMessage
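The permissions listed above could be granted through an IAM policy document. The sketch below builds one in Python and prints it as JSON. The helper name `build_policy` is hypothetical, and the `Resource` values are assumptions: the S3 statement is scoped to the bucket ARN, while the SQS statement defaults to `"*"` for simplicity and should be narrowed to your queue where possible.

```python
import json

S3_ACTIONS = ["s3:PutBucketNotification"]
SQS_ACTIONS = [
    "sqs:ListQueues",
    "sqs:GetQueueUrl",
    "sqs:ReceiveMessage",
    "sqs:GetQueueAttributes",
    "sqs:CreateQueue",
    "sqs:SetQueueAttributes",
    "sqs:DeleteMessage",
]


def build_policy(bucket_arn: str, queue_resource: str = "*") -> dict:
    """Build an IAM policy document covering the permissions listed above.

    queue_resource defaults to "*" (an assumption made for brevity).
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": S3_ACTIONS, "Resource": bucket_arn},
            {"Effect": "Allow", "Action": SQS_ACTIONS, "Resource": queue_resource},
        ],
    }


policy = build_policy("arn:aws:s3:::mybucket")
print(json.dumps(policy, indent=2))
```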
Create the config file
cat << EOF > config.yml
target: http://localhost:31081
aws:
  access_key_id: AKIAIOSFODNN7EXAMPLE
  secret_access_key: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
s3_bucket_arn: "arn:aws:s3:::<bucketName>"
s3_events: ["s3:ObjectCreated:*","s3:ObjectRemoved:*"]
region: "us-west-2"
EOF
| Name | Required | Default | Description |
|---|---|---|---|
| target | YES | | The target URL to send CloudEvents to. |
| aws.access_key_id | YES | | The AWS IAM Access Key. |
| aws.secret_access_key | YES | | The AWS IAM Secret Key. |
| s3_bucket_arn | YES | | The ARN of your S3 bucket, for example "arn:aws:s3:::mybucket". |
| s3_events | YES | | An array of the S3 events you're interested in, for example ["s3:ObjectCreated:*","s3:ObjectRemoved:*"]. |
| region | NO | | The region in which the SQS queue will be created. This field is only required when sqs_arn is not specified. |
| sqs_arn | NO | | The ARN of your SQS queue. If this field is omitted, the Amazon S3 Source creates a queue in region. |
The Amazon S3 Source looks for the config file at /vanus-connect/config/config.yml by default. You can specify a different location for the config file by setting the environment variable CONNECTOR_CONFIG for your connector.
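The rules in the table and the lookup described above can be expressed as a small pre-flight check. This is a sketch only, not the connector's own validation logic: `resolve_config_path` and `validate_config` are hypothetical helpers, the config is assumed to be already parsed into a dictionary, and `CONNECTOR_CONFIG` is assumed to hold the full path of the file.

```python
import os
from typing import List, Mapping, Optional

DEFAULT_CONFIG_PATH = "/vanus-connect/config/config.yml"
REQUIRED_KEYS = ("target", "s3_bucket_arn", "s3_events")
REQUIRED_AWS_KEYS = ("access_key_id", "secret_access_key")


def resolve_config_path(env: Optional[Mapping[str, str]] = None) -> str:
    """Return the CONNECTOR_CONFIG value if set, otherwise the default path."""
    env = os.environ if env is None else env
    return env.get("CONNECTOR_CONFIG") or DEFAULT_CONFIG_PATH


def validate_config(cfg: dict) -> List[str]:
    """Return a list of problems found in a parsed config (empty if none)."""
    errors = []
    for key in REQUIRED_KEYS:
        if not cfg.get(key):
            errors.append(f"missing required field: {key}")
    aws = cfg.get("aws") or {}
    for key in REQUIRED_AWS_KEYS:
        if not aws.get(key):
            errors.append(f"missing required field: aws.{key}")
    # region only matters when the connector has to create the queue itself
    if not cfg.get("sqs_arn") and not cfg.get("region"):
        errors.append("region is required when sqs_arn is omitted")
    return errors


example = {
    "target": "http://localhost:31081",
    "aws": {
        "access_key_id": "AKIAIOSFODNN7EXAMPLE",
        "secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    },
    "s3_bucket_arn": "arn:aws:s3:::mybucket",
    "s3_events": ["s3:ObjectCreated:*", "s3:ObjectRemoved:*"],
    "region": "us-west-2",
}

print(resolve_config_path())
print(validate_config(example))
```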
Start with Docker
docker run -it --rm --network=host \
-v ${PWD}:/vanus-connect/config \
--name source-aws-s3 public.ecr.aws/vanus/connector/source-aws-s3
Test
Open a terminal and use the following command to run a Display sink, which receives and prints CloudEvents.
docker run -it --rm \
-p 31081:8080 \
--name sink-display public.ecr.aws/vanus/connector/sink-display
Make sure the target value in your config file is http://localhost:31081 so that the Source can send CloudEvents to our Display Sink.
Open the AWS S3 Console, select the bucket, and upload a file.
Here is the sort of CloudEvent you should expect to receive in the Display Sink:
{
"id": "C3D13FE58DE4C810.FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD",
"source": "aws:s3.us-west-2.mybucket",
"specversion": "1.0",
"type": "com.amazonaws.s3.ObjectCreated:Put",
"datacontenttype": "application/json",
"subject": "HappyFace.jpg",
"time": "1970-01-01T00:00:00.000Z",
"data": {
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "testConfigRule",
"bucket": {
"name": "mybucket",
"ownerIdentity": {
"principalId": "A3NL1KOZZKExample"
},
"arn": "arn:aws:s3:::mybucket"
},
"object": {
"key": "HappyFace.jpg",
"size": 1024,
"eTag": "d41d8cd98f00b204e9800998ecf8427e",
"versionId": "096fKKXTRTtl3on89fVO.nfljtsv6qko",
"sequencer": "0055AED6DCD90281E5"
}
}
}
}
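When comparing the Display Sink output with the sample above, a quick script can confirm that a received event carries the attributes the CloudEvents specification requires (`id`, `source`, `specversion`, `type`). The sketch below is a hypothetical helper, not part of the connector; the `type` and `source` prefixes it checks are taken from the sample event rather than from a published contract.

```python
import json

# Attributes that the CloudEvents 1.0 specification marks as required.
REQUIRED_ATTRIBUTES = ("id", "source", "specversion", "type")


def check_s3_cloudevent(event: dict) -> list:
    """Return a list of problems found in a received event (empty if none)."""
    problems = [
        f"missing attribute: {name}"
        for name in REQUIRED_ATTRIBUTES
        if not event.get(name)
    ]
    # Prefixes below are inferred from the sample event in this document.
    if not str(event.get("type", "")).startswith("com.amazonaws.s3."):
        problems.append("type does not start with com.amazonaws.s3.")
    if not str(event.get("source", "")).startswith("aws:s3."):
        problems.append("source does not start with aws:s3.")
    return problems


# A trimmed copy of the sample event, as it might be pasted from the sink output.
received = json.loads("""
{
  "id": "C3D13FE58DE4C810.FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD",
  "source": "aws:s3.us-west-2.mybucket",
  "specversion": "1.0",
  "type": "com.amazonaws.s3.ObjectCreated:Put",
  "subject": "HappyFace.jpg"
}
""")

print(check_s3_cloudevent(received) or "event looks as expected")
```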
Clean
docker stop source-aws-s3 sink-display
Run in Kubernetes
kubectl apply -f source-aws-s3.yaml
The source-aws-s3.yaml file contains:
apiVersion: v1
kind: ConfigMap
metadata:
  name: source-aws-s3
  namespace: vanus
data:
  config.yml: |-
    target: "http://vanus-gateway.vanus:8080/gateway/quick_start"
    aws:
      access_key_id: AKIAIOSFODNN7EXAMPLE
      secret_access_key: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
    s3_bucket_arn: "arn:aws:s3:::mybucket"
    s3_events: ["s3:ObjectCreated:*","s3:ObjectRemoved:*"]
    region: "us-west-2"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: source-aws-s3
  namespace: vanus
  labels:
    app: source-aws-s3
spec:
  selector:
    matchLabels:
      app: source-aws-s3
  replicas: 1
  template:
    metadata:
      labels:
        app: source-aws-s3
    spec:
      containers:
        - name: source-aws-s3
          image: public.ecr.aws/vanus/connector/source-aws-s3
          imagePullPolicy: Always
          volumeMounts:
            - name: config
              mountPath: /vanus-connect/config
      volumes:
        - name: config
          configMap:
            name: source-aws-s3
Integrate with Vanus
This section shows how a source connector can send CloudEvents to a running Vanus cluster.
Prerequisites
- Have a running K8s cluster
- Have a running Vanus cluster
- Have vsctl installed
- Export the VANUS_GATEWAY environment variable (the IP should be a host-accessible address of the vanus-gateway service)
export VANUS_GATEWAY=192.168.49.2:30001
- Create an eventbus
vsctl eventbus create --name quick-start
- Update the target config of the Amazon S3 Source
target: http://192.168.49.2:30001/gateway/quick-start
- Run the Amazon S3 Source
kubectl apply -f source-aws-s3.yaml
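The target value used above follows the pattern http://<VANUS_GATEWAY>/gateway/<eventbus name>. As a small sketch, the URL can be derived from the exported environment variable instead of being typed by hand. The helper name `gateway_target` is hypothetical, and the fallback address is simply the example value from this section.

```python
import os


def gateway_target(gateway: str, eventbus: str) -> str:
    """Build the connector's target URL from a gateway address and eventbus name."""
    return f"http://{gateway}/gateway/{eventbus}"


# Fall back to the example address from this section if VANUS_GATEWAY is unset.
gateway = os.environ.get("VANUS_GATEWAY", "192.168.49.2:30001")
print(f"target: {gateway_target(gateway, 'quick-start')}")
```

The printed line can be pasted into the config.yml section of the ConfigMap before applying it.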