core-extra/daemon/core/gui/task.py

45 lines
1.5 KiB
Python

import logging
import threading
from typing import Any, Callable
from core.gui.errors import show_grpc_response_exceptions
class BackgroundTask:
def __init__(self, master: Any, task: Callable, callback: Callable = None, args=()):
self.master = master
self.args = args
self.task = task
self.callback = callback
self.thread = None
def start(self):
logging.info("starting task")
self.thread = threading.Thread(target=self.run, daemon=True)
self.thread.start()
def run(self):
result = self.task(*self.args)
logging.info("task completed")
# if start session fails, a response with Result: False and a list of exceptions is returned
if hasattr(result, "result") and not result.result:
if hasattr(result, "exceptions") and len(result.exceptions) > 0:
self.master.after(
0,
show_grpc_response_exceptions,
*(
result.__class__.__name__,
result.exceptions,
self.master,
self.master,
)
)
if self.callback:
if result is None:
args = ()
elif isinstance(result, (list, tuple)):
args = result
else:
args = (result,)
logging.info("calling callback: %s", args)
self.master.after(0, self.callback, *args)