Source code for bang.util

# Copyright 2012 - John Calixto
#
# This file is part of bang.
#
# bang is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# bang is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with bang.  If not, see <http://www.gnu.org/licenses/>.
import argparse
import atexit
import bang
import collections
import copy
import json
import logging
import multiprocessing
import time
import re
import subprocess
import sys
from datetime import datetime
from logging.handlers import BufferingHandler

import boto
import boto.ec2
from boto.s3.key import Key
from logutils.queue import QueueHandler, QueueListener
from . import attributes as A


CONSOLE_LOGGING_FORMAT = '%(asctime)s %(levelname)8s %(processName)s - %(message)s'  # noqa

# use the multiprocessing logger so when we start parallelizing the
# deploys, we have a seamless transition.
_mlog = multiprocessing.get_logger()
log = _mlog.manager.getLogger('.'.join((_mlog.name, 'bang')))


# this is stolen from python2.7 to have support in 2.6
class NullHandler(logging.Handler):
    """
    This handler does nothing.  It's intended to be used to avoid the
    "No handlers could be found for logger XXX" one-off warning.  This is
    important for library code, which may contain code to log events.  If a
    user of the library does not configure logging, the one-off warning might
    be produced; to avoid this, the library developer simply needs to
    instantiate a NullHandler and add it to the top-level logger of the
    library module or package.
    """
    def handle(self, record):
        pass

    def emit(self, record):
        pass

    def createLock(self):
        self.lock = None


# give this logger at least one handler to avoid pesky warnings for bang
# commands that don't actually care about logging
_mlog.addHandler(NullHandler())
log.addHandler(NullHandler())
del _mlog

class StrictAttrBag(object):
    """
    Generic attribute container that makes constructor arguments available as
    object attributes.

    Checks :meth:`__init__` argument names against lists of *required* and
    *optional* attributes.
    """
    def __init__(self, **kwargs):
        for reqk in self.REQUIRED_ATTRS:
            if reqk not in kwargs:
                raise TypeError("Missing required argument, %s" % reqk)
            setattr(self, reqk, kwargs.pop(reqk))
        for k, v in kwargs.items():
            if k not in self.OPTIONAL_ATTRS:
                raise TypeError("Unknown argument, %s" % k)
            setattr(self, k, v)

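# Illustrative usage (editor's sketch, not part of the original module):  a
# subclass declares REQUIRED_ATTRS and OPTIONAL_ATTRS as class attributes, and
# the constructor enforces them.  The class and attribute names below are
# hypothetical.
#
#     class ServerConfig(StrictAttrBag):
#         REQUIRED_ATTRS = ['name', 'instance_type']
#         OPTIONAL_ATTRS = ['tags']
#
#     cfg = ServerConfig(name='web-1', instance_type='m1.small')
#     cfg.name                                   # 'web-1'
#     ServerConfig(name='web-1')                 # TypeError: missing required
#     ServerConfig(name='web-1', instance_type='m1.small', foo=1)  # TypeError
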
class SharedMap(object):
    """
    A multiprocess-safe :class:`Mapping` object that can be used to return
    values from child processes.
    """
    def __init__(self, manager):
        self.lists = manager.dict()
        self.dicts = manager.dict()
        self.lock = multiprocessing.Lock()

    def append(self, list_name, value):
        """Appends :attr:`value` to the list named :attr:`list_name`."""
        with self.lock:
            l = self.lists.get(list_name)
            if l:
                l.append(value)
            else:
                l = [value]
            self.lists[list_name] = l

    def merge(self, dict_name, values):
        """
        Performs a deep-merge of :attr:`values` onto the :class:`Mapping`
        object named :attr:`dict_name`.

        If :attr:`dict_name` does not yet exist, then a deep copy of
        :attr:`values` is assigned as the initial mapping object for the
        given name.

        :param str dict_name:  The name of the dict onto which the values
            should be merged.
        """
        with self.lock:
            d = self.dicts.get(dict_name)
            if d:
                deep_merge_dicts(d, values)
            else:
                d = copy.deepcopy(values)
            self.dicts[dict_name] = d

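# Illustrative usage (editor's sketch, not part of the original module):  a
# SharedMap wraps a multiprocessing.Manager so worker processes can report
# results back to the parent.  The key names and values below are
# hypothetical.
#
#     manager = multiprocessing.Manager()
#     results = SharedMap(manager)
#     results.append('server_ids', 'i-abc123')
#     results.merge('attributes', {'db': {'host': '10.0.0.5'}})
#     results.lists['server_ids']   # ['i-abc123']
#     results.dicts['attributes']   # {'db': {'host': '10.0.0.5'}}
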
class SharedNamespace(object):
    """
    A multiprocess-safe namespace that can be used to coordinate naming
    similar resources uniquely.

    E.g. when searching for existing nodes in a cassandra cluster, you can
    use this SharedNamespace to make sure other processes aren't looking at
    the same node.
    """
    def __init__(self, manager):
        self.names = manager.list()
        self.lock = multiprocessing.Lock()

    def add_if_unique(self, name):
        """
        Returns ``True`` on success.  Returns ``False`` if the name already
        exists in the namespace.
        """
        with self.lock:
            if name not in self.names:
                self.names.append(name)
                return True
        return False

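# Illustrative usage (editor's sketch, not part of the original module):
# processes racing to claim the same resource name can use add_if_unique() to
# ensure only one of them wins.  The node name below is hypothetical.
#
#     manager = multiprocessing.Manager()
#     namespace = SharedNamespace(manager)
#     namespace.add_if_unique('cassandra-node-1')   # True, first claim wins
#     namespace.add_if_unique('cassandra-node-1')   # False, already claimed
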
class JSONFormatter(logging.Formatter):
    def __init__(self, config):
        logging.Formatter.__init__(self)
        self.stack = config.get('stack', 'anonymous')

    def format(self, record):
        timestamp = datetime.fromtimestamp(record.created).isoformat()
        out = {
                'level': record.levelname,
                'message': record.message,
                'timestamp': timestamp,
                'stack': self.stack,
                'pid': record.process,
                'process_name': record.processName,
                }
        return '%s\n' % json.dumps(out)

class ColoredConsoleFormatter(logging.Formatter):
    def format(self, record):
        pre = logging.Formatter.format(self, record)
        return '\033[38;5;%dm%s\033[0;37m' % (
                (record.process * 4) % 210 + 20,
                pre,
                )

class S3Handler(BufferingHandler):
    """
    Buffers all logging events, then uploads them all at once "atexit" to a
    single file in S3.
    """
    def __init__(self, bucket, prefix=''):
        BufferingHandler.__init__(self, 0)
        self.bucket = bucket
        self.prefix = prefix

    def shouldFlush(self, record):
        return False

    def flush(self):
        payload = ''
        while len(self.buffer) > 0:
            record = self.buffer.pop(0)
            if record.levelno >= self.level:
                payload += self.format(record)
        if payload:
            conn = boto.connect_s3()
            bucket = conn.get_bucket(self.bucket)
            key = Key(bucket)
            key.key = '/'.join((self.prefix, 'bang-%f' % time.time()))
            key.content_type = 'application/json'
            key.set_contents_from_string(payload)

def sanitize_config_loglevel(level):
    '''
    Kinda sorta backport of loglevel sanitization for Python 2.6.
    '''
    if sys.version_info[:2] != (2, 6) or isinstance(level, (int, long)):
        return level
    lvl = None
    if isinstance(level, basestring):
        lvl = logging._levelNames.get(level)
    if not lvl:
        raise ValueError('Invalid log level, %s' % level)
    return lvl

def initialize_logging(config):
    multiprocessing.current_process().name = 'Stack'
    cfg = config.get(A.LOGGING, {})

    # log to s3 if there's a destination specified in the config
    bucket = cfg.get(A.logging.S3_BUCKET)
    if bucket:
        json_formatter = JSONFormatter(config)
        s3_handler = S3Handler(bucket, cfg.get(A.logging.S3_PREFIX, ''))
        s3_handler.setFormatter(json_formatter)
        s3_handler.setLevel(logging.INFO)

        # The parent process is the only one that actually buffers the log
        # records in memory and writes them out to s3.  The child processes
        # send all of their log records to the parent's queue.
        #
        # Using the QueueHandler and QueueListener classes from logutils-0.3.2
        # here since they're the implementations in future versions of stdlib
        # logging anyway (logutils is the "backports from Py3k logging"
        # library).
        queue = multiprocessing.Queue()
        ql = QueueListener(queue, s3_handler)

        def cleanup():
            ql.stop()
            s3_handler.flush()
        atexit.register(cleanup)
        ql.start()

        qh = QueueHandler(queue)
        log.addHandler(qh)

    # set local_file to an empty string or some other false value to
    # deactivate
    local_file = cfg.get(A.logging.LOCAL_FILE, 'bang.log')
    if local_file:
        local_handler = logging.FileHandler(local_file)
        local_handler.setFormatter(logging.Formatter(CONSOLE_LOGGING_FORMAT))
        level = sanitize_config_loglevel(
                cfg.get(A.logging.LOCAL_FILE_LEVEL, logging.DEBUG)
                )
        local_handler.setLevel(level)
        log.addHandler(local_handler)

    # also log to stderr
    if sys.stderr.isatty():
        formatter = ColoredConsoleFormatter(CONSOLE_LOGGING_FORMAT)
    else:
        formatter = logging.Formatter(CONSOLE_LOGGING_FORMAT)
    handler = logging.StreamHandler()  # default stream is stderr
    handler.setFormatter(formatter)
    console_level = sanitize_config_loglevel(
            cfg.get(A.logging.CONSOLE_LEVEL, 'INFO')
            )
    handler.setLevel(console_level)
    log.setLevel(logging.DEBUG)
    log.addHandler(handler)
    log.debug('Logging initialized.')

def poll_with_timeout(timeout_s, break_func, wake_every_s=60):
    """
    Calls :attr:`break_func` every :attr:`wake_every_s` seconds for a total
    duration of :attr:`timeout_s` seconds, or until :attr:`break_func` returns
    something other than ``None``.

    If :attr:`break_func` returns anything other than ``None``, that value is
    returned immediately.  Otherwise, continues polling until the timeout is
    reached, then returns ``None``.
    """
    time_slept = 0
    if wake_every_s > 60:
        msg = '... sleeping for %0.2f minutes' % (wake_every_s / 60.0)
    else:
        msg = '... sleeping for %d seconds' % wake_every_s
    res = break_func()
    while res is None and time_slept < timeout_s:
        log.debug(msg)
        time.sleep(wake_every_s)
        time_slept += wake_every_s
        res = break_func()
    return res

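# Illustrative usage (editor's sketch, not part of the original module):  poll
# for up to 10 minutes, checking every 30 seconds, until the hypothetical
# get_server() call returns a non-None value.  ``get_server`` and
# ``server_id`` are placeholders, not names defined in this module.
#
#     server = poll_with_timeout(600, lambda: get_server(server_id), 30)
#     if server is None:
#         raise bang.BangError('Timed out waiting for server')
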
def get_argparser(arg_config):
    parser = argparse.ArgumentParser(
            prog=arg_config.get('prog'),
            description=arg_config['description'],
            formatter_class=argparse.RawTextHelpFormatter,
            )
    for ac in arg_config['arguments']:
        args = ac[:-1]
        kwargs = ac[-1]
        parser.add_argument(*args, **kwargs)
    return parser

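# Illustrative usage (editor's sketch, not part of the original module):
# ``arg_config`` is a mapping whose 'arguments' entries are tuples of
# positional arguments for add_argument() followed by a dict of keyword
# arguments.  The program name, description, and argument names below are
# hypothetical.
#
#     parser = get_argparser({
#             'prog': 'bang',
#             'description': 'Deploy a stack',
#             'arguments': [
#                 ('config_specs', {'nargs': '+'}),
#                 ('--dry-run', {'action': 'store_true'}),
#                 ],
#             })
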
SECRET_PATTERN = re.compile(r'(\s*(\S*(password|pwd|key|secret|god)\S*)\s*:\s*)\S+')  # noqa
SECRET_WHITELIST = ('ssh_key', 'key_pair')

def redact_secrets(line):
    """
    Returns a sanitized string for any ``line`` that looks like it contains a
    secret (i.e. matches SECRET_PATTERN).
    """
    def redact(match):
        if match.group(2) in SECRET_WHITELIST:
            return match.group(0)
        return match.group(1) + 'TOO_TOO_SEXY'
    return SECRET_PATTERN.sub(redact, line)

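# Illustrative examples (editor's sketch, not part of the original module):
#
#     redact_secrets('db_password: hunter2')
#     # -> 'db_password: TOO_TOO_SEXY'
#     redact_secrets('ssh_key: id_rsa')
#     # -> 'ssh_key: id_rsa'   (whitelisted, left untouched)
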
def bump_version_tail(oldver):
    """
    Takes any dot-separated version string and increments the rightmost field
    (which it expects to be an integer).
    """
    head, tail = oldver.rsplit('.', 1)
    return '%s.%d' % (head, (int(tail) + 1))

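# Illustrative examples (editor's sketch, not part of the original module):
#
#     bump_version_tail('1.4.9')    # -> '1.4.10'
#     bump_version_tail('2013.02')  # -> '2013.3' (leading zeros not preserved)
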
def count_to_deploy(stack, descriptor, config_count):
    """
    Returns the larger of :attr:`config_count` and the number of instances
    currently running with this stack/descriptor combo.
    """
    live_count = count_by_tag(stack, descriptor)
    if config_count > live_count:
        live_count = config_count
    return live_count

def count_by_tag(stack, descriptor):
    """
    Returns the count of currently running or pending instances that match
    the given stack and descriptor combo.
    """
    ec2_conn = boto.ec2.connection.EC2Connection()
    resses = ec2_conn.get_all_instances(
            filters={
                'tag:stack': stack,
                'tag:descriptor': descriptor,
                })
    instances = [
            i
            for res in resses
            for i in res.instances
            if state_filter(i)
            ]
    return len(instances)

def state_filter(instance):
    """
    Helper function for count_by_tag.
    """
    return instance.state in ('running', 'pending')

def deep_merge_dicts(base, incoming):
    """
    Performs an *in-place* deep-merge of key-values from :attr:`incoming`
    into :attr:`base`.  No attempt is made to preserve the original state of
    the objects passed in as arguments.

    :param dict base:  The target container for the merged values.  This will
        be modified *in-place*.
    :type base:  Any :class:`dict`-like object

    :param dict incoming:  The container from which incoming values will be
        copied.  Nested dicts in this will be modified.
    :type incoming:  Any :class:`dict`-like object

    :rtype:  None
    """
    for ki, vi in incoming.iteritems():
        if (ki in base
                and isinstance(vi, collections.MutableMapping)
                and isinstance(base[ki], collections.MutableMapping)):
            deep_merge_dicts(base[ki], vi)
        else:
            base[ki] = vi

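# Illustrative example (editor's sketch, not part of the original module):
# nested mappings are merged recursively; scalar values in ``incoming``
# overwrite those in ``base``.
#
#     base = {'db': {'host': 'a', 'port': 5432}, 'debug': False}
#     deep_merge_dicts(base, {'db': {'host': 'b'}, 'debug': True})
#     # base == {'db': {'host': 'b', 'port': 5432}, 'debug': True}
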
def fork_exec(cmd_list, input_data=None):
    """
    Like the subprocess.check_*() helper functions, but tailored to bang.

    ``cmd_list`` is the command to run, and its arguments, as a list of
    strings.

    ``input_data`` is optional data to pass to the command's stdin.

    On success, returns the output (i.e. stdout) of the command.

    On failure, raises BangError with the command's stderr.
    """
    # log.debug('fork_exec: cmd_list = %s, input_data = ^%s^'
    #         % (cmd_list, input_data))
    p = subprocess.Popen(
            cmd_list,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            )
    out = p.communicate(input_data)
    if p.returncode != 0:
        raise bang.BangError('ret: %d, stdout: ^%s^, stderr: ^%s^'
                % (p.returncode, out[0], out[1]))
    return out[0]
