pFS requires a special type of thread to streamline the application. This type is called ThreadHandler. All threads within pFS should be ThreadHandler instances, not the generic Python Thread object.
ThreadHandler also serves a second purpose: whenever a submodule process encounters an error, it can raise that error and it will propagate through ThreadHandler. For example, if a submodule cannot access one of its dependencies, it can raise an error; ThreadHandler will catch the error and tell core to reassign the dependency to the submodule.
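The dependency-reassignment flow described above can be sketched in isolation. This is a minimal illustration, not the pFS implementation: `MissingDependencyError`, `FakeCore`, `run_once`, and `telemetry_task` are hypothetical stand-ins, and the only detail borrowed from pFS is that the error carries `submodule` and `missing` attributes which are passed to core's `reset_module`.

```python
class MissingDependencyError(Exception):
    """Hypothetical stand-in for the error a submodule raises."""

    def __init__(self, submodule, missing):
        super().__init__(f"{submodule} is missing dependency {missing}")
        self.submodule = submodule  # name of the submodule that failed
        self.missing = missing      # name of the dependency it could not reach


class FakeCore:
    """Hypothetical stand-in for core; it only records reset requests."""

    def __init__(self):
        self.resets = []

    def reset_module(self, submodule, missing):
        self.resets.append((submodule, missing))


def run_once(core, target):
    """Run target once and hand missing-dependency errors to core."""
    try:
        target()
    except MissingDependencyError as err:
        core.reset_module(err.submodule, err.missing)
        return False
    return True


def telemetry_task():
    # Simulate a submodule that cannot reach its dependency.
    raise MissingDependencyError("telemetry", "radio")


core = FakeCore()
ok = run_once(core, telemetry_task)
```

After this runs, `ok` is `False` and `core.resets` holds the single pair `("telemetry", "radio")`, which is the information core would need to reassign the dependency.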
ThreadHandler
ThreadHandler is a wrapper for the Python Thread class that, by default, automatically restarts the thread if it crashes for any reason.
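The restart idea can be shown with a small standalone sketch. It is a simplification under stated assumptions: `supervise` and `flaky` are hypothetical names, the loop stops once the target returns cleanly, and a `max_crashes` cap is added so the example terminates. The real ThreadHandler may behave differently on each of these points.

```python
import threading
import time

attempts = []


def flaky():
    """Hypothetical worker that crashes on its first two calls."""
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("simulated crash")


def supervise(target, interval=0.01, max_crashes=5):
    """Call target again after each crash; return the number of crashes seen."""
    crashes = 0
    while crashes < max_crashes:
        try:
            target()
        except Exception:
            crashes += 1
            time.sleep(interval)  # brief pause before restarting
        else:
            break  # target finished without raising
    return crashes


result = {}
worker = threading.Thread(
    target=lambda: result.update(crashes=supervise(flaky)),
    name="flaky_supervisor",
)
worker.start()
worker.join()
```

Here `flaky` is called three times in total and `result["crashes"]` ends up as 2, because the third call succeeds.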
The ThreadHandler class lives under helpers/ and can be imported like this:
from helpers.threadhandler import ThreadHandler
The ThreadHandler class is below. You can also view this file on GitHub.
helpers/threadhandler.py
import functools
import logging
import threading
import time

from core import Core
from helpers.exceptions import ModuleNotFoundError


class ThreadHandler:
    def __init__(self, core: Core, target: callable, name: str = None,
                 parent_logger=logging, interval: int = 3,
                 suppress_out: bool = False, auto_restart: bool = True,
                 daemon: bool = False):
        """
        Initialize a ThreadHandler.
        :param target: The child function to run, should be either a functools.partial or lambda.
        :param name: The name of the thread; default is name of function or pointer location.
        :param parent_logger: A logging object (ex. GPS); default 'root'.
        :param interval: Amount of time between checking the status of the child function; default 3s.
        :param suppress_out: Suppresses the logging of messages; default False.
        :param auto_restart: Whether or not to automatically restart the thread.
        :param daemon: Set True to stop thread when main thread terminates
        """
        self.target = target
        if name is None:
            if type(target) == functools.partial:
                self.name = target.func.__name__  # TODO figure out why function is not defined
            elif type(target) is type(lambda x: x):
                self.name = target.__name__
            else:
                self.name = "thread_" + str(id(self))
        else:
            self.name = name
        self.parent_logger = parent_logger
        self.interval = interval
        self.suppress_out = suppress_out
        self.auto_restart = auto_restart
        self.is_active = True
        self.is_alive = False
        self.daemon = daemon
        self.core = core

    def start(self):
        """
        Start the ThreadHandler. This function actually starts a threading.Thread, with the run() method as the target.
        """
        threading.Thread(target=self.run, name=self.name, daemon=self.daemon).start()

    def run(self):
        while True:
            if self.is_active:
                if not self.suppress_out:
                    self.parent_logger.info("'%s' thread started" % self.name)
                try:
                    self.target()
                except ModuleNotFoundError as m:
                    self.core.reset_module(m.submodule, m.missing)
                except BaseException as e:
                    if not self.suppress_out:
                        self.parent_logger.exception(str(e) + ", restarting '%s'" % self.name)
                    if not self.auto_restart:
                        self.is_active = False
                else:
                    if not self.suppress_out:
                        self.parent_logger.info("Bad thread, restarting '%s'" % self.name)
                    if not self.auto_restart:
                        self.is_active = False
            time.sleep(self.interval)

    def resume(self):
        """
        Resume the ThreadHandler.
        """
        if not self.suppress_out:
            self.parent_logger.info("'%s' thread resumed" % self.name)
        if not self.auto_restart:
            self.is_active = True

    def pause(self):
        """
        Pause the ThreadHandler.
        """
        if not self.suppress_out:
            self.parent_logger.info("'%s' thread paused" % self.name)
        if not self.auto_restart:
            self.is_active = False