Source code for edgePy.data_import.mongodb.mongo_wrapper

"""
A simple library that wraps pymongo collections to simplify common MongoDB access patterns.
"""
from typing import Any, Dict, Hashable, Iterable, List, Optional, Union

import pymongo  # type: ignore
from pymongo import InsertOne, UpdateOne
from pymongo.errors import BulkWriteError  # type: ignore

from edgePy.util import getLogger

# Module-level logger; used by the classes in this module to report failed
# find operations, failed bulk writes and (when verbose) bulk-write results.
log = getLogger(name=__name__)


class MongoWrapper(object):
    """Thin layer for interacting with a MongoDB database through pymongo.

    Pymongo is an entirely reasonable way of working with MongoDB, but it does
    not directly provide some very common operations (e.g. materialising a
    query as a list, or as a dict keyed on one field). This class bundles those
    helpers for efficient retrieval of information from the database.

    Args:
        host: the name of the machine hosting the database.
        port: the port number (usually 27017); strings are converted to int.
        connect: forwarded to ``pymongo.MongoClient`` as ``connect``; set to
            False if this is being instantiated by a subprocess.
        verbose: when False (the default), informational output is suppressed.
    """

    def __init__(
        self, host: str, port: Union[str, int] = 27017, connect: bool = True, verbose: bool = False
    ) -> None:
        self.host = host
        self.port = int(port)
        self.session = pymongo.MongoClient(host=self.host, port=self.port, connect=connect)
        self.verbose = verbose

    def get_db(self, database: str, collection: str) -> Any:
        """Return a collection handle, hiding the db name under pytest-mongodb.

        When using pytest-mongodb the database name is always ``'pytest'`` and
        the session object is indexed directly by collection name.

        Args:
            database: database name.
            collection: collection name.

        Returns:
            The collection object, ready for use with ``.find()`` or similar.
        """
        if database == "pytest":
            return self.session[collection]
        return self.session[database][collection]

    def find_as_cursor(
        self,
        database: str,
        collection: str,
        query: Optional[Dict[Hashable, Any]] = None,
        projection: Optional[Dict[Hashable, Any]] = None,
    ) -> Iterable:
        """Run a find on a collection and return the native pymongo cursor.

        Args:
            database: db name.
            collection: collection name.
            query: criteria for the find command (None matches everything).
            projection: the fields to return (None returns whole documents).

        Returns:
            A cursor object, to be used as an iterator.

        Raises:
            Exception: "Mongo find failed" if creating the cursor raises; the
                underlying error is logged and chained as ``__cause__``.
        """
        try:
            cursor = self.get_db(database, collection).find(query, projection)
        except Exception as exception:
            log.exception(exception)
            raise Exception("Mongo find failed") from exception
        return cursor

    def find_as_list(
        self,
        database: str,
        collection: str,
        query: Optional[Dict[Hashable, Any]] = None,
        projection: Optional[Dict[Hashable, Any]] = None,
    ) -> List[Any]:
        """Run a find on a collection and return all matching documents as a list.

        Args:
            database: db name.
            collection: collection name.
            query: criteria for the find command.
            projection: the fields to return.

        Returns:
            A list of the returned documents, in cursor order.
        """
        cursor = self.find_as_cursor(
            database=database, collection=collection, query=query, projection=projection
        )
        return list(cursor)

    def find_as_dict(
        self,
        database: str,
        collection: str,
        query: Optional[Dict[Hashable, Any]] = None,
        field: str = "_id",
        projection: Optional[Dict[Hashable, Any]] = None,
    ) -> Dict[Hashable, Any]:
        """Run a find on a collection and return the documents keyed on ``field``.

        Args:
            database: db name.
            collection: collection name.
            query: criteria for the find command.
            field: the document field whose value is used as the dict key; it
                must be present in every returned document (keep it in the
                projection), otherwise a KeyError is raised.
            projection: the fields to return.

        Returns:
            A dict mapping ``doc[field]`` to ``doc``. If several documents share
            the same key, the last one returned by the cursor wins.
        """
        cursor = self.find_as_cursor(
            database=database, collection=collection, query=query, projection=projection
        )
        return {doc[field]: doc for doc in cursor}

    def insert(self, database: str, collection: str, data_list: List[Any]) -> None:
        """Bulk-insert documents into a collection (best effort).

        Args:
            database: db name.
            collection: collection name.
            data_list: the documents to insert. An empty list is a no-op.
        """
        if not data_list:
            # pymongo's insert_many rejects an empty document list; nothing to do.
            return
        try:
            # Write to the requested collection itself. Previously a stray
            # ``.test`` attribute redirected writes to "<collection>.test".
            self.get_db(database, collection).insert_many(data_list, ordered=False)
        except BulkWriteError as bwe:
            # Deliberately best effort: with ordered=False the valid documents
            # are still written; per-document failures are logged, not raised.
            log.exception(bwe.details)

    def create_index(self, database: str, collection: str, key: str) -> None:
        """Create an index on ``key`` in the given collection.

        Args:
            database: db name.
            collection: collection name.
            key: the field name to create the index on.
        """
        self.get_db(database, collection).create_index(key)
class MongoInserter(MongoWrapper):
    """Buffered inserter for a single MongoDB collection.

    This class is a thin layer on MongoWrapper, which is itself a thin layer on
    the pymongo library. It is used when you want to insert data into a MongoDB
    collection: ``InsertOne`` requests are buffered and written with an
    unordered ``bulk_write`` whenever the buffer holds more than 1000 requests,
    or when :meth:`flush` / :meth:`close` is called.

    Args:
        host: the name of the machine hosting the database.
        port: the port number (usually 27017).
        database: db name.
        collection: collection name.
        connect: forwarded to ``pymongo.MongoClient`` as ``connect``; set to
            False if this is being instantiated by a subprocess.
    """

    def __init__(
        self, host: str, port: int, database: str, collection: str, connect: bool = True
    ) -> None:
        super().__init__(host, port, connect=connect)
        self.database = database
        self.collection = collection
        # Pending InsertOne requests awaiting the next flush.
        self.to_insert: List = []
        self.mongo_col = self.get_db(database, collection)

    def flush(self) -> None:
        """Write all buffered inserts to MongoDB and empty the buffer.

        Raises:
            Exception: "Mongo bulk write failed." if pymongo reports a
                BulkWriteError. The details are logged, the cause is chained,
                and the buffer is still cleared (see below).
        """
        if not self.to_insert:
            return
        try:
            result = self.mongo_col.bulk_write(self.to_insert, ordered=False)
        except BulkWriteError as bwe:
            # An unordered bulk write keeps processing after individual
            # failures, so part of this buffer has already been applied.
            # Keeping it would make every later flush()/close() re-submit
            # those requests and fail again, so drop it before reporting.
            del self.to_insert[:]
            log.exception(bwe.details)
            raise Exception("Mongo bulk write failed.") from bwe
        if result and self.verbose:
            log.info(result.bulk_api_result)
        del self.to_insert[:]

    def add(self, record: Union[List[Any], Dict[Hashable, Any]]) -> None:
        """Add a document to the insert buffer, flushing when the buffer is full.

        Args:
            record: the document to insert via ``InsertOne``.
        """
        self.to_insert.append(InsertOne(record))
        # Flush once more than 1000 requests are pending.
        if len(self.to_insert) > 1000:
            self.flush()

    def close(self) -> None:
        """Close the MongoInserter by flushing any buffered inserts."""
        self.flush()

    def create_index_key(self, key: str) -> None:
        """Create an index on ``key`` in this inserter's collection."""
        self.create_index(self.database, self.collection, key)
class MongoUpdater(MongoWrapper):
    """Buffered updater for a single MongoDB collection.

    This class is a thin layer on MongoWrapper, which is itself a thin layer on
    the pymongo library. It is used when you want to update data in a MongoDB
    collection: ``UpdateOne`` requests are buffered and written with an
    unordered ``bulk_write`` whenever the buffer holds more than 1000 requests,
    or when :meth:`flush` / :meth:`close` is called.

    Args:
        host: the name of the machine hosting the database.
        port: the port number (usually 27017).
        database: db name.
        collection: collection name.
        connect: forwarded to ``pymongo.MongoClient`` as ``connect``; set to
            False if this is being instantiated by a subprocess.
    """

    def __init__(
        self, host: str, port: int, database: str, collection: str, connect: bool = True
    ) -> None:
        super().__init__(host, port, connect=connect)
        self.database = database
        # Record the target collection name alongside the database name.
        self.collection = collection
        # Pending UpdateOne requests awaiting the next flush.
        self.to_update: List[Any] = []
        self.mongo_col = self.get_db(database, collection)

    def flush(self) -> None:
        """Write all buffered updates to MongoDB and empty the buffer.

        Raises:
            Exception: "Mongo bulk write failed." if pymongo reports a
                BulkWriteError. The details are logged, the cause is chained,
                and the buffer is still cleared (see below).
        """
        if not self.to_update:
            return
        try:
            result = self.mongo_col.bulk_write(self.to_update, ordered=False)
        except BulkWriteError as bwe:
            # An unordered bulk write keeps processing after individual
            # failures, so part of this buffer has already been applied.
            # Re-submitting it on the next flush()/close() would re-apply
            # non-idempotent updates (e.g. $inc) and fail again, so drop it.
            del self.to_update[:]
            log.exception(bwe.details)
            raise Exception("Mongo bulk write failed.") from bwe
        if result and self.verbose:
            log.info(result.bulk_api_result)
        del self.to_update[:]

    def add(
        self, updatedict: Dict[Hashable, Any], setdict: Dict[Hashable, Any], upsert: bool = False
    ) -> None:
        """Add an update to the buffer, flushing when the buffer is full.

        Args:
            updatedict: the filter selecting the document to update.
            setdict: the update document passed to ``UpdateOne``, e.g.
                ``{"$set": {...}}`` to change particular keys. Note that
                pymongo's UpdateOne expects update operators; whole-document
                replacement needs ``ReplaceOne`` instead.
            upsert: forwarded to ``UpdateOne``; when True, a document is
                inserted if none matches ``updatedict``. Defaults to False.
        """
        self.to_update.append(UpdateOne(updatedict, setdict, upsert=upsert))
        # Flush once more than 1000 requests are pending.
        if len(self.to_update) > 1000:
            self.flush()

    def close(self) -> None:
        """Close the MongoUpdater by flushing any buffered updates."""
        self.flush()