Skip to main content

Amazon S3 Sink

Introduction

The Amazon S3 (Simple Storage Service) Sink is a Vanus Connector that aims to cache incoming CloudEvents in local files and upload them to AWS S3. The files uploaded to AWS S3 will be partitioned by time. S3 Sink supports HOURLY and DAILY partitioning.

Features

  • Upload scheduled interval: S3 Sink supports scheduled periodic check for closing and uploading files to S3. When the time interval reaches the threshold, the file will be directly closed and uploaded, regardless of whether the file is full. The interval defaults to 60 seconds.
  • Flush size: When file reaches flush size, it will be uploaded automatically. The flush size defaults to 1000.
  • Partition: CloudEvents uploaded by S3 Sink are stored in S3 partitioned. S3 Sink create storage path in S3 according to current time. Time-based partitioning options are daily or hourly.

Limitations

  • valid schema: Data S3 Sink received must conform CloudEvents Schema Registry.
  • Limitation of rate for receiving CloudEvents: S3 Sink Connector will check the flush size and upload scheduled interval one per second. Therefore, the number of CloudEvents sent to S3 Sink per second should be less than flush size.

Quickstart

This quick start will guide you through the process of running an Amazon S3 Sink connector.

Prerequisites

  • Have a container runtime (i.e., docker).
  • An Amazon S3 bucket.
  • AWS IAM Access Key.
  • AWS permissions for the IAM user:
    • s3:PutObject

Create the config file

cat << EOF > config.yml
port: 8080
aws:
access_key_id: AKIAIOSFODNN7EXAMPLE
secret_access_key: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
region: "us-west-2"
bucket: "mybucket"
scheduled_interval: 10
EOF
NameRequiredDefaultDescription
portNO8080the port which S3 Sink listens on
aws.access_key_idYESthe AWS IAM Access Key
aws.secret_access_keyYESthe AWS IAM Secret Key
regionYESthe S3 bucket region
bucketYESthe S3 bucket name
flush_sizeNO1000the number of CloudEvents cached to the local file before S3 Sink upload the file
scheduled_intervalNO60the maximum time interval between S3 Sink closing and uploading files which unit is second.
time_intervalNOHOURLYthe partitioning interval of files have been uploaded to the S3. S3 Sink supports HOURLY and DAILY time interval. For example, when timeInterval is HOURLY, files uploaded between 3 pm and 4 pm will be partitioned to one path, while files uploaded after 4 pm will be partitioned to another path

The Amazon S3 Sink tries to find the config file at /vanus-connect/config/config.yml by default. You can specify the position of config file by setting the environment variable CONNECTOR_CONFIG for your connector.

Start with Docker

docker run -it --rm \
-p 31080:8080 \
-v ${PWD}:/vanus-connect/config \
--name sink-aws-s3 public.ecr.aws/vanus/connector/sink-aws-s3

Test

Open a terminal and use following command to send a CloudEvent to the Sink.

curl --location --request POST 'localhost:31080' \
--header 'Content-Type: application/cloudevents+json' \
--data-raw '{
"id": "53d1c340-551a-11ed-96c7-8b504d95037c",
"source": "quickstart",
"specversion": "1.0",
"type": "quickstart",
"datacontenttype": "application/json",
"time": "2022-10-26T10:38:29.345Z",
"data": {
"myData": "simulation event data"
}
}'

Open AWS S3 Console, select the bucket and verify the file has uploaded.

Clean resource

docker stop sink-aws-s3

Run in Kubernetes

kubectl apply -f sink-aws-s3.yaml
apiVersion: v1
kind: Service
metadata:
name: sink-aws-s3
namespace: vanus
spec:
selector:
app: sink-aws-s3
type: ClusterIP
ports:
- port: 8080
name: sink-aws-s3
---
apiVersion: v1
kind: ConfigMap
metadata:
name: sink-aws-s3
namespace: vanus
data:
config.yml: |-
port: 8080
aws:
access_key_id: AKIAIOSFODNN7EXAMPLE
secret_access_Key: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
region: "us-west-2"
bucket: "mybucket"
scheduled_interval: 10
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: sink-aws-s3
namespace: vanus
labels:
app: sink-aws-s3
spec:
selector:
matchLabels:
app: sink-aws-s3
replicas: 1
template:
metadata:
labels:
app: sink-aws-s3
spec:
containers:
- name: sink-aws-s3
image: public.ecr.aws/vanus/connector/sink-aws-s3
imagePullPolicy: Always
ports:
- containerPort: 8080
name: http
volumeMounts:
- name: config
mountPath: /vanus-connect/config
volumes:
- name: config
configMap:
name: sink-aws-s3

Integrate with Vanus

This section shows how a sink connector can receive CloudEvents from a running Vanus cluster.

  1. Run the sink-aws-s3.yaml
kubectl apply -f sink-aws-s3.yaml
  1. Create an eventbus
vsctl eventbus create --name quick-start
  1. Create a subscription (the sink should be specified as the sink service address or the host name with its port)
vsctl subscription create \
--name quick-start \
--eventbus quick-start \
--sink 'http://sink-aws-s3:8080'