2021-07-13 23:25:37 -07:00
|
|
|
import numpy as np
|
|
|
|
from scipy.signal import find_peaks, correlate
|
2021-08-04 16:22:15 -07:00
|
|
|
from scipy.optimize import curve_fit
|
2021-07-16 09:19:00 -07:00
|
|
|
from memdb import mem
|
2021-07-13 23:25:37 -07:00
|
|
|
|
2021-08-04 16:22:15 -07:00
|
|
|
def minmax(data):
    """Return the (minimum, maximum) of *data* as a 2-tuple."""
    lo = np.min(data)
    hi = np.max(data)
    return lo, hi
|
|
|
|
|
|
|
|
def exp_func(x, x0, a, y0, tau):
    """Exponential decay model: ``y0 + a * exp(-(x - x0) / tau)``.

    NOTE: `x` is the independent variable supplied by `curve_fit`,
    not a fitted parameter — only `x0`, `a`, `y0`, `tau` are fitted.
    """
    decay = np.exp(-(x - x0) / tau)
    return y0 + a * decay
|
|
|
|
|
2021-07-13 23:25:37 -07:00
|
|
|
def spaced_groups(
    x_data: np.ndarray,
    y_data: np.ndarray,
    group_len: float,
    peak_minheight: float,
    peak_prominence: float,
    sma_denom: int,
    mirrored: bool = True,
    start=None,
    end=None
):
    """
    Use SpacedGroups algo to separate groups.

    Peaks are detected on a moving average of `y_data`; peaks closer than
    `group_len` (in x units) are grouped together, and a window of
    +/- `group_len` around each group's first peak is cut out of `y_data`.

    Parameters
    ----------
    x_data, y_data : raw signal; x is assumed uniformly spaced
    group_len : minimum x-distance separating two groups; also the
        half-width of each extracted window
    peak_minheight, peak_prominence : forwarded to `scipy.signal.find_peaks`
    sma_denom : moving-average window width used before peak detection
    mirrored : if True, keep only every other (even-indexed) group
    start, end : optional x-limits; data outside them is discarded first

    Returns
    -------
    2D array of raw data; every other group
    """

    # Helpers
    def t2i(t):
        # x-value -> absolute index (assumes uniform spacing).
        delta_t = abs(x_data[0] - t)
        timestep = abs(x_data[1] - x_data[0])
        return int(delta_t / timestep)

    def t2i_range(t):
        # x-span -> index count (just the delta).
        timestep = abs(x_data[1] - x_data[0])
        return int(t / timestep)

    def moving_average(x, w):
        # 'valid' mode: output is len(x) - w + 1 samples long.
        return np.convolve(x, np.ones(w), 'valid') / w

    def isolate_group(i):
        # Cut +/- group_len (in samples) around index i.
        # NOTE: groups that run past either end just come out short (or
        # empty) and are filtered out below.
        i_min = i - t2i_range(group_len)
        i_max = i + t2i_range(group_len)
        return y_data.tolist()[i_min:i_max]

    # Apply custom start/end limits if set. End is trimmed first so the
    # start trim cannot shift the indices the end trim was computed from.
    if end is not None:  # was `not end == None`
        stop_ind = t2i(end)
        x_data = x_data[:stop_ind]
        y_data = y_data[:stop_ind]

    if start is not None:  # was `not start == None`
        start_ind = t2i(start)
        x_data = x_data[start_ind:]
        y_data = y_data[start_ind:]

    # Detect peaks on the averaged data.
    y_data_av = moving_average(y_data, sma_denom)
    peak_indices = find_peaks(y_data_av, height=peak_minheight, prominence=peak_prominence)

    # x-values of all detected peaks.
    # NOTE(review): indices into the smoothed signal are applied to the raw
    # x_data, so each peak's x is shifted left by ~sma_denom/2 samples —
    # harmless while windows are much wider than that, but worth confirming.
    peaks = [x_data[p] for p in peak_indices[0]]

    # Group peaks together: a gap of at least group_len starts a new group.
    peak_groups = [[]]
    for i, item in enumerate(peaks):
        peak_groups[-1].append(item)
        # Sentinel 0 for the last element reproduces the original logic
        # (no bare try/except needed).
        next_item = peaks[i + 1] if i + 1 < len(peaks) else 0
        if abs(next_item - item) >= group_len:
            peak_groups.append([])

    # Drop empty groups (the loop above always leaves a trailing empty one);
    # filtering into a new list avoids mutating while iterating.
    peak_groups = [g for g in peak_groups if g]

    # First peak of every group anchors its window.
    peaks_init = [g[0] for g in peak_groups]

    # Isolate group data; when mirrored, keep only even-indexed groups.
    # (enumerate replaces the original peaks_init.index(p) lookup, which was
    # O(n^2) and wrong when two groups started at the same x-value.)
    groups_raw = []
    for idx, p in enumerate(peaks_init):
        if mirrored and idx % 2 != 0:
            continue
        groups_raw.append(isolate_group(t2i(p)))

    # Drop groups that came out empty at the array edges.
    groups_raw = [g for g in groups_raw if g]

    return groups_raw
|
|
|
|
|
2021-07-23 11:03:43 -07:00
|
|
|
|
2021-08-04 16:22:15 -07:00
|
|
|
def vthreshold(
    x_data: np.array,
    y_data: np.array,
    v_data: np.array,
    vmin: float,
    vmax: float,
    mirrored: bool = True,
    start=None,
    end=None
):
    """
    Voltage-threshold grouping algorithm.

    NOTE(review): currently an unimplemented stub — it always returns an
    empty list regardless of its inputs.

    Returns
    -------
    A `list` of all peak groups
    """

    # Helpers (unused for now; kept for the eventual implementation).
    def t2i(t):
        # x-value -> absolute index (assumes uniform spacing).
        step = abs(x_data[1] - x_data[0])
        return int(abs(x_data[0] - t) / step)

    def t2i_range(t):
        # x-span -> index count.
        step = abs(x_data[1] - x_data[0])
        return int(t / step)

    return []
|
|
|
|
|
2021-07-13 23:25:37 -07:00
|
|
|
def correlate_groups(groups_raw):
    """
    Overlay groups using `scipy.correlate`.

    Each group after the first is aligned to the first (base) group by
    cross-correlation: leading samples are dropped if the group lags the
    base, or zeros are prepended if it leads.

    Returns
    -------
    2D array of overlayed groups
    """

    # Compare groups (scipy correlate)
    group_base = np.array(groups_raw[0])
    groups_adjusted = [group_base]

    for group in groups_raw[1:]:
        # Lag of the best alignment = argmax of the full cross-correlation.
        corr = correlate(group_base, np.array(group))
        shift = int(np.argmax(corr))

        # Full-mode correlation of two already-aligned signals peaks at
        # index len(group) - 1, so the relative offset is:
        diff = shift - (len(group) - 1)
        # (was `shift - len(group)`, an off-by-one that shifted every group
        # by one sample even when it already matched the base.)

        # Adjust alignment to fit on top of the base group. Slicing /
        # concatenation builds a new list instead of mutating the caller's
        # input (the original popped elements off groups_raw in place).
        if diff < 0:
            aligned = list(group)[abs(diff):]
        elif diff > 0:
            aligned = [0 for _ in range(diff)] + list(group)
        else:
            aligned = list(group)

        groups_adjusted.append(aligned)

    return groups_adjusted
|
|
|
|
|
2021-08-04 16:22:15 -07:00
|
|
|
def add_peaks_only(groups_adjusted: list):
    """Element-wise sum of all groups, truncating to the shortest length.

    Each group is converted to a numpy array; whenever two operands differ
    in length, the longer one is truncated (not zero-padded) before adding.

    Returns the accumulated numpy array.
    """

    def _sum_truncated(a, b):
        # Add two arrays, truncating the longer one to the shorter's length.
        short, long_ = (a, b) if len(a) < len(b) else (b, a)
        total = long_.copy()[:len(short)]
        total += short
        return total

    total = np.array(groups_adjusted[0])
    for group in groups_adjusted[1:]:
        total = _sum_truncated(total, np.array(group))

    return total
|
|
|
|
|
2021-07-16 09:19:00 -07:00
|
|
|
def isolate_peaks(
    groups_adjusted: list,
    peak_width: int,
    sma_denom: int,
    peak_minheight: int = None,
    peak_prominence: int = None,
    shift_over: int = 0
):
    """
    Locate peaks in the element-wise sum of all groups, then cut a
    `peak_width`-wide window around each detected peak out of every group.

    Parameters
    ----------
    groups_adjusted : aligned groups (see `correlate_groups`)
    peak_width : width (in samples) of the window cut around each peak;
        half of it is also the minimum separation between detected peaks
    sma_denom : moving-average window used to smooth the summed signal
        before peak detection
    peak_minheight, peak_prominence : forwarded to `find_peaks`
    shift_over : extra offset (samples) applied to every cut window

    Returns
    -------
    (added_peaks, peak_indices, isolated_peaks) : the truncated element-wise
    sum of all groups, the indices of peaks found in its smoothed version,
    and a 2D list of cut windows — one row per group, one entry per peak.
    """
    # (The original also defined an `unequal_add` zero-padding helper here
    # that was never called; it has been removed as dead code.)

    def _sum_truncated(a, b):
        # Add two arrays; the longer one is truncated to the shorter's length.
        short, long_ = (a, b) if len(a) < len(b) else (b, a)
        total = long_.copy()[:len(short)]
        total += short
        return total

    def _moving_average(x, w):
        # 'valid' mode: output is len(x) - w + 1 samples long.
        return np.convolve(x, np.ones(w), 'valid') / w

    # Sum all groups (truncating) so common peaks reinforce each other.
    added_peaks = np.array(groups_adjusted[0])
    for g in groups_adjusted[1:]:
        added_peaks = _sum_truncated(added_peaks, np.array(g))

    # Detect peaks on the smoothed sum; `distance` keeps adjacent windows
    # from overlapping by more than half a width.
    added_peaks_av = _moving_average(added_peaks, sma_denom)
    peak_indices = find_peaks(
        added_peaks_av,
        height=peak_minheight,
        prominence=peak_prominence,
        distance=peak_width / 2,
    )[0]  # indices of all peaks

    # Cut a +/- peak_width/2 window around each peak out of every group.
    # NOTE(review): a window whose start falls before index 0 produces a
    # negative slice bound (Python wraparound), as in the original —
    # confirm peaks never sit that close to the edge.
    isolated_peaks = []
    delta = peak_width / 2
    for g in groups_adjusted:
        peaks_cut = [
            g[int(i - delta + shift_over):int(i + delta + shift_over)]
            for i in peak_indices
        ]
        isolated_peaks.append(peaks_cut)

    return added_peaks, peak_indices, isolated_peaks
|
|
|
|
|
|
|
|
|
|
|
|
def fit_peaks(
    isolated_peaks: list,
    peak_indices: list,
    min_peak_height: float,
    min_peak_prominence: float,
    moving_avg_size: int,
    a: float,
    tau: float,
    y0: float,
    shift_over: int,
    use_advanced: bool
):
    """
    Fit an exponential decay (`exp_func`) to the tail of every isolated peak.

    (The docstring previously sat after a debug `print`, so it was a bare
    string expression rather than a real docstring; the print has been
    removed and the docstring moved to the top.)

    Parameters
    ----------
    isolated_peaks : 2D list of cut peak windows (see `isolate_peaks`)
    peak_indices : unused here; kept for interface compatibility
    min_peak_height, min_peak_prominence : peak-detection thresholds, only
        used when `use_advanced` is True
    moving_avg_size : unused here; kept for interface compatibility
    a, tau, y0 : initial guesses for amplitude, decay constant, and offset
    shift_over : extra samples skipped past the detected peak before fitting
    use_advanced : if True locate the peak with `find_peaks`; otherwise
        just take the window's argmax

    Returns
    -------
    Peak fit equations. Linked to `mem['isolated_peaks']`.
    """
    equations = []
    for peaks_cut in isolated_peaks:
        row = []
        for peak_data in peaks_cut:
            x_data = np.arange(len(peak_data))  # just placeholder indices

            if use_advanced:
                # NOTE(review): raises IndexError when no peak clears the
                # thresholds — confirm that is the intended failure mode.
                peak_index = find_peaks(peak_data, height=min_peak_height, prominence=min_peak_prominence)[0][0]
            else:
                peak_index = np.argmax(peak_data, axis=0)

            # Fit only the decaying tail, starting just past the peak.
            # (A dead module-wide params_guess initialisation was removed;
            # the guess is always rebuilt per peak here.)
            params_guess = (peak_index + shift_over, a, y0, tau)
            x_data_target = x_data[peak_index + shift_over:]
            peak_data_target = peak_data[peak_index + shift_over:]
            # Bounds force amplitude and tau non-negative (decay, not growth).
            popt, pcov = curve_fit(
                exp_func,
                x_data_target,
                peak_data_target,
                bounds=([-np.inf, 0.0, -np.inf, 0.0], np.inf),
                p0=params_guess,
                maxfev=10000000,
            )
            row.append({'popt': popt, 'pcov': pcov})
        equations.append(row)

    return equations  # list linked with isolated_peaks
|