import argparse
from typing import Dict, Hashable, Any, Tuple, List, Optional, Union
from edgePy.data_import.mongodb.mongo_wrapper import MongoWrapper
from edgePy.data_import.mongodb.gene_functions import get_canonical_rpkm
from edgePy.data_import.mongodb.gene_functions import get_canonical_raw
from edgePy.data_import.mongodb.gene_functions import get_genelist_from_file
from edgePy.data_import.mongodb.gene_functions import translate_genes
from edgePy.util import getLogger
# Module-level logger, obtained from edgePy.util.getLogger under this module's name.
log = getLogger(name=__name__)
def parse_arguments(
    parser: Optional[argparse.ArgumentParser] = None, ci_values: Optional[List[str]] = None
) -> argparse.Namespace:
    """
    Standard argparse wrapper for interpreting command line arguments.

    The following options are registered on the parser:
        --config: location of the config file (required).
        --key_name: defaults to "Project".
        --key_value: defaults to "RNA-Seq1".
        --gene_list: defaults to None.

    Args:
        parser: if there's an existing parser, provide it, else, this will
            create a new one.
        ci_values: use for testing purposes only.  When given, these strings
            are parsed instead of the process command line.

    Returns:
        The namespace produced by ``parser.parse_args``.
    """
    if parser is None:
        parser = argparse.ArgumentParser()

    parser.add_argument("--config", help="location of the config file", required=True)
    parser.add_argument("--key_name", default="Project")
    parser.add_argument("--key_value", default="RNA-Seq1")
    parser.add_argument("--gene_list", default=None)

    # parse_args(None) reads sys.argv[1:].  The previous implementation called
    # parse_arguments() again here, which recursed without ever terminating.
    return parser.parse_args(ci_values)
class ImportFromMongodb(object):
    """
    A utility for importing mongo data from a proprietary mongodb database - hopefully we'll
    open this database up in the future.  If not, we can re-engineer it from the examples given.

    Args:
        host: the name of the machine hosting the database
        port: the port number (usually 27017)
        mongo_key: a key in the samples collection to filter on
        mongo_value: accepted value (or list of values) for ``mongo_key`` in the
            samples collection to filter on.
        gene_list_file: a list of genes to filter the results on.
    """

    def __init__(
        self,
        host: str,
        port: int,
        mongo_key: Optional[str],
        mongo_value: Union[str, List, None],
        gene_list_file: Optional[str],
    ) -> None:
        self.mongo_host = host
        self.mongo_port = port
        self.mongo_reader = MongoWrapper(host=self.mongo_host, port=self.mongo_port, connect=False)
        self.search_key = mongo_key
        self.search_value = mongo_value
        self.input_gene_file = gene_list_file
        # Filled in by translate_gene_list(); stays None when no gene file was given.
        self.gene_list: Optional[List[str]] = None

    def translate_gene_list(self, database: str) -> None:
        """
        If there was a list of genes provided, convert them to ENSG symbols.

        The translated list is stored on ``self.gene_list``; nothing happens
        when no gene list file was supplied.

        Args:
            database: name of the database
        """
        if not self.input_gene_file:
            return

        input_genes = get_genelist_from_file(self.input_gene_file)
        # translate_genes returns a pair; only the first element is kept here.
        ensg_genes, _gene_symbols = translate_genes(
            input_genes, self.mongo_reader, database=database
        )
        self.gene_list = ensg_genes

    def _build_sample_query(self) -> Dict[Hashable, Any]:
        """
        Build the filter for the ``samples`` collection from the search key and value.

        Returns:
            An empty query when neither key nor value is set, an ``$exists``
            query when only the key is set, an ``$in`` query for a list of
            values, and an equality query for a single value.

        Raises:
            ValueError: if a value was given without a key.
        """
        key, value = self.search_key, self.search_value

        if key and value:
            if value == 'regex':
                # NOTE(review): the literal value 'regex' switches to a hard-coded
                # pattern; kept as-is because callers may depend on it.
                return {key: {'$regex': 'myocyte|fibroblast'}}
            if isinstance(value, list):
                return {key: {'$in': value}}
            return {key: value}

        if key:
            # Key without a value: accept every sample that has the key at all.
            return {key: {"$exists": True}}

        if value:
            raise ValueError(
                "Invalid input - you can't specify a key_value without specifying a key_name"
            )

        return {}

    def get_data_from_mongo(
        self, database: str, rpkm_flag: bool = False
    ) -> Tuple[List[str], Dict[Hashable, Any], List[str], Dict[Hashable, Any]]:
        """
        Run the queries to get the samples, from mongo, and then use that data to retrieve
        the counts.

        Args:
            database: name of the database to retrieve data from.
            rpkm_flag: takes the rpkm values from the mongodb, instead of the raw counts

        Returns:
            the list of samples, the data itself (keyed by sample, then gene),
            the gene list and the categories of the samples.

        Raises:
            ValueError: if a search value was configured without a search key.
        """
        # Translate the gene file once; later calls reuse the stored list.
        if self.input_gene_file and not self.gene_list:
            self.translate_gene_list(database)

        sample_query = self._build_sample_query()

        projection: Dict[Hashable, Any] = {"sample_name": 1, "_id": 0}
        if self.search_key and self.search_key != "sample_name":
            projection[self.search_key] = 1

        cursor = self.mongo_reader.find_as_cursor(
            database=database, collection="samples", query=sample_query, projection=projection
        )

        sample_names = set()
        sample_category: Dict[Hashable, Any] = {}
        for result in cursor:
            log.info(result)
            name = result["sample_name"]
            sample_names.add(name)
            # Without a search key the sample name doubles as its own category.
            sample_category[name] = result[self.search_key] if self.search_key else name

        log.info(f"Get data for sample_names {list(sample_names)}")

        count_query: Dict[Hashable, Any] = {"sample_name": {"$in": list(sample_names)}}
        if self.gene_list:
            log.info(self.gene_list)
            count_query["gene"] = {"$in": list(self.gene_list)}

        cursor = self.mongo_reader.find_as_cursor(
            database=database, collection="RNASeq", query=count_query, projection={"_id": 0}
        )

        log.info(f"Importing data from mongo ({self.mongo_host})...")

        # The extractor does not change per row, so pick it once up front.
        extract_value = get_canonical_rpkm if rpkm_flag else get_canonical_raw

        # dataset[sample][gene] -> extracted value
        dataset: Dict[Hashable, Dict[Hashable, Optional[int]]] = {}
        gene_list = set()
        sample_list = set()
        for count, result in enumerate(cursor):
            if count % 100_000 == 0:
                log.info(f"{count} rows processed.")

            sample = result["sample_name"]
            value = extract_value(result)
            gene = result["gene"]

            dataset.setdefault(sample, {})[gene] = value
            sample_list.add(sample)
            gene_list.add(gene)

        return sorted(sample_list), dataset, sorted(gene_list), sample_category