Source code for ckg.analytics_core.analytics.kaplan_meierAnalysis

import pandas as pd
from lifelines import KaplanMeierFitter, NelsonAalenFitter
from lifelines.statistics import multivariate_logrank_test


[docs]def get_data_ready_for_km(dfs_dict, args): kmdf = None for df_key in dfs_dict: if df_key == 'clinical': kmdf = dfs_dict[df_key] else: df = dfs_dict[df_key] if 'marker' in args and 'index_col' in args: how = 'half' value = None if 'how' in args: how = args['how'] if 'value' in args: value = args['value'] mdf = group_data_based_on_marker(df, args['marker'], args['index_col'], how, value) if kmdf is not None: if 'index_col' in args and args['index_col'] in kmdf: index_col = args['index_col'] kmdf = kmdf.set_index(index_col).join(mdf.set_index(index_col), how='inner') return kmdf
[docs]def group_data_based_on_marker(df, marker, index_col, how, value): mdf = pd.DataFrame() if index_col is not None and marker is not None: if index_col in df and marker in df: mdf = df[[marker, index_col]] if how == 'cutoff': mdf['new_grouping'] = mdf.apply(lambda row: str(marker) + '+' if row[marker] >= value else str(marker)+'-') elif how == 'top' or how == 'top%': mdf = mdf.sort_values(by=marker, ascending=False) num_values = len(mdf[marker].values.tolist()) if how == 'top%': value = int(num_values * value / 100) if value < num_values: labels = [str(marker)+'+'] * value labels.extend([str(marker)+'-'] * (num_values - value)) else: print("Invalid value provided. Exceeded maximun number of samples {}".format(num_values)) mdf['new_grouping'] = labels else: print("Grouping method {} not implemented. Try with 'cutoff' or 'top'".format(how)) return mdf
[docs]def run_km(data, time_col, event_col, group_col, args={}): kmdf = None kmf = pd.DataFrame() summary = None if isinstance(data, dict): kmdf = get_data_ready_for_km(data, args) group_col = 'new_grouping' elif isinstance(data, pd.DataFrame): kmdf = data if kmdf is not None: kmf, summary = get_km_results(kmdf, group_col, time_col, event_col) return kmf, summary
[docs]def get_km_results(df, group_col, time_col, event_col): models = [] summary_ = None summary_result = None df = df[[event_col, time_col, group_col]].dropna() df[event_col] = df[event_col].astype('category') df[event_col] = df[event_col].cat.codes df[time_col] = df[time_col].astype('float') if not df.empty: for name, grouped_df in df.groupby(group_col): kmf = KaplanMeierFitter() t = grouped_df[time_col] e = grouped_df[event_col] kmf.fit(t, event_observed=e, label=name + " (N=" + str(len(t.tolist())) + ")") models.append(kmf) summary_ = multivariate_logrank_test(df[time_col].tolist(), df[group_col].tolist(), df[event_col].tolist(), alpha=99) if summary_ is not None: summary_result = "Multivariate logrank test: pval={}, t_statistic={}".format(summary_.p_value, summary_._test_statistic) return models, summary_result
[docs]def get_hazard_ratio_results(df, group_col, time_col, event_col): models = [] summary_ = None summary_result = None df = df[[event_col, time_col, group_col]].dropna() df[event_col] = df[event_col].astype('category') df[event_col] = df[event_col].cat.codes df[time_col] = df[time_col].astype('float') if not df.empty: for name, grouped_df in df.groupby(group_col): hr = NelsonAalenFitter() t = grouped_df[time_col] e = grouped_df[event_col] hr.fit(t, event_observed=e, label=name + " (N=" + str(len(t.tolist())) + ")") models.append(hr) return models