Elasticsearch Sink

Introduction

The Elasticsearch Sink is a Vanus Connector that extracts the data part of each incoming CloudEvent and delivers it to an Elasticsearch cluster.

For example, an incoming CloudEvent looks like:

{
  "specversion": "1.0",
  "id": "4395ffa3-f6de-443c-bf0e-bb9798d26a1d",
  "source": "vanus.source.test",
  "type": "vanus.type.test",
  "datacontenttype": "application/json",
  "time": "2022-06-14T07:05:55.777689Z",
  "data": {
    "id": 123,
    "date": "2022-06-13",
    "service": "test data"
  }
}

The Elasticsearch Sink will extract the data field and write it to the Elasticsearch cluster index as a document:

{
  "_index": "vanus_test",
  "_type": "_doc",
  "_id": "CqFnBIEBzJc0Oa5TERDD",
  "_version": 1,
  "_source": {
    "id": 123,
    "date": "2022-06-13",
    "service": "test data"
  }
}
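
The extraction step can be pictured with a minimal Python sketch. The function name `extract_document` is hypothetical and not part of the connector. It only illustrates that the CloudEvent envelope is dropped and the `data` payload becomes the document `_source`.

```python
import json


def extract_document(event_json: str) -> dict:
    """Return the `data` payload of a structured-mode CloudEvent (illustrative only)."""
    event = json.loads(event_json)
    # Only the payload is indexed; envelope attributes such as
    # `id`, `source`, and `type` do not appear in the document.
    return event["data"]


sample_event = """
{
  "specversion": "1.0",
  "id": "4395ffa3-f6de-443c-bf0e-bb9798d26a1d",
  "source": "vanus.source.test",
  "type": "vanus.type.test",
  "datacontenttype": "application/json",
  "data": {"id": 123, "date": "2022-06-13", "service": "test data"}
}
"""

document = extract_document(sample_event)
print(document)
```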

Quickstart

Prerequisites

  • Have a container runtime (e.g., Docker).
  • Have an Elasticsearch cluster.

Create the config file

cat << EOF > config.yml
port: 8080
insert_mode: "upsert"
es:
  address: "http://localhost:9200"
  index_name: "vanus_test"
  username: "elastic"
  password: "elastic"
EOF

| name | requirement | default | description |
| --- | --- | --- | --- |
| port | NO | 8080 | the port which the Elasticsearch Sink listens on |
| address | YES | | Elasticsearch cluster address; separate multiple addresses with "," |
| index_name | YES | | Elasticsearch index name |
| username | YES | | Elasticsearch cluster username |
| password | YES | | Elasticsearch cluster password |
| timeout | NO | 10000 | timeout for indexing a document, in milliseconds |
| insert_mode | NO | insert | how documents are written: insert or upsert |
| buffer_bytes | NO | 5 * 1024 * 1024 | maximum body size of each Elasticsearch bulk API request |

By default, the Elasticsearch Sink looks for the config file at /vanus-connect/config/config.yml. You can specify a different location by setting the environment variable CONNECTOR_CONFIG for your connector.
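
As a rough illustration of the lookup and defaults described above, here is a minimal Python sketch. The helper names (`resolve_config_path`, `apply_defaults`) are hypothetical, and several details are assumptions: the optional settings sit at the top level of the file, an empty `CONNECTOR_CONFIG` is treated as unset, and the `buffer_bytes` default is read as 5 * 1024 * 1024. The connector's actual loading code may differ.

```python
import os

DEFAULT_CONFIG_PATH = "/vanus-connect/config/config.yml"

# Defaults copied from the table; required keys have no default.
DEFAULTS = {
    "port": 8080,
    "timeout": 10000,                 # milliseconds
    "insert_mode": "insert",
    "buffer_bytes": 5 * 1024 * 1024,  # assumption: read as 5 MiB
}
REQUIRED_ES_KEYS = ("address", "index_name", "username", "password")


def resolve_config_path(env=None) -> str:
    """Prefer CONNECTOR_CONFIG; otherwise fall back to the default path."""
    env = os.environ if env is None else env
    # Assumption: an empty value is treated the same as an unset variable.
    return env.get("CONNECTOR_CONFIG") or DEFAULT_CONFIG_PATH


def apply_defaults(config: dict) -> dict:
    """Check the required `es` keys and fill in the optional settings."""
    es = config.get("es", {})
    missing = [key for key in REQUIRED_ES_KEYS if key not in es]
    if missing:
        raise ValueError(f"missing required es settings: {missing}")
    return {**DEFAULTS, **config}


parsed = {
    "insert_mode": "upsert",
    "es": {
        "address": "http://localhost:9200",
        "index_name": "vanus_test",
        "username": "elastic",
        "password": "elastic",
    },
}
effective = apply_defaults(parsed)
print(resolve_config_path({}), effective["port"], effective["insert_mode"])
```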

Start with Docker

docker run -it --rm --network=host \
  -v ${PWD}:/vanus-connect/config \
  --name sink-elasticsearch public.ecr.aws/vanus/connector/sink-elasticsearch

Test

Open a terminal and use the following command to send a CloudEvent to the Sink.

curl --location --request POST 'localhost:8080' \
  --header 'Content-Type: application/cloudevents+json' \
  --data-raw '{
    "specversion": "1.0",
    "id": "4395ffa3-f6de-443c-bf0e-bb9798d26a1d",
    "source": "vanus.source.test",
    "type": "vanus.type.test",
    "datacontenttype": "application/json",
    "time": "2022-06-14T07:05:55.777689Z",
    "data": {
      "id": 123,
      "date": "2022-06-13",
      "service": "test data"
    }
  }'
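
The same request can be sent from a script. The sketch below uses only the Python standard library and mirrors the curl call: a POST to the Sink's port with the `application/cloudevents+json` content type. The function names `build_request` and `send_event` are illustrative, and the URL assumes the Sink is listening on localhost:8080 as configured in the quickstart.

```python
import json
import urllib.request


def build_request(event: dict, url: str = "http://localhost:8080") -> urllib.request.Request:
    """Build a structured-mode CloudEvent POST request without sending it."""
    body = json.dumps(event).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/cloudevents+json"},
        method="POST",
    )


def send_event(event: dict, url: str = "http://localhost:8080") -> int:
    """Send the event and return the HTTP status code (needs a running Sink)."""
    with urllib.request.urlopen(build_request(event, url), timeout=10) as resp:
        return resp.status


event = {
    "specversion": "1.0",
    "id": "4395ffa3-f6de-443c-bf0e-bb9798d26a1d",
    "source": "vanus.source.test",
    "type": "vanus.type.test",
    "datacontenttype": "application/json",
    "time": "2022-06-14T07:05:55.777689Z",
    "data": {"id": 123, "date": "2022-06-13", "service": "test data"},
}
request = build_request(event)
# send_event(event)  # uncomment once the Sink container is running
```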

Use the following command to query the index and retrieve the Elasticsearch document.

curl 'http://localhost:9200/vanus_test/_search?pretty'

The search hits should include a document like this:

{
  "_index": "vanus_test",
  "_type": "_doc",
  "_id": "123",
  "_version": 1,
  "_source": {
    "id": 123,
    "date": "2022-06-13",
    "service": "test data"
  }
}

Clean resource

docker stop sink-elasticsearch

Sink details

Extension Attributes

The Elasticsearch Sink behaves differently if the incoming CloudEvent contains any of the following Extension Attributes.

| Attribute | Required | Examples | Description |
| --- | --- | --- | --- |
| xvindexname | NO | "myindex" | the index the event is written to; if empty, the configured index_name is used |
| xvop | NO | "c" | the document action for the event: c, u, or d. c maps to an Elasticsearch index operation, u to an update with upsert, and d to a delete. If insert_mode is upsert in the config, c is changed to u |
| xvid | NO | "123" | the document ID; required when the action is update or delete |
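
The attribute rules above can be summarized in a short Python sketch. It is an interpretation of the table, not the connector's code. The function name `resolve_target` is hypothetical, and the treatment of a missing `xvop` as `c` is an assumption. How a document ID is chosen when `c` is promoted to `u` without `xvid` is not stated here, so the sketch leaves that case unchecked.

```python
# Mapping from xvop values to Elasticsearch operations, per the table.
ACTIONS = {"c": "index", "u": "update", "d": "delete"}


def resolve_target(attrs: dict, default_index: str, insert_mode: str = "insert"):
    """Return (action, index, doc_id) for one event's extension attributes."""
    index = attrs.get("xvindexname") or default_index
    op = attrs.get("xvop") or "c"  # assumption: no xvop means create
    doc_id = attrs.get("xvid")

    if op not in ACTIONS:
        raise ValueError(f"unsupported xvop: {op!r}")
    # The table requires an ID for update and delete.
    if op in ("u", "d") and not doc_id:
        raise ValueError("xvid is required for update and delete")
    # With insert_mode "upsert", a create is turned into an update.
    if op == "c" and insert_mode == "upsert":
        op = "u"
    return ACTIONS[op], index, doc_id


print(resolve_target({"xvop": "c", "xvindexname": "myindex"}, "vanus_test"))
```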

Examples

Write to a specific index

curl --location --request POST 'localhost:8080' \
  --header 'Content-Type: application/cloudevents+json' \
  --data-raw '{
    "specversion": "1.0",
    "id": "4395ffa3-f6de-443c-bf0e-bb9798d26a1d",
    "source": "vanus.source.test",
    "type": "vanus.type.test",
    "datacontenttype": "application/json",
    "time": "2022-06-14T07:05:55.777689Z",
    "xvindexname": "myindex",
    "xvop": "c",
    "data": {
      "id": 123,
      "date": "2022-06-13",
      "service": "test data"
    }
  }'

Write with a document ID

curl --location --request POST 'localhost:8080' \
  --header 'Content-Type: application/cloudevents+json' \
  --data-raw '{
    "specversion": "1.0",
    "id": "4395ffa3-f6de-443c-bf0e-bb9798d26a1d",
    "source": "vanus.source.test",
    "type": "vanus.type.test",
    "datacontenttype": "application/json",
    "time": "2022-06-14T07:05:55.777689Z",
    "xvindexname": "myindex",
    "xvop": "c",
    "xvid": "123",
    "data": {
      "id": 123,
      "date": "2022-06-13",
      "service": "test data"
    }
  }'
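
Both examples use the `c` action. For the other actions, a small helper can assemble the event with the right extension attributes. The sketch below is illustrative: `make_event` is a hypothetical name, the `source` and `type` values are reused from the examples, and this document does not say whether an update event's `data` should hold a full or a partial document.

```python
import uuid
from datetime import datetime, timezone


def make_event(data: dict, index=None, op=None, doc_id=None) -> dict:
    """Build a CloudEvent dict, adding only the extension attributes given."""
    event = {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),
        "source": "vanus.source.test",
        "type": "vanus.type.test",
        "datacontenttype": "application/json",
        "time": datetime.now(timezone.utc).isoformat(),
        "data": data,
    }
    extensions = {"xvindexname": index, "xvop": op, "xvid": doc_id}
    # Omitted attributes are left out so the Sink falls back to its config.
    event.update({k: v for k, v in extensions.items() if v is not None})
    return event


# An update of document "123" in "myindex"; xvid is required for "u".
update_event = make_event(
    {"id": 123, "date": "2022-06-14", "service": "test data"},
    index="myindex",
    op="u",
    doc_id="123",
)
print(update_event["xvop"], update_event["xvid"])
```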

Run in Kubernetes

kubectl apply -f sink-es.yaml

Contents of sink-es.yaml:

apiVersion: v1
kind: Service
metadata:
  name: sink-es
  namespace: vanus
spec:
  selector:
    app: sink-es
  type: ClusterIP
  ports:
    - port: 8080
      name: sink-es
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: sink-es
  namespace: vanus
data:
  config.yml: |-
    port: 8080
    es:
      address: "http://localhost:9200"
      index_name: "vanus_test"
      username: "elastic"
      password: "elastic"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sink-es
  namespace: vanus
  labels:
    app: sink-es
spec:
  selector:
    matchLabels:
      app: sink-es
  replicas: 1
  template:
    metadata:
      labels:
        app: sink-es
    spec:
      containers:
        - name: sink-es
          image: public.ecr.aws/vanus/connector/sink-elasticsearch
          imagePullPolicy: Always
          ports:
            - name: http
              containerPort: 8080
          volumeMounts:
            - name: config
              mountPath: /vanus-connect/config
      volumes:
        - name: config
          configMap:
            name: sink-es

Integrate with Vanus

This section shows how a sink connector can receive CloudEvents from a running Vanus cluster.

  1. Apply sink-es.yaml

     kubectl apply -f sink-es.yaml

  2. Create an eventbus

     vsctl eventbus create --name quick-start

  3. Create a subscription (specify the sink as the sink service address, or as the host name with its port)

     vsctl subscription create \
       --name quick-start \
       --eventbus quick-start \
       --sink 'http://sink-es:8080'