From c090976d99f6f747f2b194b45d68f1c844d80248 Mon Sep 17 00:00:00 2001 From: rtan Date: Fri, 12 Jul 2024 16:55:20 +0200 Subject: [PATCH] first commit --- 20240709SerdarModScript.py | 570 +++++++++++++++++++++++++++++++++++++ 1 file changed, 570 insertions(+) create mode 100644 20240709SerdarModScript.py diff --git a/20240709SerdarModScript.py b/20240709SerdarModScript.py new file mode 100644 index 0000000..d5cbe09 --- /dev/null +++ b/20240709SerdarModScript.py @@ -0,0 +1,570 @@ +# -*- coding: utf-8 -*- +""" +Created on Fri Dec 22 15:10:10 2023 +Lightfield + Positioner +@author: Serdar, adjusted by Lukas +""" +############################################ +# Packages from Ryan +import re +import pyvisa +# from pyvisa import ResourceManager, constants + +# B Field Limits (in T) +BX_MAX = 1.7 +BY_MAX = 1.7 +BZ_MAX = 4.0 +############################################ + +import AMC +import csv +import time +import clr +import sys +import os +import spe2py as spe +import spe_loader as sl +import pandas as pd +import time +from System.IO import * +from System import String +import numpy as np +import matplotlib.pyplot as plt +import datetime + + +#First choose your controller +IP_AMC300 = "192.168.1.1" +IP_AMC100 = "192.168.71.100" + +# IP = "192.168.1.1" +IP = IP_AMC100 + + +# Import os module +import os, glob, string + +# Import System.IO for saving and opening files +from System.IO import * + +from System.Threading import AutoResetEvent + +# Import C compatible List and String +from System import String +from System.Collections.Generic import List + +# Add needed dll references +sys.path.append(os.environ['LIGHTFIELD_ROOT']) +sys.path.append(os.environ['LIGHTFIELD_ROOT']+"\\AddInViews") +sys.path.append(r'C:\Program Files\Princeton Instruments\LightField\AddInViews') #I added them by hand -serdar +sys.path.append(r'C:\Program Files\Princeton Instruments\LightField') #this one also +clr.AddReference('PrincetonInstruments.LightFieldViewV5') +clr.AddReference('PrincetonInstruments.LightField.AutomationV5') +clr.AddReference('PrincetonInstruments.LightFieldAddInSupportServices') +os.environ['LIGHTFIELD_ROOT'] = r'C:\Program Files\Princeton Instruments\LightField' +# PI imports +from PrincetonInstruments.LightField.Automation import Automation +from PrincetonInstruments.LightField.AddIns import ExperimentSettings +from PrincetonInstruments.LightField.AddIns import CameraSettings +#from PrincetonInstruments.LightField.AddIns import DeviceType +from PrincetonInstruments.LightField.AddIns import SpectrometerSettings +from PrincetonInstruments.LightField.AddIns import RegionOfInterest + +######################################################################################################### code begins from here ############################################# + +def set_custom_ROI(): + + # Get device full dimensions + dimensions = experiment.FullSensorRegion() + + regions = [] + + # Add two ROI to regions + regions.append( + RegionOfInterest( + int(dimensions.X), int(dimensions.Y), + int(dimensions.Width), int(dimensions.Height//4), # Use // for integer division + int(dimensions.XBinning), int(dimensions.Height//4))) + + + + # Set both ROI + experiment.SetCustomRegions(regions) + +def experiment_completed(sender, event_args): #callback function which is hooked to event completed, this is the listener + print("... Acquisition Complete!") + acquireCompleted.Set() #set the event. This puts the autoresetevent false.(look at .NET library for furher info) + +def InitializerFilenameParams(): + experiment.SetValue(ExperimentSettings.FileNameGenerationAttachIncrement, False) + experiment.SetValue(ExperimentSettings.FileNameGenerationIncrementNumber, 1.0) + experiment.SetValue(ExperimentSettings.FileNameGenerationIncrementMinimumDigits, 2.0) + experiment.SetValue(ExperimentSettings.FileNameGenerationAttachDate, False) + experiment.SetValue(ExperimentSettings.FileNameGenerationAttachTime, False) + +def AcquireAndLock(name): + print("Acquiring...", end = "") + # name += 'Exp{0:06.2f}ms.CWL{1:07.2f}nm'.format(\ + # experiment.GetValue(CameraSettings.ShutterTimingExposureTime)\ + # ,experiment.GetValue(SpectrometerSettings.GratingCenterWavelength)) + + experiment.SetValue(ExperimentSettings.FileNameGenerationBaseFileName, name) #this creates .spe file with the name + experiment.Acquire() # this is an ashynrchronus func. + acquireCompleted.WaitOne() + +def calculate_distance(x1, y1, x2, y2): + return np.sqrt((x2 - x1)**2 + (y2 - y1)**2) + +def generate_scan_positions(center, range_val, resolution): + positive_range = np.arange(center, center + range_val + resolution, resolution) + return positive_range + +def save_as_csv(filename, position_x, position_y): + file_existance = os.path.isfile(filename) + + with open(filename, 'a', newline = '') as csvfile: + writer = csv.writer(csvfile) + + if not file_existance: + writer.writerow(['x_coordinates','y_coordinates']) + + writer.writerow([position_x, position_y]) + +def move_axis(axis, target): + """ + This function moves an axis to the specified target and stop moving after it is in the really closed + vicinity (+- 25nm) of the target (listener hooked to it). + """ + amc.move.setControlTargetPosition(axis, target) + amc.control.setControlMove(axis, True) + while not (target - 25) < amc.move.getPosition(axis) < (target + 25): + time.sleep(0.1) + time.sleep(0.15) + while not (target - 25) < amc.move.getPosition(axis) < (target + 25): + time.sleep(0.1) + amc.control.setControlMove(axis, False) + +def move_xy(target_x, target_y): # moving in x and y direction closed to desired position + amc.move.setControlTargetPosition(0, target_x) + amc.control.setControlMove(0, True) + amc.move.setControlTargetPosition(1, target_y) + amc.control.setControlMove(1, True) + while not (target_x - 25) < amc.move.getPosition(0) < (target_x + 25) and (target_y - 25) < amc.move.getPosition(1) < (target_y + 25): + time.sleep(0.1) + time.sleep(0.15) + while not (target_x - 25) < amc.move.getPosition(0) < (target_x + 25) and (target_y - 25) < amc.move.getPosition(1) < (target_y + 25): + time.sleep(0.1) + + amc.control.setControlOutput(0, False) + amc.control.setControlOutput(1, False) + + +# intensity_data = [] # To store data from each scan +# data_list = [] + +def move_scan_xy(range_x, range_y, resolution, Settings, baseFileName): + """ + This function moves the positioners to scan the sample with desired ranges and resolution in 2 dimensions. + At the end it saves a csv file + + Parameters + ---------- + range_x : integer in nm. max value is 5um + Scan range in x direction. + range_y : integer in nm. max value is 5um + Scan range in y direction. + resolution : integer in nm. + Room temprature max res is 50nm. In cyrostat (4K) it is 10nm (check the Attocube manual) + baseFileName: string. At the end the saved file will be: baseFileName_scan_data.csv and it will be saved to the current directory + + Returns + ------- + None. + + """ + start_time = time.time() + axis_x = 0 #first axis + axis_y = 1 #second axis + center_x = amc.move.getPosition(axis_x) + center_y = amc.move.getPosition(axis_y) + # #check if the intput range is reasonable + # if amc.move.getPosition(axis_x) + range_x >= 5000 or amc.move.getPosition(axis_x)- range_x <= 0 or amc.move.getPosition(axis_y) + range_y >=5000 or amc.move.getPosition(axis_y) - range_y <= 5000 : + # print("scan range is out of range!") + # return + # +- range from current positions for x and y directions + + + array_x = generate_scan_positions(center_x, range_x, resolution) + array_y = generate_scan_positions(center_y, range_y, resolution) + total_points = len(array_x)*len(array_y) + len_y = len(array_y) + intensity_data = [] # To store data from each scan + data_list = [] + cwd = os.getcwd() # save original directory + + #This gives a directory, in which the script will save the spectrum of each spot as spe + #However, it will open the spectrum, convert it to txt, add it to the intensity_data and delete the spe file + Path_save = "C:/Users/localadmin/Desktop/Users/Lukas/2024_02_08_Map_test" + + #scanning loop + for i, x_positions in enumerate(array_x): + move_axis(axis_x, x_positions) + y = False + for j, y_positions in enumerate(array_y): + move_axis(axis_y, y_positions) + #each time when the positioner comes to the beggining of a new line + #this if will make the positioner wait a bit longer to really go to the target. + if y == False: + move_axis(axis_y, y_positions) + y = True + + + + #we acquire with the LF + acquire_name_spe = f'{baseFileName}_X{x_positions}_Y{y_positions}' + AcquireAndLock(acquire_name_spe) #this creates a .spe file with the scan name. + + #read the .spe file and get the data as loaded_files + cwd = os.getcwd() # save original directory + os.chdir(Path_save) #change directory + loaded_files = sl.load_from_files([acquire_name_spe + '.spe']) # get the .spe file as a variable + os.chdir(cwd) # go back to original directory + + # Delete the created .spe file from acquiring after getting necessary info + spe_file_path = os.path.join(Path_save, acquire_name_spe + '.spe') + os.remove(spe_file_path) + + distance = calculate_distance(x_positions, y_positions,amc.move.getPosition(axis_x), amc.move.getPosition(axis_y)) + + points_left = total_points - (i * len_y + (j+1)) + 1 + print('Points left in the scan: ', points_left) + + #append the intensity data as it is (so after every #of_wl_points, the spectrum of the next point begins) + intensity_data.append(loaded_files.data[0][0][0]) + + data_list.append({ + 'position_x': x_positions, + 'position_y': y_positions, + 'actual_x': amc.move.getPosition(axis_x), + 'actual_y': amc.move.getPosition(axis_y), + 'distance': distance, + }) + + #moves back to starting position + move_axis(axis_x, center_x) + move_axis(axis_y, center_y) + + #prints total time the mapping lasted + end_time = time.time() + elapsed_time = (end_time - start_time) / 60 + print('Scan time: ', elapsed_time, 'minutes') + + # df = pd.DataFrame(data_list) + + #save intensity & WL data as .txt + os.chdir('C:/Users/localadmin/Desktop/Users/Lukas') + # creates new folder for MAP data + new_folder_name = "Test_Map_" + f"{datetime.datetime.now().strftime('%Y_%m_%d_%H.%M')}" + os.mkdir(new_folder_name) + # Here the things will be saved in a new folder under user Lukas ! + # IMPORTANT last / has to be there, otherwise data cannot be saved and will be lost!!!!!!!!!!!!!!!! + os.chdir('C:/Users/localadmin/Desktop/Users/Lukas/'+ new_folder_name) + + intensity_data = np.array(intensity_data) + np.savetxt(Settings + str(center_x) + '_' + str(center_y) + experiment_name +'.txt', intensity_data) + + wl = np.array(loaded_files.wavelength) + np.savetxt("Wavelength.txt", wl) + +################################################################# RYAN'S FUNCTIONS HERE ########################################################################################## + +def sep_num_from_units(powerbox_output :str)->list: + ''' + Receives a string as input and separates the numberic value and unit and returns it as a list. + + Parameters + ---------- + powerbox_output : str + string output from the attocube powerbox, e.g. 1.35325kG + + Returns + ------- + list + list of float value and string (b value and it's units). If string is purely alphabets, then return a single element list + + ''' + match = re.match(r'\s*([+-]?\d*\.?\d+)([A-Za-z]+)', powerbox_output) + if match: + numeric_part = float(match.group(1)) # Convert the numeric part to a float + alphabetic_part = match.group(2) # Get the alphabetic part + return [numeric_part, alphabetic_part] + else: + return [powerbox_output,] + +def query_no_echo(instr:pyvisa.resources.Resource, command:str, sleeptime=0.01)->str: + """helper function for the Attocube APS100 that queries a function to the device, removing the echo. + + Args: + instr (pyvisa.resources.Resource): + command (str): commands, can be stringed in series with ; between commands + sleeptime (float, optional): delay time between commands. Defaults to 0.01. + + Returns: + str: _description_ + """ '''''' + try: + print(f"Sending command: {command}") + instr.write(command) + time.sleep(sleeptime) + echo_response = instr.read() # Read and discard the echo + # print(f"Echo response: {echo_response}") + actual_response = instr.read() # Read the actual response + print(f"Actual response: {actual_response}") + return actual_response + except pyvisa.VisaIOError as e: + print(f"Error communicating with instrument: {e}") + return None + +def write_no_echo(instr:pyvisa.resources.Resource, command:str, sleeptime=0.01)->str: + """helper function for the Attocube APS100 that writes a function to the device, removing the echo. + + Args: + instr (pyvisa.resources.Resource): + command (str): commands, can be stringed in series with ; between commands + sleeptime (float, optional): delay time between commands. Defaults to 0.01. + + Returns: + str: _description_ + """ '''''' + try: + print(f"Sending command: {command}") + instr.write(command) + time.sleep(sleeptime) # Give the device some time to process + try: + while True: + echo_response = instr.read() # Read and discard the echo + # print(f"Echo response: {echo_response}") + except pyvisa.VisaIOError as e: + # Expected timeout after all echoed responses are read + if e.error_code != pyvisa.constants.VI_ERROR_TMO: + raise + except pyvisa.VisaIOError as e: + print(f"Error communicating with instrument: {e}") + +# TODO: implement the reverse scan and zero when finish functionality +# receive values in units of T, rescale in kg to talk with the power supplyy. 1T = 10kG +def sweep_b_val(instr:pyvisa.resources.Resource, min_bval:float, max_bval:float, + res:float, Settings:str, base_file_name='', path_save="C:/Users/localadmin/Desktop/Users/Lukas/2024_02_08_Map_test", + singlepowersupply_bool=False, reversescan_bool=False, zerowhenfin_bool=False)->None: + """ this function performs a sweep of the B field of the chosen magnet coil. It creates a list o B values from the given min and max values, with the given resolution. For each value, a measurement of the spectrum + of the probe in the cryostat is made, using the LightField spectrometer. + + Args: + instr (pyvisa.resources.Resource): chosen power supply device to connect to + min_bval (float): min B value of the scan (please input in units of Tesla) + max_bval (float): max B value of the scan (please input in units of Tesla) + res (float): resolution of the list of B values (please input in units of Tesla) + Settings (str): experiment settings, included in file name. + base_file_name (str, optional): base file name. Defaults to ''. + path_save (str, optional): file path where the file will be saved. Defaults to "C:/Users/localadmin/Desktop/Users/Lukas/2024_02_08_Map_test". + singlepowersupply_bool (bool, optional): _description_. Defaults to False. + reversescan_bool (bool, optional): _description_. Defaults to False. + zerowhenfin_bool (bool, optional): _description_. Defaults to False. + + Raises: + ValueError: when By limit is exceeded. + ValueError: when Bz limit is exceeded. + ValueError: when Bx limit is exceeded. + """ '''''' + if base_file_name =='': + base_file_name = datetime.datetime.now().strftime('%Y_%m_%d_%H.%M') + + start_time = time.time() + instr_bsettings = list(sep_num_from_units(el) for el in query_no_echo(instr, 'UNITS?;LLIM?;ULIM?').split(';')) # deliver a 3 element tuple of tuples containing the set unit, llim and ulim + + if instr_bsettings[0][0] == 'T': + instr_bsettings[1][0] = instr_bsettings[1][0]*0.1 # rescale kG to T, device accepts values only in kG or A, eventho we set it to T + instr_bsettings[2][0] = instr_bsettings[2][0]*0.1 + + if singlepowersupply_bool: # checks limits of Bx or By + if (min_bval< -BY_MAX) or (max_bval > BY_MAX): + raise ValueError('Input limits exceed that of the magnet By! Please input smaller limits.') + elif '1' in query_no_echo(instr, 'CHAN?'): # check if its the coils for Bz + if (min_bval < -BZ_MAX) or (max_bval > BZ_MAX): + raise ValueError('Input limits exceed that of the magnet (Bz)! Please input smaller limits.') + else: # checks limits of Bx + if (min_bval< -BX_MAX) or (max_bval > BX_MAX): + raise ValueError('Input limits exceed that of the magnet Bx! Please input smaller limits.') + + write_no_echo(instr, f'LLIM {min_bval*10};ULIM {max_bval*10}') # sets the given limits, must convert to kG for the device to read + bval_lst = np.arange(min_bval, max_bval + res, res) # creates list of B values to measure at, with given resolution, in T + + init_bval = sep_num_from_units(query_no_echo(instr, 'IMAG?'))[0]*0.1 # queries the initial B value of the coil, rescale from kG to T + + init_lim, subsequent_lim = 'LLIM', 'ULIM' + init_sweep, subsequent_sweep = 'DOWN', 'UP' + + #################################################### + # TODO: decide whether to start at min b val or max b val, depending on which one is nearer, IMPLEMENT THIS LATER + # nearest_bval = (abs(init_bval - min_bval), abs(init_bval - max_bval)) + # if nearest_bval[0] <= nearest_bval[1]: + # reversescan_bool = True + #################################################### + + # if reverse scan, then flip the values in the b list, and swap the initial limit and sweep conditions + if reversescan_bool: + bval_lst = bval_lst[::-1] + init_lim, subsequent_lim = subsequent_lim, init_lim + init_sweep, subsequent_sweep = subsequent_sweep, init_sweep + + total_points = len(bval_lst) + intensity_data = [] # To store data from each scan + cwd = os.getcwd() # save original directory + + #This gives a directory, in which the script will save the spectrum of each spot as spe + #However, it will open the spectrum, convert it to txt, add it to the intensity_data and delete the spe file + #scanning loop + for i, bval in enumerate(bval_lst): + + # if init_bval == bval: + # # if initial bval is equal to the element of the given iteration from the bval_lst, then commence measuring the spectrum + # pass + # else: + + # TODO: improve the conditional block later on... try to shorten the number of conditionals needed/flatten the nested conditionals + # else, travel to the lower or higher limit, depending on how far the init val is to each bound, and commence the measurement from there on + # if not reversescan_bool: + if i == 0: # for first iteration, sweep to one of the limits + write_no_echo(instr, f'{init_lim} {bval*10}') # convert back to kG + write_no_echo(instr, f'SWEEP {init_sweep}') + else: + write_no_echo(instr, f'{subsequent_lim} {bval*10}') # convert back to kG + write_no_echo(instr, f'SWEEP {subsequent_sweep}') + + actual_bval = sep_num_from_units(query_no_echo(instr, 'IMAG?'))[0]*0.1 # convert kG to T + print(f'Actual magnet strength: {actual_bval} T,', f'Target magnet strength: {bval} T') + + while abs(actual_bval - bval) > 0.0001: + time.sleep(5) # little break + actual_bval = sep_num_from_units(query_no_echo(instr, 'IMAG?'))[0]*0.1 + # update the actual bval + print(f'Actual magnet strength: {actual_bval} T,', f'Target magnet strength: {bval} T') + + time.sleep(5) + # we acquire with the LF + acquire_name_spe = f'{base_file_name}_{bval}T' + AcquireAndLock(acquire_name_spe) #this creates a .spe file with the scan name. + + # read the .spe file and get the data as loaded_files + cwd = os.getcwd() # save original directory + os.chdir(path_save) #change directory + loaded_files = sl.load_from_files([acquire_name_spe + '.spe']) # get the .spe file as a variable + os.chdir(cwd) # go back to original directory + + # Delete the created .spe file from acquiring after getting necessary info + spe_file_path = os.path.join(path_save, acquire_name_spe + '.spe') + os.remove(spe_file_path) + + points_left = total_points - i - 1 # TODO: SEE IF THIS IS CORRECT + print('Points left in the scan: ', points_left) + + #append the intensity data as it is (so after every #of_wl_points, the spectrum of the next point begins) + intensity_data.append(loaded_files.data[0][0][0]) + + #prints total time the mapping lasted + end_time = time.time() + elapsed_time = (end_time - start_time) / 60 + print('Scan time: ', elapsed_time, 'minutes') + + if zerowhenfin_bool: + write_no_echo(instr, 'SWEEP ZERO') # if switched on, discharges the magnet after performing the measurement loop above + + #save intensity & WL data as .txt + os.chdir('C:/Users/localadmin/Desktop/Users/Lukas') + # creates new folder for MAP data + new_folder_name = "Test_Map_" + f"{datetime.datetime.now().strftime('%Y_%m_%d_%H.%M')}" + os.mkdir(new_folder_name) + # Here the things will be saved in a new folder under user Lukas ! + # IMPORTANT last / has to be there, otherwise data cannot be saved and will be lost!!!!!!!!!!!!!!!! + os.chdir('C:/Users/localadmin/Desktop/Users/Lukas/'+ new_folder_name) + + intensity_data = np.array(intensity_data) + np.savetxt(Settings + str(min_bval) + 'T_to_' + str(max_bval) + 'T' + experiment_name +'.txt', intensity_data) + + wl = np.array(loaded_files.wavelength) + np.savetxt("Wavelength.txt", wl) + + + +################################################################# END OF FUNCTION DEFS ########################################################################################### + +# NOTE: RYAN INTRODUCED SOME FUNCTIONS HERE TO PERFORM THE SCAN + +# Initialise PYVISA ResourceManager +rm = pyvisa.ResourceManager() +# print(rm.list_resources()) # 'ASRL8::INSTR' for dual power supply, 'ASRL9::INSTR' for single power supply + +# Open the connection with the APS100 dual power supply +powerbox_dualsupply = rm.open_resource('ASRL8::INSTR', + baud_rate=9600, # Example baud rate, adjust as needed + data_bits=8, + parity= pyvisa.constants.Parity.none, + stop_bits= pyvisa.constants.StopBits.one, + timeout=5000)# 5000 ms timeout + +write_no_echo(powerbox_dualsupply, 'REMOTE') # turn on the remote mode + +# select axis for the dual supply, either z-axis(CHAN 1 ^= Supply A) or x-axis(CHAN 2 ^= Supply B) +write_no_echo(powerbox_dualsupply, 'CHAN 1') + +# Setup connection to AMC +amc = AMC.Device(IP) +amc.connect() + +# Internally, axes are numbered 0 to 2 +amc.control.setControlOutput(0, True) +amc.control.setControlOutput(1, True) + + +auto = Automation(True, List[String]()) +experiment = auto.LightFieldApplication.Experiment +acquireCompleted = AutoResetEvent(False) + +experiment.Load("Lukas_experiment_2024_02_06") +experiment.ExperimentCompleted += experiment_completed # we are hooking a listener. +# experiment.SetValue(SpectrometerSettings.GratingSelected, '[750nm,1200][0][0]') +# InitializerFilenameParams() + + +#set scan range and resolution in nanometers +range_x = 20000 +range_y = 20000 +resolution = 1000 +# set B-field scan range and resolution (all in T) +set_llim_bval = -0.01 +set_ulim_bval = 0.01 +set_res_bval = 0.01 + +#Here you can specify the filename of the map e.g. put experiment type, exposure time, used filters, etc.... +experiment_settings = 'PL_SP_700_LP_700_HeNe_52muW_exp_2s_Start_' +#The program adds the range of the scan as well as the resolution and the date and time of the measurement +experiment_name = f"{set_llim_bval}T_to_{set_ulim_bval}T_{set_res_bval}T_{datetime.datetime.now().strftime('%Y_%m_%d_%H%M')}" + +# # TODO: write the bval scan here +# for idx, bval in enumerate(bval_lst): +# write_no_echo(powerbox_dualsupply, '') + +# this moves the probe in xy-direction and measures spectrum there +# move_scan_xy(range_x, range_y, resolution, experiment_settings, experiment_name) + +# perform the B-field measurement for selected axis above +# sweep_b_val(powerbox_dualsupply, set_llim_bval, set_ulim_bval, set_res_bval, experiment_settings, experiment_name) +sweep_b_val(powerbox_dualsupply, set_llim_bval, set_ulim_bval, set_res_bval, + experiment_settings, experiment_name, singlepowersupply_bool=False, zerowhenfin_bool=True, reversescan_bool=False) + +# Internally, axes are numbered 0 to 2 + +write_no_echo(powerbox_dualsupply, 'LOCAL') # turn off the remote mode +# time.sleep(0.5) +powerbox_dualsupply.close() +