"""Source code for pykingenie.octet."""

import re
import base64
import xml.etree.ElementTree as elementTree

import numpy as np
import pandas as pd

from .utils.processing import etree_to_dict, combine_dicts
from .surface_exp      import SurfaceBasedExperiment

# Multiplicative factors mapping a concentration-unit label to the value's
# equivalent in micro-units (molar units -> µM, mass units -> µg/ml).
# Used to normalise concentrations parsed from Octet files.
factor_conc_to_micro = {'nM':1e-3, 'µM':1, 'mM':1e3, 'M':1e6, 'mg/ml':1e3, 'µg/ml':1}

class OctetExperiment(SurfaceBasedExperiment):
    """
    Handle Octet bio-layer interferometry (BLI) data.

    Parameters
    ----------
    name : str
        Name of the experiment.

    Attributes
    ----------
    name : str
        Name of the experiment.
    fns : list
        File names (length n, one per sensor).
    xs : list
        x values (time), one list per sensor.
    ys : list
        y values (signal), one list per sensor.
    exp_info : list
        Dictionaries with experimental information, one per sensor.
    step_info : list
        Dictionaries with step information, one per sensor.
    no_steps : int
        Number of steps.
    no_sensors : int
        Number of sensors.
    sample_column : np.ndarray
        Sample column information (96 elements, one per well).
    sample_row : np.ndarray
        Sample row information (96 elements, one per well).
    sample_type : list
        Sample types (96 elements, one per well).
    sample_id : list
        Sample ids (96 elements, one per well).
    sensor_names_unique : list
        Unique sensor names (length n, one per sensor).
    sensor_names : list
        Sensor names (length n, one per sensor).
    df_steps : pd.DataFrame
        Steps information.
    ligand_conc_df : pd.DataFrame
        Ligand concentration information, e.g.::

            Sensor  Analyte_location  Concentration_micromolar  SampleID  Replicate  Experiment
            A1      5                 0.1300                    wt        1          t
            B1      5                 0.0692                    wt        1          t

    traces_loaded : bool
        True if traces are loaded.
    sample_plate_loaded : bool
        True if sample plate information is loaded.
    sample_conc : np.ndarray
        Sample concentrations (96 elements, one per well).
    sample_conc_labeled : list
        Sample concentrations with unit labels (96 elements, one per well).
    steps_performed : pd.DataFrame
        Steps performed, e.g.::

            #Step  Type                    Column  Time
            1      Regeneration            1       25
            2      BaselineNeutralization  2       90
            3      BaselineLoading         3       50
    """
[docs] def __init__(self, name = 'BLI_experiment'): """ Initialize the OctetExperiment instance. Parameters ---------- name : str, optional Name of the experiment. Default is 'BLI_experiment'. """ super().__init__(name,'BLI_experiment')
[docs] def read_sensor_data(self, files, names=None): """ Read the sensor data from the .frd files. Parameters ---------- files : str or list of str Path(s) to .frd file(s) to read. names : str or list of str, optional Name(s) to assign to the sensors. If None, file names are used. Returns ------- None The method populates class attributes with data from the files. Notes ----- This method creates the following attributes: - traces_loaded : bool Whether traces were successfully loaded. - xs : list List of x values (time) for each sensor. - ys : list List of y values (signal) for each sensor. - exp_info : list List of dictionaries with experimental information. - step_info : list List of dictionaries with step information. - no_steps : int Number of steps in the experiment. - no_sensors : int Number of sensors in the experiment. """ if names is None: names = files if not isinstance(files, list): files = [files] names = [names] fns = [fn for fn,name in zip(files,names) if '.frd' in name] if len(fns) < 1: self.traces_loaded = False return None else: self.fns = fns # Initialize dictionaries with data xs, ys, all_expinfo, all_stepinfo, more_info = [], [], [], [], [] for fn in fns: # Load file tree = elementTree.parse(fn) root = tree.getroot() # Extract experimental info all_expinfo.append(etree_to_dict(root.find('ExperimentInfo'))) # Initialize lists for each file x_values, y_values, step_info = [], [], [] more_dict = {'FlowRate': [], 'StepType': [], 'StepName':[], 'StepStatus':[], 'ActualTime':[], 'CycleTime':[]} for step in root.find('KineticsData'): for step_x in step.findall('AssayXData'): # Convert string to binary data_text = bytes(step_x.text, 'utf-8') # Convert to base64 decoded = base64.decodebytes(data_text) # And now convert to float32 array data_values = np.array(np.frombuffer(decoded, dtype=np.float32)) x_values.append(data_values) for step_y in step.findall('AssayYData'): # Convert string to binary data_text = bytes(step_y.text, 'utf-8') # Convert to base64 
decoded = base64.decodebytes(data_text) # And now convert to float32 array data_values = np.array(np.frombuffer(decoded, dtype=np.float32)) y_values.append(data_values) for step_data in step.findall('CommonData'): step_info.append(etree_to_dict(step_data)) for tag in ['FlowRate', 'StepType', 'StepName', 'StepStatus', 'ActualTime', 'CycleTime']: for step_data in step.findall(tag): more_dict[tag].append(step_data.text) xs.append(x_values) ys.append(y_values) all_stepinfo.append(combine_dicts(step_info)) more_info.append(more_dict) # Merge all_stepinfo and more_info for i in range(len(all_stepinfo)): all_stepinfo[i] = {**all_stepinfo[i], **more_info[i]} # Fill instance self.xs = xs self.ys = ys self.exp_info = all_expinfo self.step_info = all_stepinfo # Convert text to floats self.convert_to_numbers() self.generate_ligand_conc_df() return None
    def generate_ligand_conc_df(self):
        """
        Generate a DataFrame with the analyte concentrations based on step_info and exp_info.

        Returns
        -------
        None
            This method populates class attributes related to ligand concentration.

        Notes
        -----
        Requires ``step_info``, ``exp_info`` and ``fns`` to be populated
        (normally by ``read_sensor_data``).

        Creates/updates the attributes ``no_steps``, ``no_sensors``,
        ``sensor_names``, ``df_steps``, ``ligand_conc_df`` and
        ``traces_loaded``.
        """
        # Derive experiment dimensions from the first sensor's step info.
        self.no_steps = len(self.step_info[0]['ActualTime'])
        self.no_sensors = len(self.fns)

        self.sensor_names = [self.exp_info[i]['SensorName'] for i in range(self.no_sensors)]

        steps_names = self.step_info[0]['StepName']
        steps_types = self.step_info[0]['StepType']
        steps_start = self.step_info[0]['StartTime'] / 1000  # milliseconds -> seconds
        steps_loc = self.step_info[0]['SampleLocation']

        self.df_steps = pd.DataFrame({'#Step': np.arange(len(steps_names)) + 1,
                                      'Name': steps_names,
                                      'Type': steps_types,
                                      'Start': steps_start,
                                      'Column_location': steps_loc})

        # We need to include the loading location in self.df_steps: for
        # every association step, walk backwards to the most recent
        # LOADING step and record its column location.
        loading_location = []
        for row in self.df_steps.iterrows():
            step_type = row[1]['Type']
            if step_type == 'ASSOC':
                # Find the previous loading step
                for i in range(row[0], 0, -1):
                    if self.df_steps.iloc[i]['Type'] == 'LOADING':
                        loading_location.append(self.df_steps.iloc[i]['Column_location'])
                        break
                # If no loading step is found, append NaN
                # NOTE(review): if a LOADING step sits exactly at index 1,
                # the loop breaks with i == 1 and this branch appends a
                # second value for the same row — confirm this cannot
                # occur with real protocols.
                if i == 1:
                    loading_location.append(np.nan)
            else:
                loading_location.append(np.nan)

        self.df_steps['Loading_location'] = loading_location

        # Flatten the per-sensor step information into parallel arrays,
        # one entry per (sensor, step) pair.
        sensor_locs_all = np.concatenate([self.step_info[i]['SampleLocation'] for i in range(self.no_sensors)])
        sensor_type_all = np.concatenate([self.step_info[i]['StepType'] for i in range(self.no_sensors)])
        sensor_molar_conc_all = np.concatenate([self.step_info[i]['MolarConcentration'] for i in range(self.no_sensors)])
        sensor_mass_conc_all = np.concatenate([self.step_info[i]['Concentration'] for i in range(self.no_sensors)])

        # Prefer the molar concentration; a negative value flags "not set",
        # in which case fall back to the mass concentration.
        sensor_conc_all = []
        for i in range(len(sensor_molar_conc_all)):
            if sensor_molar_conc_all[i] < 0:
                sensor_conc_all.append(sensor_mass_conc_all[i])
            else:
                sensor_conc_all.append(sensor_molar_conc_all[i])
        sensor_conc_all = np.array(sensor_conc_all)

        sample_id_all = np.concatenate([self.step_info[i]['SampleID'] for i in range(self.no_sensors)])

        # Repeat each sensor name once per step so it lines up with the
        # flattened arrays above.
        sensor_name_rep = np.concatenate([np.repeat(self.exp_info[i]['SensorName'], len(self.step_info[i]['Concentration'])) for i in range(self.no_sensors)])

        conc_units = np.concatenate([self.step_info[i]['MolarConcUnits'] for i in range(self.no_sensors)])

        df_all = pd.DataFrame({'Sensor': sensor_name_rep,
                               'Analyte_location': sensor_locs_all,
                               'Type': sensor_type_all,
                               'Concentration_micromolar': sensor_conc_all,
                               'ConcUnits': conc_units,
                               'SampleID': sample_id_all})

        # For each association step, find the corresponding loading step
        # and add the loading step to the dataframe as a column next to
        # the association.
        loading_location = []
        loading_sample_id = []
        for i in range(len(df_all)):
            row = df_all.iloc[i]
            # Find if row is association step
            if row['Type'] == 'ASSOC':
                # Find the previous loading step
                for j in range(i, 0, -1):
                    if df_all.iloc[j]['Type'] == 'LOADING':
                        loading_location.append(df_all.iloc[j]['Analyte_location'])
                        loading_sample_id.append(df_all.iloc[j]['SampleID'])
                        break

        # Keep only association steps
        df = df_all[df_all['Type'] == 'ASSOC'].copy()

        # If loading location is empty, fill with 0
        if len(loading_location) == 0:
            loading_location = [0] * len(df)
            loading_sample_id = [0] * len(df)

        # Add column loading_location
        df['Loading_location'] = loading_location

        # Replace None with empty string in loading_sample_id
        loading_sample_id = [x if x is not None else '' for x in loading_sample_id]

        # Include the loading_sample_id, if we have more than one unique value
        unq_loading_ids = np.unique(loading_sample_id)
        if len(unq_loading_ids) > 1:
            # Combine the sample id with the loading id
            df['SampleID'] = df['SampleID'] + ' - ' + loading_sample_id

        # Remove the Type column
        df = df.drop(columns=['Type'])

        # Sort by location and sensor name
        df = df.sort_values(by=['Loading_location', 'Analyte_location', 'Sensor'])

        # Replicate number within each
        # (loading location, analyte location, sensor) group.
        sizes = df.groupby(['Loading_location', 'Analyte_location', 'Sensor']).size().reset_index(name="Repetitions")

        rep_number = []
        for i in range(len(sizes)):
            rep_number.extend(np.arange(sizes['Repetitions'].iloc[i]) + 1)

        # Relies on df being sorted with the same keys as the groupby above.
        df['Replicate'] = rep_number

        # Sort the dataframe first by location, then by replicate,
        # loading location and sensor.
        df = df.sort_values(by=['Analyte_location', 'Replicate', 'Loading_location', 'Sensor'])

        # Convert every concentration to micromolar via the unit factors.
        df['Factor'] = df.apply(lambda x: factor_conc_to_micro[x['ConcUnits']], axis=1)
        df['Concentration_micromolar'] = df['Concentration_micromolar'] * df['Factor']

        # Remove the factor column and conc units
        df = df.drop(columns=['Factor', 'ConcUnits'])

        # Add the experiment name
        df['Experiment'] = self.name

        self.ligand_conc_df = df

        self.create_unique_sensor_names()

        self.traces_loaded = True

        return None
    def merge_consecutive_steps(self, idx_ref, idx_to_merge):
        """
        Combine two consecutive steps into one step.

        Parameters
        ----------
        idx_ref : int
            Index of the reference step (1-based). The type of step will be
            taken from this step.
        idx_to_merge : int
            Index of the step to merge with the reference step (1-based).

        Returns
        -------
        None
            The method modifies the xs, ys, and step information in place.

        Notes
        -----
        The two steps must be consecutive (their indices must differ by
        exactly 1). After merging, the analyte concentration dataframe is
        regenerated.
        """
        assert np.abs(idx_ref - idx_to_merge) == 1, "The two steps must be consecutive"
        assert idx_ref != idx_to_merge, "The two steps must be different"

        idx_ref -= 1  # Adjust for 0-based indexing
        idx_to_merge -= 1  # Adjust for 0-based indexing

        for sensor in range(self.no_sensors):
            # Extract the reference step x and y values
            x_ref = self.xs[sensor][idx_ref]
            y_ref = self.ys[sensor][idx_ref]

            # Extract the step to merge x and y values
            x_merge = self.xs[sensor][idx_to_merge]
            y_merge = self.ys[sensor][idx_to_merge]

            # Find which has the lowest start time so the concatenated
            # trace stays in chronological order.
            t0_ref = np.min(x_ref)
            t0_merge = np.min(x_merge)

            # Concatenate the x and y values
            if t0_ref < t0_merge:
                self.xs[sensor][idx_ref] = np.concatenate((x_ref, x_merge))
                self.ys[sensor][idx_ref] = np.concatenate((y_ref, y_merge))
            else:
                self.xs[sensor][idx_ref] = np.concatenate((x_merge, x_ref))
                self.ys[sensor][idx_ref] = np.concatenate((y_merge, y_ref))

            # Remove the merged step from the xs and ys lists
            self.xs[sensor].pop(idx_to_merge)
            self.ys[sensor].pop(idx_to_merge)

        # Remove the merged step's entry from every per-step field of
        # self.step_info (lists are popped, numpy arrays use np.delete).
        for sensor_id in range(self.no_sensors):
            step_info = self.step_info[sensor_id]
            for i, l in enumerate(step_info):
                values = self.step_info[sensor_id][l]
                if isinstance(values, list) and len(values) > idx_to_merge:
                    self.step_info[sensor_id][l].pop(idx_to_merge)
                elif isinstance(values, np.ndarray) and len(values) > idx_to_merge:
                    self.step_info[sensor_id][l] = np.delete(values, idx_to_merge)
                #self.step_info[i].pop(key)

        # Create the analyte concentration dataframe again
        self.generate_ligand_conc_df()

        return None
[docs] def merge_consecutive_steps_by_name(self, step_name, reference=True): """ Merge steps with a specific name with their consecutive step. Parameters ---------- step_name : str Name of the step to merge with the next/previous step. reference : bool, optional If True, the step with name step_name will be used as reference to extract the analyte concentration, loading location, etc. Default is True. Returns ------- None The method modifies the xs, ys, and step information in place. Notes ----- This method finds all steps with the given name and merges them with their adjacent step. The merged step inherits properties from the reference step. """ # Find the indices of the steps of type step_type idxs = [] for i in range(len(self.df_steps)): if self.df_steps.iloc[i]['Name'] == step_name: # check if idx+2 is valid if i+2 <= len(self.df_steps): idxs.append(i) # Sort them in reverse order to avoid index shifting issues idxs.sort(reverse=True) for idx in idxs: if reference: # Merge the step with the next step self.merge_consecutive_steps(idx+1,idx+2) else: # Merge the step with the previous step self.merge_consecutive_steps(idx+2,idx+1) return None
[docs] def read_sample_plate_info(self, files, names=None): """ Read the sample plate information from the .fmf file. Parameters ---------- files : str or list of str Path(s) to .fmf file(s) containing sample plate information. names : str or list of str, optional Name(s) to assign to the files. If None, file names are used. Returns ------- None The method populates class attributes with sample plate data. Notes ----- This method creates the following attributes: - sample_column : numpy.ndarray Array of sample column information (96 elements, one per well). - sample_row : numpy.ndarray Array of sample row information (96 elements, one per well). - sample_type : list List of sample types (96 elements, one per well). - sample_id : list List of sample IDs (96 elements, one per well). - sample_plate_loaded : bool Set to True if sample plate information is successfully loaded. - sample_conc : numpy.ndarray Array with the sample concentrations (96 elements, one per well). - sample_conc_labeled : list List with the sample concentrations labeled (96 elements, one per well). 
""" if names is None: names = files if not isinstance(files, list): files = [files] names = [names] index = next((i for i, s in enumerate(names) if 'ExpMethod.fmf' in s), None) if index is None: self.sample_plate_loaded = False return None file = files[index] tree = elementTree.parse(file) root = tree.getroot() sample_types = [x.text for x in root.findall(".//SampleType")] sample_locations = [x.text for x in root.findall(".//SampleLoc")] sample_ids = [x.text for x in root.findall(".//SampleID")] sample_conc_molar = np.array([float(x.text) for x in root.findall(".//SampleMolarConc")]) sample_conc_mass = np.array([float(x.text) for x in root.findall(".//SampleConc")]) sample_conc = sample_conc_mass sel_ids = ['SAMPLE' in s for s in sample_types] counter = 0 for i in range(len(sample_conc)): if sel_ids[i]: if sample_conc_molar[counter] > 0: sample_conc[i] = sample_conc_molar[counter] counter += 1 conc_units = [x.text for x in root.findall(".//ConcUnits")][0] molar_conc_units = [x.text for x in root.findall(".//MolarConcUnits")][0] factors = [factor_conc_to_micro[molar_conc_units] if 'SAMPLE' in st else factor_conc_to_micro[conc_units] for st in sample_types] sample_conc = sample_conc * np.array(factors) sample_conc = np.round(sample_conc, 5) sample_column = np.array([int(re.sub(r'\D', '', text)) for text in sample_locations]) sample_row = np.array([re.sub(r'\d+', '', text) for text in sample_locations]) self.sample_column = sample_column self.sample_row = sample_row self.sample_type = sample_types self.sample_id = sample_ids self.sample_conc = sample_conc sample_conc_labeled = [f"{x} µM" if t == 'KSAMPLE' and x >= 0 else f"{x} µg/ml" if t != 'KSAMPLE' and x >= 0 else '' for x, t in zip(sample_conc, sample_types)] self.sample_conc_labeled = sample_conc_labeled data_name = [x.text for x in root.findall(".//DataName")] assay_time = [x.text for x in root.findall(".//AssayTime")] steps_info_df = pd.DataFrame({'Type':data_name,'Time':assay_time}) data_name = [x.text for x 
in root.findall(".//StepDataName")] data_col = [x.text for x in root.findall(".//SampleCol")] steps_performed = pd.DataFrame({'#Step':np.arange(len(data_name))+1,'Type':data_name,'Column':data_col}) steps_performed = pd.merge(steps_performed, steps_info_df, on='Type', how='left') self.steps_performed = steps_performed self.sample_plate_loaded = True return None
[docs] def convert_to_numbers(self): """ Convert the strings in the step info to numbers. Parameters ---------- None Returns ------- None The method modifies the step_info attribute in place. Notes ----- This method processes the following entries in step_info: 'Concentration', 'MolarConcentration', 'MolecularWeight', 'Temperature', 'StartTime', 'AssayTime', 'FlowRate', 'ActualTime', 'CycleTime'. """ # List of entries in step info entries = ['Concentration', 'MolarConcentration', 'MolecularWeight', 'Temperature', 'StartTime', 'AssayTime', 'FlowRate', 'ActualTime', 'CycleTime'] for entry in entries: for sensor in range(len(self.fns)): # Do sanity check try: self.step_info[sensor][entry] = np.array(self.step_info[sensor][entry], dtype=float) except: print("Erroneous entry found for %s and sensor %i: %s" % (entry, sensor, self.step_info[sensor][entry])) print("Will set it to -1. Needs to be corrected") # Correct erroneous value for i in range(len(self.step_info[sensor][entry])): try: float(self.step_info[sensor][entry][i]) except: self.step_info[sensor][entry][i] = -1 self.step_info[sensor][entry] = np.array(self.step_info[sensor][entry], dtype=float) return None