From e746a664bdf4f19933490bef3937fa460775d45b Mon Sep 17 00:00:00 2001 From: ryantan Date: Wed, 16 Apr 2025 12:56:44 +0200 Subject: [PATCH] begun in writing test script for b_rotation function --- Mag_Field_Sweep_2025_04_15.py | 32 ++- b_rotation_test.py | 413 ++++++++++++++++++++++++++++++++++ 2 files changed, 434 insertions(+), 11 deletions(-) create mode 100644 b_rotation_test.py diff --git a/Mag_Field_Sweep_2025_04_15.py b/Mag_Field_Sweep_2025_04_15.py index bf40316..2ddfe69 100644 --- a/Mag_Field_Sweep_2025_04_15.py +++ b/Mag_Field_Sweep_2025_04_15.py @@ -506,8 +506,6 @@ def sweep_b_val(instr:pyvisa.resources.Resource, min_bval:float, max_bval:float, # b = np.arange(max_bval, min_bval - res, res) # bval_lst = np.concatenate((a,b)) - # TODO: unused, see if can remove - # init_bval = sep_num_from_units(query_no_echo(instr, 'IMAG?'))[0]*0.1 # queries the initial B value of the coil, rescale from kG to T init_lim, subsequent_lim = 'LLIM', 'ULIM' init_sweep, subsequent_sweep = 'DOWN', 'UP' @@ -610,7 +608,7 @@ def sweep_b_val(instr:pyvisa.resources.Resource, min_bval:float, max_bval:float, spe_file_path = os.path.join(temp_folder_path, acquire_name_spe + '.spe') os.remove(spe_file_path) - points_left = total_points - i - 1 # TODO: SEE IF THIS IS CORRECT + points_left = total_points - i - 1 print('Points left in the scan: ', points_left) #append the intensity data as it is (so after every #of_wl_points, the spectrum of the next point begins) @@ -687,7 +685,7 @@ def generate_coord_list_fixed_angle(angle, b_val, b_val_step_size, reverse=False return coordinates ''' -def generate_coord_list_fixed_angle(angle, b_min, b_max, b_val_step_size, reverse=False): +def generate_coord_list_fixed_angle(angle, b_min, b_max, b_val_step_size, reverse=False)->list[tuple]: """ Generates a list of (x, y) Cartesian coordinates along a line defined by a fixed angle, scanning from b_min to b_max or from b_max to b_min depending on the reverse flag. @@ -790,7 +788,8 @@ def generate_angle_coord_list(radius, start_angle, end_angle, step_size, clockwi def b_field_rotation(instr1:pyvisa.resources.Resource, instr2:pyvisa.resources.Resource, - Babs:float, startangle:float, endangle:float, angle_stepsize:float, Settings:str, clockwise=True, base_file_name='', zerowhenfin_bool=False)->None: + Babs:float, startangle:float, endangle:float, angle_stepsize:float, + Settings:str, clockwise=True, base_file_name='', zerowhenfin_bool=False)->None: # TODO: update docs """Rotation of the b-field in discrete steps, spectrum is measured at each discrete step in the rotation. Scan angle is defined as the angle between the x-axis and the current B-field vector, i.e., in the anticlockwise direction. @@ -806,10 +805,7 @@ def b_field_rotation(instr1:pyvisa.resources.Resource, instr2:pyvisa.resources.R zerowhenfin_bool (bool, optional): after finishing the rotation, both B-field components should be set to 0 T. Defaults to False. """ - # TODO: possibly rename instr1 and instr2 to the dual and single power supplies respectively?? # TODO: add logging to the script - # TODO: add check if Babs is within the limits of the power supply, and if not, raise an error - # defines the folder, in which the data from the spectrometer is temporarily stored in temp_folder_path = "C:/Users/localadmin/Desktop/Users/Lukas/B_Field_Dump" @@ -858,6 +854,9 @@ def b_field_rotation(instr1:pyvisa.resources.Resource, instr2:pyvisa.resources.R instr1_lim, instr2_lim = instr2_lim, instr1_lim instr1_sweep, instr2_sweep = instr2_sweep, instr1_sweep + + # TODO: i dont think we need to change the rates just yet, think about this later + ''' # list of rates (with units) for diff ranges of each device, only up to Range 1 for single power supply as that is already # the max recommended current. init_range_lst1 = list(sep_num_from_units(el) for el in query_no_echo(instr1, 'RATE? 0;RATE? 1;RATE? 2').split(';')) @@ -868,6 +867,8 @@ def b_field_rotation(instr1:pyvisa.resources.Resource, instr2:pyvisa.resources.R # set both devices to the min rates write_no_echo(instr1, f'RATE 0 {min_range_lst[0]};RATE 1 {min_range_lst[1]}') write_no_echo(instr2, f'RATE 0 {min_range_lst[0]};RATE 1 {min_range_lst[1]}') + ''' + # TODO: see if this is the desired process: to always start from the x-axis ASK LUKAS if Babs <= BX_MAX: @@ -883,7 +884,10 @@ def b_field_rotation(instr1:pyvisa.resources.Resource, instr2:pyvisa.resources.R time.sleep(5) # little break actual_bval = sep_num_from_units(query_no_echo(instr1, 'IMAG?'))[0]*0.1 print(f'Actual magnet strength (Bx): {actual_bval} T,', f'Target magnet strength: {Babs} T') + + + # TODO: copy and mod code to see if block logic works, test in lab # NOTE: implement PID control, possibly best option to manage the b field DO THIS LATER ON, WE DO DISCRETE B VALUES RN # Helper function that listens to a device def listen_to_device(device_id, target_value, shared_values, lock, all_targets_met_event): @@ -958,8 +962,8 @@ def b_field_rotation(instr1:pyvisa.resources.Resource, instr2:pyvisa.resources.R spe_file_path = os.path.join(temp_folder_path, acquire_name_spe + '.spe') os.remove(spe_file_path) - # points_left = total_points - i - 1 # TODO: SEE IF THIS IS CORRECT - # print('Points left in the scan: ', points_left) + points_left = len(angles) - iteration - 1 + print('Points left in the scan: ', points_left) #append the intensity data as it is (so after every #of_wl_points, the spectrum of the next point begins) intensity_data.append(loaded_files.data[0][0][0]) @@ -972,10 +976,16 @@ def b_field_rotation(instr1:pyvisa.resources.Resource, instr2:pyvisa.resources.R # reset both devices to original sweep limits write_no_echo(instr1, f'LLIM {instr1_bsettings[1][0]*10};ULIM {instr1_bsettings[2][0]*10}') # reset the initial limits of the device after the scan write_no_echo(instr2, f'LLIM {instr2_bsettings[1][0]*10};ULIM {instr2_bsettings[2][0]*10}') # reset the initial limits of the device after the scan + + + # TODO: uncomment later if resetting original rates implemented + ''' # reset both devices' initial rates for each range write_no_echo(instr1, f'RANGE 0 {init_range_lst1[0][0]};RANGE 1 {init_range_lst1[1][0]};RANGE 2 {init_range_lst1[2][0]}') # reset the initial limits of the device after the scan write_no_echo(instr2, f'RANGE 0 {init_range_lst2[0][0]};RANGE 1 {init_range_lst2[1][0]}') # reset the initial limits of the device after the scan - + ''' + + if zerowhenfin_bool: write_no_echo(instr1, 'SWEEP ZERO') # if switched on, discharges the magnet after performing the measurement loop above write_no_echo(instr2, 'SWEEP ZERO') diff --git a/b_rotation_test.py b/b_rotation_test.py new file mode 100644 index 0000000..2c28750 --- /dev/null +++ b/b_rotation_test.py @@ -0,0 +1,413 @@ +############################################ +# Packages from Ryan +import re +import math +import threading +import pyvisa +# from pyvisa import ResourceManager, constants + +# B Field Limits (in T) +BX_MAX = 1.7 +BY_MAX = 1.7 +BZ_MAX = 4.0 +############################################ + +# import AMC +import csv +import time +import clr +import sys +import os +import spe2py as spe +import spe_loader as sl +import pandas as pd +import time +# from System.IO import * +# from System import String +import numpy as np +import matplotlib.pyplot as plt +import datetime +from typing import Union + +def sep_num_from_units(powerbox_output :str)->list: + ''' + Receives a string as input and separates the numberic value and unit and returns it as a list. + + Parameters + ---------- + powerbox_output : str + string output from the attocube powerbox, e.g. 1.35325kG + + Returns + ------- + list + list of float value and string (b value and it's units). If string is purely alphabets, then return a single element list + + ''' + match = re.match(r'\s*([+-]?\d*\.?\d+)([A-Za-z]+)', powerbox_output) + if match: + numeric_part = float(match.group(1)) # Convert the numeric part to a float + alphabetic_part = match.group(2) # Get the alphabetic part + return [numeric_part, alphabetic_part] + else: + return [powerbox_output,] + + +def query_no_echo(instr:pyvisa.resources.Resource, command:str, sleeptime=0)->str: + """helper function for the Attocube APS100 that queries a function to the device, removing the echo. + + Args: + instr (pyvisa.resources.Resource): + command (str): commands, can be stringed in series with ; between commands + sleeptime (float, optional): delay time between commands. Defaults to 0.01. + + Returns: + str: _description_ + """ '''''' + try: + print(f"Sending command: {command}") + instr.write(command) + time.sleep(sleeptime) + echo_response = instr.read() # Read and discard the echo + # print(f"Echo response: {echo_response}") + actual_response = instr.read() # Read the actual response + print(f"Actual response: {actual_response}") + return actual_response + except pyvisa.VisaIOError as e: + print(f"Error communicating with instrument: {e}") + return None + + +def write_no_echo(instr:pyvisa.resources.Resource, command:str, sleeptime=0)->str: + """helper function for the Attocube APS100 that writes a function to the device, removing the echo. + + Args: + instr (pyvisa.resources.Resource): + command (str): commands, can be stringed in series with ; between commands + sleeptime (float, optional): delay time between commands. Defaults to 0.01. + + Returns: + str: _description_ + """ '''''' + try: + print(f"Sending command: {command}") + instr.write(command) + time.sleep(sleeptime) # Give the device some time to process + try: + while True: + echo_response = instr.read() # Read and discard the echo + # print(f"Echo response: {echo_response}") + except pyvisa.VisaIOError as e: + # Expected timeout after all echoed responses are read + if e.error_code != pyvisa.constants.VI_ERROR_TMO: + raise + except pyvisa.VisaIOError as e: + print(f"Error communicating with instrument: {e}") + + +def generate_angle_coord_list(radius, start_angle, end_angle, step_size, clockwise=True): + # TODO: DOCS + """Creates a list of discrete cartesian coordinates (x,y), given the radius, start- and end angles, the angle step size, and the direction of rotation. + Function then returns a list of two lists: list of angles and list of cartesian coordinates (x,y coordinates in a tuple). + + Args: + radius (_type_): _description_ + start_angle (_type_): _description_ + end_angle (_type_): _description_ + step_size (_type_): _description_ + clockwise (bool, optional): _description_. Defaults to True. + + Returns: + _type_: _description_ + """ """""" + # Initialize lists to hold angles and (x, y) pairs + angles = [] + coordinates = [] + + # Normalize angles to the range [0, 360) + start_angle = start_angle % 360 + end_angle = end_angle % 360 + + if not clockwise: + # Clockwise rotation + current_angle = start_angle + while True: + # Append the current angle to the angles list + angles.append(current_angle % 360) + + # Convert the current angle to radians + current_angle_rad = math.radians(current_angle % 360) + + # Convert polar to Cartesian coordinates + x = radius * math.cos(current_angle_rad) + y = radius * math.sin(current_angle_rad) + + # Append the (x, y) pair to the list + coordinates.append((x, y)) + + # Check if we've reached the end_angle (handling wrap-around) (current_angle - step_size) % 360 == end_angle or + if current_angle % 360 == end_angle: + break + + # Decrement the current angle by the step size + current_angle -= step_size + if current_angle < 0: + current_angle += 360 + else: + # Counterclockwise rotation + current_angle = start_angle + while True: + # Append the current angle to the angles list + angles.append(current_angle % 360) + + # Convert the current angle to radians + current_angle_rad = math.radians(current_angle % 360) + + # Convert polar to Cartesian coordinates + x = radius * math.cos(current_angle_rad) + y = radius * math.sin(current_angle_rad) + + # Append the (x, y) pair to the list + coordinates.append((x, y)) + + # Check if we've reached the end_angle (handling wrap-around) (current_angle + step_size) % 360 == end_angle or + if current_angle % 360 == end_angle: + break + + # Increment the current angle by the step size + current_angle += step_size + if current_angle >= 360: + current_angle -= 360 + + return [angles, coordinates] + +def b_field_rotation(instr1:pyvisa.resources.Resource, instr2:pyvisa.resources.Resource, + Babs:float, startangle:float, endangle:float, angle_stepsize:float, + Settings:str, clockwise=True, base_file_name='', zerowhenfin_bool=False)->None: + # TODO: update docs + """Rotation of the b-field in discrete steps, spectrum is measured at each discrete step in the rotation. Scan angle is + defined as the angle between the x-axis and the current B-field vector, i.e., in the anticlockwise direction. + + Args: + instr1 (pyvisa.resources.Resource): _description_ + instr2 (pyvisa.resources.Resource): _description_ + Babs (float): absolute B-field value in T + startangle (float): start angle in degrees + endangle (float): end angle in degrees + angle_stepsize (float): angle step size in degrees + clockwise (bool): determines the direction of rotation of the B-field. Defaults to True. + zerowhenfin_bool (bool, optional): after finishing the rotation, both B-field components should be set to 0 T. Defaults to False. + """ + + # TODO: add logging to the script + + # defines the folder, in which the data from the spectrometer is temporarily stored in + temp_folder_path = "C:/Users/localadmin/Desktop/Users/Lukas/B_Field_Dump" + # temp_folder_path = "C:/Users/localadmin/Desktop/Users/Lukas/2024_02_08_Map_test" + + if base_file_name =='': + base_file_name = datetime.datetime.now().strftime('%Y_%m_%d_%H.%M') + + start_time = time.time() # start of the scan function + + startangle = startangle % 360 + endangle = endangle % 360 # ensures that the angles are within [0,360) + + idnstr1 = query_no_echo(instr1, '*IDN?') + idnstr2 = query_no_echo(instr2, '*IDN?') + + intensity_data = [] # To store data from each scan + cwd = os.getcwd() # save original directory + + # find which one is the dual power supply, then, ramp B_x to Babs value + if '2301034' in idnstr1: # serial no. the dual power supply + pass + elif '2101034' in idnstr2: + # swap instruments, instr 1 to be the dual power supply (^= x-axis) + instr1, instr2 = instr2, instr1 + + # save initial low and high sweep limits of each device, and set them back after the rotation + instr1_bsettings = list(sep_num_from_units(el) for el in query_no_echo(instr1, 'UNITS?;LLIM?;ULIM?').split(';')) # deliver a 3 element tuple of tuples containing the set unit, llim and ulim + instr2_bsettings = list(sep_num_from_units(el) for el in query_no_echo(instr2, 'UNITS?;LLIM?;ULIM?').split(';')) # deliver a 3 element tuple of tuples containing the set unit, llim and ulim + if instr1_bsettings[0][0] == 'T': + instr1_bsettings[1][0] = instr1_bsettings[1][0]*0.1 # rescale kG to T, device accepts values only in kG or A, eventho we set it to T + instr1_bsettings[2][0] = instr1_bsettings[2][0]*0.1 + if instr2_bsettings[0][0] == 'T': + instr2_bsettings[1][0] = instr2_bsettings[1][0]*0.1 # rescale kG to T, device accepts values only in kG or A, eventho we set it to T + instr2_bsettings[2][0] = instr2_bsettings[2][0]*0.1 + + # initialise the sweep angle list as well as the sweep limits and directions for each instrument + instr1_lim, instr2_lim = 'LLIM', 'ULIM' + instr1_sweep, instr2_sweep = 'DOWN', 'UP' + + # create lists of angles and discrete Cartesian coordinates + angles, cartesian_coords = generate_angle_coord_list(Babs, startangle, endangle, angle_stepsize, clockwise=clockwise) + + if clockwise: # NOTE: old conditional was: startangle > endangle see if this works.... + # reverse sweep limits and directions for the clockwise rotation + instr1_lim, instr2_lim = instr2_lim, instr1_lim + instr1_sweep, instr2_sweep = instr2_sweep, instr1_sweep + + + # TODO: i dont think we need to change the rates just yet, think about this later + ''' + # list of rates (with units) for diff ranges of each device, only up to Range 1 for single power supply as that is already + # the max recommended current. + init_range_lst1 = list(sep_num_from_units(el) for el in query_no_echo(instr1, 'RATE? 0;RATE? 1;RATE? 2').split(';')) + init_range_lst2 = list(sep_num_from_units(el) for el in query_no_echo(instr2, 'RATE? 0;RATE? 1').split(';')) + + min_range_lst = [min(el1[0], el2[0]) for el1,el2 in zip(init_range_lst1, init_range_lst2)] # min rates for each given range + + # set both devices to the min rates + write_no_echo(instr1, f'RATE 0 {min_range_lst[0]};RATE 1 {min_range_lst[1]}') + write_no_echo(instr2, f'RATE 0 {min_range_lst[0]};RATE 1 {min_range_lst[1]}') + ''' + + + # TODO: see if this is the desired process: to always start from the x-axis ASK LUKAS + if Babs <= BX_MAX: + # write_no_echo(instr1, f'CHAN 2;ULIM {Babs*10};SWEEP UP') # sets to B_x, the B_x upper limit and sweeps the magnet field to the upper limit + print(f'SWEEPING B-X TO {Babs} T NOW') + else: + raise ValueError(f'{Babs=}T value exceeds the max limit of the Bx field {BX_MAX}T!') + + # wait for Babs to be reached by the Bx field + actual_bval = sep_num_from_units(query_no_echo(instr1, 'IMAG?'))[0]*0.1 # convert kG to T + print(f'Actual magnet strength (Bx): {actual_bval} T,', f'Target magnet strength: {Babs} T') + while abs(actual_bval - Babs) > 0.0001: + time.sleep(5) # little break + actual_bval = sep_num_from_units(query_no_echo(instr1, 'IMAG?'))[0]*0.1 + print(f'Actual magnet strength (Bx): {actual_bval} T,', f'Target magnet strength: {Babs} T') + + + + # TODO: copy and mod code to see if block logic works, test in lab + # NOTE: implement PID control, possibly best option to manage the b field DO THIS LATER ON, WE DO DISCRETE B VALUES RN + # Helper function that listens to a device + def listen_to_device(device_id, target_value, shared_values, lock, all_targets_met_event): + while not all_targets_met_event.is_set(): # Loop until the event is set + # value = 0 # Simulate receiving a float from the device INSERT QUERY NO ECHO HERE TO ASK FOR DEVICE IMAG + if '2301034' in device_id: + value = sep_num_from_units(query_no_echo(instr1, 'IMAG?'))[0]*0.1 # convert kG to T + if value <= target_value[device_id]: + # write_no_echo(instr1, f"CHAN 2;ULIM {target_value[device_id]*10};SWEEP UP") + print(f'sweeping Bx up to {target_value[device_id]*10}') + else: + # write_no_echo(instr1, "CHAN 2;LLIM {target_value[device_id]*10};SWEEP DOWN") + print(f'sweeping Bx down to {target_value[device_id]*10}') + + elif '2101014' in device_id: + value = sep_num_from_units(query_no_echo(instr2, 'IMAG?'))[0]*0.1 # convert kG to T + if value <= target_value[device_id]: + # write_no_echo(instr2, f"ULIM {target_value[device_id]*10};SWEEP UP") + print(f'sweeping By up to {target_value[device_id]*10}') + else: + # write_no_echo(instr2, "LLIM {target_value[device_id]*10};SWEEP DOWN") + print(f'sweeping By down to {target_value[device_id]*10}') + else: + continue # Skip if device ID is not recognized + print(f"Device {device_id} reports value: {value} T") + + with lock: + shared_values[device_id] = value + # Check if both devices have met their targets + if all(shared_values.get(device) is not None and abs(value - target_value[device]) <= 0.0001 + for device,value in shared_values.items()): + print(f"Both devices reached their target values: {shared_values}") + all_targets_met_event.set() # Signal that both targets are met + + # time.sleep(1) # Simulate periodic data checking + + # Main function to manage threads and iterate over target values + def monitor_devices(device_target_values, angles_lst, intensity_data=intensity_data): + for iteration, target in enumerate(device_target_values): + print(f"\nStarting iteration {iteration+1} for target values: {target}") + # Shared dictionary to store values from devices + shared_values = {device: None for device in target.keys()} + # Event to signal when both target values are reached + all_targets_met_event = threading.Event() + + # Lock to synchronize access to shared_values + lock = threading.Lock() + + # Create and start threads for each device + threads = [] + for device_id in target.keys(): + thread = threading.Thread(target=listen_to_device, args=(device_id, target, shared_values, lock, all_targets_met_event)) + threads.append(thread) + thread.start() + + # Wait until both devices meet their target values + all_targets_met_event.wait() + print(f"Both target values for iteration {iteration+1} met. Performing action...") + # Clean up threads + for thread in threads: + thread.join() + print(f"Threads for iteration {iteration+1} closed.\n") + + # Perform some action after both targets are met + # we acquire with the LF + acquire_name_spe = f'{base_file_name}_{angles_lst[iteration]}°' # NOTE: save each intensity file with the given angle + AcquireAndLock(acquire_name_spe) #this creates a .spe file with the scan name. + + # read the .spe file and get the data as loaded_files + cwd = os.getcwd() # save original directory + os.chdir(temp_folder_path) #change directory + loaded_files = sl.load_from_files([acquire_name_spe + '.spe']) # get the .spe file as a variable + os.chdir(cwd) # go back to original directory + + # Delete the created .spe file from acquiring after getting necessary info + spe_file_path = os.path.join(temp_folder_path, acquire_name_spe + '.spe') + os.remove(spe_file_path) + + points_left = len(angles) - iteration - 1 + print('Points left in the scan: ', points_left) + + #append the intensity data as it is (so after every #of_wl_points, the spectrum of the next point begins) + intensity_data.append(loaded_files.data[0][0][0]) + + #prints total time the mapping lasted + end_time = time.time() + elapsed_time = (end_time - start_time) / 60 + print('Scan time: ', elapsed_time, 'minutes') + + # reset both devices to original sweep limits + write_no_echo(instr1, f'LLIM {instr1_bsettings[1][0]*10};ULIM {instr1_bsettings[2][0]*10}') # reset the initial limits of the device after the scan + write_no_echo(instr2, f'LLIM {instr2_bsettings[1][0]*10};ULIM {instr2_bsettings[2][0]*10}') # reset the initial limits of the device after the scan + + + # TODO: uncomment later if resetting original rates implemented + ''' + # reset both devices' initial rates for each range + write_no_echo(instr1, f'RANGE 0 {init_range_lst1[0][0]};RANGE 1 {init_range_lst1[1][0]};RANGE 2 {init_range_lst1[2][0]}') # reset the initial limits of the device after the scan + write_no_echo(instr2, f'RANGE 0 {init_range_lst2[0][0]};RANGE 1 {init_range_lst2[1][0]}') # reset the initial limits of the device after the scan + ''' + + + if zerowhenfin_bool: + # write_no_echo(instr1, 'SWEEP ZERO') # if switched on, discharges the magnet after performing the measurement loop above + # write_no_echo(instr2, 'SWEEP ZERO') + print('======================\nSWEEPING BOTH DEVICES TO ZERO NOW\n======================') + + #save intensity & WL data as .txt + os.chdir('C:/Users/localadmin/Desktop/Users/Ryan') + # creates new folder for MAP data + new_folder_name = "Test_Map_" + f"{datetime.datetime.now().strftime('%Y_%m_%d_%H.%M')}" + os.mkdir(new_folder_name) + # Here the things will be saved in a new folder under user Lukas ! + # IMPORTANT last / has to be there, otherwise data cannot be saved and will be lost!!!!!!!!!!!!!!!! + os.chdir('C:/Users/localadmin/Desktop/Users/Ryan/'+ new_folder_name) + + intensity_data = np.array(intensity_data) + np.savetxt(Settings + f'{angles[0]}°_to_{angles[-1]}°' + experiment_name +'.txt', intensity_data) + # TODO: remove/edit experiment_name in line above, as well in sweep_b_val func, rn takes a global variable below + + wl = np.array(loaded_files.wavelength) + np.savetxt("Wavelength.txt", wl) + + # NOTE: data struct of device_target_values is a list of dictionaries, where each dictionary contains the target values for each device + device_target_values = [{'2301034': bval[0], '2101014': bval[1]} for bval in cartesian_coords] + + # call the helper function to carry out the rotation/measurement of spectrum + monitor_devices(device_target_values, angles, intensity_data) + \ No newline at end of file