优秀的编程知识分享平台

网站首页 > 技术文章 正文

python散装笔记——122: Websockets

nanyue 2025-03-25 16:01:00 技术文章 8 ℃

1: 使用 aiohttp 实现简单回声

aiohttp 提供异步 WebSockets。

适用于 Python 3.x 版本 ≥ 3.5

 import asyncio
 from aiohttp import ClientSession
 
 async with ClientSession() as session:
   async def hello_world():
     
     websocket = await session.ws_connect("wss://echo.websocket.org")
     websocket.send_str("Hello, world!")
     print("Received:", (await websocket.receive()).data)
     await websocket.close()
 
   loop = asyncio.get_event_loop()
   loop.run_until_complete(hello_world())

2: 使用 aiohttp 的包装类

aiohttp.ClientSession 可以作为自定义 WebSocket 类的父类。

适用于 Python 3.x 版本 ≥ 3.5

 import asyncio
 from aiohttp import ClientSession
 
 class EchoWebSocket(ClientSession):
   
   URL = "wss://echo.websocket.org"
 
   def __init__(self):
     super().__init__()
     self.websocket = None
 
   async def connect(self):
     """Connect to the WebSocket."""
     self.websocket = await self.ws_connect(self.URL)
 
   async def send(self, message):
     """Send a message to the WebSocket."""
     assert self.websocket is not None, "You must connect first!"
     self.websocket.send_str(message)
     print("Sent:", message)
 
   async def receive(self):
     """Receive one message from the WebSocket."""
     assert self.websocket is not None, "You must connect first!"
     return (await self.websocket.receive()).data
 
   async def read(self):
     """Read messages from the WebSocket."""
     assert self.websocket is not None, "You must connect first!"
     while self.websocket.receive():
       message = await self.receive()
       print("Received:", message)
       if message == "Echo 9!":
         break
 
 async def send(websocket):
   for n in range(10):
     await websocket.send("Echo {}!".format(n))
     await asyncio.sleep(1)
 
 loop = asyncio.get_event_loop()
 
 async with EchoWebSocket() as websocket:
   loop.run_until_complete(websocket.connect())
   
   tasks = (
     send(websocket),
     websocket.read()
   )
 
   loop.run_until_complete(asyncio.wait(tasks))
   loop.close()

3: 使用 Autobahn 作为 WebSocket 工厂

Autobahn 包可用于 Python WebSocket 服务器工厂。

Python Autobahn 包文档

安装通常只需在终端中使用以下命令:

(对于 Linux):

 sudo pip install autobahn

(对于 Windows):

 python -m pip install autobahn

然后,可以在 Python 脚本中创建一个简单的回声服务器:

 from autobahn.asyncio.websocket import WebSocketServerProtocol
 class MyServerProtocol(WebSocketServerProtocol):
     '''创建服务器协议时,用户定义的类继承自 WebSocketServerProtocol,
     需要覆盖 onMessage、onConnect 等事件以实现用户指定的功能,
     这些事件定义了服务器的协议,本质上'''
 
   def onMessage(self,payload,isBinary):
         '''当服务器收到消息时,会调用 onMessage 方法。
         它需要 payload 和布尔值 isBinary 作为参数。payload 是消息的实际内容,
         isBinary 是一个标志,用于告知用户 payload 是否包含二进制数据。
         在此示例中,payload 将按原样返回给发送者。'''
     self.sendMessage(payload,isBinary)
 
 if__name__=='__main__':
   try:
     importasyncio
   except ImportError:
     '''Trollius = 0.3 被重命名'''
     import trollius as asyncio
   from autobahn.asyncio.websocketimportWebSocketServerFactory
   factory=WebSocketServerFactory()
     '''初始化 WebSocket 工厂,并将协议设置为上述定义的协议
     (继承自 autobahn.asyncio.websocket.WebSocketServerProtocol 的类)'''
   factory.protocol=MyServerProtocol
     '''上述代码可以理解为将 MyServerProtocol 类中描述的
     onConnect、onMessage 等方法绑定到服务器,
     设置服务器的功能,即协议'''
   loop=asyncio.get_event_loop()
   coro=loop.create_server(factory,'127.0.0.1',9000)
   server=loop.run_until_complete(coro)
       '''在无限循环中运行服务器'''
   try:
     loop.run_forever()
   except KeyboardInterrupt:
     pass
   finally:
     server.close()
     loop.close()

在此示例中,服务器将在本地主机(127.0.0.1)的 9000 端口上创建。这是监听的 IP 和端口。此信息非常重要,因为使用它,你可以查找计算机的局域网地址,并从调制解调器进行端口转发,通过你拥有的任何路由器将信息转发到计算机。然后,通过使用 Google 查找你的广域网 IP,你可以设计你的网站向你的广域网 IP 发送 WebSocket 消息,端口为 9000(在此示例中)。

重要的是,你需要从调制解调器进行反向端口转发,也就是说,如果你有路由器串联连接到调制解调器,请进入调制解调器的配置设置,从调制解调器向连接的路由器进行端口转发,依此类推,直到连接到计算机的最终路由器将从调制解调器端口 9000(在此示例中)接收到的信息转发到它。

最近发表
标签列表