213 lines
7.0 KiB
Python
213 lines
7.0 KiB
Python
|
from __future__ import annotations
|
||
|
|
||
|
from time import time
|
||
|
|
||
|
import numpy as np
|
||
|
from numpy.random import Generator
|
||
|
from scipy.interpolate import interp1d
|
||
|
from scipy.optimize import curve_fit
|
||
|
import matplotlib.pyplot as plt
|
||
|
|
||
|
from parameter import Parameter
|
||
|
from parser import parse
|
||
|
|
||
|
|
||
|
def ste(x, a, f_infty, tau, beta):
|
||
|
return a*((1-f_infty) * np.exp(-(x/tau)**beta) + f_infty)
|
||
|
|
||
|
|
||
|
def run_spectrum_sim(config_file: str):
|
||
|
p = parse(config_file)
|
||
|
|
||
|
rng, num_traj, t_max, delta, eta, num_variables = _prepare_sim(p)
|
||
|
|
||
|
num_echos = len(p.spec.t_echo)
|
||
|
reduction_factor = np.zeros((num_variables, num_echos))
|
||
|
freq = np.fft.fftshift(np.fft.fftfreq(p.spec.num_points, p.spec.dwell_time))
|
||
|
t_echo = p.spec.t_echo
|
||
|
t_echo_strings = list(map(str, t_echo))
|
||
|
|
||
|
# outer loop over variables of distribution of correlation times
|
||
|
for (i, dist_values) in enumerate(p.dist):
|
||
|
# noinspection PyCallingNonCallable
|
||
|
dist = p.dist.dist_type(**dist_values, rng=rng)
|
||
|
print(f'\nStart of {dist}')
|
||
|
|
||
|
chunks = int(0.6 * t_max / dist_values.get('tau', 1)) + 1
|
||
|
|
||
|
# second loop over parameter of motional model
|
||
|
for (j, motion_values) in enumerate(p.motion):
|
||
|
# noinspection PyCallingNonCallable
|
||
|
motion = p.motion.model(delta, eta, **motion_values, rng=rng)
|
||
|
print(f'Start of {motion}')
|
||
|
|
||
|
print(f'Simulate in chunks of {chunks}')
|
||
|
|
||
|
timesignal = np.zeros((p.spec.num_points, num_echos))
|
||
|
|
||
|
start = time()
|
||
|
|
||
|
# inner loop to create trajectories
|
||
|
for n in range(num_traj):
|
||
|
phase_interpol = make_trajectory(chunks, dist, motion, t_max)
|
||
|
|
||
|
for (k, t_echo_k) in enumerate(t_echo):
|
||
|
# effect of de-phasing and re-phasing
|
||
|
start_amp = -2 * phase_interpol(t_echo_k)
|
||
|
|
||
|
# start of actual acquisition
|
||
|
timesignal[:, k] += np.cos(start_amp + phase_interpol(p.spec.t_acq + 2 * t_echo_k)) * p.spec.dampening
|
||
|
reduction_factor[max(p.motion.num_variables, 1)*i + j, k] += np.cos(phase_interpol(2 * t_echo_k) + start_amp)
|
||
|
|
||
|
print_step(n, num_traj, start)
|
||
|
|
||
|
timesignal /= num_traj
|
||
|
|
||
|
# FT to spectrum
|
||
|
spec = np.fft.fftshift(np.fft.fft(timesignal, axis=0), axes=0).real
|
||
|
spec -= spec[0]
|
||
|
|
||
|
# plot spectra
|
||
|
fig, ax = plt.subplots()
|
||
|
lines = ax.plot(freq, spec)
|
||
|
ax.set_title(f'{dist}, {motion}')
|
||
|
ax.legend(lines, t_echo_strings)
|
||
|
|
||
|
fig2, ax2 = plt.subplots()
|
||
|
ax2.semilogx(p.dist.variables['tau'], reduction_factor/num_traj, 'o--')
|
||
|
|
||
|
plt.show()
|
||
|
|
||
|
|
||
|
def run_ste_sim(config_file: str):
|
||
|
p = parse(config_file)
|
||
|
|
||
|
rng, num_traj, t_max, delta, eta, num_variables = _prepare_sim(p)
|
||
|
|
||
|
cc = np.zeros((len(p.ste.t_mix), num_variables, len(p.ste.t_evo)))
|
||
|
ss = np.zeros((len(p.ste.t_mix), num_variables, len(p.ste.t_evo)))
|
||
|
|
||
|
# outer loop over variables of distribution of correlation times
|
||
|
for (i, dist_values) in enumerate(p.dist):
|
||
|
# noinspection PyCallingNonCallable
|
||
|
dist = p.dist.dist_type(**dist_values, rng=rng)
|
||
|
print(f'\nStart of {dist}')
|
||
|
|
||
|
chunks = int(0.6 * t_max / dist_values.get('tau', 1)) + 1
|
||
|
|
||
|
# second loop over parameter of motional model
|
||
|
for (j, motion_values) in enumerate(p.motion):
|
||
|
# noinspection PyCallingNonCallable
|
||
|
motion = p.motion.model(delta, eta, **motion_values, rng=rng)
|
||
|
|
||
|
print(f'Start of {motion}')
|
||
|
print(f'Simulate in chunks of {chunks}')
|
||
|
|
||
|
start = time()
|
||
|
|
||
|
# inner loop to create trajectories
|
||
|
for n in range(num_traj):
|
||
|
phase_interpol = make_trajectory(chunks, dist, motion, t_max)
|
||
|
|
||
|
for (k, t_evo_k) in enumerate(p.ste.t_evo):
|
||
|
dephased = phase_interpol(t_evo_k)
|
||
|
rephased = phase_interpol(p.ste.t_mix + 2*t_evo_k)-phase_interpol(p.ste.t_mix+t_evo_k)
|
||
|
cc[:, max(p.motion.num_variables, 1)*i + j, k] += np.cos(dephased)*np.cos(rephased)
|
||
|
ss[:, max(p.motion.num_variables, 1)*i + j, k] += np.sin(dephased)*np.sin(rephased)
|
||
|
|
||
|
print_step(n, num_traj, start)
|
||
|
|
||
|
cc /= num_traj
|
||
|
ss /= num_traj
|
||
|
|
||
|
fig, ax = plt.subplots()
|
||
|
fig2, ax2 = plt.subplots()
|
||
|
fig5, ax5 = plt.subplots()
|
||
|
fig3, ax3 = plt.subplots()
|
||
|
fig4, ax4 = plt.subplots()
|
||
|
|
||
|
for j in range(num_variables):
|
||
|
p0 = [0.5, 0, 1e-2, 1]
|
||
|
|
||
|
ax3.plot(p.ste.t_evo, cc[0, j, :])
|
||
|
ax3.plot(p.ste.t_evo, ss[0, j, :])
|
||
|
ax4.plot(p.ste.t_evo, cc[-1, j, :] / cc[0, j, :])
|
||
|
ax4.plot(p.ste.t_evo, ss[-1, j, :] / ss[0, j, :])
|
||
|
p_final = []
|
||
|
p_final1 = []
|
||
|
for k, t_evo_k in enumerate(p.ste.t_evo):
|
||
|
res = curve_fit(ste, p.ste.t_mix, cc[:, j, k], p0=p0)
|
||
|
res2 = curve_fit(ste, p.ste.t_mix, ss[:, j, k], p0=p0)
|
||
|
p_final.append(res[0].tolist())
|
||
|
p_final1.append(res2[0].tolist())
|
||
|
|
||
|
p_final = np.array(p_final)
|
||
|
p_final1 = np.array(p_final1)
|
||
|
ax.plot(p.ste.t_evo, p_final[:, 0])
|
||
|
ax.plot(p.ste.t_evo, p_final1[:, 0])
|
||
|
ax.plot(p.ste.t_evo, p_final[:, 1])
|
||
|
ax.plot(p.ste.t_evo, p_final1[:, 1])
|
||
|
ax5.semilogy(p.ste.t_evo, p_final[:, 2])
|
||
|
ax5.semilogy(p.ste.t_evo, p_final1[:, 2])
|
||
|
ax2.plot(p.ste.t_evo, p_final[:, 3])
|
||
|
ax2.plot(p.ste.t_evo, p_final1[:, 3])
|
||
|
|
||
|
plt.show()
|
||
|
|
||
|
|
||
|
def print_step(n, num_traj, start):
|
||
|
if (n + 1) % 200 == 0:
|
||
|
elapsed = time() - start
|
||
|
print(f'Step {n + 1} of {num_traj}', end=' - ')
|
||
|
total = num_traj * elapsed / (n + 1)
|
||
|
print(f'total: {total:.2f}s - elapsed: {elapsed:.2f}s - remaining: {total - elapsed:.2f}s')
|
||
|
|
||
|
|
||
|
def make_trajectory(chunks: int, dist, motion, t_max: float):
|
||
|
t_passed = 0
|
||
|
t = [0]
|
||
|
phase = [0]
|
||
|
accumulated_phase = 0
|
||
|
while t_passed < t_max:
|
||
|
# orientation until the next jump
|
||
|
current_omega = motion.jump(size=chunks)
|
||
|
|
||
|
# time to next jump
|
||
|
t_wait = dist.wait(size=chunks)
|
||
|
|
||
|
accumulated_phase = np.cumsum(t_wait * current_omega) + phase[-1]
|
||
|
|
||
|
t_wait = np.cumsum(t_wait) + t_passed
|
||
|
t_passed = t_wait[-1]
|
||
|
t.extend(t_wait.tolist())
|
||
|
|
||
|
phase.extend(accumulated_phase.tolist())
|
||
|
|
||
|
# convenient interpolation to get phase at arbitrary times
|
||
|
phase_interpol = interp1d(t, phase)
|
||
|
|
||
|
return phase_interpol
|
||
|
|
||
|
|
||
|
def _prepare_sim(parameter: Parameter) -> tuple[Generator, int, float, float, float, int]:
|
||
|
# random number generator
|
||
|
rng = np.random.default_rng(parameter.sim.seed)
|
||
|
|
||
|
# number of random walkers
|
||
|
num_traj = parameter.sim.num_walker
|
||
|
|
||
|
# length of trajectories
|
||
|
t_max = parameter.sim.t_max
|
||
|
|
||
|
# parameter for omega_q
|
||
|
delta, eta = parameter.molecule.delta, parameter.molecule.eta
|
||
|
|
||
|
num_variables = parameter.dist.num_variables + parameter.motion.num_variables
|
||
|
|
||
|
return rng, num_traj, t_max, delta, eta, num_variables
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
run_ste_sim('../config.json')
|
||
|
run_spectrum_sim('../config.json')
|