目录


是一个Redis模块,它为Redis带来了原生的时间序列数据结构。时间序列解决方案早先建立在排序集(或Redis流)之上,可以从RedisTimeSeries功能中受益,例如高容量插入、低延迟读取、灵活的查询语言、下采样等等!

一般来说,时间序列数据(相对)简单。话虽如此,我们还需要考虑其他特征:

  • 数据速度:例如,考虑每秒来自数千台设备的数百个指标
  • 容量(大数据):考虑数月(甚至数年)的数据积累

因此,RedisTimeSeries等数据库只是整体解决方案的一部分。您还需要考虑如何收集(摄取)、处理所有数据并将其发送到RedisTimeSeries。你真正需要的是一个可扩展的数据管道,它可以作为一个缓冲区来解耦生产者和消费者。

这就是用武之地!除了核心代理之外,它还拥有丰富的组件生态系统,包括(这是本博文中介绍的解决方案架构的一部分)、多语言客户端库、、Mirror Maker等。

这篇博文提供了一个实际示例,说明如何将RedisTimeSeries与Apache Kafka结合使用来分析时间序列数据。

该代码可在此GitHub存储库中找到https://github.com/abhirockzz/redis-timeseries-kafka

让我们首先探索用例。请注意,为了博客文章的目的,它保持简单,然后在后续部分中进一步解释。

场景:设备监控

假设有很多位置,每个位置都有多个设备,您的任务是监控设备指标——现在我们将考虑温度和压力。这些指标将存储在RedisTimeSeries中(当然!)并使用以下键命名约定— <metric name>:<location>:<device>。例如,位置5中设备1的温度将表示为 temp:5:1。每个时间序列数据点还将具有以下(键值对)— metric, location, device。这是为了允许灵活的查询,正如您将在接下来的部分中看到的那样。

以下是几个示例,可让您了解如何使用该TS.ADD命令添加数据点:

#位置3中设备2的温度以及标签:

#位置3中设备2的压力:

解决方案架构

以下是该解决方案的高层次外观:

让我们分解一下:

源(本地)组件

Azure服务

请注意,有些服务是在本地托管的,只是为了简单起见。在生产级部署中,您也希望在Azure中运行它们。例如,您可以在Azure Kubernetes服务中操作Kafka Connect集群和MQTT连接器。

总而言之,这是端到端流程:

  • 脚本生成发送到本地MQTT代理的模拟设备数据。
  • 此数据由MQTT Kafka Connect源连接器获取,并发送到Azure中运行的Confluent Cloud Kafka集群中的主题。
  • 它由托管在Azure Spring Cloud中的Spring Boot应用程序进一步处理,然后将其保存到Azure Cache for Redis实例。

是时候从实用的东西开始了!在此之前,请确保您具备以下条件。

先决条件:

设置基础设施组件

按照文档预配RedisTimeSeries模块附带的。

配置。还要(使用名称mqtt.device-stats)并(API密钥和秘密),稍后您将使用这些安全地连接到您的集群。

您可以或预配Azure Spring Cloud实例:

在继续之前,请确保克隆GitHub存储库:

设置本地服务

组件包括:

MQTT代理

我在Mac上本地安装并启动了mosquitto代理。

您可以也可以随意使用此。

Grafana

我在Mac上本地安装并启动了Grafana。

你可以对你的操作系统做同样的事情,也可以随意使用这个。

Kafka 连接

您应该能够在刚刚克隆的存储库中找到connect-distributed.properties文件。替换bootstrap.servers、sasl.jaas.config等属性的值。

首先,在本地。

启动本地Kafka Connect集群:

要:

  • 下载连接器/插件ZIP文件,然后,
  • 将其解压缩到Connect worker的plugin.path配置属性中列出的目录之一

如果您在本地使用Confluent Platform,只需使用Confluent Hub CLI: confluent-hub install confluentinc/kafka-connect-mqtt:latest

创建MQTT源连接器实例

确保检查mqtt-source-config.json文件。确保输入正确的主题名称kafka.topic并且mqtt.topics保持不变。

部署设备数据处理器应用程序

在您刚刚克隆的GitHub存储库中,在consumer/src/resources文件夹中查找application.yaml文件并替换以下值:

  • Azure Redis缓存主机、端口和主访问密钥
  • Confluent Cloud on Azure API密钥和秘密

构建应用程序JAR文件:

创建一个Azure Spring Cloud应用程序并将JAR文件部署到它:

启动模拟设备数据生成器

您可以使用刚刚克隆的GitHub存储库中的脚本:

注意——它所做的只是使用mosquitto_pub CLI命令发送数据。

数据被发送到device-stats MQTT topic(这不是Kafka主题)。您可以使用CLI订阅者仔细检查:

检查Confluent Cloud门户中的Kafka主题。您还应该检查Azure Spring Cloud中设备数据处理器应用:

享受Grafana仪表板!

浏览到Grafana UI localhost:3000。

Grafana的Redis数据源插件适用于任何Redis数据库,包括Azure Redis缓存。按照的配置数据源。

在您克隆的GitHub存储库中的grafana_dashboards文件夹中导入仪表板(如果您需要有关如何导入仪表板的帮助,请参阅)。

例如,这里有一个仪表板,显示了location 1的device 5 (使用TS.MRANGE)的平均压力(超过30秒)。

这是另一个仪表板,显示了多个设备的最高温度(超过15秒)location 3(再次感谢TS.MRANGE)。

那么,您想运行一些RedisTimeSeries命令吗?

启动redis-cli并连接到Azure Cache for Redis实例:

从简单的查询开始:

按位置过滤并获取所有设备的温度和压力:

提取特定时间范围内一个或多个位置的所有设备的温度和压力:

– +指的是从开始到最新时间戳的所有内容,但您可以更具体。

MRANGE正是我们所需要的!我们还可以按某个位置的特定设备进行过滤,然后按温度或压力进一步深入分析:

所有这些都可以与聚合相结合。

也可以创建一个规则来进行此聚合并将其存储在不同的时间序列中。

完成后,不要忘记删除资源以避免不必要的成本。

删除资源

在您的本地机器上:

  • 停止Kafka Connect集群
  • 停止蚊子经纪人(例如酿造服务停止mosquito)
  • 停止Grafana服务(例如brew services stop grafana)

我们探索了使用Redis和Kafka摄取、处理和查询时间序列数据的数据管道。当您考虑下一步并转向生产级解决方案时,您应该考虑更多的事情。

其他注意事项

优化RedisTimeSeries

这不是一个详尽的清单。其他配置选项请参考

长期数据保留怎么样?

数据是宝贵的,包括时间序列!您可能想要进一步处理它(例如运行机器学习以提取洞察力、预测性维护等)。为此,您需要将这些数据保留更长的时间,并且为了经济高效且经济高效,您需要使用可扩展的对象存储服务,例如

有一个连接器!您可以通过使用完全托管的来处理和存储ADLS中的数据,然后使用或运行机器学习来增强现有数据管道。

可扩展性

您的时间序列数据量只能向上移动!解决方案的可扩展性至关重要:

  • 核心基础设施:托管服务允许团队专注于解决方案,而不是设置和维护基础设施,尤其是在涉及复杂的分布式系统(如数据库和流媒体平台,如Redis和Kafka)时。
  • Kafka Connect:就数据管道而言,由于Kafka Connect平台本质上是无状态且水平可扩展的,因此您掌握得很好。在如何构建和调整Kafka Connect工作集群的大小方面,您有很多选择。
  • 自定义应用程序:与本解决方案一样,我们构建了一个自定义应用程序来处理Kafka主题中的数据。幸运的是,同样的可伸缩性特征也适用于它们。在水平扩展方面,它仅受您拥有的Kafka主题分区数量的限制。

集成:不仅仅是Grafana!RedisTimeSeries还与和集成。但是,在撰写本文时还没有Kafka连接器——这将是一个很棒的附加组件!

结论

当然,您可以将Redis用于(几乎)所有事情,包括时间序列工作负载!请务必考虑从时间序列数据源到Redis及其他范围的数据管道和集成的端到端架构。