Save-Path-File-Changes #1

Merged
rtan merged 41 commits from Save-Path-File-Changes into main 2025-04-23 13:56:19 +00:00
15 changed files with 3377 additions and 51 deletions

View File

@ -7,6 +7,8 @@ Lightfield + Positioner
############################################ ############################################
# Packages from Ryan # Packages from Ryan
import re import re
import math
import threading
import pyvisa import pyvisa
# from pyvisa import ResourceManager, constants # from pyvisa import ResourceManager, constants
@ -31,6 +33,7 @@ from System import String
import numpy as np import numpy as np
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import datetime import datetime
from typing import Union
#First choose your controller #First choose your controller
@ -162,6 +165,8 @@ def move_xy(target_x, target_y): # moving in x and y direction closed to desired
# intensity_data = [] # To store data from each scan # intensity_data = [] # To store data from each scan
# data_list = [] # data_list = []
def move_scan_xy(range_x, range_y, resolution, Settings, baseFileName): def move_scan_xy(range_x, range_y, resolution, Settings, baseFileName):
""" """
This function moves the positioners to scan the sample with desired ranges and resolution in 2 dimensions. This function moves the positioners to scan the sample with desired ranges and resolution in 2 dimensions.
@ -204,7 +209,7 @@ def move_scan_xy(range_x, range_y, resolution, Settings, baseFileName):
#This gives a directory, in which the script will save the spectrum of each spot as spe #This gives a directory, in which the script will save the spectrum of each spot as spe
#However, it will open the spectrum, convert it to txt, add it to the intensity_data and delete the spe file #However, it will open the spectrum, convert it to txt, add it to the intensity_data and delete the spe file
Path_save = "C:/Users/localadmin/Desktop/Users/Lukas/2024_02_08_Map_test" temp_folder_path = "C:/Users/localadmin/Desktop/Users/Lukas/2024_02_08_Map_test"
#scanning loop #scanning loop
for i, x_positions in enumerate(array_x): for i, x_positions in enumerate(array_x):
@ -218,20 +223,18 @@ def move_scan_xy(range_x, range_y, resolution, Settings, baseFileName):
move_axis(axis_y, y_positions) move_axis(axis_y, y_positions)
y = True y = True
#we acquire with the LF #we acquire with the LF
acquire_name_spe = f'{baseFileName}_X{x_positions}_Y{y_positions}' acquire_name_spe = f'{baseFileName}_X{x_positions}_Y{y_positions}'
AcquireAndLock(acquire_name_spe) #this creates a .spe file with the scan name. AcquireAndLock(acquire_name_spe) #this creates a .spe file with the scan name.
#read the .spe file and get the data as loaded_files #read the .spe file and get the data as loaded_files
cwd = os.getcwd() # save original directory cwd = os.getcwd() # save original directory
os.chdir(Path_save) #change directory os.chdir(temp_folder_path) #change directory
loaded_files = sl.load_from_files([acquire_name_spe + '.spe']) # get the .spe file as a variable loaded_files = sl.load_from_files([acquire_name_spe + '.spe']) # get the .spe file as a variable
os.chdir(cwd) # go back to original directory os.chdir(cwd) # go back to original directory
# Delete the created .spe file from acquiring after getting necessary info # Delete the created .spe file from acquiring after getting necessary info
spe_file_path = os.path.join(Path_save, acquire_name_spe + '.spe') spe_file_path = os.path.join(temp_folder_path, acquire_name_spe + '.spe')
os.remove(spe_file_path) os.remove(spe_file_path)
distance = calculate_distance(x_positions, y_positions,amc.move.getPosition(axis_x), amc.move.getPosition(axis_y)) distance = calculate_distance(x_positions, y_positions,amc.move.getPosition(axis_x), amc.move.getPosition(axis_y))
@ -301,7 +304,8 @@ def sep_num_from_units(powerbox_output :str)->list:
else: else:
return [powerbox_output,] return [powerbox_output,]
def query_no_echo(instr:pyvisa.resources.Resource, command:str, sleeptime=0.01)->str:
def query_no_echo(instr:pyvisa.resources.Resource, command:str, sleeptime=0)->str:
"""helper function for the Attocube APS100 that queries a function to the device, removing the echo. """helper function for the Attocube APS100 that queries a function to the device, removing the echo.
Args: Args:
@ -325,7 +329,8 @@ def query_no_echo(instr:pyvisa.resources.Resource, command:str, sleeptime=0.01)-
print(f"Error communicating with instrument: {e}") print(f"Error communicating with instrument: {e}")
return None return None
def write_no_echo(instr:pyvisa.resources.Resource, command:str, sleeptime=0.01)->str:
def write_no_echo(instr:pyvisa.resources.Resource, command:str, sleeptime=0)->str:
"""helper function for the Attocube APS100 that writes a function to the device, removing the echo. """helper function for the Attocube APS100 that writes a function to the device, removing the echo.
Args: Args:
@ -351,22 +356,25 @@ def write_no_echo(instr:pyvisa.resources.Resource, command:str, sleeptime=0.01)-
except pyvisa.VisaIOError as e: except pyvisa.VisaIOError as e:
print(f"Error communicating with instrument: {e}") print(f"Error communicating with instrument: {e}")
# TODO: implement the reverse scan and zero when finish functionality
# receive values in units of T, rescale in kg to talk with the power supplyy. 1T = 10kG # receive values in units of T, rescale in kg to talk with the power supplyy. 1T = 10kG
# NOTE: removed singlepowersupply_bool, reading serial-nr. of the device instead.
# old save folder: "C:/Users/localadmin/Desktop/Users/Lukas/2024_02_08_Map_test"
def sweep_b_val(instr:pyvisa.resources.Resource, min_bval:float, max_bval:float, def sweep_b_val(instr:pyvisa.resources.Resource, min_bval:float, max_bval:float,
res:float, Settings:str, base_file_name='', path_save="C:/Users/localadmin/Desktop/Users/Lukas/2024_02_08_Map_test", res:float, magnet_coil:str, Settings:str, base_file_name='',
singlepowersupply_bool=False, reversescan_bool=False, zerowhenfin_bool=False)->None: reversescan_bool=False, zerowhenfin_bool=False, loopscan_bool=False)->None:
""" this function performs a sweep of the B field of the chosen magnet coil. It creates a list o B values from the given min and max values, with the given resolution. For each value, a measurement of the spectrum # TODO: update docs in the end
of the probe in the cryostat is made, using the LightField spectrometer. """ this function performs a sweep of the B field of the chosen magnet coil. It creates a list o B values from the given min and max values,
with the given resolution. For each value, a measurement of the spectrum of the probe in the cryostat is made, using the LightField spectrometer.
Args: Args:
instr (pyvisa.resources.Resource): chosen power supply device to connect to instr (pyvisa.resources.Resource): chosen power supply device to connect to
min_bval (float): min B value of the scan (please input in units of Tesla) min_bval (float): min B value of the scan (please input in units of Tesla)
max_bval (float): max B value of the scan (please input in units of Tesla) max_bval (float): max B value of the scan (please input in units of Tesla)
res (float): resolution of the list of B values (please input in units of Tesla) res (float): resolution of the list of B values (please input in units of Tesla)
magnet_coil (str): select magnet coil to be used. String should be 'x-axis','y-axis' or 'z-axis'.
Settings (str): experiment settings, included in file name. Settings (str): experiment settings, included in file name.
base_file_name (str, optional): base file name. Defaults to ''. base_file_name (str, optional): base file name. Defaults to ''.
path_save (str, optional): file path where the file will be saved. Defaults to "C:/Users/localadmin/Desktop/Users/Lukas/2024_02_08_Map_test".
singlepowersupply_bool (bool, optional): _description_. Defaults to False. singlepowersupply_bool (bool, optional): _description_. Defaults to False.
reversescan_bool (bool, optional): _description_. Defaults to False. reversescan_bool (bool, optional): _description_. Defaults to False.
zerowhenfin_bool (bool, optional): _description_. Defaults to False. zerowhenfin_bool (bool, optional): _description_. Defaults to False.
@ -375,31 +383,76 @@ def sweep_b_val(instr:pyvisa.resources.Resource, min_bval:float, max_bval:float,
ValueError: when By limit is exceeded. ValueError: when By limit is exceeded.
ValueError: when Bz limit is exceeded. ValueError: when Bz limit is exceeded.
ValueError: when Bx limit is exceeded. ValueError: when Bx limit is exceeded.
ConnectionError: when no device is connected.
""" '''''' """ ''''''
def pyramid_list(lst) -> Union[list, np.ndarray]:
"""reverses the list and removes the first element of reversed list. Then, this is appended to
the end of the original list and returned as the 'pyramid' list.
Args:
lst (list or np.ndarray):
Raises:
TypeError: if the input object isn't a list or np.ndarray
Returns:
Union[list, np.ndarray]: the pyramid list
""" ''''''
if isinstance(lst, list):
return lst + lst[-2::-1]
elif isinstance(lst, np.ndarray):
return np.append(lst, lst[-2::-1])
else:
raise TypeError('Please input a list!')
# defines the folder, in which the data from the spectrometer is temporarily stored in
temp_folder_path = "C:/Users/localadmin/Desktop/Users/Lukas/2024_02_08_Map_test"
# if path_save =='':
# path_save = datetime.datetime.now().strftime("%Y_%m_%d_%H%M_hrs_")
if base_file_name =='': if base_file_name =='':
base_file_name = datetime.datetime.now().strftime('%Y_%m_%d_%H.%M') base_file_name = datetime.datetime.now().strftime('%Y_%m_%d_%H.%M')
start_time = time.time() start_time = time.time() # start of the scan function
instr_bsettings = list(sep_num_from_units(el) for el in query_no_echo(instr, 'UNITS?;LLIM?;ULIM?').split(';')) # deliver a 3 element tuple of tuples containing the set unit, llim and ulim
instr_info = query_no_echo(instr, '*IDN?')
instr_bsettings = list(sep_num_from_units(el) for el in query_no_echo(instr, 'UNITS?;LLIM?;ULIM?').split(';')) # deliver a 3 element list of lists containing the set unit, llim and ulim
if instr_bsettings[0][0] == 'T': if instr_bsettings[0][0] == 'T':
instr_bsettings[1][0] = instr_bsettings[1][0]*0.1 # rescale kG to T, device accepts values only in kG or A, eventho we set it to T instr_bsettings[1][0] = instr_bsettings[1][0]*0.1 # rescale kG to T, device accepts values only in kG or A, eventho we set it to T
instr_bsettings[2][0] = instr_bsettings[2][0]*0.1 instr_bsettings[2][0] = instr_bsettings[2][0]*0.1
if singlepowersupply_bool: # checks limits of By # if singlepowersupply_bool: # checks limits of Bx or By
# if (min_bval< -BY_MAX) or (max_bval > BY_MAX):
# raise ValueError('Input limits exceed that of the magnet By! Please input smaller limits.')
# elif '1' in query_no_echo(instr, 'CHAN?'): # check if its the coils for Bz
# if (min_bval < -BZ_MAX) or (max_bval > BZ_MAX):
# raise ValueError('Input limits exceed that of the magnet (Bz)! Please input smaller limits.')
# else: # checks limits of Bx
# if (min_bval< -BX_MAX) or (max_bval > BX_MAX):
# raise ValueError('Input limits exceed that of the magnet Bx! Please input smaller limits.')
if '2101014' in instr_info and (magnet_coil=='y-axis'): # single power supply
if (min_bval< -BY_MAX) or (max_bval > BY_MAX): if (min_bval< -BY_MAX) or (max_bval > BY_MAX):
raise ValueError('Input limits exceed that of the magnet By! Please input smaller limits.') raise ValueError('Input limits exceed that of the magnet By! Please input smaller limits.')
elif '1' in query_no_echo(instr, 'CHAN?'): # check if its the coils for Bz elif '2301034' in instr_info: # dual power supply
if (min_bval < -BZ_MAX) or (max_bval > BZ_MAX): if magnet_coil=='z-axis': # check if its the coils for Bz
raise ValueError('Input limits exceed that of the magnet (Bz)! Please input smaller limits.') if (min_bval < -BZ_MAX) or (max_bval > BZ_MAX):
else: # checks limits of Bx raise ValueError('Input limits exceed that of the magnet (Bz)! Please input smaller limits.')
if (min_bval< -BX_MAX) or (max_bval > BX_MAX): write_no_echo(instr, 'CHAN 1')
raise ValueError('Input limits exceed that of the magnet Bx! Please input smaller limits.') elif magnet_coil=='x-axis': # checks limits of Bx
if (min_bval< -BX_MAX) or (max_bval > BX_MAX):
raise ValueError('Input limits exceed that of the magnet Bx! Please input smaller limits.')
write_no_echo(instr, 'CHAN 2')
else:
raise ConnectionError('Device is not connected!')
write_no_echo(instr, f'LLIM {min_bval*10};ULIM {max_bval*10}') # sets the given limits, must convert to kG for the device to read write_no_echo(instr, f'LLIM {min_bval*10};ULIM {max_bval*10}') # sets the given limits, must convert to kG for the device to read
bval_lst = np.arange(min_bval, max_bval + res, res) # creates list of B values to measure at, with given resolution, in T bval_lst = np.arange(min_bval, max_bval + res, res) # creates list of B values to measure at, with given resolution, in T
init_bval = sep_num_from_units(query_no_echo(instr, 'IMAG?'))[0]*0.1 # queries the initial B value of the coil, rescale from kG to T # TODO: unused, see if can remove
# init_bval = sep_num_from_units(query_no_echo(instr, 'IMAG?'))[0]*0.1 # queries the initial B value of the coil, rescale from kG to T
init_lim, subsequent_lim = 'LLIM', 'ULIM' init_lim, subsequent_lim = 'LLIM', 'ULIM'
init_sweep, subsequent_sweep = 'DOWN', 'UP' init_sweep, subsequent_sweep = 'DOWN', 'UP'
@ -417,24 +470,19 @@ def sweep_b_val(instr:pyvisa.resources.Resource, min_bval:float, max_bval:float,
init_lim, subsequent_lim = subsequent_lim, init_lim init_lim, subsequent_lim = subsequent_lim, init_lim
init_sweep, subsequent_sweep = subsequent_sweep, init_sweep init_sweep, subsequent_sweep = subsequent_sweep, init_sweep
# creates the pyramid list of B vals if one were to perform a hysteresis measurement
if loopscan_bool:
bval_lst = pyramid_list(bval_lst)
total_points = len(bval_lst) total_points = len(bval_lst)
middle_index_bval_lst = total_points // 2
intensity_data = [] # To store data from each scan intensity_data = [] # To store data from each scan
cwd = os.getcwd() # save original directory cwd = os.getcwd() # save original directory
#This gives a directory, in which the script will save the spectrum of each spot as spe # NOTE: helper function for the scanning loop
#However, it will open the spectrum, convert it to txt, add it to the intensity_data and delete the spe file def helper_scan_func(idx, bval, instr=instr, init_lim=init_lim, init_sweep=init_sweep,
#scanning loop subsequent_lim=subsequent_lim, subsequent_sweep=subsequent_sweep, sleep=5):
for i, bval in enumerate(bval_lst): if idx == 0: # for first iteration, sweep to one of the limits
# if init_bval == bval:
# # if initial bval is equal to the element of the given iteration from the bval_lst, then commence measuring the spectrum
# pass
# else:
# TODO: improve the conditional block later on... try to shorten the number of conditionals needed/flatten the nested conditionals
# else, travel to the lower or higher limit, depending on how far the init val is to each bound, and commence the measurement from there on
# if not reversescan_bool:
if i == 0: # for first iteration, sweep to one of the limits
write_no_echo(instr, f'{init_lim} {bval*10}') # convert back to kG write_no_echo(instr, f'{init_lim} {bval*10}') # convert back to kG
write_no_echo(instr, f'SWEEP {init_sweep}') write_no_echo(instr, f'SWEEP {init_sweep}')
else: else:
@ -450,6 +498,40 @@ def sweep_b_val(instr:pyvisa.resources.Resource, min_bval:float, max_bval:float,
# update the actual bval # update the actual bval
print(f'Actual magnet strength: {actual_bval} T,', f'Target magnet strength: {bval} T') print(f'Actual magnet strength: {actual_bval} T,', f'Target magnet strength: {bval} T')
#scanning loop
for i, bval in enumerate(bval_lst):
# if init_bval == bval:
# # if initial bval is equal to the element of the given iteration from the bval_lst, then commence measuring the spectrum
# pass
# else:
# NOTE: original code without the loop scan
################################################
# if i == 0: # for first iteration, sweep to one of the limits
# write_no_echo(instr, f'{init_lim} {bval*10}') # convert back to kG
# write_no_echo(instr, f'SWEEP {init_sweep}')
# else:
# write_no_echo(instr, f'{subsequent_lim} {bval*10}') # convert back to kG
# write_no_echo(instr, f'SWEEP {subsequent_sweep}')
# actual_bval = sep_num_from_units(query_no_echo(instr, 'IMAG?'))[0]*0.1 # convert kG to T
# print(f'Actual magnet strength: {actual_bval} T,', f'Target magnet strength: {bval} T')
# while abs(actual_bval - bval) > 0.0001:
# time.sleep(5) # little break
# actual_bval = sep_num_from_units(query_no_echo(instr, 'IMAG?'))[0]*0.1
# # update the actual bval
# print(f'Actual magnet strength: {actual_bval} T,', f'Target magnet strength: {bval} T')
###############################################
if not loopscan_bool:
helper_scan_func(i, bval)
else:
if i <= middle_index_bval_lst:
helper_scan_func(i, bval)
else:
helper_scan_func(i, bval, instr=instr, init_lim=subsequent_lim, init_sweep=subsequent_sweep,
subsequent_lim=init_lim, subsequent_sweep=init_sweep, sleep=5)
time.sleep(5) time.sleep(5)
# we acquire with the LF # we acquire with the LF
acquire_name_spe = f'{base_file_name}_{bval}T' acquire_name_spe = f'{base_file_name}_{bval}T'
@ -457,12 +539,12 @@ def sweep_b_val(instr:pyvisa.resources.Resource, min_bval:float, max_bval:float,
# read the .spe file and get the data as loaded_files # read the .spe file and get the data as loaded_files
cwd = os.getcwd() # save original directory cwd = os.getcwd() # save original directory
os.chdir(path_save) #change directory os.chdir(temp_folder_path) #change directory
loaded_files = sl.load_from_files([acquire_name_spe + '.spe']) # get the .spe file as a variable loaded_files = sl.load_from_files([acquire_name_spe + '.spe']) # get the .spe file as a variable
os.chdir(cwd) # go back to original directory os.chdir(cwd) # go back to original directory
# Delete the created .spe file from acquiring after getting necessary info # Delete the created .spe file from acquiring after getting necessary info
spe_file_path = os.path.join(path_save, acquire_name_spe + '.spe') spe_file_path = os.path.join(temp_folder_path, acquire_name_spe + '.spe')
os.remove(spe_file_path) os.remove(spe_file_path)
points_left = total_points - i - 1 # TODO: SEE IF THIS IS CORRECT points_left = total_points - i - 1 # TODO: SEE IF THIS IS CORRECT
@ -476,6 +558,8 @@ def sweep_b_val(instr:pyvisa.resources.Resource, min_bval:float, max_bval:float,
elapsed_time = (end_time - start_time) / 60 elapsed_time = (end_time - start_time) / 60
print('Scan time: ', elapsed_time, 'minutes') print('Scan time: ', elapsed_time, 'minutes')
write_no_echo(instr, f'LLIM {instr_bsettings[1][0]*10};ULIM {instr_bsettings[2][0]*10}') # reset the initial limits of the device after the scan
if zerowhenfin_bool: if zerowhenfin_bool:
write_no_echo(instr, 'SWEEP ZERO') # if switched on, discharges the magnet after performing the measurement loop above write_no_echo(instr, 'SWEEP ZERO') # if switched on, discharges the magnet after performing the measurement loop above
@ -495,6 +579,279 @@ def sweep_b_val(instr:pyvisa.resources.Resource, min_bval:float, max_bval:float,
np.savetxt("Wavelength.txt", wl) np.savetxt("Wavelength.txt", wl)
def polar_to_cartesian(radius, start_angle, end_angle, step_size, clockwise=True):
# TODO: DOCS
"""Creates a list of discrete cartesian coordinates (x,y), given the radius, start- and end angles, the angle step size, and the direction of rotation.
Function then returns a list of two lists: list of angles and list of cartesian coordinates (x,y coordinates in a tuple).
Args:
radius (_type_): _description_
start_angle (_type_): _description_
end_angle (_type_): _description_
step_size (_type_): _description_
clockwise (bool, optional): _description_. Defaults to True.
Returns:
_type_: _description_
""" """"""
# Initialize lists to hold angles and (x, y) pairs
angles = []
coordinates = []
# Normalize angles to the range [0, 360)
start_angle = start_angle % 360
end_angle = end_angle % 360
if clockwise:
# Clockwise rotation
current_angle = start_angle
while True:
# Append the current angle to the angles list
angles.append(current_angle % 360)
# Convert the current angle to radians
current_angle_rad = math.radians(current_angle % 360)
# Convert polar to Cartesian coordinates
x = radius * math.cos(current_angle_rad)
y = radius * math.sin(current_angle_rad)
# Append the (x, y) pair to the list
coordinates.append((x, y))
# Check if we've reached the end_angle (handling wrap-around) (current_angle - step_size) % 360 == end_angle or
if current_angle % 360 == end_angle:
break
# Decrement the current angle by the step size
current_angle -= step_size
if current_angle < 0:
current_angle += 360
else:
# Counterclockwise rotation
current_angle = start_angle
while True:
# Append the current angle to the angles list
angles.append(current_angle % 360)
# Convert the current angle to radians
current_angle_rad = math.radians(current_angle % 360)
# Convert polar to Cartesian coordinates
x = radius * math.cos(current_angle_rad)
y = radius * math.sin(current_angle_rad)
# Append the (x, y) pair to the list
coordinates.append((x, y))
# Check if we've reached the end_angle (handling wrap-around) (current_angle + step_size) % 360 == end_angle or
if current_angle % 360 == end_angle:
break
# Increment the current angle by the step size
current_angle += step_size
if current_angle >= 360:
current_angle -= 360
return [angles, coordinates]
def b_field_rotation(instr1:pyvisa.resources.Resource, instr2:pyvisa.resources.Resource,
Babs:float, startangle:float, endangle:float, angle_stepsize:float, Settings:str, clockwise=True, base_file_name='', zerowhenfin_bool=False)->None:
# TODO: update docs
"""Rotation of the b-field in discrete steps, spectrum is measured at each discrete step in the rotation. Scan angle is
defined as the angle between the x-axis and the current B-field vector, i.e., in the anticlockwise direction.
Args:
instr1 (pyvisa.resources.Resource): _description_
instr2 (pyvisa.resources.Resource): _description_
Babs (float): absolute B-field value in T
startangle (float): start angle in degrees
endangle (float): end angle in degrees
angle_stepsize (float): angle step size in degrees
clockwise (bool): determines the direction of rotation of the B-field. Defaults to True.
zerowhenfin_bool (bool, optional): after finishing the rotation, both B-field components should be set to 0 T. Defaults to False.
"""
# TODO: possibly rename instr1 and instr2 to the dual and single power supplies respectively??
# defines the folder, in which the data from the spectrometer is temporarily stored in
temp_folder_path = "C:/Users/localadmin/Desktop/Users/Lukas/2024_02_08_Map_test"
if base_file_name =='':
base_file_name = datetime.datetime.now().strftime('%Y_%m_%d_%H.%M')
start_time = time.time() # start of the scan function
startangle = startangle % 360
endangle = endangle % 360 # ensures that the angles are within [0,360)
idnstr1 = query_no_echo(instr1, '*IDN?')
idnstr2 = query_no_echo(instr1, '*IDN?')
intensity_data = [] # To store data from each scan
cwd = os.getcwd() # save original directory
# find which one is the dual power supply, then, ramp B_x to Babs value
if '2301034' in idnstr1: # serial no. the dual power supply
pass
elif '2101034' in idnstr2:
# swap instruments, instr 1 to be the dual power supply (^= x-axis)
instr1, instr2 = instr2, instr1
# save initial low and high sweep limits of each device, and set them back after the rotation
instr1_bsettings = list(sep_num_from_units(el) for el in query_no_echo(instr1, 'UNITS?;LLIM?;ULIM?').split(';')) # deliver a 3 element tuple of tuples containing the set unit, llim and ulim
instr2_bsettings = list(sep_num_from_units(el) for el in query_no_echo(instr2, 'UNITS?;LLIM?;ULIM?').split(';')) # deliver a 3 element tuple of tuples containing the set unit, llim and ulim
if instr1_bsettings[0][0] == 'T':
instr1_bsettings[1][0] = instr1_bsettings[1][0]*0.1 # rescale kG to T, device accepts values only in kG or A, eventho we set it to T
instr1_bsettings[2][0] = instr1_bsettings[2][0]*0.1
if instr2_bsettings[0][0] == 'T':
instr2_bsettings[1][0] = instr2_bsettings[1][0]*0.1 # rescale kG to T, device accepts values only in kG or A, eventho we set it to T
instr2_bsettings[2][0] = instr2_bsettings[2][0]*0.1
# initialise the sweep angle list as well as the sweep limits and directions for each instrument
instr1_lim, instr2_lim = 'LLIM', 'ULIM'
instr1_sweep, instr2_sweep = 'DOWN', 'UP'
# create lists of angles and discrete Cartesian coordinates
angles, cartesian_coords = polar_to_cartesian(Babs, startangle, endangle, angle_stepsize, clockwise=clockwise)
if clockwise: # NOTE: old conditional was: startangle > endangle see if this works....
# reverse sweep limits and directions for the clockwise rotation
instr1_lim, instr2_lim = instr2_lim, instr1_lim
instr1_sweep, instr2_sweep = instr2_sweep, instr1_sweep
# list of rates (with units) for diff ranges of each device, only up to Range 1 for single power supply as that is already
# the max recommended current.
init_range_lst1 = list(sep_num_from_units(el) for el in query_no_echo(instr1, 'RATE? 0;RATE? 1;RATE? 2').split(';'))
init_range_lst2 = list(sep_num_from_units(el) for el in query_no_echo(instr2, 'RATE? 0;RATE? 1').split(';'))
min_range_lst = [min(el1[0], el2[0]) for el1,el2 in zip(init_range_lst1, init_range_lst2)] # min rates for each given range
# set both devices to the min rates
write_no_echo(instr1, f'RATE 0 {min_range_lst[0]};RATE 1 {min_range_lst[1]}')
write_no_echo(instr2, f'RATE 0 {min_range_lst[0]};RATE 1 {min_range_lst[1]}')
write_no_echo(instr1, f'CHAN 2;ULIM {Babs*10};SWEEP UP') # sets to B_x, the B_x upper limit and sweeps the magnet field to the upper limit
print(f'SWEEPING B-X TO {Babs} T NOW')
# wait for Babs to be reached by the Bx field
actual_bval = sep_num_from_units(query_no_echo(instr1, 'IMAG?'))[0]*0.1 # convert kG to T
print(f'Actual magnet strength (Bx): {actual_bval} T,', f'Target magnet strength: {Babs} T')
while abs(actual_bval - Babs) > 0.0001:
time.sleep(5) # little break
actual_bval = sep_num_from_units(query_no_echo(instr1, 'IMAG?'))[0]*0.1
print(f'Actual magnet strength (Bx): {actual_bval} T,', f'Target magnet strength: {Babs} T')
# NOTE: implement PID control, possibly best option to manage the b field DO THIS LATER ON, WE DO DISCRETE B VALUES RN
# Helper function that listens to a device
def listen_to_device(device_id, target_value, shared_values, lock, all_targets_met_event):
while not all_targets_met_event.is_set(): # Loop until the event is set
# value = 0 # Simulate receiving a float from the device INSERT QUERY NO ECHO HERE TO ASK FOR DEVICE IMAG
if '2301034' in device_id:
value = sep_num_from_units(query_no_echo(instr1, 'IMAG?'))[0]*0.1 # convert kG to T
elif '2101014' in device_id:
value = sep_num_from_units(query_no_echo(instr2, 'IMAG?'))[0]*0.1 # convert kG to T
print(f"Device {device_id} reports value: {value} T")
with lock:
shared_values[device_id] = value
# Check if both devices have met their targets
if all(shared_values.get(device) is not None and abs(shared_values[device] - target_value[device]) <= 0.0001
for device in shared_values):
print(f"Both devices reached their target values: {shared_values}")
all_targets_met_event.set() # Signal that both targets are met
# time.sleep(1) # Simulate periodic data checking
# Main function to manage threads and iterate over target values
def monitor_devices(device_target_values, angles_lst, intensity_data=intensity_data):
for iteration, target in enumerate(device_target_values):
print(f"\nStarting iteration {iteration+1} for target values: {target}")
# Shared dictionary to store values from devices
shared_values = {device: None for device in target.keys()}
# Event to signal when both target values are reached
all_targets_met_event = threading.Event()
# Lock to synchronize access to shared_values
lock = threading.Lock()
# Create and start threads for each device
threads = []
for device_id in target.keys():
thread = threading.Thread(target=listen_to_device, args=(device_id, target, shared_values, lock, all_targets_met_event))
threads.append(thread)
thread.start()
# Wait until both devices meet their target values
all_targets_met_event.wait()
print(f"Both target values for iteration {iteration+1} met. Performing action...")
# Perform some action after both targets are met
# we acquire with the LF
acquire_name_spe = f'{base_file_name}_{angles_lst[iteration]}°' # NOTE: save each intensity file with the given angle
AcquireAndLock(acquire_name_spe) #this creates a .spe file with the scan name.
# read the .spe file and get the data as loaded_files
cwd = os.getcwd() # save original directory
os.chdir(temp_folder_path) #change directory
loaded_files = sl.load_from_files([acquire_name_spe + '.spe']) # get the .spe file as a variable
os.chdir(cwd) # go back to original directory
# Delete the created .spe file from acquiring after getting necessary info
spe_file_path = os.path.join(temp_folder_path, acquire_name_spe + '.spe')
os.remove(spe_file_path)
# points_left = total_points - i - 1 # TODO: SEE IF THIS IS CORRECT
# print('Points left in the scan: ', points_left)
#append the intensity data as it is (so after every #of_wl_points, the spectrum of the next point begins)
intensity_data.append(loaded_files.data[0][0][0])
# Clean up threads
for thread in threads:
thread.join()
print(f"Threads for iteration {iteration+1} closed.\n")
#prints total time the mapping lasted
end_time = time.time()
elapsed_time = (end_time - start_time) / 60
print('Scan time: ', elapsed_time, 'minutes')
# reset both devices to original sweep limits
write_no_echo(instr1, f'LLIM {instr1_bsettings[1][0]*10};ULIM {instr1_bsettings[2][0]*10}') # reset the initial limits of the device after the scan
write_no_echo(instr2, f'LLIM {instr2_bsettings[1][0]*10};ULIM {instr2_bsettings[2][0]*10}') # reset the initial limits of the device after the scan
# reset both devices' initial rates for each range
write_no_echo(instr1, f'RANGE 0 {init_range_lst1[0][0]};RANGE 1 {init_range_lst1[1][0]};RANGE 2 {init_range_lst1[2][0]}') # reset the initial limits of the device after the scan
write_no_echo(instr2, f'RANGE 0 {init_range_lst2[0][0]};RANGE 1 {init_range_lst2[1][0]}') # reset the initial limits of the device after the scan
if zerowhenfin_bool:
write_no_echo(instr1, 'SWEEP ZERO') # if switched on, discharges the magnet after performing the measurement loop above
write_no_echo(instr2, 'SWEEP ZERO')
#save intensity & WL data as .txt
os.chdir('C:/Users/localadmin/Desktop/Users/Lukas')
# creates new folder for MAP data
new_folder_name = "Test_Map_" + f"{datetime.datetime.now().strftime('%Y_%m_%d_%H.%M')}"
os.mkdir(new_folder_name)
# Here the things will be saved in a new folder under user Lukas !
# IMPORTANT last / has to be there, otherwise data cannot be saved and will be lost!!!!!!!!!!!!!!!!
os.chdir('C:/Users/localadmin/Desktop/Users/Lukas/'+ new_folder_name)
intensity_data = np.array(intensity_data)
np.savetxt(Settings + f'{angles[0]}°_to_{angles[-1]}°' + experiment_name +'.txt', intensity_data)
# TODO: remove/edit experiment_name in line above, as well in sweep_b_val func, rn takes a global variable below
wl = np.array(loaded_files.wavelength)
np.savetxt("Wavelength.txt", wl)
# modify cartesian_coords to suite the required data struct in monitor_devices
cartesian_coords = [{'2301034': t[0], '2101014': t[1]} for t in cartesian_coords]
# call the helper function to carry out the rotation/measurement of spectrum
monitor_devices(cartesian_coords, angles, intensity_data)
################################################################# END OF FUNCTION DEFS ########################################################################################### ################################################################# END OF FUNCTION DEFS ###########################################################################################
@ -502,20 +859,33 @@ def sweep_b_val(instr:pyvisa.resources.Resource, min_bval:float, max_bval:float,
# Initialise PYVISA ResourceManager # Initialise PYVISA ResourceManager
rm = pyvisa.ResourceManager() rm = pyvisa.ResourceManager()
# print(rm.list_resources()) # 'ASRL8::INSTR' for dual power supply, 'ASRL9::INSTR' for single power supply # print(rm.list_resources())
# 'ASRL8::INSTR' for dual power supply, 'ASRL9::INSTR' for single power supply (online PC)
# 'ASRL10::INSTR' for dual power supply, 'ASRL12::INSTR' for single power supply (offline PC)
# Open the connection with the APS100 dual power supply # Open the connection with the APS100 dual power supply
powerbox_dualsupply = rm.open_resource('ASRL8::INSTR', powerbox_dualsupply = rm.open_resource('ASRL10::INSTR',
baud_rate=9600, # Example baud rate, adjust as needed baud_rate=9600,
data_bits=8, data_bits=8,
parity= pyvisa.constants.Parity.none, parity= pyvisa.constants.Parity.none,
stop_bits= pyvisa.constants.StopBits.one, stop_bits= pyvisa.constants.StopBits.one,
timeout=5000)# 5000 ms timeout timeout=100)# 5000 ms timeout
# Open the connection with the APS100 dual power supply
powerbox_singlesupply = rm.open_resource('ASRL12::INSTR',
baud_rate=9600,
data_bits=8,
parity= pyvisa.constants.Parity.none,
stop_bits= pyvisa.constants.StopBits.one,
timeout=100)# 5000 ms timeout
write_no_echo(powerbox_dualsupply, 'REMOTE') # turn on the remote mode write_no_echo(powerbox_dualsupply, 'REMOTE') # turn on the remote mode
write_no_echo(powerbox_singlesupply, 'REMOTE') # turn on the remote mode
# TODO: test functionality of the magnet_coil param later on, should work... as this code below is basically implemented inside the scan func.
# select axis for the dual supply, either z-axis(CHAN 1 ^= Supply A) or x-axis(CHAN 2 ^= Supply B) # select axis for the dual supply, either z-axis(CHAN 1 ^= Supply A) or x-axis(CHAN 2 ^= Supply B)
write_no_echo(powerbox_dualsupply, 'CHAN 1') # write_no_echo(powerbox_dualsupply, 'CHAN 1')
# Setup connection to AMC # Setup connection to AMC
amc = AMC.Device(IP) amc = AMC.Device(IP)
@ -550,20 +920,18 @@ experiment_settings = 'PL_SP_700_LP_700_HeNe_52muW_exp_2s_Start_'
#The program adds the range of the scan as well as the resolution and the date and time of the measurement #The program adds the range of the scan as well as the resolution and the date and time of the measurement
experiment_name = f"{set_llim_bval}T_to_{set_ulim_bval}T_{set_res_bval}T_{datetime.datetime.now().strftime('%Y_%m_%d_%H%M')}" experiment_name = f"{set_llim_bval}T_to_{set_ulim_bval}T_{set_res_bval}T_{datetime.datetime.now().strftime('%Y_%m_%d_%H%M')}"
# # TODO: write the bval scan here
# for idx, bval in enumerate(bval_lst):
# write_no_echo(powerbox_dualsupply, '')
# this moves the probe in xy-direction and measures spectrum there # this moves the probe in xy-direction and measures spectrum there
# move_scan_xy(range_x, range_y, resolution, experiment_settings, experiment_name) # move_scan_xy(range_x, range_y, resolution, experiment_settings, experiment_name)
# perform the B-field measurement for selected axis above # perform the B-field measurement for selected axis above
# sweep_b_val(powerbox_dualsupply, set_llim_bval, set_ulim_bval, set_res_bval, experiment_settings, experiment_name) # sweep_b_val(powerbox_dualsupply, set_llim_bval, set_ulim_bval, set_res_bval, experiment_settings, experiment_name)
sweep_b_val(powerbox_dualsupply, set_llim_bval, set_ulim_bval, set_res_bval, sweep_b_val(powerbox_dualsupply, set_llim_bval, set_ulim_bval, set_res_bval, 'z-axis',
experiment_settings, experiment_name, singlepowersupply_bool=False, zerowhenfin_bool=True, reversescan_bool=False) experiment_settings, experiment_name, zerowhenfin_bool=True, reversescan_bool=False)
# Internally, axes are numbered 0 to 2 # Internally, axes are numbered 0 to 2
write_no_echo(powerbox_dualsupply, 'LOCAL') # turn off the remote mode write_no_echo(powerbox_dualsupply, 'LOCAL') # turn off the remote mode
write_no_echo(powerbox_singlesupply, 'LOCAL') # turn off the remote mode
# time.sleep(0.5) # time.sleep(0.5)
powerbox_dualsupply.close() powerbox_dualsupply.close()

44
AMC.py Normal file
View File

@ -0,0 +1,44 @@
import ACS
from about import About
from access import Access
from amcids import Amcids
from control import Control
from description import Description
from diagnostic import Diagnostic
from functions import Functions
from move import Move
from network import Network
from res import Res
from rotcomp import Rotcomp
from rtin import Rtin
from rtout import Rtout
from status import Status
from system_service import System_service
from update import Update
class Device(ACS.Device):
def __init__ (self, address):
super().__init__(address)
self.about = About(self)
self.access = Access(self)
self.amcids = Amcids(self)
self.control = Control(self)
self.description = Description(self)
self.diagnostic = Diagnostic(self)
self.functions = Functions(self)
self.move = Move(self)
self.network = Network(self)
self.res = Res(self)
self.rotcomp = Rotcomp(self)
self.rtin = Rtin(self)
self.rtout = Rtout(self)
self.status = Status(self)
self.system_service = System_service(self)
self.update = Update(self)
def discover():
return Device.discover("amc")

33
FunctionsTest.py Normal file
View File

@ -0,0 +1,33 @@
import re
import numpy as np
def sep_num_from_units(powerbox_output :str)->list:
'''
Receives a string as input and separates the numberic value and unit and returns it as a list.
Parameters
----------
powerbox_output : str
string output from the attocube powerbox, e.g. 1.35325kG
Returns
-------
list
list of float value and string (b value and it's units). If string is purely alphabets, then return a single element list
'''
match = re.match(r'\s*([+-]?\d*\.?\d+)([A-Za-z]+)', powerbox_output)
if match:
numeric_part = float(match.group(1)) # Convert the numeric part to a float
alphabetic_part = match.group(2) # Get the alphabetic part
return [numeric_part, alphabetic_part]
else:
return [powerbox_output,]
angles = [1,2,3]
print(str(angles[0]) +"\n"+ str(angles[-1]))
rates_lst = list(sep_num_from_units(el) for el in "0.0kG;1.0kG".split(";"))
print(rates_lst[1][0])

File diff suppressed because it is too large Load Diff

127
README.md Normal file
View File

@ -0,0 +1,127 @@
# Magnetic Field Sweep and Spatial Mapping Automation
**Author:** Serdar (adjusted by Lukas and Ryan)
**Last Updated:** April 2025
**Filename:** `Mag_Field_Sweep_2024_10_21.py`
## Overview
This script automates spectral acquisition in a magneto-optical experiment using:
- **LightField** for spectrometer control (Princeton Instruments)
- **AMC Positioner** for precise spatial scanning
- **Attocube APS100** power supplies for magnetic field control
It enables:
- Magnetic field sweeps along selected axes
- Spatial scans across X-Y positions
- B-field vector rotations with spectral capture
- Live spectrum acquisition and intensity mapping
## Features
- **2D Spatial Scan:** Raster-scan across a surface using AMC positioners, capturing spectra at each coordinate.
- **Magnetic Field Sweep:** Vary B-fields in controlled steps along x/y/z, measure spectra at each step.
- **Field Rotation:** Circular B-field rotation (in-plane) with angle-defined steps.
- **Automated File Handling:** Acquires `.spe` files, extracts and saves intensity/wavelengths, deletes intermediates.
- **Flexible Configuration:** Resolution, range, exposure, filters, filenames and scan directions are all customizable.
## Prerequisites
### Hardware
- AMC100/AMC300 positioner
- Attocube APS100 single/dual-channel magnet power supplies
- Spectrometer compatible with Princeton Instruments LightField
### Software & Libraries
- **Python 3.8+**
- Packages: `pyvisa`, `numpy`, `matplotlib`, `pandas`, `clr`, `spe2py`, `spe_loader`, `AMC` module
- .NET integration via `pythonnet`
- LightField SDK: Princeton Instruments (with DLLs loaded via `clr`)
> Note: Ensure `LIGHTFIELD_ROOT` environment variable is set.
## Setup
1. **Install dependencies**
```bash
pip install pyvisa pandas numpy matplotlib pythonnet
```
2. **Ensure required DLLs** are present in:
```
C:\Program Files\Princeton Instruments\LightField\
```
3. **Set up device IPs**
```python
IP_AMC100 = "192.168.71.100" # or AMC300
```
4. **Edit scan parameters in main block:**
```python
range_x = 20000
range_y = 20000
resolution = 1000 # nanometers
set_llim_bval = -0.3
set_ulim_bval = 0.3
set_res_bval = 0.003 # Tesla
```
## Main Functions
### `move_scan_xy(range_x, range_y, resolution, Settings, baseFileName)`
Performs a 2D XY raster scan of the probe. Acquires spectra and saves results.
### `sweep_b_val(instr, min_bval, max_bval, res, axis, Settings, base_file_name)`
Sweeps magnetic field (in T) along the specified axis, collecting spectra at each field.
### `ramp_b_val(instr, bval, magnet_coil)`
Smooth ramping of B-field to target value.
### `b_field_rotation(instr1, instr2, Babs, startangle, endangle, step, Settings)`
Rotates the in-plane magnetic field by vector combination of Bx and By components.
## File Saving
- `.txt`: Intensity data and wavelength arrays saved to timestamped folders
- Folder names include experiment metadata
- `.spe` files are deleted after processing to conserve space
## Usage Example
To sweep B-field along the **Y-axis**:
```python
sweep_b_val(
instr=powerbox_singlesupply,
min_bval=-0.3,
max_bval=0.3,
res=0.003,
magnet_coil='y-axis',
Settings='experiment_config',
base_file_name='scan_name',
zerowhenfin_bool=True,
reversescan_bool=False,
loopscan_bool=True
)
```
## Notes
- Always close power supply connections with `.close()`
- Make sure `.spe` files are not locked by LightField before running
- The AMC section is currently commented — uncomment if positioner control is needed
- Ensure `experiment.Load(...)` points to the correct `.lfe` config
## Troubleshooting
- **DLL loading issues?** Confirm path via `sys.path.append(...)` and DLL names.
- **Communication errors?** Check serial port resource names via `pyvisa.ResourceManager().list_resources()`
- **No spectra saved?** Ensure LightField is licensed and experiment file is valid.
## License
Internal use only please contact the authors before distribution or reuse.

153
Test.py Normal file
View File

@ -0,0 +1,153 @@
import math
from magnet_modules import sweep_functions as sf
# sf.test_func()
def generate_angle_coord_list(radius, start_angle, end_angle, step_size, clockwise=True):
# TODO: DOCS
"""Creates a list of discrete cartesian coordinates (x,y), given the radius, start- and end angles, the angle step size, and the direction of rotation.
Function then returns a list of two lists: list of angles and list of cartesian coordinates (x,y coordinates in a tuple).
Args:
radius (_type_): _description_
start_angle (_type_): _description_
end_angle (_type_): _description_
step_size (_type_): _description_
clockwise (bool, optional): _description_. Defaults to True.
Returns:
_type_: _description_
""" """"""
# Initialize lists to hold angles and (x, y) pairs
angles = []
coordinates = []
# Normalize angles to the range [0, 360)
start_angle = start_angle % 360
end_angle = end_angle % 360
if not clockwise:
# Clockwise rotation
current_angle = start_angle
while True:
# Append the current angle to the angles list
angles.append(current_angle % 360)
# Convert the current angle to radians
current_angle_rad = math.radians(current_angle % 360)
# Convert polar to Cartesian coordinates
x = radius * math.cos(current_angle_rad)
y = radius * math.sin(current_angle_rad)
# Append the (x, y) pair to the list
coordinates.append((x, y))
# Check if we've reached the end_angle (handling wrap-around) (current_angle - step_size) % 360 == end_angle or
if current_angle % 360 == end_angle:
break
# Decrement the current angle by the step size
current_angle -= step_size
if current_angle < 0:
current_angle += 360
else:
# Counterclockwise rotation
current_angle = start_angle
while True:
# Append the current angle to the angles list
angles.append(current_angle % 360)
# Convert the current angle to radians
current_angle_rad = math.radians(current_angle % 360)
# Convert polar to Cartesian coordinates
x = radius * math.cos(current_angle_rad)
y = radius * math.sin(current_angle_rad)
# Append the (x, y) pair to the list
coordinates.append((x, y))
# Check if we've reached the end_angle (handling wrap-around) (current_angle + step_size) % 360 == end_angle or
if current_angle % 360 == end_angle:
break
# Increment the current angle by the step size
current_angle += step_size
if current_angle >= 360:
current_angle -= 360
return [angles, coordinates]
def generate_coord_list_fixed_angle(angle, b_val, b_val_step_size, reverse=False):
"""
Generates a list of (x, y) Cartesian coordinates along a line defined by a fixed angle,
scanning from -b_val to b_val or from b_val to -b_val depending on the reverse flag.
Args:
angle (float): The fixed angle (in degrees) from the positive x-axis.
b_val (float): The maximum distance from the origin (both positive and negative).
b_val_step_size (float): The increment in distance for each point.
reverse (bool): If True, scan from b_val to -b_val. If False, scan from -b_val to b_val.
Returns:
list: A list of tuples representing Cartesian coordinates (x, y).
"""
coordinates = []
# Convert angle from degrees to radians
angle_rad = math.radians(angle)
# Determine the scan direction based on the reverse flag
if reverse:
# Scan from b_val to -b_val
current_b = b_val
while current_b >= -b_val:
x = current_b * math.cos(angle_rad)
y = current_b * math.sin(angle_rad)
coordinates.append((x, y))
current_b -= b_val_step_size
else:
# Scan from -b_val to b_val
current_b = -b_val
while current_b <= b_val:
x = current_b * math.cos(angle_rad)
y = current_b * math.sin(angle_rad)
coordinates.append((x, y))
current_b += b_val_step_size
return coordinates
if __name__=="__main__":
# Example usage
radius = 5
start_angle = 0
end_angle = 180
step_size = 10
angles, coordinates = generate_angle_coord_list(radius, start_angle, end_angle, step_size, clockwise=True)
print('\n', "Angles:", angles, '\n')
print("Coordinates:", coordinates, '\n',)
# device_target_values = [{'2301034': bval[0], '2101014': bval[1]} for bval in coordinates]
xcoord_tuple, ycoord_tuple = zip(*coordinates)
device_target_values = {'2301034': list(xcoord_tuple), '2101014': list(ycoord_tuple)}
print(f"{device_target_values['2301034']=}")
print(f"{device_target_values['2101014']=}")
for iteration, (device_id,bval_lst) in enumerate(device_target_values.items()):
print(iteration, device_id, bval_lst)
# print(generate_coord_list_fixed_angle(10, 5, 1, reverse=False))
testdict = [{'2301034': bval[0], '2101014': bval[1]} for bval in coordinates]
print(f"{testdict=}")
for i, target in enumerate(testdict):
print(i, target.keys())
# for key in target.keys():
# print(type(key))

46
Test2.py Normal file
View File

@ -0,0 +1,46 @@
import os
import time
import datetime
import numpy as np
# List to accumulate measurement data
measurement_data = []
def append_measurement(target_b_abs, b_x, b_y, measurement_data):
"""Append a single measurement to the global list."""
measurement = {
"Target B_abs (T)": target_b_abs,
"Target Angle (deg)": 90, # insert target angle here
"Datetime": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"B_x (T)": b_x,
"B_y (T)": b_y,
"Actual B_abs (T)": (b_x**2 + b_y**2)**0.5,
"Actual Angle (deg)": np.degrees(np.arctan2(b_y, b_x)) % 360,
}
measurement_data.append(measurement)
def save_measurements_to_file(relative_directory, measurement_data, make_dir=False):
"""Save accumulated measurements to a file in the specified directory."""
script_dir = os.path.dirname(os.path.abspath(__file__))
directory = os.path.join(script_dir, relative_directory)
if make_dir:
os.makedirs(directory, exist_ok=True)
filename = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M") + ".txt"
file_path = os.path.join(directory, filename)
# Write header and data
with open(file_path, 'w') as f:
f.write("Target B_abs (T);Target Angle (deg);Datetime; B_x (T);B_y (T);Actual B_abs (T);Actual Angle (deg)\n")
for entry in measurement_data:
line = f"{entry['Target B_abs (T)']};{entry['Target Angle (deg)']};{entry['Datetime']};{entry['B_x (T)']};{entry['B_y (T)']};{entry['Actual B_abs (T)']};{entry['Actual Angle (deg)']}\n"
f.write(line)
# Example usage
for i in range(5):
append_measurement(target_b_abs=0.5 + i, b_x=1.0 * i, b_y=2.0 * i, measurement_data=measurement_data)
time.sleep(1) # Simulate time delay between measurements
save_measurements_to_file("Test_Map_" + f"{datetime.datetime.now().strftime('%Y_%m_%d_%H.%M')}", measurement_data, make_dir=False)
# print(9**0.5)
# print(datetime.datetime.now())

62
Test3.py Normal file
View File

@ -0,0 +1,62 @@
import os
import threading
from datetime import datetime
import time
import random
# Shared list and lock
measurement_data = []
data_lock = threading.Lock()
def append_measurement(target_b_abs, b_x, b_y):
measurement = {
"Target B_abs": target_b_abs,
"Datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"B_x": b_x,
"B_y": b_y
}
with data_lock:
measurement_data.append(measurement)
def save_measurements_to_file(relative_directory):
script_dir = os.path.dirname(os.path.abspath(__file__))
directory = os.path.join(script_dir, relative_directory)
os.makedirs(directory, exist_ok=True)
filename = datetime.now().strftime("%Y-%m-%d_%H-%M") + ".txt"
file_path = os.path.join(directory, filename)
header_keys = ["Target B_abs", "Datetime", "B_x", "B_y"]
with data_lock:
with open(file_path, 'w') as f:
f.write(", ".join(header_keys) + "\n")
for entry in measurement_data:
line = ", ".join(str(entry[key]) for key in header_keys) + "\n"
f.write(line)
# Thread function
def simulate_sensor_readings(sensor_id):
for i in range(3):
# Simulate some "sensor" data
target_b_abs = round(random.uniform(0.1, 1.0), 3)
b_x = round(random.uniform(-1.0, 1.0), 3)
b_y = round(random.uniform(-1.0, 1.0), 3)
print(f"Sensor {sensor_id} appending: {target_b_abs}, {b_x}, {b_y}")
append_measurement(target_b_abs, b_x, b_y)
time.sleep(random.uniform(1,2)) # Simulate delay
# Launch threads
thread1 = threading.Thread(target=simulate_sensor_readings, args=(1,))
thread2 = threading.Thread(target=simulate_sensor_readings, args=(2,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
# Save all data at the end
save_measurements_to_file("TestDirectory")
print

84
ThreadTest.py Normal file
View File

@ -0,0 +1,84 @@
import threading
import time
import random
# Shared values
device_values = [0, 0]
value_lock = threading.Lock()
# Per-device pause controls
device_events = [threading.Event(), threading.Event()]
device_events[0].set() # Start as running
device_events[1].set()
# Tolerance threshold
TOLERANCE = 20
# Stop flag
stop_event = threading.Event()
def is_within_tolerance(val_a, val_b):
return abs(val_a - val_b) <= TOLERANCE
# Device thread
def device_thread(device_id):
other_id = 1 - device_id
while not stop_event.is_set():
device_events[device_id].wait() # Pause if needed
# Simulate value from device
new_value = random.randint(0, 100)
with value_lock:
device_values[device_id] = new_value
my_val = device_values[device_id]
other_val = device_values[other_id]
print(f"Device {device_id} => {my_val} | Device {other_id} => {other_val}")
if not is_within_tolerance(my_val, other_val):
# print(f"Device {device_id} is out of tolerance! Pausing...")
# device_events[device_id].clear()
print("Not within tolerance!")
time.sleep(0.1) # Faster check interval
# Watcher thread
def tolerance_watcher():
while not stop_event.is_set():
with value_lock:
val0, val1 = device_values
if is_within_tolerance(val0, val1):
for i, event in enumerate(device_events):
if not event.is_set():
print(f"Resuming Device {i}")
event.set()
time.sleep(0.05) # Fast response
# Start threads
threads = [
threading.Thread(target=device_thread, args=(0,)),
threading.Thread(target=device_thread, args=(1,)),
threading.Thread(target=tolerance_watcher)
]
for t in threads:
t.start()
# Run loop (press Ctrl+C to stop)
try:
while True:
time.sleep(0.1)
except KeyboardInterrupt:
print("Stopping...")
stop_event.set()
for event in device_events:
event.set()
for t in threads:
t.join()
print("All threads stopped.")

519
b_rotation_test.py Normal file
View File

@ -0,0 +1,519 @@
############################################
# Packages from Ryan
import re
import math
import threading
import pyvisa
# from pyvisa import ResourceManager, constants
# B Field Limits (in T)
BX_MAX = 1.7
BY_MAX = 1.7
BZ_MAX = 4.0
############################################
# import AMC
import csv
import time
import clr
import sys
import os
import spe2py as spe
import spe_loader as sl
import pandas as pd
import time
# from System.IO import *
# from System import String
import numpy as np
import matplotlib.pyplot as plt
import datetime
from typing import Union
def sep_num_from_units(powerbox_output :str)->list:
'''
Receives a string as input and separates the numberic value and unit and returns it as a list.
Parameters
----------
powerbox_output : str
string output from the attocube powerbox, e.g. 1.35325kG
Returns
-------
list
list of float value and string (b value and it's units). If string is purely alphabets, then return a single element list
'''
match = re.match(r'\s*([+-]?\d*\.?\d+)([A-Za-z]+)', powerbox_output)
if match:
numeric_part = float(match.group(1)) # Convert the numeric part to a float
alphabetic_part = match.group(2) # Get the alphabetic part
return [numeric_part, alphabetic_part]
else:
return [powerbox_output,]
def query_no_echo(instr:pyvisa.resources.Resource, command:str, sleeptime=0)->str:
"""helper function for the Attocube APS100 that queries a function to the device, removing the echo.
Args:
instr (pyvisa.resources.Resource):
command (str): commands, can be stringed in series with ; between commands
sleeptime (float, optional): delay time between commands. Defaults to 0.01.
Returns:
str: _description_
""" ''''''
try:
print(f"Sending command: {command}")
instr.write(command)
time.sleep(sleeptime)
echo_response = instr.read() # Read and discard the echo
# print(f"Echo response: {echo_response}")
actual_response = instr.read() # Read the actual response
print(f"Actual response: {actual_response}")
return actual_response
except pyvisa.VisaIOError as e:
print(f"Error communicating with instrument: {e}")
return None
def write_no_echo(instr:pyvisa.resources.Resource, command:str, sleeptime=0)->str:
"""helper function for the Attocube APS100 that writes a function to the device, removing the echo.
Args:
instr (pyvisa.resources.Resource):
command (str): commands, can be stringed in series with ; between commands
sleeptime (float, optional): delay time between commands. Defaults to 0.01.
Returns:
str: _description_
""" ''''''
try:
print(f"Sending command: {command}")
instr.write(command)
time.sleep(sleeptime) # Give the device some time to process
try:
while True:
echo_response = instr.read() # Read and discard the echo
# print(f"Echo response: {echo_response}")
except pyvisa.VisaIOError as e:
# Expected timeout after all echoed responses are read
if e.error_code != pyvisa.constants.VI_ERROR_TMO:
raise
except pyvisa.VisaIOError as e:
print(f"Error communicating with instrument: {e}")
def generate_angle_coord_list(radius, start_angle, end_angle, step_size, clockwise=True):
# TODO: DOCS
"""Creates a list of discrete cartesian coordinates (x,y), given the radius, start- and end angles, the angle step size, and the direction of rotation.
Function then returns a list of two lists: list of angles and list of cartesian coordinates (x,y coordinates in a tuple).
Args:
radius (_type_): _description_
start_angle (_type_): _description_
end_angle (_type_): _description_
step_size (_type_): _description_
clockwise (bool, optional): _description_. Defaults to True.
Returns:
_type_: _description_
""" """"""
# Initialize lists to hold angles and (x, y) pairs
angles = []
coordinates = []
# Normalize angles to the range [0, 360)
start_angle = start_angle % 360
end_angle = end_angle % 360
if not clockwise:
# Clockwise rotation
current_angle = start_angle
while True:
# Append the current angle to the angles list
angles.append(current_angle % 360)
# Convert the current angle to radians
current_angle_rad = math.radians(current_angle % 360)
# Convert polar to Cartesian coordinates
x = radius * math.cos(current_angle_rad)
y = radius * math.sin(current_angle_rad)
# Append the (x, y) pair to the list
coordinates.append((x, y))
# Check if we've reached the end_angle (handling wrap-around) (current_angle - step_size) % 360 == end_angle or
if current_angle % 360 == end_angle:
break
# Decrement the current angle by the step size
current_angle -= step_size
if current_angle < 0:
current_angle += 360
else:
# Counterclockwise rotation
current_angle = start_angle
while True:
# Append the current angle to the angles list
angles.append(current_angle % 360)
# Convert the current angle to radians
current_angle_rad = math.radians(current_angle % 360)
# Convert polar to Cartesian coordinates
x = radius * math.cos(current_angle_rad)
y = radius * math.sin(current_angle_rad)
# Append the (x, y) pair to the list
coordinates.append((x, y))
# Check if we've reached the end_angle (handling wrap-around) (current_angle + step_size) % 360 == end_angle or
if current_angle % 360 == end_angle:
break
# Increment the current angle by the step size
current_angle += step_size
if current_angle >= 360:
current_angle -= 360
return [angles, coordinates]
def b_field_rotation(instr1:pyvisa.resources.Resource, instr2:pyvisa.resources.Resource,
Babs:float, startangle:float, endangle:float, angle_stepsize:float,
Settings:str, clockwise=True, base_file_name='', zerowhenfin_bool=False)->None:
# TODO: update docs
"""Rotation of the b-field in discrete steps, spectrum is measured at each discrete step in the rotation. Scan angle is
defined as the angle between the x-axis and the current B-field vector, i.e., in the anticlockwise direction.
Args:
instr1 (pyvisa.resources.Resource): _description_
instr2 (pyvisa.resources.Resource): _description_
Babs (float): absolute B-field value in T
startangle (float): start angle in degrees
endangle (float): end angle in degrees
angle_stepsize (float): angle step size in degrees
clockwise (bool): determines the direction of rotation of the B-field. Defaults to True.
zerowhenfin_bool (bool, optional): after finishing the rotation, both B-field components should be set to 0 T. Defaults to False.
"""
# TODO: add logging to the script
# defines the folder, in which the data from the spectrometer is temporarily stored in
temp_folder_path = "C:/Users/localadmin/Desktop/Users/Lukas/B_Field_Dump"
# temp_folder_path = "C:/Users/localadmin/Desktop/Users/Lukas/2024_02_08_Map_test"
if base_file_name =='':
base_file_name = datetime.datetime.now().strftime('%Y_%m_%d_%H.%M')
start_time = time.time() # start of the scan function
startangle = startangle % 360
endangle = endangle % 360 # ensures that the angles are within [0,360)
idnstr1 = query_no_echo(instr1, '*IDN?')
idnstr2 = query_no_echo(instr2, '*IDN?')
intensity_data = [] # To store data from each scan
cwd = os.getcwd() # save original directory
# find which one is the dual power supply, then, ramp B_x to Babs value
if '2301034' in idnstr1: # serial no. the dual power supply
pass
elif '2101034' in idnstr2:
# swap instruments, instr 1 to be the dual power supply (^= x-axis)
instr1, instr2 = instr2, instr1
# save initial low and high sweep limits of each device, and set them back after the rotation
instr1_bsettings = list(sep_num_from_units(el) for el in query_no_echo(instr1, 'UNITS?;LLIM?;ULIM?').split(';')) # deliver a 3 element tuple of tuples containing the set unit, llim and ulim
instr2_bsettings = list(sep_num_from_units(el) for el in query_no_echo(instr2, 'UNITS?;LLIM?;ULIM?').split(';')) # deliver a 3 element tuple of tuples containing the set unit, llim and ulim
if instr1_bsettings[0][0] == 'T':
instr1_bsettings[1][0] = instr1_bsettings[1][0]*0.1 # rescale kG to T, device accepts values only in kG or A, eventho we set it to T
instr1_bsettings[2][0] = instr1_bsettings[2][0]*0.1
if instr2_bsettings[0][0] == 'T':
instr2_bsettings[1][0] = instr2_bsettings[1][0]*0.1 # rescale kG to T, device accepts values only in kG or A, eventho we set it to T
instr2_bsettings[2][0] = instr2_bsettings[2][0]*0.1
# initialise the sweep angle list as well as the sweep limits and directions for each instrument
instr1_lim, instr2_lim = 'LLIM', 'ULIM'
instr1_sweep, instr2_sweep = 'DOWN', 'UP'
# create lists of angles and discrete Cartesian coordinates
angles, cartesian_coords = generate_angle_coord_list(Babs, startangle, endangle, angle_stepsize, clockwise=clockwise)
if clockwise: # NOTE: old conditional was: startangle > endangle see if this works....
# reverse sweep limits and directions for the clockwise rotation
instr1_lim, instr2_lim = instr2_lim, instr1_lim
instr1_sweep, instr2_sweep = instr2_sweep, instr1_sweep
# TODO: i dont think we need to change the rates just yet, think about this later
'''
# list of rates (with units) for diff ranges of each device, only up to Range 1 for single power supply as that is already
# the max recommended current.
init_range_lst1 = list(sep_num_from_units(el) for el in query_no_echo(instr1, 'RATE? 0;RATE? 1;RATE? 2').split(';'))
init_range_lst2 = list(sep_num_from_units(el) for el in query_no_echo(instr2, 'RATE? 0;RATE? 1').split(';'))
min_range_lst = [min(el1[0], el2[0]) for el1,el2 in zip(init_range_lst1, init_range_lst2)] # min rates for each given range
# set both devices to the min rates
write_no_echo(instr1, f'RATE 0 {min_range_lst[0]};RATE 1 {min_range_lst[1]}')
write_no_echo(instr2, f'RATE 0 {min_range_lst[0]};RATE 1 {min_range_lst[1]}')
'''
# TODO: see if this is the desired process: to always start from the x-axis ASK LUKAS
if Babs <= BX_MAX:
# write_no_echo(instr1, f'CHAN 2;ULIM {Babs*10};SWEEP UP') # sets to B_x, the B_x upper limit and sweeps the magnet field to the upper limit
print(f'SWITCHED TO BX, SWEEPING B-X TO {Babs} T NOW')
else:
raise ValueError(f'{Babs=}T value exceeds the max limit of the Bx field {BX_MAX}T!')
# wait for Babs to be reached by the Bx field
actual_bval = sep_num_from_units(query_no_echo(instr1, 'IMAG?'))[0]*0.1 # convert kG to T
print(f'Actual magnet strength (Bx): {actual_bval} T,', f'Target magnet strength: {Babs} T')
while abs(actual_bval - Babs) > 0.0001:
time.sleep(5) # little break
actual_bval = sep_num_from_units(query_no_echo(instr1, 'IMAG?'))[0]*0.1
print(f'Actual magnet strength (Bx): {actual_bval} T,', f'Target magnet strength: {Babs} T')
# TODO: copy and mod code to see if block logic works, test in lab
# NOTE: implement PID control, possibly best option to manage the b field DO THIS LATER ON, WE DO DISCRETE B VALUES RN
# Helper function that listens to a device
def listen_to_device(device_id, target_value, shared_values, lock, all_targets_met_event):
while not all_targets_met_event.is_set(): # Loop until the event is set
# value = 0 # Simulate receiving a float from the device INSERT QUERY NO ECHO HERE TO ASK FOR DEVICE IMAG
if '2301034' in device_id:
value = sep_num_from_units(query_no_echo(instr1, 'IMAG?'))[0]*0.1 # convert kG to T
if value <= target_value[device_id]:
# write_no_echo(instr1, f"CHAN 2;ULIM {target_value[device_id]*10};SWEEP UP")
print(f'sweeping Bx up to {target_value[device_id]*10}')
else:
# write_no_echo(instr1, "CHAN 2;LLIM {target_value[device_id]*10};SWEEP DOWN")
print(f'sweeping Bx down to {target_value[device_id]*10}')
elif '2101014' in device_id:
value = sep_num_from_units(query_no_echo(instr2, 'IMAG?'))[0]*0.1 # convert kG to T
if value <= target_value[device_id]:
# write_no_echo(instr2, f"ULIM {target_value[device_id]*10};SWEEP UP")
print(f'sweeping By up to {target_value[device_id]*10}')
else:
# write_no_echo(instr2, "LLIM {target_value[device_id]*10};SWEEP DOWN")
print(f'sweeping By down to {target_value[device_id]*10}')
else:
continue # Skip if device ID is not recognized
print(f"Device {device_id} reports value: {value} T")
with lock:
shared_values[device_id] = value
# Check if both devices have met their targets
if all(shared_values.get(device) is not None and abs(value - target_value[device]) <= 0.0001
for device,value in shared_values.items()):
print(f"Both devices reached their target values: {shared_values}")
all_targets_met_event.set() # Signal that both targets are met
# time.sleep(1) # Simulate periodic data checking
# Main function to manage threads and iterate over target values
def monitor_devices(device_target_values, angles_lst, intensity_data=intensity_data):
for iteration, target in enumerate(device_target_values):
print(f"\nStarting iteration {iteration+1} for target values: {target}")
# Shared dictionary to store values from devices
shared_values = {device: None for device in target.keys()}
# Event to signal when both target values are reached
all_targets_met_event = threading.Event()
# Lock to synchronize access to shared_values
lock = threading.Lock()
# Create and start threads for each device
threads = []
for device_id in target.keys():
thread = threading.Thread(target=listen_to_device, args=(device_id, target, shared_values, lock, all_targets_met_event))
threads.append(thread)
thread.start()
print(f"======================\nThread started for device {device_id}\n======================")
# Wait until both devices meet their target values
all_targets_met_event.wait()
print(f"Both target values for iteration {iteration+1} met. Performing action...")
# Clean up threads
for thread in threads:
thread.join()
print(f"Threads for iteration {iteration+1} closed.\n")
print(f'COLLECTING SPECTRUM FOR ANGLE {angles_lst[iteration]}°\n')
# Perform some action after both targets are met
# we acquire with the LF
# acquire_name_spe = f'{base_file_name}_{angles_lst[iteration]}°' # NOTE: save each intensity file with the given angle
# AcquireAndLock(acquire_name_spe) #this creates a .spe file with the scan name.
# read the .spe file and get the data as loaded_files
# cwd = os.getcwd() # save original directory
# os.chdir(temp_folder_path) #change directory
# loaded_files = sl.load_from_files([acquire_name_spe + '.spe']) # get the .spe file as a variable
# os.chdir(cwd) # go back to original directory
# Delete the created .spe file from acquiring after getting necessary info
# spe_file_path = os.path.join(temp_folder_path, acquire_name_spe + '.spe')
# os.remove(spe_file_path)
points_left = len(angles) - iteration - 1
print('Points left in the scan: ', points_left)
#append the intensity data as it is (so after every #of_wl_points, the spectrum of the next point begins)
# intensity_data.append(loaded_files.data[0][0][0])
#prints total time the mapping lasted
end_time = time.time()
elapsed_time = (end_time - start_time) / 60
print('Scan time: ', elapsed_time, 'minutes')
# reset both devices to original sweep limits
write_no_echo(instr1, f'LLIM {instr1_bsettings[1][0]*10};ULIM {instr1_bsettings[2][0]*10}') # reset the initial limits of the device after the scan
write_no_echo(instr2, f'LLIM {instr2_bsettings[1][0]*10};ULIM {instr2_bsettings[2][0]*10}') # reset the initial limits of the device after the scan
# TODO: uncomment later if resetting original rates implemented
'''
# reset both devices' initial rates for each range
write_no_echo(instr1, f'RANGE 0 {init_range_lst1[0][0]};RANGE 1 {init_range_lst1[1][0]};RANGE 2 {init_range_lst1[2][0]}') # reset the initial limits of the device after the scan
write_no_echo(instr2, f'RANGE 0 {init_range_lst2[0][0]};RANGE 1 {init_range_lst2[1][0]}') # reset the initial limits of the device after the scan
'''
if zerowhenfin_bool:
# write_no_echo(instr1, 'SWEEP ZERO') # if switched on, discharges the magnet after performing the measurement loop above
# write_no_echo(instr2, 'SWEEP ZERO')
print('======================\nSWEEPING BOTH DEVICES TO ZERO NOW\n======================')
#save intensity & WL data as .txt
os.chdir('C:/Users/localadmin/Desktop/Users/Ryan')
# creates new folder for MAP data
# new_folder_name = "Test_Map_" + f"{datetime.datetime.now().strftime('%Y_%m_%d_%H.%M')}"
# os.mkdir(new_folder_name)
# Here the things will be saved in a new folder under user Lukas !
# IMPORTANT last / has to be there, otherwise data cannot be saved and will be lost!!!!!!!!!!!!!!!!
# os.chdir('C:/Users/localadmin/Desktop/Users/Ryan/'+ new_folder_name)
# intensity_data = np.array(intensity_data)
# np.savetxt(Settings + f'{angles[0]}°_to_{angles[-1]}°' + experiment_name +'.txt', intensity_data)
# TODO: remove/edit experiment_name in line above, as well in sweep_b_val func, rn takes a global variable below
# wl = np.array(loaded_files.wavelength)
# np.savetxt("Wavelength.txt", wl)
# NOTE: data struct of device_target_values is a list of dictionaries, where each dictionary contains the target values for each device
device_target_values = [{'2301034': bval[0], '2101014': bval[1]} for bval in cartesian_coords]
# call the helper function to carry out the rotation/measurement of spectrum
monitor_devices(device_target_values, angles, intensity_data)
################################################################# END OF FUNCTION DEFS ###########################################################################################
# NOTE: RYAN INTRODUCED SOME FUNCTIONS HERE TO PERFORM THE SCAN
# Initialise PYVISA ResourceManager
rm = pyvisa.ResourceManager()
# print(rm.list_resources())
# 'ASRL8::INSTR' for dual power supply, 'ASRL9::INSTR' for single power supply (online PC)
# 'ASRL10::INSTR' for dual power supply, 'ASRL12::INSTR' for single power supply (offline PC)
try:
# Open the connection with the APS100 dual power supply
powerbox_dualsupply = rm.open_resource('ASRL10::INSTR',
baud_rate=9600,
data_bits=8,
parity= pyvisa.constants.Parity.none,
stop_bits= pyvisa.constants.StopBits.one,
timeout=10000)# 5000 ms timeout
write_no_echo(powerbox_dualsupply, 'REMOTE') # turn on the remote mode
# # select axis for the dual supply, either z-axis(CHAN 1 ^= Supply A) or x-axis(CHAN 2 ^= Supply B)
write_no_echo(powerbox_dualsupply, 'CHAN 2')
# # #for dual until here
# Open the connection with the APS100 single power supply
powerbox_singlesupply = rm.open_resource('ASRL12::INSTR',
baud_rate=9600,
data_bits=8,
parity= pyvisa.constants.Parity.none,
stop_bits= pyvisa.constants.StopBits.one,
timeout=10000)# 5000 ms timeout
write_no_echo(powerbox_singlesupply, 'REMOTE') # turn on the remote mode
#for single until here
# TODO: uncomment AMC connection code later, when moving the probe in cryostat is needed.
# Setup connection to AMC
# amc = AMC.Device(IP)
# amc.connect()
# # Internally, axes are numbered 0 to 2
# amc.control.setControlOutput(0, True)
# amc.control.setControlOutput(1, True)
# auto = Automation(True, List[String]())
# experiment = auto.LightFieldApplication.Experiment
# acquireCompleted = AutoResetEvent(False)
# experiment.Load("2025_03_28_Priyanka_CrSBr_DR_Sweep")
# experiment.ExperimentCompleted += experiment_completed # we are hooking a listener.
# experiment.SetValue(SpectrometerSettings.GratingSelected, '[750nm,1200][0][0]')
# InitializerFilenameParams()
#set scan range and resolution in nanometers
range_x = 20000
range_y = 20000
resolution = 1000
# set B-field scan range and resolution (all in T)
set_llim_bval = -0.3
set_ulim_bval = 0.3
set_res_bval = 0.003
#Here you can specify the filename of the map e.g. put experiment type, exposure time, used filters, etc....
# 'PL_SP_700_LP_700_HeNe_52muW_exp_2s_Start_'
# experiment_settings = 'PL_X_1859.2_Y_3918.3_HeNe_10.4muW_H_a-axis_LP_SP_650_exp_180s_600g_cwl_930_det_b-axis_Pol_90_l2_45'
experiment_settings = 'DR_white_6th spot_Power_G600_exp_25s_l1_40_l2_262_det_b_mag_b'
#The program adds the range of the scan as well as the resolution and the date and time of the measurement
# f"{set_llim_bval}T_to_{set_ulim_bval}T_{set_res_bval}T_{datetime.datetime.now().strftime('%Y_%m_%d_%H%M')}"
experiment_name = f"{set_llim_bval}T_to_{set_ulim_bval}T_stepsize_{set_res_bval}T"
# this moves the probe in xy-direction and measures spectrum there
# move_scan_xy(range_x, range_y, resolution, experiment_settings, experiment_name)
# ramp_b_val(powerbox_singlesupply, 0, 'y-axis')
# ramp_b_val(powerbox_dualsupply, 0, 'z-axis')
# for single/ dual replace and vice versa all the way down
# sweep_b_val(powerbox_singlesupply, set_llim_bval, set_ulim_bval, set_res_bval, 'y-axis',
# experiment_settings, experiment_name, zerowhenfin_bool=True, reversescan_bool=False, loopscan_bool=True)
b_field_rotation(powerbox_dualsupply, powerbox_singlesupply, Babs=0.1, startangle=0, endangle=3,
angle_stepsize=1, Settings=experiment_settings, zerowhenfin_bool=True
)
write_no_echo(powerbox_dualsupply, 'LOCAL') # turn off the remote mode
write_no_echo(powerbox_singlesupply, 'LOCAL') # turn off the remote mode
time.sleep(0.5)
# powerbox_dualsupply.close()
powerbox_singlesupply.close()
except Exception as e:
print(e)
# Internally, axes are numbered 0 to 2
write_no_echo(powerbox_dualsupply, 'LOCAL') # turn off the remote mode
write_no_echo(powerbox_singlesupply, 'LOCAL') # turn off the remote mode
time.sleep(0.5)
powerbox_dualsupply.close()
powerbox_singlesupply.close()

531
b_rotation_test_v2.py Normal file
View File

@ -0,0 +1,531 @@
'''
16.04.2025: Initial Test Results for B-field rotation
The logic chain in the function works well, now we need the other function that
scans along an arbitrary axis in plane.
'''
############################################
# Packages from Ryan
import re
import math
import threading
import pyvisa
# from pyvisa import ResourceManager, constants
# B Field Limits (in T)
BX_MAX = 1.7
BY_MAX = 1.7
BZ_MAX = 4.0
############################################
# import AMC
import csv
import time
import clr
import sys
import os
import spe2py as spe
import spe_loader as sl
import pandas as pd
import time
# from System.IO import *
# from System import String
import numpy as np
import matplotlib.pyplot as plt
import datetime
from typing import Union
def sep_num_from_units(powerbox_output :str)->list:
'''
Receives a string as input and separates the numberic value and unit and returns it as a list.
Parameters
----------
powerbox_output : str
string output from the attocube powerbox, e.g. 1.35325kG
Returns
-------
list
list of float value and string (b value and it's units). If string is purely alphabets, then return a single element list
'''
match = re.match(r'\s*([+-]?\d*\.?\d+)([A-Za-z]+)', powerbox_output)
if match:
numeric_part = float(match.group(1)) # Convert the numeric part to a float
alphabetic_part = match.group(2) # Get the alphabetic part
return [numeric_part, alphabetic_part]
else:
return [powerbox_output,]
def query_no_echo(instr:pyvisa.resources.Resource, command:str, sleeptime=0)->str:
"""helper function for the Attocube APS100 that queries a function to the device, removing the echo.
Args:
instr (pyvisa.resources.Resource):
command (str): commands, can be stringed in series with ; between commands
sleeptime (float, optional): delay time between commands. Defaults to 0.01.
Returns:
str: _description_
""" ''''''
try:
print(f"Sending command: {command}")
instr.write(command)
time.sleep(sleeptime)
echo_response = instr.read() # Read and discard the echo
# print(f"Echo response: {echo_response}")
actual_response = instr.read() # Read the actual response
print(f"Actual response: {actual_response}")
return actual_response
except pyvisa.VisaIOError as e:
print(f"Error communicating with instrument: {e}")
return None
def write_no_echo(instr:pyvisa.resources.Resource, command:str, sleeptime=0)->str:
"""helper function for the Attocube APS100 that writes a function to the device, removing the echo.
Args:
instr (pyvisa.resources.Resource):
command (str): commands, can be stringed in series with ; between commands
sleeptime (float, optional): delay time between commands. Defaults to 0.01.
Returns:
str: _description_
""" ''''''
try:
print(f"Sending command: {command}")
instr.write(command)
time.sleep(sleeptime) # Give the device some time to process
try:
while True:
echo_response = instr.read() # Read and discard the echo
# print(f"Echo response: {echo_response}")
except pyvisa.VisaIOError as e:
# Expected timeout after all echoed responses are read
if e.error_code != pyvisa.constants.VI_ERROR_TMO:
raise
except pyvisa.VisaIOError as e:
print(f"Error communicating with instrument: {e}")
def generate_angle_coord_list(radius, start_angle, end_angle, step_size, clockwise=True):
# TODO: DOCS
"""Creates a list of discrete cartesian coordinates (x,y), given the radius, start- and end angles, the angle step size, and the direction of rotation.
Function then returns a list of two lists: list of angles and list of cartesian coordinates (x,y coordinates in a tuple).
Args:
radius (_type_): _description_
start_angle (_type_): _description_
end_angle (_type_): _description_
step_size (_type_): _description_
clockwise (bool, optional): _description_. Defaults to True.
Returns:
_type_: _description_
""" """"""
# Initialize lists to hold angles and (x, y) pairs
angles = []
coordinates = []
# Normalize angles to the range [0, 360)
start_angle = start_angle % 360
end_angle = end_angle % 360
if not clockwise:
# Clockwise rotation
current_angle = start_angle
while True:
# Append the current angle to the angles list
angles.append(current_angle % 360)
# Convert the current angle to radians
current_angle_rad = math.radians(current_angle % 360)
# Convert polar to Cartesian coordinates
x = radius * math.cos(current_angle_rad)
y = radius * math.sin(current_angle_rad)
# Append the (x, y) pair to the list
coordinates.append((x, y))
# Check if we've reached the end_angle (handling wrap-around) (current_angle - step_size) % 360 == end_angle or
if current_angle % 360 == end_angle:
break
# Decrement the current angle by the step size
current_angle -= step_size
if current_angle < 0:
current_angle += 360
else:
# Counterclockwise rotation
current_angle = start_angle
while True:
# Append the current angle to the angles list
angles.append(current_angle % 360)
# Convert the current angle to radians
current_angle_rad = math.radians(current_angle % 360)
# Convert polar to Cartesian coordinates
x = radius * math.cos(current_angle_rad)
y = radius * math.sin(current_angle_rad)
# Append the (x, y) pair to the list
coordinates.append((x, y))
# Check if we've reached the end_angle (handling wrap-around) (current_angle + step_size) % 360 == end_angle or
if current_angle % 360 == end_angle:
break
# Increment the current angle by the step size
current_angle += step_size
if current_angle >= 360:
current_angle -= 360
return [angles, coordinates]
def b_field_rotation(instr1:pyvisa.resources.Resource, instr2:pyvisa.resources.Resource,
Babs:float, startangle:float, endangle:float, angle_stepsize:float,
Settings:str, clockwise=True, base_file_name='', zerowhenfin_bool=False)->None:
# TODO: update docs
"""Rotation of the b-field in discrete steps, spectrum is measured at each discrete step in the rotation. Scan angle is
defined as the angle between the x-axis and the current B-field vector, i.e., in the anticlockwise direction.
Args:
instr1 (pyvisa.resources.Resource): _description_
instr2 (pyvisa.resources.Resource): _description_
Babs (float): absolute B-field value in T
startangle (float): start angle in degrees
endangle (float): end angle in degrees
angle_stepsize (float): angle step size in degrees
clockwise (bool): determines the direction of rotation of the B-field. Defaults to True.
zerowhenfin_bool (bool, optional): after finishing the rotation, both B-field components should be set to 0 T. Defaults to False.
"""
# TODO: add logging to the script
# defines the folder, in which the data from the spectrometer is temporarily stored in
temp_folder_path = "C:/Users/localadmin/Desktop/Users/Lukas/B_Field_Dump"
# temp_folder_path = "C:/Users/localadmin/Desktop/Users/Lukas/2024_02_08_Map_test"
if base_file_name =='':
base_file_name = datetime.datetime.now().strftime('%Y_%m_%d_%H.%M')
start_time = time.time() # start of the scan function
startangle = startangle % 360
endangle = endangle % 360 # ensures that the angles are within [0,360)
idnstr1 = query_no_echo(instr1, '*IDN?')
idnstr2 = query_no_echo(instr2, '*IDN?')
intensity_data = [] # To store data from each scan
cwd = os.getcwd() # save original directory
# find which one is the dual power supply, then, ramp B_x to Babs value
if '2301034' in idnstr1: # serial no. the dual power supply
pass
elif '2101034' in idnstr2:
# swap instruments, instr 1 to be the dual power supply (^= x-axis)
instr1, instr2 = instr2, instr1
# save initial low and high sweep limits of each device, and set them back after the rotation
instr1_bsettings = list(sep_num_from_units(el) for el in query_no_echo(instr1, 'UNITS?;LLIM?;ULIM?').split(';')) # deliver a 3 element tuple of tuples containing the set unit, llim and ulim
instr2_bsettings = list(sep_num_from_units(el) for el in query_no_echo(instr2, 'UNITS?;LLIM?;ULIM?').split(';')) # deliver a 3 element tuple of tuples containing the set unit, llim and ulim
if instr1_bsettings[0][0] == 'T':
instr1_bsettings[1][0] = instr1_bsettings[1][0]*0.1 # rescale kG to T, device accepts values only in kG or A, eventho we set it to T
instr1_bsettings[2][0] = instr1_bsettings[2][0]*0.1
if instr2_bsettings[0][0] == 'T':
instr2_bsettings[1][0] = instr2_bsettings[1][0]*0.1 # rescale kG to T, device accepts values only in kG or A, eventho we set it to T
instr2_bsettings[2][0] = instr2_bsettings[2][0]*0.1
# initialise the sweep angle list as well as the sweep limits and directions for each instrument
instr1_lim, instr2_lim = 'LLIM', 'ULIM'
instr1_sweep, instr2_sweep = 'DOWN', 'UP'
# create lists of angles and discrete Cartesian coordinates
angles, cartesian_coords = generate_angle_coord_list(Babs, startangle, endangle, angle_stepsize, clockwise=clockwise)
if clockwise: # NOTE: old conditional was: startangle > endangle see if this works....
# reverse sweep limits and directions for the clockwise rotation
instr1_lim, instr2_lim = instr2_lim, instr1_lim
instr1_sweep, instr2_sweep = instr2_sweep, instr1_sweep
# TODO: i dont think we need to change the rates just yet, think about this later
'''
# list of rates (with units) for diff ranges of each device, only up to Range 1 for single power supply as that is already
# the max recommended current.
init_range_lst1 = list(sep_num_from_units(el) for el in query_no_echo(instr1, 'RATE? 0;RATE? 1;RATE? 2').split(';'))
init_range_lst2 = list(sep_num_from_units(el) for el in query_no_echo(instr2, 'RATE? 0;RATE? 1').split(';'))
min_range_lst = [min(el1[0], el2[0]) for el1,el2 in zip(init_range_lst1, init_range_lst2)] # min rates for each given range
# set both devices to the min rates
write_no_echo(instr1, f'RATE 0 {min_range_lst[0]};RATE 1 {min_range_lst[1]}')
write_no_echo(instr2, f'RATE 0 {min_range_lst[0]};RATE 1 {min_range_lst[1]}')
'''
# TODO: see if this is the desired process: to always start from the x-axis ASK LUKAS
if Babs <= BX_MAX:
# write_no_echo(instr1, f'CHAN 2;ULIM {Babs*10};SWEEP UP') # sets to B_x, the B_x upper limit and sweeps the magnet field to the upper limit
print(f'SWITCHED TO BX, SWEEPING B-X TO {Babs} T NOW')
else:
raise ValueError(f'{Babs=}T value exceeds the max limit of the Bx field {BX_MAX}T!')
# wait for Babs to be reached by the Bx field
actual_bval = sep_num_from_units(query_no_echo(instr1, 'IMAG?'))[0]*0.1 # convert kG to T
print(f'Actual magnet strength (Bx): {actual_bval} T,', f'Target magnet strength: {Babs} T')
while abs(actual_bval - Babs) > 0.0001:
time.sleep(5) # little break
actual_bval = sep_num_from_units(query_no_echo(instr1, 'IMAG?'))[0]*0.1
print(f'Actual magnet strength (Bx): {actual_bval} T,', f'Target magnet strength: {Babs} T')
actual_bval = Babs # NOTE: ONLY FOR TESTING; REMOVE THIS LINE IN ACTUAL USE
# TODO: copy and mod code to see if block logic works, test in lab
# NOTE: implement PID control, possibly best option to manage the b field DO THIS LATER ON, WE DO DISCRETE B VALUES RN
# Helper function that listens to a device
def listen_to_device(device_id, target_value, shared_values, lock, all_targets_met_event):
while not all_targets_met_event.is_set(): # Loop until the event is set
# value = 0 # Simulate receiving a float from the device INSERT QUERY NO ECHO HERE TO ASK FOR DEVICE IMAG
if '2301034' in device_id:
value = sep_num_from_units(query_no_echo(instr1, 'IMAG?'))[0]*0.1 # convert kG to T
if value <= target_value[device_id]:
# write_no_echo(instr1, f"CHAN 2;ULIM {target_value[device_id]*10};SWEEP UP")
print(f'sweeping Bx up to {target_value[device_id]}T')
else:
# write_no_echo(instr1, "CHAN 2;LLIM {target_value[device_id]*10};SWEEP DOWN")
print(f'sweeping Bx down to {target_value[device_id]}T')
value = target_value['2301034'] # NOTE: ONLY FOR TESTING; REMOVE IN REAL USE
time.sleep(6)
elif '2101014' in device_id:
value = sep_num_from_units(query_no_echo(instr2, 'IMAG?'))[0]*0.1 # convert kG to T
if value <= target_value[device_id]:
# write_no_echo(instr2, f"ULIM {target_value[device_id]*10};SWEEP UP")
print(f'sweeping By up to {target_value[device_id]}T')
else:
# write_no_echo(instr2, "LLIM {target_value[device_id]*10};SWEEP DOWN")
print(f'sweeping By down to {target_value[device_id]}T')
value = target_value['2101014'] # NOTE: ONLY FOR TESTING; REMOVE IN REAL USE
time.sleep(3)
else:
continue # Skip if device ID is not recognized
print(f"Device {device_id} reports value: {value} T")
# time.sleep(2)
with lock:
shared_values[device_id] = value
# Check if both devices have met their targets
if all(shared_values.get(device) is not None and abs(value - target_value[device]) <= 0.0001
for device,value in shared_values.items()):
print(f"Both devices reached their target values: {shared_values}")
all_targets_met_event.set() # Signal that both targets are met
# time.sleep(1) # Simulate periodic data checking
# Main function to manage threads and iterate over target values
def monitor_devices(device_target_values, angles_lst, intensity_data=intensity_data):
for iteration, target in enumerate(device_target_values):
print(f"\nStarting iteration {iteration+1} for target values: {target}")
# Shared dictionary to store values from devices
shared_values = {device: None for device in target.keys()}
# Event to signal when both target values are reached
all_targets_met_event = threading.Event()
# Lock to synchronize access to shared_values
lock = threading.Lock()
# Create and start threads for each device
threads = []
for device_id in target.keys():
thread = threading.Thread(target=listen_to_device, args=(device_id, target, shared_values, lock, all_targets_met_event))
threads.append(thread)
thread.start()
print(f"======================\nThread started for device {device_id}\n======================")
# Wait until both devices meet their target values
all_targets_met_event.wait()
print(f"Both target values for iteration {iteration+1} met. Performing action...")
# Clean up threads
for thread in threads:
thread.join()
print(f"Threads for iteration {iteration+1} closed.\n")
print(f'COLLECTING SPECTRUM FOR ANGLE {angles_lst[iteration]}°\n')
# Perform some action after both targets are met
# we acquire with the LF
# acquire_name_spe = f'{base_file_name}_{angles_lst[iteration]}°' # NOTE: save each intensity file with the given angle
# AcquireAndLock(acquire_name_spe) #this creates a .spe file with the scan name.
# read the .spe file and get the data as loaded_files
# cwd = os.getcwd() # save original directory
# os.chdir(temp_folder_path) #change directory
# loaded_files = sl.load_from_files([acquire_name_spe + '.spe']) # get the .spe file as a variable
# os.chdir(cwd) # go back to original directory
# Delete the created .spe file from acquiring after getting necessary info
# spe_file_path = os.path.join(temp_folder_path, acquire_name_spe + '.spe')
# os.remove(spe_file_path)
points_left = len(angles) - iteration - 1
print('Points left in the scan: ', points_left)
#append the intensity data as it is (so after every #of_wl_points, the spectrum of the next point begins)
# intensity_data.append(loaded_files.data[0][0][0])
#prints total time the mapping lasted
end_time = time.time()
elapsed_time = (end_time - start_time) / 60
print('Scan time: ', elapsed_time, 'minutes')
# reset both devices to original sweep limits
write_no_echo(instr1, f'LLIM {instr1_bsettings[1][0]*10};ULIM {instr1_bsettings[2][0]*10}') # reset the initial limits of the device after the scan
write_no_echo(instr2, f'LLIM {instr2_bsettings[1][0]*10};ULIM {instr2_bsettings[2][0]*10}') # reset the initial limits of the device after the scan
# TODO: uncomment later if resetting original rates implemented
'''
# reset both devices' initial rates for each range
write_no_echo(instr1, f'RANGE 0 {init_range_lst1[0][0]};RANGE 1 {init_range_lst1[1][0]};RANGE 2 {init_range_lst1[2][0]}') # reset the initial limits of the device after the scan
write_no_echo(instr2, f'RANGE 0 {init_range_lst2[0][0]};RANGE 1 {init_range_lst2[1][0]}') # reset the initial limits of the device after the scan
'''
if zerowhenfin_bool:
# write_no_echo(instr1, 'SWEEP ZERO') # if switched on, discharges the magnet after performing the measurement loop above
# write_no_echo(instr2, 'SWEEP ZERO')
print('======================\nSWEEPING BOTH DEVICES TO ZERO NOW\n======================')
#save intensity & WL data as .txt
os.chdir('C:/Users/localadmin/Desktop/Users/Ryan')
# creates new folder for MAP data
# new_folder_name = "Test_Map_" + f"{datetime.datetime.now().strftime('%Y_%m_%d_%H.%M')}"
# os.mkdir(new_folder_name)
# Here the things will be saved in a new folder under user Lukas !
# IMPORTANT last / has to be there, otherwise data cannot be saved and will be lost!!!!!!!!!!!!!!!!
# os.chdir('C:/Users/localadmin/Desktop/Users/Ryan/'+ new_folder_name)
# intensity_data = np.array(intensity_data)
# np.savetxt(Settings + f'{angles[0]}°_to_{angles[-1]}°' + experiment_name +'.txt', intensity_data)
# TODO: remove/edit experiment_name in line above, as well in sweep_b_val func, rn takes a global variable below
# wl = np.array(loaded_files.wavelength)
# np.savetxt("Wavelength.txt", wl)
# NOTE: data struct of device_target_values is a list of dictionaries, where each dictionary contains the target values for each device
device_target_values = [{'2301034': bval[0], '2101014': bval[1]} for bval in cartesian_coords]
# call the helper function to carry out the rotation/measurement of spectrum
monitor_devices(device_target_values, angles, intensity_data)
################################################################# END OF FUNCTION DEFS ###########################################################################################
# NOTE: RYAN INTRODUCED SOME FUNCTIONS HERE TO PERFORM THE SCAN
# Initialise PYVISA ResourceManager
rm = pyvisa.ResourceManager()
# print(rm.list_resources())
# 'ASRL8::INSTR' for dual power supply, 'ASRL9::INSTR' for single power supply (online PC)
# 'ASRL10::INSTR' for dual power supply, 'ASRL12::INSTR' for single power supply (offline PC)
try:
# Open the connection with the APS100 dual power supply
powerbox_dualsupply = rm.open_resource('ASRL10::INSTR',
baud_rate=9600,
data_bits=8,
parity= pyvisa.constants.Parity.none,
stop_bits= pyvisa.constants.StopBits.one,
timeout=10000)# 5000 ms timeout
write_no_echo(powerbox_dualsupply, 'REMOTE') # turn on the remote mode
# # select axis for the dual supply, either z-axis(CHAN 1 ^= Supply A) or x-axis(CHAN 2 ^= Supply B)
write_no_echo(powerbox_dualsupply, 'CHAN 2')
# # #for dual until here
# Open the connection with the APS100 single power supply
powerbox_singlesupply = rm.open_resource('ASRL12::INSTR',
baud_rate=9600,
data_bits=8,
parity= pyvisa.constants.Parity.none,
stop_bits= pyvisa.constants.StopBits.one,
timeout=10000)# 5000 ms timeout
write_no_echo(powerbox_singlesupply, 'REMOTE') # turn on the remote mode
#for single until here
# TODO: uncomment AMC connection code later, when moving the probe in cryostat is needed.
# Setup connection to AMC
# amc = AMC.Device(IP)
# amc.connect()
# # Internally, axes are numbered 0 to 2
# amc.control.setControlOutput(0, True)
# amc.control.setControlOutput(1, True)
# auto = Automation(True, List[String]())
# experiment = auto.LightFieldApplication.Experiment
# acquireCompleted = AutoResetEvent(False)
# experiment.Load("2025_03_28_Priyanka_CrSBr_DR_Sweep")
# experiment.ExperimentCompleted += experiment_completed # we are hooking a listener.
# experiment.SetValue(SpectrometerSettings.GratingSelected, '[750nm,1200][0][0]')
# InitializerFilenameParams()
#set scan range and resolution in nanometers
range_x = 20000
range_y = 20000
resolution = 1000
# set B-field scan range and resolution (all in T)
set_llim_bval = -0.3
set_ulim_bval = 0.3
set_res_bval = 0.003
#Here you can specify the filename of the map e.g. put experiment type, exposure time, used filters, etc....
# 'PL_SP_700_LP_700_HeNe_52muW_exp_2s_Start_'
# experiment_settings = 'PL_X_1859.2_Y_3918.3_HeNe_10.4muW_H_a-axis_LP_SP_650_exp_180s_600g_cwl_930_det_b-axis_Pol_90_l2_45'
experiment_settings = 'DR_white_6th spot_Power_G600_exp_25s_l1_40_l2_262_det_b_mag_b'
#The program adds the range of the scan as well as the resolution and the date and time of the measurement
# f"{set_llim_bval}T_to_{set_ulim_bval}T_{set_res_bval}T_{datetime.datetime.now().strftime('%Y_%m_%d_%H%M')}"
experiment_name = f"{set_llim_bval}T_to_{set_ulim_bval}T_stepsize_{set_res_bval}T"
# this moves the probe in xy-direction and measures spectrum there
# move_scan_xy(range_x, range_y, resolution, experiment_settings, experiment_name)
# ramp_b_val(powerbox_singlesupply, 0, 'y-axis')
# ramp_b_val(powerbox_dualsupply, 0, 'z-axis')
# for single/ dual replace and vice versa all the way down
# sweep_b_val(powerbox_singlesupply, set_llim_bval, set_ulim_bval, set_res_bval, 'y-axis',
# experiment_settings, experiment_name, zerowhenfin_bool=True, reversescan_bool=False, loopscan_bool=True)
b_field_rotation(powerbox_dualsupply, powerbox_singlesupply, Babs=0.1, startangle=0, endangle=3,
angle_stepsize=1, Settings=experiment_settings, zerowhenfin_bool=True
)
write_no_echo(powerbox_dualsupply, 'LOCAL') # turn off the remote mode
write_no_echo(powerbox_singlesupply, 'LOCAL') # turn off the remote mode
time.sleep(0.5)
# powerbox_dualsupply.close()
powerbox_singlesupply.close()
except Exception as e:
print(e)
# Internally, axes are numbered 0 to 2
write_no_echo(powerbox_dualsupply, 'LOCAL') # turn off the remote mode
write_no_echo(powerbox_singlesupply, 'LOCAL') # turn off the remote mode
time.sleep(0.5)
powerbox_dualsupply.close()
powerbox_singlesupply.close()

View File

@ -0,0 +1 @@

Binary file not shown.

View File

@ -0,0 +1,2 @@
def test_func():
print('hello world! from test func')