Source code for ckg.graphdb_builder.databases.parsers.disgenetParser

import os.path
import gzip
from collections import defaultdict
from ckg.graphdb_builder import builder_utils

#########################
#       DisGeNet        #
#########################
[docs]def parser(databases_directory, download=True): relationships = defaultdict(set) config = builder_utils.get_config(config_name="disgenetConfig.yml", data_type='databases') files = config['disgenet_files'] mapping_files = config['disgenet_mapping_files'] url = config['disgenet_url'] directory = os.path.join(databases_directory, "disgenet") builder_utils.checkDirectory(directory) header = config['disgenet_header'] output_file = 'disgenet_associated_with.tsv' if download: for f in files: builder_utils.downloadDB(url+files[f], directory) for f in mapping_files: builder_utils.downloadDB(url+mapping_files[f], directory) proteinMapping = readDisGeNetProteinMapping(config, directory) diseaseMapping = readDisGeNetDiseaseMapping(config, directory) for f in files: first = True associations = gzip.open(os.path.join(directory, files[f]), 'r') dtype, atype = f.split('_') if dtype == 'gene': idType = "Protein" scorePos = 9 if dtype == 'variant': idType = "Transcript" scorePos = 5 for line in associations: if first: first = False continue try: data = line.decode('utf-8').rstrip("\r\n").split("\t") geneId = str(int(data[0])) #disease_specificity_index = data[2] #disease_pleiotropy_index = data[3] diseaseId = data[4] score = float(data[scorePos]) pmids = data[13] source = data[-1] if geneId in proteinMapping: for identifier in proteinMapping[geneId]: if diseaseId in diseaseMapping: for code in diseaseMapping[diseaseId]: code = "DOID:"+code relationships[idType].add((identifier, code, "ASSOCIATED_WITH", score, atype, "DisGeNet: "+source, pmids)) except UnicodeDecodeError: continue associations.close() builder_utils.remove_directory(directory) return (relationships, header, output_file)
[docs]def readDisGeNetProteinMapping(config, directory): files = config['disgenet_mapping_files'] first = True mapping = defaultdict(set) if "protein_mapping" in files: mappingFile = files["protein_mapping"] with gzip.open(os.path.join(directory, mappingFile), 'r') as f: for line in f: if first: first = False continue data = line.decode('utf-8').rstrip("\r\n").split("\t") identifier = data[0] intIdentifier = data[1] mapping[intIdentifier].add(identifier) return mapping
[docs]def readDisGeNetDiseaseMapping(config, directory): files = config['disgenet_mapping_files'] first = True mapping = defaultdict(set) if "disease_mapping" in files: mappingFile = files["disease_mapping"] with gzip.open(os.path.join(directory, mappingFile), 'r') as f: for line in f: if first: first = False continue data = line.decode('utf-8').rstrip("\r\n").split("\t") identifier = data[0] vocabulary = data[2] code = data[3] if vocabulary == "DO": mapping[identifier].add(code) return mapping