Save-Path-File-Changes #1

Merged
rtan merged 41 commits from Save-Path-File-Changes into main 2025-04-23 13:56:19 +00:00
4 changed files with 126 additions and 22 deletions
Showing only changes of commit 8a76fac788 - Show all commits

View File

@ -406,8 +406,39 @@ def ramp_b_val(instr:pyvisa.resources.Resource, bval:float, magnet_coil:str)->No
print("Ramping Done!")
helper_scan_func(bval)
# TODO: input logging functions here for the power supply.
def append_measurement(target_b_abs, target_angle, b_x, b_y, measurement_data):
"""Append a single measurement to the global list."""
measurement = {
"Target B_abs (T)": target_b_abs,
"Target Angle (deg)": target_angle, # insert target angle here
"Datetime": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"B_x (T)": b_x,
"B_y (T)": b_y,
"Actual B_abs (T)": (b_x**2 + b_y**2)**0.5,
"Actual Angle (deg)": np.degrees(np.arctan2(b_y, b_x)) % 360,
}
measurement_data.append(measurement)
def save_measurements_to_file(relative_directory, measurement_data, make_dir=False):
"""Save accumulated measurements to a file in the specified directory."""
script_dir = os.path.dirname(os.path.abspath(__file__))
directory = os.path.join(script_dir, relative_directory)
if make_dir:
os.makedirs(directory, exist_ok=True)
filename = "scanlog_" + datetime.datetime.now().strftime("%Y-%m-%d_%H-%M") + ".txt"
file_path = os.path.join(directory, filename)
# Write header and data
with open(file_path, 'w') as f:
f.write("Target B_abs (T);Target Angle (deg);Datetime; B_x (T);B_y (T);Actual B_abs (T);Actual Angle (deg)\n")
for entry in measurement_data:
line = f"{entry['Target B_abs (T)']};{entry['Target Angle (deg)']};{entry['Datetime']};{entry['B_x (T)']};{entry['B_y (T)']};{entry['Actual B_abs (T)']};{entry['Actual Angle (deg)']}\n"
f.write(line)
# receive values in units of T, rescale in kg to talk with the power supplyy. 1T = 10kG
# NOTE: removed singlepowersupply_bool, reading serial-nr. of the device instead.
@ -1086,7 +1117,7 @@ def sweep_b_angle(instr1:pyvisa.resources.Resource, instr2:pyvisa.resources.Reso
# NOTE: implement PID control, possibly best option to manage the b field DO THIS LATER ON, WE DO DISCRETE B VALUES RN
# Helper function that listens to a device
def listen_to_device(device_id, target_value, shared_values, lock, all_targets_met_event):
def listen_to_device(device_id, target_value, shared_values, lock, all_targets_met_event, measurement_data):
while not all_targets_met_event.is_set(): # Loop until the event is set
# value = 0 # Simulate receiving a float from the device INSERT QUERY NO ECHO HERE TO ASK FOR DEVICE IMAG
if '2301034' in device_id:
@ -1116,6 +1147,8 @@ def sweep_b_angle(instr1:pyvisa.resources.Resource, instr2:pyvisa.resources.Reso
with lock:
shared_values[device_id] = value
append_measurement((target_value['2301034']**2 + target_value['2101014']**2)**0.5, angle, shared_values['2301034'], shared_values['2101014'], measurement_data) # append the bval to the measurement data
# Check if both devices have met their targets
if all(shared_values.get(device) is not None and abs(value - target_value[device]) <= 0.0001
for device,value in shared_values.items()):
@ -1125,7 +1158,11 @@ def sweep_b_angle(instr1:pyvisa.resources.Resource, instr2:pyvisa.resources.Reso
# time.sleep(1) # Simulate periodic data checking
# Main function to manage threads and iterate over target values
def monitor_devices(device_target_values, angle, intensity_data=intensity_data):
def monitor_devices(device_target_values, angle, intensity_data=intensity_data):
# initilise measurement list for b-val tracking
measurement_data = []
for iteration, target in enumerate(device_target_values):
print(f"\nStarting iteration {iteration+1} for target values: {target}")
# Shared dictionary to store values from devices
@ -1139,7 +1176,7 @@ def sweep_b_angle(instr1:pyvisa.resources.Resource, instr2:pyvisa.resources.Reso
# Create and start threads for each device
threads = []
for device_id in target.keys():
thread = threading.Thread(target=listen_to_device, args=(device_id, target, shared_values, lock, all_targets_met_event))
thread = threading.Thread(target=listen_to_device, args=(device_id, target, shared_values, lock, all_targets_met_event, measurement_data))
threads.append(thread)
thread.start()
@ -1199,6 +1236,10 @@ def sweep_b_angle(instr1:pyvisa.resources.Resource, instr2:pyvisa.resources.Reso
# creates new folder for MAP data
new_folder_name = "Test_Map_" + f"{datetime.datetime.now().strftime('%Y_%m_%d_%H.%M')}"
os.mkdir(new_folder_name)
# NOTE: added log file to folder
save_measurements_to_file(new_folder_name, measurement_data, make_dir=False)
# Here the things will be saved in a new folder under user Lukas !
# IMPORTANT last / has to be there, otherwise data cannot be saved and will be lost!!!!!!!!!!!!!!!!
os.chdir('C:/Users/localadmin/Desktop/Users/Lukas/'+ new_folder_name)

View File

@ -1,39 +1,46 @@
import os
import time
import datetime
import numpy as np
# List to accumulate measurement data
measurement_data = []
def append_measurement(target_b_abs, b_x, b_y, measurement_data=measurement_data):
def append_measurement(target_b_abs, b_x, b_y, measurement_data):
"""Append a single measurement to the global list."""
measurement = {
"Target B_abs": target_b_abs,
"Target B_abs (T)": target_b_abs,
"Target Angle (deg)": 90, # insert target angle here
"Datetime": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"B_x": b_x,
"B_y": b_y
"B_x (T)": b_x,
"B_y (T)": b_y,
"Actual B_abs (T)": (b_x**2 + b_y**2)**0.5,
"Actual Angle (deg)": np.degrees(np.arctan2(b_y, b_x)) % 360,
}
measurement_data.append(measurement)
def save_measurements_to_file(relative_directory, measurement_data=measurement_data):
def save_measurements_to_file(relative_directory, measurement_data, make_dir=False):
"""Save accumulated measurements to a file in the specified directory."""
script_dir = os.path.dirname(os.path.abspath(__file__))
directory = os.path.join(script_dir, relative_directory)
os.makedirs(directory, exist_ok=True)
if make_dir:
os.makedirs(directory, exist_ok=True)
filename = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M") + ".txt"
file_path = os.path.join(directory, filename)
# Write header and data
with open(file_path, 'w') as f:
f.write("Target B_abs, Datetime, B_x, B_y\n")
f.write("Target B_abs (T);Target Angle (deg);Datetime; B_x (T);B_y (T);Actual B_abs (T);Actual Angle (deg)\n")
for entry in measurement_data:
line = f"{entry['Target B_abs']}, {entry['Datetime']}, {entry['B_x']}, {entry['B_y']}\n"
line = f"{entry['Target B_abs (T)']};{entry['Target Angle (deg)']};{entry['Datetime']};{entry['B_x (T)']};{entry['B_y (T)']};{entry['Actual B_abs (T)']};{entry['Actual Angle (deg)']}\n"
f.write(line)
# Example usage
for i in range(5):
append_measurement(target_b_abs=0.5 + i, b_x=1.0 * i, b_y=2.0 * i)
save_measurements_to_file("Test_Map_" + f"{datetime.datetime.now().strftime('%Y_%m_%d_%H.%M')}")
append_measurement(target_b_abs=0.5 + i, b_x=1.0 * i, b_y=2.0 * i, measurement_data=measurement_data)
time.sleep(1) # Simulate time delay between measurements
save_measurements_to_file("Test_Map_" + f"{datetime.datetime.now().strftime('%Y_%m_%d_%H.%M')}", measurement_data, make_dir=False)
# print(9**0.5)
# print(datetime.datetime.now())

62
Test3.py Normal file
View File

@ -0,0 +1,62 @@
import os
import threading
from datetime import datetime
import time
import random
# Shared list and lock
measurement_data = []
data_lock = threading.Lock()
def append_measurement(target_b_abs, b_x, b_y):
measurement = {
"Target B_abs": target_b_abs,
"Datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"B_x": b_x,
"B_y": b_y
}
with data_lock:
measurement_data.append(measurement)
def save_measurements_to_file(relative_directory):
script_dir = os.path.dirname(os.path.abspath(__file__))
directory = os.path.join(script_dir, relative_directory)
os.makedirs(directory, exist_ok=True)
filename = datetime.now().strftime("%Y-%m-%d_%H-%M") + ".txt"
file_path = os.path.join(directory, filename)
header_keys = ["Target B_abs", "Datetime", "B_x", "B_y"]
with data_lock:
with open(file_path, 'w') as f:
f.write(", ".join(header_keys) + "\n")
for entry in measurement_data:
line = ", ".join(str(entry[key]) for key in header_keys) + "\n"
f.write(line)
# Thread function
def simulate_sensor_readings(sensor_id):
for i in range(3):
# Simulate some "sensor" data
target_b_abs = round(random.uniform(0.1, 1.0), 3)
b_x = round(random.uniform(-1.0, 1.0), 3)
b_y = round(random.uniform(-1.0, 1.0), 3)
print(f"Sensor {sensor_id} appending: {target_b_abs}, {b_x}, {b_y}")
append_measurement(target_b_abs, b_x, b_y)
time.sleep(random.uniform(1,2)) # Simulate delay
# Launch threads
thread1 = threading.Thread(target=simulate_sensor_readings, args=(1,))
thread2 = threading.Thread(target=simulate_sensor_readings, args=(2,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
# Save all data at the end
save_measurements_to_file("TestDirectory")
print

View File

@ -1,6 +0,0 @@
Target B_abs, Datetime, B_x, B_y
0.5, 2025-04-23 14:03:43, 0.0, 0.0
1.5, 2025-04-23 14:03:43, 1.0, 2.0
2.5, 2025-04-23 14:03:43, 2.0, 4.0
3.5, 2025-04-23 14:03:43, 3.0, 6.0
4.5, 2025-04-23 14:03:43, 4.0, 8.0