from __future__ import annotations from time import time import numpy as np from numpy.random import Generator from scipy.interpolate import interp1d import matplotlib.pyplot as plt from scipy.optimize import curve_fit from .parameter import Parameter from .distributions import BaseDistribution from .motions import BaseMotion from .parsing import parse def run_ste_sim(config_file: str): p = parse(config_file) rng, num_traj, t_max, delta, eta, num_variables = _prepare_sim(p) t_mix = p.ste.t_mix t_evo = p.ste.t_evo t_echo = p.ste.t_echo fig, ax = plt.subplots(2) fig2, ax2 = plt.subplots(2) fig3, ax3 = plt.subplots(2) # outer loop over variables of distribution of correlation times for (i, dist_values) in enumerate(p.dist): # noinspection PyCallingNonCallable dist = p.dist.dist_type(**dist_values, rng=rng) chunks = int(0.6 * t_max / dist_values.get('tau', 1)) + 1 # second loop over parameter of motional model for (j, motion_values) in enumerate(p.motion): # noinspection PyCallingNonCallable motion = p.motion.model(delta, eta, **motion_values, rng=rng) print(f'\nStart of {dist} and {motion}') start = time() cc = np.zeros((len(t_mix), len(t_evo))) ss = np.zeros((len(t_mix), len(t_evo))) # inner loop to create trajectories for n in range(num_traj): phase = make_trajectory(chunks, dist, motion, t_max) for (k, t_evo_k) in enumerate(t_evo): dephased = phase(t_evo_k) t0= t_mix + t_evo_k rephased = phase(t0 + t_evo_k + 2*t_echo) + phase(t0) - 2 * phase(t0+t_echo) cc[:, k] += np.cos(dephased)*np.cos(rephased) ss[:, k] += np.sin(dephased)*np.sin(rephased) print_step(n, num_traj, start) cc[:, 1:] /= num_traj ss[:, 1:] /= num_traj fig4, ax4 = plt.subplots() ax4.semilogx(t_mix, cc/cc[0, :], '.-') fig5, ax5 = plt.subplots() ax5.semilogx(t_mix, ss/ss[0, :], '.-') for k in range(num_variables): p0 = [0.5, 0, p.dist.variables.get('tau', 1), 1] p_final = [] p_final1 = [] for k, t_evo_k in enumerate(p.ste.t_evo): try: res = curve_fit(ste, t_mix, cc[:, k], p0=p0, bounds=([0, 0, 0, 0], [np.inf, 1, np.inf, 1])) p_final.append(res[0].tolist()) except RuntimeError: p_final.append([np.nan, np.nan, np.nan, np.nan]) try: res2 = curve_fit(ste, t_mix, ss[:, k], p0=p0, bounds=([0, 0, 0, 0], [np.inf, 1, np.inf, 1])) p_final1.append(res2[0].tolist()) except RuntimeError: p_final1.append([np.nan, np.nan, np.nan, np.nan]) p_final = np.array(p_final) p_final1 = np.array(p_final1) # ax[0].semilogy(p.ste.t_evo, p_final[:, 0], '.--') # ax[1].semilogy(t_evo, p_final1[:, 0], '.--') ax[0].plot(t_evo, p_final[:, 1], '.-') ax[1].plot(t_evo, p_final1[:, 1], '.-') ax2[0].semilogy(t_evo, p_final[:, 2], '.-') ax2[1].semilogy(t_evo, p_final1[:, 2], '.-') ax3[0].plot(t_evo, p_final[:, 3], '.-') ax3[1].plot(t_evo, p_final1[:, 3], '.-') plt.show() def run_spectrum_sim(config_file: str): p = parse(config_file) rng, num_traj, t_max, delta, eta, num_variables = _prepare_sim(p) print(num_traj) num_echos = len(p.spec.t_echo) reduction_factor = np.zeros((num_variables, num_echos)) freq = np.fft.fftshift(np.fft.fftfreq(p.spec.num_points, p.spec.dwell_time)) t_echo = p.spec.t_echo t_echo_strings = list(map(str, t_echo)) # outer loop over variables of distribution of correlation times for (i, dist_values) in enumerate(p.dist): # noinspection PyCallingNonCallable dist = p.dist.dist_type(**dist_values, rng=rng) print(f'\nStart of {dist}') chunks = int(0.6 * t_max / dist.mean_tau) + 1 # second loop over parameter of motional model for (j, motion_values) in enumerate(p.motion): # noinspection PyCallingNonCallable motion = p.motion.model(delta, eta, **motion_values, rng=rng) print(f'Start of {motion}') timesignal = np.zeros((p.spec.num_points, num_echos)) start = time() # inner loop to create trajectories for n in range(num_traj): phase = make_trajectory(chunks, dist, motion, t_max) for (k, t_echo_k) in enumerate(t_echo): # effect of de-phasing and re-phasing start_amp = -2 * phase(t_echo_k) # start of actual acquisition timesignal[:, k] += np.cos(start_amp + phase(p.spec.t_acq + 2*t_echo_k)) reduction_factor[max(p.motion.num_variables, 1)*i+j, k] += np.cos(phase(2*t_echo_k) + start_amp) print_step(n, num_traj, start) timesignal *= p.spec.dampening[:, None] timesignal /= num_traj # FT to spectrum spec = np.fft.fftshift(np.fft.fft(timesignal, axis=0), axes=0).real spec -= spec[0] # plot spectra fig, ax = plt.subplots() lines = ax.plot(freq, spec) ax.set_title(f'{dist}, {motion}') ax.legend(lines, t_echo_strings) plt.show() fig2, ax2 = plt.subplots() ax2.semilogx(p.dist.variables['tau'], reduction_factor / num_traj, 'o--') plt.show() def print_step(n, num_traj, start): n_1 = n+1 if n_1 % 200 == 0 or n_1 == num_traj: elapsed = time() - start print(f'Step {n_1} of {num_traj}', end=' - ') total = num_traj * elapsed / (n_1) print(f'total: {total:.2f}s - elapsed: {elapsed:.2f}s - remaining: {total - elapsed:.2f}s') def make_trajectory(chunks: int, dist: BaseDistribution, motion: BaseMotion, t_max: float): motion.start() dist.start() t_passed = 0 t = [0] phase = [0] while t_passed < t_max: # frequencies between jumps current_omega = motion.jump(size=chunks) # times at a particular position t_wait = dist.wait(size=chunks) accumulated_phase = np.cumsum(t_wait * current_omega) + phase[-1] t_wait = np.cumsum(t_wait) + t_passed t_passed = t_wait[-1] t.extend(t_wait.tolist()) phase.extend(accumulated_phase.tolist()) # convenient interpolation to get phase at arbitrary times phase_interpol = interp1d(t, phase) return phase_interpol def _prepare_sim(parameter: Parameter) -> tuple[Generator, int, float, float, float, int]: # random number generator rng = np.random.default_rng(parameter.sim.seed) # number of random walkers num_traj = parameter.sim.num_walker # length of trajectories t_max = parameter.sim.t_max # parameter for omega_q delta, eta = parameter.molecule.delta, parameter.molecule.eta num_variables = parameter.dist.num_variables * parameter.motion.num_variables return rng, num_traj, t_max, delta, eta, num_variables def ste(x, a, f_infty, tau, beta): return a*((1-f_infty) * np.exp(-(x/tau)**beta) + f_infty)