qdsfit/qds

613 lines
24 KiB
Python
Executable File

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
from gui import ExtraDifferentialWidget
_author_ = "Markus Rosenstihl"
import hashlib, uuid
import time
import os, sys, re, signal
import matplotlib
matplotlib.use('agg')
from matplotlib import pyplot
from matplotlib.colors import hex2color
# matplotlib.rc_file("default.mplrc")
from PyQt4.QtCore import *
from PyQt4.QtGui import *
import numpy as np
import pyqtgraph as pg
from data.container import Conductivity, PowerComplex, Static, Peak, YAFF
from gui.ContainerWidgets import ParameterWidget
from ui import QDSMain
from libmath.BDSlib import FunctionRegister, FitRoutine
from data.data import Data
from bds_io import bds_file_reader
class AppWindow(QMainWindow):
def __init__( self, files=[], parent=None ):
super(AppWindow, self).__init__(parent)
self.ui = QDSMain.Ui_MainWindow()
self.ui.setupUi(self)
self._file_paths = self._sortInputFiles(files)
self._last_written_header = None
actions = {
self.ui.actionAdd_Peak: self.addPeak,
self.ui.actionAdd_Cond: self.addCond,
self.ui.actionAdd_PowerLaw: self.addPowerComplex,
self.ui.actionAdd_Eps_Infty: self.addEpsInfty,
self.ui.actionYAFF: self.addYaff,
}
self.myActionGroup = QActionGroup(self)
for a in actions.keys(): self.myActionGroup.addAction(a)
self._init_menu()
self.function_registry = FunctionRegister()
self.session_id = uuid.uuid4()
self.peakId = 0
self.parameterWidget = ParameterWidget()
self.ui.dockWidget_3.setWidget(self.parameterWidget)
self.data = Data()
self.fit_boundary_imag = pg.LinearRegionItem(brush=QColor(0, 127, 254, 15))
self.fit_boundary_real = pg.LinearRegionItem(brush=QColor(0, 127, 254, 15))
self.ui.pgPlotWidget_imag.addItem(self.data.data_curve_imag)
self.ui.pgPlotWidget_real.addItem(self.data.data_curve_real)
self.ui.pgPlotWidget_imag.addItem(self.data.fitted_curve_imag)
self.ui.pgPlotWidget_real.addItem(self.data.fitted_curve_real)
self.ui.pgPlotWidget_imag.addItem(self.fit_boundary_imag)
self.ui.pgPlotWidget_real.addItem(self.fit_boundary_real)
# fit boundary signals
self.fit_boundary_imag.sigRegionChanged.connect(self._update_fit_boundary_imag)
self.fit_boundary_imag.sigRegionChangeFinished.connect(self.updatePlot)
self.fit_boundary_real.sigRegionChanged.connect(self._update_fit_boundary_real)
self.fit_boundary_real.sigRegionChangeFinished.connect(self.updatePlot)
for pltwidgt in (self.ui.pgPlotWidget_real, self.ui.pgPlotWidget_imag):
pltwidgt.setLogMode(x=True, y=True)
pltwidgt.showGrid(x=True, y=True)
pltwidgt.disableAutoRange()
pltwidgt.setLabel("bottom", "Frequency", units="Hz")
self.ui.pgPlotWidget_imag.setLabel("left", u'Dielectric loss ε"', units="Debye")
self.ui.pgPlotWidget_real.setLabel("left", u"Dielectric loss ε' ", units="Debye")
sc_real = self.ui.pgPlotWidget_real.scene()
sc_real.sigMouseClicked.connect(self.mousePress)
sc_real.sigMouseMoved.connect(self.updateCrosshair)
sc_imag = self.ui.pgPlotWidget_imag.scene()
sc_imag.sigMouseClicked.connect(self.mousePress)
sc_imag.sigMouseMoved.connect(self.updateCrosshair)
self._fit_thread = QThread()
self._fit_method = FitRoutine()
self._fit_method.moveToThread(self._fit_thread)
self._fit_method.finished_fit.connect(self.fitData_update)
self._fit_method.data_ready.connect(self.updateIntermediatePlot)
self._fit_thread.started.connect(self._fit_method.fit)
# finally process cmd line args
if files != []:
self.openFile(unicode(self._file_paths[0]))
self._current_file_index = 0
def _init_menu( self ):
fileMenu = self.menuBar().addMenu("File")
openFile = QAction("&Open", self)
openFile.setShortcut(QKeySequence.Open)
openFile.triggered.connect(self.getFileNames)
fileMenu.addAction(openFile)
nextFile = QAction("Next", self)
nextFile.setShortcut(QKeySequence("Ctrl+k"))
nextFile.triggered.connect(self.nextFile)
fileMenu.addAction(nextFile)
previousFile = QAction("Previous", self)
previousFile.setShortcut(QKeySequence("Ctrl+j"))
previousFile.triggered.connect(self.previousFile)
fileMenu.addAction(previousFile)
saveFile = QAction("&Save Fit Result", self)
saveFile.setShortcut(QKeySequence.Save)
saveFile.triggered.connect(self.saveFitResult)
fileMenu.addAction(saveFile)
# fitting methods
fitMenu = self.menuBar().addMenu("Standard Fits")
# lm
fit_lmAction = QAction("Complex NLS", self)
fit_lmAction.setShortcut(QKeySequence("Ctrl+F"))
fitMenu.addAction(fit_lmAction)
# lbfgsb
fit_lbfgsbAction = QAction("NLS (Imag.)", self)
fitMenu.addAction(fit_lbfgsbAction)
# Simulated Annealing
fit_annealAction = QAction("&Simulated Annealing", self)
fitMenu.addAction(fit_annealAction)
self.ui.actionActionAbortFit.triggered.connect(self.abortFit)
self.signalMapper = QSignalMapper(self)
for i, fit_action in enumerate([fit_lmAction, fit_lbfgsbAction, fit_annealAction
]):
self.signalMapper.setMapping(fit_action, i)
fit_action.triggered.connect(self.signalMapper.map)
self.signalMapper.mapped.connect(self.fitData_start)
self.ui.actionShow_Derivative.triggered.connect(self.show_derivative)
def show_derivative( self ):
self.xtra_wdgt = ExtraDifferentialWidget.DifferentialWidget()
#self.xtra_wdgt.set
deriv_r = np.diff(np.log10(self.data.epsilon.real))
deriv_i = np.diff(np.log10(self.data.epsilon.imag))*0
deriv_i = -np.pi/2*np.diff(np.log10(self.data.epsilon.real))/np.diff(np.log10(self.data.frequency))
self.xtra_wdgt.plot(self.data.frequency[:-1], deriv_r, deriv_i)
# self.xtra_wdgt.plot([0,1], [0,1], [0,1])
self.xtra_wdgt.setGeometry(self.ui.pgPlotWidget_real.geometry())
self.xtra_wdgt.show()
#self.xtra_wdgt.showCenterd()
self.xtra_wdgt.raise_()
def updateCrosshair( self, evt ):
vb_real = self.ui.pgPlotWidget_real.getPlotItem().vb
vb_imag = self.ui.pgPlotWidget_imag.getPlotItem().vb
if self.ui.pgPlotWidget_imag.underMouse():
pos = vb_imag.mapSceneToView(evt)
elif self.ui.pgPlotWidget_real.underMouse():
pos = vb_real.mapSceneToView(evt)
else:
pos = QPointF(0.0, 0.0)
self.last_pos = pos
def mousePress( self, evt ):
data_pos = self.last_pos
mouse_in_imag = self.ui.pgPlotWidget_imag.underMouse()
mouse_in_real = self.ui.pgPlotWidget_real.underMouse()
msgBox = QMessageBox()
if self.ui.actionAdd_Peak.isChecked():
if mouse_in_imag:
self.addPeak(data_pos)
self.ui.actionAdd_Peak.setChecked(False)
else:
msgBox.setText("Click in imaginary part")
msgBox.exec_()
if self.ui.actionAdd_Cond.isChecked():
if mouse_in_imag:
self.addCond(data_pos)
self.ui.actionAdd_Cond.setChecked(False)
else:
msgBox.setText("Click in imaginary part")
msgBox.exec_()
if self.ui.actionYAFF.isChecked():
if mouse_in_imag:
self.addYaff(data_pos)
self.ui.actionYAFF.setChecked(False)
else:
msgBox.setText("Click in imaginary part")
msgBox.exec_()
if self.ui.actionAdd_PowerLaw.isChecked():
if mouse_in_imag:
self.addPowerComplex(data_pos)
self.ui.actionAdd_PowerLaw.setChecked(False)
else:
msgBox.setText("Click in imaginary part")
msgBox.exec_()
if self.ui.actionAdd_Eps_Infty.isChecked():
if mouse_in_real:
self.addEpsInfty(data_pos)
self.ui.actionAdd_Eps_Infty.setChecked(False)
else:
msgBox.setText("Click in real part")
msgBox.exec_()
def abortFit( self ):
for container in self.function_registry.get_registered_functions():
container.abort(True)
self._fit_thread.terminate()
def saveFitResult( self ):
"""
Saving fit parameters to fitresults.log
including temperature
"""
self._saveFitFigure()
if not os.path.exists("fitresults.log"):
f = open("fitresults.log", "w")
else:
f = open("fitresults.log", "a")
# prepare header
file_id = hashlib.md5(open(self._file_paths[self._current_file_index]).read()).hexdigest()
pre_header = "# Date: {date}\n".format(date=time.strftime("%Y-%m-%d"))
pre_header += "# Time: {time}\n# SessionID={id}\n".format(time=time.strftime("%H:%M:%S"), id=self.session_id)
pars = []
base_filename, file_ext = os.path.splitext(self.filepath)
header = "{n1:13}{n2:13}".format(n1="# 0:T", n2="1:invT")
varnum = 2 # T, invT are the first two columns
for i_fcn, fcn in enumerate(self.function_registry.get_registered_functions()):
fit_function_name = fcn.id_string
for i, name in enumerate(fcn.widget.names): # get variable names
header += "{n:13}{n_sd:13}".format(n="%i:%s"%(varnum, name), n_sd="%i:%s_sd"%(varnum+1, name))
varnum += 2
pre_header += "# %s\n"%fit_function_name
# write for each function an extra file
fit_filename = "%s_%i.fit"%(base_filename, i_fcn)
f_fcn = open(fit_filename, 'w')
# retrieve correct function type peak
#if fit_function_name == "hn":
f_fcn.write("# type=%s\n"%fcn.widget.func_type)
f_fcn.write("# SourceID=%s\n"%file_id)
#else:
# f_fcn.write("# type=%s\n"%fit_function_name)
for i, par in enumerate(fcn._beta): # params # TODO: ughh
if fcn._selector_mask is not None:
if fcn._selector_mask[i]:
pars.extend([par])
pars.extend([fcn._sd_beta[i]])
f_fcn.write('# param=%s %e %e\n'%(fcn.widget.names[i], par, fcn._sd_beta[i]))
else:
pars.extend([par])
pars.extend([fcn._sd_beta[i]])
f_fcn.write('# param=%s %e %e\n'%(fcn.widget.names[i], par, fcn._sd_beta[i]))
# finish writing fit function file
f_fcn.flush()
np.savetxt(f_fcn, fcn.resampleData(self.data.frequency))
f_fcn.close()
# append fit limits header
header += "%-13s%-13s\n"%("fit_xlow", "fit_xhigh")
# write new header if fit model changed TODO: more robust detection
if self._last_written_header != header:
f.write(pre_header)
f.write(header)
f.flush()
self._last_written_header = header
else:
pass
pars.insert(0, self.data.meta["T"])
pars.insert(1, 1e3/self.data.meta["T"])
pars.append(self.data.fit_limits[0])
pars.append(self.data.fit_limits[1])
pars = np.array([pars])
np.savetxt(f, pars, fmt='%-12.3e', delimiter=" ")
f.close()
def _saveFitFigure( self ):
fig = pyplot.figure(figsize=(3.54*1.4, 2.75*1.4))
font = { 'family': 'sans serif',
'weight': 'normal',
'size': 9 }
matplotlib.rc('font', **font)
pyplot.grid(linestyle="solid", alpha=0.3, color="0.5")
pyplot.loglog(self.data.frequency, self.data.epsilon.imag, 'bo', markersize=4, label="Data")
pyplot.loglog(self.data.frequency_fit, self.data.epsilon_fit.imag, 'r-', lw=1.2, label="Fit")
for fcn in self.function_registry.get_registered_functions():
f, eps = fcn.get_data()
label = fcn.id_label
color = hex2color(str(fcn.color.name()))
pyplot.loglog(f, eps[1], ls=":", color=color, lw=1, dashes=(1, 1), label=label)
for i in (0, 1): pyplot.axvline(x=self.data.fit_limits[i], color='b', ls="-", lw=0.5)
pyplot.legend(title="T=%.1f K"%(self.data.meta["T"]))
pyplot.xlabel('f/Hz')
pyplot.ylabel(u'ε"')
pyplot.ylim(self.data.epsilon.imag.min(), self.data.epsilon.imag.max())
#pyplot.savefig(os.path.splitext(self.filepath)[0]+".png")
pyplot.savefig(os.path.splitext(self.filepath)[0]+".pdf")
fig.clear()
del (fig)
def _saveFitFigureGrace( self ):
#agrtemplate = open('template.agr').read()
agrtemplate = """
"""
#TODO: need interface/method for adding function blocks, this is too repetitive
def addYaff( self, pos ):
_yaff = YAFF(plt_real=self.ui.pgPlotWidget_real,
plt_imag=self.ui.pgPlotWidget_imag,
limits=self.data.fit_limits)
_yaff.blockSignals(True)
_yaff.changedData.connect(self.updatePlot)
_yaff.removeObj.connect(self.delParamterObject)
gg_y = 10**pos.y()*2
gg_x = 1/(10**pos.x()*2*np.pi)
yaff_par = [gg_y, gg_x, 20.0, 1.0, 0.5, gg_x/100, 1.0, 1.0]
_yaff.setParameter(beta=yaff_par)
self.parameterWidget.add(_yaff.widget)
self.function_registry.register_function(_yaff)
self.updatePlot()
_yaff.blockSignals(False)
def addCond( self, pos ):
_conductivity = Conductivity(plt_real=self.ui.pgPlotWidget_real,
plt_imag=self.ui.pgPlotWidget_imag,
limits=self.data.fit_limits)
_conductivity.blockSignals(True)
_conductivity.changedData.connect(self.updatePlot)
_conductivity.removeObj.connect(self.delParamterObject)
cond_par = [0.0, 10**(pos.y()+pos.x())*2*np.pi, 1.0]
_conductivity.setParameter(beta=cond_par)
self.parameterWidget.add(_conductivity.widget)
self.function_registry.register_function(_conductivity)
self.updatePlot()
_conductivity.blockSignals(False)
def addPowerComplex( self, pos ):
_power_complex = PowerComplex(plt_imag=self.ui.pgPlotWidget_imag,
plt_real=self.ui.pgPlotWidget_real,
limits=self.data.fit_limits)
_power_complex.changedData.connect(self.updatePlot)
_power_complex.removeObj.connect(self.delParamterObject)
cond_par = [10**(pos.y()+pos.x())*2*np.pi, 1.0]
_power_complex.setParameter(beta=cond_par)
self.parameterWidget.add(_power_complex.widget)
self.function_registry.register_function(_power_complex)
self.updatePlot()
def addEpsInfty( self, pos ):
_eps_infty = Static(plt_imag=self.ui.pgPlotWidget_imag,
plt_real=self.ui.pgPlotWidget_real,
limits=self.data.fit_limits)
_eps_infty.changedData.connect(self.updatePlot)
_eps_infty.removeObj.connect(self.delParamterObject)
cond_par = [10**pos.y()]
_eps_infty.setParameter(beta=cond_par)
self.parameterWidget.add(_eps_infty.widget)
self.function_registry.register_function(_eps_infty)
self.updatePlot()
def delParamterObject( self, obj ):
self.function_registry.unregister_function(obj)
self.updatePlot()
def addPeak( self, pos ):
id_list = [key.id_num for key in
self.function_registry.get_registered_functions()
if key.id_string == 'hn']
self.peakId = 1
while self.peakId in id_list:
self.peakId += 1
_peak = Peak(id_num=self.peakId,
plt_real=self.ui.pgPlotWidget_real,
plt_imag=self.ui.pgPlotWidget_imag,
limits=self.data.fit_limits)
self.function_registry.register_function(_peak)
_peak.changedData.connect(self.updatePlot)
_peak.removeObj.connect(self.delParamterObject)
new_peak_beta0 = [2*10**pos.y(), 1/(2*np.pi*10**pos.x()), 1, 1]
_peak.setParameter(beta=new_peak_beta0)
self.parameterWidget.add(_peak.widget)
self.updatePlot()
def fitData_start( self, method ):
#fit_methods = [fit_odr_cmplx, fit_odr_imag, fit_lbfgsb, fit_anneal]
self.fit_boundary_real.hide()
self.fit_boundary_imag.hide()
fit_method = [
self._fit_method.fit_odr_cmplx,
self._fit_method.fit_odr_imag,
][method]
# build function list
p0, funcs, fixed_params = [], [], []
for fcn in self.function_registry.get_registered_functions():
p0.extend(fcn.getParameter())
funcs.append(fcn)
fixed_params.extend(fcn.getFixed())
fcn.clearData()
_freq, _fit = self.data.get_data()
if not self._fit_thread.isRunning():
#self._fit_method.fit_odr_cmplx(_freq, _fit, p0, fixed_params, funcs)
fit_method(_freq, _fit, p0, fixed_params, funcs)
self._fit_thread.start()
self.ui.statusbar.showMessage("Fitting ...")
else:
self.ui.statusbar.showMessage("Still fitting ...")
def fitData_update( self ):
self._fit_thread.quit()
odr_result = self._fit_method.result()
p0, funcs, fixed_params = [], [], []
for fcn in self.function_registry.get_registered_functions():
p0.extend(fcn.getParameter())
funcs.append(fcn)
fixed_params.extend(fcn.getFixed())
for container in self.function_registry.get_registered_functions():
container.abort(False)
self.data.set_fit(odr_result.beta, funcs)
self.ui.statusbar.showMessage(" ".join(odr_result.stopreason))
ndx = 0
for i, fcn in enumerate(self.function_registry.get_registered_functions()):
num_p = len(fcn.getParameter())
beta = odr_result.beta[ndx:num_p+ndx]
if odr_result.sd_beta is not None:
sd_beta = odr_result.sd_beta[ndx:num_p+ndx]
else:
sd_beta = None
fcn.setParameter(beta, sd_beta)
ndx += num_p
self.fit_boundary_real.show()
self.fit_boundary_imag.show()
def getFileNames( self ):
tmp = QFileDialog.getOpenFileNames(self, "Open file", "", '*.dat *.TXT')
if len(tmp) != 0:
self._file_paths = tmp
self._current_file_index = 0
path = unicode(self._file_paths[self._current_file_index])
self.openFile(path)
def nextFile( self ):
lim = self.fit_boundary_imag.getRegion() # store limits
if len(self._file_paths) > self._current_file_index+1: # wrap around
self._current_file_index += 1
else:
self._current_file_index = 0
path = unicode(self._file_paths[self._current_file_index])
self.openFile(path)
self.fit_boundary_imag.setRegion(lim)
def previousFile( self ):
lim = self.fit_boundary_imag.getRegion() # store limits
if self._current_file_index == 0: # wrap around
self._current_file_index = len(self._file_paths)-1
else:
self._current_file_index -= 1
path = unicode(self._file_paths[self._current_file_index])
self.openFile(path)
self.fit_boundary_imag.setRegion(lim)
def _sortInputFiles( self, files ):
return QStringList(sorted(files, key=lambda x: re.findall("\d+\.\d+K", x)))
def openFile( self, path ):
print "opening: %s"%path
self.filepath = path
Temp, _die_loss, _die_stor, _freq = bds_file_reader.FileReader.read_datafile(path)
self.setWindowTitle("%s - %.2f K"%(os.path.basename(path), Temp))
self.data.set_data(_freq, _die_stor, _die_loss)
self.data.meta["T"] = Temp
self.fit_boundary_imag.setRegion([np.log10(_freq.min()), np.log10(_freq.max())])
self.ui.pgPlotWidget_imag.disableAutoRange()
self.ui.pgPlotWidget_real.disableAutoRange()
self.ui.pgPlotWidget_imag.setXRange(np.log10(_freq.min()), np.log10(_freq.max()))
self.ui.pgPlotWidget_imag.setYRange(np.log10(_die_loss.min()), np.log10(_die_loss.max()))
self.ui.pgPlotWidget_real.setXRange(np.log10(_freq.min()), np.log10(_freq.max()))
self.ui.pgPlotWidget_real.setYRange(np.log10(_die_stor.min()), np.log10(_die_stor.max()))
#self.ui.pgPlotWidget_real.setRange(xRange=(_freq.min(), _freq.max()),
# yRange=(_die_stor.min(), _die_stor.max()) )
self.updatePlot()
def updatePlot( self ):
log10fmin, log10fmax = self.fit_boundary_imag.getRegion()
self.data.set_fit_xlimits(10**log10fmin, 10**log10fmax)
p0, funcs = [], []
for fcn in self.function_registry.get_registered_functions():
p0.extend(fcn.getParameter())
funcs.append(fcn)
# calculate parametrized curve
self.data.set_fit(p0, funcs)
# replot data and fit, TODO: replot only if measurement data changed
self.data.data_curve_real.setData(self.data.frequency, self.data.epsilon.real)
self.data.data_curve_imag.setData(self.data.frequency, self.data.epsilon.imag)
#print "updatePlot: ",self.data.frequency_fit, self.data.epsilon_fit
if len(funcs) > 0:
#print "funcs > 0:",self.data.frequency_fit, self.data.epsilon_fit
self.data.fitted_curve_real.setData(x=self.data.frequency_fit, y=self.data.epsilon_fit.real)
self.data.fitted_curve_imag.setData(x=self.data.frequency_fit, y=self.data.epsilon_fit.imag)
else:
self.data.fitted_curve_real.setData(x=np.array([np.nan]), y=np.array([np.nan]))
self.data.fitted_curve_imag.setData(x=np.array([np.nan]), y=np.array([np.nan]))
def updateIntermediatePlot( self, freq, intermediate_data ):
self.data.fitted_curve_real.setData(freq, intermediate_data[0])
self.data.fitted_curve_imag.setData(freq, intermediate_data[1])
def _update_fit_boundary_imag( self ):
"""
Update real region when with imag reagion
"""
self.fit_boundary_real.setRegion(self.fit_boundary_imag.getRegion())
self._update_fit_boundary()
def _update_fit_boundary_real( self ):
"""
Update imag region when with real reagion
"""
self.fit_boundary_imag.setRegion(self.fit_boundary_real.getRegion())
self._update_fit_boundary()
def _update_fit_boundary( self ):
"""
Update limits in container.
"""
for container in self.function_registry.get_registered_functions():
lims = [10**i for i in self.fit_boundary_real.getRegion()]
container.set_limits(lims)
def sigint_handler( *args ):
"""
Handler for the SIGINT signal (CTRL + C).
"""
sys.stderr.write('\r')
if QMessageBox.question(None, '', "Are you sure you want to quit?",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.Yes) == QMessageBox.Yes:
QApplication.quit()
if __name__ == '__main__':
signal.signal(signal.SIGINT, sigint_handler)
files = sys.argv[1:]
app = QApplication(sys.argv)
timer = QTimer()
timer.start(1000) # Check every second for Strg-c on Cmd line
timer.timeout.connect(lambda: None)
main = AppWindow(files=files)
main.showMaximized()
main.raise_()
sys.exit(app.exec_())