Source code for fand.util

"""Utility library for fand"""

import logging
import os
import signal
import socket
import sys
import time
from typing import (
    Any, Callable, Dict, List, NoReturn, Optional, TYPE_CHECKING, Tuple,
)

from fand import __version__

if TYPE_CHECKING:
    import argparse

# Constants
# Logger to use
logger = logging.getLogger(__name__)

# Global variables
# Set by terminate(), get by terminating()
__TERMINATE__ = False
# Store the terminate error
__TERMINATE_ERROR__: Optional[str] = None
# List of functions to call when the program is terminating
__WHEN_TERMINATE__: List[Tuple[Callable, Tuple, Dict]] = []


[docs]def terminate(error: Optional[str] = None) -> None: """Function terminating the program Sets the terminate flag (see :func:`terminating`), and does some cleanup (see :func:`when_terminate`) :param error: Error message to print """ global __TERMINATE__, __TERMINATE_ERROR__ if error is not None: logger.critical(error) if terminating(): return logger.info("Terminating...") __TERMINATE__ = True __TERMINATE_ERROR__ = error for function, args, kwargs in __WHEN_TERMINATE__: function(*args, **kwargs)
[docs]def sys_exit() -> NoReturn: """Exit the program with the error from :func:`terminate` if any""" if not terminating(): terminate() if __TERMINATE_ERROR__ is None: sys.exit(0) else: sys.exit(__TERMINATE_ERROR__)
[docs]def terminating() -> bool: """Returns True if the program is terminating, else False""" return __TERMINATE__
[docs]def when_terminate(function: Callable, *args: Any, **kwargs: Any) -> None: """Add function to call when terminating :param function: Function to call :param args: Arguments to call the function with :param kwargs: Keyworded arguments to call the function with """ __WHEN_TERMINATE__.append((function, args, kwargs))
[docs]def sleep(secs: float) -> None: """Sleep some time, stops if terminating :param secs: Number of seconds to sleep """ logger.debug("Waiting for %s seconds", secs) while secs > 0 and not terminating(): time.sleep(1 if secs > 1 else secs) secs -= 1
[docs]def default_signal_handler(sig: int, _: Any) -> None: """Default signal handler""" if sig == signal.SIGINT: logger.warning("Received SIGINT, terminating") terminate() elif sig == signal.SIGTERM: logger.warning("Received SIGTERM, terminating") terminate() else: logger.error("Unknown signal %s received, ignoring", sig)
[docs]def parse_args(parser: 'argparse.ArgumentParser') -> 'argparse.Namespace': """Add common arguments, parse arguments, set root logger verbosity :param parser: Argument parser to use """ parser.add_argument('--version', '-V', action='version', version=f'%(prog)s {__version__}') parser.add_argument('--address', '-a', default=socket.gethostname(), help="Server address, defaults to hostname") parser.add_argument('--port', '-p', default=9999, type=int, help="Server port, defaults to 9999") parser.add_argument('--verbose', '-v', action='count', default=0, help="Set verbosity level") parser.add_argument('--quiet', '-q', action='store_true', help="Set minimal output") parser.add_argument('--logfile', '-l', default=None, help="Send output logs to logfile") parser.add_argument('--pidfile', '-P', default=None, help="Set PID file for daemon") args = parser.parse_args() if args.quiet: level = logging.CRITICAL elif args.verbose == 0: level = logging.ERROR elif args.verbose == 1: level = logging.WARNING elif args.verbose == 2: level = logging.INFO else: level = logging.DEBUG root_logger = logging.getLogger() root_logger.setLevel(level) if args.logfile is not None: handler = logging.FileHandler(args.logfile, 'a', encoding='utf-8') formatter = logging.Formatter('%(levelname)s:%(name)s: %(message)s') handler.setFormatter(formatter) root_logger.addHandler(handler) if args.pidfile is not None: with open(args.pidfile, 'w') as pidfile: pidfile.write(str(os.getpid())) return args