Python连接Redis实战
一. 模块安装
使用 redis 模块,使用 pip 安装即可
pip install redis
二. 直接连接
import redis
r = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
conn = r.ping()
res = r.set("name", "yuchaoshui")
r.ping()
返回 True 则连接成功。插入数据成功则返回 True ,redis 会自动根据需要自动断开连接,所以不需要手动断开。redis.StrictRedis
可以换成 redis.Redis
,不推荐Redis类,因为它和 redis-cli 操作上有些不同。
三. 连接池连接
import redis
redis_pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=0)
r = redis.StrictRedis(connection_pool=redis_pool)
res1 = r.set('one', 'first')
res2 = r.get('one')
四. pipline机制
可以在一次请求中执行多个命令,这样避免了多次的往返时延。同时执行多条命令也会返回每条命令是否成功执行。
import redis
redis_pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=0)
r = redis.StrictRedis(connection_pool=redis_pool)
redis_pipe = r.pipeline()
redis_pipe.set('one', 'first')
redis_pipe.set('two', 'second')
res3 = redis_pipe.execute()
print(res3)
[True, True]
五. 编写成模块导入
编写模块 redishelper.py ,这样一来,在其他需要连接 redis 的地方只有导入该模块,然后在初始化即可使用 redis 数据。
#!/usr/bin/env python
# _*_ coding:utf-8 _*_
import sys
import redis
class RedisHelper():
"""
host='127.0.0.1', port=6379, db=0, password='',
socket_timeout=5, socket_connect_timeout=10, encoding='utf-8'
"""
def __init__(self,
host='127.0.0.1',
port=6379,
db=0,
socket_timeout=5,
socket_connect_timeout=10,
encoding='utf-8'):
self.use_connection_pool = True
self.host = str(host)
self.port = int(port)
self.db = int(db)
self.socket_timeout = int(socket_timeout)
self.socket_connect_timeout = int(socket_connect_timeout)
self.encoding = encoding
def conn(self):
# use ConnectionPool
if self.use_connection_pool:
pool = redis.ConnectionPool(
host=self.host,
port=self.port,
db=self.db,
socket_timeout=self.socket_timeout,
socket_connect_timeout=self.socket_connect_timeout,
encoding=self.encoding)
rds = redis.StrictRedis(connection_pool=pool)
# do not use ConnectionPool
else:
rds = redis.StrictRedis(
host=self.host,
port=self.port,
db=self.db,
socket_timeout=self.socket_timeout,
socket_connect_timeout=self.socket_connect_timeout,
encoding=self.encoding)
# check connection
try:
if rds.ping(): return rds
except Exception as e:
print("Except: {0}".format(e))
return False
return False
def conn_info(self):
print("Redis Connection Infomation:")
for k,v in self.__dict__.items():
print(" {0} : {1}".format(k, v))
rds = RedisHelper(host='127.0.0.1', port=10500).conn()
if __name__ == '__main__':
print rds.set("redishelper_test","tesk ok!")
print rds.get("redishelper_test")
pipe = rds.pipeline()
pipe.set('redishelper_test_one', 'pipline test one ok!')
pipe.set('redishelper_test_two', 'pipline test two ok!')
print(pipe.execute())
print(rds.get("redishelper_test_one"))
print(rds.get("redishelper_test_two"))
在需要连接redis的地方通过from redishelper import RedisHelper
方式导入模块,然后对 RedisHelper 进行初始化。注意可根据实际情况对 RedisHelper 对象的初始化传递具体参数即可。