Source code for fand.clientrpi

"""fand client for Raspberry Pi"""

import argparse
import logging
import signal
import socket
import time
from typing import (NoReturn, Optional, Set, Union)

import gpiozero

import fand.communication as com
import fand.util as util
from fand.exceptions import (
    CommunicationError, GpioError, ShelfPwmBadValue, TerminatingError
)

# Constants
# Module docstring
__DOCSTRING__ = __doc__
# Logger to use
logger = logging.getLogger(__name__)
#: How much time to wait between updates
SLEEP_TIME: float = 60

# Global variables
# Set of active GPIO devices, closed when program exits
__GPIO_DEVICES__: Set[Union['GpioRpm', 'GpioPwm']] = set()


@util.when_terminate
def _terminate() -> None:
    for device in __GPIO_DEVICES__.copy():
        try:
            device.close()
        except GpioError:
            logger.exception("Failed to close GPIO %s", device)
        __GPIO_DEVICES__.remove(device)


[docs]def add_gpio_device(device: Union['GpioRpm', 'GpioPwm']) -> None: """Add a GPIO device to the set of managed GPIO devices :param device: GPIO device to add :raises TerminatingError: Trying to add a socket but :func:`fand.util.terminating` is True """ if util.terminating(): raise TerminatingError("Cannot add new GPIO device while terminating") __GPIO_DEVICES__.add(device)
[docs]class GpioRpm: """Class to handle RPM tachometer input from a fan :param pin: GPIO pin number to use :param managed: set to true to have the GPIO device automatically closed when :func:`fand.util.terminate` is called :raises GpioError: Received a :exc:`gpiozero.GPIOZeroError` """ __pin: int __gpio: gpiozero.Button __count: float __start_time: float rpm: float def __init__(self, pin: int, managed: bool = True) -> None: self.__pin = pin try: self.__gpio = gpiozero.Button(pin, pull_up=True) self.__gpio.when_pressed = self.__pressed except gpiozero.GPIOZeroError as error: raise GpioError from error except gpiozero.GPIOZeroWarning as warning: logger.warning("Ignoring GPIO warning %s", warning) self.__count, self.__start_time = 0, time.time() #: RPM value self.rpm = 0 if managed: add_gpio_device(self) logger.info("Created GPIO RPM device on pin %s", pin) def __str__(self) -> str: return f"RPM on GPIO{self.__pin}" def __pressed(self) -> None: """Increment the press count""" self.__count += 1
[docs] def update(self) -> None: """Update the RPM value""" self.rpm = (self.__count / 2) / (time.time() - self.__start_time) * 60 self.__count, self.__start_time = 0, time.time() logger.debug("Updating RPM value to %s", self.rpm)
[docs] def close(self) -> None: """Close the GPIO device :raises GpioError: Received a :exc:`gpiozero.GPIOZeroError` """ try: self.__gpio.close() except gpiozero.GPIOZeroError as error: raise GpioError from error except gpiozero.GPIOZeroWarning as warning: logger.warning("Ignoring GPIO warning %s", warning)
[docs]class GpioPwm: """Class to handle PWM output for a fan :param pin: GPIO pin number to use :param managed: set to true to have the GPIO device automatically closed when :func:`fand.util.terminate` is called :raises GpioError: Received a :exc:`gpiozero.GPIOZeroError` """ __pin: int __gpio: gpiozero.PWMLED def __init__(self, pin: int, managed: bool = True) -> None: self.__pin = pin try: self.__gpio = gpiozero.PWMLED(pin, frequency=25000, active_high=True, initial_value=1) except gpiozero.GPIOZeroError as error: raise GpioError from error except gpiozero.GPIOZeroWarning as warning: logger.warning("Ignoring GPIO warning %s", warning) if managed: add_gpio_device(self) logger.info("Created GPIO PWM device on pin %s", pin) def __str__(self) -> str: return f"PWM on GPIO{self.__pin}" @property def pwm(self) -> float: """PWM output value, backend is :attr:`gpiozero.PWMLED.value` :raises GpioError: Received a :exc:`gpiozero.GPIOZeroError` """ return self.__gpio.value * 100 @pwm.setter def pwm(self, value: float) -> None: if value > 100 or value < 0: raise ShelfPwmBadValue("PWM value must be between 0 and 100") try: self.__gpio.value = value / 100 except gpiozero.GPIOZeroError as error: raise GpioError from error except gpiozero.GPIOZeroWarning as warning: logger.warning("Ignoring GPIO warning %s", warning)
[docs] def close(self) -> None: """Close the GPIO device :raises GpioError: Received a :exc:`gpiozero.GPIOZeroError` """ try: self.__gpio.close() except gpiozero.GPIOZeroError as error: raise GpioError from error except gpiozero.GPIOZeroWarning as warning: logger.warning("Ignoring GPIO warning %s", warning)
[docs]def main() -> NoReturn: """Module entry point""" parser = argparse.ArgumentParser( description=__DOCSTRING__, formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument('name', nargs='?', default=socket.gethostname(), help='Shelf name, defaults to hostname') parser.add_argument('--pwmpin', '-W', default=18, type=int, help="Set GPIO pin for PWM output, defaults to 2") parser.add_argument('--rpmpin', '-r', default=17, type=int, help="Set GPIO pin for RPM input, defaults to 3") args = util.parse_args(parser) logger.info("Started from main entry point with parameters %s", args) # GPIO PWM try: gpio_pwm = GpioPwm(args.pwmpin) except GpioError: logger.exception("Failed to create GPIO PWM object") util.terminate("Cannot continue without GPIO PWM object") util.sys_exit() logger.debug("Created PWM GPIO device %s", gpio_pwm) # GPIO RPM try: gpio_rpm = GpioRpm(args.rpmpin) except GpioError: logger.exception("Failed to create GPIO RPM object") util.terminate("Cannot continue without GPIO RPM object") util.sys_exit() logger.debug("Created RPM GPIO device %s", gpio_rpm) try: daemon(gpio_pwm, gpio_rpm, shelf_name=args.name, address=args.address, port=args.port) finally: util.sys_exit()
[docs]def daemon( gpio_pwm: GpioPwm, gpio_rpm: GpioRpm, shelf_name: str = socket.gethostname(), address: str = socket.gethostname(), port: int = 9999, ) -> None: """Main function of this module :param gpio_pwm: GPIO device to use for PWM output :param gpio_rpm: GPIO device to use for RPM input :param shelf_name: Name of this shelf, used to communicate with the server :param address: Server address or hostname :param port: Port number to connect to """ def reconnect(server: Optional[socket.socket] = None, error: Optional[str] = None, notice: bool = True) -> socket.socket: try: if server is not None: com.reset_connection(server, error, notice=notice) return com.connect(address, port) except CommunicationError: logger.exception("Failed to connect to %s:%s", address, port) util.terminate("Cannot connect to server") raise logger.debug("Starting client daemon") signal.signal(signal.SIGINT, util.default_signal_handler) signal.signal(signal.SIGTERM, util.default_signal_handler) server = reconnect() while not util.terminating(): logger.info("Updating informations") logger.debug("Updating PWM") try: com.send(server, com.Request.GET_PWM, shelf_name) req, args = com.recv(server) server_shelf_name, pwm_value = args except ConnectionResetError: logger.info("Connection reset by %s", server) server = reconnect(server, notice=False) except CommunicationError: logger.exception("Failed to get PWM value from %s", server) server = reconnect(server) except ValueError: logger.error("Unexpected data received from %s: %s", server, args) server = reconnect(server, "Unexpected arguments") if req != com.Request.SET_PWM: logger.error("Unexpected request from %s: expected %s, got %s", server, com.Request.SET_PWM, req) server = reconnect(server, "Unexpected request") elif server_shelf_name != shelf_name: logger.error("Unexpected shelf name %s received from %s", server_shelf_name, server) server = reconnect(server, "Unexpected shelf") else: try: gpio_pwm.pwm = pwm_value except GpioError: logger.exception("Failed to set PWM value for %s", gpio_pwm) util.terminate("Cannot continue after GPIO failure") raise except ValueError: logger.exception("Unexpected PWM value from %s: %s", server, pwm_value) logger.debug("Updating RPM") gpio_rpm.update() util.sleep(1) gpio_rpm.update() try: com.send(server, com.Request.SET_RPM, shelf_name, gpio_rpm.rpm) req, args = com.recv(server) except ConnectionResetError: logger.info("Connection reset by %s", server) server = reconnect(server, notice=False) except CommunicationError: logger.exception("Failed to get RPM value from %s", server) server = reconnect(server) if req != com.Request.ACK: logger.error("Unexpected request from %s: expected %s, got %s", server, com.Request.ACK, req) server = reconnect(server, "Unexpected request") logger.info("Updated: PWM = %s, RPM = %s", pwm_value, gpio_rpm.rpm) util.sleep(SLEEP_TIME)
if __name__ == '__main__': main()