Source code for ckg.report_manager.utils

import os
import pandas as pd
import dash_html_components as html
import dash_core_components as dcc
from ckg.graphdb_connector import connector
from datetime import datetime
import bs4 as bs
import random
import numpy as np
import chart_studio.plotly as py
import base64
from xhtml2pdf import pisa
import json
from urllib import request
import shutil
import smtplib
from email.message import EmailMessage
from ckg import ckg_utils


[docs]def copy_file_to_destination(cfile, destination): if os.path.exists(destination): shutil.copyfile(cfile, destination)
[docs]def send_message_to_slack_webhook(message, message_to, username='albsantosdel'): webhook_file = os.path.join(ckg_utils.read_ckg_config(key='ckg_directory'), "config/wh.txt") if os.path.exists(webhook_file): with open(webhook_file, 'r') as hf: webhook_url = hf.read() post = {"text": "@{} : {}".format(message_to, message), "username": username, "icon_url": "https://slack.com/img/icons/app-57.png"} try: json_data = json.dumps(post) req = request.Request(webhook_url, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'}) resp = request.urlopen(req) except Exception as em: print("EXCEPTION: " + str(em))
[docs]def send_email(message, subject, message_from, message_to): msg = EmailMessage() msg.set_content(message) msg['Subject'] = subject msg['From'] = message_from msg['to'] = message_to s = smtplib.SMTP('localhost:1025') s.send_message(msg) s.quit()
[docs]def compress_directory(name, directory, compression_format='zip'): try: if os.path.exists(directory) and os.path.isdir(directory): if os.path.exists(directory+'.zip'): os.remove(directory+'.zip') shutil.make_archive(name, compression_format, directory) shutil.rmtree(directory) else: print("The directory {} failed. Exists?{} Is dir?{}".format(directory, os.path.exists(directory) and os.path.isdir(directory))) except Exception as err: print("Could not compress file {} in directory {}. Error: {}".format(name, directory, err))
[docs]def get_markdown_date(extra_text): now = datetime.now() current_date = now.strftime("%B %d, %Y (%H:%M:%S)") markdown = html.Div([dcc.Markdown('### *{} {}* ###'.format(extra_text, current_date))], style={'color': '#6a51a3'}) return markdown
[docs]def convert_html_to_dash(el, style=None): if type(el) == bs.element.NavigableString: return str(el) else: name = el.name style = extract_style(el) if style is None else style contents = [convert_html_to_dash(x) for x in el.contents] return getattr(html, name.title())(contents, style=style)
[docs]def extract_style(el): return {k.strip(): v.strip() for k, v in [x.split(": ") for x in el.attrs["style"].split(";")]}
[docs]def get_image(figure, width, height): img = base64.b64encode(py.image.get(figure, width=width, height=height)).decode('utf-8') return img
[docs]def parse_html(html_snippet): html_parsed = bs.BeautifulSoup(html_snippet) return html_parsed
[docs]def hex2rgb(color): hex = color.lstrip('#') rgb = tuple(int(hex[i:i+2], 16) for i in (0, 2, 4)) rgba = rgb + (0.6,) return rgba
[docs]def getNumberText(num): numbers = ["zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten", "eleven", "twelve", "thirteen", "fourteen", "fifteen", "sixteen", "seventeen", "eighteen", "nineteen"] if len(numbers) > num: return numbers[num] else: return None
[docs]def get_rgb_colors(n): colors = [] r = int(random.random() * 256) g = int(random.random() * 256) b = int(random.random() * 256) step = 256 / n for i in range(n): r += step g += step b += step r = int(r) % 256 g = int(g) % 256 b = int(b) % 256 colors.append((r, g, b)) return colors
[docs]def get_hex_colors(n): initial_seed = 123 colors = [] for i in range(n): random.seed(initial_seed + i) color = "#%06x" % random.randint(0, 0xFFFFFF) colors.append(color) return colors
[docs]def convert_html_to_pdf(source_html, output_filename): # open output file for writing (truncated binary) result_file = open(output_filename, "w+b") # convert HTML to PDF pisa_status = pisa.CreatePDF( source_html, # the HTML to convert dest=result_file) # file handle to recieve result # close output file result_file.close() # close output file # return True on success and False on errors return pisa_status.err