Source code for fand.fanctl

"""fand CLI"""

import argparse
import datetime
import logging
import re
import signal
import socket
import sys
from typing import (Any, Callable, Dict, NoReturn)

import fand.communication as com
import fand.util as util
from fand.exceptions import (
    CommunicationError, FanctlActionBadValue, ShelfPwmExpireBadValue,
)

# Constants
# Module docstring
__DOCSTRING__ = __doc__
# Logger to use
logger = logging.getLogger(__name__)
# Docstring for actions
__ACTION_DOC__ = """
Known actions are:
  raw                   Send raw request to the server
                        Syntax: raw REQUEST REQUEST_ARGS
  ping                  Ping the server
                        Syntax: ping
  get-pwm               Get the PWM value of a shelf
                        Syntax: get-pwm SHELFNAME
  set-pwm-override      Override the PWM value of a shelf
                        Syntax: set-pwm-override SHELFNAME VALUE
                        VALUE: percentage (speed)
  set-pwm-expire-in     Set expiration date of PWM override
                        Syntax: set-pwm-expire-in SHELFNAME DURATION
                        DURATION: how much time before expiration
                        Supported DURATION examples: '21d4h1m5s', '4h10s',
                        '7:22:59:00' (7d, 22h, 59min, 00s),
                        '4:22.5' (4min, 22.5s), '10' (10 seconds)
                        See DATETIME_DURATION_FORMATS for more informations.
  set-pwm-expire-on     Set expiration date of PWM override
                        Syntax: set-pwm-expire-on SHELFNAME DATE
                        DATE: date on which the override expire
                        Supported DATE examples: '2020-29-12T01:01:59',
                        '2020-29-12T01:01:59+01:00', '08/16/1988 21:30:00',
                        'Tue Aug 16 21:30:00 1988' (last two examples are
                        locale defined).
                        See DATETIME_DATE_FORMATS for more informations.
  get-rpm               Get the RPM value of a shelf
                        Syntax: get-rpm SHELFNAME
"""
#: List of accepted string formats for :meth:`datetime.datetime.strptime`
DATETIME_DATE_FORMATS = [
    "%c",
    "%x %X",
    "%xT%X",
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%dT%H:%M:%S%z",
    "%Y-%m-%dT%H:%M:%S.%f%z",
    "%Y-%m-%dT%H:%M:%S%Z",
    "%Y-%m-%dT%H:%M:%S.%f%Z",
]
#: List of accepted regex formats for :class:`datetime.timedelta`
DATETIME_DURATION_FORMATS = [
    r'^((?P<day>\d+?)d)?((?P<h>\d+?)h)?((?P<min>\d+?)m)?((?P<sec>\d+?)s)?$',
    r'^(?P<sec>\d+?)$',
    r'^(?P<min>\d+?):(?P<sec>\d+?)$',
    r'^(?P<h>\d+?):(?P<min>\d+?):(?P<sec>\d+?)$',
    r'^(?P<day>\d+?):(?P<h>\d+?):(?P<min>\d+?):(?P<sec>\d+?)$',
    r'^(?P<day>\d+?):(?P<h>\d+?):(?P<min>\d+?):(?P<sec>\d+?).(?P<ms>\d+?)$',
]


def _action_send_raw(server: socket.socket, *args: Any) -> None:
    """Send raw request, print the returned request"""
    logger.debug("Sending raw request to %s", server)
    com.send(server, *args)
    req, args = com.recv(server)
    print(f"{req}{args}")


def _action_ping(server: socket.socket) -> None:
    """Send ping"""
    logger.debug("Sending ping to %s", server)
    com.send(server, com.Request.PING)
    req, _ = com.recv(server)
    if req != com.Request.ACK:
        logger.error("Unexpected request from server: expected %s, got %s",
                     com.Request.ACK, req)
    else:
        print("ok")


def _action_get_shelf_pwm(server: socket.socket, shelf_id: str) -> None:
    """Get shelf PWM speed"""
    logger.debug("Sending get pwm to %s for %s", server, shelf_id)
    com.send(server, com.Request.GET_PWM, shelf_id)
    req, args = com.recv(server)
    if req != com.Request.SET_PWM:
        logger.error("Unexpected request from server: expected %s, got %s",
                     com.Request.SET_PWM, req)
    elif args[0] != shelf_id:
        logger.error("Wrong shelf returned by server: expected %s, got %s",
                     shelf_id, args[0])
    else:
        print(args[1])


def _action_get_shelf_rpm(server: socket.socket, shelf_id: str) -> None:
    """Get shelf RPM speed"""
    logger.debug("Sending get rpm to %s for %s", server, shelf_id)
    com.send(server, com.Request.GET_RPM, shelf_id)
    req, args = com.recv(server)
    if req != com.Request.SET_RPM:
        logger.error("Unexpected request from server: expected %s, got %s",
                     com.Request.SET_RPM, req)
    elif args[0] != shelf_id:
        logger.error("Wrong shelf returned by server: expected %s, got %s",
                     shelf_id, args[0])
    else:
        print(args[1])


def _action_set_pwm_override(server: socket.socket, shelf_id: str,
                             value_str: str) -> None:
    """Override shelf PWM value to value_str"""
    logger.debug("Overriding PWM of %s to %s", shelf_id, value_str)
    if str(value_str).lower() == 'none':
        value = None
    else:
        value = int(value_str)
    logger.debug("Sending set PWM override to %s for %s to %s",
                 value, shelf_id, server)
    com.send(server, com.Request.SET_PWM_OVERRIDE, shelf_id, value)
    req, _ = com.recv(server)
    if req != com.Request.ACK:
        logger.error("Unexpected request from server: expected %s, got %s",
                     com.Request.ACK, req)
    else:
        print("ok")


def _action_set_pwm_expire_on(server: socket.socket, shelf_id: str,
                              date_str: str) -> None:
    """Set shelf PWM override expiration to date_str"""
    logger.debug("Setting PWM override on %s for %s", date_str, shelf_id)
    if sys.version_info < (3, 7) and re.search(r'[+-]\d\d:\d\d$', date_str):
        # Python < 3.7 does not supports timezones 'HH:MM' (only 'HHMM')
        match = re.findall(r'[+-]\d\d:\d\d$', date_str)[0]
        date_str = date_str.replace(match, match.replace(':', ''))
    for test_str in DATETIME_DATE_FORMATS:
        try:
            date = datetime.datetime.strptime(date_str.strip(), test_str)
            break
        except ValueError:
            pass
    else:
        raise ShelfPwmExpireBadValue(
            f"Could not convert {date_str} to a datetime object"
        )
    logger.debug("Sending set PWM expire on %s for %s to %s",
                 date, shelf_id, server)
    com.send(server, com.Request.SET_PWM_EXPIRE, shelf_id, date)
    req, _ = com.recv(server)
    if req != com.Request.ACK:
        logger.error("Unexpected request from server: expected %s, got %s",
                     com.Request.ACK, req)
    else:
        print("ok")


def _action_set_pwm_expire_in(server: socket.socket, shelf_id: str,
                              duration_str: str) -> None:
    """Set shelf PWM override expiration in duration_str"""
    logger.debug("Setting PWM override in %s for %s", duration_str, shelf_id)
    for test_str in DATETIME_DURATION_FORMATS:
        match = re.match(test_str, duration_str.strip())
        if match:
            break
    else:
        raise ShelfPwmExpireBadValue(
            f"Cannot convert {duration_str} to a timedelta object"
        )
    match_dict = match.groupdict('0')
    duration = datetime.timedelta(
        days=int(match_dict.get('day', 0)),
        hours=int(match_dict.get('h', 0)),
        minutes=int(match_dict.get('min', 0)),
        seconds=int(match_dict.get('sec', 0)),
        milliseconds=int(match_dict.get('ms', 0)),
    )
    logger.debug("Converted %s to %s = %s", duration_str, match_dict, duration)
    date = datetime.datetime.now(datetime.timezone.utc) + duration
    logger.debug("Sending set PWM expire in %s (%s) for %s to %s",
                 duration, date, shelf_id, server)
    com.send(server, com.Request.SET_PWM_EXPIRE, shelf_id, date)
    req, _ = com.recv(server)
    if req != com.Request.ACK:
        logger.error("Unexpected request from server: expected %s, got %s",
                     com.Request.ACK, req)
    else:
        print("ok")


#: Dictionnary associating action strings to their corresponding functions
ACTION_DICT: Dict[str, Callable] = {
    'raw': _action_send_raw,
    'ping': _action_ping,
    'get-pwm': _action_get_shelf_pwm,
    'get-rpm': _action_get_shelf_rpm,
    'set-pwm-override': _action_set_pwm_override,
    'set-pwm-expire-on': _action_set_pwm_expire_on,
    'set-pwm-expire-in': _action_set_pwm_expire_in,
}


[docs]def main() -> NoReturn: """Entry point of the module""" parser = argparse.ArgumentParser( description=__DOCSTRING__, epilog=__ACTION_DOC__, formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument('action', help="Select an action") parser.add_argument('args', nargs='*', help="Action arguments") args = util.parse_args(parser) logger.info("Started from main entry point with parameters %s", args) try: send(args.action, *args.args, address=args.address, port=args.port) finally: util.sys_exit()
[docs]def send( action: str, *args: Any, address: str = socket.gethostname(), port: int = 9999, ) -> None: """Main function of this module :param action: Action to call :param args: Arguments to send to the action :param address: Server address :param port: Server port :raises FanctlActionBadValue: Invalid action name or arguments """ logger.debug("Running action %s", action) signal.signal(signal.SIGINT, util.default_signal_handler) signal.signal(signal.SIGTERM, util.default_signal_handler) try: server = com.connect(address, port) except CommunicationError: logger.exception("Failed to connect to %s:%s", address, port) util.terminate("Cannot connect to server") raise handler = ACTION_DICT.get(action) if not handler: logger.error("Invalid action %s", action) else: try: handler(server, *args) except TypeError as error: logger.exception("Invalid call to acion %s", action) util.terminate("Invalid action") raise FanctlActionBadValue("Invalid action") from error except ValueError as error: logger.exception("Received value error in action %s", action) util.terminate("Invalid arguments") raise FanctlActionBadValue("Invalid arguments") from error except ConnectionResetError: logger.error("Connection reset by server during %s", action) util.terminate("Connection reset by server") raise
if __name__ == '__main__': main()