python/src/rwsims/sims.py

273 lines
9.1 KiB
Python
Raw Normal View History

2024-06-19 17:10:49 +00:00
from __future__ import annotations
2024-06-30 10:06:44 +00:00
from time import perf_counter
2024-06-19 17:10:49 +00:00
import numpy as np
from numpy.random import Generator
2024-06-30 10:06:44 +00:00
from datetime import datetime
2024-06-19 17:10:49 +00:00
from scipy.interpolate import interp1d
import matplotlib.pyplot as plt
2024-06-20 17:19:55 +00:00
from scipy.optimize import curve_fit
2024-06-19 17:10:49 +00:00
2024-06-30 10:06:44 +00:00
from .functions import ste
2024-06-20 17:19:55 +00:00
from .parameter import Parameter
from .distributions import BaseDistribution
from .motions import BaseMotion
from .parsing import parse
2024-06-19 17:10:49 +00:00
2024-06-20 17:19:55 +00:00
def run_ste_sim(config_file: str):
2024-06-19 17:10:49 +00:00
p = parse(config_file)
rng, num_traj, t_max, delta, eta, num_variables = _prepare_sim(p)
2024-06-20 17:19:55 +00:00
t_mix = p.ste.t_mix
t_evo = p.ste.t_evo
t_echo = p.ste.t_echo
fig, ax = plt.subplots(2)
fig2, ax2 = plt.subplots(2)
fig3, ax3 = plt.subplots(2)
2024-06-19 17:10:49 +00:00
# outer loop over variables of distribution of correlation times
for (i, dist_values) in enumerate(p.dist):
# noinspection PyCallingNonCallable
dist = p.dist.dist_type(**dist_values, rng=rng)
2024-06-30 10:06:44 +00:00
# second loop over parameter of motion model
2024-06-19 17:10:49 +00:00
for (j, motion_values) in enumerate(p.motion):
# noinspection PyCallingNonCallable
motion = p.motion.model(delta, eta, **motion_values, rng=rng)
2024-06-30 10:06:44 +00:00
print(f'\nStart of {dist}, {motion}')
2024-06-19 17:10:49 +00:00
2024-06-30 10:06:44 +00:00
start = last_print = perf_counter()
2024-06-19 17:10:49 +00:00
2024-06-20 17:19:55 +00:00
cc = np.zeros((len(t_mix), len(t_evo)))
ss = np.zeros((len(t_mix), len(t_evo)))
2024-06-19 17:10:49 +00:00
# inner loop to create trajectories
for n in range(num_traj):
2024-06-30 10:06:44 +00:00
phase = make_trajectory(dist, motion, t_max)
2024-06-19 17:10:49 +00:00
2024-06-20 17:19:55 +00:00
for (k, t_evo_k) in enumerate(t_evo):
dephased = phase(t_evo_k)
2024-06-30 10:06:44 +00:00
t0 = t_mix + t_evo_k
2024-06-20 17:19:55 +00:00
rephased = phase(t0 + t_evo_k + 2*t_echo) + phase(t0) - 2 * phase(t0+t_echo)
cc[:, k] += np.cos(dephased)*np.cos(rephased)
ss[:, k] += np.sin(dephased)*np.sin(rephased)
2024-06-19 17:10:49 +00:00
2024-06-30 10:06:44 +00:00
last_print = print_step(n, num_traj, start, last_print)
2024-06-19 17:10:49 +00:00
2024-06-20 17:19:55 +00:00
cc[:, 1:] /= num_traj
ss[:, 1:] /= num_traj
fig4, ax4 = plt.subplots()
ax4.semilogx(t_mix, cc/cc[0, :], '.-')
fig5, ax5 = plt.subplots()
ax5.semilogx(t_mix, ss/ss[0, :], '.-')
for k in range(num_variables):
p0 = [0.5, 0, p.dist.variables.get('tau', 1), 1]
p_final = []
p_final1 = []
for k, t_evo_k in enumerate(p.ste.t_evo):
try:
res = curve_fit(ste, t_mix, cc[:, k], p0=p0, bounds=([0, 0, 0, 0], [np.inf, 1, np.inf, 1]))
p_final.append(res[0].tolist())
except RuntimeError:
p_final.append([np.nan, np.nan, np.nan, np.nan])
try:
res2 = curve_fit(ste, t_mix, ss[:, k], p0=p0, bounds=([0, 0, 0, 0], [np.inf, 1, np.inf, 1]))
p_final1.append(res2[0].tolist())
except RuntimeError:
p_final1.append([np.nan, np.nan, np.nan, np.nan])
p_final = np.array(p_final)
p_final1 = np.array(p_final1)
# ax[0].semilogy(p.ste.t_evo, p_final[:, 0], '.--')
# ax[1].semilogy(t_evo, p_final1[:, 0], '.--')
ax[0].plot(t_evo, p_final[:, 1], '.-')
ax[1].plot(t_evo, p_final1[:, 1], '.-')
ax2[0].semilogy(t_evo, p_final[:, 2], '.-')
ax2[1].semilogy(t_evo, p_final1[:, 2], '.-')
ax3[0].plot(t_evo, p_final[:, 3], '.-')
ax3[1].plot(t_evo, p_final1[:, 3], '.-')
2024-06-19 17:10:49 +00:00
plt.show()
2024-06-20 17:19:55 +00:00
def run_spectrum_sim(config_file: str):
2024-06-19 17:10:49 +00:00
p = parse(config_file)
rng, num_traj, t_max, delta, eta, num_variables = _prepare_sim(p)
2024-06-20 17:19:55 +00:00
num_echos = len(p.spec.t_echo)
reduction_factor = np.zeros((num_variables, num_echos))
t_echo = p.spec.t_echo
t_echo_strings = list(map(str, t_echo))
2024-06-19 17:10:49 +00:00
# outer loop over variables of distribution of correlation times
for (i, dist_values) in enumerate(p.dist):
# noinspection PyCallingNonCallable
dist = p.dist.dist_type(**dist_values, rng=rng)
2024-06-30 10:06:44 +00:00
# second loop over parameter of motion model
2024-06-19 17:10:49 +00:00
for (j, motion_values) in enumerate(p.motion):
# noinspection PyCallingNonCallable
motion = p.motion.model(delta, eta, **motion_values, rng=rng)
2024-06-30 10:06:44 +00:00
print(f'\nStart of {dist}, {motion}')
2024-06-20 17:19:55 +00:00
timesignal = np.zeros((p.spec.num_points, num_echos))
2024-06-19 17:10:49 +00:00
2024-06-30 10:06:44 +00:00
start = perf_counter()
last_print = start
2024-06-19 17:10:49 +00:00
# inner loop to create trajectories
for n in range(num_traj):
2024-06-30 10:06:44 +00:00
phase = make_trajectory(dist, motion, t_max)
2024-06-20 17:19:55 +00:00
for (k, t_echo_k) in enumerate(t_echo):
# effect of de-phasing and re-phasing
start_amp = -2 * phase(t_echo_k)
2024-06-19 17:10:49 +00:00
2024-06-20 17:19:55 +00:00
# start of actual acquisition
timesignal[:, k] += np.cos(start_amp + phase(p.spec.t_acq + 2*t_echo_k))
reduction_factor[max(p.motion.num_variables, 1)*i+j, k] += np.cos(phase(2*t_echo_k) + start_amp)
2024-06-19 17:10:49 +00:00
2024-06-30 10:06:44 +00:00
# print(n+1, num_traj, start, last_print)
last_print = print_step(n+1, num_traj, start, last_print)
2024-06-19 17:10:49 +00:00
2024-06-30 10:06:44 +00:00
# apply line broadening
2024-06-20 17:19:55 +00:00
timesignal *= p.spec.dampening[:, None]
timesignal /= num_traj
2024-06-30 10:06:44 +00:00
timesignal[0, :] /= 2
2024-06-20 17:19:55 +00:00
# FT to spectrum
spec = np.fft.fftshift(np.fft.fft(timesignal, axis=0), axes=0).real
spec -= spec[0]
2024-06-30 10:06:44 +00:00
spec *= p.spec.pulse_attn[:, None]
2024-06-20 17:19:55 +00:00
2024-06-30 10:06:44 +00:00
save_spectrum_data(timesignal, spec, p, dist, motion, t_echo_strings)
2024-06-19 17:10:49 +00:00
fig2, ax2 = plt.subplots()
2024-06-30 10:06:44 +00:00
lines = ax2.semilogx(p.dist.variables['tau'], reduction_factor / num_traj, 'o--')
ax2.legend(lines, t_echo_strings)
plt.savefig(f'{dist.name}_{motion.name}_reduction.png')
2024-06-19 17:10:49 +00:00
plt.show()
2024-06-30 10:06:44 +00:00
def make_trajectory(
dist: BaseDistribution,
motion: BaseMotion,
t_max: float,
t_passed: float = 0.,
init_phase: float = 0.
):
2024-06-19 17:10:49 +00:00
2024-06-30 10:06:44 +00:00
# set initial orientations and correlation times
2024-06-20 17:19:55 +00:00
motion.start()
dist.start()
2024-06-30 10:06:44 +00:00
# number of jumps that are simulated at once
chunks = min(int(0.51 * t_max / dist.tau_jump), 100_000) + 1
t = [np.array([t_passed])]
phase = [np.array([init_phase])]
2024-06-19 17:10:49 +00:00
while t_passed < t_max:
2024-06-20 17:19:55 +00:00
# frequencies between jumps
2024-06-19 17:10:49 +00:00
current_omega = motion.jump(size=chunks)
2024-06-20 17:19:55 +00:00
# times at a particular position
2024-06-19 17:10:49 +00:00
t_wait = dist.wait(size=chunks)
accumulated_phase = np.cumsum(t_wait * current_omega) + phase[-1]
2024-06-30 10:06:44 +00:00
phase.append(accumulated_phase)
2024-06-19 17:10:49 +00:00
t_wait = np.cumsum(t_wait) + t_passed
t_passed = t_wait[-1]
2024-06-30 10:06:44 +00:00
t.append(t_wait)
2024-06-19 17:10:49 +00:00
2024-06-30 10:06:44 +00:00
t = np.concatenate(t)
phase = np.concatenate(phase)
2024-06-19 17:10:49 +00:00
# convenient interpolation to get phase at arbitrary times
phase_interpol = interp1d(t, phase)
return phase_interpol
def _prepare_sim(parameter: Parameter) -> tuple[Generator, int, float, float, float, int]:
2024-06-30 10:06:44 +00:00
# collect variables that are common to spectra and stimulated echo
2024-06-19 17:10:49 +00:00
# random number generator
rng = np.random.default_rng(parameter.sim.seed)
# number of random walkers
num_traj = parameter.sim.num_walker
# length of trajectories
t_max = parameter.sim.t_max
# parameter for omega_q
delta, eta = parameter.molecule.delta, parameter.molecule.eta
2024-06-20 17:19:55 +00:00
num_variables = parameter.dist.num_variables * parameter.motion.num_variables
2024-06-19 17:10:49 +00:00
return rng, num_traj, t_max, delta, eta, num_variables
2024-06-30 10:06:44 +00:00
def print_step(n: int, num_traj: int, start: float, last_print: float) -> float:
step_time = perf_counter()
dt = step_time - last_print
if dt > 10 or n == num_traj:
date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f'{date} - step {n} of {num_traj}', end=' - ')
elapsed = step_time - start
total = num_traj * elapsed / n
print(f'expected total: {total:.2f}s - elapsed: {elapsed:.2f}s - remaining: {total - elapsed:.2f}s')
if dt > 10:
last_print = step_time
return last_print
2024-06-20 17:19:55 +00:00
2024-06-30 10:06:44 +00:00
def make_filename(dist: BaseDistribution, motion: BaseMotion) -> str:
filename = f'{dist}_{motion}'
filename = filename.replace(' ', '_')
filename = filename.replace('.', 'p')
return filename
def save_spectrum_data(
timesignal: np.ndarray,
spectrum: np.ndarray,
param: Parameter,
dist: BaseDistribution,
motion: BaseMotion,
echo_strings: list[str]
):
filename = make_filename(dist, motion)
header = param.totext(sim=True, spec=True)
header += '\nx\t' + '\t'.join(echo_strings)
np.savetxt(filename + '_timesignal.dat', np.c_[param.spec.t_acq, timesignal], header=header)
np.savetxt(filename + '_spectrum.dat', np.c_[param.spec.freq, spectrum], header=header)
fig, ax = plt.subplots()
lines = ax.plot(param.spec.freq, spectrum)
ax.set_title(f'{dist}, {motion}')
ax.legend(lines, echo_strings)
plt.savefig(filename + '_spectrum.png')
fig1, ax1 = plt.subplots()
lines = ax1.plot(param.spec.t_acq, timesignal)
ax1.set_title(f'{dist}, {motion}')
ax1.legend(lines, echo_strings)
plt.savefig(filename + '_timesignal.png')
plt.show()