from __future__ import annotations from time import perf_counter import numpy as np from numpy.random import Generator from datetime import datetime from scipy.interpolate import interp1d import matplotlib.pyplot as plt from .spectrum import save_spectrum_data from .ste import save_ste_data, fit_ste, save_ste_fit, plot_ste_fits from .parameter import Parameter from .distributions import BaseDistribution from .motions import BaseMotion from .parsing import parse def run_ste_sim(config_file: str): p = parse(config_file) rng, num_traj, t_max, delta, eta, num_variables = _prepare_sim(p) t_mix = p.ste.t_mix t_evo = p.ste.t_evo t_echo = p.ste.t_echo fits_cc = [] fits_ss = [] # outer loop over variables of distribution of correlation times for (i, dist_values) in enumerate(p.dist): # noinspection PyCallingNonCallable dist = p.dist.dist_type(**dist_values, rng=rng) # second loop over parameter of motion model for (j, motion_values) in enumerate(p.motion): # noinspection PyCallingNonCallable motion = p.motion.model(delta, eta, **motion_values, rng=rng) print(f'\nStart of {dist}, {motion}') start = last_print = perf_counter() cc = np.zeros((len(t_mix), len(t_evo))) ss = np.zeros((len(t_mix), len(t_evo))) # inner loop to create trajectories for n in range(num_traj): phase = make_trajectory(dist, motion, t_max) for (k, t_evo_k) in enumerate(t_evo): dephased = phase(t_evo_k) t0 = t_mix + t_evo_k rephased = phase(t0 + t_evo_k + 2*t_echo) + phase(t0) - 2 * phase(t0+t_echo) cc[:, k] += np.cos(dephased)*np.cos(rephased) ss[:, k] += np.sin(dephased)*np.sin(rephased) last_print = print_step(n, num_traj, start, last_print) cc[:, 1:] /= num_traj ss[:, 1:] /= num_traj save_ste_data(cc, ss, p, dist, motion) p_fit_cc, p_fit_ss = fit_ste(cc, ss, t_evo, t_mix, dist_values, num_variables) fits_cc.append(p_fit_cc) fits_ss.append(p_fit_ss) save_ste_fit(p_fit_cc, p_fit_ss, p, dist, motion) plot_ste_fits(fits_cc, fits_ss, p.dist, p.motion) plt.show() def run_spectrum_sim(config_file: str): p: Parameter = parse(config_file) rng, num_traj, t_max, delta, eta, num_variables = _prepare_sim(p) num_echos = len(p.spec.t_echo) reduction_factor = np.zeros((num_variables, num_echos)) t_echo = p.spec.t_echo t_echo_strings = list(map(str, t_echo)) # outer loop over variables of distribution of correlation times for (i, dist_values) in enumerate(p.dist): # noinspection PyCallingNonCallable dist = p.dist.dist_type(**dist_values, rng=rng) # second loop over parameter of motion model for (j, motion_values) in enumerate(p.motion): # noinspection PyCallingNonCallable motion = p.motion.model(delta, eta, **motion_values, rng=rng) print(f'\nStart of {dist}, {motion}') timesignal = np.zeros((p.spec.num_points, num_echos)) start = perf_counter() last_print = start # inner loop to create trajectories for n in range(num_traj): phase = make_trajectory(dist, motion, t_max) for (k, t_echo_k) in enumerate(t_echo): # effect of de-phasing and re-phasing start_amp = -2 * phase(t_echo_k) # start of actual acquisition timesignal[:, k] += np.cos(start_amp + phase(p.spec.t_acq + 2*t_echo_k)) reduction_factor[max(p.motion.num_variables, 1)*i+j, k] += np.cos(phase(2*t_echo_k) + start_amp) # print(n+1, num_traj, start, last_print) last_print = print_step(n+1, num_traj, start, last_print) # apply line broadening timesignal *= p.spec.dampening[:, None] timesignal /= num_traj timesignal[0, :] /= 2 # FT to spectrum spec = np.fft.fftshift(np.fft.fft(timesignal, axis=0), axes=0).real spec -= spec[0] spec *= p.spec.pulse_attn[:, None] # save timesignals and spectra, also plots them save_spectrum_data(timesignal, spec, p, dist, motion, t_echo_strings) # plot and save reduction factor reduction_factor /= num_traj fig, ax = plt.subplots() lines = ax.semilogx(p.dist.variables['tau'], reduction_factor, 'o--') ax.legend(lines, t_echo_strings) plt.savefig(f'{dist.name()}_{motion.name()}_reduction.png') plt.show() def make_trajectory( dist: BaseDistribution, motion: BaseMotion, t_max: float, t_passed: float = 0., init_phase: float = 0. ): # set initial orientations and correlation times motion.start() dist.start() # number of jumps that are simulated at once chunks = min(int(0.51 * t_max / dist.tau_jump), 100_000) + 1 t = [np.array([t_passed])] phase = [np.array([init_phase])] while t_passed < t_max: # frequencies between jumps current_omega = motion.jump(size=chunks) # times at a particular position t_wait = dist.wait(size=chunks) accumulated_phase = np.cumsum(t_wait * current_omega) + phase[-1][-1] phase.append(accumulated_phase) t_wait = np.cumsum(t_wait) + t_passed t_passed = t_wait[-1] t.append(t_wait) t = np.concatenate(t) phase = np.concatenate(phase) # convenient interpolation to get phase at arbitrary times phase_interpol = interp1d(t, phase) return phase_interpol def _prepare_sim(parameter: Parameter) -> tuple[Generator, int, float, float, float, int]: # collect variables that are common to spectra and stimulated echo # random number generator rng = np.random.default_rng(parameter.sim.seed) # number of random walkers num_traj = parameter.sim.num_walker # length of trajectories t_max = parameter.sim.t_max # parameter for omega_q delta, eta = parameter.molecule.delta, parameter.molecule.eta num_variables = parameter.dist.num_variables * parameter.motion.num_variables return rng, num_traj, t_max, delta, eta, num_variables def print_step(n: int, num_traj: int, start: float, last_print: float) -> float: step_time = perf_counter() dt = step_time - last_print if dt > 10 or n == num_traj: date = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(f'{date} - step {n} of {num_traj}', end=' - ') elapsed = step_time - start total = num_traj * elapsed / n print(f'expected total: {total:.2f}s - elapsed: {elapsed:.2f}s - remaining: {total - elapsed:.2f}s') if dt > 10: last_print = step_time return last_print