Source code for jflib.command_watcher

"""
Module to watch the execution of shell scripts. Both streams (`stdout` and
`stderr`) are captured.

.. code:: python

    watch = Watch()
    watch.log.critical(msg)
    watch.log.error(msg)
    watch.log.warning(msg)
    watch.log.info(msg)
    watch.log.debug(msg)
    watch.run(['rsync', '-av', '/home', '/backup'])
"""

from __future__ import annotations

import abc
import logging
import os
import pwd
import queue
import shlex
import shutil
import socket
import subprocess
import sys
import textwrap
import threading
import time
import typing
import uuid
from logging.handlers import BufferingHandler
from typing import (IO, Any, Dict, List, Literal, Optional, Sequence, Tuple,
                    TypedDict, Union, cast)

from typing_extensions import Unpack

from . import capturing, icinga, termcolor
from .config_reader import ClassInterface, ConfigReader, Spec
from .send_email import send_email

# Host name as returned by ``socket.gethostname()``; resolved once at import.
HOSTNAME = socket.gethostname()
# Login name belonging to the UID of the current process (``pwd`` lookup).
USERNAME = pwd.getpwuid(os.getuid()).pw_name


# The four integer status codes accepted by the reporting functions.
Status = Literal[0, 1, 2, 3]


class MinimalMessageParams(TypedDict, total=False):
    """Optional keyword arguments a caller may attach to a report.

    All keys are optional (``total=False``).
    """

    custom_message: str
    """Custom message"""

    prefix: str
    """Prefix of the report message."""

    body: str
    """A longer report text."""

    # Spelled ``performance_data`` (not ``preformance_data``) so that the
    # declared key matches the key that is actually read from the message
    # data and written by ``Watch.final_report``.
    performance_data: Dict[str, Any]
    """A dictionary like `{'perf_1': 1, 'perf_2': 'test'}`"""
class MessageParams(MinimalMessageParams, total=False):
    """All keyword arguments understood by a report message.

    Adds the fields below to the keys of :class:`MinimalMessageParams`.
    Every key is optional.
    """

    status: Status
    """Monitoring state in the Nagios / Icinga convention:
    0 (OK), 1 (WARNING), 2 (CRITICAL), 3 (UNKNOWN)."""

    service_name: str
    """The name of the service."""

    log_records: str
    """Log records separated by new lines"""

    processes: List['Process']
class BaseClass:
    """Base class that offers a compact textual summary of an object."""

    def _obj_to_str(self, attributes: Optional[List[str]] = None) -> str:
        """Render selected attributes as ``[ClassName] name: 'value', ...``.

        :param attributes: Names of the attributes to include. If omitted
          (or empty) every name returned by ``dir(self)`` is considered.

        :return: The class name in brackets followed by the comma separated
          ``name: 'value'`` pairs. Names starting with an underscore,
          callables and falsy values are left out. Values are shortened to
          at most 64 characters.
        """
        # ``None`` instead of a shared mutable ``[]`` default.
        names = attributes if attributes else dir(self)
        output: List[str] = []
        for name in names:
            if name.startswith('_'):
                continue
            # Read the attribute once; properties may be costly to evaluate.
            value = getattr(self, name)
            if callable(value) or not value:
                continue
            text = textwrap.shorten(str(value), width=64)
            text = text.replace('\n', ' ')
            output.append('{}: \'{}\''.format(name, text))
        return '[{}] {}'.format(self.__class__.__name__, ', '.join(output))
class CommandWatcherError(Exception):
    """Exception raised by this module.

    Creating the exception immediately sends a report with status 2 through
    the module level ``reporter``.

    :param msg: Description of the failure. It is reported prefixed with
      the name of the exception class.
    :param data: Additional message parameters handed to the reporter.
    """

    def __init__(self, msg: str, **data: Unpack[MessageParams]):
        summary = '{}: {}'.format(type(self).__name__, msg)
        reporter.report(status=2, custom_message=summary,
                        **data)  # type: ignore
class Timer:
    """Measure the execution time of a command run.

    The clock starts when the object is created.
    """

    def __init__(self):
        self.start = time.time()
        """The start time. (UNIX timestamp)"""

        self.stop = 0
        """The time when the timer stops. (UNIX timestamp)"""

        self.interval = 0
        """The time interval between start and stop."""

    def result(self):
        """Stop the clock and measure the elapsed time.

        Updates ``stop`` and ``interval``.

        :return: The interval in seconds with three decimals and a
          trailing ``s``, for example ``0.123s``.
        :rtype: str
        """
        now = time.time()
        self.stop = now
        self.interval = now - self.start
        return f'{self.interval:.3f}s'
# Logging #####################################################################

# Two extra levels are registered next to the standard ones:
#
# CRITICAL 50
# ERROR 40
# -> STDERR 35
# WARNING 30
# INFO 20
# DEBUG 10
# --> STDOUT 5
# NOTSET 0

LOGFMT = '%(asctime)s_%(msecs)03d %(levelname)s %(message)s'
DATEFMT = '%Y%m%d_%H%M%S'

STDOUT = 5
STDERR = 35
logging.addLevelName(STDOUT, 'STDOUT')
logging.addLevelName(STDERR, 'STDERR')
class LoggingHandler(BufferingHandler):
    """Store all logging records in memory and pass them on when emitted.

    Without a master logger each record is printed to the terminal; with a
    master logger the record is forwarded to it instead.

    :param master_logger: Logger that receives every record. If omitted the
      records are printed.
    """

    _master_logger: Optional[logging.Logger]

    # Level name -> (color, optional extra text attribute). Unlisted level
    # names are printed in grey without an extra attribute.
    _LEVEL_STYLES: Dict[str, Tuple[str, Optional[str]]] = {
        'CRITICAL': ('red', 'bold'),
        'ERROR': ('red', None),
        'STDERR': ('red', 'dark'),
        'WARNING': ('yellow', None),
        'INFO': ('green', None),
        'DEBUG': ('white', None),
        'STDOUT': ('white', 'dark'),
        'NOTSET': ('grey', None),
    }

    def __init__(self, master_logger: Optional[logging.Logger] = None):
        BufferingHandler.__init__(self, capacity=1000000)
        self._master_logger = master_logger

    @staticmethod
    def _print(record: logging.LogRecord):
        """Print a colorized line for the record.

        Records with a level of ``STDERR`` or above go to ``sys.stderr``,
        all others to ``sys.stdout``.

        :param logging.LogRecord record: A record object.
        """
        level = record.levelname
        color, attr = LoggingHandler._LEVEL_STYLES.get(level, ('grey', None))
        normal = [attr] if attr else []
        reverse = ['reverse'] + normal

        stream = sys.stderr if record.levelno >= STDERR else sys.stdout

        created = '{}_{:03d}'.format(
            time.strftime(DATEFMT, time.localtime(record.created)),
            int(record.msecs),
        )

        print('{} {} {}'.format(
            created,
            termcolor.colored(' {:<8} '.format(level), color, attrs=reverse),
            # getMessage() merges %-style arguments into the text;
            # record.msg alone would print the raw format string.
            termcolor.colored(record.getMessage(), color, attrs=normal),
        ), file=stream)

    def emit(self, record: logging.LogRecord):
        """Buffer the record, then print it or forward it.

        :param record: A record object.
        """
        self.buffer.append(record)
        if self._master_logger:
            # Forward the fully formatted text, otherwise the arguments of
            # the original logging call would be lost.
            self._master_logger.log(record.levelno, record.getMessage())
        else:
            self._print(record)
        if self.shouldFlush(record):
            self.flush()

    def _messages(self, levelname: str) -> str:
        """Join the messages of all buffered records of one level name."""
        return '\n'.join(
            record.getMessage()
            for record in self.buffer
            if record.levelname == levelname
        )

    @property
    def stdout(self):
        """All buffered ``STDOUT`` messages joined by line breaks."""
        return self._messages('STDOUT')

    @property
    def stderr(self):
        """All buffered ``STDERR`` messages joined by line breaks."""
        return self._messages('STDERR')

    @property
    def all_records(self):
        """All log messages joined by line breaks."""
        return '\n'.join(self.format(record) for record in self.buffer)
class ExtendedLogger(logging.Logger):
    """Logger type that declares the additional ``stdout`` / ``stderr``
    logging methods. The methods defined here have no effect of their own.
    """

    def stdout(self, line: object, *args: Any, **kws: Any) -> None:
        """Declared signature for logging a line with the ``STDOUT`` level."""

    def stderr(self, line: object, *args: Any, **kws: Any) -> None:
        """Declared signature for logging a line with the ``STDERR`` level."""
def _log_stdout(self: ExtendedLogger, message: object, *args: Any,
                **kws: Any):
    """Log ``message`` with the ``STDOUT`` level."""
    # ``Logger._log`` expects the positional format arguments as one tuple
    # parameter, therefore ``args`` is passed on unexpanded.
    self._log(STDOUT, message, args, **kws)


def _log_stderr(self: ExtendedLogger, message: object, *args: Any,
                **kws: Any):
    """Log ``message`` with the ``STDERR`` level."""
    # See ``_log_stdout``: ``args`` is handed over as a tuple.
    self._log(STDERR, message, args, **kws)


# ``extendedLogger`` is ``logging.Logger`` itself (only cast for the type
# checker), so both assignments attach the methods to ``logging.Logger``.
extendedLogger: ExtendedLogger = cast(ExtendedLogger, logging.Logger)
extendedLogger.stdout = _log_stdout  # type: ignore
extendedLogger.stderr = _log_stderr  # type: ignore
def setup_logging(master_logger: Optional[logging.Logger] = None) -> \
        typing.Tuple[ExtendedLogger, LoggingHandler]:
    """Create a fresh logger together with its buffering handler.

    The logger gets a unique name (a UUID), so every call returns a new
    logger.

    :param master_logger: Forward all log messages to a master logger.

    :return: A tuple ``(logger, handler)``.
    """
    handler = LoggingHandler(master_logger=master_logger)
    handler.setFormatter(logging.Formatter(fmt=LOGFMT, datefmt=DATEFMT))

    logger = logging.getLogger(name=str(uuid.uuid1()))
    # Level 1 instead of 0 (NOTSET) to process all messages: with NOTSET a
    # non-root logger delegates the decision to its parent, and the root
    # logger is created with the level WARNING.
    logger.setLevel(1)
    logger.addHandler(handler)
    return cast(ExtendedLogger, logger), handler
# Reporting ###################################################################
class Message(BaseClass):
    """Bundle all available message data into one object.

    Each reporting channel picks the pieces of data it needs.
    """

    _data: MessageParams

    def __init__(self, **data: Unpack[MessageParams]):
        self._data = data

    def __str__(self):
        return self._obj_to_str()

    @property
    def status(self) -> int:
        """Monitoring state in the Nagios / Icinga convention:
        0 (OK), 1 (WARNING), 2 (CRITICAL), 3 (UNKNOWN). Defaults to 0."""
        return self._data.get('status', 0)

    @property
    def status_text(self) -> str:
        """The status as a text word like `OK`."""
        return icinga.States[self.status]

    @property
    def service_name(self) -> str:
        """The service name or ``service_not_set`` if none was given."""
        return self._data.get('service_name', 'service_not_set')

    @property
    def performance_data(self) -> str:
        """The performance data as space separated ``key=value`` pairs.

        :return: A concatenated string, empty if there is no data.
        :rtype: str
        """
        raw = self._data.get('performance_data')
        if not raw or not isinstance(raw, dict):
            return ''
        return ' '.join(
            '{!s}={!s}'.format(key, value) for key, value in raw.items()
        )

    @property
    def custom_message(self) -> str:
        """The custom message or an empty string."""
        return self._data.get('custom_message', '')

    @property
    def prefix(self) -> str:
        """The message prefix, ``[cwatcher]:`` by default."""
        return self._data.get('prefix', '[cwatcher]:')

    @property
    def message(self) -> str:
        """Prefix, upper-cased service name, status text and - if set -
        the custom message, joined by spaces."""
        parts: List[str] = [self.prefix] if self.prefix else []
        parts.extend((self.service_name.upper(), self.status_text))
        if self.custom_message:
            parts.append('- {}'.format(self.custom_message))
        return ' '.join(parts)

    @property
    def message_monitoring(self) -> str:
        """message + performance_data"""
        text = self.message
        if self.performance_data:
            return ' '.join([text, '|', self.performance_data])
        return text

    @property
    def body(self) -> str:
        """Text body for the e-mail message."""
        lines: List[str] = [
            'Host: {}'.format(HOSTNAME),
            'User: {}'.format(USERNAME),
            'Service name: {}'.format(self.service_name),
        ]

        if self.performance_data:
            lines.append(
                'Performance data: {}'.format(self.performance_data)
            )

        body: str = self._data.get('body', '')
        if body:
            lines.extend(['', body])

        log_records = self._data.get('log_records', '')
        if log_records:
            lines.extend(['', 'Log records:', '', log_records])

        return '\n'.join(lines)

    @property
    def processes(self) -> Optional[str]:
        """The commands of all processes in the form ``(cmd a; cmd b)`` or
        ``None`` if there are no processes."""
        processes = self._data.get('processes')
        if not processes:
            return None
        commands = [
            ' '.join(process.args_normalized) for process in processes
        ]
        return '({})'.format('; '.join(commands))

    @property
    def user(self) -> str:
        """The current user name in the form ``[user:name]``."""
        return '[user:{}]'.format(USERNAME)
class BaseChannel(BaseClass, metaclass=abc.ABCMeta):
    """Base class for all reporters"""

    @abc.abstractmethod
    def report(self, message: Message) -> None:
        """Deliver the message. Has to be implemented by every channel.

        :param message: A message object.
        """
        raise NotImplementedError(
            'A reporter class must have a `report` method.'
        )
class EmailChannel(BaseChannel):
    """Send reports by e-mail.

    :param smtp_server: The SMTP server.
    :param smtp_login: The SMTP login name.
    :param smtp_password: The SMTP password.
    :param to_addr: The address of the recipient.
    :param from_addr: The address of the sender. If empty, an address is
      built from the host name and the user name.
    :param to_addr_critical: If set, messages with the status 2 are sent to
      this address instead of ``to_addr``.
    """

    def __init__(self, smtp_server: str, smtp_login: str, smtp_password: str,
                 to_addr: str, from_addr: str = '',
                 to_addr_critical: str = ''):
        self.smtp_server = smtp_server
        self.smtp_login = smtp_login
        self.smtp_password = smtp_password
        self.to_addr = to_addr
        self.from_addr = from_addr or \
            '{0} <{1}@{0}>'.format(HOSTNAME, USERNAME)
        self.to_addr_critical = to_addr_critical

    def __str__(self):
        # The password is deliberately not part of the listed attributes.
        shown = ['smtp_server', 'smtp_login', 'to_addr', 'from_addr', ]
        return self._obj_to_str(shown)

    def report(self, message: Message):
        """Send an e-mail message.

        :param message: A message object.
        """
        use_critical = message.status == 2 and self.to_addr_critical
        recipient = self.to_addr_critical if use_critical else self.to_addr
        send_email(
            from_addr=self.from_addr,
            to_addr=recipient,
            subject=message.message,
            body=message.body,
            smtp_login=self.smtp_login,
            smtp_password=self.smtp_password,
            smtp_server=self.smtp_server
        )
class IcingaChannel(BaseChannel):
    """Report by sending a passive check result via
    ``icinga.send_passive_check``.

    :param url: The HTTP URL.
    :param user: The user for the HTTP authentication.
    :param password: The password for the HTTP authentication.
    :param service_name: The name of the service.
    """

    url: str
    user: str
    password: str
    service_name: str

    def __init__(self, url: str, user: str, password: str, service_name: str):
        self.url, self.user = url, user
        self.password = password
        self.service_name = service_name

    def __str__(self):
        # No password!
        return self._obj_to_str(['url', 'user', 'service_name'])

    def report(self, message: Message):
        """Send the status, the message text and the performance data.

        The service name is taken from the message, the host name is the
        name of the local machine.

        :param message: A message object.
        """
        payload = dict(
            url=self.url,
            user=self.user,
            password=self.password,
            status=message.status,
            host_name=HOSTNAME,
            service_name=message.service_name,
            text_output=message.message,
            performance_data=message.performance_data,
        )
        icinga.send_passive_check(**payload)
class BeepChannel(BaseChannel):
    """Send beep sounds."""

    def __init__(self):
        # Path of the ``beep`` executable or ``None`` if it is not found.
        self.cmd = shutil.which('beep')

    def __str__(self):
        return self._obj_to_str(['cmd'])

    def beep(self, frequency: float = 4186.01, length: float = 50):
        """
        Generate a beep sound using the “beep” command.

        * A success tone: frequency=4186.01, length=40
        * A failure tone: frequency=65.4064, length=100

        :param frequency: Frequency in Hz.
        :param length: Length in milliseconds.
        """
        # TODO: Use self.cmd -> Fix tests
        command = [
            'beep',
            '-f', str(float(frequency)),
            '-l', str(float(length)),
        ]
        subprocess.run(command)

    def report(self, message: Message):
        """Send a beep sound.

        :param message: A message object. The only attribute that takes an
          effect is the status attribute (0-3). Any other status produces
          no sound.
        """
        # status -> (frequency in Hz, length in milliseconds)
        tones = {
            0: (4186.01, 50),    # OK: C8 (highest note)
            1: (261.626, 100),   # WARNING: C4 (middle C)
            2: (65.4064, 150),   # CRITICAL: C2 (low C)
            3: (32.7032, 200),   # UNKNOWN: C1
        }
        tone = tones.get(message.status)
        if tone is not None:
            frequency, length = tone
            self.beep(frequency=frequency, length=length)
class Reporter:
    """Collect all channels."""

    channels: List[BaseChannel]

    def __init__(self):
        self.channels = list()

    def add_channel(self, channel: BaseChannel):
        """Register a further channel.

        :param channel: The channel to append to the list of channels.
        """
        self.channels.append(channel)

    def report(self, **data: Unpack[MessageParams]):
        """Build a message and hand it to every channel in turn.

        :param data: The message parameters.

        :return: The message object that was sent.
        """
        message = Message(**data)
        for receiver in self.channels:
            receiver.report(message)
        return message
# The reporter shared by the whole module.
reporter = Reporter()

# Configuration ###############################################################

# Default values handed to the configuration reader as ``dictionary``.
CONF_DEFAULTS = {
    'email': {
        'subject_prefix': 'command_watcher',
    },
    'nsca': {
        'port': 5667,
    },
}

# Specification of the configuration: section -> option -> properties.
CONFIG_READER_SPEC: Spec = {
    'email': {
        'from_addr': {
            'description': 'The email address of the sender.',
        },
        'to_addr': {
            'description': 'The email address of the recipient.',
            'not_empty': True,
        },
        'to_addr_critical': {
            'description': 'The email address of the recipient to send '
                           'critical messages to.',
            'default': None,
        },
        'smtp_login': {
            'description': 'The SMTP login name.',
            'not_empty': True,
        },
        'smtp_password': {
            'description': 'The SMTP password.',
            'not_empty': True,
        },
        'smtp_server': {
            'description': 'The URL of the SMTP server, for example: '
                           '`smtp.example.com:587`.',
            'not_empty': True,
        },
    },
    'nsca': {
        'remote_host': {
            'description': 'The IP address of the NSCA remote host.',
            'not_empty': True,
        },
        'password': {
            'description': 'The NSCA password.',
            'not_empty': True,
        },
        'encryption_method': {
            'description': 'The NSCA encryption method. The supported '
                           'encryption methods are: 0 1 2 3 4 8 11 14 15 16',
            'not_empty': True,
        },
        'port': {
            'description': 'The NSCA port.',
            'default': 5667,
        },
    },
    'icinga': {
        'url': {
            'description': 'The HTTP URL. /v1/actions/process-check-result '
                           'is appended.',
            'not_empty': True,
        },
        'user': {
            'description': 'The user for the HTTP authentification.',
            'not_empty': True,
        },
        'password': {
            'description': 'The password for the HTTP authentification.',
            'not_empty': True,
        },
    },
    'beep': {
        'activated': {
            'description': 'Activate the beep channel to report auditive '
                           'messages.',
            'default': False,
        }
    }
}

# Main code ###################################################################

# A command: either one string or a list / tuple of strings.
# ``Tuple[str, ...]`` is a tuple of any length; a plain ``Tuple[str]`` would
# only describe a tuple with exactly one element.
Args = Union[str, List[str], Tuple[str, ...]]
class ProcessArgs(TypedDict, total=False):
    """Optional keyword arguments that are passed on to the process.

    All keys are optional (``total=False``).
    """

    shell: bool
    """Execute the command through the shell if true."""

    cwd: str
    """The directory that is made the current directory before the child
    is executed."""

    env: Dict[str, Any]
    """The environment variables of the new process."""
class Process:
    """Run a process and capture its output line by line.

    The command is started and awaited inside the constructor. Every
    non-empty output line is logged with the ``STDOUT`` or the ``STDERR``
    level.

    You can use all keyword arguments from
    :py:class:`subprocess.Popen` except `bufsize`, `stderr`, `stdout`.

    :param args: List, tuple or string. A sequence of
      process arguments, like `subprocess.Popen(args)`.
    :param master_logger: Logger that additionally receives all records.
    """

    args: Args
    """Process arguments in various types."""

    _queue: 'queue.Queue[Optional[Tuple[bytes, capturing.Stream]]]'

    log: ExtendedLogger
    """A ready to go and configured logger."""

    log_handler: LoggingHandler

    subprocess: subprocess.Popen[Any]

    def __init__(self, args: Args,
                 master_logger: Optional[ExtendedLogger] = None,
                 **kwargs: Unpack[ProcessArgs]):
        self.args = args
        self._queue = queue.Queue()
        log, log_handler = setup_logging(master_logger=master_logger)
        self.log = log
        self.log_handler = log_handler
        self.log.info('Run command: {}'.format(' '.join(self.args_normalized)))
        timer = Timer()
        self.subprocess = subprocess.Popen(
            self.args_normalized,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            # RuntimeWarning: line buffering (buffering=1) isn't
            # supported in binary mode, the default buffer size will be used
            # bufsize=1,
            **kwargs
        )

        self._start_thread(self.subprocess.stdout, 'stdout')
        self._start_thread(self.subprocess.stderr, 'stderr')

        # Each of the two reader threads puts one ``None`` on the queue when
        # it finishes; ``iter(..., None)`` stops at it, so two rounds drain
        # both streams.
        for _ in range(2):
            for line, stream in iter(self._queue.get, None):
                text = self._decode_line(line)
                if not text:
                    continue
                if stream == 'stderr':
                    self.log.stderr(text)
                elif stream == 'stdout':
                    self.log.stdout(text)
        self.subprocess.wait()
        self.log.info('Execution time: {}'.format(timer.result()))

    @staticmethod
    def _decode_line(line: bytes) -> str:
        """Decode one raw output line and strip surrounding whitespace.

        Bytes that are not valid UTF-8 are replaced by U+FFFD instead of
        raising ``UnicodeDecodeError``, so that unexpected command output
        cannot abort the capturing.
        """
        return line.decode('utf-8', errors='replace').strip()

    @property
    def args_normalized(self) -> Sequence[str]:
        """Normalized `args`: a string is split like a shell command line
        (:py:func:`shlex.split`), a list or tuple is returned unchanged."""
        if isinstance(self.args, str):
            return shlex.split(self.args)
        return self.args

    @property
    def stdout(self) -> str:
        """Alias / shortcut for `self.log_handler.stdout`."""
        return self.log_handler.stdout

    @property
    def line_count_stdout(self) -> int:
        """The count of lines of the current `stdout`."""
        return len(self.stdout.splitlines())

    @property
    def stderr(self) -> str:
        """Alias / shortcut for `self.log_handler.stderr`."""
        return self.log_handler.stderr

    @property
    def line_count_stderr(self) -> int:
        """The count of lines of the current `stderr`."""
        return len(self.stderr.splitlines())

    def _stdout_stderr_reader(self, pipe: IO[bytes],
                              stream: capturing.Stream):
        """Put every line of the pipe on the queue, followed by ``None``.

        :param object pipe: `process.stdout` or `process.stderr`
        :param stream: The stream name that is queued along with each line.
        """
        try:
            with pipe:
                for line in iter(pipe.readline, b''):
                    self._queue.put((line, stream))
        finally:
            # Always signal the end, even if reading failed.
            self._queue.put(None)

    def _start_thread(self, pipe: Optional[IO[bytes]],
                      stream: capturing.Stream):
        """Start a thread that reads the pipe.

        :param object pipe: `process.stdout` or `process.stderr`
        :param stream: The stream name that is queued along with each line.
        """
        threading.Thread(
            target=self._stdout_stderr_reader,
            args=[pipe, stream]
        ).start()
class Watch:
    """Watch the execution of a command. Capture all output of a command.
    Provide and setup a logging facility.

    :param config_file: The file path of the configuration file in the INI
      format.
    :param service_name: A name of the watched service.
    :param raise_exceptions: Raise exceptions if `watch.run()` exits with a
      non-zero exit code.
    :param config_reader: A custom configuration reader. Specify this
      parameter to not use the built-in configuration reader.
    :param report_channels: The channels to report to. If omitted, the
      channels are set up from the configuration. An empty list disables
      all reporting.

    :raises Exception: If neither `config_reader` nor `config_file` is
      given.
    """

    _hostname: str
    """The hostname of machine the watcher running on."""

    _service_name: str
    """A name of the watched service."""

    log: ExtendedLogger
    """A ready to go and configured logger."""

    _log_handler: LoggingHandler

    processes: List[Process]
    """A list of completed processes
    :py:class:`Process`. Every time you use the method
    `run()` the process object is appended to the list."""

    _conf: Optional[ClassInterface]

    _raise_exceptions: bool
    """Raise exceptions"""

    _timer: Timer

    def __init__(self, config_file: Optional[str] = None,
                 service_name: str = 'command_watcher',
                 raise_exceptions: bool = True,
                 config_reader: Optional[ConfigReader] = None,
                 report_channels: Optional[List[BaseChannel]] = None):
        self._hostname = HOSTNAME
        self._service_name = service_name

        log, log_handler = setup_logging()
        self.log = log
        self.log.info('Hostname: {}'.format(self._hostname))
        self._log_handler = log_handler

        self._conf = None
        if not config_reader and config_file:
            config_reader = ConfigReader(
                spec=CONFIG_READER_SPEC,
                ini=config_file,
                dictionary=CONF_DEFAULTS,
            )

        if not config_reader:
            raise Exception('No config_reader object')

        self._conf = config_reader.get_class_interface()

        if report_channels is None:
            try:
                config_reader.check_section('email')
                email_reporter = EmailChannel(
                    smtp_server=self._conf.email.smtp_server,
                    smtp_login=self._conf.email.smtp_login,
                    smtp_password=self._conf.email.smtp_password,
                    to_addr=self._conf.email.to_addr,
                    from_addr=self._conf.email.from_addr,
                    to_addr_critical=self._conf.email.to_addr_critical,
                )
                reporter.add_channel(email_reporter)
                self.log.debug(email_reporter)
            except (ValueError, KeyError):
                # Best effort: on these errors the e-mail channel is
                # simply not set up.
                pass

            try:
                config_reader.check_section('icinga')
                icinga_reporter = IcingaChannel(
                    url=self._conf.icinga.url,
                    user=self._conf.icinga.user,
                    password=self._conf.icinga.password,
                    service_name=self._service_name,
                )
                reporter.add_channel(icinga_reporter)
                self.log.debug(icinga_reporter)
            except (ValueError, KeyError):
                # Best effort: on these errors the icinga channel is
                # simply not set up.
                pass

            if shutil.which('beep') and self._conf.beep.activated:
                beep_reporter = BeepChannel()
                reporter.add_channel(beep_reporter)
                self.log.debug(beep_reporter)
        else:
            # Use exactly the channels handed over by the caller. A copy is
            # stored so that later `add_channel` calls do not modify the
            # list of the caller.
            reporter.channels = list(report_channels)

        self.processes = []
        self._raise_exceptions = raise_exceptions
        self._timer = Timer()

    @property
    def stdout(self):
        """Alias / shortcut for `self._log_handler.stdout`."""
        return self._log_handler.stdout

    @property
    def stderr(self):
        """Alias / shortcut for `self._log_handler.stderr`."""
        return self._log_handler.stderr

    def run(self, args: Args, log: bool = True,
            ignore_exceptions: Optional[List[int]] = None,
            **kwargs: Unpack[ProcessArgs]) -> Process:
        """
        Run a process.

        :param args: List, tuple or string. A sequence of
          process arguments, like `subprocess.Popen(args)`.
        :param log: Log the `stderr` and the `stdout` of the process. If false
          the `stdout` and the `stderr` are logged only to the local process
          logger, not to the global master logger.
        :param ignore_exceptions: A list of non-zero exit codes, which are
          ignored by this method.

        :return: The completed process.

        :raises CommandWatcherError: If exceptions are enabled and the
          command returns a non-zero exit code that is not ignored.
        """
        master_logger = self.log if log else None
        process = Process(args, master_logger=master_logger, **kwargs)
        self.processes.append(process)
        rc = process.subprocess.returncode
        # ``None`` instead of a shared mutable ``[]`` default.
        ignored = ignore_exceptions or ()
        if self._raise_exceptions and rc != 0 and rc not in ignored:
            raise CommandWatcherError(
                'The command \'{}\' exists with an non-zero return code ({}).'
                .format(' '.join(process.args_normalized), rc),
                service_name=self._service_name,
                log_records=self._log_handler.all_records,
            )
        return process

    def report(self, status: Status,
               **data: Unpack[MinimalMessageParams]) -> Message:
        """Report a message using the preconfigured channels.

        The service name, all log records and the list of processes are
        added to the message.

        :param status: The status to report.
        :param data: Additional message parameters.

        :return: The message that was sent.
        """
        message = reporter.report(
            status=status,
            service_name=self._service_name,
            log_records=self._log_handler.all_records,
            processes=self.processes,
            **data,
        )
        self.log.debug(message)
        return message

    @staticmethod
    def _with_execution_time(data: Any, timer_result: str) -> Dict[str, Any]:
        """Return a copy of `data` without `status` whose
        `performance_data` additionally holds the `execution_time`.

        Neither `data` nor its `performance_data` dictionary is modified.
        """
        merged: Dict[str, Any] = dict(data)
        performance_data = dict(merged.get('performance_data') or {})
        performance_data['execution_time'] = timer_result
        merged['performance_data'] = performance_data
        merged.pop('status', None)
        return merged

    def final_report(self, **data: Unpack[MessageParams]) -> Message:
        """The same as the `report` method. Adds `execution_time` to the
        `performance_data`.

        :param data: The message parameters. `status` defaults to 0.

        :return: The message that was sent.
        """
        timer_result = self._timer.result()
        self.log.info(
            'Overall execution time: {}'.format(timer_result)
        )
        status = data.get('status', 0)
        data_dict = self._with_execution_time(data, timer_result)
        return self.report(status=status, **data_dict)