优秀的编程知识分享平台

网站首页 > 技术文章 正文

使用消息代理作为LLM路由器:集成OpenAI和Claude

nanyue 2025-03-25 16:01:41 技术文章 10 ℃

将多个大型语言模型(LLMs),如OpenAI和Anthropic的Claude,集成到应用程序中可能是一项艰巨的任务。处理不同的API和通信协议以及确保请求的高效路由会带来重大挑战。

但是,使用消息代理和路由器可以成为解决这个问题的不错方案,解决这些痛点并提供几个关键优势。

在本文中,我们将探讨如何实现这一点。我们将提供代码示例,指导您使用KubeMQ(作为示例)设置一个与OpenAI和Anthropic的Claude接口的路由器。

使用消息代理作为LLM路由器的关键优势

  1. 简化集成
    通过使用消息代理作为路由器,您可以抽象直接与不同LLM API接口的复杂性。这简化了客户端代码并减少了错误的可能性。
  2. 多模型用例
    消息代理促进了多个LLM或专门用于不同任务的模型(例如,一个用于摘要,另一个用于情感分析)之间的通信。它确保请求高效地路由到适当的模型,使应用程序能够利用每个模型的优势而无需额外开销。
  3. 批处理和大规模推理
    对于需要批处理或大规模推理任务的应用程序,消息代理通过在LLM繁忙或不可用时排队请求来实现异步处理。这确保了即使在高负载下也不会丢失数据或请求,提供可靠的处理。
  4. 冗余和故障转移保障
    在需要高可用性的场景中,消息代理确保无缝故障转移到备用环境。例如,如果与提供OpenAI模型的云提供商的连接失败,KubeMQ可以自动切换到另一个提供商。这种冗余保证了AI操作的持续性,保持服务可靠性和客户满意度。
  5. 处理高流量应用程序
    消息代理将传入的请求分发到多个LLM实例或副本中,防止过载并确保平稳运行。这种负载均衡对于高流量应用程序至关重要,使其能够有效扩展而不影响性能。

使用KubeMQ构建LLM路由器:集成OpenAI和Claude

现在,我将指导您使用KubeMQ(一个领先的开源消息代理和消息队列平台)设置一个与OpenAI和Anthropic的Claude接口的路由器。

利用KubeMQ的优势并提供代码示例,我们将逐步设置消息代理、构建服务器端路由器以及创建客户端以发送查询。

所有代码示例都可以在KubeMQ的GitHub存储库中找到。

先决条件

在开始之前,请确保您具备以下条件:

  • 已安装Python 3.7或更高版本。
  • 已在您的机器上安装Docker。
  • OpenAI和Anthropic的有效API密钥。
  • KubeMQ令牌(您可以从KubeMQ网站获取)。
  • 已安装kubemq-cq Python包:
pip install kubemq-cq
  • 包含API密钥的.env文件:
OPENAI_API_KEY=your_openai_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key

设置KubeMQ

首先,我们需要确保KubeMQ正在运行。我们将使用Docker部署它:

docker run -d --rm \
  -p 8080:8080 \
  -p 50000:50000 \
  -p 9090:9090 \
  -e KUBEMQ_TOKEN="your_token" \
  kubemq/kubemq-community:latest

端口说明

  • 8080 – 暴露KubeMQ REST API
  • 50000 – 打开gRPC端口以进行客户端-服务器通信
  • 9090 – 暴露KubeMQ REST网关

注意:将your_token替换为您的实际KubeMQ令牌。

创建LLM路由器服务器

LLM路由器充当客户端和LLM之间的中介。它监听特定通道的查询并将它们路由到适当的LLM。

server.py

import time from kubemq.cq import Client, QueryMessageReceived, QueryResponseMessage, QueriesSubscription, CancellationToken from langchain.chat_models import ChatOpenAI from langchain.llms import Anthropic import os from dotenv import load_dotenv import threading load_dotenv() class LLMRouter: def __init__(self): self.openai_llm = ChatOpenAI( api_key=os.getenv("OPENAI_API_KEY"), model_name="gpt-3.5-turbo" ) self.claude_llm = Anthropic( api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3" ) self.client = Client(address="localhost:50000") def handle_openai_query(self, request: QueryMessageReceived): try: message = request.body.decode('utf-8') result = self.openai_llm(message) response = QueryResponseMessage( query_received=request, is_executed=True, body=result.encode('utf-8') ) self.client.send_response_message(response) except Exception as e: self.client.send_response_message(QueryResponseMessage( query_received=request, is_executed=False, error=str(e) )) def handle_claude_query(self, request: QueryMessageReceived): try: message = request.body.decode('utf-8') result = self.claude_llm(message) response = QueryResponseMessage( query_received=request, is_executed=True, body=result.encode('utf-8') ) self.client.send_response_message(response) except Exception as e: self.client.send_response_message(QueryResponseMessage( query_received=request, is_executed=False, error=str(e) )) def run(self): def on_error(err: str): print(f"Error: {err}") def subscribe_openai(): self.client.subscribe_to_queries( subscription=QueriesSubscription( channel="openai_requests", on_receive_query_callback=self.handle_openai_query, on_error_callback=on_error, ), cancel=CancellationToken() ) def subscribe_claude(): self.client.subscribe_to_queries( subscription=QueriesSubscription( channel="claude_requests", on_receive_query_callback=self.handle_claude_query, on_error_callback=on_error, ), cancel=CancellationToken() ) threading.Thread(target=subscribe_openai).start() threading.Thread(target=subscribe_claude).start() print("LLM Router running on channels: openai_requests, claude_requests") try: while True: time.sleep(1) except KeyboardInterrupt: print("Shutting down...") if __name__ == "__main__": router = LLMRouter() router.run()

说明

  • 初始化 加载API密钥的环境变量。 初始化OpenAI和Anthropic LLMs的客户端。 设置KubeMQ客户端。
  • 处理查询 handle_openai_query 和 handle_claude_query 解码传入的消息,将其传递给相应的LLM,并发送回响应。 捕获错误并发送回 is_executed 标志为 False 的响应。
  • 订阅 路由器订阅两个通道:openai_requests 和 claude_requests。 使用线程并发处理订阅。
  • 运行服务器 run 方法启动订阅并保持服务器运行,直到中断。

开发LLM客户端

客户端将查询发送到LLM路由器,并指定要使用的模型。

client.py

import time
from kubemq.cq import Client, QueryMessageReceived, QueryResponseMessage, QueriesSubscription, CancellationToken
from langchain.chat_models import ChatOpenAI
from langchain.llms import Anthropic
import os
from dotenv import load_dotenv
import threading

load_dotenv()

class LLMRouter:
    def __init__(self):
        self.openai_llm = ChatOpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            model_name="gpt-3.5-turbo"
        )
        self.claude_llm = Anthropic(
            api_key=os.getenv("ANTHROPIC_API_KEY"),
            model="claude-3"
        )
        self.client = Client(address="localhost:50000")

    def handle_openai_query(self, request: QueryMessageReceived):
        try:
            message = request.body.decode('utf-8')
            result = self.openai_llm(message)
            response = QueryResponseMessage(
                query_received=request,
                is_executed=True,
                body=result.encode('utf-8')
            )
            self.client.send_response_message(response)
        except Exception as e:
            self.client.send_response_message(QueryResponseMessage(
                query_received=request,
                is_executed=False,
                error=str(e)
            ))

    def handle_claude_query(self, request: QueryMessageReceived):
        try:
            message = request.body.decode('utf-8')
            result = self.claude_llm(message)
            response = QueryResponseMessage(
                query_received=request,
                is_executed=True,
                body=result.encode('utf-8')
            )
            self.client.send_response_message(response)
        except Exception as e:
            self.client.send_response_message(QueryResponseMessage(
                query_received=request,
                is_executed=False,
                error=str(e)
            ))

    def run(self):
        def on_error(err: str):
            print(f"Error: {err}")

        def subscribe_openai():
            self.client.subscribe_to_queries(
                subscription=QueriesSubscription(
                    channel="openai_requests",
                    on_receive_query_callback=self.handle_openai_query,
                    on_error_callback=on_error,
                ),
                cancel=CancellationToken()
            )

        def subscribe_claude():
            self.client.subscribe_to_queries(
                subscription=QueriesSubscription(
                    channel="claude_requests",
                    on_receive_query_callback=self.handle_claude_query,
                    on_error_callback=on_error,
                ),
                cancel=CancellationToken()
            )

        threading.Thread(target=subscribe_openai).start()
        threading.Thread(target=subscribe_claude).start()

        print("LLM Router running on channels: openai_requests, claude_requests")
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("Shutting down...")

if __name__ == "__main__":
    router = LLMRouter()
    router.run()

说明

  • 初始化 设置KubeMQ客户端。
  • 发送消息 send_message 方法根据所选模型构建适当的通道。 向路由器发送查询消息并等待响应。 处理错误并解码响应体。
  • 用户交互 提示用户输入消息并选择模型。 打印LLM的响应。

通过REST发送和接收

对于偏好或需要RESTful通信的服务或客户端,KubeMQ提供REST端点。

通过REST发送请求

  • 端点
POST http://localhost:9090/send/request
  • 标头
POST http://localhost:9090/send/request
  • 正文
{
   "RequestTypeData": 2,
   "ClientID": "LLMRouter-sender",
   "Channel": "openai_requests",
   "BodyString": "What is the capital of France?",
   "Timeout": 30000
}
  • 负载详情
  • RequestTypeData – 指定请求类型(2表示查询)。
  • ClientID – 发送请求的客户端的标识符。
  • Channel – 对应于LLM模型的通道(openai_requests 或 claude_requests)。
  • BodyString – 要发送到LLM的消息。
  • Timeout – 等待响应的时间(以毫秒为单位)。

接收响应
响应将是一个JSON对象,包含LLM的输出或错误消息。

结论

通过利用消息代理(KubeMQ),我们构建了一个可扩展且高效的路由器,可以与多个LLM接口。这种设置允许客户端无缝地向不同模型发送查询,并且可以扩展以包含更多模型或功能。

这种方法的一些好处是:

  • 简化集成:抽象了直接与不同LLM API接口的复杂性,简化了客户端代码并减少了错误的可能性。
  • 多模型支持:高效地将请求路由到专门用于不同任务的适当模型。
  • 可靠性:即使在LLM繁忙或不可用时也能确保不丢失数据。
  • 冗余:提供故障转移机制以保持不间断的操作。
  • 可扩展性:通过将请求分发到多个LLM实例来处理高流量。
最近发表
标签列表