Compare commits
36 Commits
main
...
Save-Path-
Author | SHA1 | Date | |
---|---|---|---|
|
a0c46cbcd0 | ||
|
14d59a639b | ||
|
4e2993486f | ||
|
2969541799 | ||
|
e746a664bd | ||
|
dd24a0ace9 | ||
|
9754e25d9c | ||
|
28263aaefb | ||
|
bcf86c4d70 | ||
|
67c564188c | ||
|
736a5a8e14 | ||
|
45e5fd9107 | ||
|
6e1ea0fd8b | ||
|
69c10571ba | ||
|
b382e77dd8 | ||
|
9b851c8018 | ||
|
35f16f7dcd | ||
|
c93d4ff2f7 | ||
|
c4d349bfa7 | ||
78b2aa6ba7 | |||
15d55f2c9c | |||
e2f92a2faf | |||
|
4450fd1e76 | ||
54a2948d0b | |||
080ce6b1e6 | |||
5c01110ce5 | |||
6efedda662 | |||
0c21e935ed | |||
4881f4de3d | |||
082f99aa0d | |||
0ecbfa12e7 | |||
a566dda4b0 | |||
87b8908a99 | |||
c859404a15 | |||
1ddd3e96a4 | |||
130fdfbab1 |
@ -7,6 +7,8 @@ Lightfield + Positioner
|
||||
############################################
|
||||
# Packages from Ryan
|
||||
import re
|
||||
import math
|
||||
import threading
|
||||
import pyvisa
|
||||
# from pyvisa import ResourceManager, constants
|
||||
|
||||
@ -31,6 +33,7 @@ from System import String
|
||||
import numpy as np
|
||||
import matplotlib.pyplot as plt
|
||||
import datetime
|
||||
from typing import Union
|
||||
|
||||
|
||||
#First choose your controller
|
||||
@ -162,6 +165,8 @@ def move_xy(target_x, target_y): # moving in x and y direction closed to desired
|
||||
# intensity_data = [] # To store data from each scan
|
||||
# data_list = []
|
||||
|
||||
|
||||
|
||||
def move_scan_xy(range_x, range_y, resolution, Settings, baseFileName):
|
||||
"""
|
||||
This function moves the positioners to scan the sample with desired ranges and resolution in 2 dimensions.
|
||||
@ -204,7 +209,7 @@ def move_scan_xy(range_x, range_y, resolution, Settings, baseFileName):
|
||||
|
||||
#This gives a directory, in which the script will save the spectrum of each spot as spe
|
||||
#However, it will open the spectrum, convert it to txt, add it to the intensity_data and delete the spe file
|
||||
Path_save = "C:/Users/localadmin/Desktop/Users/Lukas/2024_02_08_Map_test"
|
||||
temp_folder_path = "C:/Users/localadmin/Desktop/Users/Lukas/2024_02_08_Map_test"
|
||||
|
||||
#scanning loop
|
||||
for i, x_positions in enumerate(array_x):
|
||||
@ -216,9 +221,7 @@ def move_scan_xy(range_x, range_y, resolution, Settings, baseFileName):
|
||||
#this if will make the positioner wait a bit longer to really go to the target.
|
||||
if y == False:
|
||||
move_axis(axis_y, y_positions)
|
||||
y = True
|
||||
|
||||
|
||||
y = True
|
||||
|
||||
#we acquire with the LF
|
||||
acquire_name_spe = f'{baseFileName}_X{x_positions}_Y{y_positions}'
|
||||
@ -226,12 +229,12 @@ def move_scan_xy(range_x, range_y, resolution, Settings, baseFileName):
|
||||
|
||||
#read the .spe file and get the data as loaded_files
|
||||
cwd = os.getcwd() # save original directory
|
||||
os.chdir(Path_save) #change directory
|
||||
os.chdir(temp_folder_path) #change directory
|
||||
loaded_files = sl.load_from_files([acquire_name_spe + '.spe']) # get the .spe file as a variable
|
||||
os.chdir(cwd) # go back to original directory
|
||||
|
||||
# Delete the created .spe file from acquiring after getting necessary info
|
||||
spe_file_path = os.path.join(Path_save, acquire_name_spe + '.spe')
|
||||
spe_file_path = os.path.join(temp_folder_path, acquire_name_spe + '.spe')
|
||||
os.remove(spe_file_path)
|
||||
|
||||
distance = calculate_distance(x_positions, y_positions,amc.move.getPosition(axis_x), amc.move.getPosition(axis_y))
|
||||
@ -301,7 +304,8 @@ def sep_num_from_units(powerbox_output :str)->list:
|
||||
else:
|
||||
return [powerbox_output,]
|
||||
|
||||
def query_no_echo(instr:pyvisa.resources.Resource, command:str, sleeptime=0.01)->str:
|
||||
|
||||
def query_no_echo(instr:pyvisa.resources.Resource, command:str, sleeptime=0)->str:
|
||||
"""helper function for the Attocube APS100 that queries a function to the device, removing the echo.
|
||||
|
||||
Args:
|
||||
@ -325,7 +329,8 @@ def query_no_echo(instr:pyvisa.resources.Resource, command:str, sleeptime=0.01)-
|
||||
print(f"Error communicating with instrument: {e}")
|
||||
return None
|
||||
|
||||
def write_no_echo(instr:pyvisa.resources.Resource, command:str, sleeptime=0.01)->str:
|
||||
|
||||
def write_no_echo(instr:pyvisa.resources.Resource, command:str, sleeptime=0)->str:
|
||||
"""helper function for the Attocube APS100 that writes a function to the device, removing the echo.
|
||||
|
||||
Args:
|
||||
@ -351,22 +356,25 @@ def write_no_echo(instr:pyvisa.resources.Resource, command:str, sleeptime=0.01)-
|
||||
except pyvisa.VisaIOError as e:
|
||||
print(f"Error communicating with instrument: {e}")
|
||||
|
||||
# TODO: implement the reverse scan and zero when finish functionality
|
||||
|
||||
# receive values in units of T, rescale in kg to talk with the power supplyy. 1T = 10kG
|
||||
# NOTE: removed singlepowersupply_bool, reading serial-nr. of the device instead.
|
||||
# old save folder: "C:/Users/localadmin/Desktop/Users/Lukas/2024_02_08_Map_test"
|
||||
def sweep_b_val(instr:pyvisa.resources.Resource, min_bval:float, max_bval:float,
|
||||
res:float, Settings:str, base_file_name='', path_save="C:/Users/localadmin/Desktop/Users/Lukas/2024_02_08_Map_test",
|
||||
singlepowersupply_bool=False, reversescan_bool=False, zerowhenfin_bool=False)->None:
|
||||
""" this function performs a sweep of the B field of the chosen magnet coil. It creates a list o B values from the given min and max values, with the given resolution. For each value, a measurement of the spectrum
|
||||
of the probe in the cryostat is made, using the LightField spectrometer.
|
||||
res:float, magnet_coil:str, Settings:str, base_file_name='',
|
||||
reversescan_bool=False, zerowhenfin_bool=False, loopscan_bool=False)->None:
|
||||
# TODO: update docs in the end
|
||||
""" this function performs a sweep of the B field of the chosen magnet coil. It creates a list o B values from the given min and max values,
|
||||
with the given resolution. For each value, a measurement of the spectrum of the probe in the cryostat is made, using the LightField spectrometer.
|
||||
|
||||
Args:
|
||||
instr (pyvisa.resources.Resource): chosen power supply device to connect to
|
||||
min_bval (float): min B value of the scan (please input in units of Tesla)
|
||||
max_bval (float): max B value of the scan (please input in units of Tesla)
|
||||
res (float): resolution of the list of B values (please input in units of Tesla)
|
||||
magnet_coil (str): select magnet coil to be used. String should be 'x-axis','y-axis' or 'z-axis'.
|
||||
Settings (str): experiment settings, included in file name.
|
||||
base_file_name (str, optional): base file name. Defaults to ''.
|
||||
path_save (str, optional): file path where the file will be saved. Defaults to "C:/Users/localadmin/Desktop/Users/Lukas/2024_02_08_Map_test".
|
||||
singlepowersupply_bool (bool, optional): _description_. Defaults to False.
|
||||
reversescan_bool (bool, optional): _description_. Defaults to False.
|
||||
zerowhenfin_bool (bool, optional): _description_. Defaults to False.
|
||||
@ -375,31 +383,76 @@ def sweep_b_val(instr:pyvisa.resources.Resource, min_bval:float, max_bval:float,
|
||||
ValueError: when By limit is exceeded.
|
||||
ValueError: when Bz limit is exceeded.
|
||||
ValueError: when Bx limit is exceeded.
|
||||
ConnectionError: when no device is connected.
|
||||
""" ''''''
|
||||
|
||||
def pyramid_list(lst) -> Union[list, np.ndarray]:
|
||||
"""reverses the list and removes the first element of reversed list. Then, this is appended to
|
||||
the end of the original list and returned as the 'pyramid' list.
|
||||
|
||||
Args:
|
||||
lst (list or np.ndarray):
|
||||
Raises:
|
||||
TypeError: if the input object isn't a list or np.ndarray
|
||||
Returns:
|
||||
Union[list, np.ndarray]: the pyramid list
|
||||
""" ''''''
|
||||
if isinstance(lst, list):
|
||||
return lst + lst[-2::-1]
|
||||
elif isinstance(lst, np.ndarray):
|
||||
return np.append(lst, lst[-2::-1])
|
||||
else:
|
||||
raise TypeError('Please input a list!')
|
||||
|
||||
# defines the folder, in which the data from the spectrometer is temporarily stored in
|
||||
temp_folder_path = "C:/Users/localadmin/Desktop/Users/Lukas/2024_02_08_Map_test"
|
||||
|
||||
# if path_save =='':
|
||||
# path_save = datetime.datetime.now().strftime("%Y_%m_%d_%H%M_hrs_")
|
||||
|
||||
if base_file_name =='':
|
||||
base_file_name = datetime.datetime.now().strftime('%Y_%m_%d_%H.%M')
|
||||
|
||||
start_time = time.time()
|
||||
instr_bsettings = list(sep_num_from_units(el) for el in query_no_echo(instr, 'UNITS?;LLIM?;ULIM?').split(';')) # deliver a 3 element tuple of tuples containing the set unit, llim and ulim
|
||||
start_time = time.time() # start of the scan function
|
||||
|
||||
instr_info = query_no_echo(instr, '*IDN?')
|
||||
|
||||
instr_bsettings = list(sep_num_from_units(el) for el in query_no_echo(instr, 'UNITS?;LLIM?;ULIM?').split(';')) # deliver a 3 element list of lists containing the set unit, llim and ulim
|
||||
|
||||
if instr_bsettings[0][0] == 'T':
|
||||
instr_bsettings[1][0] = instr_bsettings[1][0]*0.1 # rescale kG to T, device accepts values only in kG or A, eventho we set it to T
|
||||
instr_bsettings[2][0] = instr_bsettings[2][0]*0.1
|
||||
|
||||
if singlepowersupply_bool: # checks limits of Bx or By
|
||||
# if singlepowersupply_bool: # checks limits of Bx or By
|
||||
# if (min_bval< -BY_MAX) or (max_bval > BY_MAX):
|
||||
# raise ValueError('Input limits exceed that of the magnet By! Please input smaller limits.')
|
||||
# elif '1' in query_no_echo(instr, 'CHAN?'): # check if its the coils for Bz
|
||||
# if (min_bval < -BZ_MAX) or (max_bval > BZ_MAX):
|
||||
# raise ValueError('Input limits exceed that of the magnet (Bz)! Please input smaller limits.')
|
||||
# else: # checks limits of Bx
|
||||
# if (min_bval< -BX_MAX) or (max_bval > BX_MAX):
|
||||
# raise ValueError('Input limits exceed that of the magnet Bx! Please input smaller limits.')
|
||||
|
||||
if '2101014' in instr_info and (magnet_coil=='y-axis'): # single power supply
|
||||
if (min_bval< -BY_MAX) or (max_bval > BY_MAX):
|
||||
raise ValueError('Input limits exceed that of the magnet By! Please input smaller limits.')
|
||||
elif '1' in query_no_echo(instr, 'CHAN?'): # check if its the coils for Bz
|
||||
if (min_bval < -BZ_MAX) or (max_bval > BZ_MAX):
|
||||
raise ValueError('Input limits exceed that of the magnet (Bz)! Please input smaller limits.')
|
||||
else: # checks limits of Bx
|
||||
if (min_bval< -BX_MAX) or (max_bval > BX_MAX):
|
||||
raise ValueError('Input limits exceed that of the magnet Bx! Please input smaller limits.')
|
||||
elif '2301034' in instr_info: # dual power supply
|
||||
if magnet_coil=='z-axis': # check if its the coils for Bz
|
||||
if (min_bval < -BZ_MAX) or (max_bval > BZ_MAX):
|
||||
raise ValueError('Input limits exceed that of the magnet (Bz)! Please input smaller limits.')
|
||||
write_no_echo(instr, 'CHAN 1')
|
||||
elif magnet_coil=='x-axis': # checks limits of Bx
|
||||
if (min_bval< -BX_MAX) or (max_bval > BX_MAX):
|
||||
raise ValueError('Input limits exceed that of the magnet Bx! Please input smaller limits.')
|
||||
write_no_echo(instr, 'CHAN 2')
|
||||
else:
|
||||
raise ConnectionError('Device is not connected!')
|
||||
|
||||
write_no_echo(instr, f'LLIM {min_bval*10};ULIM {max_bval*10}') # sets the given limits, must convert to kG for the device to read
|
||||
bval_lst = np.arange(min_bval, max_bval + res, res) # creates list of B values to measure at, with given resolution, in T
|
||||
|
||||
init_bval = sep_num_from_units(query_no_echo(instr, 'IMAG?'))[0]*0.1 # queries the initial B value of the coil, rescale from kG to T
|
||||
# TODO: unused, see if can remove
|
||||
# init_bval = sep_num_from_units(query_no_echo(instr, 'IMAG?'))[0]*0.1 # queries the initial B value of the coil, rescale from kG to T
|
||||
|
||||
init_lim, subsequent_lim = 'LLIM', 'ULIM'
|
||||
init_sweep, subsequent_sweep = 'DOWN', 'UP'
|
||||
@ -417,24 +470,19 @@ def sweep_b_val(instr:pyvisa.resources.Resource, min_bval:float, max_bval:float,
|
||||
init_lim, subsequent_lim = subsequent_lim, init_lim
|
||||
init_sweep, subsequent_sweep = subsequent_sweep, init_sweep
|
||||
|
||||
# creates the pyramid list of B vals if one were to perform a hysteresis measurement
|
||||
if loopscan_bool:
|
||||
bval_lst = pyramid_list(bval_lst)
|
||||
|
||||
total_points = len(bval_lst)
|
||||
middle_index_bval_lst = total_points // 2
|
||||
intensity_data = [] # To store data from each scan
|
||||
cwd = os.getcwd() # save original directory
|
||||
|
||||
#This gives a directory, in which the script will save the spectrum of each spot as spe
|
||||
#However, it will open the spectrum, convert it to txt, add it to the intensity_data and delete the spe file
|
||||
#scanning loop
|
||||
for i, bval in enumerate(bval_lst):
|
||||
|
||||
# if init_bval == bval:
|
||||
# # if initial bval is equal to the element of the given iteration from the bval_lst, then commence measuring the spectrum
|
||||
# pass
|
||||
# else:
|
||||
|
||||
# TODO: improve the conditional block later on... try to shorten the number of conditionals needed/flatten the nested conditionals
|
||||
# else, travel to the lower or higher limit, depending on how far the init val is to each bound, and commence the measurement from there on
|
||||
# if not reversescan_bool:
|
||||
if i == 0: # for first iteration, sweep to one of the limits
|
||||
# NOTE: helper function for the scanning loop
|
||||
def helper_scan_func(idx, bval, instr=instr, init_lim=init_lim, init_sweep=init_sweep,
|
||||
subsequent_lim=subsequent_lim, subsequent_sweep=subsequent_sweep, sleep=5):
|
||||
if idx == 0: # for first iteration, sweep to one of the limits
|
||||
write_no_echo(instr, f'{init_lim} {bval*10}') # convert back to kG
|
||||
write_no_echo(instr, f'SWEEP {init_sweep}')
|
||||
else:
|
||||
@ -449,6 +497,40 @@ def sweep_b_val(instr:pyvisa.resources.Resource, min_bval:float, max_bval:float,
|
||||
actual_bval = sep_num_from_units(query_no_echo(instr, 'IMAG?'))[0]*0.1
|
||||
# update the actual bval
|
||||
print(f'Actual magnet strength: {actual_bval} T,', f'Target magnet strength: {bval} T')
|
||||
|
||||
#scanning loop
|
||||
for i, bval in enumerate(bval_lst):
|
||||
# if init_bval == bval:
|
||||
# # if initial bval is equal to the element of the given iteration from the bval_lst, then commence measuring the spectrum
|
||||
# pass
|
||||
# else:
|
||||
|
||||
# NOTE: original code without the loop scan
|
||||
################################################
|
||||
# if i == 0: # for first iteration, sweep to one of the limits
|
||||
# write_no_echo(instr, f'{init_lim} {bval*10}') # convert back to kG
|
||||
# write_no_echo(instr, f'SWEEP {init_sweep}')
|
||||
# else:
|
||||
# write_no_echo(instr, f'{subsequent_lim} {bval*10}') # convert back to kG
|
||||
# write_no_echo(instr, f'SWEEP {subsequent_sweep}')
|
||||
|
||||
# actual_bval = sep_num_from_units(query_no_echo(instr, 'IMAG?'))[0]*0.1 # convert kG to T
|
||||
# print(f'Actual magnet strength: {actual_bval} T,', f'Target magnet strength: {bval} T')
|
||||
|
||||
# while abs(actual_bval - bval) > 0.0001:
|
||||
# time.sleep(5) # little break
|
||||
# actual_bval = sep_num_from_units(query_no_echo(instr, 'IMAG?'))[0]*0.1
|
||||
# # update the actual bval
|
||||
# print(f'Actual magnet strength: {actual_bval} T,', f'Target magnet strength: {bval} T')
|
||||
###############################################
|
||||
if not loopscan_bool:
|
||||
helper_scan_func(i, bval)
|
||||
else:
|
||||
if i <= middle_index_bval_lst:
|
||||
helper_scan_func(i, bval)
|
||||
else:
|
||||
helper_scan_func(i, bval, instr=instr, init_lim=subsequent_lim, init_sweep=subsequent_sweep,
|
||||
subsequent_lim=init_lim, subsequent_sweep=init_sweep, sleep=5)
|
||||
|
||||
time.sleep(5)
|
||||
# we acquire with the LF
|
||||
@ -457,12 +539,12 @@ def sweep_b_val(instr:pyvisa.resources.Resource, min_bval:float, max_bval:float,
|
||||
|
||||
# read the .spe file and get the data as loaded_files
|
||||
cwd = os.getcwd() # save original directory
|
||||
os.chdir(path_save) #change directory
|
||||
os.chdir(temp_folder_path) #change directory
|
||||
loaded_files = sl.load_from_files([acquire_name_spe + '.spe']) # get the .spe file as a variable
|
||||
os.chdir(cwd) # go back to original directory
|
||||
|
||||
# Delete the created .spe file from acquiring after getting necessary info
|
||||
spe_file_path = os.path.join(path_save, acquire_name_spe + '.spe')
|
||||
spe_file_path = os.path.join(temp_folder_path, acquire_name_spe + '.spe')
|
||||
os.remove(spe_file_path)
|
||||
|
||||
points_left = total_points - i - 1 # TODO: SEE IF THIS IS CORRECT
|
||||
@ -476,6 +558,8 @@ def sweep_b_val(instr:pyvisa.resources.Resource, min_bval:float, max_bval:float,
|
||||
elapsed_time = (end_time - start_time) / 60
|
||||
print('Scan time: ', elapsed_time, 'minutes')
|
||||
|
||||
write_no_echo(instr, f'LLIM {instr_bsettings[1][0]*10};ULIM {instr_bsettings[2][0]*10}') # reset the initial limits of the device after the scan
|
||||
|
||||
if zerowhenfin_bool:
|
||||
write_no_echo(instr, 'SWEEP ZERO') # if switched on, discharges the magnet after performing the measurement loop above
|
||||
|
||||
@ -495,6 +579,279 @@ def sweep_b_val(instr:pyvisa.resources.Resource, min_bval:float, max_bval:float,
|
||||
np.savetxt("Wavelength.txt", wl)
|
||||
|
||||
|
||||
def polar_to_cartesian(radius, start_angle, end_angle, step_size, clockwise=True):
|
||||
# TODO: DOCS
|
||||
"""Creates a list of discrete cartesian coordinates (x,y), given the radius, start- and end angles, the angle step size, and the direction of rotation.
|
||||
Function then returns a list of two lists: list of angles and list of cartesian coordinates (x,y coordinates in a tuple).
|
||||
|
||||
Args:
|
||||
radius (_type_): _description_
|
||||
start_angle (_type_): _description_
|
||||
end_angle (_type_): _description_
|
||||
step_size (_type_): _description_
|
||||
clockwise (bool, optional): _description_. Defaults to True.
|
||||
|
||||
Returns:
|
||||
_type_: _description_
|
||||
""" """"""
|
||||
# Initialize lists to hold angles and (x, y) pairs
|
||||
angles = []
|
||||
coordinates = []
|
||||
|
||||
# Normalize angles to the range [0, 360)
|
||||
start_angle = start_angle % 360
|
||||
end_angle = end_angle % 360
|
||||
|
||||
if clockwise:
|
||||
# Clockwise rotation
|
||||
current_angle = start_angle
|
||||
while True:
|
||||
# Append the current angle to the angles list
|
||||
angles.append(current_angle % 360)
|
||||
|
||||
# Convert the current angle to radians
|
||||
current_angle_rad = math.radians(current_angle % 360)
|
||||
|
||||
# Convert polar to Cartesian coordinates
|
||||
x = radius * math.cos(current_angle_rad)
|
||||
y = radius * math.sin(current_angle_rad)
|
||||
|
||||
# Append the (x, y) pair to the list
|
||||
coordinates.append((x, y))
|
||||
|
||||
# Check if we've reached the end_angle (handling wrap-around) (current_angle - step_size) % 360 == end_angle or
|
||||
if current_angle % 360 == end_angle:
|
||||
break
|
||||
|
||||
# Decrement the current angle by the step size
|
||||
current_angle -= step_size
|
||||
if current_angle < 0:
|
||||
current_angle += 360
|
||||
else:
|
||||
# Counterclockwise rotation
|
||||
current_angle = start_angle
|
||||
while True:
|
||||
# Append the current angle to the angles list
|
||||
angles.append(current_angle % 360)
|
||||
|
||||
# Convert the current angle to radians
|
||||
current_angle_rad = math.radians(current_angle % 360)
|
||||
|
||||
# Convert polar to Cartesian coordinates
|
||||
x = radius * math.cos(current_angle_rad)
|
||||
y = radius * math.sin(current_angle_rad)
|
||||
|
||||
# Append the (x, y) pair to the list
|
||||
coordinates.append((x, y))
|
||||
|
||||
# Check if we've reached the end_angle (handling wrap-around) (current_angle + step_size) % 360 == end_angle or
|
||||
if current_angle % 360 == end_angle:
|
||||
break
|
||||
|
||||
# Increment the current angle by the step size
|
||||
current_angle += step_size
|
||||
if current_angle >= 360:
|
||||
current_angle -= 360
|
||||
|
||||
return [angles, coordinates]
|
||||
|
||||
|
||||
def b_field_rotation(instr1:pyvisa.resources.Resource, instr2:pyvisa.resources.Resource,
|
||||
Babs:float, startangle:float, endangle:float, angle_stepsize:float, Settings:str, clockwise=True, base_file_name='', zerowhenfin_bool=False)->None:
|
||||
# TODO: update docs
|
||||
"""Rotation of the b-field in discrete steps, spectrum is measured at each discrete step in the rotation. Scan angle is
|
||||
defined as the angle between the x-axis and the current B-field vector, i.e., in the anticlockwise direction.
|
||||
|
||||
Args:
|
||||
instr1 (pyvisa.resources.Resource): _description_
|
||||
instr2 (pyvisa.resources.Resource): _description_
|
||||
Babs (float): absolute B-field value in T
|
||||
startangle (float): start angle in degrees
|
||||
endangle (float): end angle in degrees
|
||||
angle_stepsize (float): angle step size in degrees
|
||||
clockwise (bool): determines the direction of rotation of the B-field. Defaults to True.
|
||||
zerowhenfin_bool (bool, optional): after finishing the rotation, both B-field components should be set to 0 T. Defaults to False.
|
||||
"""
|
||||
|
||||
# TODO: possibly rename instr1 and instr2 to the dual and single power supplies respectively??
|
||||
|
||||
# defines the folder, in which the data from the spectrometer is temporarily stored in
|
||||
temp_folder_path = "C:/Users/localadmin/Desktop/Users/Lukas/2024_02_08_Map_test"
|
||||
|
||||
if base_file_name =='':
|
||||
base_file_name = datetime.datetime.now().strftime('%Y_%m_%d_%H.%M')
|
||||
|
||||
start_time = time.time() # start of the scan function
|
||||
|
||||
startangle = startangle % 360
|
||||
endangle = endangle % 360 # ensures that the angles are within [0,360)
|
||||
|
||||
idnstr1 = query_no_echo(instr1, '*IDN?')
|
||||
idnstr2 = query_no_echo(instr1, '*IDN?')
|
||||
|
||||
intensity_data = [] # To store data from each scan
|
||||
cwd = os.getcwd() # save original directory
|
||||
|
||||
# find which one is the dual power supply, then, ramp B_x to Babs value
|
||||
if '2301034' in idnstr1: # serial no. the dual power supply
|
||||
pass
|
||||
elif '2101034' in idnstr2:
|
||||
# swap instruments, instr 1 to be the dual power supply (^= x-axis)
|
||||
instr1, instr2 = instr2, instr1
|
||||
|
||||
# save initial low and high sweep limits of each device, and set them back after the rotation
|
||||
instr1_bsettings = list(sep_num_from_units(el) for el in query_no_echo(instr1, 'UNITS?;LLIM?;ULIM?').split(';')) # deliver a 3 element tuple of tuples containing the set unit, llim and ulim
|
||||
instr2_bsettings = list(sep_num_from_units(el) for el in query_no_echo(instr2, 'UNITS?;LLIM?;ULIM?').split(';')) # deliver a 3 element tuple of tuples containing the set unit, llim and ulim
|
||||
if instr1_bsettings[0][0] == 'T':
|
||||
instr1_bsettings[1][0] = instr1_bsettings[1][0]*0.1 # rescale kG to T, device accepts values only in kG or A, eventho we set it to T
|
||||
instr1_bsettings[2][0] = instr1_bsettings[2][0]*0.1
|
||||
if instr2_bsettings[0][0] == 'T':
|
||||
instr2_bsettings[1][0] = instr2_bsettings[1][0]*0.1 # rescale kG to T, device accepts values only in kG or A, eventho we set it to T
|
||||
instr2_bsettings[2][0] = instr2_bsettings[2][0]*0.1
|
||||
|
||||
# initialise the sweep angle list as well as the sweep limits and directions for each instrument
|
||||
instr1_lim, instr2_lim = 'LLIM', 'ULIM'
|
||||
instr1_sweep, instr2_sweep = 'DOWN', 'UP'
|
||||
|
||||
# create lists of angles and discrete Cartesian coordinates
|
||||
angles, cartesian_coords = polar_to_cartesian(Babs, startangle, endangle, angle_stepsize, clockwise=clockwise)
|
||||
|
||||
if clockwise: # NOTE: old conditional was: startangle > endangle see if this works....
|
||||
# reverse sweep limits and directions for the clockwise rotation
|
||||
instr1_lim, instr2_lim = instr2_lim, instr1_lim
|
||||
instr1_sweep, instr2_sweep = instr2_sweep, instr1_sweep
|
||||
|
||||
# list of rates (with units) for diff ranges of each device, only up to Range 1 for single power supply as that is already
|
||||
# the max recommended current.
|
||||
init_range_lst1 = list(sep_num_from_units(el) for el in query_no_echo(instr1, 'RATE? 0;RATE? 1;RATE? 2').split(';'))
|
||||
init_range_lst2 = list(sep_num_from_units(el) for el in query_no_echo(instr2, 'RATE? 0;RATE? 1').split(';'))
|
||||
|
||||
min_range_lst = [min(el1[0], el2[0]) for el1,el2 in zip(init_range_lst1, init_range_lst2)] # min rates for each given range
|
||||
|
||||
# set both devices to the min rates
|
||||
write_no_echo(instr1, f'RATE 0 {min_range_lst[0]};RATE 1 {min_range_lst[1]}')
|
||||
write_no_echo(instr2, f'RATE 0 {min_range_lst[0]};RATE 1 {min_range_lst[1]}')
|
||||
|
||||
write_no_echo(instr1, f'CHAN 2;ULIM {Babs*10};SWEEP UP') # sets to B_x, the B_x upper limit and sweeps the magnet field to the upper limit
|
||||
print(f'SWEEPING B-X TO {Babs} T NOW')
|
||||
|
||||
# wait for Babs to be reached by the Bx field
|
||||
actual_bval = sep_num_from_units(query_no_echo(instr1, 'IMAG?'))[0]*0.1 # convert kG to T
|
||||
print(f'Actual magnet strength (Bx): {actual_bval} T,', f'Target magnet strength: {Babs} T')
|
||||
while abs(actual_bval - Babs) > 0.0001:
|
||||
time.sleep(5) # little break
|
||||
actual_bval = sep_num_from_units(query_no_echo(instr1, 'IMAG?'))[0]*0.1
|
||||
print(f'Actual magnet strength (Bx): {actual_bval} T,', f'Target magnet strength: {Babs} T')
|
||||
|
||||
# NOTE: implement PID control, possibly best option to manage the b field DO THIS LATER ON, WE DO DISCRETE B VALUES RN
|
||||
# Helper function that listens to a device
|
||||
def listen_to_device(device_id, target_value, shared_values, lock, all_targets_met_event):
|
||||
while not all_targets_met_event.is_set(): # Loop until the event is set
|
||||
# value = 0 # Simulate receiving a float from the device INSERT QUERY NO ECHO HERE TO ASK FOR DEVICE IMAG
|
||||
if '2301034' in device_id:
|
||||
value = sep_num_from_units(query_no_echo(instr1, 'IMAG?'))[0]*0.1 # convert kG to T
|
||||
elif '2101014' in device_id:
|
||||
value = sep_num_from_units(query_no_echo(instr2, 'IMAG?'))[0]*0.1 # convert kG to T
|
||||
print(f"Device {device_id} reports value: {value} T")
|
||||
|
||||
with lock:
|
||||
shared_values[device_id] = value
|
||||
# Check if both devices have met their targets
|
||||
if all(shared_values.get(device) is not None and abs(shared_values[device] - target_value[device]) <= 0.0001
|
||||
for device in shared_values):
|
||||
print(f"Both devices reached their target values: {shared_values}")
|
||||
all_targets_met_event.set() # Signal that both targets are met
|
||||
|
||||
# time.sleep(1) # Simulate periodic data checking
|
||||
|
||||
# Main function to manage threads and iterate over target values
|
||||
def monitor_devices(device_target_values, angles_lst, intensity_data=intensity_data):
|
||||
for iteration, target in enumerate(device_target_values):
|
||||
print(f"\nStarting iteration {iteration+1} for target values: {target}")
|
||||
# Shared dictionary to store values from devices
|
||||
shared_values = {device: None for device in target.keys()}
|
||||
# Event to signal when both target values are reached
|
||||
all_targets_met_event = threading.Event()
|
||||
|
||||
# Lock to synchronize access to shared_values
|
||||
lock = threading.Lock()
|
||||
|
||||
# Create and start threads for each device
|
||||
threads = []
|
||||
for device_id in target.keys():
|
||||
thread = threading.Thread(target=listen_to_device, args=(device_id, target, shared_values, lock, all_targets_met_event))
|
||||
threads.append(thread)
|
||||
thread.start()
|
||||
|
||||
# Wait until both devices meet their target values
|
||||
all_targets_met_event.wait()
|
||||
print(f"Both target values for iteration {iteration+1} met. Performing action...")
|
||||
|
||||
# Perform some action after both targets are met
|
||||
# we acquire with the LF
|
||||
acquire_name_spe = f'{base_file_name}_{angles_lst[iteration]}°' # NOTE: save each intensity file with the given angle
|
||||
AcquireAndLock(acquire_name_spe) #this creates a .spe file with the scan name.
|
||||
|
||||
# read the .spe file and get the data as loaded_files
|
||||
cwd = os.getcwd() # save original directory
|
||||
os.chdir(temp_folder_path) #change directory
|
||||
loaded_files = sl.load_from_files([acquire_name_spe + '.spe']) # get the .spe file as a variable
|
||||
os.chdir(cwd) # go back to original directory
|
||||
|
||||
# Delete the created .spe file from acquiring after getting necessary info
|
||||
spe_file_path = os.path.join(temp_folder_path, acquire_name_spe + '.spe')
|
||||
os.remove(spe_file_path)
|
||||
|
||||
# points_left = total_points - i - 1 # TODO: SEE IF THIS IS CORRECT
|
||||
# print('Points left in the scan: ', points_left)
|
||||
|
||||
#append the intensity data as it is (so after every #of_wl_points, the spectrum of the next point begins)
|
||||
intensity_data.append(loaded_files.data[0][0][0])
|
||||
|
||||
# Clean up threads
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
print(f"Threads for iteration {iteration+1} closed.\n")
|
||||
|
||||
#prints total time the mapping lasted
|
||||
end_time = time.time()
|
||||
elapsed_time = (end_time - start_time) / 60
|
||||
print('Scan time: ', elapsed_time, 'minutes')
|
||||
|
||||
# reset both devices to original sweep limits
|
||||
write_no_echo(instr1, f'LLIM {instr1_bsettings[1][0]*10};ULIM {instr1_bsettings[2][0]*10}') # reset the initial limits of the device after the scan
|
||||
write_no_echo(instr2, f'LLIM {instr2_bsettings[1][0]*10};ULIM {instr2_bsettings[2][0]*10}') # reset the initial limits of the device after the scan
|
||||
# reset both devices' initial rates for each range
|
||||
write_no_echo(instr1, f'RANGE 0 {init_range_lst1[0][0]};RANGE 1 {init_range_lst1[1][0]};RANGE 2 {init_range_lst1[2][0]}') # reset the initial limits of the device after the scan
|
||||
write_no_echo(instr2, f'RANGE 0 {init_range_lst2[0][0]};RANGE 1 {init_range_lst2[1][0]}') # reset the initial limits of the device after the scan
|
||||
|
||||
if zerowhenfin_bool:
|
||||
write_no_echo(instr1, 'SWEEP ZERO') # if switched on, discharges the magnet after performing the measurement loop above
|
||||
write_no_echo(instr2, 'SWEEP ZERO')
|
||||
|
||||
#save intensity & WL data as .txt
|
||||
os.chdir('C:/Users/localadmin/Desktop/Users/Lukas')
|
||||
# creates new folder for MAP data
|
||||
new_folder_name = "Test_Map_" + f"{datetime.datetime.now().strftime('%Y_%m_%d_%H.%M')}"
|
||||
os.mkdir(new_folder_name)
|
||||
# Here the things will be saved in a new folder under user Lukas !
|
||||
# IMPORTANT last / has to be there, otherwise data cannot be saved and will be lost!!!!!!!!!!!!!!!!
|
||||
os.chdir('C:/Users/localadmin/Desktop/Users/Lukas/'+ new_folder_name)
|
||||
|
||||
intensity_data = np.array(intensity_data)
|
||||
np.savetxt(Settings + f'{angles[0]}°_to_{angles[-1]}°' + experiment_name +'.txt', intensity_data)
|
||||
# TODO: remove/edit experiment_name in line above, as well in sweep_b_val func, rn takes a global variable below
|
||||
|
||||
wl = np.array(loaded_files.wavelength)
|
||||
np.savetxt("Wavelength.txt", wl)
|
||||
|
||||
# modify cartesian_coords to suite the required data struct in monitor_devices
|
||||
cartesian_coords = [{'2301034': t[0], '2101014': t[1]} for t in cartesian_coords]
|
||||
|
||||
# call the helper function to carry out the rotation/measurement of spectrum
|
||||
monitor_devices(cartesian_coords, angles, intensity_data)
|
||||
|
||||
|
||||
################################################################# END OF FUNCTION DEFS ###########################################################################################
|
||||
|
||||
@ -502,20 +859,33 @@ def sweep_b_val(instr:pyvisa.resources.Resource, min_bval:float, max_bval:float,
|
||||
|
||||
# Initialise PYVISA ResourceManager
|
||||
rm = pyvisa.ResourceManager()
|
||||
# print(rm.list_resources()) # 'ASRL8::INSTR' for dual power supply, 'ASRL9::INSTR' for single power supply
|
||||
# print(rm.list_resources())
|
||||
# 'ASRL8::INSTR' for dual power supply, 'ASRL9::INSTR' for single power supply (online PC)
|
||||
# 'ASRL10::INSTR' for dual power supply, 'ASRL12::INSTR' for single power supply (offline PC)
|
||||
|
||||
|
||||
# Open the connection with the APS100 dual power supply
|
||||
powerbox_dualsupply = rm.open_resource('ASRL8::INSTR',
|
||||
baud_rate=9600, # Example baud rate, adjust as needed
|
||||
powerbox_dualsupply = rm.open_resource('ASRL10::INSTR',
|
||||
baud_rate=9600,
|
||||
data_bits=8,
|
||||
parity= pyvisa.constants.Parity.none,
|
||||
stop_bits= pyvisa.constants.StopBits.one,
|
||||
timeout=5000)# 5000 ms timeout
|
||||
timeout=100)# 5000 ms timeout
|
||||
|
||||
# Open the connection with the APS100 dual power supply
|
||||
powerbox_singlesupply = rm.open_resource('ASRL12::INSTR',
|
||||
baud_rate=9600,
|
||||
data_bits=8,
|
||||
parity= pyvisa.constants.Parity.none,
|
||||
stop_bits= pyvisa.constants.StopBits.one,
|
||||
timeout=100)# 5000 ms timeout
|
||||
|
||||
write_no_echo(powerbox_dualsupply, 'REMOTE') # turn on the remote mode
|
||||
write_no_echo(powerbox_singlesupply, 'REMOTE') # turn on the remote mode
|
||||
|
||||
# TODO: test functionality of the magnet_coil param later on, should work... as this code below is basically implemented inside the scan func.
|
||||
# select axis for the dual supply, either z-axis(CHAN 1 ^= Supply A) or x-axis(CHAN 2 ^= Supply B)
|
||||
write_no_echo(powerbox_dualsupply, 'CHAN 1')
|
||||
# write_no_echo(powerbox_dualsupply, 'CHAN 1')
|
||||
|
||||
# Setup connection to AMC
|
||||
amc = AMC.Device(IP)
|
||||
@ -550,21 +920,18 @@ experiment_settings = 'PL_SP_700_LP_700_HeNe_52muW_exp_2s_Start_'
|
||||
#The program adds the range of the scan as well as the resolution and the date and time of the measurement
|
||||
experiment_name = f"{set_llim_bval}T_to_{set_ulim_bval}T_{set_res_bval}T_{datetime.datetime.now().strftime('%Y_%m_%d_%H%M')}"
|
||||
|
||||
# # TODO: write the bval scan here
|
||||
# for idx, bval in enumerate(bval_lst):
|
||||
# write_no_echo(powerbox_dualsupply, '')
|
||||
|
||||
# this moves the probe in xy-direction and measures spectrum there
|
||||
# move_scan_xy(range_x, range_y, resolution, experiment_settings, experiment_name)
|
||||
|
||||
# perform the B-field measurement for selected axis above
|
||||
# sweep_b_val(powerbox_dualsupply, set_llim_bval, set_ulim_bval, set_res_bval, experiment_settings, experiment_name)
|
||||
sweep_b_val(powerbox_dualsupply, set_llim_bval, set_ulim_bval, set_res_bval,
|
||||
experiment_settings, experiment_name, singlepowersupply_bool=False, zerowhenfin_bool=True, reversescan_bool=False)
|
||||
sweep_b_val(powerbox_dualsupply, set_llim_bval, set_ulim_bval, set_res_bval, 'z-axis',
|
||||
experiment_settings, experiment_name, zerowhenfin_bool=True, reversescan_bool=False)
|
||||
|
||||
# Internally, axes are numbered 0 to 2
|
||||
|
||||
write_no_echo(powerbox_dualsupply, 'LOCAL') # turn off the remote mode
|
||||
write_no_echo(powerbox_singlesupply, 'LOCAL') # turn off the remote mode
|
||||
# time.sleep(0.5)
|
||||
powerbox_dualsupply.close()
|
||||
|
||||
powerbox_singlesupply.close()
|
33
FunctionsTest.py
Normal file
33
FunctionsTest.py
Normal file
@ -0,0 +1,33 @@
|
||||
import re
|
||||
import numpy as np
|
||||
|
||||
def sep_num_from_units(powerbox_output :str)->list:
|
||||
'''
|
||||
Receives a string as input and separates the numberic value and unit and returns it as a list.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
powerbox_output : str
|
||||
string output from the attocube powerbox, e.g. 1.35325kG
|
||||
|
||||
Returns
|
||||
-------
|
||||
list
|
||||
list of float value and string (b value and it's units). If string is purely alphabets, then return a single element list
|
||||
|
||||
'''
|
||||
match = re.match(r'\s*([+-]?\d*\.?\d+)([A-Za-z]+)', powerbox_output)
|
||||
if match:
|
||||
numeric_part = float(match.group(1)) # Convert the numeric part to a float
|
||||
alphabetic_part = match.group(2) # Get the alphabetic part
|
||||
return [numeric_part, alphabetic_part]
|
||||
else:
|
||||
return [powerbox_output,]
|
||||
|
||||
angles = [1,2,3]
|
||||
|
||||
print(str(angles[0]) +"\n"+ str(angles[-1]))
|
||||
|
||||
rates_lst = list(sep_num_from_units(el) for el in "0.0kG;1.0kG".split(";"))
|
||||
|
||||
print(rates_lst[1][0])
|
1310
Mag_Field_Sweep_2025_04_15.py
Normal file
1310
Mag_Field_Sweep_2025_04_15.py
Normal file
File diff suppressed because it is too large
Load Diff
127
README.md
Normal file
127
README.md
Normal file
@ -0,0 +1,127 @@
|
||||
|
||||
# Magnetic Field Sweep and Spatial Mapping Automation
|
||||
|
||||
**Author:** Serdar (adjusted by Lukas and Ryan)
|
||||
**Last Updated:** April 2025
|
||||
**Filename:** `Mag_Field_Sweep_2024_10_21.py`
|
||||
|
||||
## Overview
|
||||
|
||||
This script automates spectral acquisition in a magneto-optical experiment using:
|
||||
|
||||
- **LightField** for spectrometer control (Princeton Instruments)
|
||||
- **AMC Positioner** for precise spatial scanning
|
||||
- **Attocube APS100** power supplies for magnetic field control
|
||||
|
||||
It enables:
|
||||
- Magnetic field sweeps along selected axes
|
||||
- Spatial scans across X-Y positions
|
||||
- B-field vector rotations with spectral capture
|
||||
- Live spectrum acquisition and intensity mapping
|
||||
|
||||
## Features
|
||||
|
||||
- **2D Spatial Scan:** Raster-scan across a surface using AMC positioners, capturing spectra at each coordinate.
|
||||
- **Magnetic Field Sweep:** Vary B-fields in controlled steps along x/y/z, measure spectra at each step.
|
||||
- **Field Rotation:** Circular B-field rotation (in-plane) with angle-defined steps.
|
||||
- **Automated File Handling:** Acquires `.spe` files, extracts and saves intensity/wavelengths, deletes intermediates.
|
||||
- **Flexible Configuration:** Resolution, range, exposure, filters, filenames and scan directions are all customizable.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
### Hardware
|
||||
- AMC100/AMC300 positioner
|
||||
- Attocube APS100 single/dual-channel magnet power supplies
|
||||
- Spectrometer compatible with Princeton Instruments LightField
|
||||
|
||||
### Software & Libraries
|
||||
- **Python 3.8+**
|
||||
- Packages: `pyvisa`, `numpy`, `matplotlib`, `pandas`, `clr`, `spe2py`, `spe_loader`, `AMC` module
|
||||
- .NET integration via `pythonnet`
|
||||
- LightField SDK: Princeton Instruments (with DLLs loaded via `clr`)
|
||||
|
||||
> Note: Ensure `LIGHTFIELD_ROOT` environment variable is set.
|
||||
|
||||
## Setup
|
||||
|
||||
1. **Install dependencies**
|
||||
```bash
|
||||
pip install pyvisa pandas numpy matplotlib pythonnet
|
||||
```
|
||||
|
||||
2. **Ensure required DLLs** are present in:
|
||||
```
|
||||
C:\Program Files\Princeton Instruments\LightField\
|
||||
```
|
||||
|
||||
3. **Set up device IPs**
|
||||
```python
|
||||
IP_AMC100 = "192.168.71.100" # or AMC300
|
||||
```
|
||||
|
||||
4. **Edit scan parameters in main block:**
|
||||
```python
|
||||
range_x = 20000
|
||||
range_y = 20000
|
||||
resolution = 1000 # nanometers
|
||||
|
||||
set_llim_bval = -0.3
|
||||
set_ulim_bval = 0.3
|
||||
set_res_bval = 0.003 # Tesla
|
||||
```
|
||||
|
||||
## Main Functions
|
||||
|
||||
### `move_scan_xy(range_x, range_y, resolution, Settings, baseFileName)`
|
||||
Performs a 2D XY raster scan of the probe. Acquires spectra and saves results.
|
||||
|
||||
### `sweep_b_val(instr, min_bval, max_bval, res, axis, Settings, base_file_name)`
|
||||
Sweeps magnetic field (in T) along the specified axis, collecting spectra at each field.
|
||||
|
||||
### `ramp_b_val(instr, bval, magnet_coil)`
|
||||
Smooth ramping of B-field to target value.
|
||||
|
||||
### `b_field_rotation(instr1, instr2, Babs, startangle, endangle, step, Settings)`
|
||||
Rotates the in-plane magnetic field by vector combination of Bx and By components.
|
||||
|
||||
## File Saving
|
||||
|
||||
- `.txt`: Intensity data and wavelength arrays saved to timestamped folders
|
||||
- Folder names include experiment metadata
|
||||
- `.spe` files are deleted after processing to conserve space
|
||||
|
||||
## Usage Example
|
||||
|
||||
To sweep B-field along the **Y-axis**:
|
||||
|
||||
```python
|
||||
sweep_b_val(
|
||||
instr=powerbox_singlesupply,
|
||||
min_bval=-0.3,
|
||||
max_bval=0.3,
|
||||
res=0.003,
|
||||
magnet_coil='y-axis',
|
||||
Settings='experiment_config',
|
||||
base_file_name='scan_name',
|
||||
zerowhenfin_bool=True,
|
||||
reversescan_bool=False,
|
||||
loopscan_bool=True
|
||||
)
|
||||
```
|
||||
|
||||
## Notes
|
||||
|
||||
- Always close power supply connections with `.close()`
|
||||
- Make sure `.spe` files are not locked by LightField before running
|
||||
- The AMC section is currently commented — uncomment if positioner control is needed
|
||||
- Ensure `experiment.Load(...)` points to the correct `.lfe` config
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
- **DLL loading issues?** Confirm path via `sys.path.append(...)` and DLL names.
|
||||
- **Communication errors?** Check serial port resource names via `pyvisa.ResourceManager().list_resources()`
|
||||
- **No spectra saved?** Ensure LightField is licensed and experiment file is valid.
|
||||
|
||||
## License
|
||||
|
||||
Internal use only – please contact the authors before distribution or reuse.
|
153
Test.py
Normal file
153
Test.py
Normal file
@ -0,0 +1,153 @@
|
||||
import math
|
||||
from magnet_modules import sweep_functions as sf
|
||||
|
||||
# sf.test_func()
|
||||
|
||||
def generate_angle_coord_list(radius, start_angle, end_angle, step_size, clockwise=True):
|
||||
# TODO: DOCS
|
||||
"""Creates a list of discrete cartesian coordinates (x,y), given the radius, start- and end angles, the angle step size, and the direction of rotation.
|
||||
Function then returns a list of two lists: list of angles and list of cartesian coordinates (x,y coordinates in a tuple).
|
||||
|
||||
Args:
|
||||
radius (_type_): _description_
|
||||
start_angle (_type_): _description_
|
||||
end_angle (_type_): _description_
|
||||
step_size (_type_): _description_
|
||||
clockwise (bool, optional): _description_. Defaults to True.
|
||||
|
||||
Returns:
|
||||
_type_: _description_
|
||||
""" """"""
|
||||
# Initialize lists to hold angles and (x, y) pairs
|
||||
angles = []
|
||||
coordinates = []
|
||||
|
||||
# Normalize angles to the range [0, 360)
|
||||
start_angle = start_angle % 360
|
||||
end_angle = end_angle % 360
|
||||
|
||||
if not clockwise:
|
||||
# Clockwise rotation
|
||||
current_angle = start_angle
|
||||
while True:
|
||||
# Append the current angle to the angles list
|
||||
angles.append(current_angle % 360)
|
||||
|
||||
# Convert the current angle to radians
|
||||
current_angle_rad = math.radians(current_angle % 360)
|
||||
|
||||
# Convert polar to Cartesian coordinates
|
||||
x = radius * math.cos(current_angle_rad)
|
||||
y = radius * math.sin(current_angle_rad)
|
||||
|
||||
# Append the (x, y) pair to the list
|
||||
coordinates.append((x, y))
|
||||
|
||||
# Check if we've reached the end_angle (handling wrap-around) (current_angle - step_size) % 360 == end_angle or
|
||||
if current_angle % 360 == end_angle:
|
||||
break
|
||||
|
||||
# Decrement the current angle by the step size
|
||||
current_angle -= step_size
|
||||
if current_angle < 0:
|
||||
current_angle += 360
|
||||
else:
|
||||
# Counterclockwise rotation
|
||||
current_angle = start_angle
|
||||
while True:
|
||||
# Append the current angle to the angles list
|
||||
angles.append(current_angle % 360)
|
||||
|
||||
# Convert the current angle to radians
|
||||
current_angle_rad = math.radians(current_angle % 360)
|
||||
|
||||
# Convert polar to Cartesian coordinates
|
||||
x = radius * math.cos(current_angle_rad)
|
||||
y = radius * math.sin(current_angle_rad)
|
||||
|
||||
# Append the (x, y) pair to the list
|
||||
coordinates.append((x, y))
|
||||
|
||||
# Check if we've reached the end_angle (handling wrap-around) (current_angle + step_size) % 360 == end_angle or
|
||||
if current_angle % 360 == end_angle:
|
||||
break
|
||||
|
||||
# Increment the current angle by the step size
|
||||
current_angle += step_size
|
||||
if current_angle >= 360:
|
||||
current_angle -= 360
|
||||
|
||||
return [angles, coordinates]
|
||||
|
||||
|
||||
def generate_coord_list_fixed_angle(angle, b_val, b_val_step_size, reverse=False):
|
||||
"""
|
||||
Generates a list of (x, y) Cartesian coordinates along a line defined by a fixed angle,
|
||||
scanning from -b_val to b_val or from b_val to -b_val depending on the reverse flag.
|
||||
|
||||
Args:
|
||||
angle (float): The fixed angle (in degrees) from the positive x-axis.
|
||||
b_val (float): The maximum distance from the origin (both positive and negative).
|
||||
b_val_step_size (float): The increment in distance for each point.
|
||||
reverse (bool): If True, scan from b_val to -b_val. If False, scan from -b_val to b_val.
|
||||
|
||||
Returns:
|
||||
list: A list of tuples representing Cartesian coordinates (x, y).
|
||||
"""
|
||||
coordinates = []
|
||||
|
||||
# Convert angle from degrees to radians
|
||||
angle_rad = math.radians(angle)
|
||||
|
||||
# Determine the scan direction based on the reverse flag
|
||||
if reverse:
|
||||
# Scan from b_val to -b_val
|
||||
current_b = b_val
|
||||
while current_b >= -b_val:
|
||||
x = current_b * math.cos(angle_rad)
|
||||
y = current_b * math.sin(angle_rad)
|
||||
coordinates.append((x, y))
|
||||
current_b -= b_val_step_size
|
||||
else:
|
||||
# Scan from -b_val to b_val
|
||||
current_b = -b_val
|
||||
while current_b <= b_val:
|
||||
x = current_b * math.cos(angle_rad)
|
||||
y = current_b * math.sin(angle_rad)
|
||||
coordinates.append((x, y))
|
||||
current_b += b_val_step_size
|
||||
|
||||
return coordinates
|
||||
|
||||
if __name__=="__main__":
|
||||
# Example usage
|
||||
radius = 5
|
||||
start_angle = 0
|
||||
end_angle = 180
|
||||
step_size = 10
|
||||
|
||||
angles, coordinates = generate_angle_coord_list(radius, start_angle, end_angle, step_size, clockwise=True)
|
||||
|
||||
print('\n', "Angles:", angles, '\n')
|
||||
print("Coordinates:", coordinates, '\n',)
|
||||
# device_target_values = [{'2301034': bval[0], '2101014': bval[1]} for bval in coordinates]
|
||||
xcoord_tuple, ycoord_tuple = zip(*coordinates)
|
||||
device_target_values = {'2301034': list(xcoord_tuple), '2101014': list(ycoord_tuple)}
|
||||
print(f"{device_target_values['2301034']=}")
|
||||
print(f"{device_target_values['2101014']=}")
|
||||
|
||||
for iteration, (device_id,bval_lst) in enumerate(device_target_values.items()):
|
||||
print(iteration, device_id, bval_lst)
|
||||
|
||||
# print(generate_coord_list_fixed_angle(10, 5, 1, reverse=False))
|
||||
|
||||
testdict = [{'2301034': bval[0], '2101014': bval[1]} for bval in coordinates]
|
||||
print(f"{testdict=}")
|
||||
|
||||
for i, target in enumerate(testdict):
|
||||
print(i, target.keys())
|
||||
# for key in target.keys():
|
||||
# print(type(key))
|
||||
|
||||
|
||||
|
84
ThreadTest.py
Normal file
84
ThreadTest.py
Normal file
@ -0,0 +1,84 @@
|
||||
import threading
|
||||
import time
|
||||
import random
|
||||
|
||||
# Shared values
|
||||
device_values = [0, 0]
|
||||
value_lock = threading.Lock()
|
||||
|
||||
# Per-device pause controls
|
||||
device_events = [threading.Event(), threading.Event()]
|
||||
device_events[0].set() # Start as running
|
||||
device_events[1].set()
|
||||
|
||||
# Tolerance threshold
|
||||
TOLERANCE = 20
|
||||
|
||||
# Stop flag
|
||||
stop_event = threading.Event()
|
||||
|
||||
def is_within_tolerance(val_a, val_b):
|
||||
return abs(val_a - val_b) <= TOLERANCE
|
||||
|
||||
# Device thread
|
||||
def device_thread(device_id):
|
||||
other_id = 1 - device_id
|
||||
while not stop_event.is_set():
|
||||
device_events[device_id].wait() # Pause if needed
|
||||
|
||||
# Simulate value from device
|
||||
new_value = random.randint(0, 100)
|
||||
|
||||
with value_lock:
|
||||
device_values[device_id] = new_value
|
||||
my_val = device_values[device_id]
|
||||
other_val = device_values[other_id]
|
||||
|
||||
print(f"Device {device_id} => {my_val} | Device {other_id} => {other_val}")
|
||||
|
||||
if not is_within_tolerance(my_val, other_val):
|
||||
# print(f"Device {device_id} is out of tolerance! Pausing...")
|
||||
# device_events[device_id].clear()
|
||||
print("Not within tolerance!")
|
||||
|
||||
time.sleep(0.1) # Faster check interval
|
||||
|
||||
# Watcher thread
|
||||
def tolerance_watcher():
|
||||
while not stop_event.is_set():
|
||||
with value_lock:
|
||||
val0, val1 = device_values
|
||||
if is_within_tolerance(val0, val1):
|
||||
for i, event in enumerate(device_events):
|
||||
if not event.is_set():
|
||||
print(f"Resuming Device {i}")
|
||||
event.set()
|
||||
|
||||
time.sleep(0.05) # Fast response
|
||||
|
||||
# Start threads
|
||||
threads = [
|
||||
threading.Thread(target=device_thread, args=(0,)),
|
||||
threading.Thread(target=device_thread, args=(1,)),
|
||||
threading.Thread(target=tolerance_watcher)
|
||||
]
|
||||
|
||||
for t in threads:
|
||||
t.start()
|
||||
|
||||
# Run loop (press Ctrl+C to stop)
|
||||
try:
|
||||
while True:
|
||||
time.sleep(0.1)
|
||||
except KeyboardInterrupt:
|
||||
print("Stopping...")
|
||||
|
||||
stop_event.set()
|
||||
for event in device_events:
|
||||
event.set()
|
||||
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
print("All threads stopped.")
|
||||
|
519
b_rotation_test.py
Normal file
519
b_rotation_test.py
Normal file
@ -0,0 +1,519 @@
|
||||
############################################
|
||||
# Packages from Ryan
|
||||
import re
|
||||
import math
|
||||
import threading
|
||||
import pyvisa
|
||||
# from pyvisa import ResourceManager, constants
|
||||
|
||||
# B Field Limits (in T)
|
||||
BX_MAX = 1.7
|
||||
BY_MAX = 1.7
|
||||
BZ_MAX = 4.0
|
||||
############################################
|
||||
|
||||
# import AMC
|
||||
import csv
|
||||
import time
|
||||
import clr
|
||||
import sys
|
||||
import os
|
||||
import spe2py as spe
|
||||
import spe_loader as sl
|
||||
import pandas as pd
|
||||
import time
|
||||
# from System.IO import *
|
||||
# from System import String
|
||||
import numpy as np
|
||||
import matplotlib.pyplot as plt
|
||||
import datetime
|
||||
from typing import Union
|
||||
|
||||
def sep_num_from_units(powerbox_output :str)->list:
|
||||
'''
|
||||
Receives a string as input and separates the numberic value and unit and returns it as a list.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
powerbox_output : str
|
||||
string output from the attocube powerbox, e.g. 1.35325kG
|
||||
|
||||
Returns
|
||||
-------
|
||||
list
|
||||
list of float value and string (b value and it's units). If string is purely alphabets, then return a single element list
|
||||
|
||||
'''
|
||||
match = re.match(r'\s*([+-]?\d*\.?\d+)([A-Za-z]+)', powerbox_output)
|
||||
if match:
|
||||
numeric_part = float(match.group(1)) # Convert the numeric part to a float
|
||||
alphabetic_part = match.group(2) # Get the alphabetic part
|
||||
return [numeric_part, alphabetic_part]
|
||||
else:
|
||||
return [powerbox_output,]
|
||||
|
||||
|
||||
def query_no_echo(instr:pyvisa.resources.Resource, command:str, sleeptime=0)->str:
|
||||
"""helper function for the Attocube APS100 that queries a function to the device, removing the echo.
|
||||
|
||||
Args:
|
||||
instr (pyvisa.resources.Resource):
|
||||
command (str): commands, can be stringed in series with ; between commands
|
||||
sleeptime (float, optional): delay time between commands. Defaults to 0.01.
|
||||
|
||||
Returns:
|
||||
str: _description_
|
||||
""" ''''''
|
||||
try:
|
||||
print(f"Sending command: {command}")
|
||||
instr.write(command)
|
||||
time.sleep(sleeptime)
|
||||
echo_response = instr.read() # Read and discard the echo
|
||||
# print(f"Echo response: {echo_response}")
|
||||
actual_response = instr.read() # Read the actual response
|
||||
print(f"Actual response: {actual_response}")
|
||||
return actual_response
|
||||
except pyvisa.VisaIOError as e:
|
||||
print(f"Error communicating with instrument: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def write_no_echo(instr:pyvisa.resources.Resource, command:str, sleeptime=0)->str:
|
||||
"""helper function for the Attocube APS100 that writes a function to the device, removing the echo.
|
||||
|
||||
Args:
|
||||
instr (pyvisa.resources.Resource):
|
||||
command (str): commands, can be stringed in series with ; between commands
|
||||
sleeptime (float, optional): delay time between commands. Defaults to 0.01.
|
||||
|
||||
Returns:
|
||||
str: _description_
|
||||
""" ''''''
|
||||
try:
|
||||
print(f"Sending command: {command}")
|
||||
instr.write(command)
|
||||
time.sleep(sleeptime) # Give the device some time to process
|
||||
try:
|
||||
while True:
|
||||
echo_response = instr.read() # Read and discard the echo
|
||||
# print(f"Echo response: {echo_response}")
|
||||
except pyvisa.VisaIOError as e:
|
||||
# Expected timeout after all echoed responses are read
|
||||
if e.error_code != pyvisa.constants.VI_ERROR_TMO:
|
||||
raise
|
||||
except pyvisa.VisaIOError as e:
|
||||
print(f"Error communicating with instrument: {e}")
|
||||
|
||||
|
||||
def generate_angle_coord_list(radius, start_angle, end_angle, step_size, clockwise=True):
|
||||
# TODO: DOCS
|
||||
"""Creates a list of discrete cartesian coordinates (x,y), given the radius, start- and end angles, the angle step size, and the direction of rotation.
|
||||
Function then returns a list of two lists: list of angles and list of cartesian coordinates (x,y coordinates in a tuple).
|
||||
|
||||
Args:
|
||||
radius (_type_): _description_
|
||||
start_angle (_type_): _description_
|
||||
end_angle (_type_): _description_
|
||||
step_size (_type_): _description_
|
||||
clockwise (bool, optional): _description_. Defaults to True.
|
||||
|
||||
Returns:
|
||||
_type_: _description_
|
||||
""" """"""
|
||||
# Initialize lists to hold angles and (x, y) pairs
|
||||
angles = []
|
||||
coordinates = []
|
||||
|
||||
# Normalize angles to the range [0, 360)
|
||||
start_angle = start_angle % 360
|
||||
end_angle = end_angle % 360
|
||||
|
||||
if not clockwise:
|
||||
# Clockwise rotation
|
||||
current_angle = start_angle
|
||||
while True:
|
||||
# Append the current angle to the angles list
|
||||
angles.append(current_angle % 360)
|
||||
|
||||
# Convert the current angle to radians
|
||||
current_angle_rad = math.radians(current_angle % 360)
|
||||
|
||||
# Convert polar to Cartesian coordinates
|
||||
x = radius * math.cos(current_angle_rad)
|
||||
y = radius * math.sin(current_angle_rad)
|
||||
|
||||
# Append the (x, y) pair to the list
|
||||
coordinates.append((x, y))
|
||||
|
||||
# Check if we've reached the end_angle (handling wrap-around) (current_angle - step_size) % 360 == end_angle or
|
||||
if current_angle % 360 == end_angle:
|
||||
break
|
||||
|
||||
# Decrement the current angle by the step size
|
||||
current_angle -= step_size
|
||||
if current_angle < 0:
|
||||
current_angle += 360
|
||||
else:
|
||||
# Counterclockwise rotation
|
||||
current_angle = start_angle
|
||||
while True:
|
||||
# Append the current angle to the angles list
|
||||
angles.append(current_angle % 360)
|
||||
|
||||
# Convert the current angle to radians
|
||||
current_angle_rad = math.radians(current_angle % 360)
|
||||
|
||||
# Convert polar to Cartesian coordinates
|
||||
x = radius * math.cos(current_angle_rad)
|
||||
y = radius * math.sin(current_angle_rad)
|
||||
|
||||
# Append the (x, y) pair to the list
|
||||
coordinates.append((x, y))
|
||||
|
||||
# Check if we've reached the end_angle (handling wrap-around) (current_angle + step_size) % 360 == end_angle or
|
||||
if current_angle % 360 == end_angle:
|
||||
break
|
||||
|
||||
# Increment the current angle by the step size
|
||||
current_angle += step_size
|
||||
if current_angle >= 360:
|
||||
current_angle -= 360
|
||||
|
||||
return [angles, coordinates]
|
||||
|
||||
def b_field_rotation(instr1:pyvisa.resources.Resource, instr2:pyvisa.resources.Resource,
|
||||
Babs:float, startangle:float, endangle:float, angle_stepsize:float,
|
||||
Settings:str, clockwise=True, base_file_name='', zerowhenfin_bool=False)->None:
|
||||
# TODO: update docs
|
||||
"""Rotation of the b-field in discrete steps, spectrum is measured at each discrete step in the rotation. Scan angle is
|
||||
defined as the angle between the x-axis and the current B-field vector, i.e., in the anticlockwise direction.
|
||||
|
||||
Args:
|
||||
instr1 (pyvisa.resources.Resource): _description_
|
||||
instr2 (pyvisa.resources.Resource): _description_
|
||||
Babs (float): absolute B-field value in T
|
||||
startangle (float): start angle in degrees
|
||||
endangle (float): end angle in degrees
|
||||
angle_stepsize (float): angle step size in degrees
|
||||
clockwise (bool): determines the direction of rotation of the B-field. Defaults to True.
|
||||
zerowhenfin_bool (bool, optional): after finishing the rotation, both B-field components should be set to 0 T. Defaults to False.
|
||||
"""
|
||||
|
||||
# TODO: add logging to the script
|
||||
|
||||
# defines the folder, in which the data from the spectrometer is temporarily stored in
|
||||
temp_folder_path = "C:/Users/localadmin/Desktop/Users/Lukas/B_Field_Dump"
|
||||
# temp_folder_path = "C:/Users/localadmin/Desktop/Users/Lukas/2024_02_08_Map_test"
|
||||
|
||||
if base_file_name =='':
|
||||
base_file_name = datetime.datetime.now().strftime('%Y_%m_%d_%H.%M')
|
||||
|
||||
start_time = time.time() # start of the scan function
|
||||
|
||||
startangle = startangle % 360
|
||||
endangle = endangle % 360 # ensures that the angles are within [0,360)
|
||||
|
||||
idnstr1 = query_no_echo(instr1, '*IDN?')
|
||||
idnstr2 = query_no_echo(instr2, '*IDN?')
|
||||
|
||||
intensity_data = [] # To store data from each scan
|
||||
cwd = os.getcwd() # save original directory
|
||||
|
||||
# find which one is the dual power supply, then, ramp B_x to Babs value
|
||||
if '2301034' in idnstr1: # serial no. the dual power supply
|
||||
pass
|
||||
elif '2101034' in idnstr2:
|
||||
# swap instruments, instr 1 to be the dual power supply (^= x-axis)
|
||||
instr1, instr2 = instr2, instr1
|
||||
|
||||
# save initial low and high sweep limits of each device, and set them back after the rotation
|
||||
instr1_bsettings = list(sep_num_from_units(el) for el in query_no_echo(instr1, 'UNITS?;LLIM?;ULIM?').split(';')) # deliver a 3 element tuple of tuples containing the set unit, llim and ulim
|
||||
instr2_bsettings = list(sep_num_from_units(el) for el in query_no_echo(instr2, 'UNITS?;LLIM?;ULIM?').split(';')) # deliver a 3 element tuple of tuples containing the set unit, llim and ulim
|
||||
if instr1_bsettings[0][0] == 'T':
|
||||
instr1_bsettings[1][0] = instr1_bsettings[1][0]*0.1 # rescale kG to T, device accepts values only in kG or A, eventho we set it to T
|
||||
instr1_bsettings[2][0] = instr1_bsettings[2][0]*0.1
|
||||
if instr2_bsettings[0][0] == 'T':
|
||||
instr2_bsettings[1][0] = instr2_bsettings[1][0]*0.1 # rescale kG to T, device accepts values only in kG or A, eventho we set it to T
|
||||
instr2_bsettings[2][0] = instr2_bsettings[2][0]*0.1
|
||||
|
||||
# initialise the sweep angle list as well as the sweep limits and directions for each instrument
|
||||
instr1_lim, instr2_lim = 'LLIM', 'ULIM'
|
||||
instr1_sweep, instr2_sweep = 'DOWN', 'UP'
|
||||
|
||||
# create lists of angles and discrete Cartesian coordinates
|
||||
angles, cartesian_coords = generate_angle_coord_list(Babs, startangle, endangle, angle_stepsize, clockwise=clockwise)
|
||||
|
||||
if clockwise: # NOTE: old conditional was: startangle > endangle see if this works....
|
||||
# reverse sweep limits and directions for the clockwise rotation
|
||||
instr1_lim, instr2_lim = instr2_lim, instr1_lim
|
||||
instr1_sweep, instr2_sweep = instr2_sweep, instr1_sweep
|
||||
|
||||
|
||||
# TODO: i dont think we need to change the rates just yet, think about this later
|
||||
'''
|
||||
# list of rates (with units) for diff ranges of each device, only up to Range 1 for single power supply as that is already
|
||||
# the max recommended current.
|
||||
init_range_lst1 = list(sep_num_from_units(el) for el in query_no_echo(instr1, 'RATE? 0;RATE? 1;RATE? 2').split(';'))
|
||||
init_range_lst2 = list(sep_num_from_units(el) for el in query_no_echo(instr2, 'RATE? 0;RATE? 1').split(';'))
|
||||
|
||||
min_range_lst = [min(el1[0], el2[0]) for el1,el2 in zip(init_range_lst1, init_range_lst2)] # min rates for each given range
|
||||
|
||||
# set both devices to the min rates
|
||||
write_no_echo(instr1, f'RATE 0 {min_range_lst[0]};RATE 1 {min_range_lst[1]}')
|
||||
write_no_echo(instr2, f'RATE 0 {min_range_lst[0]};RATE 1 {min_range_lst[1]}')
|
||||
'''
|
||||
|
||||
|
||||
# TODO: see if this is the desired process: to always start from the x-axis ASK LUKAS
|
||||
if Babs <= BX_MAX:
|
||||
# write_no_echo(instr1, f'CHAN 2;ULIM {Babs*10};SWEEP UP') # sets to B_x, the B_x upper limit and sweeps the magnet field to the upper limit
|
||||
print(f'SWITCHED TO BX, SWEEPING B-X TO {Babs} T NOW')
|
||||
else:
|
||||
raise ValueError(f'{Babs=}T value exceeds the max limit of the Bx field {BX_MAX}T!')
|
||||
|
||||
# wait for Babs to be reached by the Bx field
|
||||
actual_bval = sep_num_from_units(query_no_echo(instr1, 'IMAG?'))[0]*0.1 # convert kG to T
|
||||
print(f'Actual magnet strength (Bx): {actual_bval} T,', f'Target magnet strength: {Babs} T')
|
||||
while abs(actual_bval - Babs) > 0.0001:
|
||||
time.sleep(5) # little break
|
||||
actual_bval = sep_num_from_units(query_no_echo(instr1, 'IMAG?'))[0]*0.1
|
||||
print(f'Actual magnet strength (Bx): {actual_bval} T,', f'Target magnet strength: {Babs} T')
|
||||
|
||||
|
||||
|
||||
# TODO: copy and mod code to see if block logic works, test in lab
|
||||
# NOTE: implement PID control, possibly best option to manage the b field DO THIS LATER ON, WE DO DISCRETE B VALUES RN
|
||||
# Helper function that listens to a device
|
||||
def listen_to_device(device_id, target_value, shared_values, lock, all_targets_met_event):
|
||||
while not all_targets_met_event.is_set(): # Loop until the event is set
|
||||
# value = 0 # Simulate receiving a float from the device INSERT QUERY NO ECHO HERE TO ASK FOR DEVICE IMAG
|
||||
if '2301034' in device_id:
|
||||
value = sep_num_from_units(query_no_echo(instr1, 'IMAG?'))[0]*0.1 # convert kG to T
|
||||
if value <= target_value[device_id]:
|
||||
# write_no_echo(instr1, f"CHAN 2;ULIM {target_value[device_id]*10};SWEEP UP")
|
||||
print(f'sweeping Bx up to {target_value[device_id]*10}')
|
||||
else:
|
||||
# write_no_echo(instr1, "CHAN 2;LLIM {target_value[device_id]*10};SWEEP DOWN")
|
||||
print(f'sweeping Bx down to {target_value[device_id]*10}')
|
||||
|
||||
elif '2101014' in device_id:
|
||||
value = sep_num_from_units(query_no_echo(instr2, 'IMAG?'))[0]*0.1 # convert kG to T
|
||||
if value <= target_value[device_id]:
|
||||
# write_no_echo(instr2, f"ULIM {target_value[device_id]*10};SWEEP UP")
|
||||
print(f'sweeping By up to {target_value[device_id]*10}')
|
||||
else:
|
||||
# write_no_echo(instr2, "LLIM {target_value[device_id]*10};SWEEP DOWN")
|
||||
print(f'sweeping By down to {target_value[device_id]*10}')
|
||||
else:
|
||||
continue # Skip if device ID is not recognized
|
||||
print(f"Device {device_id} reports value: {value} T")
|
||||
|
||||
with lock:
|
||||
shared_values[device_id] = value
|
||||
# Check if both devices have met their targets
|
||||
if all(shared_values.get(device) is not None and abs(value - target_value[device]) <= 0.0001
|
||||
for device,value in shared_values.items()):
|
||||
print(f"Both devices reached their target values: {shared_values}")
|
||||
all_targets_met_event.set() # Signal that both targets are met
|
||||
|
||||
# time.sleep(1) # Simulate periodic data checking
|
||||
|
||||
# Main function to manage threads and iterate over target values
|
||||
def monitor_devices(device_target_values, angles_lst, intensity_data=intensity_data):
|
||||
for iteration, target in enumerate(device_target_values):
|
||||
print(f"\nStarting iteration {iteration+1} for target values: {target}")
|
||||
# Shared dictionary to store values from devices
|
||||
shared_values = {device: None for device in target.keys()}
|
||||
# Event to signal when both target values are reached
|
||||
all_targets_met_event = threading.Event()
|
||||
|
||||
# Lock to synchronize access to shared_values
|
||||
lock = threading.Lock()
|
||||
|
||||
# Create and start threads for each device
|
||||
threads = []
|
||||
for device_id in target.keys():
|
||||
thread = threading.Thread(target=listen_to_device, args=(device_id, target, shared_values, lock, all_targets_met_event))
|
||||
threads.append(thread)
|
||||
thread.start()
|
||||
print(f"======================\nThread started for device {device_id}\n======================")
|
||||
|
||||
# Wait until both devices meet their target values
|
||||
all_targets_met_event.wait()
|
||||
print(f"Both target values for iteration {iteration+1} met. Performing action...")
|
||||
# Clean up threads
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
print(f"Threads for iteration {iteration+1} closed.\n")
|
||||
|
||||
|
||||
print(f'COLLECTING SPECTRUM FOR ANGLE {angles_lst[iteration]}°\n')
|
||||
# Perform some action after both targets are met
|
||||
# we acquire with the LF
|
||||
# acquire_name_spe = f'{base_file_name}_{angles_lst[iteration]}°' # NOTE: save each intensity file with the given angle
|
||||
# AcquireAndLock(acquire_name_spe) #this creates a .spe file with the scan name.
|
||||
|
||||
# read the .spe file and get the data as loaded_files
|
||||
# cwd = os.getcwd() # save original directory
|
||||
# os.chdir(temp_folder_path) #change directory
|
||||
# loaded_files = sl.load_from_files([acquire_name_spe + '.spe']) # get the .spe file as a variable
|
||||
# os.chdir(cwd) # go back to original directory
|
||||
|
||||
# Delete the created .spe file from acquiring after getting necessary info
|
||||
# spe_file_path = os.path.join(temp_folder_path, acquire_name_spe + '.spe')
|
||||
# os.remove(spe_file_path)
|
||||
|
||||
points_left = len(angles) - iteration - 1
|
||||
|
||||
print('Points left in the scan: ', points_left)
|
||||
|
||||
#append the intensity data as it is (so after every #of_wl_points, the spectrum of the next point begins)
|
||||
# intensity_data.append(loaded_files.data[0][0][0])
|
||||
|
||||
#prints total time the mapping lasted
|
||||
end_time = time.time()
|
||||
elapsed_time = (end_time - start_time) / 60
|
||||
print('Scan time: ', elapsed_time, 'minutes')
|
||||
|
||||
# reset both devices to original sweep limits
|
||||
write_no_echo(instr1, f'LLIM {instr1_bsettings[1][0]*10};ULIM {instr1_bsettings[2][0]*10}') # reset the initial limits of the device after the scan
|
||||
write_no_echo(instr2, f'LLIM {instr2_bsettings[1][0]*10};ULIM {instr2_bsettings[2][0]*10}') # reset the initial limits of the device after the scan
|
||||
|
||||
|
||||
# TODO: uncomment later if resetting original rates implemented
|
||||
'''
|
||||
# reset both devices' initial rates for each range
|
||||
write_no_echo(instr1, f'RANGE 0 {init_range_lst1[0][0]};RANGE 1 {init_range_lst1[1][0]};RANGE 2 {init_range_lst1[2][0]}') # reset the initial limits of the device after the scan
|
||||
write_no_echo(instr2, f'RANGE 0 {init_range_lst2[0][0]};RANGE 1 {init_range_lst2[1][0]}') # reset the initial limits of the device after the scan
|
||||
'''
|
||||
|
||||
|
||||
if zerowhenfin_bool:
|
||||
# write_no_echo(instr1, 'SWEEP ZERO') # if switched on, discharges the magnet after performing the measurement loop above
|
||||
# write_no_echo(instr2, 'SWEEP ZERO')
|
||||
print('======================\nSWEEPING BOTH DEVICES TO ZERO NOW\n======================')
|
||||
|
||||
#save intensity & WL data as .txt
|
||||
os.chdir('C:/Users/localadmin/Desktop/Users/Ryan')
|
||||
# creates new folder for MAP data
|
||||
# new_folder_name = "Test_Map_" + f"{datetime.datetime.now().strftime('%Y_%m_%d_%H.%M')}"
|
||||
# os.mkdir(new_folder_name)
|
||||
# Here the things will be saved in a new folder under user Lukas !
|
||||
# IMPORTANT last / has to be there, otherwise data cannot be saved and will be lost!!!!!!!!!!!!!!!!
|
||||
# os.chdir('C:/Users/localadmin/Desktop/Users/Ryan/'+ new_folder_name)
|
||||
|
||||
# intensity_data = np.array(intensity_data)
|
||||
# np.savetxt(Settings + f'{angles[0]}°_to_{angles[-1]}°' + experiment_name +'.txt', intensity_data)
|
||||
# TODO: remove/edit experiment_name in line above, as well in sweep_b_val func, rn takes a global variable below
|
||||
|
||||
# wl = np.array(loaded_files.wavelength)
|
||||
# np.savetxt("Wavelength.txt", wl)
|
||||
|
||||
# NOTE: data struct of device_target_values is a list of dictionaries, where each dictionary contains the target values for each device
|
||||
device_target_values = [{'2301034': bval[0], '2101014': bval[1]} for bval in cartesian_coords]
|
||||
|
||||
# call the helper function to carry out the rotation/measurement of spectrum
|
||||
monitor_devices(device_target_values, angles, intensity_data)
|
||||
|
||||
|
||||
################################################################# END OF FUNCTION DEFS ###########################################################################################
|
||||
|
||||
# NOTE: RYAN INTRODUCED SOME FUNCTIONS HERE TO PERFORM THE SCAN
|
||||
|
||||
# Initialise PYVISA ResourceManager
|
||||
rm = pyvisa.ResourceManager()
|
||||
# print(rm.list_resources())
|
||||
# 'ASRL8::INSTR' for dual power supply, 'ASRL9::INSTR' for single power supply (online PC)
|
||||
# 'ASRL10::INSTR' for dual power supply, 'ASRL12::INSTR' for single power supply (offline PC)
|
||||
|
||||
try:
|
||||
# Open the connection with the APS100 dual power supply
|
||||
powerbox_dualsupply = rm.open_resource('ASRL10::INSTR',
|
||||
baud_rate=9600,
|
||||
data_bits=8,
|
||||
parity= pyvisa.constants.Parity.none,
|
||||
stop_bits= pyvisa.constants.StopBits.one,
|
||||
timeout=10000)# 5000 ms timeout
|
||||
write_no_echo(powerbox_dualsupply, 'REMOTE') # turn on the remote mode
|
||||
|
||||
# # select axis for the dual supply, either z-axis(CHAN 1 ^= Supply A) or x-axis(CHAN 2 ^= Supply B)
|
||||
write_no_echo(powerbox_dualsupply, 'CHAN 2')
|
||||
# # #for dual until here
|
||||
|
||||
# Open the connection with the APS100 single power supply
|
||||
powerbox_singlesupply = rm.open_resource('ASRL12::INSTR',
|
||||
baud_rate=9600,
|
||||
data_bits=8,
|
||||
parity= pyvisa.constants.Parity.none,
|
||||
stop_bits= pyvisa.constants.StopBits.one,
|
||||
timeout=10000)# 5000 ms timeout
|
||||
write_no_echo(powerbox_singlesupply, 'REMOTE') # turn on the remote mode
|
||||
#for single until here
|
||||
# TODO: uncomment AMC connection code later, when moving the probe in cryostat is needed.
|
||||
# Setup connection to AMC
|
||||
# amc = AMC.Device(IP)
|
||||
# amc.connect()
|
||||
|
||||
# # Internally, axes are numbered 0 to 2
|
||||
# amc.control.setControlOutput(0, True)
|
||||
# amc.control.setControlOutput(1, True)
|
||||
|
||||
|
||||
# auto = Automation(True, List[String]())
|
||||
# experiment = auto.LightFieldApplication.Experiment
|
||||
# acquireCompleted = AutoResetEvent(False)
|
||||
|
||||
# experiment.Load("2025_03_28_Priyanka_CrSBr_DR_Sweep")
|
||||
# experiment.ExperimentCompleted += experiment_completed # we are hooking a listener.
|
||||
# experiment.SetValue(SpectrometerSettings.GratingSelected, '[750nm,1200][0][0]')
|
||||
# InitializerFilenameParams()
|
||||
|
||||
|
||||
#set scan range and resolution in nanometers
|
||||
range_x = 20000
|
||||
range_y = 20000
|
||||
resolution = 1000
|
||||
# set B-field scan range and resolution (all in T)
|
||||
set_llim_bval = -0.3
|
||||
set_ulim_bval = 0.3
|
||||
set_res_bval = 0.003
|
||||
|
||||
#Here you can specify the filename of the map e.g. put experiment type, exposure time, used filters, etc....
|
||||
# 'PL_SP_700_LP_700_HeNe_52muW_exp_2s_Start_'
|
||||
# experiment_settings = 'PL_X_1859.2_Y_3918.3_HeNe_10.4muW_H_a-axis_LP_SP_650_exp_180s_600g_cwl_930_det_b-axis_Pol_90_l2_45'
|
||||
experiment_settings = 'DR_white_6th spot_Power_G600_exp_25s_l1_40_l2_262_det_b_mag_b'
|
||||
#The program adds the range of the scan as well as the resolution and the date and time of the measurement
|
||||
# f"{set_llim_bval}T_to_{set_ulim_bval}T_{set_res_bval}T_{datetime.datetime.now().strftime('%Y_%m_%d_%H%M')}"
|
||||
experiment_name = f"{set_llim_bval}T_to_{set_ulim_bval}T_stepsize_{set_res_bval}T"
|
||||
|
||||
# this moves the probe in xy-direction and measures spectrum there
|
||||
# move_scan_xy(range_x, range_y, resolution, experiment_settings, experiment_name)
|
||||
|
||||
# ramp_b_val(powerbox_singlesupply, 0, 'y-axis')
|
||||
# ramp_b_val(powerbox_dualsupply, 0, 'z-axis')
|
||||
|
||||
|
||||
# for single/ dual replace and vice versa all the way down
|
||||
# sweep_b_val(powerbox_singlesupply, set_llim_bval, set_ulim_bval, set_res_bval, 'y-axis',
|
||||
# experiment_settings, experiment_name, zerowhenfin_bool=True, reversescan_bool=False, loopscan_bool=True)
|
||||
b_field_rotation(powerbox_dualsupply, powerbox_singlesupply, Babs=0.1, startangle=0, endangle=3,
|
||||
angle_stepsize=1, Settings=experiment_settings, zerowhenfin_bool=True
|
||||
)
|
||||
|
||||
write_no_echo(powerbox_dualsupply, 'LOCAL') # turn off the remote mode
|
||||
write_no_echo(powerbox_singlesupply, 'LOCAL') # turn off the remote mode
|
||||
|
||||
time.sleep(0.5)
|
||||
# powerbox_dualsupply.close()
|
||||
powerbox_singlesupply.close()
|
||||
|
||||
except Exception as e:
|
||||
print(e)
|
||||
# Internally, axes are numbered 0 to 2
|
||||
|
||||
write_no_echo(powerbox_dualsupply, 'LOCAL') # turn off the remote mode
|
||||
write_no_echo(powerbox_singlesupply, 'LOCAL') # turn off the remote mode
|
||||
|
||||
time.sleep(0.5)
|
||||
powerbox_dualsupply.close()
|
||||
powerbox_singlesupply.close()
|
531
b_rotation_test_v2.py
Normal file
531
b_rotation_test_v2.py
Normal file
@ -0,0 +1,531 @@
|
||||
'''
|
||||
16.04.2025: Initial Test Results for B-field rotation
|
||||
The logic chain in the function works well, now we need the other function that
|
||||
scans along an arbitrary axis in plane.
|
||||
'''
|
||||
############################################
|
||||
# Packages from Ryan
|
||||
import re
|
||||
import math
|
||||
import threading
|
||||
import pyvisa
|
||||
# from pyvisa import ResourceManager, constants
|
||||
|
||||
# B Field Limits (in T)
|
||||
BX_MAX = 1.7
|
||||
BY_MAX = 1.7
|
||||
BZ_MAX = 4.0
|
||||
############################################
|
||||
|
||||
# import AMC
|
||||
import csv
|
||||
import time
|
||||
import clr
|
||||
import sys
|
||||
import os
|
||||
import spe2py as spe
|
||||
import spe_loader as sl
|
||||
import pandas as pd
|
||||
import time
|
||||
# from System.IO import *
|
||||
# from System import String
|
||||
import numpy as np
|
||||
import matplotlib.pyplot as plt
|
||||
import datetime
|
||||
from typing import Union
|
||||
|
||||
def sep_num_from_units(powerbox_output :str)->list:
|
||||
'''
|
||||
Receives a string as input and separates the numberic value and unit and returns it as a list.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
powerbox_output : str
|
||||
string output from the attocube powerbox, e.g. 1.35325kG
|
||||
|
||||
Returns
|
||||
-------
|
||||
list
|
||||
list of float value and string (b value and it's units). If string is purely alphabets, then return a single element list
|
||||
|
||||
'''
|
||||
match = re.match(r'\s*([+-]?\d*\.?\d+)([A-Za-z]+)', powerbox_output)
|
||||
if match:
|
||||
numeric_part = float(match.group(1)) # Convert the numeric part to a float
|
||||
alphabetic_part = match.group(2) # Get the alphabetic part
|
||||
return [numeric_part, alphabetic_part]
|
||||
else:
|
||||
return [powerbox_output,]
|
||||
|
||||
|
||||
def query_no_echo(instr:pyvisa.resources.Resource, command:str, sleeptime=0)->str:
|
||||
"""helper function for the Attocube APS100 that queries a function to the device, removing the echo.
|
||||
|
||||
Args:
|
||||
instr (pyvisa.resources.Resource):
|
||||
command (str): commands, can be stringed in series with ; between commands
|
||||
sleeptime (float, optional): delay time between commands. Defaults to 0.01.
|
||||
|
||||
Returns:
|
||||
str: _description_
|
||||
""" ''''''
|
||||
try:
|
||||
print(f"Sending command: {command}")
|
||||
instr.write(command)
|
||||
time.sleep(sleeptime)
|
||||
echo_response = instr.read() # Read and discard the echo
|
||||
# print(f"Echo response: {echo_response}")
|
||||
actual_response = instr.read() # Read the actual response
|
||||
print(f"Actual response: {actual_response}")
|
||||
return actual_response
|
||||
except pyvisa.VisaIOError as e:
|
||||
print(f"Error communicating with instrument: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def write_no_echo(instr:pyvisa.resources.Resource, command:str, sleeptime=0)->str:
|
||||
"""helper function for the Attocube APS100 that writes a function to the device, removing the echo.
|
||||
|
||||
Args:
|
||||
instr (pyvisa.resources.Resource):
|
||||
command (str): commands, can be stringed in series with ; between commands
|
||||
sleeptime (float, optional): delay time between commands. Defaults to 0.01.
|
||||
|
||||
Returns:
|
||||
str: _description_
|
||||
""" ''''''
|
||||
try:
|
||||
print(f"Sending command: {command}")
|
||||
instr.write(command)
|
||||
time.sleep(sleeptime) # Give the device some time to process
|
||||
try:
|
||||
while True:
|
||||
echo_response = instr.read() # Read and discard the echo
|
||||
# print(f"Echo response: {echo_response}")
|
||||
except pyvisa.VisaIOError as e:
|
||||
# Expected timeout after all echoed responses are read
|
||||
if e.error_code != pyvisa.constants.VI_ERROR_TMO:
|
||||
raise
|
||||
except pyvisa.VisaIOError as e:
|
||||
print(f"Error communicating with instrument: {e}")
|
||||
|
||||
|
||||
def generate_angle_coord_list(radius, start_angle, end_angle, step_size, clockwise=True):
|
||||
# TODO: DOCS
|
||||
"""Creates a list of discrete cartesian coordinates (x,y), given the radius, start- and end angles, the angle step size, and the direction of rotation.
|
||||
Function then returns a list of two lists: list of angles and list of cartesian coordinates (x,y coordinates in a tuple).
|
||||
|
||||
Args:
|
||||
radius (_type_): _description_
|
||||
start_angle (_type_): _description_
|
||||
end_angle (_type_): _description_
|
||||
step_size (_type_): _description_
|
||||
clockwise (bool, optional): _description_. Defaults to True.
|
||||
|
||||
Returns:
|
||||
_type_: _description_
|
||||
""" """"""
|
||||
# Initialize lists to hold angles and (x, y) pairs
|
||||
angles = []
|
||||
coordinates = []
|
||||
|
||||
# Normalize angles to the range [0, 360)
|
||||
start_angle = start_angle % 360
|
||||
end_angle = end_angle % 360
|
||||
|
||||
if not clockwise:
|
||||
# Clockwise rotation
|
||||
current_angle = start_angle
|
||||
while True:
|
||||
# Append the current angle to the angles list
|
||||
angles.append(current_angle % 360)
|
||||
|
||||
# Convert the current angle to radians
|
||||
current_angle_rad = math.radians(current_angle % 360)
|
||||
|
||||
# Convert polar to Cartesian coordinates
|
||||
x = radius * math.cos(current_angle_rad)
|
||||
y = radius * math.sin(current_angle_rad)
|
||||
|
||||
# Append the (x, y) pair to the list
|
||||
coordinates.append((x, y))
|
||||
|
||||
# Check if we've reached the end_angle (handling wrap-around) (current_angle - step_size) % 360 == end_angle or
|
||||
if current_angle % 360 == end_angle:
|
||||
break
|
||||
|
||||
# Decrement the current angle by the step size
|
||||
current_angle -= step_size
|
||||
if current_angle < 0:
|
||||
current_angle += 360
|
||||
else:
|
||||
# Counterclockwise rotation
|
||||
current_angle = start_angle
|
||||
while True:
|
||||
# Append the current angle to the angles list
|
||||
angles.append(current_angle % 360)
|
||||
|
||||
# Convert the current angle to radians
|
||||
current_angle_rad = math.radians(current_angle % 360)
|
||||
|
||||
# Convert polar to Cartesian coordinates
|
||||
x = radius * math.cos(current_angle_rad)
|
||||
y = radius * math.sin(current_angle_rad)
|
||||
|
||||
# Append the (x, y) pair to the list
|
||||
coordinates.append((x, y))
|
||||
|
||||
# Check if we've reached the end_angle (handling wrap-around) (current_angle + step_size) % 360 == end_angle or
|
||||
if current_angle % 360 == end_angle:
|
||||
break
|
||||
|
||||
# Increment the current angle by the step size
|
||||
current_angle += step_size
|
||||
if current_angle >= 360:
|
||||
current_angle -= 360
|
||||
|
||||
return [angles, coordinates]
|
||||
|
||||
def b_field_rotation(instr1:pyvisa.resources.Resource, instr2:pyvisa.resources.Resource,
|
||||
Babs:float, startangle:float, endangle:float, angle_stepsize:float,
|
||||
Settings:str, clockwise=True, base_file_name='', zerowhenfin_bool=False)->None:
|
||||
# TODO: update docs
|
||||
"""Rotation of the b-field in discrete steps, spectrum is measured at each discrete step in the rotation. Scan angle is
|
||||
defined as the angle between the x-axis and the current B-field vector, i.e., in the anticlockwise direction.
|
||||
|
||||
Args:
|
||||
instr1 (pyvisa.resources.Resource): _description_
|
||||
instr2 (pyvisa.resources.Resource): _description_
|
||||
Babs (float): absolute B-field value in T
|
||||
startangle (float): start angle in degrees
|
||||
endangle (float): end angle in degrees
|
||||
angle_stepsize (float): angle step size in degrees
|
||||
clockwise (bool): determines the direction of rotation of the B-field. Defaults to True.
|
||||
zerowhenfin_bool (bool, optional): after finishing the rotation, both B-field components should be set to 0 T. Defaults to False.
|
||||
"""
|
||||
|
||||
# TODO: add logging to the script
|
||||
|
||||
# defines the folder, in which the data from the spectrometer is temporarily stored in
|
||||
temp_folder_path = "C:/Users/localadmin/Desktop/Users/Lukas/B_Field_Dump"
|
||||
# temp_folder_path = "C:/Users/localadmin/Desktop/Users/Lukas/2024_02_08_Map_test"
|
||||
|
||||
if base_file_name =='':
|
||||
base_file_name = datetime.datetime.now().strftime('%Y_%m_%d_%H.%M')
|
||||
|
||||
start_time = time.time() # start of the scan function
|
||||
|
||||
startangle = startangle % 360
|
||||
endangle = endangle % 360 # ensures that the angles are within [0,360)
|
||||
|
||||
idnstr1 = query_no_echo(instr1, '*IDN?')
|
||||
idnstr2 = query_no_echo(instr2, '*IDN?')
|
||||
|
||||
intensity_data = [] # To store data from each scan
|
||||
cwd = os.getcwd() # save original directory
|
||||
|
||||
# find which one is the dual power supply, then, ramp B_x to Babs value
|
||||
if '2301034' in idnstr1: # serial no. the dual power supply
|
||||
pass
|
||||
elif '2101034' in idnstr2:
|
||||
# swap instruments, instr 1 to be the dual power supply (^= x-axis)
|
||||
instr1, instr2 = instr2, instr1
|
||||
|
||||
# save initial low and high sweep limits of each device, and set them back after the rotation
|
||||
instr1_bsettings = list(sep_num_from_units(el) for el in query_no_echo(instr1, 'UNITS?;LLIM?;ULIM?').split(';')) # deliver a 3 element tuple of tuples containing the set unit, llim and ulim
|
||||
instr2_bsettings = list(sep_num_from_units(el) for el in query_no_echo(instr2, 'UNITS?;LLIM?;ULIM?').split(';')) # deliver a 3 element tuple of tuples containing the set unit, llim and ulim
|
||||
if instr1_bsettings[0][0] == 'T':
|
||||
instr1_bsettings[1][0] = instr1_bsettings[1][0]*0.1 # rescale kG to T, device accepts values only in kG or A, eventho we set it to T
|
||||
instr1_bsettings[2][0] = instr1_bsettings[2][0]*0.1
|
||||
if instr2_bsettings[0][0] == 'T':
|
||||
instr2_bsettings[1][0] = instr2_bsettings[1][0]*0.1 # rescale kG to T, device accepts values only in kG or A, eventho we set it to T
|
||||
instr2_bsettings[2][0] = instr2_bsettings[2][0]*0.1
|
||||
|
||||
# initialise the sweep angle list as well as the sweep limits and directions for each instrument
|
||||
instr1_lim, instr2_lim = 'LLIM', 'ULIM'
|
||||
instr1_sweep, instr2_sweep = 'DOWN', 'UP'
|
||||
|
||||
# create lists of angles and discrete Cartesian coordinates
|
||||
angles, cartesian_coords = generate_angle_coord_list(Babs, startangle, endangle, angle_stepsize, clockwise=clockwise)
|
||||
|
||||
if clockwise: # NOTE: old conditional was: startangle > endangle see if this works....
|
||||
# reverse sweep limits and directions for the clockwise rotation
|
||||
instr1_lim, instr2_lim = instr2_lim, instr1_lim
|
||||
instr1_sweep, instr2_sweep = instr2_sweep, instr1_sweep
|
||||
|
||||
|
||||
# TODO: i dont think we need to change the rates just yet, think about this later
|
||||
'''
|
||||
# list of rates (with units) for diff ranges of each device, only up to Range 1 for single power supply as that is already
|
||||
# the max recommended current.
|
||||
init_range_lst1 = list(sep_num_from_units(el) for el in query_no_echo(instr1, 'RATE? 0;RATE? 1;RATE? 2').split(';'))
|
||||
init_range_lst2 = list(sep_num_from_units(el) for el in query_no_echo(instr2, 'RATE? 0;RATE? 1').split(';'))
|
||||
|
||||
min_range_lst = [min(el1[0], el2[0]) for el1,el2 in zip(init_range_lst1, init_range_lst2)] # min rates for each given range
|
||||
|
||||
# set both devices to the min rates
|
||||
write_no_echo(instr1, f'RATE 0 {min_range_lst[0]};RATE 1 {min_range_lst[1]}')
|
||||
write_no_echo(instr2, f'RATE 0 {min_range_lst[0]};RATE 1 {min_range_lst[1]}')
|
||||
'''
|
||||
|
||||
|
||||
# TODO: see if this is the desired process: to always start from the x-axis ASK LUKAS
|
||||
if Babs <= BX_MAX:
|
||||
# write_no_echo(instr1, f'CHAN 2;ULIM {Babs*10};SWEEP UP') # sets to B_x, the B_x upper limit and sweeps the magnet field to the upper limit
|
||||
print(f'SWITCHED TO BX, SWEEPING B-X TO {Babs} T NOW')
|
||||
else:
|
||||
raise ValueError(f'{Babs=}T value exceeds the max limit of the Bx field {BX_MAX}T!')
|
||||
|
||||
# wait for Babs to be reached by the Bx field
|
||||
actual_bval = sep_num_from_units(query_no_echo(instr1, 'IMAG?'))[0]*0.1 # convert kG to T
|
||||
print(f'Actual magnet strength (Bx): {actual_bval} T,', f'Target magnet strength: {Babs} T')
|
||||
while abs(actual_bval - Babs) > 0.0001:
|
||||
time.sleep(5) # little break
|
||||
actual_bval = sep_num_from_units(query_no_echo(instr1, 'IMAG?'))[0]*0.1
|
||||
print(f'Actual magnet strength (Bx): {actual_bval} T,', f'Target magnet strength: {Babs} T')
|
||||
actual_bval = Babs # NOTE: ONLY FOR TESTING; REMOVE THIS LINE IN ACTUAL USE
|
||||
|
||||
|
||||
# TODO: copy and mod code to see if block logic works, test in lab
|
||||
# NOTE: implement PID control, possibly best option to manage the b field DO THIS LATER ON, WE DO DISCRETE B VALUES RN
|
||||
# Helper function that listens to a device
|
||||
def listen_to_device(device_id, target_value, shared_values, lock, all_targets_met_event):
|
||||
while not all_targets_met_event.is_set(): # Loop until the event is set
|
||||
# value = 0 # Simulate receiving a float from the device INSERT QUERY NO ECHO HERE TO ASK FOR DEVICE IMAG
|
||||
if '2301034' in device_id:
|
||||
value = sep_num_from_units(query_no_echo(instr1, 'IMAG?'))[0]*0.1 # convert kG to T
|
||||
if value <= target_value[device_id]:
|
||||
# write_no_echo(instr1, f"CHAN 2;ULIM {target_value[device_id]*10};SWEEP UP")
|
||||
print(f'sweeping Bx up to {target_value[device_id]}T')
|
||||
else:
|
||||
# write_no_echo(instr1, "CHAN 2;LLIM {target_value[device_id]*10};SWEEP DOWN")
|
||||
print(f'sweeping Bx down to {target_value[device_id]}T')
|
||||
value = target_value['2301034'] # NOTE: ONLY FOR TESTING; REMOVE IN REAL USE
|
||||
time.sleep(6)
|
||||
|
||||
elif '2101014' in device_id:
|
||||
value = sep_num_from_units(query_no_echo(instr2, 'IMAG?'))[0]*0.1 # convert kG to T
|
||||
if value <= target_value[device_id]:
|
||||
# write_no_echo(instr2, f"ULIM {target_value[device_id]*10};SWEEP UP")
|
||||
print(f'sweeping By up to {target_value[device_id]}T')
|
||||
else:
|
||||
# write_no_echo(instr2, "LLIM {target_value[device_id]*10};SWEEP DOWN")
|
||||
print(f'sweeping By down to {target_value[device_id]}T')
|
||||
value = target_value['2101014'] # NOTE: ONLY FOR TESTING; REMOVE IN REAL USE
|
||||
time.sleep(3)
|
||||
|
||||
else:
|
||||
continue # Skip if device ID is not recognized
|
||||
print(f"Device {device_id} reports value: {value} T")
|
||||
|
||||
# time.sleep(2)
|
||||
|
||||
with lock:
|
||||
shared_values[device_id] = value
|
||||
# Check if both devices have met their targets
|
||||
if all(shared_values.get(device) is not None and abs(value - target_value[device]) <= 0.0001
|
||||
for device,value in shared_values.items()):
|
||||
print(f"Both devices reached their target values: {shared_values}")
|
||||
all_targets_met_event.set() # Signal that both targets are met
|
||||
|
||||
# time.sleep(1) # Simulate periodic data checking
|
||||
|
||||
# Main function to manage threads and iterate over target values
|
||||
def monitor_devices(device_target_values, angles_lst, intensity_data=intensity_data):
|
||||
for iteration, target in enumerate(device_target_values):
|
||||
print(f"\nStarting iteration {iteration+1} for target values: {target}")
|
||||
# Shared dictionary to store values from devices
|
||||
shared_values = {device: None for device in target.keys()}
|
||||
# Event to signal when both target values are reached
|
||||
all_targets_met_event = threading.Event()
|
||||
|
||||
# Lock to synchronize access to shared_values
|
||||
lock = threading.Lock()
|
||||
|
||||
# Create and start threads for each device
|
||||
threads = []
|
||||
for device_id in target.keys():
|
||||
thread = threading.Thread(target=listen_to_device, args=(device_id, target, shared_values, lock, all_targets_met_event))
|
||||
threads.append(thread)
|
||||
thread.start()
|
||||
print(f"======================\nThread started for device {device_id}\n======================")
|
||||
|
||||
# Wait until both devices meet their target values
|
||||
all_targets_met_event.wait()
|
||||
print(f"Both target values for iteration {iteration+1} met. Performing action...")
|
||||
# Clean up threads
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
print(f"Threads for iteration {iteration+1} closed.\n")
|
||||
|
||||
|
||||
print(f'COLLECTING SPECTRUM FOR ANGLE {angles_lst[iteration]}°\n')
|
||||
# Perform some action after both targets are met
|
||||
# we acquire with the LF
|
||||
# acquire_name_spe = f'{base_file_name}_{angles_lst[iteration]}°' # NOTE: save each intensity file with the given angle
|
||||
# AcquireAndLock(acquire_name_spe) #this creates a .spe file with the scan name.
|
||||
|
||||
# read the .spe file and get the data as loaded_files
|
||||
# cwd = os.getcwd() # save original directory
|
||||
# os.chdir(temp_folder_path) #change directory
|
||||
# loaded_files = sl.load_from_files([acquire_name_spe + '.spe']) # get the .spe file as a variable
|
||||
# os.chdir(cwd) # go back to original directory
|
||||
|
||||
# Delete the created .spe file from acquiring after getting necessary info
|
||||
# spe_file_path = os.path.join(temp_folder_path, acquire_name_spe + '.spe')
|
||||
# os.remove(spe_file_path)
|
||||
|
||||
points_left = len(angles) - iteration - 1
|
||||
|
||||
print('Points left in the scan: ', points_left)
|
||||
|
||||
#append the intensity data as it is (so after every #of_wl_points, the spectrum of the next point begins)
|
||||
# intensity_data.append(loaded_files.data[0][0][0])
|
||||
|
||||
#prints total time the mapping lasted
|
||||
end_time = time.time()
|
||||
elapsed_time = (end_time - start_time) / 60
|
||||
print('Scan time: ', elapsed_time, 'minutes')
|
||||
|
||||
# reset both devices to original sweep limits
|
||||
write_no_echo(instr1, f'LLIM {instr1_bsettings[1][0]*10};ULIM {instr1_bsettings[2][0]*10}') # reset the initial limits of the device after the scan
|
||||
write_no_echo(instr2, f'LLIM {instr2_bsettings[1][0]*10};ULIM {instr2_bsettings[2][0]*10}') # reset the initial limits of the device after the scan
|
||||
|
||||
|
||||
# TODO: uncomment later if resetting original rates implemented
|
||||
'''
|
||||
# reset both devices' initial rates for each range
|
||||
write_no_echo(instr1, f'RANGE 0 {init_range_lst1[0][0]};RANGE 1 {init_range_lst1[1][0]};RANGE 2 {init_range_lst1[2][0]}') # reset the initial limits of the device after the scan
|
||||
write_no_echo(instr2, f'RANGE 0 {init_range_lst2[0][0]};RANGE 1 {init_range_lst2[1][0]}') # reset the initial limits of the device after the scan
|
||||
'''
|
||||
|
||||
|
||||
if zerowhenfin_bool:
|
||||
# write_no_echo(instr1, 'SWEEP ZERO') # if switched on, discharges the magnet after performing the measurement loop above
|
||||
# write_no_echo(instr2, 'SWEEP ZERO')
|
||||
print('======================\nSWEEPING BOTH DEVICES TO ZERO NOW\n======================')
|
||||
|
||||
#save intensity & WL data as .txt
|
||||
os.chdir('C:/Users/localadmin/Desktop/Users/Ryan')
|
||||
# creates new folder for MAP data
|
||||
# new_folder_name = "Test_Map_" + f"{datetime.datetime.now().strftime('%Y_%m_%d_%H.%M')}"
|
||||
# os.mkdir(new_folder_name)
|
||||
# Here the things will be saved in a new folder under user Lukas !
|
||||
# IMPORTANT last / has to be there, otherwise data cannot be saved and will be lost!!!!!!!!!!!!!!!!
|
||||
# os.chdir('C:/Users/localadmin/Desktop/Users/Ryan/'+ new_folder_name)
|
||||
|
||||
# intensity_data = np.array(intensity_data)
|
||||
# np.savetxt(Settings + f'{angles[0]}°_to_{angles[-1]}°' + experiment_name +'.txt', intensity_data)
|
||||
# TODO: remove/edit experiment_name in line above, as well in sweep_b_val func, rn takes a global variable below
|
||||
|
||||
# wl = np.array(loaded_files.wavelength)
|
||||
# np.savetxt("Wavelength.txt", wl)
|
||||
|
||||
# NOTE: data struct of device_target_values is a list of dictionaries, where each dictionary contains the target values for each device
|
||||
device_target_values = [{'2301034': bval[0], '2101014': bval[1]} for bval in cartesian_coords]
|
||||
|
||||
# call the helper function to carry out the rotation/measurement of spectrum
|
||||
monitor_devices(device_target_values, angles, intensity_data)
|
||||
|
||||
|
||||
################################################################# END OF FUNCTION DEFS ###########################################################################################
|
||||
|
||||
# NOTE: RYAN INTRODUCED SOME FUNCTIONS HERE TO PERFORM THE SCAN
|
||||
|
||||
# Initialise PYVISA ResourceManager
|
||||
rm = pyvisa.ResourceManager()
|
||||
# print(rm.list_resources())
|
||||
# 'ASRL8::INSTR' for dual power supply, 'ASRL9::INSTR' for single power supply (online PC)
|
||||
# 'ASRL10::INSTR' for dual power supply, 'ASRL12::INSTR' for single power supply (offline PC)
|
||||
|
||||
try:
|
||||
# Open the connection with the APS100 dual power supply
|
||||
powerbox_dualsupply = rm.open_resource('ASRL10::INSTR',
|
||||
baud_rate=9600,
|
||||
data_bits=8,
|
||||
parity= pyvisa.constants.Parity.none,
|
||||
stop_bits= pyvisa.constants.StopBits.one,
|
||||
timeout=10000)# 5000 ms timeout
|
||||
write_no_echo(powerbox_dualsupply, 'REMOTE') # turn on the remote mode
|
||||
|
||||
# # select axis for the dual supply, either z-axis(CHAN 1 ^= Supply A) or x-axis(CHAN 2 ^= Supply B)
|
||||
write_no_echo(powerbox_dualsupply, 'CHAN 2')
|
||||
# # #for dual until here
|
||||
|
||||
# Open the connection with the APS100 single power supply
|
||||
powerbox_singlesupply = rm.open_resource('ASRL12::INSTR',
|
||||
baud_rate=9600,
|
||||
data_bits=8,
|
||||
parity= pyvisa.constants.Parity.none,
|
||||
stop_bits= pyvisa.constants.StopBits.one,
|
||||
timeout=10000)# 5000 ms timeout
|
||||
write_no_echo(powerbox_singlesupply, 'REMOTE') # turn on the remote mode
|
||||
#for single until here
|
||||
# TODO: uncomment AMC connection code later, when moving the probe in cryostat is needed.
|
||||
# Setup connection to AMC
|
||||
# amc = AMC.Device(IP)
|
||||
# amc.connect()
|
||||
|
||||
# # Internally, axes are numbered 0 to 2
|
||||
# amc.control.setControlOutput(0, True)
|
||||
# amc.control.setControlOutput(1, True)
|
||||
|
||||
|
||||
# auto = Automation(True, List[String]())
|
||||
# experiment = auto.LightFieldApplication.Experiment
|
||||
# acquireCompleted = AutoResetEvent(False)
|
||||
|
||||
# experiment.Load("2025_03_28_Priyanka_CrSBr_DR_Sweep")
|
||||
# experiment.ExperimentCompleted += experiment_completed # we are hooking a listener.
|
||||
# experiment.SetValue(SpectrometerSettings.GratingSelected, '[750nm,1200][0][0]')
|
||||
# InitializerFilenameParams()
|
||||
|
||||
|
||||
#set scan range and resolution in nanometers
|
||||
range_x = 20000
|
||||
range_y = 20000
|
||||
resolution = 1000
|
||||
# set B-field scan range and resolution (all in T)
|
||||
set_llim_bval = -0.3
|
||||
set_ulim_bval = 0.3
|
||||
set_res_bval = 0.003
|
||||
|
||||
#Here you can specify the filename of the map e.g. put experiment type, exposure time, used filters, etc....
|
||||
# 'PL_SP_700_LP_700_HeNe_52muW_exp_2s_Start_'
|
||||
# experiment_settings = 'PL_X_1859.2_Y_3918.3_HeNe_10.4muW_H_a-axis_LP_SP_650_exp_180s_600g_cwl_930_det_b-axis_Pol_90_l2_45'
|
||||
experiment_settings = 'DR_white_6th spot_Power_G600_exp_25s_l1_40_l2_262_det_b_mag_b'
|
||||
#The program adds the range of the scan as well as the resolution and the date and time of the measurement
|
||||
# f"{set_llim_bval}T_to_{set_ulim_bval}T_{set_res_bval}T_{datetime.datetime.now().strftime('%Y_%m_%d_%H%M')}"
|
||||
experiment_name = f"{set_llim_bval}T_to_{set_ulim_bval}T_stepsize_{set_res_bval}T"
|
||||
|
||||
# this moves the probe in xy-direction and measures spectrum there
|
||||
# move_scan_xy(range_x, range_y, resolution, experiment_settings, experiment_name)
|
||||
|
||||
# ramp_b_val(powerbox_singlesupply, 0, 'y-axis')
|
||||
# ramp_b_val(powerbox_dualsupply, 0, 'z-axis')
|
||||
|
||||
|
||||
# for single/ dual replace and vice versa all the way down
|
||||
# sweep_b_val(powerbox_singlesupply, set_llim_bval, set_ulim_bval, set_res_bval, 'y-axis',
|
||||
# experiment_settings, experiment_name, zerowhenfin_bool=True, reversescan_bool=False, loopscan_bool=True)
|
||||
b_field_rotation(powerbox_dualsupply, powerbox_singlesupply, Babs=0.1, startangle=0, endangle=3,
|
||||
angle_stepsize=1, Settings=experiment_settings, zerowhenfin_bool=True
|
||||
)
|
||||
|
||||
write_no_echo(powerbox_dualsupply, 'LOCAL') # turn off the remote mode
|
||||
write_no_echo(powerbox_singlesupply, 'LOCAL') # turn off the remote mode
|
||||
|
||||
time.sleep(0.5)
|
||||
# powerbox_dualsupply.close()
|
||||
powerbox_singlesupply.close()
|
||||
|
||||
except Exception as e:
|
||||
print(e)
|
||||
# Internally, axes are numbered 0 to 2
|
||||
|
||||
write_no_echo(powerbox_dualsupply, 'LOCAL') # turn off the remote mode
|
||||
write_no_echo(powerbox_singlesupply, 'LOCAL') # turn off the remote mode
|
||||
|
||||
time.sleep(0.5)
|
||||
powerbox_dualsupply.close()
|
||||
powerbox_singlesupply.close()
|
1
magnet_modules/__init__.py
Normal file
1
magnet_modules/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
|
BIN
magnet_modules/__pycache__/__init__.cpython-311.pyc
Normal file
BIN
magnet_modules/__pycache__/__init__.cpython-311.pyc
Normal file
Binary file not shown.
BIN
magnet_modules/__pycache__/sweep_functions.cpython-311.pyc
Normal file
BIN
magnet_modules/__pycache__/sweep_functions.cpython-311.pyc
Normal file
Binary file not shown.
2
magnet_modules/sweep_functions.py
Normal file
2
magnet_modules/sweep_functions.py
Normal file
@ -0,0 +1,2 @@
|
||||
def test_func():
|
||||
print('hello world! from test func')
|
Loading…
x
Reference in New Issue
Block a user