This start method iterates through the processes dictionary and starts all threads
Specifics
The following methods are specific to just core:
populate_dependencies
Since the dependency tree for submodules is extremely circular, it is impossible to instantiate submodules with references to their dependencies
populate_dependencies iterates through the config dictionary and reads the depeneds_on key for each submodule. It then calls the submodule.set_modules method which populates the self.modules dictionary with references to the submodules dependencies
request
For any reason, a submodule may request a reference to another submodule. This method grants that request
Handles the ModuleNotFoundError by directly accessing a submodulesself.modules dictionary and repopulating it with good dependency references.
get_config
Returns core.config
Code
core/core.py
import logging
import os
import time
from functools import partial
from threading import Timer
from yaml import safe_load
from helpers.mode import Mode
from helpers.power import Power
from helpers.threadhandler import ThreadHandler
from core.processes import power_watchdog, is_first_boot
from submodules.submodule import Submodule
from submodules.antenna_deployer import AntennaDeployer
from submodules.command_ingest import CommandIngest
from submodules.eps import EPS
from submodules.radios.aprs import APRS
from submodules.radios.iridium import Iridium
from submodules.telemetry import Telemetry
class Core:
def __init__(self):
if os.path.exists('config/config_custom.yml'):
with open('config/config_custom.yml') as f:
self.config = safe_load(f)
else:
with open('config/config_default.yml') as f:
self.config = safe_load(f)
self.logger = logging.getLogger("core")
self.state = Mode.LOW_POWER
self.submodules = {
"antenna_deployer": AntennaDeployer(core=self, config=self.config),
"aprs": APRS(core=self, config=self.config),
"command_ingest": CommandIngest(core=self, config=self.config),
"eps": EPS(core=self, config=self.config),
"iridium": Iridium(core=self, config=self.config),
"telemetry": Telemetry(core=self, config=self.config),
}
self.populate_dependencies()
self.processes = {
"power_monitor": ThreadHandler(
target=partial(power_watchdog, core=self, eps=self.submodules['eps']),
name="power_monitor",
parent_logger=self.logger
),
"telemetry_dump": Timer(
interval=self.config['core']['dump_interval'],
function=partial(self.submodules["telemetry"].dump)
),
"telemetry_heartbeat": Timer(
interval=self.config['core']['heartbeat_invertal'],
target=partial(self.submodules["telemetry"].heartbeat)
)
}
def populate_dependencies(self) -> None:
"""
Iterates through configuration data dictionary and sets each submodule's self.modules dictionary
with a dictionary that contains references to all the other submodules listed in the first
submodule's depends_on key
"""
for submodule in self.submodules:
if hasattr(self.submodules[submodule], 'set_modules'):
self.submodules[submodule].set_modules({
dependency: self.submodules[dependency]
for dependency in self.config[submodule]['depends_on']
})
def get_config(self) -> dict:
"""Returns the configuration data from config_*.yml as a list"""
return self.config
def get_state(self) -> Mode:
return self.state
def reset_module(self, submodule: Submodule, dependency: str) -> None:
submodule.modules[dependency] = self.submodules[dependency]
def enter_normal_mode(self, reason: str = '') -> None:
"""
Enter normal power mode.
:param reason: Reason for entering normal mode.
"""
self.logger.warning(
f"Entering normal mode{' Reason: ' if reason else ''}{reason if reason else ''}")
self.state = Mode.NORMAL
for submodule in self.submodules:
if hasattr(self.submodules[submodule], 'enter_normal_mode'):
self.submodules[submodule].enter_normal_mode()
def enter_low_power_mode(self, reason: str = '') -> None:
"""
Enter low power mode.
:param reason: Reason for entering low power mode.
"""
self.logger.warning(
f"Entering low power mode{' Reason: ' if reason else ''}{reason if reason else ''}")
self.state = Mode.LOW_POWER
for submodule in self.submodules:
if hasattr(self.submodules[submodule], 'enter_low_power_mode'):
self.submodules[submodule].enter_low_power_mode()
def request(self, module_name: str):
"""
Returns a reference to a specified module if specified module is present
@param module_name: name of module requested
"""
return self.submodules[module_name] if module_name in self.submodules.keys() else False
def __str__(self):
return "Core: core"
def start(self) -> None:
"""
Runs the startup process for core
"""
for submodule in self.config['core']['modules']['A']:
if hasattr(self.submodules[submodule], 'start'):
self.submodules[submodule].start()
if is_first_boot():
time.sleep(self.config['core']['sleep_interval'])
for submodule in self.config['core']['modules']['B']:
if hasattr(self.submodules[submodule], 'start'):
self.submodules[submodule].start()
while self.submodules['eps'].get_battery_bus_volts() < Power.STARTUP.value:
time.sleep(1)
self.mode = Mode.NORMAL
for submodule in self.config['core']['modules']['C']:
if hasattr(self.submodules[submodule], 'start'):
self.submodules[submodule].start()
for process in self.processes:
self.processes[process].start()
while True:
time.sleep(1) # Keep main thread alive so that child threads do not terminate