简介
用户通过 eKuiper 进行数据分析处理后,使用各种 sink 可以往不同的系统发送数据分析结果。针对同样的分析结果,不同的 sink 需要的格式可能未必一样。比如,在某物联网场景中,当发现某设备温度过高的时候,需要向云端某 rest 服务发送一个请求,同时在本地需要通过 MQTT 协议往设备发送一个控制命令,这两者需要的数据格式可能并不一样,因此,需要对来自于分析的结果进行「二次处理」后,才可以往不同的目标发送针对数据。本文将介绍如何利用 sink 中的数据模版(data template )来实现对分析结果的「二次处理」。
Golang 模版介绍
Golang 模版将一段逻辑应用到数据上,然后按照用户指定的逻辑对数据进行格式化输出,Golang 模版常见的使用场景为在网页开发中,比如将 Golang 中的某数据结构进行转换和控制后,将其转换为 HTML 标签输出到浏览器。在eKuiper 使用了 Golang 的 template(模版)对分析结果实现「二次处理」,请参考以下来自于 Golang 的官方介绍。
动作动作动作
动作 (Actions)
Golang 模版提供了一些内置的动作,可以让用户写各种控制语句,用于提取内容。比如,
- 根据判断条件来输出不同的内容
{{if pipeline}} T1 {{else}} T0 {{end}}
- 循环遍历数据,并进行处理
{{range pipeline}} T1 {{else}} T0 {{end}}
{{}}{}
{{if pipeline}} {"field1": true} {{else}} {"field1": false} {{end}}
上述表达式的意思如下(请注意动作的界定符和 JSON 的界定符):
{"field1": true}{"field1": false}
eKuiper sink 数据格式
map
[]map[string]interface{}
切片 (slice) 数据按条发送
map[string]interface{}
[
{"device_id":"1","t_av":36.25,"t_count":4,"t_max":80,"t_min":10},
{"device_id":"2","t_av":27,"t_count":4,"t_max":45,"t_min":12}
]
sendSingletrue{{json .}}
...
"sendSingle": true,
"dataTemplate": "{{toJson .}}"
sendSingletrue[]map[string]interface{}toJson
Golang 还内置提供了一些函数,用户可以参考更多 Golang 内置提供的函数来获取更多函数信息。
数据内容转换
t_av
$t_av$t_av
假设目标 sink 还是需要 JSON 数据,该数据模版的内容如下,
...
"dataTemplate": "{\"device_id\": {{.device_id}}, \"description\": \"{{if lt .t_av 30.0}}Current temperature is {{.t_av}}, it's normal.\"{{else if ge .t_av 30.0}}Current temperature is {{.t_av}}, it's high.\"{{end}}}"
"sendSingle": true,
{{if pipeline}} T1 {{else if pipeline}} T0 {{end}}
{"device_id": {{.device_id}}, "description": "
{{if lt .t_av 30.0}}
Current temperature is {{.t_av}}, it's normal."
{{else if ge .t_av 30.0}}
Current temperature is {{.t_av}}, it's high."
{{end}}
}
使用了 Golang 内置的二元比较函数,
ltge
ltge3030.030
sendSingletrue
{"device_id": 1, "description": "Current temperature is 36.25, it's high."}
{"device_id": 2, "description": "Current temperature is 27, it's normal."}
数据遍历
sendSingletrue
假设流入 sink 中的数据内容如下所示,
{"device_id":"1",
"values": [
{"temperature": 10.5},
{"temperature": 20.3},
{"temperature": 30.3}
]
}
需求为,
temperature25descriptionfinetemperature25descriptionhigh
"sendSingle": true,
"dataTemplate": "{{$len := len .values}} {{$loopsize := add $len -1}} {\"device_id\": \"{{.device_id}}\", \"description\": [{{range $index, $ele := .values}} {{if le .temperature 25.0}}\"fine\"{{else if gt .temperature 25.0}}\"high\"{{end}} {{if eq $loopsize $index}}]{{else}},{{end}}{{end}}}"
该数据模板比较复杂,解释如下,
::: v-pre
{{$len := len .values}} {{$loopsize := add $len -1}}lenvaluesaddloopsizeadd
::: v-pre
{\"device_id\": \"{{.device_id}}\", \"description\": [{"device_id": "1", "description": [
::: v-pre
{{range $index, $ele := .values}} {{if le .temperature 25.0}}\"fine\"{{else if gt .temperature 25.0}}\"high\"{{end}} {{if eq $loopsize $index}}]{{else}},{{end}}{{end}}
{{range $index, $ele := .values}}
{{if le .temperature 25.0}}
"fine"
{{else if gt .temperature 25.0}}
"high"
{{end}}
{{if eq $loopsize $index}}
]
{{else}}
,
{{end}}
{{end}}
finehigh,]
sendSingletrue
{"device_id": "1", "description": [ "fine" , "fine" , "high" ]}
总结
通过 eKuiper 提供的数据模版功能可以实现对分析结果的二次处理,以满足不同的 sink 目标的需求。但是读者也可以看到,由于 Golang 模版本身的限制,实现比较复杂的数据转换的时候会比较笨拙,希望将来 Golang 模版的功能可以做得更加强大和灵活,这样可以支持处理更加复杂的需求。目前建议用户可以通过数据模版来实现一些较为简单的数据的转换;如果用户需要对数据进行比较复杂的处理,并且自己扩展了 sink 的情况下,可以在 sink 的实现中直接进行处理。
另外,eKuiper 团队在规划将来支持自定义扩展 sink 中的模版函数,这样一些比较复杂的逻辑可以在函数内部实现,用户调用的时候只需一个简单的模版函数调用即可实现。