Skip to main content

Doris Sink

Introduction

The Doris Sink is a Vanus Connector which aims to handle incoming CloudEvents in a way that extracts the data part of the original event and deliver these extracted data to Doris. The Doris Sink use Stream Load way to import data.

For example, an incoming CloudEvent looks like:

{
"specversion": "1.0",
"id": "4395ffa3-f6de-443c-bf0e-bb9798d26a1d",
"source": "vanus.source.test",
"type": "vanus.type.test",
"datacontenttype": "application/json",
"time": "2022-11-20T07:05:55.777689Z",
"data": {
"id": 1,
"username": "name",
"birthday": "2022-11-20"
}
}

The Doris Sink will extract the data field and write it to Doris table like:

+------+----------+------------+
| id | username | birthday |
+------+----------+------------+
| 1 | name | 2022-11-20 |
+------+----------+------------+

Quickstart

Prerequisites

Create the config file

cat << EOF > config.yml
port: 8080
secret:
# doris info
fenodes: "localhost:8030"
db_name: "vanus_test"
table_name: "vanus_test"
username: "vanus_test"
password: "123456"
EOF
NameRequiredDefaultDescription
portNO8080the port which Doris Sink listens on
fenodesYESdoris fenodes, example: "17.0.0.1:8003"
db_nameYESdoris database name
table_nameYESdoris table name
usernameYESdoris username
passwordYESdoris password
stream_loadNOdoris stream load properties, map struct
load_intervalNO5doris stream load interval, unit second
load_sizeNO1010241024doris stream load max body size
timeoutNO30doris stream load timeout, unit second

The Doris Sink tries to find the config file at /vanus-connect/config/config.yml by default. You can specify the position of config file by setting the environment variable CONNECTOR_CONFIG for your connector.

Start with Docker

docker run -it --rm --network=host\
-v ${PWD}:/vanus-connect/config \
--name sink-doris public.ecr.aws/vanus/connector/sink-doris

Test

Connect to Doris and use the following command to create a database and table.

create database vanus_test;
use vanus_test;
CREATE TABLE IF NOT EXISTS vanus_test.vanus_test
(
`id` LARGEINT NOT NULL COMMENT "id",
`username` VARCHAR(64) COMMENT "username",
`birthday` DATE NOT NULL COMMENT "birthday"
)
AGGREGATE KEY(`id`, `username`, `birthday`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

Open a terminal and use following command to send a CloudEvent to the Doris Sink.

curl --location --request POST 'localhost:8080' \
--header 'Content-Type: application/cloudevents+json' \
--data-raw '{
"specversion": "1.0",
"id": "4395ffa3-f6de-443c-bf0e-bb9798d26a1d",
"source": "vanus.source.test",
"type": "vanus.type.test",
"datacontenttype": "application/json",
"time": "2022-11-20T07:05:55.777689Z",
"data": {
"id": 1,
"username": "name",
"birthday": "2022-11-20"
}
}'

The result in Doris will be as follow:

+------+----------+------------+
| id | username | birthday |
+------+----------+------------+
| 1 | name | 2022-11-20 |
+------+----------+------------+

Clean resource

docker stop sink-doris

Run in Kubernetes

kubectl apply -f sink-doris.yaml
apiVersion: v1
kind: Service
metadata:
name: sink-doris
namespace: vanus
spec:
selector:
app: sink-doris
type: ClusterIP
ports:
- port: 8080
name: sink-doris
---
apiVersion: v1
kind: ConfigMap
metadata:
name: sink-doris
namespace: vanus
data:
config.yml: |-
port: 8080
secret:
# doris info
fenodes: "localhost:8030"
db_name: "vanus_test"
table_name: "user"
username: "vanus_test"
password: "123456"
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: sink-doris
namespace: vanus
labels:
app: sink-doris
spec:
selector:
matchLabels:
app: sink-doris
replicas: 1
template:
metadata:
labels:
app: sink-doris
spec:
containers:
- name: sink-doris
image: public.ecr.aws/vanus/connector/sink-doris
imagePullPolicy: Always
volumeMounts:
- name: config
mountPath: /vanus-connect/config
volumes:
- name: config
configMap:
name: sink-doris

Integrate with Vanus

This section shows how a sink connector can receive CloudEvents from a running Vanus cluster.

  1. Run the sink-doris.yaml
kubectl apply -f sink-doris.yaml
  1. Create an eventbus
vsctl eventbus create --name quick-start
  1. Create a subscription (the sink should be specified as the sink service address or the host name with its port)
vsctl subscription create \
--name quick-start \
--eventbus quick-start \
--sink 'http://sink-doris:8080'