Core
This article describes the format Core should follow
Description
As previously stated, Core acts as a system daemon and controls pFS on an application level. Therefore, core has the following aspects:
__init__()This constructor initializes the
configYAML dictionaryThis constructor initializes the
logger,statevariablesThis constructor initializes all the
submodulesregardless of orderThis constructor populates the dependencies of all the
submodulesregardless of orderThis constructor initializes the
processesdictionary
Specifics
The following methods are specific to just core:
populate_dependenciesSince the dependency tree for
submodulesis extremely circular, it is impossible to instantiatesubmoduleswith references to their dependenciespopulate_dependenciesiterates through theconfigdictionary and reads thedepeneds_onkey for eachsubmodule. It then calls thesubmodule.set_modulesmethod which populates theself.modulesdictionary with references to thesubmodulesdependencies
requestFor any reason, a
submodulemay request a reference to anothersubmodule. This method grants that request
get_stateReturns the current Mode.
reset_moduleHandles the
ModuleNotFoundErrorby directly accessing asubmodulesself.modulesdictionary and repopulating it with good dependency references.
get_configReturns
core.config
Code
import logging
import os
import time
from functools import partial
from threading import Timer
from yaml import safe_load
from helpers.mode import Mode
from helpers.power import Power
from helpers.threadhandler import ThreadHandler
from core.processes import power_watchdog, is_first_boot
from submodules.submodule import Submodule
from submodules.antenna_deployer import AntennaDeployer
from submodules.command_ingest import CommandIngest
from submodules.eps import EPS
from submodules.radios.aprs import APRS
from submodules.radios.iridium import Iridium
from submodules.telemetry import Telemetry
class Core:
def __init__(self):
if os.path.exists('config/config_custom.yml'):
with open('config/config_custom.yml') as f:
self.config = safe_load(f)
else:
with open('config/config_default.yml') as f:
self.config = safe_load(f)
self.logger = logging.getLogger("core")
self.state = Mode.LOW_POWER
self.submodules = {
"antenna_deployer": AntennaDeployer(core=self, config=self.config),
"aprs": APRS(core=self, config=self.config),
"command_ingest": CommandIngest(core=self, config=self.config),
"eps": EPS(core=self, config=self.config),
"iridium": Iridium(core=self, config=self.config),
"telemetry": Telemetry(core=self, config=self.config),
}
self.populate_dependencies()
self.processes = {
"power_monitor": ThreadHandler(
target=partial(power_watchdog, core=self, eps=self.submodules['eps']),
name="power_monitor",
parent_logger=self.logger
),
"telemetry_dump": Timer(
interval=self.config['core']['dump_interval'],
function=partial(self.submodules["telemetry"].dump)
),
"telemetry_heartbeat": Timer(
interval=self.config['core']['heartbeat_invertal'],
target=partial(self.submodules["telemetry"].heartbeat)
)
}
def populate_dependencies(self) -> None:
"""
Iterates through configuration data dictionary and sets each submodule's self.modules dictionary
with a dictionary that contains references to all the other submodules listed in the first
submodule's depends_on key
"""
for submodule in self.submodules:
if hasattr(self.submodules[submodule], 'set_modules'):
self.submodules[submodule].set_modules({
dependency: self.submodules[dependency]
for dependency in self.config[submodule]['depends_on']
})
def get_config(self) -> dict:
"""Returns the configuration data from config_*.yml as a list"""
return self.config
def get_state(self) -> Mode:
return self.state
def reset_module(self, submodule: Submodule, dependency: str) -> None:
submodule.modules[dependency] = self.submodules[dependency]
def enter_normal_mode(self, reason: str = '') -> None:
"""
Enter normal power mode.
:param reason: Reason for entering normal mode.
"""
self.logger.warning(
f"Entering normal mode{' Reason: ' if reason else ''}{reason if reason else ''}")
self.state = Mode.NORMAL
for submodule in self.submodules:
if hasattr(self.submodules[submodule], 'enter_normal_mode'):
self.submodules[submodule].enter_normal_mode()
def enter_low_power_mode(self, reason: str = '') -> None:
"""
Enter low power mode.
:param reason: Reason for entering low power mode.
"""
self.logger.warning(
f"Entering low power mode{' Reason: ' if reason else ''}{reason if reason else ''}")
self.state = Mode.LOW_POWER
for submodule in self.submodules:
if hasattr(self.submodules[submodule], 'enter_low_power_mode'):
self.submodules[submodule].enter_low_power_mode()
def request(self, module_name: str):
"""
Returns a reference to a specified module if specified module is present
@param module_name: name of module requested
"""
return self.submodules[module_name] if module_name in self.submodules.keys() else False
def __str__(self):
return "Core: core"
def start(self) -> None:
"""
Runs the startup process for core
"""
for submodule in self.config['core']['modules']['A']:
if hasattr(self.submodules[submodule], 'start'):
self.submodules[submodule].start()
if is_first_boot():
time.sleep(self.config['core']['sleep_interval'])
for submodule in self.config['core']['modules']['B']:
if hasattr(self.submodules[submodule], 'start'):
self.submodules[submodule].start()
while self.submodules['eps'].get_battery_bus_volts() < Power.STARTUP.value:
time.sleep(1)
self.mode = Mode.NORMAL
for submodule in self.config['core']['modules']['C']:
if hasattr(self.submodules[submodule], 'start'):
self.submodules[submodule].start()
for process in self.processes:
self.processes[process].start()
while True:
time.sleep(1) # Keep main thread alive so that child threads do not terminateLast updated
Was this helpful?