diff --git a/src/config.json b/src/config.json index 19ebc43..080de22 100644 --- a/src/config.json +++ b/src/config.json @@ -1,34 +1,36 @@ { "simulation": { - "num_walker": 5000, + "num_walker": 20, "seed": null }, "molecule": { "delta": 161e3, - "eta": 0.0 + "eta": 0.1 }, "correlation_times": { - "distribution": "LogGaussian", + "distribution": "DeltaDistribution", "tau": { - "start": 1e-4, - "stop": 1e-8, - "steps": 9, - "is_log": true - }, - "sigma": { - "list": [0.5, 1, 2] + "list": [1e2, 1e0] } }, "motion": { - "model": "TetrahedralJump" + "model": "RandomJump" }, "spectrum": { "dwell_time": 1e-6, "num_points": 4096, "t_echo": { - "list": [5e-6, 10e-6, 20e-6, 40e-6, 60e-6, 100e-6] + "list": [ + 5e-6, + 10e-6, + 20e-6, + 40e-6, + 60e-6, + 100e-6 + ] }, - "line_broadening": 2e3 + "line_broadening": 4e3, + "t_pulse": 2e-6 }, "stimulated_echo": { "t_evo": { diff --git a/src/config_ste.json b/src/config_ste.json deleted file mode 100644 index befdd8b..0000000 --- a/src/config_ste.json +++ /dev/null @@ -1,30 +0,0 @@ -{ - "simulation": { - "num_walker": 20000, - "seed": null - }, - "molecule": { - "delta": 161e3, - "eta": 0.0 - }, - "correlation_times": { - "distribution": "DeltaDistribution", - "tau": 1e-2 - }, - "motion": { - "model": "RandomJump" - }, - "stimulated_echo": { - "t_evo": { - "start": 1e-6, - "stop": 40e-6, - "steps": 80 - }, - "t_mix": { - "start": 1e-5, - "stop": 1e0, - "steps": 21, - "is_log": true - } - } -} \ No newline at end of file diff --git a/src/rwsims/distributions.py b/src/rwsims/distributions.py index 1490039..8752d78 100644 --- a/src/rwsims/distributions.py +++ b/src/rwsims/distributions.py @@ -12,7 +12,11 @@ class BaseDistribution(ABC): self._tau = tau self._rng = rng - self._tau_jump = tau + self.tau_jump = tau + + @property + def name(self) -> str: + return self.__class__.__name__ @abstractmethod def __repr__(self): @@ -28,16 +32,16 @@ class BaseDistribution(ABC): pass def wait(self, size: int = 1) -> ArrayLike: - return self._rng.exponential(self._tau_jump, size=size) + return self._rng.exponential(self.tau_jump, size=size) class DeltaDistribution(BaseDistribution): def __repr__(self): - return f'No distribution(tau={self._tau})' + return f'Delta Distribution (tau={self._tau})' def start(self): - self._tau_jump = self._tau + self.tau_jump = self._tau @property def mean_tau(self): @@ -54,7 +58,7 @@ class LogGaussianDistribution(BaseDistribution): return f'Log-Gaussian(tau={self._tau}, sigma={self._sigma})' def start(self): - self._tau_jump = self._rng.lognormal(np.log(self._tau), self._sigma) + self.tau_jump = self._rng.lognormal(np.log(self._tau), self._sigma) @property def mean_tau(self): diff --git a/src/rwsims/functions.py b/src/rwsims/functions.py new file mode 100644 index 0000000..0d33bbf --- /dev/null +++ b/src/rwsims/functions.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +import numpy as np +from numpy.typing import ArrayLike + +from distributions import BaseDistribution +from motions import BaseMotion + + +def ste(x, a, f_infty, tau, beta): + return a*((1-f_infty) * np.exp(-(x/tau)**beta) + f_infty) + + +def pulse_attn(freq: ArrayLike, t_pulse: float) -> ArrayLike: + # cf. Schmitt-Rohr/Spieß eq. 2.126; omega_1 * t_p = pi/2 + pi_half_squared = np.pi**2 / 4 + omega = 2 * np.pi * freq + + numerator = np.sin(np.sqrt(pi_half_squared + omega**2 * t_pulse**2 / 2)) + denominator = np.sqrt(pi_half_squared + omega**2 * t_pulse**2 / 4) + + return np.pi * numerator/denominator / 2 \ No newline at end of file diff --git a/src/rwsims/helper.py b/src/rwsims/helper.py deleted file mode 100644 index 8f7fb1f..0000000 --- a/src/rwsims/helper.py +++ /dev/null @@ -1,65 +0,0 @@ -from __future__ import annotations - -import numpy as np -from numpy.typing import ArrayLike -from numpy.random import Generator - - -def xyz_to_spherical(x_in: float, y_in: float, z_in: float) -> tuple[float, float, float]: - r = np.linalg.norm([x_in, y_in, z_in]) - theta = np.arccos(z_in) - phi = np.arctan2(y_in, x_in) - - return r, theta, phi - - -def spherical_to_xyz(r: float, theta: float, phi: float) -> tuple[float, float, float]: - sin_theta = np.sin(theta) - return r*np.cos(phi)*sin_theta, r*np.sin(phi)*sin_theta, r*np.cos(theta) - - -def get_rotation_matrix(vec_in: np.ndarray, vec_out: np.ndarray): - rotation = np.eye(3) - - # rotation by angle around given axis - cos_angle = np.dot(vec_in, vec_out) - - # check for parallel / anti-parallel vectors - if cos_angle == 1: - return rotation - elif cos_angle == -1: - return -rotation - - else: - axis = np.cross(vec_in, vec_out) - scale = np.linalg.norm(axis) - axis /= scale - sin_angle = scale / np.linalg.norm(vec_in) / np.linalg.norm(vec_out) - - v_cross = np.array([ - [0, -axis[2], axis[1]], - [axis[2], 0, -axis[0]], - [-axis[1], axis[0], 0], - ]) - - rotation += sin_angle * v_cross - rotation += (1-cos_angle) * v_cross @ v_cross - - return rotation - - -def omega_q(delta: float, eta: float, theta: ArrayLike, phi: ArrayLike) -> ArrayLike: - cos_theta = np.cos(theta) - sin_theta = np.sin(theta) - return np.pi * delta * (3 * cos_theta**2 - 1 + eta * sin_theta**2 * np.cos(2*phi)) - - -def draw_orientation(rng: Generator, size: int | None = None) -> tuple[ArrayLike, ArrayLike]: - if size is not None: - z_theta, z_phi = rng.random((2, size)) - else: - z_theta, z_phi = rng.random(2) - theta = np.arccos(1 - 2 * z_theta) - phi = 2 * np.pi * z_phi - - return theta, phi diff --git a/src/rwsims/motions.py b/src/rwsims/motions.py index 6e55591..de2644d 100644 --- a/src/rwsims/motions.py +++ b/src/rwsims/motions.py @@ -6,8 +6,6 @@ import numpy as np from numpy.random import Generator from numpy.typing import ArrayLike -from .helper import xyz_to_spherical, spherical_to_xyz, omega_q, draw_orientation, get_rotation_matrix - class BaseMotion(ABC): def __init__(self, delta: float, eta: float, rng: Generator): @@ -20,6 +18,10 @@ class BaseMotion(ABC): def __repr__(self): pass + @property + def name(self) -> str: + return self.__class__.__name__ + def start(self): pass @@ -66,11 +68,11 @@ class TetrahedralJump(BaseMotion): ]) # orientation in lab system - theta0, phi0 = draw_orientation(self._rng) + cos_theta0, phi0 = draw_orientation(self._rng) rot = get_rotation_matrix( corners[0], - np.array(spherical_to_xyz(1., theta0, phi0)), + np.array(spherical_to_xyz(1., np.arccos(cos_theta0), phi0)), ) orientations = np.zeros(4) for i in range(4): @@ -90,3 +92,66 @@ class TetrahedralJump(BaseMotion): return self._orientation[jumps] +# Helper functions + +def xyz_to_spherical(x_in: float, y_in: float, z_in: float) -> tuple[float, float, float]: + r = np.linalg.norm([x_in, y_in, z_in]) + theta = np.arccos(z_in) + phi = np.arctan2(y_in, x_in) + + return r, theta, phi + + +def spherical_to_xyz(r: float, theta: float, phi: float) -> tuple[float, float, float]: + sin_theta = np.sin(theta) + return r*np.cos(phi)*sin_theta, r*np.sin(phi)*sin_theta, r*np.cos(theta) + + +def get_rotation_matrix(vec_in: np.ndarray, vec_out: np.ndarray): + rotation = np.eye(3) + + # rotation by angle around given axis + cos_angle = np.dot(vec_in, vec_out) + + # check for parallel / anti-parallel vectors + if cos_angle == 1: + return rotation + elif cos_angle == -1: + return -rotation + + else: + axis = np.cross(vec_in, vec_out) + scale = np.linalg.norm(axis) + axis /= scale + sin_angle = scale / np.linalg.norm(vec_in) / np.linalg.norm(vec_out) + + v_cross = np.array([ + [0, -axis[2], axis[1]], + [axis[2], 0, -axis[0]], + [-axis[1], axis[0], 0], + ]) + + rotation += sin_angle * v_cross + rotation += (1-cos_angle) * v_cross @ v_cross + + return rotation + + +def omega_q(delta: float, eta: float, cos_theta: ArrayLike, phi: ArrayLike) -> ArrayLike: + # sin_theta = np.sin(cos_theta) + # cos_theta = np.cos(cos_theta) + sin_theta_sq = 1 - cos_theta**2 + return np.pi * delta * (3 * cos_theta**2 - 1 + eta * sin_theta_sq * np.cos(2*phi)) + + +def draw_orientation(rng: Generator, size: int | None = None) -> tuple[ArrayLike, ArrayLike]: + if size is not None: + z_theta, z_phi = rng.random((2, size)) + else: + z_theta, z_phi = rng.random(2) + cos_theta = 1 - 2 * z_theta + phi = 2 * np.pi * z_phi + + return cos_theta, phi + + diff --git a/src/rwsims/parameter.py b/src/rwsims/parameter.py index 637029e..762f6de 100644 --- a/src/rwsims/parameter.py +++ b/src/rwsims/parameter.py @@ -6,7 +6,9 @@ from math import prod from typing import Any import numpy as np +from numpy._typing import ArrayLike +from functions import pulse_attn from .distributions import BaseDistribution from .motions import BaseMotion @@ -27,6 +29,9 @@ class SimParameter: num_walker: int t_max: float + def totext(self) -> str: + return f'num_traj={self.num_walker}\nseed={self.seed}' + @dataclass class MoleculeParameter: @@ -36,8 +41,8 @@ class MoleculeParameter: @dataclass class StimEchoParameter: - t_evo: np.ndarray - t_mix: np.ndarray + t_evo: ArrayLike + t_mix: ArrayLike t_echo: float t_max: float = field(init=False) @@ -49,16 +54,28 @@ class StimEchoParameter: class SpectrumParameter: dwell_time: float num_points: int - t_echo: np.ndarray - t_acq: np.ndarray = field(init=False) - t_max: float = field(init=False) + t_echo: ArrayLike lb: float - dampening: np.ndarray = field(init=False) + t_pulse: float + t_acq: ArrayLike = field(init=False) + freq: ArrayLike = field(init=False) + t_max: float = field(init=False) + dampening: ArrayLike = field(init=False) + pulse_attn: ArrayLike = field(init=False) def __post_init__(self): self.t_acq = np.arange(self.num_points) * self.dwell_time self.dampening = np.exp(-self.lb * self.t_acq) self.t_max = np.max(self.t_acq) + 2 * np.max(self.t_echo) + self.freq = np.fft.fftshift(np.fft.fftfreq(self.num_points, self.dwell_time)) + self.pulse_attn = pulse_attn(self.freq, self.t_pulse) + + def totext(self) -> str: + return (f'dwell_time{self.dwell_time}\n' + f'num_points={self.num_points}\n' + f't_echo={self.t_echo}\n' + f'lb={self.lb}\n' + f't_pulse={self.t_pulse}') @dataclass @@ -115,3 +132,14 @@ class Parameter: dist: DistParameter motion: MotionParameter molecule: MoleculeParameter + + def totext(self, sim: bool = True, spec: bool = True) -> str: + text = [] + if sim: + text.append(self.sim.totext()) + + if spec: + text.append(self.spec.totext()) + + return '\n'.join(text) + diff --git a/src/rwsims/parsing.py b/src/rwsims/parsing.py index a2cc9bf..3121af4 100644 --- a/src/rwsims/parsing.py +++ b/src/rwsims/parsing.py @@ -66,7 +66,8 @@ def _parse_spectrum(params: dict[str, Any] | None) -> SpectrumParameter | None: num_points=params['num_points'], dwell_time=params['dwell_time'], t_echo=_make_times(params['t_echo']), - lb=params['line_broadening'] + lb=params.get('line_broadening', 0), + t_pulse=params.get('t_pulse', 0) ) return spec diff --git a/src/rwsims/sims.py b/src/rwsims/sims.py index 3bd19a6..c486872 100644 --- a/src/rwsims/sims.py +++ b/src/rwsims/sims.py @@ -1,13 +1,15 @@ from __future__ import annotations -from time import time +from time import perf_counter import numpy as np from numpy.random import Generator +from datetime import datetime from scipy.interpolate import interp1d import matplotlib.pyplot as plt from scipy.optimize import curve_fit +from .functions import ste from .parameter import Parameter from .distributions import BaseDistribution from .motions import BaseMotion @@ -31,32 +33,31 @@ def run_ste_sim(config_file: str): for (i, dist_values) in enumerate(p.dist): # noinspection PyCallingNonCallable dist = p.dist.dist_type(**dist_values, rng=rng) - chunks = int(0.6 * t_max / dist_values.get('tau', 1)) + 1 - # second loop over parameter of motional model + # second loop over parameter of motion model for (j, motion_values) in enumerate(p.motion): # noinspection PyCallingNonCallable motion = p.motion.model(delta, eta, **motion_values, rng=rng) - print(f'\nStart of {dist} and {motion}') + print(f'\nStart of {dist}, {motion}') - start = time() + start = last_print = perf_counter() cc = np.zeros((len(t_mix), len(t_evo))) ss = np.zeros((len(t_mix), len(t_evo))) # inner loop to create trajectories for n in range(num_traj): - phase = make_trajectory(chunks, dist, motion, t_max) + phase = make_trajectory(dist, motion, t_max) for (k, t_evo_k) in enumerate(t_evo): dephased = phase(t_evo_k) - t0= t_mix + t_evo_k + t0 = t_mix + t_evo_k rephased = phase(t0 + t_evo_k + 2*t_echo) + phase(t0) - 2 * phase(t0+t_echo) cc[:, k] += np.cos(dephased)*np.cos(rephased) ss[:, k] += np.sin(dephased)*np.sin(rephased) - print_step(n, num_traj, start) + last_print = print_step(n, num_traj, start, last_print) cc[:, 1:] /= num_traj ss[:, 1:] /= num_traj @@ -101,11 +102,9 @@ def run_spectrum_sim(config_file: str): p = parse(config_file) rng, num_traj, t_max, delta, eta, num_variables = _prepare_sim(p) - print(num_traj) num_echos = len(p.spec.t_echo) reduction_factor = np.zeros((num_variables, num_echos)) - freq = np.fft.fftshift(np.fft.fftfreq(p.spec.num_points, p.spec.dwell_time)) t_echo = p.spec.t_echo t_echo_strings = list(map(str, t_echo)) @@ -113,23 +112,20 @@ def run_spectrum_sim(config_file: str): for (i, dist_values) in enumerate(p.dist): # noinspection PyCallingNonCallable dist = p.dist.dist_type(**dist_values, rng=rng) - print(f'\nStart of {dist}') - - chunks = int(0.6 * t_max / dist.mean_tau) + 1 - - # second loop over parameter of motional model + # second loop over parameter of motion model for (j, motion_values) in enumerate(p.motion): # noinspection PyCallingNonCallable motion = p.motion.model(delta, eta, **motion_values, rng=rng) - print(f'Start of {motion}') + print(f'\nStart of {dist}, {motion}') timesignal = np.zeros((p.spec.num_points, num_echos)) - start = time() + start = perf_counter() + last_print = start # inner loop to create trajectories for n in range(num_traj): - phase = make_trajectory(chunks, dist, motion, t_max) + phase = make_trajectory(dist, motion, t_max) for (k, t_echo_k) in enumerate(t_echo): # effect of de-phasing and re-phasing @@ -139,59 +135,61 @@ def run_spectrum_sim(config_file: str): timesignal[:, k] += np.cos(start_amp + phase(p.spec.t_acq + 2*t_echo_k)) reduction_factor[max(p.motion.num_variables, 1)*i+j, k] += np.cos(phase(2*t_echo_k) + start_amp) - print_step(n, num_traj, start) + # print(n+1, num_traj, start, last_print) + last_print = print_step(n+1, num_traj, start, last_print) + # apply line broadening timesignal *= p.spec.dampening[:, None] timesignal /= num_traj + timesignal[0, :] /= 2 # FT to spectrum spec = np.fft.fftshift(np.fft.fft(timesignal, axis=0), axes=0).real spec -= spec[0] + spec *= p.spec.pulse_attn[:, None] - # plot spectra - fig, ax = plt.subplots() - lines = ax.plot(freq, spec) - ax.set_title(f'{dist}, {motion}') - ax.legend(lines, t_echo_strings) - - plt.show() + save_spectrum_data(timesignal, spec, p, dist, motion, t_echo_strings) fig2, ax2 = plt.subplots() - ax2.semilogx(p.dist.variables['tau'], reduction_factor / num_traj, 'o--') + lines = ax2.semilogx(p.dist.variables['tau'], reduction_factor / num_traj, 'o--') + ax2.legend(lines, t_echo_strings) + plt.savefig(f'{dist.name}_{motion.name}_reduction.png') plt.show() -def print_step(n, num_traj, start): - n_1 = n+1 - if n_1 % 200 == 0 or n_1 == num_traj: - elapsed = time() - start - print(f'Step {n_1} of {num_traj}', end=' - ') - total = num_traj * elapsed / (n_1) - print(f'total: {total:.2f}s - elapsed: {elapsed:.2f}s - remaining: {total - elapsed:.2f}s') +def make_trajectory( + dist: BaseDistribution, + motion: BaseMotion, + t_max: float, + t_passed: float = 0., + init_phase: float = 0. +): - -def make_trajectory(chunks: int, dist: BaseDistribution, motion: BaseMotion, t_max: float): + # set initial orientations and correlation times motion.start() dist.start() - t_passed = 0 - t = [0] - phase = [0] + # number of jumps that are simulated at once + chunks = min(int(0.51 * t_max / dist.tau_jump), 100_000) + 1 + + t = [np.array([t_passed])] + phase = [np.array([init_phase])] while t_passed < t_max: # frequencies between jumps current_omega = motion.jump(size=chunks) - # times at a particular position t_wait = dist.wait(size=chunks) accumulated_phase = np.cumsum(t_wait * current_omega) + phase[-1] + phase.append(accumulated_phase) t_wait = np.cumsum(t_wait) + t_passed t_passed = t_wait[-1] - t.extend(t_wait.tolist()) + t.append(t_wait) - phase.extend(accumulated_phase.tolist()) + t = np.concatenate(t) + phase = np.concatenate(phase) # convenient interpolation to get phase at arbitrary times phase_interpol = interp1d(t, phase) @@ -200,6 +198,8 @@ def make_trajectory(chunks: int, dist: BaseDistribution, motion: BaseMotion, t_m def _prepare_sim(parameter: Parameter) -> tuple[Generator, int, float, float, float, int]: + # collect variables that are common to spectra and stimulated echo + # random number generator rng = np.random.default_rng(parameter.sim.seed) @@ -217,6 +217,56 @@ def _prepare_sim(parameter: Parameter) -> tuple[Generator, int, float, float, fl return rng, num_traj, t_max, delta, eta, num_variables -def ste(x, a, f_infty, tau, beta): - return a*((1-f_infty) * np.exp(-(x/tau)**beta) + f_infty) +def print_step(n: int, num_traj: int, start: float, last_print: float) -> float: + step_time = perf_counter() + dt = step_time - last_print + if dt > 10 or n == num_traj: + date = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + print(f'{date} - step {n} of {num_traj}', end=' - ') + elapsed = step_time - start + total = num_traj * elapsed / n + print(f'expected total: {total:.2f}s - elapsed: {elapsed:.2f}s - remaining: {total - elapsed:.2f}s') + if dt > 10: + last_print = step_time + + return last_print + + +def make_filename(dist: BaseDistribution, motion: BaseMotion) -> str: + filename = f'{dist}_{motion}' + filename = filename.replace(' ', '_') + filename = filename.replace('.', 'p') + + return filename + + +def save_spectrum_data( + timesignal: np.ndarray, + spectrum: np.ndarray, + param: Parameter, + dist: BaseDistribution, + motion: BaseMotion, + echo_strings: list[str] +): + filename = make_filename(dist, motion) + + header = param.totext(sim=True, spec=True) + header += '\nx\t' + '\t'.join(echo_strings) + + np.savetxt(filename + '_timesignal.dat', np.c_[param.spec.t_acq, timesignal], header=header) + np.savetxt(filename + '_spectrum.dat', np.c_[param.spec.freq, spectrum], header=header) + + fig, ax = plt.subplots() + lines = ax.plot(param.spec.freq, spectrum) + ax.set_title(f'{dist}, {motion}') + ax.legend(lines, echo_strings) + plt.savefig(filename + '_spectrum.png') + + fig1, ax1 = plt.subplots() + lines = ax1.plot(param.spec.t_acq, timesignal) + ax1.set_title(f'{dist}, {motion}') + ax1.legend(lines, echo_strings) + plt.savefig(filename + '_timesignal.png') + + plt.show() diff --git a/src/rwsims/spectrum.py b/src/rwsims/spectrum.py new file mode 100644 index 0000000..e69de29 diff --git a/src/rwsims/ste.py b/src/rwsims/ste.py new file mode 100644 index 0000000..e69de29