from __future__ import annotations

import sys
from functools import lru_cache
from os import getenv, stat
from os.path import exists
import hashlib
import subprocess
from datetime import datetime
from contextlib import contextmanager
from pathlib import Path

import requests
from numpy import linspace
from scipy.interpolate import interp1d

from nmreval.lib.logger import logger
from ..Qt import QtGui, QtWidgets, QtCore


@contextmanager
def busy_cursor():
    """Show an application-wide override cursor while the ``with`` body runs.

    The cursor shape is ``Qt.ForbiddenCursor``; the override is removed again
    when the block is left, whether normally or through an exception.
    """
    try:
        cursor = QtGui.QCursor(QtCore.Qt.ForbiddenCursor)
        QtWidgets.QApplication.setOverrideCursor(cursor)
        yield
    finally:
        QtWidgets.QApplication.restoreOverrideCursor()


class RdBuCMap:
    """Red-to-blue diverging colour map producing ``QColor`` objects.

    ``vmax`` is mapped to the first (dark red) entry of the table and ``vmin``
    to the last (dark blue) one; values in between are interpolated linearly
    per RGB channel.
    """

    # taken from Excel sheet from colorbrewer.org
    _rdbu = [
        (103, 0, 31),
        (178, 24, 43),
        (214, 96, 77),
        (244, 165, 130),
        (253, 219, 199),
        (247, 247, 247),
        (209, 229, 240),
        (146, 197, 222),
        (67, 147, 195),
        (33, 102, 172),
        (5, 48, 97),
    ]

    def __init__(self, vmin=-1., vmax=1.):
        """Build one interpolator per colour channel over ``[vmin, vmax]``."""
        self.min = vmin
        self.max = vmax

        # support points run from vmax down to vmin, matching the table order
        support = linspace(self.max, self.min, num=len(RdBuCMap._rdbu))
        self.spline = [
            interp1d(support, [rgb[channel] for rgb in RdBuCMap._rdbu])
            for channel in range(3)
        ]

    def color(self, val: float):
        """Return the ``QColor`` for ``val``, clamped to the ends of the map."""
        if val > self.max:
            return QtGui.QColor.fromRgb(*RdBuCMap._rdbu[0])
        if val < self.min:
            return QtGui.QColor.fromRgb(*RdBuCMap._rdbu[-1])

        # QColor.fromRgb expects integers; passing the interpolated floats
        # directly raises TypeError with PyQt on Python >= 3.10.
        channels = (int(round(float(spline(val)))) for spline in self.spline)
        return QtGui.QColor.fromRgb(*channels)


class UpdateDialog(QtWidgets.QDialog):
    """Dialog that checks for a newer AppImage and downloads it via zsync.

    The download itself runs in a ``Downloader`` object that lives in a
    separate ``QThread``; it is triggered through the ``startDownload`` signal.
    """

    startDownload = QtCore.pyqtSignal(list)

    def __init__(self, filename: str | None = None, parent=None):
        """Set up the dialog and immediately look for updates.

        Args:
            filename: Path of the AppImage currently in use. If ``None``, the
                environment variable ``APPIMAGE`` is used (which may be unset).
            parent: Parent widget passed on to ``QDialog``.
        """
        super().__init__(parent=parent)
        self._init_ui()

        if filename is None:
            filename = getenv('APPIMAGE')
        self._appfile = filename
        self.success = False

        self.updater = Updater()

        # NOTE(review): the attribute name ``thread`` hides QObject.thread();
        # kept unchanged because other code may access it.
        self.thread = QtCore.QThread(self)
        self.thread.start()

        self.helper = Downloader()
        self.startDownload.connect(self.helper.run_download)
        self.helper.progressChanged.connect(self.status.setText)
        self.helper.finished.connect(self.finish_update)
        self.helper.started.connect(self.status.show)
        self.helper.moveToThread(self.thread)

        self.look_for_updates(self._appfile)

    def _init_ui(self):
        """Create the widgets: info label, status label and button box."""
        self.setWindowTitle('Updates')

        layout = QtWidgets.QVBoxLayout()

        self.label = QtWidgets.QLabel()
        layout.addWidget(self.label)
        layout.addSpacing(10)

        # the status line stays hidden until there is something to report
        self.status = QtWidgets.QLabel()
        self.status.hide()
        layout.addWidget(self.status)
        layout.addSpacing(10)

        self.dialog_button = QtWidgets.QDialogButtonBox()
        self.dialog_button.accepted.connect(self.update_appimage)
        self.dialog_button.rejected.connect(self.close)
        layout.addWidget(self.dialog_button)

        self.setLayout(layout)

    def look_for_updates(self, filename=None):
        """Compare ``filename`` with the latest release and update the dialog.

        Sets the label text, the status text and the available buttons
        depending on what ``Updater.get_update_information`` returns.
        """
        logger.info(f'Looking for updates, compare to file {filename}')
        # Download zsync file of latest Appimage, look for SHA-1 hash and compare with hash of AppImage
        is_updateble, m_time_file, m_time_zsync = self.updater.get_update_information(filename)

        buttons = QtWidgets.QDialogButtonBox
        status_text = None

        if m_time_zsync is None:
            label_text = '\nRetrieval of version information failed.\n' \
                         'Please try later (or complain to people that it does not work).\n'
            dialog_bttns = buttons.Close
        else:
            label_text = f'Date of most recent AppImage: {m_time_zsync.strftime("%d %B %Y %H:%M")}\n'

            if m_time_file is None:
                label_text += 'No AppImage file found, press Ok to download latest version.'
                dialog_bttns = buttons.Ok | buttons.Close
            else:
                label_text += f'Date of used AppImage: {m_time_file.strftime("%d %B %Y %H:%M")}\n'

                if is_updateble is None:
                    status_text = 'Could not determine if this version is newer, please update manually (if necessary).'
                    dialog_bttns = buttons.Close
                elif is_updateble:
                    status_text = 'Newer version available. Press Ok to download new version, Cancel to ignore.'
                    dialog_bttns = buttons.Ok | buttons.Cancel
                else:
                    status_text = 'Version may be already up-to-date.'
                    dialog_bttns = buttons.Close

        if status_text is not None:
            self.status.setText(status_text)
            # the label is hidden by _init_ui; without show() the message
            # would never be visible to the user
            self.status.show()

        self.label.setText(label_text)
        self.dialog_button.setStandardButtons(dialog_bttns)

    @QtCore.pyqtSlot()
    def update_appimage(self):
        """Start the download of the latest AppImage in the worker thread."""
        # Seeding zsync with the current file (['-i', self._appfile, url])
        # breaks the download for some reason, so only the URL is passed.
        args = [self.updater.zsync_url]

        self.dialog_button.setEnabled(False)
        self.startDownload.emit(args)
        self.status.show()

    @QtCore.pyqtSlot(int)
    def finish_update(self, retcode: int):
        """Report the result of the download; ``retcode`` 0 means success."""
        self.success = retcode == 0
        if self.success:
            self.status.setText('Download complete.')
        else:
            self.status.setText(f'Download failed :( with return code {retcode}.')

        self.dialog_button.setStandardButtons(QtWidgets.QDialogButtonBox.Close)
        self.dialog_button.setEnabled(True)

    def _install_download(self) -> Path:
        """Move the downloaded AppImage into place and make it executable.

        Returns:
            Path of the installed AppImage.

        Raises:
            OSError: If the file name is unknown or a file operation fails.
        """
        appname = self.updater.get_zsync()[2]
        if appname is None:
            raise FileNotFoundError('zsync information does not contain a file name')

        # zsync is started without a working directory, so the file is
        # looked up in the current one
        downloaded = Path.cwd() / appname

        if self._appfile is not None:
            target = Path(self._appfile)
            if target.exists():
                # keep the previous version as backup
                target.replace(target.with_name(target.name + '.old'))
        else:
            # rename to version-agnostic name
            target = downloaded.with_name('NMReval.AppImage')

        downloaded.replace(target)
        target.chmod(target.stat().st_mode | 0o111)  # a+x

        return target

    def closeEvent(self, evt):
        """Stop the worker thread and, after a successful download, install the file."""
        # wait() returns only after the thread's event loop has finished
        self.thread.quit()
        self.thread.wait()

        if self.success:
            try:
                appimage_path = self._install_download()
            except OSError:
                # do not let the exception escape from a Qt event handler
                logger.exception('Installation of downloaded AppImage failed')
                _ = QtWidgets.QMessageBox.warning(self, 'Update failed',
                                                  'The download finished, but the new AppImage '
                                                  'could not be moved into place.')
            else:
                _ = QtWidgets.QMessageBox.information(self, 'Complete',
                                                      f'New AppImage available at\n{appimage_path}')

        super().closeEvent(evt)
class Downloader(QtCore.QObject):
    """Worker object that runs ``zsync`` and reports its progress via signals.

    Signals:
        started: Emitted right before ``zsync`` is launched.
        finished: Emitted with the return code of ``zsync`` (127 if the
            program could not be started at all).
        progressChanged: Emitted with every non-empty output line.
    """

    started = QtCore.pyqtSignal()
    finished = QtCore.pyqtSignal(int)
    progressChanged = QtCore.pyqtSignal(str)

    @QtCore.pyqtSlot(list)
    def run_download(self, args: list[str]):
        """Run ``zsync`` with ``args`` and block until it has terminated."""
        logger.info(f'Download with args {args}')
        self.started.emit()

        try:
            # stderr is merged into stdout: an unread stderr pipe would stall
            # zsync as soon as the pipe buffer is full
            process = subprocess.Popen(
                ['zsync', *args],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                bufsize=1,
                universal_newlines=True,
            )
        except OSError:
            # e.g. zsync is not installed; report failure instead of leaving
            # the listener waiting for a signal that never arrives
            logger.exception('zsync could not be started')
            self.finished.emit(127)
            return

        # leaving the with-block closes the pipe and waits for the process,
        # so returncode is always set afterwards
        with process:
            for line in process.stdout:
                line = line.strip()
                if line:
                    self.progressChanged.emit(line)

        self.finished.emit(process.returncode)
class Updater:
    """Retrieve version information of the latest AppImage from its zsync file."""

    host = 'gitea.pkm.physik.tu-darmstadt.de/api/packages/IPKM/generic/NMReval/latest/'
    version = 'NMReval-latest-x86_64'

    @staticmethod
    def _build_zsync_url() -> str:
        """Return the URL of the zsync file, built from ``host`` and ``version``."""
        # host ends with a slash; strip it to avoid '//' inside the URL
        return f'https://{Updater.host.rstrip("/")}/{Updater.version}.AppImage.zsync'

    @property
    def zsync_url(self):
        """URL of the zsync file of the latest AppImage."""
        return Updater._build_zsync_url()

    @staticmethod
    def _parse_zsync_header(content: bytes):
        """Extract modification time, SHA-1 checksum and file name from a zsync file.

        Only the ``Key: value`` lines before the first empty line are read.
        Entries that are missing are returned as ``None``.

        Returns:
            Tuple ``(m_time, checksum, filename)``.
        """
        m_time_zsync = None
        checksum_zsync = None
        filename = None

        header = content.partition(b'\n\n')[0]
        for line in header.split(b'\n'):
            key, separator, value = line.partition(b': ')
            if not separator:
                # stop at the first line that is not 'Key: value'
                break

            text = value.decode('utf-8', errors='replace').strip()
            if key == b'MTime':
                try:
                    m_time_zsync = datetime.strptime(text, '%a, %d %b %Y %H:%M:%S %z').astimezone(None)
                except ValueError:
                    # an unreadable date must not hide the remaining entries
                    logger.warning(f'Could not parse MTime {text!r} of zsync file')
            elif key == b'SHA-1':
                checksum_zsync = text
            elif key == b'Filename':
                filename = text

        return m_time_zsync, checksum_zsync, filename

    @staticmethod
    @lru_cache(3)
    def get_zsync():
        """Download the zsync file and return ``(m_time, checksum, filename)``.

        All three values are ``None`` if the download fails.

        NOTE(review): the result is cached, including a failed attempt, so a
        retry needs ``Updater.get_zsync.cache_clear()``.
        """
        url_zsync = Updater._build_zsync_url()

        try:
            # without a timeout an unreachable server blocks the caller indefinitely
            response = requests.get(url_zsync, timeout=10)
        except Exception as e:
            # deliberately broad: an update check must never crash the program
            logger.exception(f'Download of zsync failed with exception {e.args}')
            return None, None, None

        if response.status_code != 200:
            logger.error(f'Request for zsync returned code {response.status_code}')
            return None, None, None

        return Updater._parse_zsync_header(response.content)

    @staticmethod
    def get_appimage_info(filename: str):
        """Return modification time and SHA-1 checksum of ``filename``.

        Returns:
            Tuple ``(m_time, checksum)``; both are ``None`` if ``filename`` is
            ``None`` or does not exist.
        """
        if filename is None or not exists(filename):
            return None, None

        stat_mtime = stat(filename).st_mtime
        m_time_file = datetime.fromtimestamp(stat_mtime).replace(microsecond=0)

        # hash in chunks instead of loading the complete file into memory
        digest = hashlib.sha1()
        with open(filename, 'rb') as f:
            for chunk in iter(lambda: f.read(1 << 20), b''):
                digest.update(chunk)

        return m_time_file, digest.hexdigest()

    @staticmethod
    def get_update_information(filename: str) -> tuple[bool | None, datetime | None, datetime | None]:
        """Compare the local AppImage with the latest available one.

        Returns:
            Tuple ``(is_updatable, m_time_file, m_time_zsync)``. The first entry
            is ``True`` if the checksums differ, ``False`` if they are equal
            and ``None`` if at least one checksum is unknown.
        """
        m_time_zsync, checksum_zsync, appname = Updater.get_zsync()
        m_time_file, checksum_file = Updater.get_appimage_info(filename)

        logger.info(f'zsync information {m_time_zsync}, {checksum_zsync}, {appname}')
        logger.info(f'file information {m_time_file}, {checksum_file}')

        if checksum_file is None or checksum_zsync is None:
            return None, m_time_file, m_time_zsync

        return checksum_file != checksum_zsync, m_time_file, m_time_zsync
def open_bug_report():
form_entries = {
'description': 'Please state the nature of the medical emergency.',
'title': 'Everything is awesome?',
'assign[0]': 'dominik',
'subscribers[0]': 'dominik',
'tag': 'nmreval',
'priority': 'normal',
'status': 'open',
}
full_url = 'https://chaos3.fkp.physik.tu-darmstadt.de/maniphest/task/edit/?'
for k, v in form_entries.items():
full_url += f'{k}={v}&'
full_url.replace(' ', '+')
import webbrowser
webbrowser.open(full_url)