62 lines
1.8 KiB
Python
62 lines
1.8 KiB
Python
import os
|
|
import threading
|
|
from datetime import datetime
|
|
import time
|
|
import random
|
|
|
|
# Shared list and lock
|
|
measurement_data = []
|
|
data_lock = threading.Lock()
|
|
|
|
def append_measurement(target_b_abs, b_x, b_y):
    """Add one reading to the shared ``measurement_data`` list.

    The reading is stored as a dict with the keys ``"Target B_abs"``,
    ``"Datetime"``, ``"B_x"`` and ``"B_y"``. ``"Datetime"`` is the current
    local time formatted as ``YYYY-MM-DD HH:MM:SS``.

    Args:
        target_b_abs: Value stored under ``"Target B_abs"``.
        b_x: Value stored under ``"B_x"``.
        b_y: Value stored under ``"B_y"``.
    """
    # Take the timestamp before waiting on the lock, as close to the call
    # as possible.
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    record = {
        "Target B_abs": target_b_abs,
        "Datetime": timestamp,
        "B_x": b_x,
        "B_y": b_y,
    }
    # The list is shared between threads; append only while holding the lock.
    with data_lock:
        measurement_data.append(record)
|
|
|
|
def save_measurements_to_file(relative_directory):
    """Write every recorded measurement to a timestamped text file.

    The output directory is ``relative_directory`` resolved against the
    directory containing this script; it is created if it does not exist.
    The file is named after the current local time
    (``YYYY-MM-DD_HH-MM.txt``), so a file written earlier in the same minute
    is overwritten. The file holds one header line followed by one
    ``", "``-separated line per measurement.

    Args:
        relative_directory: Output directory, relative to this script's
            own directory.

    Returns:
        The path of the file that was written.

    Raises:
        OSError: If the directory cannot be created or the file cannot be
            written.
        KeyError: If a stored measurement lacks one of the header keys.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    directory = os.path.join(script_dir, relative_directory)
    os.makedirs(directory, exist_ok=True)

    filename = datetime.now().strftime("%Y-%m-%d_%H-%M") + ".txt"
    file_path = os.path.join(directory, filename)

    header_keys = ["Target B_abs", "Datetime", "B_x", "B_y"]

    # Hold the lock only long enough to copy the list, so threads that are
    # appending measurements are not blocked for the whole disk write.
    with data_lock:
        snapshot = list(measurement_data)

    # Format everything before opening the file: a malformed entry then
    # raises without leaving a truncated file behind.
    lines = [", ".join(header_keys) + "\n"]
    lines.extend(
        ", ".join(str(entry[key]) for key in header_keys) + "\n"
        for entry in snapshot
    )

    # Explicit encoding keeps the output independent of the platform locale.
    with open(file_path, "w", encoding="utf-8") as f:
        f.writelines(lines)

    return file_path
|
|
|
|
# Thread function
|
|
def simulate_sensor_readings(sensor_id):
    """Produce three random readings for one simulated sensor.

    Each reading is printed, handed to ``append_measurement`` and followed
    by a random pause of one to two seconds.

    Args:
        sensor_id: Identifier used only in the printed progress message.
    """
    for _ in range(3):
        # Draw order matters for reproducibility with a seeded generator:
        # target magnitude first, then the x and y components.
        target_b_abs = round(random.uniform(0.1, 1.0), 3)
        b_x, b_y = (round(random.uniform(-1.0, 1.0), 3) for _ in range(2))

        print(f"Sensor {sensor_id} appending: {target_b_abs}, {b_x}, {b_y}")
        append_measurement(target_b_abs, b_x, b_y)

        # Simulate the delay between consecutive sensor readings.
        pause = random.uniform(1, 2)
        time.sleep(pause)
|
|
|
|
# Create one worker thread per simulated sensor (sensor ids 1 and 2).
thread1, thread2 = (
    threading.Thread(target=simulate_sensor_readings, args=(sensor_id,))
    for sensor_id in (1, 2)
)

# Start both workers before waiting on either, so they run concurrently.
for worker in (thread1, thread2):
    worker.start()

for worker in (thread1, thread2):
    worker.join()

# All workers have finished; write the collected data out.
save_measurements_to_file("TestDirectory")
|
|
print |