网站首页 > 技术文章 正文
将多个大型语言模型(LLMs),如OpenAI和Anthropic的Claude,集成到应用程序中可能是一项艰巨的任务。处理不同的API和通信协议以及确保请求的高效路由会带来重大挑战。
但是,使用消息代理和路由器可以成为解决这个问题的不错方案,解决这些痛点并提供几个关键优势。
在本文中,我们将探讨如何实现这一点。我们将提供代码示例,指导您使用KubeMQ(作为示例)设置一个与OpenAI和Anthropic的Claude接口的路由器。
使用消息代理作为LLM路由器的关键优势
- 简化集成
通过使用消息代理作为路由器,您可以抽象直接与不同LLM API接口的复杂性。这简化了客户端代码并减少了错误的可能性。 - 多模型用例
消息代理促进了多个LLM或专门用于不同任务的模型(例如,一个用于摘要,另一个用于情感分析)之间的通信。它确保请求高效地路由到适当的模型,使应用程序能够利用每个模型的优势而无需额外开销。 - 批处理和大规模推理
对于需要批处理或大规模推理任务的应用程序,消息代理通过在LLM繁忙或不可用时排队请求来实现异步处理。这确保了即使在高负载下也不会丢失数据或请求,提供可靠的处理。 - 冗余和故障转移保障
在需要高可用性的场景中,消息代理确保无缝故障转移到备用环境。例如,如果与提供OpenAI模型的云提供商的连接失败,KubeMQ可以自动切换到另一个提供商。这种冗余保证了AI操作的持续性,保持服务可靠性和客户满意度。 - 处理高流量应用程序
消息代理将传入的请求分发到多个LLM实例或副本中,防止过载并确保平稳运行。这种负载均衡对于高流量应用程序至关重要,使其能够有效扩展而不影响性能。
使用KubeMQ构建LLM路由器:集成OpenAI和Claude
现在,我将指导您使用KubeMQ(一个领先的开源消息代理和消息队列平台)设置一个与OpenAI和Anthropic的Claude接口的路由器。
利用KubeMQ的优势并提供代码示例,我们将逐步设置消息代理、构建服务器端路由器以及创建客户端以发送查询。
所有代码示例都可以在KubeMQ的GitHub存储库中找到。
先决条件
在开始之前,请确保您具备以下条件:
- 已安装Python 3.7或更高版本。
- 已在您的机器上安装Docker。
- OpenAI和Anthropic的有效API密钥。
- KubeMQ令牌(您可以从KubeMQ网站获取)。
- 已安装kubemq-cq Python包:
pip install kubemq-cq
- 包含API密钥的.env文件:
OPENAI_API_KEY=your_openai_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key
设置KubeMQ
首先,我们需要确保KubeMQ正在运行。我们将使用Docker部署它:
docker run -d --rm \
-p 8080:8080 \
-p 50000:50000 \
-p 9090:9090 \
-e KUBEMQ_TOKEN="your_token" \
kubemq/kubemq-community:latest
端口说明:
- 8080 – 暴露KubeMQ REST API
- 50000 – 打开gRPC端口以进行客户端-服务器通信
- 9090 – 暴露KubeMQ REST网关
注意:将your_token替换为您的实际KubeMQ令牌。
创建LLM路由器服务器
LLM路由器充当客户端和LLM之间的中介。它监听特定通道的查询并将它们路由到适当的LLM。
server.py
import time from kubemq.cq import Client, QueryMessageReceived, QueryResponseMessage, QueriesSubscription, CancellationToken from langchain.chat_models import ChatOpenAI from langchain.llms import Anthropic import os from dotenv import load_dotenv import threading load_dotenv() class LLMRouter: def __init__(self): self.openai_llm = ChatOpenAI( api_key=os.getenv("OPENAI_API_KEY"), model_name="gpt-3.5-turbo" ) self.claude_llm = Anthropic( api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3" ) self.client = Client(address="localhost:50000") def handle_openai_query(self, request: QueryMessageReceived): try: message = request.body.decode('utf-8') result = self.openai_llm(message) response = QueryResponseMessage( query_received=request, is_executed=True, body=result.encode('utf-8') ) self.client.send_response_message(response) except Exception as e: self.client.send_response_message(QueryResponseMessage( query_received=request, is_executed=False, error=str(e) )) def handle_claude_query(self, request: QueryMessageReceived): try: message = request.body.decode('utf-8') result = self.claude_llm(message) response = QueryResponseMessage( query_received=request, is_executed=True, body=result.encode('utf-8') ) self.client.send_response_message(response) except Exception as e: self.client.send_response_message(QueryResponseMessage( query_received=request, is_executed=False, error=str(e) )) def run(self): def on_error(err: str): print(f"Error: {err}") def subscribe_openai(): self.client.subscribe_to_queries( subscription=QueriesSubscription( channel="openai_requests", on_receive_query_callback=self.handle_openai_query, on_error_callback=on_error, ), cancel=CancellationToken() ) def subscribe_claude(): self.client.subscribe_to_queries( subscription=QueriesSubscription( channel="claude_requests", on_receive_query_callback=self.handle_claude_query, on_error_callback=on_error, ), cancel=CancellationToken() ) threading.Thread(target=subscribe_openai).start() threading.Thread(target=subscribe_claude).start() print("LLM Router running on channels: openai_requests, claude_requests") try: while True: time.sleep(1) except KeyboardInterrupt: print("Shutting down...") if __name__ == "__main__": router = LLMRouter() router.run()
说明:
- 初始化 加载API密钥的环境变量。 初始化OpenAI和Anthropic LLMs的客户端。 设置KubeMQ客户端。
- 处理查询 handle_openai_query 和 handle_claude_query 解码传入的消息,将其传递给相应的LLM,并发送回响应。 捕获错误并发送回 is_executed 标志为 False 的响应。
- 订阅 路由器订阅两个通道:openai_requests 和 claude_requests。 使用线程并发处理订阅。
- 运行服务器 run 方法启动订阅并保持服务器运行,直到中断。
开发LLM客户端
客户端将查询发送到LLM路由器,并指定要使用的模型。
client.py
import time
from kubemq.cq import Client, QueryMessageReceived, QueryResponseMessage, QueriesSubscription, CancellationToken
from langchain.chat_models import ChatOpenAI
from langchain.llms import Anthropic
import os
from dotenv import load_dotenv
import threading
load_dotenv()
class LLMRouter:
def __init__(self):
self.openai_llm = ChatOpenAI(
api_key=os.getenv("OPENAI_API_KEY"),
model_name="gpt-3.5-turbo"
)
self.claude_llm = Anthropic(
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3"
)
self.client = Client(address="localhost:50000")
def handle_openai_query(self, request: QueryMessageReceived):
try:
message = request.body.decode('utf-8')
result = self.openai_llm(message)
response = QueryResponseMessage(
query_received=request,
is_executed=True,
body=result.encode('utf-8')
)
self.client.send_response_message(response)
except Exception as e:
self.client.send_response_message(QueryResponseMessage(
query_received=request,
is_executed=False,
error=str(e)
))
def handle_claude_query(self, request: QueryMessageReceived):
try:
message = request.body.decode('utf-8')
result = self.claude_llm(message)
response = QueryResponseMessage(
query_received=request,
is_executed=True,
body=result.encode('utf-8')
)
self.client.send_response_message(response)
except Exception as e:
self.client.send_response_message(QueryResponseMessage(
query_received=request,
is_executed=False,
error=str(e)
))
def run(self):
def on_error(err: str):
print(f"Error: {err}")
def subscribe_openai():
self.client.subscribe_to_queries(
subscription=QueriesSubscription(
channel="openai_requests",
on_receive_query_callback=self.handle_openai_query,
on_error_callback=on_error,
),
cancel=CancellationToken()
)
def subscribe_claude():
self.client.subscribe_to_queries(
subscription=QueriesSubscription(
channel="claude_requests",
on_receive_query_callback=self.handle_claude_query,
on_error_callback=on_error,
),
cancel=CancellationToken()
)
threading.Thread(target=subscribe_openai).start()
threading.Thread(target=subscribe_claude).start()
print("LLM Router running on channels: openai_requests, claude_requests")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Shutting down...")
if __name__ == "__main__":
router = LLMRouter()
router.run()
说明:
- 初始化 设置KubeMQ客户端。
- 发送消息 send_message 方法根据所选模型构建适当的通道。 向路由器发送查询消息并等待响应。 处理错误并解码响应体。
- 用户交互 提示用户输入消息并选择模型。 打印LLM的响应。
通过REST发送和接收
对于偏好或需要RESTful通信的服务或客户端,KubeMQ提供REST端点。
通过REST发送请求
- 端点:
POST http://localhost:9090/send/request
- 标头:
POST http://localhost:9090/send/request
- 正文:
{
"RequestTypeData": 2,
"ClientID": "LLMRouter-sender",
"Channel": "openai_requests",
"BodyString": "What is the capital of France?",
"Timeout": 30000
}
- 负载详情:
- RequestTypeData – 指定请求类型(2表示查询)。
- ClientID – 发送请求的客户端的标识符。
- Channel – 对应于LLM模型的通道(openai_requests 或 claude_requests)。
- BodyString – 要发送到LLM的消息。
- Timeout – 等待响应的时间(以毫秒为单位)。
接收响应
响应将是一个JSON对象,包含LLM的输出或错误消息。
结论
通过利用消息代理(KubeMQ),我们构建了一个可扩展且高效的路由器,可以与多个LLM接口。这种设置允许客户端无缝地向不同模型发送查询,并且可以扩展以包含更多模型或功能。
这种方法的一些好处是:
- 简化集成:抽象了直接与不同LLM API接口的复杂性,简化了客户端代码并减少了错误的可能性。
- 多模型支持:高效地将请求路由到专门用于不同任务的适当模型。
- 可靠性:即使在LLM繁忙或不可用时也能确保不丢失数据。
- 冗余:提供故障转移机制以保持不间断的操作。
- 可扩展性:通过将请求分发到多个LLM实例来处理高流量。
猜你喜欢
- 2025-03-25 Dify工具使用全场景:dify-sandbox沙盒的原理(源码篇·第2期)
- 2025-03-25 虚拟机panic问题排查(虚拟机出现故障,恢复方法)
- 2025-03-25 3分钟简述熔断器使用方法(熔断器的操作流程?)
- 2025-03-25 太实用了!自己动手写软件——SSH、FTP和SQL server的密码破解
- 2025-03-25 最近很火的MCP是什么?一文带你搞懂,附大量MCP开源服务端项目!
- 2025-03-25 数据库调优-连接池优化(数据库连接池配置优化)
- 2025-03-25 小白之Tkinter库读文:其他功能-网络功能(51)
- 2025-03-25 python散装笔记——123: 客户端与服务器之间套接字和消息加解密
- 2025-03-25 【Python备忘单】Python编程的快速参考指南
- 2025-03-25 nginx的internal指令(nginx指令详解)
- 最近发表
- 标签列表
-
- cmd/c (57)
- c++中::是什么意思 (57)
- sqlset (59)
- ps可以打开pdf格式吗 (58)
- phprequire_once (61)
- localstorage.removeitem (74)
- routermode (59)
- vector线程安全吗 (70)
- & (66)
- java (73)
- org.redisson (64)
- log.warn (60)
- cannotinstantiatethetype (62)
- js数组插入 (83)
- resttemplateokhttp (59)
- gormwherein (64)
- linux删除一个文件夹 (65)
- mac安装java (72)
- reader.onload (61)
- outofmemoryerror是什么意思 (64)
- flask文件上传 (63)
- eacces (67)
- 查看mysql是否启动 (70)
- java是值传递还是引用传递 (58)
- 无效的列索引 (74)