Doris Sink
Introduction
The Doris Sink is a Vanus Connector which aims to handle incoming CloudEvents in a way that extracts the data
part of the original event and deliver these extracted data to Doris. The Doris Sink use Stream Load
way to import data.
For example, an incoming CloudEvent looks like:
{
  "specversion": "1.0",
  "id": "4395ffa3-f6de-443c-bf0e-bb9798d26a1d",
  "source": "vanus.source.test",
  "type": "vanus.type.test",
  "datacontenttype": "application/json",
  "time": "2022-11-20T07:05:55.777689Z",
  "data": {
    "id": 1,
    "username": "name",
    "birthday": "2022-11-20"
  }
}
The Doris Sink will extract the data field and write it to Doris table like:
+------+----------+------------+
| id   | username | birthday   |
+------+----------+------------+
|    1 | name     | 2022-11-20 |
+------+----------+------------+
Quickstart
Prerequisites
- Have a container runtime (i.e., docker).
 - Have a Doris cluster
 
Create the config file
cat << EOF > config.yml
port: 8080
secret:
  # doris info
  fenodes: "localhost:8030"
  db_name: "vanus_test"
  table_name: "vanus_test"
  username: "vanus_test"
  password: "123456"
EOF
| Name | Required | Default | Description | 
|---|---|---|---|
| port | NO | 8080 | the port which Doris Sink listens on | 
| fenodes | YES | doris fenodes, example: "17.0.0.1:8003" | |
| db_name | YES | doris database name | |
| table_name | YES | doris table name | |
| username | YES | doris username | |
| password | YES | doris password | |
| stream_load | NO | doris stream load properties, map struct | |
| load_interval | NO | 5 | doris stream load interval, unit second | 
| load_size | NO | 1010241024 | doris stream load max body size | 
| timeout | NO | 30 | doris stream load timeout, unit second | 
The Doris Sink tries to find the config file at /vanus-connect/config/config.yml by default. You can specify the position of config file by setting the environment variable CONNECTOR_CONFIG for your connector.
Start with Docker
docker run -it --rm --network=host\
  -v ${PWD}:/vanus-connect/config \
  --name sink-doris public.ecr.aws/vanus/connector/sink-doris
Test
Connect to Doris and use the following command to create a database and table.
create database vanus_test;
use vanus_test;
CREATE TABLE IF NOT EXISTS vanus_test.vanus_test
(
    `id` LARGEINT NOT NULL COMMENT "id",
    `username` VARCHAR(64) COMMENT "username",
    `birthday` DATE NOT NULL COMMENT "birthday"
)
AGGREGATE KEY(`id`, `username`, `birthday`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
);
Open a terminal and use following command to send a CloudEvent to the Doris Sink.
curl --location --request POST 'localhost:8080' \
--header 'Content-Type: application/cloudevents+json' \
--data-raw '{
  "specversion": "1.0",
  "id": "4395ffa3-f6de-443c-bf0e-bb9798d26a1d",
  "source": "vanus.source.test",
  "type": "vanus.type.test",
  "datacontenttype": "application/json",
  "time": "2022-11-20T07:05:55.777689Z",
  "data": {
    "id": 1,
    "username": "name",
    "birthday": "2022-11-20"
  }
}'
The result in Doris will be as follow:
+------+----------+------------+
| id   | username | birthday   |
+------+----------+------------+
|    1 | name     | 2022-11-20 |
+------+----------+------------+
Clean resource
docker stop sink-doris
Run in Kubernetes
kubectl apply -f sink-doris.yaml
apiVersion: v1
kind: Service
metadata:
  name: sink-doris
  namespace: vanus
spec:
  selector:
    app: sink-doris
  type: ClusterIP
  ports:
    - port: 8080
      name: sink-doris
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: sink-doris
  namespace: vanus
data:
  config.yml: |-
    port: 8080
    secret:
      # doris info
      fenodes: "localhost:8030"
      db_name: "vanus_test"
      table_name: "user"
      username: "vanus_test"
      password: "123456"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sink-doris
  namespace: vanus
  labels:
    app: sink-doris
spec:
  selector:
    matchLabels:
      app: sink-doris
  replicas: 1
  template:
    metadata:
      labels:
        app: sink-doris
    spec:
      containers:
        - name: sink-doris
          image: public.ecr.aws/vanus/connector/sink-doris
          imagePullPolicy: Always
          volumeMounts:
            - name: config
              mountPath: /vanus-connect/config
      volumes:
        - name: config
          configMap:
            name: sink-doris
Integrate with Vanus
This section shows how a sink connector can receive CloudEvents from a running Vanus cluster.
- Run the sink-doris.yaml
 
kubectl apply -f sink-doris.yaml
- Create an eventbus
 
vsctl eventbus create --name quick-start
- Create a subscription (the sink should be specified as the sink service address or the host name with its port)
 
vsctl subscription create \
  --name quick-start \
  --eventbus quick-start \
  --sink 'http://sink-doris:8080'