高级功能#

关于线程的说明#

Redis 客户端实例可以在线程之间安全共享。在内部,连接实例仅在命令执行期间从连接池中检索,并在执行后直接返回到池中。命令执行不会修改客户端实例上的状态。

但是,有一个需要注意的地方:Redis SELECT 命令。SELECT 命令允许您切换连接当前使用的数据库。该数据库将保持选中状态,直到选择另一个数据库或连接关闭。这会导致一个问题,即连接可能返回到连接到不同数据库的池中。

因此,redis-py 在客户端实例中没有实现 SELECT 命令。如果您在同一个应用程序中使用多个 Redis 数据库,则应为每个数据库创建单独的客户端实例(以及可能单独的连接池)。

在不同线程之间传递 PubSub 或 Pipeline 对象是不安全的。

管道#

默认管道#

管道是 Redis 基类的子类,它提供对将多个命令缓冲到服务器中的单个请求的支持。它们可用于通过减少客户端和服务器之间的 TCP 数据包来回次数来显着提高命令组的性能。

管道使用起来非常简单

>>> r = redis.Redis(...)
>>> r.set('bing', 'baz')
>>> # Use the pipeline() method to create a pipeline instance
>>> pipe = r.pipeline()
>>> # The following SET commands are buffered
>>> pipe.set('foo', 'bar')
>>> pipe.get('bing')
>>> # the EXECUTE call sends all buffered commands to the server, returning
>>> # a list of responses, one for each command.
>>> pipe.execute()
[True, b'baz']

为了方便使用,所有缓冲到管道中的命令都返回管道对象本身。因此,调用可以像这样链接

>>> pipe.set('foo', 'bar').sadd('faz', 'baz').incr('auto_number').execute()
[True, True, 6]

此外,管道还可以确保缓冲的命令作为一组原子地执行。这是默认行为。如果您想禁用管道的原子性,但仍然想缓冲命令,您可以关闭事务。

>>> pipe = r.pipeline(transaction=False)

一个常见的问题是,当需要原子事务,但需要在事务中使用之前从 Redis 中检索值时。例如,假设 INCR 命令不存在,我们需要在 Python 中构建 INCR 的原子版本。

完全幼稚的实现可以获取值,在 Python 中递增它,然后将新值设置回去。但是,这不是原子的,因为多个客户端可能同时执行此操作,每个客户端都从 GET 获取相同的值。

输入 WATCH 命令。WATCH 提供在开始事务之前监视一个或多个键的能力。如果在执行该事务之前任何这些键发生更改,则整个事务将被取消,并且会引发 WatchError。为了实现我们自己的客户端 INCR 命令,我们可以执行以下操作

>>> with r.pipeline() as pipe:
...     while True:
...         try:
...             # put a WATCH on the key that holds our sequence value
...             pipe.watch('OUR-SEQUENCE-KEY')
...             # after WATCHing, the pipeline is put into immediate execution
...             # mode until we tell it to start buffering commands again.
...             # this allows us to get the current value of our sequence
...             current_value = pipe.get('OUR-SEQUENCE-KEY')
...             next_value = int(current_value) + 1
...             # now we can put the pipeline back into buffered mode with MULTI
...             pipe.multi()
...             pipe.set('OUR-SEQUENCE-KEY', next_value)
...             # and finally, execute the pipeline (the set command)
...             pipe.execute()
...             # if a WatchError wasn't raised during execution, everything
...             # we just did happened atomically.
...             break
...        except WatchError:
...             # another client must have changed 'OUR-SEQUENCE-KEY' between
...             # the time we started WATCHing it and the pipeline's execution.
...             # our best bet is to just retry.
...             continue

请注意,由于 Pipeline 必须在 WATCH 期间绑定到单个连接,因此必须注意通过调用 reset() 方法将连接返回到连接池。如果 Pipeline 用作上下文管理器(如上面的示例所示),则会自动调用 reset()。当然,您可以通过显式调用 reset() 来手动执行此操作

>>> pipe = r.pipeline()
>>> while True:
...     try:
...         pipe.watch('OUR-SEQUENCE-KEY')
...         ...
...         pipe.execute()
...         break
...     except WatchError:
...         continue
...     finally:
...         pipe.reset()

名为“transaction”的便捷方法存在于处理处理和重试监视错误的所有样板代码。它接受一个可调用对象,该可调用对象应期望一个参数,一个管道对象,以及要监视的任意数量的键。我们上面的客户端 INCR 命令可以这样编写,这更容易阅读

>>> def client_side_incr(pipe):
...     current_value = pipe.get('OUR-SEQUENCE-KEY')
...     next_value = int(current_value) + 1
...     pipe.multi()
...     pipe.set('OUR-SEQUENCE-KEY', next_value)
>>>
>>> r.transaction(client_side_incr, 'OUR-SEQUENCE-KEY')
[True]

请确保在传递给 Redis.transaction 的可调用对象中调用 pipe.multi(),然后再执行任何写入命令。

集群中的管道#

ClusterPipeline 是 RedisCluster 的一个子类,它为集群模式下的 Redis 管道提供支持。当调用 execute() 命令时,所有命令都会根据它们将在其上执行的节点进行分组,然后由各个节点并行执行。管道实例将等待所有节点响应,然后再将结果返回给调用者。命令响应以列表形式返回,排序顺序与发送顺序相同。管道可用于显着提高 Redis 集群的吞吐量,方法是显着减少客户端和服务器之间的网络往返次数。

>>> with rc.pipeline() as pipe:
...     pipe.set('foo', 'value1')
...     pipe.set('bar', 'value2')
...     pipe.get('foo')
...     pipe.get('bar')
...     print(pipe.execute())
[True, True, b'value1', b'value2']
...     pipe.set('foo1', 'bar1').get('foo1').execute()
[True, b'bar1']

请注意:- RedisCluster 管道目前仅支持基于键的命令。- 管道从集群的参数中获取其“read_from_replicas”值。因此,如果在集群实例中启用了从副本读取,则管道也将将读取命令定向到副本。- 集群模式下不支持“transaction”选项。在非集群模式下,执行管道时可以使用“transaction”选项。这会将管道命令包装在 MULTI/EXEC 命令中,并有效地将管道命令转换为单个事务块。这意味着所有命令都按顺序执行,不会受到其他客户端的任何干扰。但是,在集群模式下,这是不可能的,因为命令根据其各自的目标节点进行分区。这意味着我们无法将管道命令转换为一个事务块,因为在大多数情况下,它们被拆分为几个较小的管道。

发布/订阅#

redis-py 包含一个 PubSub 对象,它订阅频道并监听新消息。创建 PubSub 对象很容易。

>>> r = redis.Redis(...)
>>> p = r.pubsub()

创建 PubSub 实例后,可以订阅频道和模式。

>>> p.subscribe('my-first-channel', 'my-second-channel', ...)
>>> p.psubscribe('my-*', ...)

现在 PubSub 实例已订阅这些频道/模式。可以通过从 PubSub 实例读取消息来查看订阅确认。

>>> p.get_message()
{'pattern': None, 'type': 'subscribe', 'channel': b'my-second-channel', 'data': 1}
>>> p.get_message()
{'pattern': None, 'type': 'subscribe', 'channel': b'my-first-channel', 'data': 2}
>>> p.get_message()
{'pattern': None, 'type': 'psubscribe', 'channel': b'my-*', 'data': 3}

从 PubSub 实例读取的每条消息都将是一个包含以下键的字典。

  • 类型: 以下之一:‘subscribe’,‘unsubscribe’,‘psubscribe’,‘punsubscribe’,‘message’,‘pmessage’

  • 频道: 订阅或取消订阅的频道,或发布消息的频道

  • 模式: 与已发布消息的频道匹配的模式。除‘pmessage’类型外,在所有情况下都将为 None。

  • 数据: 消息数据。对于 [un]subscribe 消息,此值将是连接当前订阅的频道和模式的数量。对于 [p]message 消息,此值将是实际发布的消息。

现在让我们发送一条消息。

# the publish method returns the number matching channel and pattern
# subscriptions. 'my-first-channel' matches both the 'my-first-channel'
# subscription and the 'my-*' pattern subscription, so this message will
# be delivered to 2 channels/patterns
>>> r.publish('my-first-channel', 'some data')
2
>>> p.get_message()
{'channel': b'my-first-channel', 'data': b'some data', 'pattern': None, 'type': 'message'}
>>> p.get_message()
{'channel': b'my-first-channel', 'data': b'some data', 'pattern': b'my-*', 'type': 'pmessage'}

取消订阅的工作方式与订阅相同。如果未向 [p]unsubscribe 传递任何参数,则将取消订阅所有频道或模式。

>>> p.unsubscribe()
>>> p.punsubscribe('my-*')
>>> p.get_message()
{'channel': b'my-second-channel', 'data': 2, 'pattern': None, 'type': 'unsubscribe'}
>>> p.get_message()
{'channel': b'my-first-channel', 'data': 1, 'pattern': None, 'type': 'unsubscribe'}
>>> p.get_message()
{'channel': b'my-*', 'data': 0, 'pattern': None, 'type': 'punsubscribe'}

redis-py 还允许您注册回调函数来处理已发布的消息。消息处理程序接受一个参数,即消息,它是一个与上述示例相同的字典。要使用消息处理程序订阅频道或模式,请将频道或模式名称作为关键字参数传递,其值为回调函数。

当使用消息处理程序在频道或模式上读取消息时,将创建消息字典并将其传递给消息处理程序。在这种情况下,get_message() 返回 None 值,因为消息已处理。

>>> def my_handler(message):
...     print('MY HANDLER: ', message['data'])
>>> p.subscribe(**{'my-channel': my_handler})
# read the subscribe confirmation message
>>> p.get_message()
{'pattern': None, 'type': 'subscribe', 'channel': b'my-channel', 'data': 1}
>>> r.publish('my-channel', 'awesome data')
1
# for the message handler to work, we need tell the instance to read data.
# this can be done in several ways (read more below). we'll just use
# the familiar get_message() function for now
>>> message = p.get_message()
MY HANDLER:  awesome data
# note here that the my_handler callback printed the string above.
# `message` is None because the message was handled by our handler.
>>> print(message)
None

如果您的应用程序对(有时很吵闹的)订阅/取消订阅确认消息不感兴趣,您可以通过将 ignore_subscribe_messages=True 传递给 r.pubsub() 来忽略它们。这将导致所有订阅/取消订阅消息都被读取,但它们不会冒泡到您的应用程序。

>>> p = r.pubsub(ignore_subscribe_messages=True)
>>> p.subscribe('my-channel')
>>> p.get_message()  # hides the subscribe message and returns None
>>> r.publish('my-channel', 'my data')
1
>>> p.get_message()
{'channel': b'my-channel', 'data': b'my data', 'pattern': None, 'type': 'message'}

有三种不同的策略可以读取消息。

上面的示例一直在使用 pubsub.get_message()。在幕后,get_message() 使用系统的‘select’模块来快速轮询连接的套接字。如果有可供读取的数据,get_message() 将读取它,格式化消息并将其返回或传递给消息处理程序。如果没有可供读取的数据,get_message() 将立即返回 None。这使得它很容易集成到应用程序中的现有事件循环中。

>>> while True:
>>>     message = p.get_message()
>>>     if message:
>>>         # do something with the message
>>>     time.sleep(0.001)  # be nice to the system :)

旧版本的 redis-py 仅使用 pubsub.listen() 读取消息。listen() 是一个生成器,它会阻塞,直到有消息可用。如果您的应用程序除了接收和处理从 redis 收到的消息之外什么都不需要做,listen() 是一种简单易用的方法。

>>> for message in p.listen():
...     # do something with the message

第三种选择是在单独的线程中运行事件循环。pubsub.run_in_thread() 创建一个新线程并启动事件循环。线程对象将返回给 [un_in_thread() 的调用者。调用者可以使用 thread.stop() 方法来关闭事件循环和线程。在幕后,这只是一个围绕 get_message() 的包装器,它在单独的线程中运行,本质上为您创建了一个微小的非阻塞事件循环。run_in_thread() 接受一个可选的 sleep_time 参数。如果指定,事件循环将在循环的每次迭代中使用该值调用 time.sleep()。

注意:由于我们在单独的线程中运行,因此无法处理未通过注册的消息处理程序自动处理的消息。因此,如果您订阅了没有附加消息处理程序的模式或频道,redis-py 会阻止您调用 run_in_thread()。

>>> p.subscribe(**{'my-channel': my_handler})
>>> thread = p.run_in_thread(sleep_time=0.001)
# the event loop is now running in the background processing messages
# when it's time to shut it down...
>>> thread.stop()

run_in_thread 还支持可选的异常处理程序,它允许您捕获工作线程中发生的异常并适当地处理它们。异常处理程序将以异常本身、pubsub 对象和 run_in_thread 返回的工作线程作为参数。

>>> p.subscribe(**{'my-channel': my_handler})
>>> def exception_handler(ex, pubsub, thread):
>>>     print(ex)
>>>     thread.stop()
>>>     thread.join(timeout=1.0)
>>>     pubsub.close()
>>> thread = p.run_in_thread(exception_handler=exception_handler)

PubSub 对象遵循与其创建的客户端实例相同的编码语义。任何 Unicode 频道或模式将在发送到 Redis 之前使用客户端上指定的字符集进行编码。如果客户端的 decode_responses 标志设置为 False(默认值),则消息字典中的“channel”、“pattern”和“data”值将是字节字符串(Python 2 上的 str,Python 3 上的 bytes)。如果客户端的 decode_responses 为 True,则“channel”、“pattern”和“data”值将使用客户端的字符集自动解码为 Unicode 字符串。

PubSub 对象会记住它们订阅的频道和模式。如果发生断开连接(例如网络错误或超时),PubSub 对象将在重新连接时重新订阅所有先前的频道和模式。在客户端断开连接期间发布的消息无法传递。完成 PubSub 对象后,请调用其 .close() 方法以关闭连接。

>>> p = r.pubsub()
>>> ...
>>> p.close()

PUBSUB 子命令集 CHANNELS、NUMSUB 和 NUMPAT 也受支持。

>>> r.pubsub_channels()
[b'foo', b'bar']
>>> r.pubsub_numsub('foo', 'bar')
[(b'foo', 9001), (b'bar', 42)]
>>> r.pubsub_numsub('baz')
[(b'baz', 0)]
>>> r.pubsub_numpat()
1204

分片发布订阅#

分片发布订阅 是 Redis 7.0 中引入的功能,从 5.0 版本开始,redis-py 完全支持该功能。它有助于在集群模式下扩展发布/订阅的使用,方法是让集群将消息分片到拥有分片频道插槽的节点。在这里,集群确保已发布的分片消息被转发到相应的节点。客户端通过连接到负责插槽的主节点或其任何副本来订阅频道。

这利用了 Redis 中的 SSUBSCRIBESPUBLISH 命令。

以下是简化示例

>>> from redis.cluster import RedisCluster, ClusterNode
>>> r = RedisCluster(startup_nodes=[ClusterNode('localhost', 6379), ClusterNode('localhost', 6380)])
>>> p = r.pubsub()
>>> p.ssubscribe('foo')
>>> # assume someone sends a message along the channel via a publish
>>> message = p.get_sharded_message()

同样,可以使用相同的流程来获取已发送到特定节点的碎片化发布/订阅消息,方法是将节点传递给 get_sharded_message

>>> from redis.cluster import RedisCluster, ClusterNode
>>> first_node = ClusterNode['localhost', 6379]
>>> second_node = ClusterNode['localhost', 6380]
>>> r = RedisCluster(startup_nodes=[first_node, second_node])
>>> p = r.pubsub()
>>> p.ssubscribe('foo')
>>> # assume someone sends a message along the channel via a publish
>>> message = p.get_sharded_message(target_node=second_node)

监控#

redis-py 包含一个 Monitor 对象,它会流式传输 Redis 服务器处理的每个命令。使用 Monitor 对象上的 listen() 来阻塞,直到收到命令。

>>> r = redis.Redis(...)
>>> with r.monitor() as m:
>>>     for command in m.listen():
>>>         print(command)