from __future__ import annotations import hashlib import os import subprocess import urllib.request from datetime import datetime from os import getenv, stat from os.path import exists from pathlib import Path from urllib.error import HTTPError from PyQt5 import QtWidgets, QtCore from nmreval.lib.logger import logger class UpdateDialog(QtWidgets.QDialog): startDownload = QtCore.pyqtSignal(tuple) restartSignal = QtCore.pyqtSignal() def __init__(self, filename: str = None, parent=None): super().__init__(parent=parent) self._init_ui() if filename is None: filename = getenv('APPIMAGE') self._appfile = filename self.updater = Updater() self.thread = QtCore.QThread(self) self.thread.start() self.helper = Downloader() self.startDownload.connect(self.helper.run_download) self.helper.progressChanged.connect(self.status.setText) self.helper.finished.connect(self.finish_update) self.helper.started.connect(self.status.show) self.helper.moveToThread(self.thread) self.look_for_updates(self._appfile) def _init_ui(self): self.setWindowTitle('Updates') layout = QtWidgets.QVBoxLayout() self.label = QtWidgets.QLabel() layout.addWidget(self.label) layout.addSpacing(10) self.status = QtWidgets.QLabel() self.status.hide() layout.addWidget(self.status) layout.addSpacing(10) self.dialog_button = QtWidgets.QDialogButtonBox() self.dialog_button.accepted.connect(self.update_appimage) self.dialog_button.rejected.connect(self.close) layout.addWidget(self.dialog_button) self.setLayout(layout) def look_for_updates(self, filename=None): logger.info(f'Looking for updates, compare to file {filename}') # Download zsync file of latest Appimage, look for SHA-1 hash and compare with hash of AppImage is_updateble, m_time_file, m_time_zsync = self.updater.get_update_information(filename) label_text = '' if is_updateble is None: label_text += '

Could not determine if this version is newer, please update manually (if necessary).

' dialog_bttns = QtWidgets.QDialogButtonBox.Close elif is_updateble: label_text += '

Different version available. Press Ok to download this version, Cancel to ignore.

' dialog_bttns = QtWidgets.QDialogButtonBox.Ok | QtWidgets.QDialogButtonBox.Cancel else: label_text += '

Version may be already up-to-date.

' dialog_bttns = QtWidgets.QDialogButtonBox.Close if m_time_zsync is None: label_text += '

Creation date of remote version is unknown.

' else: label_text += f'

Date of most recent AppImage: {m_time_zsync.strftime("%d %B %Y %H:%M")}

' if m_time_file is None: label_text += 'No AppImage file found, press Ok to download latest version.' dialog_bttns = QtWidgets.QDialogButtonBox.Ok | QtWidgets.QDialogButtonBox.Close else: label_text += f'

Date of used AppImage: {m_time_file.strftime("%d %B %Y %H:%M")}

' self.label.setText(label_text) self.dialog_button.setStandardButtons(dialog_bttns) @QtCore.pyqtSlot() def update_appimage(self): if self._appfile is None: args = (self.updater.zsync_url,) else: # this breaks the download for some reason args = (self.updater.zsync_url, self._appfile) self.dialog_button.setEnabled(False) self.startDownload.emit(args) self.status.show() @QtCore.pyqtSlot(int, str) def finish_update(self, retcode: int, file_loc: str): restart = QRestartWindow(state=retcode, file_loc=file_loc, parent=self) res = restart.exec() self.close() if res == QtWidgets.QMessageBox.Ok: self.restartSignal.emit() def closeEvent(self, evt): self.thread.quit() self.thread.wait() super().closeEvent(evt) class QRestartWindow(QtWidgets.QMessageBox): def __init__(self, state: int, file_loc: str, parent=None): super().__init__(parent=parent) self._appfile = file_loc if state: self.setText('Download failed') self.setDetailedText(f'Status code of failure is {state}') self.setStandardButtons(QtWidgets.QMessageBox.Close) self.setIcon(QtWidgets.QMessageBox.Warning) else: self.setText('Download completed!') self.setInformativeText("Press Restart to use new AppImage") self.setDetailedText(f'Location of AppImage: {file_loc}') self.setIcon(QtWidgets.QMessageBox.Information) self.setStandardButtons(QtWidgets.QMessageBox.Ok | QtWidgets.QMessageBox.Close) restart_button = self.button(QtWidgets.QMessageBox.Ok) restart_button.setText('Restart') self.buttonClicked.connect(self.maybe_close) def maybe_close(self): if self.clickedButton() == self.button(QtWidgets.QMessageBox.Ok): app = QtWidgets.QApplication.instance() app.quit() subprocess.Popen(self._appfile) class Downloader(QtCore.QObject): started = QtCore.pyqtSignal() finished = QtCore.pyqtSignal(int, str) progressChanged = QtCore.pyqtSignal(str) @QtCore.pyqtSlot(tuple) def run_download(self, args: tuple[str]): status = 0 appimage_location = args[0][:-6] logger.info(f'Download {appimage_location}') if len(args) == 2: new_file = Path(args[1]) else: new_file = Path.home() / 'Downloads' / 'NMReval-latest-x86_64.AppImage' if new_file.exists(): os.rename(new_file, new_file.with_suffix('.AppImage.old')) try: with urllib.request.urlopen(appimage_location) as response: with new_file.open('wb') as f: f.write(response.read()) new_file.chmod(0o755) except HTTPError as e: logger.exception(f'Download failed with {e}') status = 3 except Exception as e: logger.exception(f'Download failed with {e.args}') status = 1 if status != 0: logger.warning('Download failed, restore previous AppImage') try: os.rename(new_file.with_suffix('.AppImage.old'), new_file) except FileNotFoundError: pass # zsync does not support https self.finished.emit(status, str(new_file)) class Updater: host = 'gitea.pkm.physik.tu-darmstadt.de/api/packages/IPKM/generic/NMReval/latest/' version = 'NMReval-latest-x86_64' @property def zsync_url(self): return f'https://{Updater.host}/{Updater.version}.AppImage.zsync' @staticmethod def get_zsync(): url_zsync = f'https://{Updater.host}/{Updater.version}.AppImage.zsync' m_time_zsync = None checksum_zsync = None zsync_file = None filename = None try: with urllib.request.urlopen(url_zsync) as response: zsync_file = response.read() except HTTPError as e: logger.error(f'Request for zsync returned code {e}') except Exception as e: logger.exception(f'Download of zsync failed with exception {e.args}') if zsync_file is not None: for line in zsync_file.split(b'\n'): try: kw, val = line.split(b': ') time_string = str(val, encoding='utf-8') time_format = '%a, %d %b %Y %H:%M:%S %z' if kw == b'MTime': try: m_time_zsync = datetime.strptime(time_string, time_format).astimezone(None) except ValueError: logger.warning(f'zsync time "{time_string}" does not match "{time_format}"') elif kw == b'SHA-1': checksum_zsync = str(val, encoding='utf-8') elif kw == b'Filename': filename = str(val, encoding='utf-8') except ValueError: # stop when empty line is reached break return m_time_zsync, checksum_zsync, filename @staticmethod def get_appimage_info(filename: str): if filename is None: return None, None if not exists(filename): return None, None stat_mtime = stat(filename).st_mtime m_time_file = datetime.fromtimestamp(stat_mtime).replace(microsecond=0) with open(filename, 'rb') as f: checksum_file = hashlib.sha1(f.read()).hexdigest() if checksum_file is None: logger.warning('No checksum for AppImage calculated') return m_time_file, checksum_file @staticmethod def get_update_information(filename: str) -> tuple[(bool | None), datetime, datetime]: m_time_zsync, checksum_zsync, appname = Updater.get_zsync() m_time_file, checksum_file = Updater.get_appimage_info(filename) logger.debug(f'zsync information {m_time_zsync}, {checksum_zsync}, {appname}') logger.debug(f'file information {m_time_file}, {checksum_file}') if not ((checksum_file is not None) and (checksum_zsync is not None)): return None, m_time_file, m_time_zsync else: return checksum_file != checksum_zsync, m_time_file, m_time_zsync if __name__ == '__main__': import sys from gui_qt import App app = App(['Team Rocket FTW!']) updater = UpdateDialog() updater.show() sys.exit(app.exec())