Asyncio 示例#
所有命令都是协程函数。
连接和断开连接#
使用 asyncio Redis 需要显式断开连接,因为没有 asyncio 解构器魔术方法。默认情况下,连接池在 redis.Redis()
上创建并附加到此 Redis
实例。连接池在调用 Redis.aclose
时自动关闭,这将断开所有连接。
[1]:
import redis.asyncio as redis
client = redis.Redis()
print(f"Ping successful: {await client.ping()}")
await client.aclose()
Ping successful: True
如果您为 Redis
实例创建自定义 ConnectionPool
以单独使用,请使用 from_pool
类方法来创建它。这将导致池与 Redis 实例一起断开连接。断开连接池只是断开池中托管的所有连接。
[ ]:
import redis.asyncio as redis
pool = redis.ConnectionPool.from_url("redis://localhost")
client = redis.Redis.from_pool(pool)
await client.close()
但是,如果您提供一个由多个 Redis
实例共享的 ConnectionPool
,您可能希望显式断开连接池。在这种情况下,请使用 connection_pool
参数。
[2]:
import redis.asyncio as redis
pool = redis.ConnectionPool.from_url("redis://localhost")
client1 = redis.Redis(connection_pool=pool)
client2 = redis.Redis(connection_pool=pool)
await client1.aclose()
await client2.aclose()
await pool.aclose()
默认情况下,此库使用 RESP 协议的版本 2。要启用 RESP 版本 3,您需要将 protocol
设置为 3。
[ ]:
import redis.asyncio as redis
client = redis.Redis(protocol=3)
await client.aclose()
await client.ping()
事务 (Multi/Exec)#
aioredis.Redis.pipeline 将返回一个 aioredis.Pipeline 对象,该对象将在内存中缓冲所有命令,并使用 Redis 批量字符串协议将它们编译成批次。此外,每个命令都将返回 Pipeline 实例,允许您链接您的命令,例如,p.set('foo', 1).set('bar', 2).mget('foo', 'bar')。
在调用并等待 execute() 之前,命令不会反映在 Redis 中。
通常,在执行批量操作时,利用“事务”(例如,Multi/Exec)是可取的,因为它还将为您的批量操作添加一层原子性。
[3]:
import redis.asyncio as redis
r = await redis.from_url("redis://localhost")
async with r.pipeline(transaction=True) as pipe:
ok1, ok2 = await (pipe.set("key1", "value1").set("key2", "value2").execute())
assert ok1
assert ok2
发布/订阅模式#
订阅特定频道
[4]:
import asyncio
import redis.asyncio as redis
STOPWORD = "STOP"
async def reader(channel: redis.client.PubSub):
while True:
message = await channel.get_message(ignore_subscribe_messages=True)
if message is not None:
print(f"(Reader) Message Received: {message}")
if message["data"].decode() == STOPWORD:
print("(Reader) STOP")
break
r = redis.from_url("redis://localhost")
async with r.pubsub() as pubsub:
await pubsub.subscribe("channel:1", "channel:2")
future = asyncio.create_task(reader(pubsub))
await r.publish("channel:1", "Hello")
await r.publish("channel:2", "World")
await r.publish("channel:1", STOPWORD)
await future
(Reader) Message Received: {'type': 'message', 'pattern': None, 'channel': b'channel:1', 'data': b'Hello'}
(Reader) Message Received: {'type': 'message', 'pattern': None, 'channel': b'channel:2', 'data': b'World'}
(Reader) Message Received: {'type': 'message', 'pattern': None, 'channel': b'channel:1', 'data': b'STOP'}
(Reader) STOP
订阅与 glob 风格模式匹配的频道
[5]:
import asyncio
import redis.asyncio as redis
STOPWORD = "STOP"
async def reader(channel: redis.client.PubSub):
while True:
message = await channel.get_message(ignore_subscribe_messages=True)
if message is not None:
print(f"(Reader) Message Received: {message}")
if message["data"].decode() == STOPWORD:
print("(Reader) STOP")
break
r = await redis.from_url("redis://localhost")
async with r.pubsub() as pubsub:
await pubsub.psubscribe("channel:*")
future = asyncio.create_task(reader(pubsub))
await r.publish("channel:1", "Hello")
await r.publish("channel:2", "World")
await r.publish("channel:1", STOPWORD)
await future
(Reader) Message Received: {'type': 'pmessage', 'pattern': b'channel:*', 'channel': b'channel:1', 'data': b'Hello'}
(Reader) Message Received: {'type': 'pmessage', 'pattern': b'channel:*', 'channel': b'channel:2', 'data': b'World'}
(Reader) Message Received: {'type': 'pmessage', 'pattern': b'channel:*', 'channel': b'channel:1', 'data': b'STOP'}
(Reader) STOP
哨兵客户端#
哨兵客户端需要一个 Redis 哨兵地址列表来连接并开始发现服务。
调用 aioredis.sentinel.Sentinel.master_for 或 aioredis.sentinel.Sentinel.slave_for 方法将返回连接到哨兵监控的指定服务的 Redis 客户端。
哨兵客户端将自动检测故障转移并重新连接 Redis 客户端。
[ ]:
import asyncio
from redis.asyncio.sentinel import Sentinel
sentinel = Sentinel([("localhost", 26379), ("sentinel2", 26379)])
r = sentinel.master_for("mymaster")
ok = await r.set("key", "value")
assert ok
val = await r.get("key")
assert val == b"value"
通过指定 URL 方案连接到 Redis 实例。#
参数作为 URL 方案的参数传递给以下方案。
支持三种 URL 方案
redis://
创建一个 TCP 套接字连接。 https://www.iana.org/assignments/uri-schemes/prov/redisrediss://
创建一个 SSL 封装的 TCP 套接字连接。 https://www.iana.org/assignments/uri-schemes/prov/redissunix://
: 创建一个 Unix 域套接字连接。
[ ]:
import redis.asyncio as redis
url_connection = redis.from_url("redis://localhost:6379?decode_responses=True")
url_connection.ping()
True
要启用 RESP 3 协议,请将 protocol=3
附加到 URL。
[ ]:
import redis.asyncio as redis
url_connection = redis.from_url("redis://localhost:6379?decode_responses=True&protocol=3")
url_connection.ping()