eKuiper 中使用 Golang 模版 (template) 定制分析结果

简介

用户通过 eKuiper 进行数据分析处理后,使用各种 sink 可以往不同的系统发送数据分析结果。针对同样的分析结果,不同的 sink 需要的格式可能未必一样。比如,在某物联网场景中,当发现某设备温度过高的时候,需要向云端某 rest 服务发送一个请求,同时在本地需要通过 MQTT 协议往设备发送一个控制命令,这两者需要的数据格式可能并不一样,因此,需要对来自于分析的结果进行「二次处理」后,才可以往不同的目标发送针对数据。本文将介绍如何利用 sink 中的数据模版(data template )来实现对分析结果的「二次处理」。

Golang 模版介绍

Golang 模版将一段逻辑应用到数据上,然后按照用户指定的逻辑对数据进行格式化输出,Golang 模版常见的使用场景为在网页开发中,比如将 Golang 中的某数据结构进行转换和控制后,将其转换为 HTML 标签输出到浏览器。在eKuiper 使用了 Golang 的 template(模版)对分析结果实现「二次处理」,请参考以下来自于 Golang 的官方介绍。

动作动作动作

动作 (Actions)

Golang 模版提供了一些内置的动作,可以让用户写各种控制语句,用于提取内容。比如,

  • 根据判断条件来输出不同的内容
  1. {{if pipeline}} T1 {{else}} T0 {{end}}
  • 循环遍历数据,并进行处理
  1. {{range pipeline}} T1 {{else}} T0 {{end}}
{{}}{}
  1. {{if pipeline}} {"field1": true} {{else}} {"field1": false} {{end}}

上述表达式的意思如下(请注意动作的界定符和 JSON 的界定符):

{"field1": true}{"field1": false}

eKuiper sink 数据格式

map
  1. []map[string]interface{}

切片 (slice) 数据按条发送

map[string]interface{}
  1. [
  2. {"device_id":"1","t_av":36.25,"t_count":4,"t_max":80,"t_min":10},
  3. {"device_id":"2","t_av":27,"t_count":4,"t_max":45,"t_min":12}
  4. ]
sendSingletrue{{json .}}
  1. ...
  2. "sendSingle": true,
  3. "dataTemplate": "{{toJson .}}"
sendSingletrue[]map[string]interface{}toJson

Golang 还内置提供了一些函数,用户可以参考更多 Golang 内置提供的函数来获取更多函数信息。

数据内容转换

t_av
$t_av$t_av

假设目标 sink 还是需要 JSON 数据,该数据模版的内容如下,

  1. ...
  2. "dataTemplate": "{\"device_id\": {{.device_id}}, \"description\": \"{{if lt .t_av 30.0}}Current temperature is {{.t_av}}, it's normal.\"{{else if ge .t_av 30.0}}Current temperature is {{.t_av}}, it's high.\"{{end}}}"
  3. "sendSingle": true,
{{if pipeline}} T1 {{else if pipeline}} T0 {{end}}
  1. {"device_id": {{.device_id}}, "description": "
  2. {{if lt .t_av 30.0}}
  3. Current temperature is {{.t_av}}, it's normal."
  4. {{else if ge .t_av 30.0}}
  5. Current temperature is {{.t_av}}, it's high."
  6. {{end}}
  7. }

使用了 Golang 内置的二元比较函数,

ltge
ltge3030.030
sendSingletrue
  1. {"device_id": 1, "description": "Current temperature is 36.25, it's high."}
  2. {"device_id": 2, "description": "Current temperature is 27, it's normal."}

数据遍历

sendSingletrue

假设流入 sink 中的数据内容如下所示,

  1. {"device_id":"1",
  2. "values": [
  3. {"temperature": 10.5},
  4. {"temperature": 20.3},
  5. {"temperature": 30.3}
  6. ]
  7. }

需求为,

temperature25descriptionfinetemperature25descriptionhigh
  1. "sendSingle": true,
  2. "dataTemplate": "{{$len := len .values}} {{$loopsize := add $len -1}} {\"device_id\": \"{{.device_id}}\", \"description\": [{{range $index, $ele := .values}} {{if le .temperature 25.0}}\"fine\"{{else if gt .temperature 25.0}}\"high\"{{end}} {{if eq $loopsize $index}}]{{else}},{{end}}{{end}}}"

该数据模板比较复杂,解释如下,

::: v-pre

{{$len := len .values}} {{$loopsize := add $len -1}}lenvaluesaddloopsizeadd

::: v-pre

{\"device_id\": \"{{.device_id}}\", \"description\": [{"device_id": "1", "description": [

::: v-pre

{{range $index, $ele := .values}} {{if le .temperature 25.0}}\"fine\"{{else if gt .temperature 25.0}}\"high\"{{end}} {{if eq $loopsize $index}}]{{else}},{{end}}{{end}}
  1. {{range $index, $ele := .values}}
  2. {{if le .temperature 25.0}}
  3. "fine"
  4. {{else if gt .temperature 25.0}}
  5. "high"
  6. {{end}}
  7. {{if eq $loopsize $index}}
  8. ]
  9. {{else}}
  10. ,
  11. {{end}}
  12. {{end}}
finehigh,]
sendSingletrue
  1. {"device_id": "1", "description": [ "fine" , "fine" , "high" ]}

总结

通过 eKuiper 提供的数据模版功能可以实现对分析结果的二次处理,以满足不同的 sink 目标的需求。但是读者也可以看到,由于 Golang 模版本身的限制,实现比较复杂的数据转换的时候会比较笨拙,希望将来 Golang 模版的功能可以做得更加强大和灵活,这样可以支持处理更加复杂的需求。目前建议用户可以通过数据模版来实现一些较为简单的数据的转换;如果用户需要对数据进行比较复杂的处理,并且自己扩展了 sink 的情况下,可以在 sink 的实现中直接进行处理。

另外,eKuiper 团队在规划将来支持自定义扩展 sink 中的模版函数,这样一些比较复杂的逻辑可以在函数内部实现,用户调用的时候只需一个简单的模版函数调用即可实现。