Source code for scripts.edgepy

import argparse
from typing import List, Dict, Hashable, Any
import configparser

import numpy as np
from scipy.stats import ks_2samp
from smart_open import smart_open  # type: ignore


from edgePy.DGEList import DGEList
from edgePy.data_import.mongodb.mongo_import import ImportFromMongodb
from edgePy.util import getLogger

log = getLogger(name="script")


[docs]def parse_arguments(parser=None): if not parser: parser = argparse.ArgumentParser() parser.add_argument("--count_file", help="name of the count file") parser.add_argument("--groups_file", help="name of the groups file") parser.add_argument("--dge_file", help="import from .dge file;") parser.add_argument("--gene_list", default=None, help="a list of genes to filter the data set") # mongo parameters parser.add_argument( "--mongo_config", help="a way to import data from a supported mongo database" ) parser.add_argument("--mongo_key_name", default="Project") parser.add_argument("--mongo_key_value", default="RNA-Seq1") parser.add_argument("--database_name") parser.add_argument( "--group1_sample_names", nargs='+', help="List of samples names for first group" ) parser.add_argument( "--group2_sample_names", nargs='+', help="List of samples names for second group" ) parser.add_argument( "--groups_json", help="A JSON file with the group names, and list of samples. see example." ) parser.add_argument("--output", help="optional output file for results") parser.add_argument("--cutoff", help="p-value cutoff to accept.", default=0.05) parser.add_argument( "--minimum_cpm", help="discard results for which no group has this many counts", default=1 ) args = parser.parse_args() return args
[docs]class EdgePy(object): def __init__(self, args): self.dge_list = None if args.dge_file: self.dge_list = DGEList(filename=args.dge_file) log.info(f"The DGE list is {self.dge_list}") elif args.mongo_config: # This section is only useful for MongoDB based analyses. Talk to @apfejes about this section if you have # any questions. config = configparser.ConfigParser() config.read(args.mongo_config) if args.group1_sample_names and args.group2_sample_names: key = 'sample_name' value = args.group1_sample_names + args.group2_sample_names elif args.key_name and args.mongo_key_value: key = args.mongo_key_name value = args.mongo_key_value else: raise ValueError("Insufficient parameters for use of Mongodb") mongo_importer = ImportFromMongodb( host=config.get("Mongo", "host"), port=config.get("Mongo", "port"), mongo_key=key, mongo_value=value, gene_list_file=args.gene_list, ) sample_list, data_set, gene_list, sample_category = mongo_importer.get_data_from_mongo( database=args.database_name ) if key == 'sample_name': # Override sample categories if sample name is the source of the categories. sample_category_list = [ "group1" if sample_name in args.group1_sample_names else "group2" for sample_name in sample_list ] sample_category_dict = None else: # TODO: read from file sample_category_dict = args.groups_json sample_category_list = None self.dge_list = DGEList.create_DGEList( sample_list, data_set, gene_list, sample_to_category=sample_category_list, category_to_samples=sample_category_dict, ) self.ensg_to_symbol = mongo_importer.mongo_reader.find_as_dict( 'ensembl_90_37', "symbol_by_ensg", query={} ) else: self.dge_list = DGEList.create_DGEList_data_file( data_file=args.counts_file, group_file=args.groups_file ) self.output = args.output if args.output else None self.p_value_cutoff = args.cutoff self.minimum_cpm = args.minimum_cpm
[docs] def run_ks(self): """ First pass implementation of a Kolmogorov-Smirnov test for different groups, using the Scipy KS test two-tailed implementation. Args: None. """ log.info(self.dge_list.groups_list) gene_details, gene_likelyhood1, group_types = self.ks_2_samples() results = self.generate_results( gene_details, gene_likelyhood1, group_types[0], group_types[1] ) if self.output: with smart_open(self.output, 'w') as out: out.writelines(results) log.info(f"Wrote to {self.output}") else: for line in results: log.info(line)
[docs] def ks_2_samples(self): """Run a 2-tailed Kolmogorov-Smirnov test on the DGEList object. Args: None. Returns: gene_details: a dictionary of dictionary (key, gene), holding mean1 and mean2 for the two groups gene_likelihood: a dictionary (key, gene), holding the p-value of the separation of the two groups group_types: list of the groups in order. """ gene_likelihood1: Dict[Hashable, float] = {} group_types = set(self.dge_list.groups_list) group_types = list(group_types) group_filters: Dict[Hashable, Any] = {} gene_details: Dict[Hashable, Dict[Hashable, Any]] = {} for group in group_types: group_filters[group] = [g == group for g in self.dge_list.groups_list] for gene_idx, gene in enumerate(self.dge_list.genes): gene_row = self.dge_list.counts[gene_idx] if len(group_types) == 2: group_data1 = gene_row.compress(group_filters[group_types[0]]) mean1 = np.mean(group_data1) group_data2 = gene_row.compress(group_filters[group_types[1]]) mean2 = np.mean(group_data2) gene_likelihood1[gene] = ks_2samp(group_data1, group_data2)[1] gene_details[gene] = {'mean1': mean1, 'mean2': mean2} return gene_details, gene_likelihood1, group_types
[docs] def generate_results( self, gene_details: Dict[Hashable, Dict[Hashable, Any]], gene_likelihood1: Dict[Hashable, float], group_type1: str, group_type2: str, ) -> List[str]: """ This function simply prepares a summary of the results of the analysis for dumping to file or to screen Args: gene_details: information about the genes - should contain fields 'mean1' and 'mean2' for display gene_likelihood1: dictionary of gene names and the p-value associated. used to sort the data group_type1: the name of the first grouping group_type2: the name of the second grouping """ results: List[str] = [] sorted_likely = [ (gene, gene_likelihood1[gene]) for gene in sorted(gene_likelihood1, key=gene_likelihood1.get) ] results.append(f"gene_name\tp-value\t{group_type1}\t{group_type2}\n") for gene, p in sorted_likely: m1 = gene_details[gene]['mean1'] m2 = gene_details[gene]['mean2'] symbol = ( self.ensg_to_symbol[gene]['symbols'][0] if gene in self.ensg_to_symbol else gene ) if ( p < self.p_value_cutoff and not (m1 < self.minimum_cpm and m2 < self.minimum_cpm) and m1 < m2 ): results.append( f"{gene}\t" f"{symbol}\t" f"{gene_likelihood1[gene]}\t" f"{gene_details[gene]['mean1']:.2f}\t" f"{gene_details[gene]['mean2']:.2f}\n" ) return results
[docs]def main(): args = parse_arguments() default_class = EdgePy(args) default_class.run_ks()
if __name__ == "__main__": main()