更多编程技术文章,请查阅IOKKS - 专业编程技术分享平台
本文概述了从Kafka中流式传输事件,使用其Stream API将其转发到Redis,并通过其streamingAPI从Redis中读取单个流的解决方案。在这种情况下增加的复杂性是需要从HTTP端点使用服务器发送事件(SSE)来流式传输事件,同时确保只处理和发送与特定客户端ID相关的事件。
问题陈述
许多公司都有现有的Kafka基础架构,事件正在被生产。我们的目标是建立一个系统,订阅Kafka消息,但只处理与特定客户端ID相关的事件。这些经过筛选的事件应使用Redis的Stream API转发到Redis。此外,我们需要为服务器发送事件(SSE)建立一个HTTP端点,允许指定的客户端接收实时事件更新。
解决方案架构概述
该架构由以下组件组成:
- Kafka :分布式事件流平台,允许您发布和订阅记录(事件)流。
- Spring Boot :用于构建Java应用程序的框架。我们将使用它来创建Kafka消费者和Redis Stream生产者。订阅Kafka消息,根据客户端ID过滤事件,并将相关事件转发到Redis Streams。
- Redis :高性能的内存数据存储。我们将使用其Streams功能来处理事件流。使用其Streams API存储流式事件。
- Docker :容器化平台。我们将使用Docker和Docker-Compose为Kafka、Redis和我们的Spring Boot应用程序创建容器。我们将利用这一点进行本地POT、POC。
- HTTP服务器发送事件(SSE)端点 :根据客户端ID过滤事件,为客户端提供实时事件更新。
Redis Streams
Redis Streams是Redis中提供处理实时数据流的功能,适用于各种用例。以下是一些您可能希望使用Redis Streams的场景:
- 实时事件处理 :Redis Streams非常适合处理和存储实时事件。您可以将其用于日志记录、监控、跟踪用户活动或任何涉及处理连续事件流的用例。
- 任务队列 :如果您需要可靠且分布式的任务队列,Redis Streams可能是一个很好的选择。它允许您将任务推送到流中,并且可以让多个消费者并行处理这些任务。
- 活动订阅 :如果您正在构建社交网络或任何需要活动订阅的应用程序,Redis Streams可以有效地处理订阅数据,确保快速访问和可伸缩性。
- 消息代理 :Redis Streams可以作为微服务或其他分布式系统的轻量级消息代理。它可以处理消息路由,并确保消息被传递给感兴趣的消费者。
- 实时分析 :当您需要实时分析数据时,Redis Streams可以用于存储传入数据,然后使用Redis的功能进行处理和聚合。
- 物联网数据摄入 :如果您正在处理来自物联网(IoT)设备的数据,Redis Streams可以处理这些设备生成的高吞吐量和实时性数据。
- 日志和审计跟踪 :Redis Streams可用于实时存储日志或审计跟踪,使其易于分析和解决问题。
- 流处理 :如果您需要按特定顺序处理连续的数据流(例如,金融交易或传感器读数),Redis Streams可以帮助您管理按接收顺序排列的数据。
先决条件
- 已安装Docker和Docker Compose。
- 对Spring Boot、Redis Streams和Kafka有基本了解。
- Java 17或更高版本。
- 您选择的HTTP客户端。我使用了[httpie]。
当然,您可以在此处获取代码。
步骤
1. 为后端基础设施和Spring Boot应用程序设置Docker Compose
创建docker-compose.yml文件来定义服务:
# 请参考原文
创建应用程序的yaml文件'sse-demo.yml'。
# 请参考原文
2. 创建Spring Boot应用程序
使用所需的依赖项创建Spring Boot应用程序:
git checkout https://github.com/glawson6/spring-sse-demo.git
在pom.xml中添加Kafka和Redis集成所需的依赖项。
3. 实现Kafka消费者和Redis Stream生产者
创建一个Kafka消费者,监听Kafka事件并将其发送到Redis Streams。该组件还会消费HTTP客户端的Redis流:
// 请参考原文
4. 在Spring Boot中配置服务
cache.provider.name=redis
cache.host=${REDIS_HOST:localhost}
cache.port=${REDIS_PORT:6379}
cache.password=password
# 生产者属性
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.group-id=group_id
spring.kafka.boostrap.servers=${KAFKA_BOOTSTRAP_SERVERS:loclahost:9093}
# 通用Kafka属性
auto.create.topics.enable=true
sse.client-hold-seconds=${SSE_CLIENT_HOLD_SECONDS:120}
logging.level.root=INFO
logging.level.com.taptech.sse=DEBUG
5. 为Spring Boot应用构建Docker镜像
使用jkube的kubernetes-maven-plugin创建Spring Boot应用的镜像:
./mvnw clean package -Dmaven.test.skip=true k8s:build
6. 启动服务并运行应用
从src/test/resources/docker目录
启动服务:
./startServices.sh
启动应用:
./start-sse-demo.sh
7. 使用client-ids.json文件中的一个ID连接到流
http --stream GET http://localhost:8080/sse clientId==dd07bd51-1ab0-4e69-a0ff-f625fa9e7fc0
8. 生成一些事件
您可以对
http://localhost:8080/sse/generateNE进行HTTP POST请求
http POST http://localhost:8080/sse/generateNE
之后,观察您的HTTP客户端接收到针对其订阅的客户端ID的事件。
讨论
为什么要使用Kafka和Redis?Kafka本身难道不能单独提供这个功能吗?许多公司已经投资于Kafka作为其系统之间的后端消息提供者。Kafka本身并不容易处理消息选择。
消息选择通常不是Kafka的一个典型特性,原因有几点:
- 数据大小和延迟 :Kafka设计用于高吞吐量、低延迟的消息处理。其架构侧重于快速将消息分发给大量消费者。根据任意条件引入消息选择可能会减慢整体处理速度并引入延迟,这与Kafka的主要设计目标相悖。
- 幂等性 :Kafka依赖于幂等的生产者和消费者的概念。这意味着如果消费者或生产者由于故障重试消息,不应导致重复处理。引入选择性消息检索将使这种幂等性保证复杂化,可能导致意外的重复处理。
- 消费者偏移跟踪 :Kafka维护消费者偏移量,允许消费者跟踪最后处理的消息。如果引入消息选择,偏移量变得不那么直接,因为根据选择条件可能会跳过一些消息。
- 解耦架构 :Kafka旨在将生产者与消费者解耳。生产者不知道消费者的行为,消费者可以独立决定要消费哪些消息。消息选择将打破这种解耦,因为生产者需要根据特定的消费者需求知道要生产哪些消息。
- 消费者灵活性 :Kafka消费者在消息处理方面非常灵活。它们可以根据自己的标准设计为过滤、转换和聚合消息。在Kafka级别引入消息选择将限制这种灵活性,并使系统对不断变化的消费者需求不太适应。
- 扩展和并行性 :Kafka的可扩展性和并行性好处来自于能够在多个分区之间分发消息,并允许多个消费者并行处理消息。选择性消息检索将使这种并行性复杂化,使工作难以有效分发。
虽然Kafka本身并不提供原生的消息选择功能,但如果需要,设计消费者来处理消息过滤和选择是至关重要的。消费者可以根据特定标准设计为过滤和处理消息,确保只有相关的消息在消费者应用程序内得到处理。这种方法使Kafka能够保持其核心设计原则,同时为各种消息处理场景提供所需的灵活性。
Kafka本身并不能轻松解决这个问题,这导致将消息推送到另一个可以根据已知标准轻松选择的持久空间。这一要求导致决定使用Redis,并允许直接将消息推送到Redis。
决定限制基于是否有客户端实际期望消息而将事件推送到Redis。如果没有客户端,那么Kafka消息将被过滤掉。
.filterWhen(record -> checkIfStreamBeingAccessed(record))
客户端注册ID,以便Kafka监听器将其推送到Redis流。
.flatMap(id -> addIdToStream(clientId))
结论
通过遵循本文档中概述的步骤,我们成功实现了一个事件流架构,它从Kafka接收事件,根据特定的客户端ID进行过滤,并使用其流API将相关事件转发到Redis。SSE端点允许客户端接收针对其各自客户端ID定制的实时事件更新。这种解决方案为处理针对特定客户端的事件流提供了一种高效且可扩展的方式。