基于asyncio实现 Redis 的异步操作 —— 包括哈希数据写入 / 读取、发布订阅(Pub/Sub),同时演示了串行 / 并行两种消息发布方式,最终让订阅者持续监听频道消息,阻止进程退出。
启动 Redis 容器
启动 Redis 容器、
docker run -d --name redis -p 6379:6379 redis
安装 Python 依赖,为后续异步操作 Redis 做环境准备;
pip install asyncio
pip install redis
pip install asyncio:安装 Python 异步 I/O 框架(注:Python 3.7 + 已内置asyncio,此命令实际是冗余的,仅为显式声明依赖); pip install redis:安装 Redis 的 Python 客户端库(最新版redis库已内置异步支持,即aioredis); 核心目的:安装操作 Redis 和实现异步编程所需的依赖包。
Redis操作详解
Redis 哈希操作:
hset:向user:001哈希中写入键值对(mapping参数接收字典); hgetall:读取哈希中所有键值对,默认返回bytes类型(如需字符串,可在创建 Redis 客户端时加decode_responses=True);
串行发布:
await publisher(...) 会等待前一个发布任务完成后,再执行下一个,总耗时是各任务耗时之和; 并行发布(原代码有 Bug): asyncio.create_task(publisher(*args)):创建异步任务(立即启动,不阻塞); *[task1, task2, task3]:解包列表,将多个任务作为独立参数传入gather; asyncio.gather(...) 前漏加await,导致并行任务被等待,不加程序会直接跳过,不等待这些任务可能执行结束;
订阅者任务:
asyncio.create_task(subscriber(...)):将订阅者封装为后台任务,立即启动监听; await subscriber_task:等待订阅者任务结束(但subscriber是无限循环,因此进程会一直运行,持续监听频道消息)。
完整示例
import asyncio
import datetime
from redis import asyncio as aioredis
async def publisher(redis, channel, message, timer = 0.1):
if timer > 0:
await asyncio.sleep(timer)
print(f"Current date time: {datetime.datetime.now()}")
await redis.publish(channel, message)
print(f"Message sent: {message}")
async def subscriber(redis, channel):
pubsub = redis.pubsub()
await pubsub.subscribe(channel)
async for message in pubsub.listen():
# if message['type'] == 'message':
print(f"Message received: {message}")
async def main():
redis = aioredis.Redis(host='localhost', port=6379)
# 写入用户001
await redis.hset("user:001", mapping={ "id": "001", "name": "typescript" })
user = await redis.hgetall("user:001")
print(user)
channel="test channel"
subscriber_task = asyncio.create_task(subscriber(redis, channel))
# 调用方法一: 串行执行
await publisher(redis, channel, "Hello world", 2) #先等待再发送
await publisher(redis, channel, "Hello redis", 0) #立即发送
await publisher(redis, channel, "Hello python")
# 调用方法二: 并行执行
publish_tasks = [
(redis, channel, "gather: Hello world", 10),
(redis, channel, "gather: Hello redis", 0),
(redis, channel, "gather: Hello python")
]
# 并行执行,不区分先后顺序, 用*把列表拆成独立参数
asyncio.gather(*[ asyncio.create_task(publisher(*args)) for args in publish_tasks ])
# 写入用户002
await redis.hset("user:002", mapping={ "id": "002", "name": "python" })
user = await redis.hgetall("user:002")
print(user)
# 阻止进程退出,一直监听
await subscriber_task
asyncio.run(main())
