change layout

This commit is contained in:
Dominik Demuth
2024-08-01 18:46:28 +02:00
parent 3e1325f3ca
commit 66e8925241
14 changed files with 77 additions and 43 deletions

0
rwsims/__init__.py Normal file
View File

65
rwsims/distributions.py Normal file
View File

@ -0,0 +1,65 @@
from __future__ import annotations
from abc import ABC, abstractmethod
import numpy as np
# from numpy.typing import ArrayLike
from numpy.random import Generator
class BaseDistribution(ABC):
def __init__(self, tau: float, rng: Generator | None = None):
self._tau = tau
self._rng = rng
self.tau_jump = tau
@property
def name(self) -> str:
return self.__class__.__name__
@abstractmethod
def __repr__(self):
pass
@abstractmethod
def start(self):
pass
@property
@abstractmethod
def mean_tau(self):
pass
def wait(self, size: int = 1) -> ArrayLike:
return self._rng.exponential(self.tau_jump, size=size)
class DeltaDistribution(BaseDistribution):
def __repr__(self):
return f'Delta Distribution (tau={self._tau})'
def start(self):
self.tau_jump = self._tau
@property
def mean_tau(self):
return self._tau
class LogGaussianDistribution(BaseDistribution):
def __init__(self, tau: float, sigma: float, rng: Generator):
super().__init__(tau=tau, rng=rng)
self._sigma = sigma
def __repr__(self):
return f'Log-Gaussian(tau={self._tau}, sigma={self._sigma})'
def start(self):
self.tau_jump = self._rng.lognormal(np.log(self._tau), self._sigma)
@property
def mean_tau(self):
return self._tau * np.exp(self._sigma**2 / 2)

22
rwsims/functions.py Normal file
View File

@ -0,0 +1,22 @@
from __future__ import annotations
import numpy as np
# from numpy.typing import ArrayLike
from .distributions import BaseDistribution
from .motions import BaseMotion
def ste(x, a, f_infty, tau, beta):
return a*((1-f_infty) * np.exp(-(x/tau)**beta) + f_infty)
def pulse_attn(freq, t_pulse: float):
# cf. Schmitt-Rohr/Spieß eq. 2.126; omega_1 * t_p = pi/2
pi_half_squared = np.pi**2 / 4
omega = 2 * np.pi * freq
numerator = np.sin(np.sqrt(pi_half_squared + omega**2 * t_pulse**2 / 2))
denominator = np.sqrt(pi_half_squared + omega**2 * t_pulse**2 / 4)
return np.pi * numerator/denominator / 2

169
rwsims/motions.py Normal file
View File

@ -0,0 +1,169 @@
from __future__ import annotations
from abc import ABC, abstractmethod
import numpy as np
from numpy.random import Generator
class BaseMotion(ABC):
def __init__(self, delta: float, eta: float, rng: Generator):
self._delta = delta
self._eta = eta
self._rng = rng
@abstractmethod
def __repr__(self):
pass
@property
def name(self) -> str:
return self.__class__.__name__
def start(self):
pass
@abstractmethod
def jump(self, size: int = 1) -> 'ArrayLike':
pass
class RandomJump(BaseMotion):
def __repr__(self):
return 'Random Jump'
def jump(self, size: int = 1) -> 'ArrayLike':
return omega_q(self._delta, self._eta, *draw_orientation(self._rng, size=size))
class TetrahedralJump(BaseMotion):
def __init__(self, delta: float, eta: float, rng: Generator):
super().__init__(delta, eta, rng)
self._orientation = None
self._start = None
def __repr__(self):
return 'Tetrahedral Jump'
def start(self):
self._orientation = self._make_tetrahedron()
self._start = self._rng.choice([0, 1, 2, 3])
def _make_tetrahedron(self) -> np.ndarray:
beta = np.arccos(-1/3) # tetrahedral angle 109.5 degrees
sin_beta = np.sin(beta)
cos_beta = np.cos(beta)
# corners of a tetrahedron
alpha = 2 * np.pi * self._rng.random()
corners = np.array([
[0, 0, 1],
[sin_beta * np.cos(alpha), sin_beta * np.sin(alpha), cos_beta],
[sin_beta * np.cos(alpha+2*np.pi/3), sin_beta * np.sin(alpha+2*np.pi/3), cos_beta],
[sin_beta * np.cos(alpha+4*np.pi/3), sin_beta * np.sin(alpha+4*np.pi/3), cos_beta]
])
# orientation in lab system
cos_theta0, phi0 = draw_orientation(self._rng)
rot = get_rotation_matrix(
corners[0],
np.array(spherical_to_xyz(1., np.arccos(cos_theta0), phi0)),
)
orientations = np.zeros(4)
for i in range(4):
corner_lab = rot @ corners[i]
_, theta_i, phi_i = xyz_to_spherical(*corner_lab)
orientations[i] = omega_q(self._delta, self._eta, theta_i, phi_i)
# print(orientations)
#
# theta0 = np.arccos(cos_theta0)
# v0 = np.array([np.sin(theta0) * np.cos(phi0), np.sin(theta0)*np.sin(theta0), cos_theta0])
# norm = np.linalg.norm(v0)
# print(norm)
#
#
# corners = np.zeros((4, 3))
# corners[0] = v0
return orientations
def jump(self, size: int = 1) -> 'ArrayLike':
jumps = self._rng.choice([1, 2, 3], size=size)
jumps = np.cumsum(jumps) + self._start
jumps %= 4
self._start = jumps[-1]
return self._orientation[jumps]
# Helper functions
def xyz_to_spherical(x_in: float, y_in: float, z_in: float) -> tuple[np.floating, float, float]:
r = np.linalg.norm([x_in, y_in, z_in])
theta: float = np.arccos(z_in)
phi: float = np.arctan2(y_in, x_in)
return r, theta, phi
def spherical_to_xyz(r: float, theta: float, phi: float) -> tuple[float, float, float]:
sin_theta = np.sin(theta)
return r*np.cos(phi)*sin_theta, r*np.sin(phi)*sin_theta, r*np.cos(theta)
def get_rotation_matrix(vec_in: np.ndarray, vec_out: np.ndarray):
rotation = np.eye(3)
# rotation by angle around given axis
cos_angle = np.dot(vec_in, vec_out)
# check for parallel / anti-parallel vectors
if cos_angle == 1:
return rotation
elif cos_angle == -1:
return -rotation
else:
axis = np.cross(vec_in, vec_out)
scale = np.linalg.norm(axis)
axis /= scale
sin_angle = scale / np.linalg.norm(vec_in) / np.linalg.norm(vec_out)
v_cross = np.array([
[0, -axis[2], axis[1]],
[axis[2], 0, -axis[0]],
[-axis[1], axis[0], 0],
])
rotation += sin_angle * v_cross
rotation += (1-cos_angle) * v_cross @ v_cross
return rotation
def omega_q(delta: float, eta: float, cos_theta: ArrayLike, phi: ArrayLike) -> ArrayLike:
# sin_theta = np.sin(cos_theta)
# cos_theta = np.cos(cos_theta)
sin_theta_sq = 1 - cos_theta**2
return np.pi * delta * (3 * cos_theta**2 - 1 + eta * sin_theta_sq * np.cos(2*phi))
def draw_orientation(rng: Generator, size: int | None = None) -> tuple[ArrayLike, ArrayLike]:
if size is not None:
z_theta, z_phi = rng.random((2, size))
else:
z_theta, z_phi = rng.random(2)
cos_theta = 1 - 2 * z_theta
phi = 2 * np.pi * z_phi
return cos_theta, phi

145
rwsims/parameter.py Normal file
View File

@ -0,0 +1,145 @@
from __future__ import annotations
from dataclasses import dataclass, field
from itertools import product
from math import prod
from typing import Any
import numpy as np
# from numpy.typing import ArrayLike
from .functions import pulse_attn
from .distributions import BaseDistribution
from .motions import BaseMotion
__all__ = [
'SimParameter',
'MoleculeParameter',
'StimEchoParameter',
'SpectrumParameter',
'DistParameter',
'MotionParameter',
'Parameter',
]
@dataclass
class SimParameter:
seed: int | None
num_walker: int
t_max: float
def totext(self) -> str:
return f'num_traj={self.num_walker}\nseed={self.seed}'
@dataclass
class MoleculeParameter:
delta: float
eta: float
@dataclass
class StimEchoParameter:
t_evo: 'ArrayLike'
t_mix: 'ArrayLike'
t_echo: float
t_max: float = field(init=False)
def __post_init__(self):
self.t_max = np.max(self.t_mix) + 2 * np.max(self.t_evo) + 2*self.t_echo
@dataclass
class SpectrumParameter:
dwell_time: float
num_points: int
t_echo: 'ArrayLike'
lb: float
t_pulse: float
t_acq: 'ArrayLike' = field(init=False)
freq: 'ArrayLike' = field(init=False)
t_max: float = field(init=False)
dampening: 'ArrayLike' = field(init=False)
pulse_attn: 'ArrayLike' = field(init=False)
def __post_init__(self):
self.t_acq = np.arange(self.num_points) * self.dwell_time
self.dampening = np.exp(-self.lb * self.t_acq)
self.t_max = np.max(self.t_acq) + 2 * np.max(self.t_echo)
self.freq = np.fft.fftshift(np.fft.fftfreq(self.num_points, self.dwell_time))
self.pulse_attn = pulse_attn(self.freq, self.t_pulse)
def totext(self) -> str:
return (f'dwell_time{self.dwell_time}\n'
f'num_points={self.num_points}\n'
f't_echo={self.t_echo}\n'
f'lb={self.lb}\n'
f't_pulse={self.t_pulse}')
@dataclass
class DistParameter:
dist_type: BaseDistribution
variables: field(default_factory=dict)
num_variables: int = 0
iter: field(init=False) = None
def __post_init__(self):
self.num_variables = prod(map(len, self.variables.values()))
def __iter__(self):
return self
def __next__(self) -> dict[str, Any]:
if self.iter is None:
self.iter = product(*self.variables.values())
try:
return dict(zip(self.variables.keys(), next(self.iter)))
except StopIteration:
self.iter = None
raise StopIteration
@dataclass
class MotionParameter:
model: BaseMotion
variables: field(default_factory=dict)
num_variables: int = 0
iter: field(init=False) = None
def __post_init__(self):
self.num_variables = prod(map(len, self.variables.values()))
def __iter__(self):
return self
def __next__(self) -> dict[str, Any]:
if self.iter is None:
self.iter = product(*self.variables.values())
try:
return dict(zip(self.variables.keys(), next(self.iter)))
except StopIteration:
self.iter = None
raise StopIteration
@dataclass
class Parameter:
ste: StimEchoParameter | None
spec: SpectrumParameter | None
sim: SimParameter
dist: DistParameter
motion: MotionParameter
molecule: MoleculeParameter
def totext(self, sim: bool = True, spec: bool = True) -> str:
text = []
if sim:
text.append(self.sim.totext())
if spec:
text.append(self.spec.totext())
return '\n'.join(text)

132
rwsims/parsing.py Normal file
View File

@ -0,0 +1,132 @@
from __future__ import annotations
import json
from typing import Any
import numpy as np
from .distributions import DeltaDistribution, LogGaussianDistribution
from .motions import RandomJump, TetrahedralJump
from .parameter import *
def parse(config_file: str) -> Parameter:
with open(config_file, 'r') as f:
parameter: dict = json.load(f)
ste = _parse_ste(parameter.get('stimulated_echo'))
spec = _parse_spectrum(parameter.get('spectrum'))
if ste is None and spec is None:
raise ValueError("No parameter for STE or spectra given")
t_max = 0
if spec is not None:
t_max = max(spec.t_max, t_max)
if ste is not None:
t_max = max(ste.t_max, t_max)
parameter['simulation'].update({'t_max': t_max})
sim = _parse_sim(parameter['simulation'])
dist = _parse_dist(parameter['correlation_times'])
motion = _parse_motion(parameter['motion'])
mol = _parse_molecule(parameter['molecule'])
p = Parameter(sim=sim, ste=ste, spec=spec, dist=dist, motion=motion, molecule=mol)
return p
def _parse_sim(params: dict[str, Any]) -> SimParameter:
sim = SimParameter(
num_walker=params['num_walker'],
seed=params['seed'],
t_max=params['t_max']
)
return sim
def _parse_ste(params: dict[str, Any] | None) -> StimEchoParameter | None:
if params is None:
return
ste = StimEchoParameter(
t_mix=_make_times(params['t_mix']),
t_evo=_make_times(params['t_evo']),
t_echo=params['t_echo']
)
return ste
def _parse_spectrum(params: dict[str, Any] | None) -> SpectrumParameter | None:
if params is None:
return
spec = SpectrumParameter(
num_points=params['num_points'],
dwell_time=params['dwell_time'],
t_echo=_make_times(params['t_echo']),
lb=params.get('line_broadening', 0),
t_pulse=params.get('t_pulse', 0)
)
return spec
def _parse_dist(params: dict[str, Any]) -> DistParameter:
mapping: dict = {
'DeltaDistribution': DeltaDistribution,
'LogGaussian': LogGaussianDistribution
}
p = DistParameter(
dist_type=mapping[params['distribution']],
variables={k: _make_times(v) for k, v in params.items() if k != 'distribution'},
)
return p
def _parse_motion(params: dict[str, Any]) -> MotionParameter:
mapping = {
'RandomJump': RandomJump,
'TetrahedralJump': TetrahedralJump,
}
p = MotionParameter(
model=mapping[params['model']],
variables={k: _make_times(v) for k, v in params.items() if k != 'model'}
)
return p
def _parse_molecule(params: dict[str, Any]) -> MoleculeParameter:
return MoleculeParameter(
delta=params['delta'],
eta=params['eta']
)
def _make_times(params: float | int | dict[str, Any]) -> np.ndarray:
times = None
if isinstance(params, (int, float, complex)):
times = np.array([params])
else:
if all(k in params for k in ('start', 'stop', 'steps')):
space_func = np.linspace
if 'is_log' in params and params['is_log']:
space_func = np.geomspace
times = space_func(start=params['start'], stop=params['stop'], num=params['steps'])
if 'list' in params:
if times is not None:
raise ValueError('list and range is given')
times = np.array(params['list'])
if times is None:
raise ValueError('No times are given')
return times

284
rwsims/sims.py Normal file
View File

@ -0,0 +1,284 @@
from __future__ import annotations
from time import perf_counter
import numpy as np
from numpy.random import Generator
from datetime import datetime
from scipy.interpolate import interp1d
import matplotlib.pyplot as plt
from scipy.optimize import curve_fit
from .functions import ste
from .parameter import Parameter
from .distributions import BaseDistribution
from .motions import BaseMotion
from .parsing import parse
def run_ste_sim(config_file: str):
p = parse(config_file)
rng, num_traj, t_max, delta, eta, num_variables = _prepare_sim(p)
t_mix = p.ste.t_mix
t_evo = p.ste.t_evo
t_echo = p.ste.t_echo
fig, ax = plt.subplots(2)
fig2, ax2 = plt.subplots(2)
fig3, ax3 = plt.subplots(2)
# outer loop over variables of distribution of correlation times
for (i, dist_values) in enumerate(p.dist):
# noinspection PyCallingNonCallable
dist = p.dist.dist_type(**dist_values, rng=rng)
# second loop over parameter of motion model
for (j, motion_values) in enumerate(p.motion):
# noinspection PyCallingNonCallable
motion = p.motion.model(delta, eta, **motion_values, rng=rng)
print(f'\nStart of {dist}, {motion}')
start = last_print = perf_counter()
cc = np.zeros((len(t_mix), len(t_evo)))
ss = np.zeros((len(t_mix), len(t_evo)))
# inner loop to create trajectories
for n in range(num_traj):
phase = make_trajectory(dist, motion, t_max)
for (k, t_evo_k) in enumerate(t_evo):
dephased = phase(t_evo_k)
t0 = t_mix + t_evo_k
rephased = phase(t0 + t_evo_k + 2*t_echo) + phase(t0) - 2 * phase(t0+t_echo)
# print(t_evo_k, t0 + t_evo_k + 2*t_echo, t0)
cc[:, k] += np.cos(dephased)*np.cos(rephased)
ss[:, k] += np.sin(dephased)*np.sin(rephased)
last_print = print_step(n, num_traj, start, last_print)
cc[:, 1:] /= num_traj
ss[:, 1:] /= num_traj
fig4, ax4 = plt.subplots()
ax4.semilogx(t_mix, cc/cc[0, :], '.-')
fig5, ax5 = plt.subplots()
ax5.semilogx(t_mix, ss/ss[0, :], '.-')
for k in range(num_variables):
p0 = [0.5, 0, dist_values.get('tau', 1), 1]
p_final = []
p_final1 = []
for k, t_evo_k in enumerate(p.ste.t_evo):
try:
res = curve_fit(ste, t_mix, cc[:, k], p0=p0, bounds=([0, 0, 0, 0], [np.inf, 1, np.inf, 1]))
p_final.append(res[0].tolist())
except RuntimeError:
p_final.append([np.nan, np.nan, np.nan, np.nan])
try:
res2 = curve_fit(ste, t_mix, ss[:, k], p0=p0, bounds=([0, 0, 0, 0], [np.inf, 1, np.inf, 1]))
p_final1.append(res2[0].tolist())
except RuntimeError:
p_final1.append([np.nan, np.nan, np.nan, np.nan])
p_final = np.array(p_final)
p_final1 = np.array(p_final1)
# ax[0].semilogy(p.ste.t_evo, p_final[:, 0], '.--')
# ax[1].semilogy(t_evo, p_final1[:, 0], '.--')
ax[0].plot(t_evo, p_final[:, 1], '.-')
ax[1].plot(t_evo, p_final1[:, 1], '.-')
ax2[0].semilogy(t_evo, p_final[:, 2], '.-')
ax2[1].semilogy(t_evo, p_final1[:, 2], '.-')
ax3[0].plot(t_evo, p_final[:, 3], '.-')
ax3[1].plot(t_evo, p_final1[:, 3], '.-')
plt.show()
def run_spectrum_sim(config_file: str):
p = parse(config_file)
rng, num_traj, t_max, delta, eta, num_variables = _prepare_sim(p)
num_echos = len(p.spec.t_echo)
reduction_factor = np.zeros((num_variables, num_echos))
t_echo = p.spec.t_echo
t_echo_strings = list(map(str, t_echo))
# outer loop over variables of distribution of correlation times
for (i, dist_values) in enumerate(p.dist):
# noinspection PyCallingNonCallable
dist = p.dist.dist_type(**dist_values, rng=rng)
# second loop over parameter of motion model
for (j, motion_values) in enumerate(p.motion):
# noinspection PyCallingNonCallable
motion = p.motion.model(delta, eta, **motion_values, rng=rng)
print(f'\nStart of {dist}, {motion}')
timesignal = np.zeros((p.spec.num_points, num_echos))
start = perf_counter()
last_print = start
# inner loop to create trajectories
for n in range(num_traj):
phase = make_trajectory(dist, motion, t_max)
for (k, t_echo_k) in enumerate(t_echo):
# effect of de-phasing and re-phasing
start_amp = -2 * phase(t_echo_k)
# start of actual acquisition
timesignal[:, k] += np.cos(start_amp + phase(p.spec.t_acq + 2*t_echo_k))
reduction_factor[max(p.motion.num_variables, 1)*i+j, k] += np.cos(phase(2*t_echo_k) + start_amp)
# print(n+1, num_traj, start, last_print)
last_print = print_step(n+1, num_traj, start, last_print)
# apply line broadening
timesignal *= p.spec.dampening[:, None]
timesignal /= num_traj
timesignal[0, :] /= 2
# FT to spectrum
spec = np.fft.fftshift(np.fft.fft(timesignal, axis=0), axes=0).real
spec -= spec[0]
spec *= p.spec.pulse_attn[:, None]
save_spectrum_data(timesignal, spec, p, dist, motion, t_echo_strings)
fig2, ax2 = plt.subplots()
lines = ax2.semilogx(p.dist.variables['tau'], reduction_factor / num_traj, 'o--')
ax2.legend(lines, t_echo_strings)
plt.savefig(f'{dist.name}_{motion.name}_reduction.png')
plt.show()
def make_trajectory(
dist: BaseDistribution,
motion: BaseMotion,
t_max: float,
t_passed: float = 0.,
init_phase: float = 0.
):
# set initial orientations and correlation times
motion.start()
dist.start()
# number of jumps that are simulated at once
chunks = min(int(0.51 * t_max / dist.tau_jump), 100_000) + 1
# print(chunks)
t = [np.array([t_passed])]
phase = [np.array([init_phase])]
# omega = [np.array([0])]
while t_passed < t_max:
# frequencies between jumps
current_omega = motion.jump(size=chunks)
# times at a particular position
t_wait = dist.wait(size=chunks)
accumulated_phase = np.cumsum(t_wait * current_omega) + phase[-1][-1]
phase.append(accumulated_phase)
# omega.append(current_omega)
t_wait = np.cumsum(t_wait) + t_passed
t_passed = t_wait[-1]
t.append(t_wait)
t = np.concatenate(t)
phase = np.concatenate(phase)
# omega = np.concatenate(omega)
# fig_test, ax_test = plt.subplots()
# ax_test.plot(t, phase, 'x-')
# np.savetxt('trajectory.dat', np.c_[t, phase, omega])
# convenient interpolation to get phase at arbitrary times
phase_interpol = interp1d(t, phase)
return phase_interpol
def _prepare_sim(parameter: Parameter) -> tuple[Generator, int, float, float, float, int]:
# collect variables that are common to spectra and stimulated echo
# random number generator
rng = np.random.default_rng(parameter.sim.seed)
# number of random walkers
num_traj = parameter.sim.num_walker
# length of trajectories
t_max = parameter.sim.t_max
# parameter for omega_q
delta, eta = parameter.molecule.delta, parameter.molecule.eta
num_variables = parameter.dist.num_variables * parameter.motion.num_variables
return rng, num_traj, t_max, delta, eta, num_variables
def print_step(n: int, num_traj: int, start: float, last_print: float) -> float:
step_time = perf_counter()
dt = step_time - last_print
if dt > 10 or n == num_traj:
date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f'{date} - step {n} of {num_traj}', end=' - ')
elapsed = step_time - start
total = num_traj * elapsed / n
print(f'expected total: {total:.2f}s - elapsed: {elapsed:.2f}s - remaining: {total - elapsed:.2f}s')
if dt > 10:
last_print = step_time
return last_print
def make_filename(dist: BaseDistribution, motion: BaseMotion) -> str:
filename = f'{dist}_{motion}'
filename = filename.replace(' ', '_')
filename = filename.replace('.', 'p')
return filename
def save_spectrum_data(
timesignal: np.ndarray,
spectrum: np.ndarray,
param: Parameter,
dist: BaseDistribution,
motion: BaseMotion,
echo_strings: list[str]
):
filename = make_filename(dist, motion)
header = param.totext(sim=True, spec=True)
header += '\nx\t' + '\t'.join(echo_strings)
np.savetxt(filename + '_timesignal.dat', np.c_[param.spec.t_acq, timesignal], header=header)
np.savetxt(filename + '_spectrum.dat', np.c_[param.spec.freq, spectrum], header=header)
fig, ax = plt.subplots()
lines = ax.plot(param.spec.freq, spectrum)
ax.set_title(f'{dist}, {motion}')
ax.legend(lines, echo_strings)
plt.savefig(filename + '_spectrum.png')
fig1, ax1 = plt.subplots()
lines = ax1.plot(param.spec.t_acq, timesignal)
ax1.set_title(f'{dist}, {motion}')
ax1.legend(lines, echo_strings)
plt.savefig(filename + '_timesignal.png')
plt.show()

0
rwsims/spectrum.py Normal file
View File

0
rwsims/ste.py Normal file
View File

View File

@ -0,0 +1,145 @@
from time import time
import numpy as np
from scipy.interpolate import interp1d
import matplotlib.pyplot as plt
# spectral parameter
delta = 161e3 # in Hz
eta = 0
lb = 10e3 # in Hz
# correlation time
tau = [1e-5] # in s
# acquisition parameter
acq_length = 4096
dt = 1e-6 # in s
t_echo = [0, 5e-6, 10e-6, 20e-6, 50e-6, 100e-6, 200e-6] # all in s
# derived parameter
t_acq = np.arange(acq_length) * dt
t_max = acq_length*dt + 2*max(t_echo)
dampening = np.exp(-lb * t_acq)
# random number generator
seed = None
rng = np.random.default_rng(seed)
# number of random walkers
num_traj = 10000
def omega_q(delta_: float, eta_: float, theta_: float, phi_: float) -> np.ndarray:
cos_theta = np.cos(theta_)
sin_theta = np.sin(theta_)
return 2 * np.pi * delta_ * (3 * cos_theta * cos_theta - 1 + eta_ * sin_theta*sin_theta * np.cos(2*phi_))
def rotate(x_in: float, y_in: float, z_in: float, a: float) -> tuple[float, float]:
# rotation by tetrahedral angle is a given, only second angle is free parameter
beta = 109.45 * np.pi / 180.
cos_beta = np.cos(beta)
sin_beta = np.sin(beta)
cos_alpha = np.cos(a)
sin_alpha = np.sin(a)
scale = np.sqrt(1 - z_in * z_in) + 1e-12
x = x_in * cos_beta + sin_beta / scale * (x_in * z_in * cos_alpha - y_in * sin_alpha)
y = y_in * cos_beta + sin_beta / scale * (y_in * z_in * cos_alpha + x_in * sin_alpha)
z = z_in * cos_beta - scale * sin_beta*cos_alpha
z = max(-1, min(1, z))
return np.arccos(z), np.arctan2(y, x)
for tau_i in tau:
print(f'\nStart for tau={tau_i}')
timesignal = np.zeros((acq_length, len(t_echo)))
start = time()
expected_jumps = round(t_max/tau_i)
for i in range(num_traj):
# draw orientation
z_theta, z_phi, z_alpha = rng.random(3)
theta = np.arccos(1 - 2 * z_theta)
phi = 2 * np.pi * z_phi
alpha = 2 * np.pi * z_alpha
# orientation in cartesian coordinates
x_start = np.sin(theta) * np.cos(phi)
y_start = np.sin(theta) * np.sin(phi)
z_start = np.cos(theta)
# calculate orientation of tetrahedral edges and their frequencies
orientation = np.zeros(4)
orientation[0] = omega_q(delta, eta, theta, phi)
orientation[1] = omega_q(delta, eta, *rotate(x_start, y_start, z_start, alpha))
orientation[2] = omega_q(delta, eta, *rotate(x_start, y_start, z_start, alpha + 2 * np.pi / 3))
orientation[3] = omega_q(delta, eta, *rotate(x_start, y_start, z_start, alpha + 4 * np.pi / 3))
t_passed = 0
t = [0]
phase = [0]
accumulated_phase = 0
current_orientation = rng.choice([0, 1, 2, 3])
while t_passed < t_max:
# orientation until the next jump
# always jump to a different position i -> i + {1, 2, 3}
current_orientation += rng.choice([1, 2, 3])
current_orientation %= 4
current_omega = orientation[current_orientation]
# time to next jump
t_wait = rng.exponential(tau_i)
t_passed += t_wait
accumulated_phase += t_wait * current_omega
t.append(t_passed)
phase.append(accumulated_phase)
# convenient interpolation to get phase at arbitrary times
phase_interpol = interp1d(t, phase)
for j, t_echo_j in enumerate(t_echo):
# effect of de-phasing and re-phasing
start_amp = -2 * phase_interpol(t_echo_j)
# start of actual acquisition
timesignal[:, j] += np.cos(start_amp + phase_interpol(t_acq + 2*t_echo_j)) * dampening
if (i+1) % 200 == 0:
elapsed = time()-start
print(f'Step {i+1} of {num_traj}', end=' - ')
total = num_traj * elapsed / (i+1)
print(f'elapsed: {elapsed:.2f}s - total: {total:.2f}s - remaining: {total-elapsed:.2f}s')
timesignal /= num_traj
# FT to spectrum
freq = np.fft.fftshift(np.fft.fftfreq(acq_length, dt))
spec = np.fft.fftshift(np.fft.fft(timesignal, axis=0), axes=0).real
spec -= spec[0]
t_echo_strings = list(map(str, t_echo))
# plot spectra
fig, ax = plt.subplots()
lines = ax.plot(freq, spec)
ax.set_title(f'tau = {tau_i}s')
ax.legend(lines, t_echo_strings)
# plt.savefig(f'{tau_i}.png')
# save time signals and spectra
# np.savetxt(f'spectrum_{tau_i}.dat', np.c_[freq, spec], header='f\t' + '\t'.join(t_echo_strings))
# np.savetxt(f'timesignal_{tau_i}.dat', np.c_[t_acq, timesignal], header='t\t' + '\t'.join(t_echo_strings))
plt.show()

View File

@ -0,0 +1,165 @@
from time import time
from numpy.typing import ArrayLike
import numpy as np
from scipy.interpolate import interp1d
import matplotlib.pyplot as plt
# spectral parameter
delta = 161e3 # in Hz
eta = 0
lb = 5e3 # in Hz
# correlation time
tau = np.logspace(-8, -1, num=15) # in s
# acquisition parameter
acq_length = 4096
dt = 1e-6 # in s
t_echo = [5e-6, 10e-6, 20e-6, 50e-6, 100e-6, 200e-6] # all in s
# derived parameter
t_acq = np.arange(acq_length) * dt
t_max = acq_length*dt + 2*max(t_echo)
dampening = np.exp(-lb * t_acq)
# random number generator
seed = None
rng = np.random.default_rng(seed)
# number of random walkers
num_traj = 1
def omega_q(delta_: float, eta_: float, theta_: ArrayLike, phi_: ArrayLike) -> np.ndarray:
cos_theta = np.cos(theta_)
sin_theta = np.sin(theta_)
return 2 * np.pi * delta_ * (3 * cos_theta * cos_theta - 1 + eta_ * sin_theta*sin_theta * np.cos(2*phi_))
def rotate(x_in: float, y_in: float, z_in: float, a: float) -> tuple[float, float]:
# rotation by tetrahedral angle is a given, only second angle is free parameter
beta = 109.45 * np.pi / 180.
cos_beta = np.cos(beta)
sin_beta = np.sin(beta)
cos_alpha = np.cos(a)
sin_alpha = np.sin(a)
scale = np.sqrt(1 - z_in * z_in) + 1e-12
x = x_in * cos_beta + sin_beta / scale * (x_in * z_in * cos_alpha - y_in * sin_alpha)
y = y_in * cos_beta + sin_beta / scale * (y_in * z_in * cos_alpha + x_in * sin_alpha)
z = z_in * cos_beta - scale * sin_beta*cos_alpha
z = max(-1, min(1, z))
return np.arccos(z), np.arctan2(y, x)
def new_tau(size=1) -> np.ndarray:
return rng.exponential(tau_i, size=size)
reduction_factor = np.zeros((len(tau), len(t_echo)))
for (n, tau_i) in enumerate(tau):
print(f'\nStart for tau={tau_i}')
timesignal = np.zeros((acq_length, len(t_echo)))
start = time()
expected_jumps = round(t_max/tau_i)
if expected_jumps > 1e7:
print(f'Too many jumps to process, Skip {tau_i}s')
continue
chunks = int(0.6 * t_max / tau_i) + 1
print(f'Chunk size for trajectories: {chunks}')
for i in range(num_traj):
# draw orientation
z_theta, z_phi, z_alpha = rng.random(3)
theta = np.arccos(1 - 2 * z_theta)
phi = 2 * np.pi * z_phi
alpha = 2 * np.pi * z_alpha
# orientation in cartesian coordinates
x_start = np.sin(theta) * np.cos(phi)
y_start = np.sin(theta) * np.sin(phi)
z_start = np.cos(theta)
# calculate orientation of tetrahedral edges and their frequencies
orientation = np.zeros(4)
orientation[0] = omega_q(delta, eta, theta, phi)
orientation[1] = omega_q(delta, eta, *rotate(x_start, y_start, z_start, alpha))
orientation[2] = omega_q(delta, eta, *rotate(x_start, y_start, z_start, alpha + 2 * np.pi / 3))
orientation[3] = omega_q(delta, eta, *rotate(x_start, y_start, z_start, alpha + 4 * np.pi / 3))
t_passed = 0
t = [0]
phase = [0]
accumulated_phase = 0
start_position = rng.choice([0, 1, 2, 3])
while t_passed < t_max:
# orientation until the next jump
jumps = rng.choice([1, 2, 3], size=chunks) + start_position
jumps = np.cumsum(jumps)
jumps %= 4
current_omega = orientation[jumps]
# current_omega = rng.choice(orientation, size=chunks)
# time to next jump
t_wait = new_tau(size=chunks)
accumulated_phase = np.cumsum(t_wait*current_omega) + phase[-1]
t_wait = np.cumsum(t_wait) + t_passed
t_passed = t_wait[-1]
t.extend(t_wait.tolist())
phase.extend(accumulated_phase.tolist())
# convenient interpolation to get phase at arbitrary times
phase_interpol = interp1d(t, phase)
for j, t_echo_j in enumerate(t_echo):
# effect of de-phasing and re-phasing
start_amp = -2 * phase_interpol(t_echo_j)
# start of actual acquisition
timesignal[:, j] += np.cos(start_amp + phase_interpol(t_acq + 2*t_echo_j)) * dampening
reduction_factor[n, j] += np.cos(phase_interpol(2*t_echo_j) + start_amp)
if (i+1) % 200 == 0:
elapsed = time()-start
print(f'Step {i+1} of {num_traj}', end=' - ')
total = num_traj * elapsed / (i+1)
print(f'elapsed: {elapsed:.2f}s - total: {total:.2f}s - remaining: {total-elapsed:.2f}s')
timesignal /= num_traj
# FT to spectrum
freq = np.fft.fftshift(np.fft.fftfreq(acq_length, dt))
spec = np.fft.fftshift(np.fft.fft(timesignal, axis=0), axes=0).real
spec -= spec[0]
# spec /= np.max(spec, axis=0)
t_echo_strings = list(map(str, t_echo))
# plot spectra
fig, ax = plt.subplots()
lines = ax.plot(freq, spec)
ax.set_title(f'tau = {tau_i}s')
ax.legend(lines, t_echo_strings)
# plt.savefig(f'{tau_i}.png')
# save time signals and spectra
# np.savetxt(f'spectrum_{tau_i}.dat', np.c_[freq, spec], header='f\t' + '\t'.join(t_echo_strings))
# np.savetxt(f'timesignal_{tau_i}.dat', np.c_[t_acq, timesignal], header='t\t' + '\t'.join(t_echo_strings))
fig2, ax2 = plt.subplots()
ax2.semilogx(tau, reduction_factor / num_traj, 'o--')
plt.show()