Files
nmreval/src/nmreval/io/hdfreader.py
Dominik Demuth 5975c08fb2
All checks were successful
Build AppImage / Explore-Gitea-Actions (push) Successful in 2m31s
changing default order of hdf works with node names and displays correct label (#310)
Fix incorrect selection of the label and group of HDF files when a non-default order is used

Co-authored-by: Dominik Demuth <dominik.demuth@physik.tu-darmstadt.de>
Reviewed-on: #310
2025-06-03 17:53:48 +00:00

414 lines
13 KiB
Python

from __future__ import annotations
import re
from functools import reduce
import h5py
import numpy as np
from collections import OrderedDict
from ..data.points import Points
from ..data.nmr import FID, Spectrum
# Public API: only the reader class is exported by a star-import of this module.
__all__ = ['HdfReader']
def unicode_(text):
    """Return *text* as ``str``, decoding it as UTF-8 if it is bytes-like.

    Values that already are ``str`` are returned unchanged; calling
    ``str(text, encoding=...)`` on them would raise ``TypeError``.

    Args:
        text: ``str`` or a bytes-like object holding UTF-8 encoded text.

    Returns:
        The text as ``str``.

    Raises:
        UnicodeDecodeError: If the bytes are not valid UTF-8.
        TypeError: If *text* is neither ``str`` nor bytes-like.
    """
    if isinstance(text, str):
        return text
    return str(text, encoding='utf-8')
# Matches ``key = value`` pairs where the value is a (possibly signed) decimal
# number with an optional exponent; exposes the named groups ``key`` and ``val``.
KEY_VAL_RE = re.compile(
    r'(?P<key>[\w_]+)'                                      # parameter name
    r'\s*=\s*'                                              # separator
    r'(?P<val>[-+]?[0-9]*\.?[0-9]+(?:[eE][-+]?[0-9]+)?)'    # numeric value
)
class HdfNode:
    """Single node of a tree with named children and collected parameters.

    Attributes (all declared in ``__slots__``):
        name: Name of the node; used as path component by :meth:`keys`,
            :meth:`get` and :attr:`path`.
        reference: Opaque reference handed in at construction time.
        type: ``'group'`` on creation. :meth:`data` and :meth:`keys` select
            nodes of type ``'signal'`` or ``'points'`` by default.
        parent: Parent node or ``None`` for a root node.
        children: ``OrderedDict`` of child nodes, ``None`` until the first
            child is added.
        num_signals, num_grp, num_pts: Counters, initialised to 0.
        parameter: Mapping of parameter name to value. Values set on a node
            are also collected in all of its ancestors.
        title_parameter: Two-element list. Element 0 is the ``(key, value)``
            pair of the node itself (an empty tuple until set), element 1 is a
            dictionary collecting the pairs of the direct children.
    """

    __slots__ = ['name', 'reference', 'type', 'parent', 'children',
                 'num_signals', 'num_pts', 'num_grp',
                 'title_parameter', 'parameter']

    def __init__(self, name: str, ref, parent: 'HdfNode | None'):
        self.name = name
        self.type = 'group'
        self.reference = ref
        self.parent = parent

        self.children = None
        self.num_signals = 0
        self.num_grp = 0
        self.num_pts = 0

        self.parameter = {}
        self.title_parameter = [(), {}]

    def __repr__(self):
        return f'{self.name} ({self.type})'

    def __getitem__(self, item):
        """Return the direct child stored under *item*.

        Raises:
            KeyError: If there is no such child.
            TypeError: If the node has no children at all.
        """
        return self.children[item]

    def __setitem__(self, key, value):
        """Store *value* as direct child under *key*.

        The children mapping is created on first use. It is checked explicitly
        for ``None`` instead of catching ``TypeError``, because an unhashable
        key raises ``TypeError`` as well and must not discard the children
        that are already stored.
        """
        if self.children is None:
            self.children = OrderedDict()
        self.children[key] = value

    def __contains__(self, key):
        """Return whether *key* names a direct child of this node."""
        if self.children is None:
            return False
        return key in self.children

    def clear(self):
        """Reset name, type, parent, children, counters and parameters."""
        self.name = ''
        self.type = 'group'
        self.parent = None

        self.children = None
        self.num_signals = 0
        self.num_grp = 0
        self.num_pts = 0

        self.parameter = {}
        self.title_parameter = [(), {}]

    def __iter__(self):
        """Yield this node followed by all of its descendants, depth first."""
        yield self
        if self.children is not None:
            for child in self.children.values():
                yield from child

    def data(self, dtype: str = None):
        """Yield this node and all descendants whose type matches.

        Args:
            dtype: Node type to select. ``None`` selects both ``'signal'``
                and ``'points'``.
        """
        wanted = ['signal', 'points'] if dtype is None else [dtype]

        if self.type in wanted:
            yield self

        if self.children is not None:
            for child in self.children.values():
                yield from child.data(dtype=dtype)

    def keys(self, prefix: str = '', dtype: str = None):
        """Yield the ``/``-separated keys of all matching nodes.

        Args:
            prefix: Key of the parent node, prepended to this node's name.
            dtype: Node type to select. ``None`` selects both ``'signal'``
                and ``'points'``.
        """
        wanted = ['signal', 'points'] if dtype is None else [dtype]

        # a node with an empty name contributes an empty key (prefix is dropped)
        new_prefix = f'{prefix}/{self.name}' if self.name else self.name

        if self.type in wanted:
            yield new_prefix

        if self.children is not None:
            for child in self.children.values():
                yield from child.keys(prefix=new_prefix, dtype=dtype)

    def parameters(self, key: str):
        """Return the parameter dictionary of the node addressed by *key*."""
        return self.get(key).parameter

    @property
    def path(self):
        """Names of all ancestors and of this node, joined by ``/``."""
        if self.parent is None:
            return self.name
        return self.parent.path + '/' + self.name

    def get(self, key: str):
        """Return the node addressed by the ``/``-separated *key*.

        A leading ``/`` is ignored, so an empty key returns the node itself.
        Every component is looked up with ``node[component]``.
        """
        split_keys = key.split('/')
        if split_keys[0] == '':
            split_keys = split_keys[1:]

        return reduce(lambda node, part: node[part], split_keys, self)

    @staticmethod
    def _collect(store: dict, key, value):
        """Add *value* to ``store[key]`` while keeping all distinct values.

        A first value is stored as is; a second, different value turns the
        entry into a list; further new values are appended to that list.
        """
        if key not in store:
            store[key] = value
            return

        previous = store[key]
        if isinstance(previous, list):
            if value not in previous:
                previous.append(value)
        elif value != previous:
            store[key] = [previous, value]

    def set_parameter(self, key, value, keep=False):
        """Set a parameter on this node and collect it in all ancestors.

        Args:
            key: Name of the parameter.
            value: Value of the parameter.
            keep: If true, an existing differing value is not overwritten but
                both values are kept in a list. Ancestors are always updated
                with ``keep=True``.
        """
        if keep:
            self._collect(self.parameter, key, value)
        else:
            self.parameter[key] = value

        if self.parent is not None:
            self.parent.set_parameter(key, value, keep=True)

    def set_title_parameter(self, child_node, params):
        """Set the title parameter of this node and propagate upwards.

        Args:
            child_node: Child whose own ``(key, value)`` pair is collected in
                this node, or ``None`` if there is nothing to collect.
            params: Sequence of ``(key, value)`` pairs, one per level. The last
                entry belongs to this node, the remaining entries are passed
                on to the parent. An empty sequence sets ``('', None)`` and
                stops the propagation.
        """
        if params:
            self.title_parameter[0] = params[-1]
        else:
            self.title_parameter[0] = ('', None)

        if child_node is not None:
            key, value = child_node.title_parameter[0]
            self._collect(self.title_parameter[1], key, value)

        if (self.parent is not None) and params:
            self.parent.set_title_parameter(self, params[:-1])
class HdfReader(HdfNode):
    """Root node that opens an HDF5 file and mirrors it as a node tree.

    The reader itself is the (nameless) root :class:`HdfNode`. On opening a
    file, the group named ``base`` is traversed and one node per group or
    dataset is created. Nodes of type ``'signal'`` and ``'points'`` can then
    be converted to ``FID``/``Spectrum`` and ``Points`` objects.

    Args:
        filename: File to open. ``None`` creates an empty reader that can be
            filled later by calling the instance.
        base: Name of the top-level group that is traversed.

    Raises:
        IOError: If the file cannot be opened or read.
    """

    # attributes named ``description_<var>`` are stored as node parameters
    _DESCRIPTION_RE = re.compile(r'description_(?P<var>\S+)')

    def __init__(self, filename=None, base='data_pool'):
        super().__init__('', None, None)

        self.filename = filename
        self.file = None
        self.base = base

        if self.filename is not None:
            self._open(filename, base)

    def __call__(self, filename, base='data_pool'):
        """Discard the current tree and read *filename* instead.

        A previously opened file is closed first so that its handle is not
        leaked.

        Returns:
            The reader itself.

        Raises:
            IOError: If the file cannot be opened or read.
        """
        self._close()
        super().clear()

        self.filename = filename
        self.base = base
        self._open(filename, base)

        return self

    def __del__(self):
        try:
            self._close()
        except ImportError:
            # kept from the original code: closing may fail with ImportError
            pass

    def _open(self, filename, base):
        """Open *filename* read-only and build the node tree below *base*."""
        try:
            self.file = h5py.File(filename, 'r')
            if base in self.file:
                self.create_node(self.file[base], parent=self)
        except OSError as err:
            # do not keep a half-read file open
            self._close()
            raise IOError(f'Invalid file {filename}') from err

    def _close(self):
        """Close the current file, if any, and forget its handle."""
        handle = getattr(self, 'file', None)
        self.file = None
        if handle is not None:
            handle.close()

    def create_node(self, node, parent=None):
        """Create child nodes of *parent* for every item of the HDF group *node*.

        Args:
            node: Opened HDF5 group whose items are added.
            parent: Node that receives the children; it is indexed and its
                counters are incremented, so it must not be ``None``.
        """
        for _, item in node.items():
            attr = item.attrs

            if 'TITLE' in attr:
                location = unicode_(attr['TITLE'])
            else:
                location = item.name

            data = HdfNode(location.split('/')[-1], item.ref, parent)

            if isinstance(item, h5py.Group):
                if 'damaris_type' in attr:
                    # Group is DAMARIS data (ADC result, Accumulation)
                    self._add_signal(data, attr, location, parent)
                else:
                    # Group is a real group. It is stored under the node name
                    # (like signals and points), because `keys` and `get`
                    # address nodes by name and `location` may be a full path.
                    parent[data.name] = data
                    parent.num_grp += 1
                    self.create_node(item, parent=data)
            else:
                # dataset is MeasurementResult
                self._add_points(data, attr, parent)

    def _add_signal(self, data, attr, location, parent):
        """Register *data* as signal node and read its parameters."""
        data.type = 'signal'
        parent[data.name] = data
        parent.num_signals += 1

        # searching for attributes is performance bottleneck but necessary
        for desc in attr.keys():
            # looking for description_KEY in attributes
            m = self._DESCRIPTION_RE.search(desc)
            if m is None:
                continue

            var_name = m['var'].lower()
            try:
                var_value = float(attr[desc])
            except ValueError:
                var_value = unicode_(attr[desc])
            data.set_parameter(var_name, var_value)

        # one (key, value) pair per level of the title; levels without a
        # ``key=number`` part are stored as (level, None)
        title_params = []
        for lvl in location.split('/'):
            m = KEY_VAL_RE.search(lvl)
            if m is not None:
                title_params.append(m.groups())
            else:
                title_params.append((lvl, None))

        data.set_title_parameter(None, title_params)

    @staticmethod
    def _add_points(data, attr, parent):
        """Register *data* as points node and read its parameters."""
        data.type = 'points'
        parent[data.name] = data
        parent.num_pts += 1

        # a missing attribute simply yields no parameter
        if 'quantity_name' in attr:
            m = KEY_VAL_RE.search(unicode_(attr['quantity_name']))
            if m:
                data.parameter[m['key']] = float(m['val'])

        m = KEY_VAL_RE.search(data.name)
        if m:
            data.title_parameter[0] = (m['key'], float(m['val']))
        else:
            data.title_parameter[0] = (None, None)

    def get_points(self):
        """Return all nodes of type ``'points'`` as data objects."""
        return self.get_selected('', dtype='points')

    def get_signals(self):
        """Return all nodes of type ``'signal'`` as data objects."""
        return self.get_selected('', dtype='signal')

    def get_selected(self, key: str, dtype: str = None, value: str = None,
                     group: str = None, flag: str = 'fid') -> list:
        """Return the data below *key* as list of data objects.

        Args:
            key: Key of a node. ``*`` is a wildcard for any number of
                characters; all other characters are matched literally. A
                wildcard key has to match from the start of a node key.
            dtype: ``'signal'``, ``'points'`` or ``None`` for both.
            value: Passed on to :meth:`make_signal`.
            group: Passed on to :meth:`make_signal`.
            flag: Passed on to :meth:`make_signal`.

        Returns:
            List of the created objects. Nodes whose conversion raises
            ``IOError`` are reported on stdout and skipped.
        """
        if '*' in key:
            # wildcards: find all matching entries; the literal parts are
            # escaped because keys may contain regex characters like . or +
            pattern = re.compile('.*'.join(re.escape(part) for part in key.split('*')))
            key_list = [k for k in self.keys() if pattern.match(k)]
        else:
            key_list = [key]

        ret_val = []
        for k in key_list:
            val = self.get(k)

            for child in val.data(dtype=dtype):
                try:
                    if child.type == 'points':
                        ret_val.append(self.make_point(child))
                    elif child.type == 'signal':
                        ret_val.append(self.make_signal(child, flag=flag, value=value, group=group))
                except IOError:
                    print('something went wrong for ' + child.name)
                    continue

        return ret_val

    def make_point(self, node):
        """Create a ``Points`` object from the dataset referenced by *node*.

        The last entry of the node's parameters (if any) is used as value.
        """
        data = self.file[node.reference]

        values = list(node.parameter.values())
        val = values[-1] if values else None

        return Points(x=data['x'], y=data['y'], yerr=data['y_err'], name=node.name, value=val)

    @staticmethod
    def _find_title_node(node, key):
        """Return *node* or its closest ancestor whose title key is *key*.

        Returns ``None`` if no such node exists. Nodes without a title
        parameter (empty tuple) are skipped.
        """
        current = node
        while current is not None:
            title = current.title_parameter[0]
            if title and title[0] == key:
                return current
            current = current.parent

        return None

    def make_signal(self, node, flag: str = 'fid', value: str = None, group: str = None):
        """Create a ``FID`` or ``Spectrum`` from the group referenced by *node*.

        Args:
            node: Node of type ``'signal'``.
            flag: ``'fid'`` or ``'spectrum'``.
            value: Name of the parameter that is used as value of the data.
                It is looked up in the node parameters first and in the title
                parameters of the node and its ancestors second. ``None``
                derives the value from the title of the node.
            group: Name of the parameter that is used as group of the data,
                looked up like *value*. ``None`` derives the group from the
                title of the parent node, provided a value was found.

        Raises:
            ValueError: If *flag* is neither ``'fid'`` nor ``'spectrum'``.
        """
        if value is None:
            data_name = node.name
            value = self._get_parameter_values(node, node.parameter)
        elif value in node.parameter:
            data_name = f"{value}={node.parameter[value]}"
            value = node.parameter[value]
        else:
            title_node = self._find_title_node(node, value)
            if title_node is None:
                print(f'{value} is not a valid key for {node.name}')
                data_name = node.name
                value = None
            else:
                data_name = title_node.name
                value = title_node.title_parameter[0][1]

        if group is None:
            if value is not None and node.parent is not None:
                group = self._get_parameter_values(node.parent, node.parameter)
        elif group in node.parameter:
            group = node.parameter[group]
        else:
            title_node = self._find_title_node(node, group)
            # an unknown key yields no group instead of an unrelated value
            group = None if title_node is None else title_node.title_parameter[0][1]

        data = self.file[node.reference]
        try:
            y = data['accu_data']
        except KeyError:
            y = data['adc_data']

        # with four columns the imaginary part is taken from column 2,
        # otherwise from column 1
        if y.shape[1] == 4:
            y = y[:, 0] + 1j*y[:, 2]
        else:
            y = y[:, 0] + 1j*y[:, 1]

        index = data['indices']
        dw = float(index['dwelltime'])

        if flag == 'fid':
            x = np.arange(len(y)) * dw
            ret = FID(x, y, name=data_name, value=value, group=group, filename=self.file.filename)
        elif flag == 'spectrum':
            x = np.linspace(-1/dw, 1/dw, num=len(y))
            ret = Spectrum(x, y, name=data_name, value=value, group=group, filename=self.file.filename)
        else:
            raise ValueError(f'{flag} unknown, use `fid` or `spectrum`.')

        return ret

    @staticmethod
    def _get_parameter_values(node: HdfNode, param_dic: dict) -> float | None:
        """Determine the value that belongs to the title parameter of *node*.

        The key of the node's title parameter (without a leading
        ``Accumulation_``) is looked up in *param_dic*. If the node has no
        parameter of that name, the parent is searched for the single
        parameter that has as many collected values as the title parameter;
        that parameter is then looked up in *param_dic*. If nothing is found,
        the number given in the title is used.

        Args:
            node: Node whose title parameter is evaluated.
            param_dic: Parameters in which the value is looked up.

        Returns:
            The value, or ``None`` if none could be determined.
        """
        title = node.title_parameter[0]
        if not title or title[0] is None:
            # node has no usable title parameter
            return None

        title_key, title_value = title
        var_key = title_key
        if var_key.startswith('Accumulation_'):
            var_key = var_key[13:]

        value = None
        if title_value:
            # there is hope that there is a numeric value
            value = float(title_value)

        if var_key.lower() not in node.parameter:
            # we cannot find a key that fits to the one in the title, e.g. renamed in title,
            # so we look in the parent node what was varied and search for a key
            parent = node.parent
            if parent is None:
                return value

            _, parent_child_param = parent.title_parameter
            # entries are stored under the unstripped title key
            if title_key in parent_child_param:
                varied = parent_child_param[title_key]
            elif var_key in parent_child_param:
                varied = parent_child_param[var_key]
            else:
                return value

            try:
                parameter_len = len(varied)
            except TypeError:
                # a single unsized value gives nothing to compare with
                return value

            candidates = []
            for k, v in parent.parameter.items():
                try:
                    if len(v) == parameter_len:
                        candidates.append(k)
                except TypeError:
                    continue

            # multiple values are not useful
            var_key = candidates[0] if len(candidates) == 1 else None

        if var_key is not None:
            try:
                value = param_dic[var_key.lower()]
            except KeyError:
                pass

        return value

    def get_scripts(self) -> tuple[str, str]:
        """Return the experiment script and the result script of the file."""
        return unicode_(self.file['scripts/experiment_script'][()]), \
               unicode_(self.file['scripts/result_script'][()])