Redis 流示例#

基本配置#

[1]:
redis_host = "redis"
stream_key = "skey"
stream2_key = "s2key"
group1 = "grp1"
group2 = "grp2"

连接#

[2]:
import redis
from time import time
from redis.exceptions import ConnectionError, DataError, NoScriptError, RedisError, ResponseError

r = redis.Redis( redis_host )
r.ping()
[2]:
True

xadd 和 xread#

向流中添加一些数据#

[3]:
for i in range(0,10):
    r.xadd( stream_key, { 'ts': time(), 'v': i } )
print( f"stream length: {r.xlen( stream_key )}")
stream length: 10

从流中读取一些数据#

[4]:
## read 2 entries from stream_key
l = r.xread( count=2, streams={stream_key:0} )
print(l)
[[b'skey', [(b'1657571033115-0', {b'ts': b'1657571033.1128936', b'v': b'0'}), (b'1657571033117-0', {b'ts': b'1657571033.1176307', b'v': b'1'})]]]

从返回的结构中提取数据#

[5]:
first_stream = l[0]
print( f"got data from stream: {first_stream[0]}")
fs_data = first_stream[1]
for id, value in fs_data:
    print( f"id: {id} value: {value[b'v']}")
got data from stream: b'skey'
id: b'1657571033115-0' value: b'0'
id: b'1657571033117-0' value: b'1'

从流中读取更多数据#

如果我们使用相同的参数调用 xread,我们将获得相同的数据

[6]:
l = r.xread( count=2, streams={stream_key:0} )
for id, value in l[0][1]:
    print( f"id: {id} value: {value[b'v']}")
id: b'1657571033115-0' value: b'0'
id: b'1657571033117-0' value: b'1'

为了获得新数据,我们需要更改传递给调用的键

[7]:
last_id_returned = l[0][1][-1][0]
l = r.xread( count=2, streams={stream_key: last_id_returned} )
for id, value in l[0][1]:
    print( f"id: {id} value: {value[b'v']}")
id: b'1657571033118-0' value: b'2'
id: b'1657571033119-0' value: b'3'
[8]:
last_id_returned = l[0][1][-1][0]
l = r.xread( count=2, streams={stream_key: last_id_returned} )
for id, value in l[0][1]:
    print( f"id: {id} value: {value[b'v']}")
id: b'1657571033119-1' value: b'4'
id: b'1657571033121-0' value: b'5'

只获取较新的条目

[9]:
print( f"stream length: {r.xlen( stream_key )}")
# wait for 5s for new messages
l = r.xread( count=1, block=5000, streams={stream_key: '$'} )
print( f"after 5s block, got an empty list {l}, no *new* messages on the stream")
print( f"stream length: {r.xlen( stream_key )}")
stream length: 10
after 5s block, got an empty list [], no *new* messages on the stream
stream length: 10

第二个流#

向第二个流添加一些消息

[10]:
for i in range(1000,1010):
    r.xadd( stream2_key, { 'v': i } )
print( f"stream length: {r.xlen( stream2_key )}")
stream length: 10

从两个流中获取消息

[11]:
l = r.xread( count=1, streams={stream_key:0,stream2_key:0} )
for k,d in l:
    print(f"got from {k} the entry {d}")
got from b'skey' the entry [(b'1657571033115-0', {b'ts': b'1657571033.1128936', b'v': b'0'})]
got from b's2key' the entry [(b'1657571042111-0', {b'v': b'1000'})]

流组#

使用组可以跟踪多个消费者,并在 Redis 端跟踪哪些消息已经被消费。 ## 向流添加一些数据 创建两个包含 10 条消息的流。

[12]:
def add_some_data_to_stream( sname, key_range ):
    for i in key_range:
        r.xadd( sname, { 'ts': time(), 'v': i } )
    print( f"stream '{sname}' length: {r.xlen( stream_key )}")

add_some_data_to_stream( stream_key, range(0,10) )
add_some_data_to_stream( stream2_key, range(1000,1010) )
stream 'skey' length: 20
stream 's2key' length: 20

使用组从流中读取数据#

  • 创建一个名为 grp1 的组,该组与流 skey 相关联,并且

  • 创建一个名为 grp2 的组,该组与流 skeys2key 相关联

使用 xinfo_group 验证组创建的结果。

[13]:
## create the group
def create_group( skey, gname ):
    try:
        r.xgroup_create( name=skey, groupname=gname, id=0 )
    except ResponseError as e:
        print(f"raised: {e}")

# group1 read the stream 'skey'
create_group( stream_key, group1 )
# group2 read the streams 'skey' and 's2key'
create_group( stream_key, group2 )
create_group( stream2_key, group2 )

def group_info( skey ):
    res = r.xinfo_groups( name=skey )
    for i in res:
        print( f"{skey} -> group name: {i['name']} with {i['consumers']} consumers and {i['last-delivered-id']}"
              + f" as last read id")

group_info( stream_key )
group_info( stream2_key )
skey -> group name: b'grp1' with 0 consumers and b'0-0' as last read id
skey -> group name: b'grp2' with 0 consumers and b'0-0' as last read id
s2key -> group name: b'grp2' with 0 consumers and b'0-0' as last read id

组读取#

方法 xreadgroup 允许从流组中读取数据。

[14]:
def print_xreadgroup_reply( reply, group = None, run = None):
    for d_stream in reply:
        for element in d_stream[1]:
            print(  f"got element {element[0]}"
                  + f"from stream {d_stream[0]}" )
            if run is not None:
                run( d_stream[0], group, element[0] )
[15]:
# read some messages on group1 with consumer 'c'
d = r.xreadgroup( groupname=group1, consumername='c', block=10,
                  count=2, streams={stream_key:'>'})
print_xreadgroup_reply( d )
got element b'1657571033115-0'from stream b'skey'
got element b'1657571033117-0'from stream b'skey'

同一个流组的 **第二个消费者** 将获得未传递的消息。

[16]:
# read some messages on group1 with consumer 'c'
d = r.xreadgroup( groupname=group1, consumername='c2', block=10,
                  count=2, streams={stream_key:'>'})
print_xreadgroup_reply( d )
got element b'1657571033118-0'from stream b'skey'
got element b'1657571033119-0'from stream b'skey'

但是,**第二个流组** 可以再次读取已经传递的消息。

请注意,第二个流组也包含第二个流。这可以在回复中识别(回复列表的第一个元素)。

[18]:
d2 = r.xreadgroup( groupname=group2, consumername='c', block=10,
                   count=2, streams={stream_key:'>',stream2_key:'>'})
print_xreadgroup_reply( d2 )
got element b'1657571033115-0'from stream b'skey'
got element b'1657571033117-0'from stream b'skey'
got element b'1657571042111-0'from stream b's2key'
got element b'1657571042113-0'from stream b's2key'

要检查待处理的消息(已传递但未确认的消息),可以使用 xpending

[19]:
# check pending status (read messages without a ack)
def print_pending_info( key_group ):
    for s,k in key_group:
        pr = r.xpending( name=s, groupname=k )
        print( f"{pr.get('pending')} pending messages on '{s}' for group '{k}'" )

print_pending_info( ((stream_key,group1),(stream_key,group2),(stream2_key,group2)) )
4 pending messages on 'skey' for group 'grp1'
2 pending messages on 'skey' for group 'grp2'
2 pending messages on 's2key' for group 'grp2'

确认#

使用 xack 确认一些消息。

[20]:
# do acknowledges for group1
toack = lambda k,g,e: r.xack( k,g, e )
print_xreadgroup_reply( d, group=group1, run=toack )
got element b'1657571033118-0'from stream b'skey'
got element b'1657571033119-0'from stream b'skey'
[21]:
# check pending again
print_pending_info( ((stream_key,group1),(stream_key,group2),(stream2_key,group2)) )
2 pending messages on 'skey' for group 'grp1'
2 pending messages on 'skey' for group 'grp2'
2 pending messages on 's2key' for group 'grp2'

确认 group1 上的所有消息。

[22]:
d = r.xreadgroup( groupname=group1, consumername='c', block=10,
                      count=100, streams={stream_key:'>'})
print_xreadgroup_reply( d, group=group1, run=toack)
print_pending_info( ((stream_key,group1),) )
got element b'1657571033119-1'from stream b'skey'
got element b'1657571033121-0'from stream b'skey'
got element b'1657571033121-1'from stream b'skey'
got element b'1657571033121-2'from stream b'skey'
got element b'1657571033122-0'from stream b'skey'
got element b'1657571033122-1'from stream b'skey'
got element b'1657571049557-0'from stream b'skey'
got element b'1657571049557-1'from stream b'skey'
got element b'1657571049558-0'from stream b'skey'
got element b'1657571049559-0'from stream b'skey'
got element b'1657571049559-1'from stream b'skey'
got element b'1657571049559-2'from stream b'skey'
got element b'1657571049560-0'from stream b'skey'
got element b'1657571049562-0'from stream b'skey'
got element b'1657571049563-0'from stream b'skey'
got element b'1657571049563-1'from stream b'skey'
2 pending messages on 'skey' for group 'grp1'

但是,在所有 group1 上的消息都完成 xack 后,流的长度将保持不变。

[24]:
r.xlen(stream_key)
[24]:
20

删除所有#

要删除消息,需要使用 xdel 显式删除它们。

[25]:
s1 = r.xread( streams={stream_key:0} )
for streams in s1:
    stream_name, messages = streams
    # del all ids from the message list
    [ r.xdel( stream_name, i[0] ) for i in messages ]

流长度

[26]:
r.xlen(stream_key)
[26]:
0

但是使用 xdel,第二个组可以从 skey 中读取任何未处理的消息。

[27]:
d2 = r.xreadgroup( groupname=group2, consumername='c', block=10,
                   count=2, streams={stream_key:'>',stream2_key:'>'})
print_xreadgroup_reply( d2 )
got element b'1657571042113-1'from stream b's2key'
got element b'1657571042114-0'from stream b's2key'
[ ]: