已复制
全屏展示
复制代码

Python连接Redis实战


· 2 min read

一. 模块安装

使用 redis 模块,使用 pip 安装即可

pip install redis

二. 直接连接

import redis
r = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
conn = r.ping()
res = r.set("name", "yuchaoshui")

r.ping() 返回 True 则连接成功。插入数据成功则返回 True ,redis 会自动根据需要自动断开连接,所以不需要手动断开。redis.StrictRedis 可以换成 redis.Redis,不推荐Redis类,因为它和 redis-cli 操作上有些不同。

三. 连接池连接

import redis 

redis_pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=0) 
r = redis.StrictRedis(connection_pool=redis_pool) 
res1 = r.set('one', 'first')   
res2 = r.get('one')

四. pipline机制

可以在一次请求中执行多个命令,这样避免了多次的往返时延。同时执行多条命令也会返回每条命令是否成功执行。

import redis

redis_pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=0)
r = redis.StrictRedis(connection_pool=redis_pool)
redis_pipe = r.pipeline()  
redis_pipe.set('one', 'first')  
redis_pipe.set('two', 'second')  
res3 = redis_pipe.execute()

print(res3)

[True, True]

五. 编写成模块导入

编写模块 redishelper.py ,这样一来,在其他需要连接 redis 的地方只有导入该模块,然后在初始化即可使用 redis 数据。

#!/usr/bin/env python
# _*_ coding:utf-8 _*_


import sys
import redis


class RedisHelper():
    """
    host='127.0.0.1', port=6379, db=0, password='',
    socket_timeout=5, socket_connect_timeout=10, encoding='utf-8'
    """
    def __init__(self,
            host='127.0.0.1',
            port=6379,
            db=0,
            socket_timeout=5,
            socket_connect_timeout=10,
            encoding='utf-8'):

        self.use_connection_pool = True
        self.host = str(host)
        self.port = int(port)
        self.db = int(db)
        self.socket_timeout = int(socket_timeout)
        self.socket_connect_timeout = int(socket_connect_timeout)
        self.encoding = encoding 

    def conn(self):
        # use ConnectionPool 
        if self.use_connection_pool:
            pool = redis.ConnectionPool(
                    host=self.host,
                    port=self.port,
                    db=self.db,
                    socket_timeout=self.socket_timeout,
                    socket_connect_timeout=self.socket_connect_timeout,
                    encoding=self.encoding)
            rds = redis.StrictRedis(connection_pool=pool)

        # do not use ConnectionPool 
        else:
            rds = redis.StrictRedis(
                    host=self.host,
                    port=self.port,
                    db=self.db,
                    socket_timeout=self.socket_timeout,
                    socket_connect_timeout=self.socket_connect_timeout,
                    encoding=self.encoding)

        # check connection
        try:
            if rds.ping(): return rds
        except Exception as e:
            print("Except: {0}".format(e))
            return False

        return False

    def conn_info(self):
        print("Redis Connection Infomation:")
        for k,v in self.__dict__.items():
            print("    {0} : {1}".format(k, v))
            
rds = RedisHelper(host='127.0.0.1', port=10500).conn()

if __name__ == '__main__':
    print rds.set("redishelper_test","tesk ok!")
    print rds.get("redishelper_test")

    pipe = rds.pipeline()  
    pipe.set('redishelper_test_one', 'pipline test one ok!')
    pipe.set('redishelper_test_two', 'pipline test two ok!')
    print(pipe.execute())
    print(rds.get("redishelper_test_one"))
    print(rds.get("redishelper_test_two"))
    

在需要连接redis的地方通过from redishelper import RedisHelper方式导入模块,然后对 RedisHelper 进行初始化。注意可根据实际情况对 RedisHelper 对象的初始化传递具体参数即可。

🔗

文章推荐