连接示例#
连接到本地运行的默认 Redis 实例。#
[2]:
import redis
# No arguments are passed, so the client falls back to its default connection
# settings (described by the heading above as a local default Redis instance).
connection = redis.Redis()
# Round-trip check against the server; the recorded cell output is True.
connection.ping()
[2]:
True
默认情况下,Redis 返回二进制响应,要解码它们,请使用 decode_responses=True#
[3]:
import redis
# decode_responses=True asks the client to decode replies instead of handing
# back the binary responses it returns by default.
decoded_connection = redis.Redis(decode_responses=True)
decoded_connection.ping()
[3]:
True
默认情况下,此库使用 RESP 2 协议。要启用 RESP3,请设置 protocol=3。#
import redis
# protocol=3 switches this client from the library's default RESP2 to RESP3.
r = redis.Redis(protocol=3)
r.ping()
连接到 Redis 实例,指定主机和端口以及凭据。#
[4]:
import redis
# Explicit host and port plus username/password credentials; replies are
# decoded because decode_responses=True is set.
user_connection = redis.Redis(host='localhost', port=6380, username='dvora', password='redis', decode_responses=True)
user_connection.ping()
[4]:
True
使用用户名和密码凭据提供程序连接到 Redis 实例#
[ ]:
import redis
# Wrap a fixed username/password pair in a credential provider object.
creds_provider = redis.UsernamePasswordCredentialProvider("username", "password")
# The provider is handed to the client via credential_provider instead of
# passing username=/password= directly.
user_connection = redis.Redis(host="localhost", port=6379, credential_provider=creds_provider)
user_connection.ping()
使用标准凭据提供程序连接到 Redis 实例#
[ ]:
from typing import Tuple
import redis
# Username -> password table consulted by UserMapCredentialProvider.
creds_map = {
    "user_1": "pass_1",
    "user_2": "pass_2",
}


class UserMapCredentialProvider(redis.CredentialProvider):
    """Credential provider that looks a user's password up in ``creds_map``."""

    def __init__(self, username: str):
        """Remember which user this provider answers for."""
        self.username = username

    def get_credentials(self) -> Tuple[str, str]:
        """Return ``(username, password)`` for the configured user.

        The password element is ``None`` when the username has no entry in
        ``creds_map``.
        """
        password = creds_map.get(self.username)
        return self.username, password
# Open a connection without explicit credentials and use it to define the ACL user
default_connection = redis.Redis(host="localhost", port=6379)
default_connection.acl_setuser(
    "user_1",
    enabled=True,
    passwords=["+pass_1"],
    keys="~*",
    commands=["+ping", "+command", "+info", "+select", "+flushdb"],
)

# Build the credential provider that answers for user_1
creds_provider = UserMapCredentialProvider("user_1")

# Connect as user_1, letting the provider supply the credentials
user_connection = redis.Redis(
    host="localhost",
    port=6379,
    credential_provider=creds_provider,
)
user_connection.ping()
首先使用初始凭据集连接到 Redis 实例,然后调用凭据提供程序#
[ ]:
# Tuple is needed by the return annotations below; importing only Union made
# this cell fail with NameError when run on its own.
from typing import Tuple, Union

import redis


class InitCredsSetCredentialProvider(redis.CredentialProvider):
    """Provider that serves an initial credential set once, then defers.

    The first ``get_credentials()`` call returns the username/password given
    at construction time; every later call is delegated to
    ``call_external_supplier()``.
    """

    def __init__(self, username, password):
        """Store the initial credentials and mark the supplier as not yet used."""
        self.username = username
        self.password = password
        self.call_supplier = False

    def call_external_supplier(self) -> Union[Tuple[str], Tuple[str, str]]:
        """Fetch credentials from an external supplier.

        Raises:
            NotImplementedError: always; this is a placeholder to be replaced
                with a real lookup.
        """
        # Call to an external credential supplier
        raise NotImplementedError

    def get_credentials(self) -> Union[Tuple[str], Tuple[str, str]]:
        """Return the initial credentials on the first call, supplier results after."""
        if self.call_supplier:
            return self.call_external_supplier()
        # Use the init set only for the first time
        self.call_supplier = True
        return self.username, self.password


cred_provider = InitCredsSetCredentialProvider(username="init_user", password="init_pass")
使用 AWS Secrets Manager 凭据提供程序连接到 Redis 实例。#
[ ]:
import json
from typing import Tuple, Union

import boto3
import cachetools.func
import redis


class SecretsManagerProvider(redis.CredentialProvider):
    """Credential provider backed by an AWS Secrets Manager secret.

    The secret's ``SecretString`` must be a JSON object containing
    ``username`` and ``password`` keys.

    Args:
        secret_id: Identifier of the secret to read.
        version_id: Optional specific secret version. When given it takes
            precedence and ``version_stage`` is not sent.
        version_stage: Staging label used when ``version_id`` is ``None``.
    """

    def __init__(self, secret_id, version_id=None, version_stage='AWSCURRENT'):
        self.sm_client = boto3.client('secretsmanager')
        self.secret_id = secret_id
        self.version_id = version_id
        self.version_stage = version_stage
        # Build the cached fetcher once per instance. Decorating a function
        # defined inside get_credentials() created a fresh, empty cache on
        # every call, so the secret was re-fetched each time.
        self._get_sm_user_credentials = cachetools.func.ttl_cache(
            maxsize=128, ttl=24 * 60 * 60  # 24h
        )(self._fetch_sm_user_credentials)

    def _fetch_sm_user_credentials(self, secret_id, version_id, version_stage):
        """Read the secret from Secrets Manager and parse its JSON payload."""
        # boto3 client operations accept keyword arguments only.
        request = {"SecretId": secret_id}
        if version_id is not None:
            # Sending VersionId together with the default stage label would
            # fail for any version that is not the one carrying that label.
            request["VersionId"] = version_id
        elif version_stage is not None:
            request["VersionStage"] = version_stage
        secret = self.sm_client.get_secret_value(**request)
        return json.loads(secret['SecretString'])

    def get_credentials(self) -> Union[Tuple[str], Tuple[str, str]]:
        """Return ``(username, password)`` from the (cached) secret.

        Raises:
            KeyError: if the secret JSON lacks ``username`` or ``password``.
        """
        creds = self._get_sm_user_credentials(self.secret_id, self.version_id, self.version_stage)
        return creds['username'], creds['password']


my_secret_id = "EXAMPLE1-90ab-cdef-fedc-ba987SECRET1"
creds_provider = SecretsManagerProvider(secret_id=my_secret_id)
user_connection = redis.Redis(host="localhost", port=6379, credential_provider=creds_provider)
user_connection.ping()
使用 ElastiCache IAM 凭据提供程序连接到 Redis 实例。#
[4]:
from typing import Tuple, Union
from urllib.parse import ParseResult, urlencode, urlunparse
import botocore.session
import redis
from botocore.model import ServiceId
from botocore.signers import RequestSigner
from cachetools import TTLCache, cached
class ElastiCacheIAMProvider(redis.CredentialProvider):
    """Credential provider that uses a presigned ElastiCache URL as the password.

    Args:
        user: User name placed in the signed request and returned as the
            username element of the credentials.
        cluster_name: Used as the network location of the URL that gets signed.
        region: Region handed to the request signer.
    """

    def __init__(self, user, cluster_name, region="us-east-1"):
        self.user = user
        self.cluster_name = cluster_name
        self.region = region

        # The signer takes its credentials and event emitter from the
        # botocore session.
        botocore_session = botocore.session.get_session()
        aws_credentials = botocore_session.get_credentials()
        event_emitter = botocore_session.get_component("event_emitter")
        self.request_signer = RequestSigner(
            ServiceId("elasticache"),
            self.region,
            "elasticache",
            "v4",
            aws_credentials,
            event_emitter,
        )

    # Generated IAM tokens are valid for 15 minutes
    @cached(cache=TTLCache(maxsize=128, ttl=900))
    def get_credentials(self) -> Union[Tuple[str], Tuple[str, str]]:
        """Return ``(user, token)`` where the token is the presigned URL without its scheme."""
        encoded_query = urlencode({"Action": "connect", "User": self.user})
        unsigned_url = urlunparse(
            ParseResult(
                scheme="https",
                netloc=self.cluster_name,
                path="/",
                params="",
                query=encoded_query,
                fragment="",
            )
        )
        request = {"method": "GET", "url": unsigned_url, "body": {}, "headers": {}, "context": {}}
        signed_url = self.request_signer.generate_presigned_url(
            request,
            operation_name="connect",
            expires_in=900,
            region_name=self.region,
        )
        # RequestSigner only seems to work if the URL has a protocol, but
        # Elasticache only accepts the URL without a protocol, so the scheme
        # is stripped from the signed URL before it is returned.
        token = signed_url.removeprefix("https://")
        return (self.user, token)
# User name embedded in the signed request and returned as the credential username.
username = "barshaul"
# Used by the provider as the network location of the URL it signs.
cluster_name = "test-001"
# Host the Redis client actually connects to.
endpoint = "test-001.use1.cache.amazonaws.com"
creds_provider = ElastiCacheIAMProvider(user=username, cluster_name=cluster_name)
user_connection = redis.Redis(host=endpoint, port=6379, credential_provider=creds_provider)
user_connection.ping()
[4]:
True
通过指定 URL 方案连接到 Redis 实例。#
连接参数以 URL 查询参数的形式传递给下列方案。
支持三种 URL 方案
redis://
创建 TCP 套接字连接。参见 https://www.iana.org/assignments/uri-schemes/prov/redis
rediss://
创建 SSL 封装的 TCP 套接字连接。参见 https://www.iana.org/assignments/uri-schemes/prov/rediss
unix://
创建 Unix 域套接字连接。
[7]:
# Connection options (decode_responses, health_check_interval) are supplied as
# query parameters of the redis:// URL.
url_connection = redis.from_url("redis://127.0.0.1:6379?decode_responses=True&health_check_interval=2")
url_connection.ping()
[7]:
True
通过指定 URL 方案和 RESP3 协议连接到 Redis 实例。#
[ ]:
# Same URL form as the previous example, with protocol=3 added as a query
# parameter to select RESP3.
url_connection = redis.from_url("redis://127.0.0.1:6379?decode_responses=True&health_check_interval=2&protocol=3")
url_connection.ping()
连接到 Sentinel 实例#
[ ]:
from redis.sentinel import Sentinel
# Sentinel receives a list of (host, port) pairs plus a socket timeout.
sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
# Ask about the service named "redis-py-test".
# NOTE(review): presumably returns the current master's address — confirm
# against the redis-py Sentinel documentation.
sentinel.discover_master("redis-py-test")