"""
Module to watch the execution of shell scripts. Both streams (`stdout` and
`stderr`) are captured.

.. code:: python

    watch = Watch()
    watch.log.critical(msg)
    watch.log.error(msg)
    watch.log.warning(msg)
    watch.log.info(msg)
    watch.log.debug(msg)
    watch.run(['rsync', '-av', '/home', '/backup'])
"""
import abc
import logging
import os
import pwd
import queue
import shlex
import socket
import subprocess
import shutil
import sys
import textwrap
import threading
import typing
import time
import uuid
from logging.handlers import BufferingHandler
from . import termcolor, send_nsca
from .config_reader import ConfigReader
from .send_email import send_email
# Host name of the local machine, looked up once when the module is imported.
HOSTNAME = socket.gethostname()
# Login name belonging to the user id of the current process (resolved through
# the password database), looked up once when the module is imported.
USERNAME = pwd.getpwuid(os.getuid()).pw_name
class BaseClass:
    """Base class providing a compact string representation of an object."""

    def _obj_to_str(self, attributes=None):
        """Build a one-line summary of the public, non-callable attributes.

        :param list attributes: The names of the attributes to include. If
          empty or omitted, all names returned by ``dir(self)`` are used.

        :return: A string like ``[ClassName] name: 'value', other: 'value'``.
          Names starting with an underscore, callables and attributes with a
          falsy value are left out. Each value is shortened to 64 characters.
        :rtype: str
        """
        if not attributes:
            attributes = dir(self)
        output = []
        for attribute in attributes:
            if attribute.startswith('_'):
                continue
            # Fetch the value only once: the attribute may be a property
            # whose evaluation is expensive.
            value = getattr(self, attribute)
            if callable(value) or not value:
                continue
            value = textwrap.shorten(str(value), width=64)
            value = value.replace('\n', ' ')
            output.append('{}: \'{}\''.format(attribute, value))
        return '[{}] {}'.format(self.__class__.__name__, ', '.join(output))
class CommandWatcherError(Exception):
    """Exception raised by this module.

    Creating an instance immediately sends a report with the status 2
    (CRITICAL) through the module level ``reporter``.

    :param str msg: The error message. It is prefixed with the name of the
      exception class and used as the ``custom_message`` of the report.
    :param data: Additional keyword arguments which are passed on unchanged
      to ``reporter.report()``.
    """

    def __init__(self, msg, **data):
        label = type(self).__name__
        custom_message = '{}: {}'.format(label, msg)
        reporter.report(status=2, custom_message=custom_message, **data)
class Timer:
    """Measure the execution time of a command run."""

    def __init__(self):
        self.stop = 0
        """The time when the timer stops. (UNIX timestamp)"""

        self.start = time.time()
        """The start time. (UNIX timestamp)"""

        self.interval = 0
        """The time interval between start and stop."""

    def result(self):
        """
        Stop the timer and measure the time interval since its creation.

        The attributes ``stop`` and ``interval`` are updated on every call.

        :return: A formatted string displaying the result, for example
          ``1.234s``.
        :rtype: str"""
        now = time.time()
        self.stop = now
        self.interval = now - self.start
        return '{:.3f}s'.format(self.interval)
# Logging #####################################################################
# Overview of the numeric log levels. STDERR and STDOUT (marked with an arrow)
# are custom levels registered below; the other names are the standard levels
# of the `logging` module.
# CRITICAL 50
# ERROR 40
# -> STDERR 35
# WARNING 30
# INFO 20
# DEBUG 10
# --> STDOUT 5
# NOTSET 0
# Custom level for captured `stderr` lines (between WARNING and ERROR).
STDERR = 35
logging.addLevelName(STDERR, 'STDERR')
# Custom level for captured `stdout` lines (below DEBUG).
STDOUT = 5
logging.addLevelName(STDOUT, 'STDOUT')
# Format of a formatted log record: timestamp, milliseconds, level, message.
LOGFMT = '%(asctime)s_%(msecs)03d %(levelname)s %(message)s'
# `strftime` format of the timestamp, for example `20190131_235959`.
DATEFMT = '%Y%m%d_%H%M%S'
class LoggingHandler(BufferingHandler):
    """Store all logging records in the memory. Print or forward each record
    on emit.

    :param logging.Logger master_logger: If specified, every record is
      forwarded to this logger instead of being printed by this handler.
    """

    # Colour and optional additional text attribute for each level name.
    # Unknown level names are printed in grey.
    _LEVEL_STYLES = {
        'CRITICAL': ('red', 'bold'),
        'ERROR': ('red', None),
        'STDERR': ('red', 'dark'),
        'WARNING': ('yellow', None),
        'INFO': ('green', None),
        'DEBUG': ('white', None),
        'STDOUT': ('white', 'dark'),
        'NOTSET': ('grey', None),
    }

    def __init__(self, master_logger=None):
        # The huge capacity keeps (practically) all records in the buffer:
        # the buffer is only emptied once the capacity is reached.
        BufferingHandler.__init__(self, capacity=1000000)
        self._master_logger = master_logger

    @staticmethod
    def _print(record):
        """
        Print a colorized line for the record. Records of the level STDERR
        and above are written to `sys.stderr`, all other records to
        `sys.stdout`.

        :param logging.LogRecord record: A record object.
        """
        level = record.levelname
        color, attr = LoggingHandler._LEVEL_STYLES.get(level, ('grey', None))
        normal = [attr] if attr else []
        reverse = ['reverse'] + normal

        stream = sys.stderr if record.levelno >= STDERR else sys.stdout

        created = '{}_{:03d}'.format(
            time.strftime(DATEFMT, time.localtime(record.created)),
            int(record.msecs),
        )

        # getMessage() merges %-style arguments into the message text.
        print('{} {} {}'.format(
            created,
            termcolor.colored(' {:<8} '.format(level), color, attrs=reverse),
            termcolor.colored(record.getMessage(), color, attrs=normal),
        ), file=stream)

    def emit(self, record: logging.LogRecord):
        """
        Store the record in the buffer. Then print it or, if a master logger
        is set, forward its message to the master logger.

        :param record: A record object.
        """
        self.buffer.append(record)
        if not self._master_logger:
            self._print(record)
        else:
            # Forward the final message text. Forwarding `record.msg` would
            # drop the %-style arguments of the record.
            self._master_logger.log(record.levelno, record.getMessage())
        if self.shouldFlush(record):
            self.flush()

    def _messages_of_level(self, levelname):
        """Join the messages of all buffered records of one level name.

        :param str levelname: A level name like `STDOUT`.

        :rtype: str
        """
        return '\n'.join(
            record.getMessage() for record in self.buffer
            if record.levelname == levelname
        )

    @property
    def stdout(self):
        """All messages of the level `STDOUT` joined by line breaks."""
        return self._messages_of_level('STDOUT')

    @property
    def stderr(self):
        """All messages of the level `STDERR` joined by line breaks."""
        return self._messages_of_level('STDERR')

    @property
    def all_records(self):
        """All log messages joined by line breaks."""
        return '\n'.join(self.format(record) for record in self.buffer)
def _log_stdout(self, message, *args, **kws):
    """Log a message with the custom level `STDOUT`.

    This function is attached to :py:class:`logging.Logger` as the method
    `stdout`. Like the standard methods (`info`, `debug` ...) it only
    creates a record if the level is enabled for the logger.

    :param str message: The message (format string).
    :param args: The arguments which are merged into the message.
    :param kws: Keyword arguments passed on to `logging.Logger._log`.
    """
    if self.isEnabledFor(STDOUT):
        # Yes, logger takes its '*args' as 'args'.
        self._log(STDOUT, message, args, **kws)


logging.Logger.stdout = _log_stdout
def _log_stderr(self, message, *args, **kws):
    """Log a message with the custom level `STDERR`.

    This function is attached to :py:class:`logging.Logger` as the method
    `stderr`. Like the standard methods (`info`, `debug` ...) it only
    creates a record if the level is enabled for the logger.

    :param str message: The message (format string).
    :param args: The arguments which are merged into the message.
    :param kws: Keyword arguments passed on to `logging.Logger._log`.
    """
    if self.isEnabledFor(STDERR):
        # Yes, logger takes its '*args' as 'args'.
        self._log(STDERR, message, args, **kws)


logging.Logger.stderr = _log_stderr
def setup_logging(master_logger: logging.Logger = None) -> \
        typing.Tuple[logging.Logger, LoggingHandler]:
    """Setup a fresh logger for each watch action.

    The logger gets a unique name (a UUID) and exactly one handler, an
    instance of :py:class:`LoggingHandler`.

    :param master_logger: Forward all log messages to a master logger.

    :return: A tuple: the logger and its handler."""
    handler = LoggingHandler(master_logger=master_logger)
    handler.setFormatter(logging.Formatter(fmt=LOGFMT, datefmt=DATEFMT))

    logger = logging.getLogger(name=str(uuid.uuid1()))
    # Show all log messages: use 1 instead of 0: because:
    # From the documentation:
    # When a logger is created, the level is set to NOTSET (which causes all
    # messages to be processed when the logger is the root logger, or
    # delegation to the parent when the logger is a non-root logger). Note that
    # the root logger is created with level WARNING.
    logger.setLevel(1)
    logger.addHandler(handler)
    return logger, handler
# Reporting ###################################################################
class Message(BaseClass):
    """
    This message class bundles all available message data into an object. The
    different reporters can choose which data they use.

    :param int status: 0 (OK), 1 (WARNING), 2 (CRITICAL), 3 (UNKOWN): see
      Nagios / Icinga monitoring status / state.
    :param str service_name: The name of the service.
    :param str custom_message: Custom message
    :param str prefix: Prefix of the report message.
    :param str body: A longer report text.
    :param dict performance_data: A dictionary like
      `{'perf_1': 1, 'perf_2': 'test'}`.
    :param str log_records: Log records separated by new lines
    :param list processes: A list of process objects. Each object must
      provide the attribute `args_normalized`.
    """

    def __init__(self, **data):
        self._data = data

    def __str__(self):
        return self._obj_to_str()

    @property
    def status(self) -> int:
        """0 (OK), 1 (WARNING), 2 (CRITICAL), 3 (UNKOWN): see
        Nagios / Icinga monitoring status / state."""
        return self._data.get('status', 0)

    @property
    def status_text(self) -> str:
        """The status as a text word like `OK`."""
        return send_nsca.States[self.status]

    @property
    def service_name(self) -> str:
        """The name of the service or `service_not_set`."""
        return self._data.get('service_name', 'service_not_set')

    @property
    def performance_data(self) -> str:
        """
        :return: A concatenated string of `key=value` pairs separated by
          spaces or an empty string if there is no performance data.
        :rtype: str
        """
        performance_data = self._data.get('performance_data')
        if not performance_data:
            return ''
        return ' '.join(
            '{!s}={!s}'.format(key, value)
            for key, value in performance_data.items()
        )

    @property
    def custom_message(self) -> str:
        """The custom message or an empty string."""
        return self._data.get('custom_message', '')

    @property
    def prefix(self) -> str:
        """The prefix of the report message, by default `[cwatcher]:`."""
        return self._data.get('prefix', '[cwatcher]:')

    @property
    def message(self) -> str:
        """prefix + upper case service name + status text + custom message"""
        output = []
        if self.prefix:
            output.append(self.prefix)
        output.append(self.service_name.upper())
        output.append(self.status_text)
        if self.custom_message:
            output.append('- {}'.format(self.custom_message))
        return ' '.join(output)

    @property
    def message_monitoring(self) -> str:
        """message + performance_data"""
        output = [self.message]
        performance_data = self.performance_data
        if performance_data:
            output.append('|')
            output.append(performance_data)
        return ' '.join(output)

    @property
    def body(self) -> str:
        """Text body for the e-mail message."""
        output = [
            'Host: {}'.format(HOSTNAME),
            'User: {}'.format(USERNAME),
            'Service name: {}'.format(self.service_name),
        ]

        performance_data = self.performance_data
        if performance_data:
            output.append('Performance data: {}'.format(performance_data))

        body = self._data.get('body', '')
        if body:
            output.append('')
            output.append(body)

        log_records = self._data.get('log_records', '')
        if log_records:
            output.append('')
            output.append('Log records:')
            output.append('')
            output.append(log_records)

        return '\n'.join(output)

    @property
    def processes(self) -> str:
        """The command lines of all processes, separated by semicolons and
        enclosed in parentheses, or an empty string if there are no
        processes."""
        processes = self._data.get('processes')
        if not processes:
            # Always return a string as announced by the return annotation
            # (previously `None` was returned implicitly).
            return ''
        commands = [
            ' '.join(process.args_normalized) for process in processes
        ]
        return '({})'.format('; '.join(commands))

    @property
    def user(self) -> str:
        """The user name in the form `[user:name]`."""
        return '[user:{}]'.format(USERNAME)
class BaseChannel(BaseClass, metaclass=abc.ABCMeta):
    """Base class for all reporters.

    Every channel has to implement the method `report`.
    """

    @abc.abstractmethod
    def report(self, status: int = 0, service_name: str = 'command_watcher',
               **data):
        """Send a report. Must be overridden by the channel classes.

        NOTE(review): The channels in this module override this method with
        the signature `report(self, message)`, which differs from this
        abstract signature.

        :raises NotImplementedError: Always.
        """
        reason = 'A reporter class must have a `report` method.'
        raise NotImplementedError(reason)
class EmailChannel(BaseChannel):
    """Send reports by e-mail.

    :param smtp_server: The SMTP server.
    :param smtp_login: The SMTP login name.
    :param smtp_password: The SMTP password.
    :param to_addr: The e-mail address of the recipient.
    :param from_addr: The e-mail address of the sender. If empty, an address
      is built from the host name and the user name.
    :param to_addr_critical: The e-mail address of the recipient for
      messages with the status 2 (CRITICAL). If empty, `to_addr` is used.
    """

    def __init__(self, smtp_server: str, smtp_login: str, smtp_password: str,
                 to_addr: str, from_addr: str = '',
                 to_addr_critical: str = ''):
        self.smtp_server = smtp_server
        self.smtp_login = smtp_login
        self.smtp_password = smtp_password
        self.to_addr = to_addr
        self.to_addr_critical = to_addr_critical
        # Fall back to a generated sender address.
        self.from_addr = from_addr or \
            '{0} <{1}@{0}>'.format(HOSTNAME, USERNAME)

    def __str__(self):
        # The password is left out on purpose.
        visible = ['smtp_server', 'smtp_login', 'to_addr', 'from_addr']
        return self._obj_to_str(visible)

    def report(self, message: Message):
        """Send an e-mail message.

        :param message: A message object.
        """
        is_critical = message.status == 2
        if is_critical and self.to_addr_critical:
            recipient = self.to_addr_critical
        else:
            recipient = self.to_addr

        send_email(
            from_addr=self.from_addr,
            to_addr=recipient,
            subject=message.message,
            body=message.body,
            smtp_login=self.smtp_login,
            smtp_password=self.smtp_password,
            smtp_server=self.smtp_server
        )
class NscaChannel(BaseChannel):
    """Wrapper around `send_nsca` to send NSCA messages. Set up the NSCA
    client.

    :param remote_host: The NSCA remote host.
    :param password: The NSCA password.
    :param encryption_method: The NSCA encryption method.
    :param port: The NSCA port.
    :param service_name: The name of the service. It is stored on the
      channel; the report itself uses the service name of the message.
    """

    def __init__(self, remote_host: str, password: str, encryption_method: int,
                 port: int, service_name: str):
        self.remote_host = remote_host
        self.password = password
        self.encryption_method = encryption_method
        self.port = port
        self.service_name = service_name

    def __str__(self):
        # No password!
        visible = ['remote_host', 'encryption_method', 'port', 'service_name']
        return self._obj_to_str(visible)

    def report(self, message: Message):
        """Send a NSCA message to a remote NSCA server.

        :param message: A message object.
        """
        # Data taken from the message ...
        payload = {
            'status': message.status,
            'host_name': HOSTNAME,
            'service_name': message.service_name,
            'text_output': message.message_monitoring,
        }
        # ... and the connection settings of this channel.
        connection = {
            'remote_host': self.remote_host,
            'password': str(self.password),
            'encryption_method': self.encryption_method,
            'port': self.port,
        }
        send_nsca.send_nsca(**payload, **connection)
class BeepChannel(BaseChannel):
    """Send beep sounds."""

    def __init__(self):
        self.cmd = shutil.which('beep')

    def __str__(self):
        return self._obj_to_str(['cmd'])

    def beep(self, frequency: float = 4186.01, length: float = 50):
        """
        Generate a beep sound using the “beep” command.

        * A success tone: frequency=4186.01, length=40
        * A failure tone: frequency=65.4064, length=100

        :param frequency: Frequency in Hz.
        :param length: Length in milliseconds.
        """
        # TODO: Use self.cmd -> Fix tests
        command = [
            'beep',
            '-f', str(float(frequency)),
            '-l', str(float(length)),
        ]
        subprocess.run(command)

    def report(self, message: Message):
        """Send a beep sounds.

        :param message: A message object. The only attribute that takes an
          effect is the status attribute (0-3).
        """
        # (status, frequency in Hz, length in milliseconds)
        tones = (
            (0, 4186.01, 50),   # OK: C8 (highest note)
            (1, 261.626, 100),  # WARNING: C4 (middle C)
            (2, 65.4064, 150),  # CRITICAL: C2 (low C)
            (3, 32.7032, 200),  # UNKOWN: C1
        )
        for status, frequency, length in tones:
            if message.status == status:
                self.beep(frequency=frequency, length=length)
                break
class Reporter:
    """Collect all channels."""

    def __init__(self):
        self.channels = []

    def add_channel(self, channel):
        """Register a channel.

        :param channel: An object with a `report(message)` method.
        """
        self.channels.append(channel)

    def report(self, **data):
        """Create a message and hand it to every registered channel.

        :param data: The keyword arguments to build the
          :py:class:`Message` from.

        :return: The message object which was sent to the channels.
        """
        outgoing = Message(**data)
        for target in self.channels:
            target.report(outgoing)
        return outgoing


# The shared reporter instance of this module.
reporter = Reporter()
# Configuration ###############################################################
# Default configuration values, grouped by section. `Watch.__init__` passes
# this dictionary to `ConfigReader` as the `dictionary` argument.
CONF_DEFAULTS = {
'email': {
'subject_prefix': 'command_watcher',
},
'nsca': {
'port': 5667,
},
}
# Specification of the configuration options in the form
# `{section: {option: settings}}`. `Watch.__init__` passes this dictionary to
# `ConfigReader` as the `spec` argument.
# NOTE(review): The meaning of the settings keys (`description`, `not_empty`,
# `default`) is defined by `ConfigReader` -- confirm there.
CONFIG_READER_SPEC = {
# Options of the e-mail channel.
'email': {
'from_addr': {
'description': 'The email address of the sender.',
},
'to_addr': {
'description': 'The email address of the recipient.',
'not_empty': True,
},
'to_addr_critical': {
'description': 'The email address of the recipient to send '
'critical messages to.',
'default': None,
},
'smtp_login': {
'description': 'The SMTP login name.',
'not_empty': True,
},
'smtp_password': {
'description': 'The SMTP password.',
'not_empty': True,
},
'smtp_server': {
'description': 'The URL of the SMTP server, for example: '
'`smtp.example.com:587`.',
'not_empty': True,
},
},
# Options of the NSCA channel.
'nsca': {
'remote_host': {
'description': 'The IP address of the NSCA remote host.',
'not_empty': True,
},
'password': {
'description': 'The NSCA password.',
'not_empty': True,
},
'encryption_method': {
'description': 'The NSCA encryption method. The supported '
'encryption methods are: 0 1 2 3 4 8 11 14 15 16',
'not_empty': True,
},
'port': {
'description': 'The NSCA port.',
'default': 5667,
},
},
# Options of the beep channel.
'beep': {
'activated': {
'description': 'Activate the beep channel to report auditive '
'messages.',
'default': False,
}
}
}
# Main code ###################################################################
class Process:
    """Run a process.

    The process is started on instantiation. The constructor returns after
    the process has finished and all of its output has been logged.

    You can use all keyword arguments from
    :py:class:`subprocess.Popen` except `bufsize`, `stderr`, `stdout`.

    :param args: List, tuple or string. A sequence of
      process arguments, like `subprocess.Popen(args)`.
    :param master_logger: Forward all log messages to this logger.
    :param bool shell: If true, the command will be executed through the
      shell.
    :param str cwd: Sets the current directory before the child is
      executed.
    :param dict env: Defines the environment variables for the new process.
    """

    def __init__(self, args: typing.Union[str, list, tuple],
                 master_logger: logging.Logger = None, **kwargs):
        self.args = args
        """Process arguments in various types."""

        self._queue = queue.Queue()
        """An instance of :py:class:`queue.Queue`."""

        log, log_handler = setup_logging(master_logger=master_logger)

        self.log = log
        """A ready to go and configured logger. An instance of
        :py:class:`logging.Logger`."""

        self.log_handler = log_handler
        """An instance of :py:class:`LoggingHandler`."""

        self.log.info('Run command: {}'.format(' '.join(self.args_normalized)))
        timer = Timer()

        # With `shell=True` a command string must reach the shell unsplit:
        # if a list is passed, only its first item is used as the command
        # string and the other items become arguments of the shell itself.
        if kwargs.get('shell') and isinstance(self.args, str):
            popen_args = self.args
        else:
            popen_args = self.args_normalized

        self.subprocess = subprocess.Popen(
            popen_args,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            # RuntimeWarning: line buffering (buffering=1) isn't
            # supported in binary mode, the default buffer size will be used
            # bufsize=1,
            **kwargs
        )
        """subprocess"""

        self._start_thread(self.subprocess.stdout, 'stdout')
        self._start_thread(self.subprocess.stderr, 'stderr')

        # Each of the two reader threads puts `None` into the queue when its
        # pipe is exhausted, so the queue is consumed up to `None` twice.
        for _ in range(2):
            for line, stream in iter(self._queue.get, None):
                # Undecodable bytes are replaced instead of aborting the run.
                line = line.decode('utf-8', errors='replace').strip()
                if not line:
                    continue
                if stream == 'stderr':
                    self.log.stderr(line)
                elif stream == 'stdout':
                    self.log.stdout(line)
        self.subprocess.wait()
        self.log.info('Execution time: {}'.format(timer.result()))

    @property
    def args_normalized(self) -> list:
        """Normalized `args`: a string is split into a list using
        `shlex.split`, a list or a tuple is returned unchanged."""
        if isinstance(self.args, str):
            return shlex.split(self.args)
        return self.args

    @property
    def stdout(self) -> str:
        """Alias / shortcut for `self.log_handler.stdout`."""
        return self.log_handler.stdout

    @property
    def line_count_stdout(self) -> int:
        """The count of lines of the current `stdout`."""
        return len(self.stdout.splitlines())

    @property
    def stderr(self) -> str:
        """Alias / shortcut for `self.log_handler.stderr`."""
        return self.log_handler.stderr

    @property
    def line_count_stderr(self) -> int:
        """The count of lines of the current `stderr`."""
        return len(self.stderr.splitlines())

    def _stdout_stderr_reader(self, pipe, stream):
        """
        Read a pipe line by line and put each line as a tuple
        `(line, stream)` into the queue. Finally `None` is put into the
        queue to mark the end of the stream.

        :param object pipe: `process.stdout` or `process.stderr`
        :param str stream: `stdout` or `stderr`
        """
        try:
            with pipe:
                for line in iter(pipe.readline, b''):
                    self._queue.put((line, stream))
        finally:
            self._queue.put(None)

    def _start_thread(self, pipe, stream):
        """
        Start a thread which reads the pipe.

        :param object pipe: `process.stdout` or `process.stderr`
        :param str stream: `stdout` or `stderr`
        """
        threading.Thread(
            target=self._stdout_stderr_reader,
            args=[pipe, stream]
        ).start()
class Watch:
    """Watch the execution of a command. Capture all output of a command.
    provide and setup a logging facility.

    :param config_file: The file path of the configuration file in the INI
      format.
    :param service_name: A name of the watched service.
    :param raise_exceptions: Raise exceptions if `watch.run()` exists with a
      non-zero exit code.
    :param config_reader: A custom configuration reader. Specify this
      parameter to not use the build in configuration reader.
    :param report_channels: The channels to report to. If specified, the
      channels replace all channels of the module level reporter and no
      channel is set up from the configuration. Use an empty list to
      disable the reporting.
    """

    def __init__(self, config_file: str = None,
                 service_name: str = 'command_watcher',
                 raise_exceptions: bool = True,
                 config_reader: ConfigReader = None,
                 report_channels: typing.List[BaseChannel] = None):
        self._hostname = HOSTNAME
        """The hostname of machine the watcher running on."""

        self._service_name = service_name
        """A name of the watched service."""

        log, log_handler = setup_logging()
        self.log = log
        """A ready to go and configured logger. An instance of
        :py:class:`logging.Logger`."""
        self.log.info('Hostname: {}'.format(self._hostname))

        self._log_handler = log_handler
        """An instance of :py:class:`LoggingHandler`."""

        self._conf = None
        """An instance of :py:class:`jflib.config_reader.ClassInterface`."""

        if not config_reader and config_file:
            config_reader = ConfigReader(
                spec=CONFIG_READER_SPEC,
                ini=config_file,
                dictionary=CONF_DEFAULTS,
            )

        # Without a configuration file and without a custom reader there is
        # no configuration to read: `self._conf` stays `None`.
        if config_reader is not None:
            self._conf = config_reader.get_class_interface()

        if report_channels is None:
            if config_reader is not None:
                self._setup_configured_channels(config_reader)
        else:
            # Use exactly the channels specified by the caller.
            reporter.channels = list(report_channels)

        self.processes = []
        """A list of completed processes
        :py:class:`Process`. Everytime you use the method
        `run()` the process object is appened in the list."""

        self._raise_exceptions = raise_exceptions
        """Raise exceptions"""

        self._timer = Timer()

    def _setup_configured_channels(self, config_reader):
        """Add the e-mail, NSCA and beep channels to the reporter, as far as
        they are configured.

        :param config_reader: The configuration reader `self._conf` was
          obtained from.
        """
        # A `ValueError` or a `KeyError` skips the channel -- presumably
        # raised if the section is missing or incomplete (verify against
        # `ConfigReader`).
        try:
            config_reader.check_section('email')
            email_reporter = EmailChannel(
                smtp_server=self._conf.email.smtp_server,
                smtp_login=self._conf.email.smtp_login,
                smtp_password=self._conf.email.smtp_password,
                to_addr=self._conf.email.to_addr,
                from_addr=self._conf.email.from_addr,
                to_addr_critical=self._conf.email.to_addr_critical,
            )
            reporter.add_channel(email_reporter)
            self.log.debug(email_reporter)
        except (ValueError, KeyError):
            pass

        try:
            config_reader.check_section('nsca')
            nsca_reporter = NscaChannel(
                remote_host=self._conf.nsca.remote_host,
                password=self._conf.nsca.password,
                encryption_method=self._conf.nsca.encryption_method,
                port=self._conf.nsca.port,
                service_name=self._service_name,
            )
            reporter.add_channel(nsca_reporter)
            self.log.debug(nsca_reporter)
        except (ValueError, KeyError):
            pass

        if shutil.which('beep') and self._conf.beep.activated:
            beep_reporter = BeepChannel()
            reporter.add_channel(beep_reporter)
            self.log.debug(beep_reporter)

    @property
    def stdout(self):
        """Alias / shortcut for `self._log_handler.stdout`."""
        return self._log_handler.stdout

    @property
    def stderr(self):
        """Alias / shortcut for `self._log_handler.stderr`."""
        return self._log_handler.stderr

    def run(self, args: typing.Union[str, list, tuple],
            log: bool = True,
            ignore_exceptions: typing.Optional[typing.List[int]] = None,
            **kwargs) -> Process:
        """
        Run a process.

        :param args: List, tuple or string. A sequence of
          process arguments, like `subprocess.Popen(args)`.
        :param log: Log the `stderr` and the `stdout` of the
          process. If false the `stdout` and the `stderr` are logged only
          to the local process logger, not to get global master logger.
        :param ignore_exceptions: A list of none-zero exit codes, which is
          ignored by this method.
        :param kwargs: Keyword arguments passed on to :py:class:`Process`.

        :return: The completed process.

        :raises CommandWatcherError: If exceptions are enabled and the
          return code is neither zero nor listed in `ignore_exceptions`.
        """
        master_logger = self.log if log else None
        process = Process(args, master_logger=master_logger, **kwargs)
        self.processes.append(process)

        rc = process.subprocess.returncode
        ignored = ignore_exceptions or ()
        if self._raise_exceptions and rc != 0 and rc not in ignored:
            raise CommandWatcherError(
                'The command \'{}\' exists with an non-zero return code.'
                .format(' '.join(process.args_normalized)),
                service_name=self._service_name,
                log_records=self._log_handler.all_records,
            )
        return process

    def report(self, status: int, **data) -> Message:
        """Report a message using the preconfigured channels.

        :param int status: 0 (OK), 1 (WARNING), 2 (CRITICAL), 3 (UNKOWN): see
          Nagios / Icinga monitoring status / state.
        :param str custom_message: Custom message
        :param str prefix: Prefix of the report message.
        :param str body: A longer report text.
        :param dict performance_data: A dictionary like
          `{'perf_1': 1, 'perf_2': 'test'}`.

        :return: The message object which was sent to the channels.
        """
        message = reporter.report(
            status=status,
            service_name=self._service_name,
            log_records=self._log_handler.all_records,
            processes=self.processes,
            **data,
        )
        self.log.debug(message)
        return message

    def final_report(self, **data) -> Message:
        """The same as the `report` method. Adds `execution_time` to the
        `performance_data`.

        :param int status: 0 (OK), 1 (WARNING), 2 (CRITICAL), 3 (UNKOWN): see
          Nagios / Icinga monitoring status / state.
        :param str custom_message: Custom message
        :param str prefix: Prefix of the report message.
        :param str body: A longer report text.
        :param dict performance_data: A dictionary like
          `{'perf_1': 1, 'perf_2': 'test'}`.

        :return: The message object which was sent to the channels.
        """
        timer_result = self._timer.result()
        self.log.info(
            'Overall execution time: {}'.format(timer_result)
        )
        data_dict = dict(data)
        status = data_dict.pop('status', 0)
        # Work on a copy: the dictionary of the caller must not be modified.
        performance_data = dict(data_dict.get('performance_data') or {})
        performance_data['execution_time'] = timer_result
        data_dict['performance_data'] = performance_data
        return self.report(status=status, **data_dict)