Redis Stream Examples#
Basic configuration#
[1]:
redis_host = "redis"
stream_key = "skey"
stream2_key = "s2key"
group1 = "grp1"
group2 = "grp2"
Connection#
[2]:
import redis
from time import time
from redis.exceptions import ConnectionError, DataError, NoScriptError, RedisError, ResponseError
r = redis.Redis( redis_host )
r.ping()
[2]:
True
xadd and xread#
Add some data to the stream#
[3]:
for i in range(0,10):
    r.xadd( stream_key, { 'ts': time(), 'v': i } )
print( f"stream length: {r.xlen( stream_key )}")
stream length: 10
Read some data from the stream#
[4]:
# read 2 entries from stream_key
l = r.xread( count=2, streams={stream_key:0} )
print(l)
[[b'skey', [(b'1657571033115-0', {b'ts': b'1657571033.1128936', b'v': b'0'}), (b'1657571033117-0', {b'ts': b'1657571033.1176307', b'v': b'1'})]]]
Extract data from the returned structure#
[5]:
first_stream = l[0]
print( f"got data from stream: {first_stream[0]}")
fs_data = first_stream[1]
for id, value in fs_data:
    print( f"id: {id} value: {value[b'v']}")
got data from stream: b'skey'
id: b'1657571033115-0' value: b'0'
id: b'1657571033117-0' value: b'1'
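The nested reply can be awkward to handle because every stream name, ID, and field arrives as bytes. As a minimal sketch, the reply could be flattened into plain strings as shown here. The helper name decode_xread_reply is hypothetical and not part of redis-py, and the sample data is copied from the output above so that no server is needed.

```python
def decode_xread_reply(reply):
    """Flatten an xread reply into (stream, entry_id, fields) tuples of str."""
    rows = []
    for stream_name, entries in reply:
        for entry_id, fields in entries:
            decoded = {k.decode(): v.decode() for k, v in fields.items()}
            rows.append((stream_name.decode(), entry_id.decode(), decoded))
    return rows


# sample reply copied from the xread output above
sample = [[b'skey', [
    (b'1657571033115-0', {b'ts': b'1657571033.1128936', b'v': b'0'}),
    (b'1657571033117-0', {b'ts': b'1657571033.1176307', b'v': b'1'}),
]]]

for stream, entry_id, fields in decode_xread_reply(sample):
    print(stream, entry_id, fields['v'])
```

Alternatively, creating the client with decode_responses=True makes redis-py return strings instead of bytes, at the cost of changing every reply shown in this notebook.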
Read more data from the stream#
If we call xread with the same arguments, we get the same data back.
[6]:
l = r.xread( count=2, streams={stream_key:0} )
for id, value in l[0][1]:
    print( f"id: {id} value: {value[b'v']}")
id: b'1657571033115-0' value: b'0'
id: b'1657571033117-0' value: b'1'
To get new data, we need to change the ID passed for the stream key in the call.
[7]:
last_id_returned = l[0][1][-1][0]
l = r.xread( count=2, streams={stream_key: last_id_returned} )
for id, value in l[0][1]:
    print( f"id: {id} value: {value[b'v']}")
id: b'1657571033118-0' value: b'2'
id: b'1657571033119-0' value: b'3'
[8]:
last_id_returned = l[0][1][-1][0]
l = r.xread( count=2, streams={stream_key: last_id_returned} )
for id, value in l[0][1]:
    print( f"id: {id} value: {value[b'v']}")
id: b'1657571033119-1' value: b'4'
id: b'1657571033121-0' value: b'5'
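Passing the last returned ID works because xread only returns entries whose ID is greater than the one given. IDs have the form milliseconds-sequence and are ordered by the millisecond part first, then by the sequence number. The following sketch illustrates that ordering on a few IDs taken from the outputs above; parse_stream_id is a hypothetical helper, not a redis-py function.

```python
def parse_stream_id(raw_id):
    """Split b'<ms>-<seq>' (or the str form) into a (ms, seq) tuple of ints."""
    text = raw_id.decode() if isinstance(raw_id, bytes) else raw_id
    ms, seq = text.split('-')
    return int(ms), int(seq)


ids = [b'1657571033119-1', b'1657571033118-0',
       b'1657571033121-0', b'1657571033119-0']

# sort numerically by (ms, seq), which is the order the stream keeps
ordered = sorted(ids, key=parse_stream_id)
print(ordered)

# entries strictly after the last ID we have already seen
last_id_returned = b'1657571033119-0'
newer = [i for i in ordered
         if parse_stream_id(i) > parse_stream_id(last_id_returned)]
print(newer)
```

Comparing the parsed tuples rather than the raw bytes avoids surprises when the numeric parts have different lengths.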
To get only entries added after the call starts, pass '$' as the ID.
[9]:
print( f"stream length: {r.xlen( stream_key )}")
# wait for 5s for new messages
l = r.xread( count=1, block=5000, streams={stream_key: '$'} )
print( f"after 5s block, got an empty list {l}, no *new* messages on the stream")
print( f"stream length: {r.xlen( stream_key )}")
stream length: 10
after 5s block, got an empty list [], no *new* messages on the stream
stream length: 10
Second stream#
Add some messages to a second stream.
[10]:
for i in range(1000,1010):
    r.xadd( stream2_key, { 'v': i } )
print( f"stream length: {r.xlen( stream2_key )}")
stream length: 10
Get messages from both streams. The count applies to each stream separately.
[11]:
l = r.xread( count=1, streams={stream_key:0,stream2_key:0} )
for k,d in l:
    print(f"got from {k} the entry {d}")
got from b'skey' the entry [(b'1657571033115-0', {b'ts': b'1657571033.1128936', b'v': b'0'})]
got from b's2key' the entry [(b'1657571042111-0', {b'v': b'1000'})]
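When reading from several streams at once, each stream needs its own last-seen ID. One possible way to keep track of them is a dictionary that is updated from every reply and passed back as the streams argument. This is only a sketch: advance_cursors is a hypothetical helper, and the sample reply is copied from the output above.

```python
def advance_cursors(cursors, reply):
    """Return a copy of cursors with each stream moved to its last returned ID."""
    updated = dict(cursors)
    for stream_name, entries in reply:
        if entries:
            # the last tuple in the entry list carries the newest ID
            updated[stream_name.decode()] = entries[-1][0]
    return updated


cursors = {'skey': 0, 's2key': 0}

# sample reply copied from the two-stream xread output above
sample = [
    [b'skey', [(b'1657571033115-0', {b'ts': b'1657571033.1128936', b'v': b'0'})]],
    [b's2key', [(b'1657571042111-0', {b'v': b'1000'})]],
]

cursors = advance_cursors(cursors, sample)
print(cursors)

# with a live connection, the next call could then be:
# l = r.xread( count=1, streams=cursors )
```

Streams that are missing from a reply keep their previous cursor, so an empty reply leaves the dictionary unchanged.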
Stream groups#
With groups, multiple consumers can be tracked, and Redis keeps track on the server side of which messages have already been consumed.
Add some data to the streams#
Add 10 more messages to each of the two streams.
[12]:
def add_some_data_to_stream( sname, key_range ):
    for i in key_range:
        r.xadd( sname, { 'ts': time(), 'v': i } )
    print( f"stream '{sname}' length: {r.xlen( sname )}")
add_some_data_to_stream( stream_key, range(0,10) )
add_some_data_to_stream( stream2_key, range(1000,1010) )
stream 'skey' length: 20
stream 's2key' length: 20
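Read from the streams using groups#
Create a group grp1 associated with the stream skey, and a group grp2 associated with the streams skey and s2key.
Use xinfo_groups to verify the result of the group creation.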
[13]:
# create the group
def create_group( skey, gname ):
    try:
        r.xgroup_create( name=skey, groupname=gname, id=0 )
    except ResponseError as e:
        print(f"raised: {e}")
# group1 reads the stream 'skey'
create_group( stream_key, group1 )
# group2 reads the streams 'skey' and 's2key'
create_group( stream_key, group2 )
create_group( stream2_key, group2 )
def group_info( skey ):
    res = r.xinfo_groups( name=skey )
    for i in res:
        print( f"{skey} -> group name: {i['name']} with {i['consumers']} consumers and {i['last-delivered-id']}"
              + f" as last read id")
group_info( stream_key )
group_info( stream2_key )
skey -> group name: b'grp1' with 0 consumers and b'0-0' as last read id
skey -> group name: b'grp2' with 0 consumers and b'0-0' as last read id
s2key -> group name: b'grp2' with 0 consumers and b'0-0' as last read id
Group read#
The xreadgroup method reads data from a stream on behalf of a consumer in a group.
[14]:
def print_xreadgroup_reply( reply, group = None, run = None):
    for d_stream in reply:
        for element in d_stream[1]:
            print( f"got element {element[0]}"
                  + f"from stream {d_stream[0]}" )
            if run is not None:
                run( d_stream[0], group, element[0] )
[15]:
# read some messages on group1 with consumer 'c'
d = r.xreadgroup( groupname=group1, consumername='c', block=10,
count=2, streams={stream_key:'>'})
print_xreadgroup_reply( d )
got element b'1657571033115-0'from stream b'skey'
got element b'1657571033117-0'from stream b'skey'
A **second consumer** in the same group receives the messages that have not been delivered yet.
[16]:
# read some messages on group1 with consumer 'c2'
d = r.xreadgroup( groupname=group1, consumername='c2', block=10,
count=2, streams={stream_key:'>'})
print_xreadgroup_reply( d )
got element b'1657571033118-0'from stream b'skey'
got element b'1657571033119-0'from stream b'skey'
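However, a **second group** can read the already delivered messages again.
Note that the second group also covers the second stream. The stream each entry came from can be identified in the reply (it is the first element of each item in the reply list).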
[18]:
d2 = r.xreadgroup( groupname=group2, consumername='c', block=10,
count=2, streams={stream_key:'>',stream2_key:'>'})
print_xreadgroup_reply( d2 )
got element b'1657571033115-0'from stream b'skey'
got element b'1657571033117-0'from stream b'skey'
got element b'1657571042111-0'from stream b's2key'
got element b'1657571042113-0'from stream b's2key'
To check for pending messages (delivered but not yet acknowledged), use xpending.
[19]:
# check pending status (messages read without an ack)
def print_pending_info( key_group ):
    for s,k in key_group:
        pr = r.xpending( name=s, groupname=k )
        print( f"{pr.get('pending')} pending messages on '{s}' for group '{k}'" )
print_pending_info( ((stream_key,group1),(stream_key,group2),(stream2_key,group2)) )
4 pending messages on 'skey' for group 'grp1'
2 pending messages on 'skey' for group 'grp2'
2 pending messages on 's2key' for group 'grp2'
Acknowledge#
Acknowledge some messages with xack.
[20]:
# do acknowledges for group1
toack = lambda k,g,e: r.xack( k,g, e )
print_xreadgroup_reply( d, group=group1, run=toack )
got element b'1657571033118-0'from stream b'skey'
got element b'1657571033119-0'from stream b'skey'
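The callback used here calls xack once per entry. Since xack accepts several IDs in one call, the IDs could instead be grouped per stream first. The sketch below shows only the grouping step on sample data (IDs copied from the output above, field values abbreviated); ids_by_stream is a hypothetical helper, and the actual xack call is left as a comment because it needs a live connection.

```python
def ids_by_stream(reply):
    """Map each stream name in an xreadgroup reply to the list of its entry IDs."""
    grouped = {}
    for stream_name, entries in reply:
        grouped.setdefault(stream_name, []).extend(
            entry_id for entry_id, _fields in entries
        )
    return grouped


# sample reply shaped like the one held in `d`
sample = [[b'skey', [
    (b'1657571033118-0', {b'v': b'2'}),
    (b'1657571033119-0', {b'v': b'3'}),
]]]

grouped = ids_by_stream(sample)
print(grouped)

# with a live connection, one call per stream would then be enough:
# for stream_name, ids in grouped.items():
#     r.xack( stream_name, group1, *ids )
```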
[21]:
# check pending again
print_pending_info( ((stream_key,group1),(stream_key,group2),(stream2_key,group2)) )
2 pending messages on 'skey' for group 'grp1'
2 pending messages on 'skey' for group 'grp2'
2 pending messages on 's2key' for group 'grp2'
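The pending counts for grp1 (4 before the acknowledgements, 2 after) follow from how a group tracks deliveries: every entry handed out with '>' stays on the group's pending list until it is acknowledged. The toy in-memory model below reproduces that bookkeeping. It is only an illustration under simplifying assumptions: the class ToyGroup and the made-up IDs are hypothetical, and nothing here talks to Redis.

```python
class ToyGroup:
    """Toy model of one consumer group on one stream (not Redis itself)."""

    def __init__(self, entry_ids):
        self.entry_ids = list(entry_ids)  # the stream's entries, oldest first
        self.next_index = 0               # position after the last delivered entry
        self.pending = {}                 # entry id -> consumer that received it

    def read_new(self, consumer, count):
        """Deliver up to `count` never-delivered entries, like reading with '>'."""
        batch = self.entry_ids[self.next_index:self.next_index + count]
        self.next_index += len(batch)
        for entry_id in batch:
            self.pending[entry_id] = consumer
        return batch

    def ack(self, *entry_ids):
        """Drop acknowledged entries from the pending list; return how many."""
        return sum(1 for e in entry_ids if self.pending.pop(e, None) is not None)


grp1 = ToyGroup(f"{n}-0" for n in range(1, 21))  # 20 made-up IDs
first = grp1.read_new('c', 2)    # consumer 'c' receives two entries
second = grp1.read_new('c2', 2)  # consumer 'c2' receives the next two
print(len(grp1.pending))         # all four deliveries are pending
grp1.ack(*second)                # only the entries read by 'c2' are acked
print(len(grp1.pending))         # the two entries read by 'c' remain
```

In the model, as in the notebook, the entries that were delivered to consumer 'c' but never acknowledged remain pending regardless of what other consumers acknowledge.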
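Read and acknowledge all remaining messages on group1. The two entries delivered to consumer 'c' in cell [15] were never acknowledged, so two messages stay pending.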
[22]:
d = r.xreadgroup( groupname=group1, consumername='c', block=10,
count=100, streams={stream_key:'>'})
print_xreadgroup_reply( d, group=group1, run=toack)
print_pending_info( ((stream_key,group1),) )
got element b'1657571033119-1'from stream b'skey'
got element b'1657571033121-0'from stream b'skey'
got element b'1657571033121-1'from stream b'skey'
got element b'1657571033121-2'from stream b'skey'
got element b'1657571033122-0'from stream b'skey'
got element b'1657571033122-1'from stream b'skey'
got element b'1657571049557-0'from stream b'skey'
got element b'1657571049557-1'from stream b'skey'
got element b'1657571049558-0'from stream b'skey'
got element b'1657571049559-0'from stream b'skey'
got element b'1657571049559-1'from stream b'skey'
got element b'1657571049559-2'from stream b'skey'
got element b'1657571049560-0'from stream b'skey'
got element b'1657571049562-0'from stream b'skey'
got element b'1657571049563-0'from stream b'skey'
got element b'1657571049563-1'from stream b'skey'
2 pending messages on 'skey' for group 'grp1'
Acknowledging messages with xack does not remove them from the stream, however, so the stream length stays the same.
[24]:
r.xlen(stream_key)
[24]:
20
Delete all#
To remove messages, they have to be deleted explicitly with xdel.
[25]:
s1 = r.xread( streams={stream_key:0} )
for streams in s1:
    stream_name, messages = streams
    # delete every id in the message list
    for i in messages:
        r.xdel( stream_name, i[0] )
The stream length is now:
[26]:
r.xlen(stream_key)
[26]:
0
After xdel, however, the second group can no longer read any unprocessed messages from skey; only entries from s2key are returned.
[27]:
d2 = r.xreadgroup( groupname=group2, consumername='c', block=10,
count=2, streams={stream_key:'>',stream2_key:'>'})
print_xreadgroup_reply( d2 )
got element b'1657571042113-1'from stream b's2key'
got element b'1657571042114-0'from stream b's2key'