From a34c2daab7acb7ee9b014914702a0a0889325ba4 Mon Sep 17 00:00:00 2001 From: Markus Rosenstihl Date: Wed, 24 Sep 2025 10:56:10 +0200 Subject: [PATCH] Added CosCos STE for central line excitation --- Scripts/Zeeman_CentreEx/coscos_centr_exp.py | 192 +++++++++++++++++ Scripts/Zeeman_CentreEx/coscos_centr_res.py | 216 ++++++++++++++++++++ 2 files changed, 408 insertions(+) create mode 100644 Scripts/Zeeman_CentreEx/coscos_centr_exp.py create mode 100644 Scripts/Zeeman_CentreEx/coscos_centr_res.py diff --git a/Scripts/Zeeman_CentreEx/coscos_centr_exp.py b/Scripts/Zeeman_CentreEx/coscos_centr_exp.py new file mode 100644 index 0000000..872972e --- /dev/null +++ b/Scripts/Zeeman_CentreEx/coscos_centr_exp.py @@ -0,0 +1,192 @@ +# -*- coding: iso-8859-1 -*- +from xml.etree import cElementTree as ET +import time +TXEnableDelay = 2e-6 +TXEnableValue = 0b0001 # TTL line blanking RF amplifier (bit 0) +TXPulseValue = 0b0010 # TTL line triggering RF pulses (bit 1) +ADCSensitivity = 0.5 # voltage span for ADC + +def experiment(): # CosCos-Stimulated Echos for central excitation + + # set up acquisition parameters: + pars = {} + pars['P90'] = 5.3e-6 # 90-degree pulse length (s) + pars['SF'] = 95.2e6 # spectrometer frequency (Hz) + pars['O1'] = 0 # offset from SF (Hz) + pars['SW'] = 5e6 # spectral window (Hz) + pars['SI'] = 8*1024 # number of acquisition points + pars['NS'] = 32*32*2#*3 # number of scans # multiple of 32 + pars['DS'] = 0 # number of dummy scans + pars['RD'] = 0.1 # delay between scans (s) (Recycle Delay) + pars['TAU'] = 0.014*7 # delay between SatRec and experiment (s) + pars['D1'] = 20e-6 # delay after first pulse, or short tau (s) + pars['D2'] = 10e-6 # delay after second pulse, or long tau (s) + pars['D_PREAQ'] = 5e-6 + pars['PHA'] =335 # receiver phase (degree) + + ### SatRec + pars['D3'] = 1e-5 # shortest SatRec delay + pars['D4'] = 1e-3 # longest SatRec delay + pars["NECH"] = 12 # number of saturation pulse + ### + + pars['DATADIR'] = '/home/fprak/Desktop/' # data directory + pars['OUTFILE'] = None # output file name + + # specify a variable parameter (optional): + pars['tp'] = [20e-6,50e-6,90e-6,150e-6,300e-6]#[12e-6, 20e-6, 30e-6, 50e-6, 75e-6, 90e-6, 125e-6, 200e-6] + pars['VAR_PAR'] = 'D2' # variable parameter name (a string) + start = 20e-6 # starting value + stop =0.3 # end value + steps = 21 # number of values + log_scale = True # log scale flag + stag_range = True # staggered range flag + + # check parameters for safety: + if pars['PHA'] < 0: + pars['PHA'] = 360 + pars['PHA'] + + if pars['P90'] > 20e-6: + raise Exception("Pulse too long!!!") + + # check whether a variable parameter is named: + var_key = pars.get('VAR_PAR') + if var_key == 'P90' and (start > 20e-6 or stop > 20e-6): + raise Exception("Pulse too long!!!") + + + # start the experiment: + if var_key: + # this is an arrayed experiment: + if log_scale: + array = log_range(start,stop,steps) + else: + array = lin_range(start,stop,steps) + + if stag_range: + array = staggered_range(array, size = 3) + + # estimate time by looping once over everything + duration = 0 + for tp in pars["tp"]: + for index, var_par_value in enumerate(array): + # assign each var_key the corresponding var_value + pars[var_key] = var_par_value + # parse state tree + e_xml = ET.fromstring(spinal32_experiment(pars, 0).write_xml_string()) + # sum up all time=x states + for a_state in e_xml.iter("state"): + duration += float(a_state.get("time"))*(pars['NS'] + pars['DS']) + print duration + t = time.localtime(duration) + print "\n\nINFO: Experiment will be finnished in: %id %ih %im" % (t.tm_mday-1, t.tm_hour-1, t.tm_min) + print "INFO: Experiment will be finnished at: %s\n\n" % (time.ctime(time.time() + duration)) + + # loop for a variable parameter: + for tp in pars['tp']: + pars['D1'] = tp + for index, pars[var_key] in enumerate(array): + print 'Arrayed experiment for '+var_key+': run = '+str(index+1)+\ + ' out of '+str(len(pars['tp'])*array.size)+': value = ' + str(tp) + ' / '+str(pars[var_key]) + # loop for accumulation: + + for run in xrange(pars['NS']+pars['DS']): + yield spinal32_experiment(pars, run) + synchronize() + + else: + # estimate the experiment time: + seconds = (pars['D1']*2 + pars['D2'] + pars['RD']) * (pars['NS']+ pars['DS']) + m, s = divmod(seconds, 60) + h, m = divmod(m, 60) + print '%s%02d:%02d:%02d' % ('Experiment time estimated: ', h, m, s) + + # loop for accumulation: + for run in xrange(pars['NS']+pars['DS']): + yield spinal32_experiment(pars, run) + + +# the pulse program: + +def spinal32_experiment(pars, run): + e=Experiment() + + dummy_scans = pars.get('DS') + if dummy_scans: + run -= dummy_scans + + pars['PROG'] = 'coscos32_experiment' + + # phase cycle by F. Qi et al. [JMR 169 (2004) 225-239] with 3rd-phase invertion + pars['PH1'] = [270, 270, 270, 270, 90, 90, 90, 90, 270, 270, 270, 270, 90, 90, 90, 90, 180, 180, 180, 180, 0,0,0,0, 180, 180, 180, 180, 0,0,0,0] + pars['PH3'] = [270, 270, 90, 90, 270, 270, 90, 90, 270, 270, 90, 90, 270, 270, 90, 90, 180, 180, 0, 0, 180, 180, 0, 0, 180, 180, 0, 0, 180, 180, 0, 0] + pars['PH4'] = [270, 90, 270, 90, 270, 90, 270, 90, 180, 0, 180, 0, 180, 0, 180, 0, 270, 90, 270, 90, 270, 90, 270, 90, 180, 0, 180, 0, 180, 0, 180, 0] + pars['PH2'] = [270, 90, 90, 270, 90, 270, 270, 90, 180, 0, 0, 180, 0, 180, 180, 0, 270, 90, 90, 270, 90, 270, 270, 90, 180, 0, 0, 180, 0, 180, 180, 0] + + # read in variables: + P90 = pars['P90'] + SF = pars['SF'] + O1 = pars['O1'] + RD = pars['RD'] + D1 = pars['D1'] + D2 = pars['D2'] + D3 = pars['D3'] + D4 = pars['D4'] + D_PREAQ = pars['D_PREAQ'] + TAU = pars['TAU'] + NECH = pars['NECH'] + PH1 = pars['PH1'][run%len(pars['PH1'])] + PH3 = pars['PH3'][run%len(pars['PH3'])] + PH4 = pars['PH4'][run%len(pars['PH4'])] + PH2 = pars['PH2'][run%len(pars['PH2'])] + PHA = pars['PHA'] + + # set sampling parameters: + SI = pars['SI'] + SW = pars['SW'] + + + # run the pulse program: + # saturation: + # set variable delay list for saturation pulses: + vdlist = log_range(D4, D3, NECH-1) + + e.wait(RD) # relaxation delay between scans + e.set_frequency(SF+O1, phase=PH1) # set frequency and phase for saturation pulses + e.ttl_pulse(TXEnableDelay, value=TXEnableValue) # enable RF amplifier + e.ttl_pulse(P90, value=TXEnableValue|TXPulseValue) # apply 90-degree pulse + + for delay in vdlist[::-1]: + e.wait(delay-P90-TXEnableDelay) # wait for next saturation pulse + e.ttl_pulse(TXEnableDelay, value=TXEnableValue) # enable RF amplifier + e.ttl_pulse(P90, value=TXEnableValue|TXPulseValue) # apply 90-degree pulse + + # recovery: + e.set_phase(PH1) + e.wait(TAU-0.5e-6) # wait for tau + e.ttl_pulse(TXEnableDelay, value=TXEnableValue) + e.ttl_pulse(P90, value=TXEnableValue|TXPulseValue) # 90-degree pulse + + e.set_phase(PH3) + e.wait(D1-P90-TXEnableDelay-0.5e-6) # 'short tau' + + e.ttl_pulse(TXEnableDelay, value=TXEnableValue) + e.ttl_pulse(P90, value=TXEnableValue|TXPulseValue) # 90-degree pulse + + e.set_phase(PH4) + e.wait(D2-P90-TXEnableDelay-0.5e-6) # 'long tau' + + e.ttl_pulse(TXEnableDelay, value=TXEnableValue) + e.ttl_pulse(P90, value=TXEnableValue|TXPulseValue) # 90-degree pulse + + e.set_phase(PHA) + e.wait(D1-P90/2-0.5e-6-D_PREAQ) # 'short tau' + e.record(SI, SW, sensitivity=ADCSensitivity) # acquisition + + # write experiment parameters: + for key in pars.keys(): + e.set_description(key, pars[key]) # acqusition parameters + e.set_description('run', run) # current scan + e.set_description('rec_phase', -PH2) # current receiver phase + + return e \ No newline at end of file diff --git a/Scripts/Zeeman_CentreEx/coscos_centr_res.py b/Scripts/Zeeman_CentreEx/coscos_centr_res.py new file mode 100644 index 0000000..634fe75 --- /dev/null +++ b/Scripts/Zeeman_CentreEx/coscos_centr_res.py @@ -0,0 +1,216 @@ +# -*- coding: iso-8859-1 -*- + +from numpy import * +from scipy.signal import * +from scipy.optimize import * +from os import path, rename + +d1s = [] + +def result(): + + + + measurement_range = [4.5e-6, 8e-6] + + + suffix = '' # output file name's suffix and... + counter = 1 # counter for arrayed experiments + var_key = '' # variable parameter name + + # loop over the incoming results: + for timesignal in results: + if not isinstance(timesignal,ADC_Result): + print 'Error during aquisition!' + if isinstance(timesignal, Error_Result): + print timesignal + raise IOError + continue + + # read experiment parameters: + pars = timesignal.get_description_dictionary() + if not pars["D1"] in d1s: + measurement = MeasurementResult("Magnetization D1=%e"%(pars["D1"])) + d1s.append(pars["D1"]) + + # ---------------- digital filter ------------------ + + # get actual sampling rate of timesignal: + sampling_rate = timesignal.get_sampling_rate() + + # get user-defined spectrum width: + spec_width = pars['SW'] + + # specify cutoff frequency, in relative units: + cutoff = spec_width / sampling_rate + + if False: # no filter applied otherwise + + # number of filter's coefficients: + numtaps = 29 + + # use firwin to create a lowpass FIR filter: + fir_coeff = firwin(numtaps, cutoff) + + # downsize x according to user-defined spectral window: + skip = int(sampling_rate / spec_width) + timesignal.x = timesignal.x[::skip] + + for i in range(2): + # apply the filter to ith channel: + timesignal.y[i] = lfilter(fir_coeff, 1.0, timesignal.y[i]) + + # zeroize first N-1 "corrupted" samples: + timesignal.y[i][:numtaps-1] = 0.0 + + # circular left shift of y: + timesignal.y[i] = roll(timesignal.y[i], -(numtaps-1)) + + # downsize y to user-defined number of samples (SI): + timesignal.y[i] = timesignal.y[i][::skip] + + # update the sampling_rate attribute of the signal's: + timesignal.set_sampling_rate(spec_width) + + # ---------------------------------------------------- + + # rotate timesignal according to current receiver's phase: + timesignal.phase(pars['rec_phase']) + + # provide timesignal to the display tab: + data['Current scan'] = timesignal + + # accumulate... + if not locals().get('accu'): + accu = Accumulation() + + # skip dummy scans, if any: + if pars['run'] < 0: continue + + # add up: + accu += timesignal + + # provide accumulation to the display tab: + data['Accumulation'] = accu + + # check how many scans are done: + if accu.n == pars['NS']: # accumulation is complete + + # make a copy: + echo = accu + 0 + + # compute the initial phase of FID: + phi0 = arctan2(echo.y[1][0], echo.y[0][0]) * 180 / pi + if not 'ref' in locals(): ref = phi0 + print 'phi0 = ', phi0 + + # rotate FID to maximize y[0][0]: + #echo.phase(-phi0) + + # do FFT: + echo.exp_window(line_broadening=10) + spectrum = echo.fft(samples=2*pars['SI']) + + # try zero-order phase correction: + spectrum.phase(-phi0) + + # provide spectrum to the display tab: + data['Spectrum'] = spectrum + + # check whether it is an arrayed experiment: + var_key = pars.get('VAR_PAR') + if var_key: + # get variable parameter's value: + var_value = pars.get(var_key) + + # provide signal recorded with this var_value to the display tab: + data['Accumulation'+"/D1="+ "%e"%(pars["D1"]) + "/" + var_key+"=%e"%(var_value)] = accu + + # measure signal intensity vs. var_value: + [start, stop] = accu.get_sampling_rate() * array(measurement_range) + measurement[var_value] = accu.y[0][int(start):int(stop)].mean() + + + # provide measurement to the display tab: + data[measurement.get_title()] = measurement + + # update the file name suffix: + suffix = '_' + str(counter) + counter += 1 + + # save accu if required: + outfile = pars.get('OUTFILE') + if outfile: + datadir = pars.get('DATADIR') + + # write raw data in Simpson format: + filename = datadir+outfile+suffix+'.dat' + if path.exists(filename): + rename(filename, datadir+'~'+outfile+suffix+'.dat') + accu.write_to_simpson(filename) + + # write raw data in Tecmag format: + # filename = datadir+outfile+'.tnt' + # accu.write_to_tecmag(filename, nrecords=20) + + # write parameters in a text file: + filename = datadir+outfile+suffix+'.par' + if path.exists(filename): + rename(filename, datadir+'~'+outfile+suffix+'.par') + + fileobject = open(filename, 'w') + for key in sorted(pars.iterkeys()): + if key=='run': continue + if key=='rec_phase': continue + fileobject.write('%s%s%s'%(key,'=', pars[key])) + fileobject.write('\n') + fileobject.close() + + # reset accumulation: + del accu + + + if var_key == 'D2': + # KWW fit: + xdata = measurement.get_xdata() + ydata = measurement.get_ydata() + [amplitude, rate, beta] = fitfunc(xdata, ydata) + print '%s%02g' % ('Amplitude = ', amplitude) + print '%s%02g' % ('T2 [s] = ', 1./rate) + print '%s%02g' % ('Beta = ', beta) + + # update display for the fit: + measurement.y = func([amplitude, rate, beta], xdata) + data[measurement.get_title()] = measurement+0 + +# the fitting procedure: +def fitfunc(xdata, ydata): + + # initialize variable parameters: + try: + # solve Az = b: + A = array((ones(xdata.size/2), xdata[0:xdata.size/2])) + b = log(abs(ydata[0:xdata.size/2])) + z = linalg.lstsq(transpose(A), b) + amplitude = exp(z[0][0]) + rate = -z[0][1] + except: + amplitude = abs(ydata[0]) + rate = 1./(xdata[-1] - xdata[0]) + beta = 1 + p0 = [amplitude, rate, beta] + + # run least-squares optimization: + plsq = leastsq(residuals, p0, args=(xdata, ydata)) + + return plsq[0] # best-fit parameters + +def residuals(p, xdata, ydata): + return ydata - func(p, xdata) + +# here is the function to fit: +def func(p, xdata): + return p[0]*exp(-(p[1]*xdata)**p[2]) + + +pass \ No newline at end of file