Asyncio 示例#

所有命令都是协程函数。

连接和断开连接#

使用 asyncio Redis 需要显式断开连接,因为没有 asyncio 解构器魔术方法。默认情况下,连接池在 redis.Redis() 上创建并附加到此 Redis 实例。连接池在调用 Redis.aclose 时自动关闭,这将断开所有连接。

[1]:
import redis.asyncio as redis

client = redis.Redis()
print(f"Ping successful: {await client.ping()}")
await client.aclose()
Ping successful: True

如果您为 Redis 实例创建自定义 ConnectionPool 以单独使用,请使用 from_pool 类方法来创建它。这将导致池与 Redis 实例一起断开连接。断开连接池只是断开池中托管的所有连接。

[ ]:
import redis.asyncio as redis

pool = redis.ConnectionPool.from_url("redis://127.0.0.1")
client = redis.Redis.from_pool(pool)
await client.close()

但是,如果您提供一个由多个 Redis 实例共享的 ConnectionPool,您可能希望显式断开连接池。在这种情况下,请使用 connection_pool 参数。

[2]:
import redis.asyncio as redis

pool = redis.ConnectionPool.from_url("redis://127.0.0.1")
client1 = redis.Redis(connection_pool=pool)
client2 = redis.Redis(connection_pool=pool)
await client1.aclose()
await client2.aclose()
await pool.aclose()

默认情况下,此库使用 RESP 协议的版本 2。要启用 RESP 版本 3,您需要将 protocol 设置为 3。

[ ]:
import redis.asyncio as redis

client = redis.Redis(protocol=3)
await client.aclose()
await client.ping()

事务 (Multi/Exec)#

aioredis.Redis.pipeline 将返回一个 aioredis.Pipeline 对象,该对象将在内存中缓冲所有命令,并使用 Redis 批量字符串协议将它们编译成批次。此外,每个命令都将返回 Pipeline 实例,允许您链接您的命令,例如,p.set('foo', 1).set('bar', 2).mget('foo', 'bar')。

在调用并等待 execute() 之前,命令不会反映在 Redis 中。

通常,在执行批量操作时,利用“事务”(例如,Multi/Exec)是可取的,因为它还将为您的批量操作添加一层原子性。

[3]:
import redis.asyncio as redis

r = await redis.from_url("redis://127.0.0.1")
async with r.pipeline(transaction=True) as pipe:
    ok1, ok2 = await (pipe.set("key1", "value1").set("key2", "value2").execute())
assert ok1
assert ok2

发布/订阅模式#

订阅特定频道

[4]:
import asyncio

import redis.asyncio as redis

STOPWORD = "STOP"


async def reader(channel: redis.client.PubSub):
    while True:
        message = await channel.get_message(ignore_subscribe_messages=True)
        if message is not None:
            print(f"(Reader) Message Received: {message}")
            if message["data"].decode() == STOPWORD:
                print("(Reader) STOP")
                break

r = redis.from_url("redis://127.0.0.1")
async with r.pubsub() as pubsub:
    await pubsub.subscribe("channel:1", "channel:2")

    future = asyncio.create_task(reader(pubsub))

    await r.publish("channel:1", "Hello")
    await r.publish("channel:2", "World")
    await r.publish("channel:1", STOPWORD)

    await future
(Reader) Message Received: {'type': 'message', 'pattern': None, 'channel': b'channel:1', 'data': b'Hello'}
(Reader) Message Received: {'type': 'message', 'pattern': None, 'channel': b'channel:2', 'data': b'World'}
(Reader) Message Received: {'type': 'message', 'pattern': None, 'channel': b'channel:1', 'data': b'STOP'}
(Reader) STOP

订阅与 glob 风格模式匹配的频道

[5]:
import asyncio

import redis.asyncio as redis

STOPWORD = "STOP"


async def reader(channel: redis.client.PubSub):
    while True:
        message = await channel.get_message(ignore_subscribe_messages=True)
        if message is not None:
            print(f"(Reader) Message Received: {message}")
            if message["data"].decode() == STOPWORD:
                print("(Reader) STOP")
                break


r = await redis.from_url("redis://127.0.0.1")
async with r.pubsub() as pubsub:
    await pubsub.psubscribe("channel:*")

    future = asyncio.create_task(reader(pubsub))

    await r.publish("channel:1", "Hello")
    await r.publish("channel:2", "World")
    await r.publish("channel:1", STOPWORD)

    await future
(Reader) Message Received: {'type': 'pmessage', 'pattern': b'channel:*', 'channel': b'channel:1', 'data': b'Hello'}
(Reader) Message Received: {'type': 'pmessage', 'pattern': b'channel:*', 'channel': b'channel:2', 'data': b'World'}
(Reader) Message Received: {'type': 'pmessage', 'pattern': b'channel:*', 'channel': b'channel:1', 'data': b'STOP'}
(Reader) STOP

哨兵客户端#

哨兵客户端需要一个 Redis 哨兵地址列表来连接并开始发现服务。

调用 aioredis.sentinel.Sentinel.master_for 或 aioredis.sentinel.Sentinel.slave_for 方法将返回连接到哨兵监控的指定服务的 Redis 客户端。

哨兵客户端将自动检测故障转移并重新连接 Redis 客户端。

[ ]:
import asyncio

from redis.asyncio.sentinel import Sentinel


sentinel = Sentinel([("localhost", 26379), ("sentinel2", 26379)])
r = sentinel.master_for("mymaster")

ok = await r.set("key", "value")
assert ok
val = await r.get("key")
assert val == b"value"

通过指定 URL 方案连接到 Redis 实例。#

参数作为 URL 方案的参数传递给以下方案。

支持三种 URL 方案

[ ]:
import redis.asyncio as redis
url_connection = redis.from_url("redis://127.0.0.1:6379?decode_responses=True")
url_connection.ping()
True

要启用 RESP 3 协议,请将 protocol=3 附加到 URL。

[ ]:
import redis.asyncio as redis

url_connection = redis.from_url("redis://127.0.0.1:6379?decode_responses=True&protocol=3")
url_connection.ping()