MySQL Sink (JDBC)
Introduction
The MySQL Sink is a Vanus Connector that aims to handle incoming CloudEvents in a way that extracts the data part of the original event and delivers these extracted data to a MySQL database using JDBC.
For example, if the incoming CloudEvent looks like this:
{
"id": "88767821-92c2-477d-9a6f-bfdfbed19c6a",
"source": "quickstart",
"specversion": "1.0",
"type": "quickstart",
"time": "2022-07-08T03:17:03.139Z",
"datacontenttype": "application/json",
"data": {
"id": 18,
"name": "xdl",
"email": "Development Manager",
"date": "2022-07-06"
}
}
The MySQL Sink will extract the data fields and write to the database table in the following way:
+----+---------+---------------------+------------+
| id | name | description | date |
+----+---------+---------------------+------------+
| 18 | xdl | Development Manager | 2022-07-06 |
+----+---------+---------------------+------------+
Quick Start
This quick start will guide you through the process of running an MySQL Sink Connector.
Prerequisites
- Have a container runtime (i.e., docker).
- Have a running MySQL server.
- Have a database and table created.
Prepare for db (Optional)
Connect MySQL and Create database and table
create database vanus_test;
CREATE TABLE IF NOT EXISTS vanus_test.user
(
`id` int NOT NULL,
`name` varchar(100) NOT NULL,
`description` varchar(100) NOT NULL,
`date` date NOT NULL,
PRIMARY KEY (`id`)
);
Create the config file
cat << EOF > config.yml
db:
host: "localhost"
port: 3306
username: "vanus_test"
password: "123456"
database: "vanus_test"
table_name: "user"
insert_mode: UPSERT
EOF
Name | Required | Default | Description |
---|---|---|---|
port | NO | 8080 | the port which MySQL Sink listens on |
db.host | YES | IP address or host name of MySQL | |
db.port | YES | integer port number of MySQL | |
db.username | YES | username of MySQL | |
db.password | YES | password of MySQL | |
db.database | YES | database name of MySQL | |
db.table_name | YES | table name of MySQL | |
insert_mode | NO | INSERT | MySQL insert data type: INSERT OR UPSERT |
commit_interval | NO | 1000 | MySQL Sink batch data commit interval, unit is millisecond |
commit_size | NO | 2000 | MySQL Sink batch data commit event size |
The MYSQL Sink tries to find the config file at /vanus-connect/config/config.yml
by default. You can specify the
position of config file by setting the environment variable CONNECTOR_CONFIG
for your connector.
Start with Docker
docker run -it --rm --network=host\
-v ${PWD}:/vanus-connect/config \
--name sink-mysql public.ecr.aws/vanus/connector/sink-mysql
Test
Open a terminal and use the following command to send a CloudEvent to the Sink. The data field must be according to your database structure.
curl --location --request POST 'localhost:8080' \
--header 'Content-Type: application/cloudevents+json' \
--data-raw '{
"id" : "88767821-92c2-477d-9a6f-bfdfbed19c6a",
"source" : "quickstart",
"specversion" : "1.0",
"type" : "quickstart",
"time" : "2022-07-08T03:17:03.139Z",
"datacontenttype" : "application/json",
"data" : {
"id":18,
"name":"xdl",
"description":"Development Manager",
"date": "2022-07-06"
}
}'
Connect to MySQL and use the following command to make sure MySQL has the data
select * from vanus_test.user;
Clean resource
docker stop sink-mysql
Run in Kubernetes
kubectl apply -f sink-mysql.yaml
apiVersion: v1
kind: Service
metadata:
name: sink-mysql
namespace: vanus
spec:
selector:
app: sink-mysql
type: ClusterIP
ports:
- port: 8080
name: sink-mysql
---
apiVersion: v1
kind: ConfigMap
metadata:
name: sink-mysql
namespace: vanus
data:
config.yml: |-
port: 8080
db:
host: "localhost"
port: 3306
username: "vanus_test"
password: "123456"
database: "vanus_test"
table_name: "user"
insert_mode: UPSERT
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: sink-mysql
namespace: vanus
labels:
app: sink-mysql
spec:
selector:
matchLabels:
app: sink-mysql
replicas: 1
template:
metadata:
labels:
app: sink-mysql
spec:
containers:
- name: sink-mysql
image: public.ecr.aws/vanus/connector/sink-mysql
imagePullPolicy: Always
resources:
requests:
memory: "128Mi"
cpu: "100m"
limits:
memory: "128Mi"
cpu: "100m"
ports:
- name: http
containerPort: 8080
volumeMounts:
- name: config
mountPath: /vanus-connect/config
volumes:
- name: config
configMap:
name: sink-mysql
Integrate with Vanus
This section shows how a sink connector can receive CloudEvents from a running Vanus cluster.
- Run the sink-mysql.yaml
kubectl apply -f sink-mysql.yaml
- Create an eventbus
vsctl eventbus create --name quick-start
- Create a subscription (the sink should be specified as the sink service address or the host name with its port)
vsctl subscription create \
--name quick-start \
--eventbus quick-start \
--sink 'http://sink-mysql:8080'