Elasticsearch Sink
Introduction
The Elasticsearch Sink is a Vanus Connector that extracts the data part of each incoming CloudEvent and delivers it
to an Elasticsearch cluster.
For example, an incoming CloudEvent looks like:
{
  "specversion": "1.0",
  "id": "4395ffa3-f6de-443c-bf0e-bb9798d26a1d",
  "source": "vanus.source.test",
  "type": "vanus.type.test",
  "datacontenttype": "application/json",
  "time": "2022-06-14T07:05:55.777689Z",
  "data": {
    "id": 123,
    "date": "2022-06-13",
    "service": "test data"
  }
}
The Elasticsearch Sink extracts the data field and writes it as a document to an index in the Elasticsearch
cluster:
{
  "_index": "vanus_test",
  "_type": "_doc",
  "_id": "CqFnBIEBzJc0Oa5TERDD",
  "_version": 1,
  "_source": {
    "id": 123,
    "date": "2022-06-13",
    "service": "test data"
  }
}
Quickstart
Prerequisites
- Have a container runtime (e.g., Docker).
- Have an Elasticsearch cluster.
Create the config file
cat << EOF > config.yml
port: 8080
insert_mode: "upsert"
es:
  address: "http://localhost:9200"
  index_name: "vanus_test"
  username: "elastic"
  password: "elastic"
EOF
| name | required | default | description |
|---|---|---|---|
| port | NO | 8080 | the port the Elasticsearch Sink listens on |
| address | YES | | Elasticsearch cluster address (under es); separate multiple addresses with "," |
| index_name | YES | | Elasticsearch index name (under es) |
| username | YES | | Elasticsearch cluster username (under es) |
| password | YES | | Elasticsearch cluster password (under es) |
| timeout | NO | 10000 | timeout for indexing a document, in milliseconds |
| insert_mode | NO | insert | how documents are written: insert or upsert |
| buffer_bytes | NO | 5242880 (5 MiB) | maximum body size of each Bulk API request, in bytes |
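If you prefer not to hard-code credentials in config.yml, a small shell function can generate the file from environment variables. This is a minimal sketch: write_config and the variable names ES_ADDRESS, ES_INDEX, ES_USERNAME, ES_PASSWORD, SINK_PORT and INSERT_MODE are hypothetical and are not read by the connector itself. It writes only the keys shown in the Quickstart file, fills port and insert_mode with the defaults from the table above, and does not escape quotes in the values.

```shell
# Hypothetical helper: generate config.yml from environment variables.
# The variable names are illustrative; the connector itself only reads the file.
write_config() {
  local out="${1:-config.yml}"
  # Stop early if a required value is missing or empty.
  if [ -z "${ES_ADDRESS:-}" ] || [ -z "${ES_INDEX:-}" ] ||
     [ -z "${ES_USERNAME:-}" ] || [ -z "${ES_PASSWORD:-}" ]; then
    echo "ES_ADDRESS, ES_INDEX, ES_USERNAME and ES_PASSWORD must be set" >&2
    return 1
  fi
  # Optional keys fall back to the defaults from the table.
  # Values are written as-is, so they must not contain double quotes.
  cat > "$out" <<EOF
port: ${SINK_PORT:-8080}
insert_mode: "${INSERT_MODE:-insert}"
es:
  address: "${ES_ADDRESS}"
  index_name: "${ES_INDEX}"
  username: "${ES_USERNAME}"
  password: "${ES_PASSWORD}"
EOF
}
```

For example, this reproduces the Quickstart values except insert_mode, which falls back to insert:
ES_ADDRESS=http://localhost:9200 ES_INDEX=vanus_test ES_USERNAME=elastic ES_PASSWORD=elastic write_config config.yml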
By default, the Elasticsearch Sink reads its config file from /vanus-connect/config/config.yml. To use a different
location, set the CONNECTOR_CONFIG environment variable for your connector.
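Before starting the container, a rough check can catch a missing required key. The sketch below assumes CONNECTOR_CONFIG holds the full path of the file rather than a directory. check_config is a hypothetical helper, and because it uses grep instead of a YAML parser it only detects absent keys, not malformed YAML or wrong nesting.

```shell
# Hypothetical helper: rough check that the config file exists and names
# every required key. grep is not a YAML parser; this only spots missing keys.
check_config() {
  # Explicit argument first, then CONNECTOR_CONFIG, then the default path.
  local cfg="${1:-${CONNECTOR_CONFIG:-/vanus-connect/config/config.yml}}"
  local key status=0
  if [ ! -f "$cfg" ]; then
    echo "config file not found: $cfg" >&2
    return 1
  fi
  # These keys sit under "es:" in the Quickstart file, hence the indentation.
  for key in address index_name username password; do
    if ! grep -Eq "^[[:space:]]+${key}:" "$cfg"; then
      echo "missing required key: $key" >&2
      status=1
    fi
  done
  return "$status"
}
```

Run check_config config.yml in the directory you mount into the container; a non-zero exit status means at least one key was reported missing.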
Start with Docker
docker run -it --rm --network=host \
  -v ${PWD}:/vanus-connect/config \
  --name sink-elasticsearch public.ecr.aws/vanus/connector/sink-elasticsearch
Test
Open a terminal and use the following command to send a CloudEvent to the Sink.
curl --location --request POST 'localhost:8080' \
--header 'Content-Type: application/cloudevents+json' \
--data-raw '{
  "specversion": "1.0",
  "id": "4395ffa3-f6de-443c-bf0e-bb9798d26a1d",
  "source": "vanus.source.test",
  "type": "vanus.type.test",
  "datacontenttype": "application/json",
  "time": "2022-06-14T07:05:55.777689Z",
  "data": {
      "id": 123,
      "date": "2022-06-13",
      "service": "test data"
  }
}'
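Use the following command to search the vanus_test index (the -u credentials match the config file):
curl -u elastic:elastic 'http://localhost:9200/vanus_test/_search?pretty'
The response lists the matching documents under hits.hits; each hit looks similar to: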
{
  "_index": "vanus_test",
  "_type": "_doc",
  "_id": "123",
  "_version": 1,
  "_source": {
    "id": 123,
    "date": "2022-06-13",
    "service": "test data"
  }
}
Clean up resources
docker stop sink-elasticsearch
Sink details
Extension Attributes
The Elasticsearch Sink changes its behavior when an incoming CloudEvent contains any of the following extension attributes.
| Attribute | Required | Examples | Description |
|---|---|---|---|
| xvindexname | NO | "myindex" | Index to write the event to. If empty, index_name from the config is used. |
| xvop | NO | "c" | Action to perform on the document: c, u or d. c maps to an Elasticsearch index request, u to an update with upsert, and d to a delete. If insert_mode is upsert in the config, c is treated as u. |
| xvid | NO | "123" | Document ID. Required when the action is u or d. |
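To make the rules in the table concrete, the sketch below maps xvop, xvindexname, xvid and insert_mode to the action line of an Elasticsearch Bulk API request. It illustrates the documented behavior only and is not the connector's code. es_action is a hypothetical helper, and treating a missing xvop as "c" is an assumption. The table does not say how an ID is chosen when insert_mode turns "c" into "u" without an xvid, so the sketch leaves _id out in that case.

```shell
# Illustration of the extension-attribute rules, NOT the connector's source.
# Arguments: xvop, xvindexname, xvid, insert_mode, index_name (from config).
# Prints the Bulk API action line the event would roughly correspond to.
es_action() {
  local op="${1:-c}" index="${2:-$5}" id="${3:-}" mode="${4:-insert}" action
  case "$op" in
    c) action="index" ;;
    u) action="update" ;;
    d) action="delete" ;;
    *) echo "unknown xvop: $op" >&2; return 1 ;;
  esac
  # The table says xvid is needed for update and delete.
  if [ "$op" != "c" ] && [ -z "$id" ]; then
    echo "xvid is required for xvop=$op" >&2
    return 1
  fi
  # With insert_mode=upsert, "c" becomes an update with upsert.
  if [ "$op" = "c" ] && [ "$mode" = "upsert" ]; then
    action="update"
  fi
  if [ -n "$id" ]; then
    printf '{"%s":{"_index":"%s","_id":"%s"}}\n' "$action" "$index" "$id"
  else
    printf '{"%s":{"_index":"%s"}}\n' "$action" "$index"
  fi
}
```

With the Quickstart config (insert_mode: upsert), the second example below corresponds to es_action c myindex 123 upsert vanus_test, which prints {"update":{"_index":"myindex","_id":"123"}}. In the Bulk API an update action is followed by a body line such as {"doc": {...}, "doc_as_upsert": true}, which is presumably how "update with upsert" is expressed.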
Examples
Write to a specific index
curl --location --request POST 'localhost:8080' \
--header 'Content-Type: application/cloudevents+json' \
--data-raw '{
  "specversion": "1.0",
  "id": "4395ffa3-f6de-443c-bf0e-bb9798d26a1d",
  "source": "vanus.source.test",
  "type": "vanus.type.test",
  "datacontenttype": "application/json",
  "time": "2022-06-14T07:05:55.777689Z",
  "xvindexname": "myindex",
  "xvop": "c",
  "data": {
      "id": 123,
      "date": "2022-06-13",
      "service": "test data"
  }
}'
Write with a specific document ID
curl --location --request POST 'localhost:8080' \
--header 'Content-Type: application/cloudevents+json' \
--data-raw '{
  "specversion": "1.0",
  "id": "4395ffa3-f6de-443c-bf0e-bb9798d26a1d",
  "source": "vanus.source.test",
  "type": "vanus.type.test",
  "datacontenttype": "application/json",
  "time": "2022-06-14T07:05:55.777689Z",
  "xvindexname": "myindex",
  "xvop": "c",
  "xvid": "123",
  "data": {
      "id": 123,
      "date": "2022-06-13",
      "service": "test data"
  }
}'
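The extension-attribute table also lists a delete action (xvop "d") that the examples do not cover. The helper below builds a structured-mode CloudEvent containing only the extension attributes you pass, so one call can produce index, update or delete events. ce_payload is hypothetical; it assumes the data argument is already valid JSON, does not escape values, and derives the event id from the current time and shell PID, which is only unique enough for manual tests. Omitting data for a delete event is also an assumption.

```shell
# Hypothetical helper: print a structured-mode CloudEvent (JSON) with optional
# Vanus extension attributes. Usage: ce_payload OP DOC_ID INDEX [DATA_JSON]
ce_payload() {
  local op="$1" doc_id="$2" index="$3" data="$4" ext=""
  # Add only the extension attributes that were given.
  if [ -n "$index" ]; then ext="$ext,\"xvindexname\":\"$index\""; fi
  if [ -n "$op" ]; then ext="$ext,\"xvop\":\"$op\""; fi
  if [ -n "$doc_id" ]; then ext="$ext,\"xvid\":\"$doc_id\""; fi
  # "data" is optional in CloudEvents; omit it when no document body is given.
  if [ -n "$data" ]; then ext="$ext,\"data\":$data"; fi
  # id = epoch seconds plus shell PID: good enough for manual tests only.
  printf '{"specversion":"1.0","id":"%s","source":"vanus.source.test","type":"vanus.type.test","datacontenttype":"application/json","time":"%s"%s}\n' \
    "$(date +%s)-$$" "$(date -u +%Y-%m-%dT%H:%M:%SZ)" "$ext"
}
```

For example, to delete the document written by the previous example, pipe the event to curl (--data-binary @- reads the body from standard input):
ce_payload d 123 myindex | curl --location --request POST 'localhost:8080' --header 'Content-Type: application/cloudevents+json' --data-binary @-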
Run in Kubernetes
Save the manifest below as sink-es.yaml, then apply it:
kubectl apply -f sink-es.yaml
apiVersion: v1
kind: Service
metadata:
  name: sink-es
  namespace: vanus
spec:
  selector:
    app: sink-es
  type: ClusterIP
  ports:
    - port: 8080
      name: sink-es
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: sink-es
  namespace: vanus
data:
  config.yml: |-
    port: 8080
    es:
      address: "http://localhost:9200" # replace with an Elasticsearch address reachable from the pod
      index_name: "vanus_test"
      username: "elastic"
      password: "elastic"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sink-es
  namespace: vanus
  labels:
    app: sink-es
spec:
  selector:
    matchLabels:
      app: sink-es
  replicas: 1
  template:
    metadata:
      labels:
        app: sink-es
    spec:
      containers:
        - name: sink-es
          image: public.ecr.aws/vanus/connector/sink-elasticsearch
          imagePullPolicy: Always
          ports:
            - name: http
              containerPort: 8080
          volumeMounts:
            - name: config
              mountPath: /vanus-connect/config
      volumes:
        - name: config
          configMap:
            name: sink-es
Integrate with Vanus
This section shows how a sink connector can receive CloudEvents from a running Vanus cluster.
- Deploy the sink with sink-es.yaml:
kubectl apply -f sink-es.yaml
- Create an eventbus:
vsctl eventbus create --name quick-start
- Create a subscription. Set --sink to the address of the sink service, or to its host name and port:
vsctl subscription create \
  --name quick-start \
  --eventbus quick-start \
  --sink 'http://sink-es:8080'