Source code for ckg.analytics_core.utils

import random
from Bio import Entrez, Medline
from collections import defaultdict
import pandas as pd
import io
import base64
import bs4 as bs
import dash_html_components as html
import requests
import networkx as nx
from networkx.readwrite import json_graph
from urllib import error = '' # TODO: This should probably be changed to the email of the person installing ckg?

[docs]def check_columns(df, cols): for col in cols: if col not in df: return False return True
[docs]def mpl_to_html_image(plot, width=800): buf = io.BytesIO() plot.savefig(buf, format="png") data = base64.b64encode(buf.getbuffer()).decode("utf8") figure = html.Img(src="data:image/png;base64,{}".format(data), width="800") return figure
[docs]def generate_html(network): """ This method gets the data structures supporting the nodes, edges, and options and updates the pyvis html template holding the visualization. :type name_html: str """ # here, check if an href is present in the hover data use_link_template = False for n in network.nodes: title = n.get("title", None) if title: if "href" in title: """ this tells the template to override default hover mechanic, as the tooltip would move with the mouse cursor which made interacting with hover data useless. """ use_link_template = True break template = network.template nodes, edges, height, width, options = network.get_network_data() network.html = template.render(height=height, width=width, nodes=nodes, edges=edges, options=options, use_DOT=network.use_DOT, dot_lang=network.dot_lang, widget=network.widget, bgcolor=network.bgcolor, conf=network.conf, tooltip_link=use_link_template)
[docs]def append_to_list(mylist, myappend): if isinstance(myappend, list): mylist.extend(myappend) else: mylist.append(myappend)
[docs]def neo4j_path_to_networkx(paths, key='path'): nodes = set() rels = set() for path in paths: if key in path: relationships = path[key] if len(relationships) == 3: node1, rel, node2 = relationships if 'name' in node1: source = node1['name'] if 'name' in node2: target = node2['name'] nodes.update([source, target]) rels.add((source, target, rel)) G = nx.Graph() G.add_nodes_from(nodes) for s, t, label in rels: G.add_edge(s, t, label=label) return G
[docs]def neo4j_schema_to_networkx(schema): nodes = set() rels = set() if 'relationships' in schema[0]: relationships = schema[0]['relationships'] for node1, rel, node2 in relationships: if 'name' in node1: source = node1['name'] if 'name' in node2: target = node2['name'] nodes.update([source, target]) rels.add((source, target, rel)) G = nx.Graph() G.add_nodes_from(nodes) colors = dict(zip(nodes, get_hex_colors(len(nodes)))) nx.set_node_attributes(G, colors, 'color') for s, t, label in rels: G.add_edge(s, t, label=label) return G
[docs]def networkx_to_cytoscape(graph): cy_graph = json_graph.cytoscape_data(graph) cy_nodes = cy_graph['elements']['nodes'] cy_edges = cy_graph['elements']['edges'] cy_elements = cy_nodes cy_elements.extend(cy_edges) mouseover_node = dict(graph.nodes(data=True)) return cy_elements, mouseover_node
[docs]def networkx_to_gml(graph, path): nx.write_gml(graph, path)
[docs]def networkx_to_neo4j_document(graph): graph_json = [] seen_rels = set() for n, attr in graph.nodes(data=True): rels = defaultdict(list) attr.update({'id': n}) for r in graph[n]: edge = graph[n][r] edge.update({'id': r}) if 'type' in edge: rel_type = edge['type'] if 'type' in graph.nodes()[r]: edge['type'] = graph.nodes()[r]['type'] if not (n, r, edge['type']) in seen_rels: rels[rel_type].append(edge) seen_rels.update({(n, r, edge['type']), (r, n, edge['type'])}) attr.update(rels) graph_json.append(attr) return graph_json
[docs]def json_network_to_gml(graph_json, path): graph = json_network_to_networkx(graph_json) with open(path, 'wb') as out: nx.write_gml(graph, out)
[docs]def networkx_to_graphml(graph, path): nx.write_graphml(graph, path)
[docs]def json_network_to_graphml(graph_json, path): graph = json_network_to_networkx(graph_json) with open(path, 'wb') as out: nx.write_graphml(graph, out)
[docs]def json_network_to_networkx(graph_json): graph = json_graph.node_link_graph(graph_json) return graph
[docs]def generator_to_dict(genvar): dictvar = {} for i, gen in enumerate(genvar): dictvar.update({n: i for n in gen}) return dictvar
[docs]def parse_html(html_snippet): html_parsed = bs.BeautifulSoup(html_snippet, 'html.parser') return html_parsed
[docs]def convert_html_to_dash(el, style=None): ALLOWED_CST = {'div', 'span', 'a', 'hr', 'br', 'p', 'b', 'i', 'u', 's', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'ol', 'ul', 'li', 'em', 'strong', 'cite', 'tt', 'pre', 'small', 'big', 'center', 'blockquote', 'address', 'font', 'img', 'table', 'tr', 'td', 'caption', 'th', 'textarea', 'option'} def __extract_style(el): if not el.attrs.get("style"): return None return {k.strip(): v.strip() for k, v in [x.split(": ") for x in el.attrs["style"].split(";") if x != '']} if type(el) is str: return convert_html_to_dash(parse_html(el)) if type(el) == bs.element.NavigableString: return str(el) else: name = style = __extract_style(el) if style is None else style contents = [convert_html_to_dash(x) for x in el.contents] if name.title().lower() not in ALLOWED_CST: return contents[0] if len(contents) == 1 else html.Div(contents) return getattr(html, name.title())(contents, style=style)
[docs]def hex2rgb(color): hex = color.lstrip('#') rgb = tuple(int(hex[i:i+2], 16) for i in (0, 2, 4)) rgba = rgb + (0.6,) return rgba
[docs]def get_rgb_colors(n): colors = [] r = int(random.random() * 256) g = int(random.random() * 256) b = int(random.random() * 256) step = 256 / n for i in range(n): r += step g += step b += step r = int(r) % 256 g = int(g) % 256 b = int(b) % 256 colors.append((r, g, b)) return colors
[docs]def get_hex_colors(n): initial_seed = 123 colors = [] for i in range(n): random.seed(initial_seed + i) color = "#%06x" % random.randint(0, 0xFFFFFF) colors.append(color) return colors
[docs]def getMedlineAbstracts(idList): fields = {"TI": "title", "AU": "authors", "JT": "journal", "DP": "date", "MH": "keywords", "AB": "abstract", "PMID": "PMID"} pubmedUrl = "" abstracts = pd.DataFrame() try: handle = Entrez.efetch(db="pubmed", id=idList, rettype="medline", retmode="json") records = Medline.parse(handle) results = [] for record in records: aux = {} for field in fields: if field in record: aux[fields[field]] = record[field] if "PMID" in aux: aux["url"] = pubmedUrl + aux["PMID"] else: aux["url"] = "" results.append(aux) abstracts = pd.DataFrame.from_dict(results) except error.URLError as e: print("URLError: Request to Bio.Entrez failed. Error: {}".format(e)) except error.HTTPError as e: print("HTTPError: Request to Bio.Entrez failed. Error: {}".format(e)) except Exception as e: print("Request to Bio.Entrez failed. Error: {}".format(e)) return abstracts