哈哈哈~~~
这篇文章只讲如何使用aioredis操作列表和列表的基本属性。
redis列表可以应用于代理池等功能上。
最后将代码封装一下, 随便学一下封装方法, 写的多了, 慢慢就熟练了
插入单个元素
将字符串 'element1' 插入到 Redis 列表 my_list 的左侧。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| # -*- coding: utf-8 -*- # @Author: Mehaei # @Date: 2023-11-11 17:40:14 # @Last Modified by: Mehaei # @Last Modified time: 2023-11-11 17:43:13 import asyncio import aioredis async def insert_single_element(): """ 插入单个元素 """ redis = await aioredis.from_url('redis://localhost') await redis.lpush('my_list', 'element1') await redis.close()
|
批量插入多个元素
有的时候会批量插入,这个示例将列表 elements 中的多个元素一次性插入到 Redis 列表my_list的左侧。
1 2 3 4 5 6 7 8
| async def insert_multiple_elements(): """ 插入多个元素 """ redis = await aioredis.from_url('redis://localhost') elements = ['element1', 'element2', 'element3'] await redis.lpush('my_list', *elements) await redis.close()
|
获取数据并转换为字符串
要检索 Redis 列表中的数据并将其转换为字符串,您可以使用 lrange 方法和 Python 的 str() 函数。这个示例使用 lrange 获取 Redis 列表 my_list 中的所有元素,然后将它们转换为字符串并打印出来。:
1 2 3 4 5 6 7 8 9 10 11
| async def retrieve_and_convert(): """ 取出多个元素 """ redis = await aioredis.from_url('redis://localhost') # 按元素索引获取 result = await redis.lrange('my_list', 0, -1) await redis.close() # 将结果列表转换为字符串 result_as_strings = [str(item, 'utf-8') for item in result] print(result_as_strings)
|
使用
1 2 3 4 5
| # 使用异步事件循环执行插入操作 loop = asyncio.get_event_loop() # loop.run_until_complete(insert_single_element()) # loop.run_until_complete(insert_multiple_elements()) loop.run_until_complete(retrieve_and_convert())
|

封装一下
简单封装一下, 主要是学思路和方法, 后续需要什么属性, 可以再加。代码中有注释, 这里就不多做解释了,今天又完了!!!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
| # -*- coding: utf-8 -*- # @Author: Mehaei # @Date: 2023-11-11 15:05:11 # @Last Modified by: Mehaei # @Last Modified time: 2023-11-11 18:15:57 import asyncio import aioredis class RedisArray(object): def __init__(self, url="redis://localhost", kname="my_list"): """ :param url: redis url :param kname: list name """ self.url = url self.kname = kname async def create_pool(self): """ 创建连接池 """ self.aioClient = await aioredis.from_url('redis://localhost') async def lpush(self, kvalue): """ 往列表左边, 也就是头部插入元素 :param kvalue: element value """ try: return await self.aioClient.lpush(self.kname, kvalue) except Exception as e: print(f"lpush 插入: {self.kname}/{kvalue}报错: {str(e)}") return None async def rpush(self, kvalue): """ 往列表右边, 也就是尾部插入元素 :param kvalue: element value """ try: return await self.aioClient.rpush(self.kname, kvalue) except Exception as e: print(f"rpush 插入: {self.kname}/{kvalue}报错: {str(e)}") return None async def get(self, start=0, end=-1): """ 获取元素 :param start: element start index :param end: element end index """ try: result = await self.aioClient.lrange(self.kname, 0, -1) result = [str(item, 'utf-8') for item in result] print(result) return result except Exception as e: print(f"读取: {self.kname}/{start}/{end}错误: {str(e)}") return [] async def close(self): """ 关闭redis连接 """ await self.aioClient.close() async def main(): # 使用异步事件循环执行插入操作 RA = RedisArray() await RA.create_pool() await RA.lpush("test22") await RA.rpush("test23") await RA.get() await RA.close() loop = asyncio.get_event_loop() loop.run_until_complete(main())
|
