This start method iterates through the processes dictionary and starts all threads
Specifics
The following methods are specific to just core:
populate_dependencies
Since the dependency tree for submodules is extremely circular, it is impossible to instantiate submodules with references to their dependencies
populate_dependencies iterates through the config dictionary and reads the depeneds_on key for each submodule. It then calls the submodule.set_modules method which populates the self.modules dictionary with references to the submodules dependencies
request
For any reason, a submodule may request a reference to another submodule. This method grants that request
Handles the ModuleNotFoundError by directly accessing a submodulesself.modules dictionary and repopulating it with good dependency references.
get_config
Returns core.config
Code
core/core.py
import loggingimport osimport timefrom functools import partialfrom threading import Timerfrom yaml import safe_loadfrom helpers.mode import Modefrom helpers.power import Powerfrom helpers.threadhandler import ThreadHandlerfrom core.processes import power_watchdog, is_first_bootfrom submodules.submodule import Submodulefrom submodules.antenna_deployer import AntennaDeployerfrom submodules.command_ingest import CommandIngestfrom submodules.eps import EPSfrom submodules.radios.aprs import APRSfrom submodules.radios.iridium import Iridiumfrom submodules.telemetry import TelemetryclassCore:def__init__(self):if os.path.exists('config/config_custom.yml'):withopen('config/config_custom.yml')as f: self.config =safe_load(f)else:withopen('config/config_default.yml')as f: self.config =safe_load(f) self.logger = logging.getLogger("core") self.state = Mode.LOW_POWER self.submodules ={"antenna_deployer":AntennaDeployer(core=self, config=self.config),"aprs":APRS(core=self, config=self.config),"command_ingest":CommandIngest(core=self, config=self.config),"eps":EPS(core=self, config=self.config),"iridium":Iridium(core=self, config=self.config),"telemetry":Telemetry(core=self, config=self.config),} self.populate_dependencies() self.processes ={"power_monitor":ThreadHandler( target=partial(power_watchdog, core=self, eps=self.submodules['eps']), name="power_monitor", parent_logger=self.logger ),"telemetry_dump":Timer( interval=self.config['core']['dump_interval'], function=partial(self.submodules["telemetry"].dump) ),"telemetry_heartbeat":Timer( interval=self.config['core']['heartbeat_invertal'], target=partial(self.submodules["telemetry"].heartbeat) )}defpopulate_dependencies(self) ->None:""" Iterates through configuration data dictionary and sets each submodule's self.modules dictionary with a dictionary that contains references to all the other submodules listed in the first submodule's depends_on key """for submodule in self.submodules:ifhasattr(self.submodules[submodule], 'set_modules'): self.submodules[submodule].set_modules({ dependency: self.submodules[dependency]for dependency in self.config[submodule]['depends_on'] })defget_config(self) ->dict:"""Returns the configuration data from config_*.yml as a list"""return self.configdefget_state(self) -> Mode:return self.statedefreset_module(self,submodule: Submodule,dependency:str) ->None: submodule.modules[dependency]= self.submodules[dependency]defenter_normal_mode(self,reason:str='') ->None:""" Enter normal power mode. :param reason: Reason for entering normal mode. """ self.logger.warning(f"Entering normal mode{' Reason: 'if reason else''}{reason if reason else''}") self.state = Mode.NORMALfor submodule in self.submodules:ifhasattr(self.submodules[submodule], 'enter_normal_mode'): self.submodules[submodule].enter_normal_mode()defenter_low_power_mode(self,reason:str='') ->None:""" Enter low power mode. :param reason: Reason for entering low power mode. """ self.logger.warning(f"Entering low power mode{' Reason: 'if reason else''}{reason if reason else''}") self.state = Mode.LOW_POWERfor submodule in self.submodules:ifhasattr(self.submodules[submodule], 'enter_low_power_mode'): self.submodules[submodule].enter_low_power_mode()defrequest(self,module_name:str):""" Returns a reference to a specified module if specified module is present @param module_name: name of module requested """return self.submodules[module_name]if module_name in self.submodules.keys()elseFalsedef__str__(self):return"Core: core"defstart(self) ->None:""" Runs the startup process for core """for submodule in self.config['core']['modules']['A']:ifhasattr(self.submodules[submodule], 'start'): self.submodules[submodule].start()ifis_first_boot(): time.sleep(self.config['core']['sleep_interval'])for submodule in self.config['core']['modules']['B']:ifhasattr(self.submodules[submodule], 'start'): self.submodules[submodule].start()while self.submodules['eps'].get_battery_bus_volts()< Power.STARTUP.value: time.sleep(1) self.mode = Mode.NORMALfor submodule in self.config['core']['modules']['C']:ifhasattr(self.submodules[submodule], 'start'): self.submodules[submodule].start()for process in self.processes: self.processes[process].start()whileTrue: time.sleep(1)# Keep main thread alive so that child threads do not terminate