如何提高pymongo插入文档时的写入速度?
我有以下代码将文档插入到MongoDB中。问题在于它很慢:我无法对其进行多进程处理,而且考虑到必须检查每个待插入的文档是否已经存在,我认为也无法使用批量插入。我想知道是否有更快的方法来解决此问题。在对下面的代码进行性能分析之后,我发现check_record()
和update_upstream()
是两个非常耗时的函数。因此优化它们将提高整体速度。关于如何优化以下内容的任何意见将不胜感激。谢谢!
import multiprocessing
import os

import pymongo
from pymongo import ASCENDING
from pymongo import DESCENDING
from pymongo import MongoClient
from pymongo import UpdateOne
from tqdm import tqdm

from directory import Directory
from storage_config import StorageConfig
dir = Directory()
def DB_collections(collection_type):
types = {'p': 'player_stats','t': 'team_standings','f': 'fixture_stats','l': 'league_standings','pf': 'fixture_players_stats'}
return types.get(collection_type)
class DB():
def __init__(self,league,season,func=None):
self.db_user = os.environ.get('DB_user')
self.db_pass = os.environ.get('DB_pass')
self.MONGODB_URL = f'mongodb+srv://{self.db_user}:{self.db_pass}@cluster0-mbqxj.mongodb.net/<dbname>?retryWrites=true&w=majority'
self.league = league
self.season = str(season)
self.client = MongoClient(self.MONGODB_URL)
self.DATABASE = self.client[self.league + self.season]
self.pool = multiprocessing.cpu_count()
self.playerfile = f'{self.league}_{self.season}_playerstats.json'
self.teamfile = f'{self.league}_{self.season}_team_standings.json'
self.fixturefile = f'{self.league}_{self.season}_fixturestats.json'
self.leaguefile = f'{self.league}_{self.season}_league_standings.json'
self.player_fixture = f'{self.league}_{self.season}_player_fixture.json'
self.func = func
def execute(self):
if self.func is not None:
return self.func(self)
def import_json(file):
"""Imports a json file in read mode
Args:
file(str): Name of file
"""
return dir.load_json(file,StorageConfig.DB_DIR)
def load_file(file):
try:
loaded_file = import_json(file)
return loaded_file
except FileNotFoundError:
print("Please check that",file,"exists")
def check_record(collection,index_dict):
"""Check if record exists in collection
Args:
index_dict (dict): key,value
"""
return collection.find_one(index_dict)
def collection_index(collection,index,*args):
"""Checks if index exists for collection,and return a new index if not
Args:
collection (str): Name of collection in database
index (str): Dict key to be used as an index
args (str): Additional dict keys to create compound indexs
"""
compound_index = tuple((arg,ASCENDING) for arg in args)
if index not in collection.index_information():
return collection.create_index([(index,DESCENDING),*compound_index],unique=True)
def push_upstream(collection,record):
"""Update record in collection
Args:
collection (str): Name of collection in database
record_id (str): record _id to be put for record in collection
record (dict): Data to be pushed in collection
"""
return collection.insert_one(record)
def update_upstream(collection,index_dict,record):
"""Update record in collection
Args:
collection (str): Name of collection in database
index_dict (dict): key,value
record (dict): Data to be updated in collection
"""
return collection.update_one(index_dict,{"$set": record},upsert=True)
def executePushPlayer(db):
playerstats = load_file(db.playerfile)
collection_name = DB_collections('p')
collection = db.DATABASE[collection_name]
collection_index(collection,'p_id')
for player in tqdm(playerstats):
existingPost = check_record(collection,{'p_id': player['p_id']})
if existingPost:
update_upstream(collection,{'p_id': player['p_id']},player)
else:
push_upstream(collection,player)
if __name__ == '__main__':
db = DB('EN_PR','2019')
executePushPlayer(db)
解决方法
您可以使用带 upsert=True 的 update_one() 将检查/插入/更新逻辑合并为一条命令,然后配合批量写入运算符,类似如下:
from pymongo import UpdateOne

updates = []
for player in tqdm(playerstats):
    updates.append(UpdateOne({'p_id': player['p_id']}, {'$set': player}, upsert=True))
collection.bulk_write(updates)
最后,在MongoDB shell上使用以下命令检查索引是否正在使用:
db.mycollection.aggregate([{ $indexStats: {} }])
并查看accesses.ops指标。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。