# -*- coding: utf-8 -*-
# @Author: 胖胖很瘦
# @Date: 2024-03-03 12:58:42
# @LastEditors: 胖胖很瘦
# @LastEditTime: 2024-03-03 13:57:49

# Imports
import time

from pymongo import MongoClient as MC
from pymongo import UpdateOne

# Despite its name, `client` is the handle to the "test" database,
# not the MongoClient itself.
client = MC()["test"]


def exists_update_and_insert(data, bulk=False):
    """
    Update the document if it exists; insert it otherwise.
    :param data: list of documents, each carrying an "_id"
    :param bulk: whether to send all operations in one bulk write

    About the `ordered` flag of bulk_write:
      ordered=True:  operations run in order; after the first error,
                     the remaining operations are not executed
      ordered=False: an error in one operation does not affect the others
    """
    if bulk:
        bulk_docs = []
        for rd in data:
            bulk_docs.append(
                UpdateOne(
                    {"_id": rd["_id"]},
                    {"$set": rd},
                    upsert=True
                ))
        result = client['goods'].bulk_write(bulk_docs, ordered=False).bulk_api_result
        print(result)
    else:
        for rd in data:
            client['goods'].update_one(
                {'_id': rd['_id']},
                {'$set': rd},
                upsert=True
            )


def exists_do_noting_and_insert(data, bulk=False):
    """
    Do nothing if the document exists; insert it otherwise.
    :param data: list of documents, each carrying an "_id"
    :param bulk: whether to send all operations in one bulk write
    """
    if bulk:
        bulk_docs = []
        for rd in data:
            bulk_docs.append(
                UpdateOne(
                    {"_id": rd["_id"]},
                    # $setOnInsert only takes effect when the upsert inserts
                    {"$setOnInsert": rd},
                    upsert=True
                ))
        result = client['goods'].bulk_write(bulk_docs, ordered=False).bulk_api_result
        print(result)
    else:
        for rd in data:
            client['goods'].update_one(
                {'_id': rd['_id']},
                {'$setOnInsert': rd},
                upsert=True
            )


def exists_update_any_field_and_insert(data, bulk=False):
    """
    Update only some fields if the document exists; insert it otherwise.
    :param data: list of documents, each carrying an "_id"
    :param bulk: whether to send all operations in one bulk write
    """
    if bulk:
        bulk_docs = []
        for rd in data:
            # pop() removes "update_time" from the caller's dict so the same
            # field never appears under both $setOnInsert and $set
            update_time = rd.pop("update_time", time.time())
            bulk_docs.append(
                UpdateOne(
                    {"_id": rd["_id"]},
                    {
                        "$setOnInsert": rd,
                        "$set": {"update_time": update_time}
                    },
                    upsert=True
                ))
        result = client['goods'].bulk_write(bulk_docs, ordered=False).bulk_api_result
        print(result)
    else:
        for rd in data:
            update_time = rd.pop("update_time", time.time())
            client['goods'].update_one(
                {'_id': rd['_id']},
                {
                    '$setOnInsert': rd,
                    "$set": {"update_time": update_time}
                },
                upsert=True
            )


def find(query=None):
    """
    Query documents.
    :param query: filter; None matches every document
    """
    return list(client['goods'].find(query))


def remove(query=None):
    """
    Delete documents.
    :param query: filter; None deletes every document
    """
    # Collection.remove() no longer exists in PyMongo 4; delete_many()
    # is its replacement and requires a mapping as the filter.
    return client["goods"].delete_many(query or {})


if __name__ == '__main__':
    # Test data
    datas = [
        {
            "_id": 1,
            "insert_time": time.time(),
            "update_time": time.time(),
            "name": "zs"
        },
        {
            "_id": 2,
            "insert_time": time.time(),
            "update_time": time.time(),
            "name": "ls"
        },
        {
            "_id": 3,
            "insert_time": time.time(),
            "update_time": time.time(),
            "name": "ww"
        },
    ]

    # Test cases
    # exists_update_and_insert(datas)
    # exists_update_and_insert(datas, True)
    # print(find())
    # remove()

    # exists_do_noting_and_insert(datas)
    # exists_do_noting_and_insert(datas, True)
    # print(find())
    # remove()

    # exists_update_any_field_and_insert(datas)
    # exists_update_any_field_and_insert(datas, True)
    # print(find())
    # remove()
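The difference between `$set` and `$setOnInsert` under `upsert=True` can be checked without a running MongoDB server. The following is a minimal in-memory sketch, not PyMongo code: the `upsert` helper and the `store` dict are hypothetical names introduced only for illustration, and the model covers just these two operators.

```python
def upsert(store, _id, set_fields=None, set_on_insert=None):
    """Hypothetical in-memory model of update_one(..., upsert=True)."""
    inserted = _id not in store
    doc = store.setdefault(_id, {"_id": _id})
    if inserted and set_on_insert:
        # $setOnInsert is applied only when the upsert creates the document
        doc.update(set_on_insert)
    if set_fields:
        # $set is applied on both insert and update
        doc.update(set_fields)
    return inserted


store = {}

# First write: document 1 does not exist, so both operators apply.
first = upsert(
    store, 1,
    set_fields={"update_time": 100},
    set_on_insert={"name": "zs", "insert_time": 100},
)

# Second write: document 1 exists, so only the $set part takes effect.
second = upsert(
    store, 1,
    set_fields={"update_time": 200},
    set_on_insert={"name": "changed", "insert_time": 200},
)

print(first, second, store[1])
```

After the second call, `name` and `insert_time` still hold their first-write values and only `update_time` has changed, which is the behavior the "update only some fields" strategy relies on.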