
MongoDB Sink

Introduction

The MongoDB Sink is a Vanus Connector that handles incoming CloudEvents by extracting the data part of each event and inserting, updating, or deleting documents in MongoDB accordingly.

For example, if an incoming event looks like:

{
"id": "53d1c340-551a-11ed-96c7-8b504d95037c",
"source": "quick-start",
"specversion": "1.0",
"type": "sink-mongodb",
"datacontenttype": "application/json",
"time": "2022-10-26T10:38:29.345Z",
"xvdatabasedb": "test",
"xvdatabasecoll": "demo",
"data": {
"inserts": [
{
"scenario": "quick-start"
}
]
}
}

then it is equivalent to the following MongoDB shell commands:

use test;
db.demo.insertMany([{"scenario":"quick-start"}])
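
Producers that build such events in code can assemble the envelope with a small helper. The following is a minimal sketch in Python, not part of the connector: the function name `make_event` is hypothetical, and the `source` and `type` values are simply copied from the sample event (this page does not say whether the sink requires particular values for them).

```python
import json
import uuid
from datetime import datetime, timezone


def make_event(database, collection, data, source="quick-start"):
    """Wrap a MongoDB Sink data payload in a CloudEvents 1.0 JSON envelope."""
    now = datetime.now(timezone.utc)
    return {
        "id": str(uuid.uuid4()),
        "source": source,
        "specversion": "1.0",
        # Copied from the sample event; assumed, not a documented requirement.
        "type": "sink-mongodb",
        "datacontenttype": "application/json",
        # RFC 3339 timestamp with millisecond precision, like the sample event.
        "time": now.isoformat(timespec="milliseconds").replace("+00:00", "Z"),
        # Extension attributes that select the target database and collection.
        "xvdatabasedb": database,
        "xvdatabasecoll": collection,
        "data": data,
    }


event = make_event("test", "demo", {"inserts": [{"scenario": "quick-start"}]})
print(json.dumps(event, indent=2))
```

The printed JSON has the same shape as the event above, with a freshly generated `id` and `time`.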

Quickstart

Create the config file

Use your MongoDB hosts, username, and password.

cat << EOF > config.yml
connection_uri: "mongodb+srv://<hosts>/?retryWrites=true&w=majority"
credential:
  username: "<username>"
  password: "<password>"
EOF

| Name | Required | Default | Description |
|:---|:---|:---|:---|
| port | NO | 8080 | The port the MongoDB Sink listens on |
| connection_uri | YES | - | The URI used to connect to MongoDB; see Connection String URI Format for more details |
| credential.username | NO | - | https://www.mongodb.com/docs/drivers/go/current/fundamentals/auth/ |
| credential.password | NO | - | https://www.mongodb.com/docs/drivers/go/current/fundamentals/auth/ |
| credential.auth_source | NO | - | https://www.mongodb.com/docs/drivers/go/current/fundamentals/auth/ |
| credential.auth_mechanism | NO | - | https://www.mongodb.com/docs/drivers/go/current/fundamentals/auth/ |
| credential.auth_mechanism_properties | NO | - | https://www.mongodb.com/docs/drivers/go/current/fundamentals/auth/ |

By default, the MongoDB Sink looks for the config file at /vanus-connect/config/config.yml. You can specify a different location by setting the CONNECTOR_CONFIG environment variable for your connector.
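
The lookup rule can be illustrated with a short Python sketch. This is not the connector's code: `resolve_config_path` is a hypothetical name, and treating an empty CONNECTOR_CONFIG the same as an unset one is an assumption.

```python
import os

DEFAULT_CONFIG_PATH = "/vanus-connect/config/config.yml"


def resolve_config_path(environ=os.environ):
    """Return the path in CONNECTOR_CONFIG, or the default when unset or empty."""
    # Assumption: an empty value falls back to the default, like an unset one.
    return environ.get("CONNECTOR_CONFIG") or DEFAULT_CONFIG_PATH


print(resolve_config_path({}))
# prints /vanus-connect/config/config.yml
print(resolve_config_path({"CONNECTOR_CONFIG": "/etc/sink/config.yml"}))
# prints /etc/sink/config.yml
```

With Docker, the variable would typically be passed with `-e CONNECTOR_CONFIG=<path>` alongside a volume mount that makes the file available at that path.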

Start with Docker

docker run -it --rm \
-p 31080:8080 \
-v ${PWD}:/vanus-connect/config \
--name sink-mongodb public.ecr.aws/vanus/connector/sink-mongodb

Test

Open a terminal and use the following command to send a CloudEvent to the Sink.

curl --location --request POST 'localhost:31080' \
--header 'Content-Type: application/cloudevents+json' \
--data-raw '{
"id": "53d1c340-551a-11ed-96c7-8b504d95037c",
"source": "quick-start",
"specversion": "1.0",
"type": "sink-mongodb",
"datacontenttype": "application/json",
"time": "2022-10-26T10:38:29.345Z",
"xvdatabasedb": "test",
"xvdatabasecoll": "demo",
"data": {
"inserts": [
{
"scenario": "quick-start"
}
]
}
}'
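
The same request can be prepared from Python using only the standard library. This is a sketch under the same assumptions as the curl command (the sink is reachable at localhost:31080); `build_request` is a hypothetical helper name, and the actual send is left commented out because it needs a running sink.

```python
import json
import urllib.request


def build_request(event, url="http://localhost:31080"):
    """Build (but do not send) a structured-mode CloudEvents POST request."""
    body = json.dumps(event).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        # Same content type as the curl example: the whole event is the body.
        headers={"Content-Type": "application/cloudevents+json"},
        method="POST",
    )


event = {
    "id": "53d1c340-551a-11ed-96c7-8b504d95037c",
    "source": "quick-start",
    "specversion": "1.0",
    "type": "sink-mongodb",
    "datacontenttype": "application/json",
    "time": "2022-10-26T10:38:29.345Z",
    "xvdatabasedb": "test",
    "xvdatabasecoll": "demo",
    "data": {"inserts": [{"scenario": "quick-start"}]},
}
request = build_request(event)

# Uncomment to send once the sink is running:
# with urllib.request.urlopen(request, timeout=5) as response:
#     print(response.status)
```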

Find the document in MongoDB

shard-0 [primary] test> db.demo.find()
[
{
_id: ObjectId("63a56b176dcdb253ae4924f0"),
scenario: 'quick-start'
}
]
shard-0 [primary] test>

Clean up resources

docker stop sink-mongodb

Sink details

Extension Attributes

The MongoDB Sink defines a few CloudEvents extension attributes that determine how an event is processed.

| Attribute | Required | Example | Description |
|:---|:---|:---|:---|
| xvdatabasedb | YES | test | The database this event is written to |
| xvdatabasecoll | YES | demo | The collection this event is written to |
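
Since both attributes are required, a producer may want to check them before sending. The following Python sketch shows one way to do that; `target_namespace` is a hypothetical helper, and how the sink itself reacts to a missing attribute is not described here.

```python
REQUIRED_EXTENSIONS = ("xvdatabasedb", "xvdatabasecoll")


def target_namespace(event):
    """Return (database, collection), raising ValueError if either is missing."""
    missing = [name for name in REQUIRED_EXTENSIONS if not event.get(name)]
    if missing:
        raise ValueError("missing extension attribute(s): " + ", ".join(missing))
    return event["xvdatabasedb"], event["xvdatabasecoll"]


print(target_namespace({"xvdatabasedb": "test", "xvdatabasecoll": "demo"}))
# prints ('test', 'demo')
```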

Data

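| Item | Required | Type | Default | Description |
|:---|:---|:---|:---|:---|
| inserts | NO | []Object | null | Documents to insert |
| updates | NO | []Object | null | Update operations; see https://www.mongodb.com/docs/manual/tutorial/update-documents |
| updates[].filter | NO | Object | null | Filter that selects the documents to update |
| updates[].update | NO | Object | null | Update to apply to the matched documents |
| updates[].update_many | NO | boolean | false | Update many records when the filter matches more than one |
| deletes | NO | []Object | null | Delete operations |
| deletes[].filter | NO | Object | null | Filter that selects the documents to delete |
| deletes[].delete_many | NO | boolean | false | Delete many records when the filter matches more than one |

For example, a data payload that uses all three operations:
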
{
"inserts": [
{
"_id": "63a56aed6dcdb253ae4924ee",
"key1": "value1"
},
{
"key2": "value2"
}
],
"updates": [
{
"filter": {
"_id": "63a56aed6dcdb253ae4924ee"
},
"update": {
"$set": {
"key1": "value2_updated"
}
},
"update_many": true
}
],
"deletes": [
{
"filter": {
"key2": "value2"
},
"delete_many": true
}
]
}
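
To make the correspondence with MongoDB operations concrete, the Python sketch below renders a data payload as the mongosh statements it appears to correspond to. It is an illustration, not the connector's implementation: `to_shell` is a hypothetical name, the mapping of update_many and delete_many to updateMany/updateOne and deleteMany/deleteOne is inferred from the table descriptions, and the order in which the sink applies inserts, updates, and deletes is an assumption.

```python
import json


def to_shell(collection, data):
    """Render a data payload as the mongosh statements it corresponds to."""
    statements = []
    inserts = data.get("inserts") or []
    if inserts:
        statements.append(f"db.{collection}.insertMany({json.dumps(inserts)})")
    for item in data.get("updates") or []:
        # update_many defaults to false, so a single document is updated.
        method = "updateMany" if item.get("update_many", False) else "updateOne"
        flt = json.dumps(item.get("filter", {}))
        upd = json.dumps(item.get("update", {}))
        statements.append(f"db.{collection}.{method}({flt}, {upd})")
    for item in data.get("deletes") or []:
        # delete_many defaults to false, so a single document is deleted.
        method = "deleteMany" if item.get("delete_many", False) else "deleteOne"
        flt = json.dumps(item.get("filter", {}))
        statements.append(f"db.{collection}.{method}({flt})")
    return statements


data = {
    "inserts": [{"key2": "value2"}],
    "updates": [
        {
            "filter": {"key1": "value1"},
            "update": {"$set": {"key1": "value2_updated"}},
            "update_many": True,
        }
    ],
    "deletes": [{"filter": {"key2": "value2"}}],
}
for statement in to_shell("demo", data):
    print(statement)
# prints:
# db.demo.insertMany([{"key2": "value2"}])
# db.demo.updateMany({"key1": "value1"}, {"$set": {"key1": "value2_updated"}})
# db.demo.deleteOne({"key2": "value2"})
```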

Examples

Insert multiple documents into MongoDB

curl --location --request POST 'localhost:31080' \
--header 'Content-Type: application/cloudevents+json' \
--data-raw '{
"id": "53d1c340-551a-11ed-96c7-8b504d95037c",
"source": "quick-start",
"specversion": "1.0",
"type": "sink-mongodb",
"datacontenttype": "application/json",
"time": "2022-10-26T10:38:29.345Z",
"xvdatabasedb": "test",
"xvdatabasecoll": "demo",
"data": {
"inserts": [
{
"scenario": "quick-start-1"
},
{
"scenario": "quick-start-2"
}
]
}
}'

Update a document in MongoDB

curl --location --request POST 'localhost:31080' \
--header 'Content-Type: application/cloudevents+json' \
--data-raw '{
"id": "53d1c340-551a-11ed-96c7-8b504d95037c",
"source": "quick-start",
"specversion": "1.0",
"type": "sink-mongodb",
"datacontenttype": "application/json",
"time": "2022-10-26T10:38:29.345Z",
"xvdatabasedb": "test",
"xvdatabasecoll": "demo",
"data": {
"updates": [
{
"filter":{
"scenario": "quick-start-1"
},
"update": {
"$set": {
"scenario": "quick-start-1-updated"
}
},
"update_many": false
}
]
}
}'

Delete a document

curl --location --request POST 'localhost:31080' \
--header 'Content-Type: application/cloudevents+json' \
--data-raw '{
"id": "53d1c340-551a-11ed-96c7-8b504d95037c",
"source": "quick-start",
"specversion": "1.0",
"type": "sink-mongodb",
"datacontenttype": "application/json",
"time": "2022-10-26T10:38:29.345Z",
"xvdatabasedb": "test",
"xvdatabasecoll": "demo",
"data": {
"deletes": [
{
"filter":{
"scenario": "quick-start-1-updated"
},
"delete_many": false
}
]
}
}'

Run in Kubernetes

kubectl apply -f sink-mongodb.yaml

The sink-mongodb.yaml file contains:

apiVersion: v1
kind: Service
metadata:
  name: sink-mongodb
  namespace: vanus
spec:
  selector:
    app: sink-mongodb
  type: ClusterIP
  ports:
    - port: 8080
      name: sink-mongodb
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: sink-mongodb
  namespace: vanus
data:
  config.yml: |-
    connection_uri: "mongodb+srv://<hosts>/?retryWrites=true&w=majority"
    credential:
      username: "<username>"
      password: "<password>"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sink-mongodb
  namespace: vanus
  labels:
    app: sink-mongodb
spec:
  selector:
    matchLabels:
      app: sink-mongodb
  replicas: 1
  template:
    metadata:
      labels:
        app: sink-mongodb
    spec:
      containers:
        - name: sink-mongodb
          image: public.ecr.aws/vanus/connector/sink-mongodb
          imagePullPolicy: Always
          ports:
            - name: http
              containerPort: 8080
          volumeMounts:
            - name: config
              mountPath: /vanus-connect/config
      volumes:
        - name: config
          configMap:
            name: sink-mongodb

Integrate with Vanus

This section shows how a sink connector can receive CloudEvents from a running Vanus cluster.

  1. Apply sink-mongodb.yaml
kubectl apply -f sink-mongodb.yaml
  2. Create an eventbus
vsctl eventbus create --name quick-start
  3. Create a subscription (the sink should be specified as the sink service address or the host name with its port)
vsctl subscription create \
--name quick-start \
--eventbus quick-start \
--sink 'http://sink-mongodb:8080'