Source code for ocspd.core.ocspadder

# -*- coding: utf-8 -*-
"""
Module for adding OCSP Staples to a running HAProxy instance.
"""
import threading
import socket
import errno
import os
import queue
from io import StringIO
import ocspd.core.log
from ocspd.core.excepthandler import ocsp_except_handle
import ocspd.core.exceptions
import ocspd.util.functions

try:
    _ = BrokenPipeError
except NameError:
    import socket
    BrokenPipeError = socket.error

LOG = ocspd.core.log.get_logger(__name__)

SOCKET_BUFFER_SIZE = 1024


[docs]class OCSPAdder(threading.Thread): """ This class is used to add a OCSP staples to a running HAProxy instance by sending it over a socket. It runs a thread that keeps connections to sockets open for each of the supplied haproxy sockets. Code from `collectd haproxy connection`_ under the MIT license, was used for inspiration. Tasks are taken from the :class:`ocspd.scheduling.SchedulerThread`, as soon as a task context is received, an OCSP response is read from the model within it, it is added to a HAProxy socket found in self.socks[<certificate directory>]. .. _collectd haproxy connection: https://github.com/wglass/collectd-haproxy/blob/master/collectd_haproxy/ connection.py """ #: The name of this task in the scheduler TASK_NAME = 'proxy-add' #: The haproxy socket command to add OCSP staples. Use string.format to add #: the base64 encoded OCSP staple OCSP_ADD = 'set ssl ocsp-response {}'
[docs] def __init__(self, *args, **kwargs): """ Initialise the thread with its parent :class:`threading.Thread` and its arguments. :kwarg dict socket_paths: A mapping from a directory (typically the directory containing TLS certificates) to a HAProxy socket that serves certificates from that directory. These sockets are used to communicate new OCSP staples to HAProxy, so it does not have to be restarted. :kwarg ocspd.scheduling.SchedulerThread scheduler: The scheduler object where we can get "haproxy-adder" tasks from **(required)**. """ self.stop = False LOG.debug("Starting OCSPAdder thread") self.scheduler = kwargs.pop('scheduler', None) self.socket_paths = kwargs.pop('socket_paths', None) assert self.scheduler is not None, \ "Please pass a scheduler to get and add proxy-add tasks." assert self.socket_paths is not None, \ "The OCSPAdder needs a socket_paths dict" self.socks = {} with ocsp_except_handle(): for key, socket_path in self.socket_paths.items(): self._open_socket(key, socket_path) super(OCSPAdder, self).__init__(*args, **kwargs)
[docs] def _open_socket(self, key, socket_path): """ Opens a socket located at socket_path and saves it in self.socks[key]. Subsequently it asks for a prompt to keep the socket connection open, so several commands can be sent without having to close and re-open the socket. :param key: the identifier of the socket in self.socks :param str socket_path: A valid HAProxy socket path. :raises :exc:ocspd.core.exceptions.SocketError: when the socket can not be opened. """ self.socks[key] = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) try: self.socks[key].connect(socket_path) except FileNotFoundError as exc: raise ocspd.core.exceptions.SocketError( "Could not initialize OCSPAdder with socket {}: {}".format( socket_path, exc)) result = self.send(key, "prompt") LOG.debug("Opened prompt with result: '%s'", result)
[docs] def __del__(self): """ Close the sockets on exit. """ for sock in self.socks.values(): sock.close()
[docs] def run(self): """ The main loop: send any commands that enter the command queue :raises ValueError: if the command queue is empty. """ LOG.info("Started an OCSP adder thread.") while not self.stop: try: context = self.scheduler.get_task(self.TASK_NAME, timeout=0.25) model = context.model LOG.debug("Sending staple for cert:'%s'", model) # Open the exception handler context to run tasks likely to fail with ocsp_except_handle(context): self.add_staple(model) self.scheduler.task_done(self.TASK_NAME) except queue.Empty: pass LOG.debug("Goodbye cruel world..")
[docs] def add_staple(self, model): """ Create and send the command that adds a base64 encoded OCSP staple to the HAProxy :param model: An object that has a binary string `ocsp_staple` in it and a filename `filename`. """ command = self.OCSP_ADD.format( ocspd.util.functions.base64(model.ocsp_staple.data)) LOG.debug("Setting OCSP staple with command '%s'", command) directory = os.path.dirname(model.filename) response = self.send(directory, command) if response != 'OCSP Response updated!': raise ocspd.core.exceptions.OCSPAdderBadResponse( "Bad HAProxy response: {}".format(response))
[docs] def send(self, socket_key, command): """ Send the command through self.socks[socket_key] (using self.socket_paths) :param str socket_key: Identifying dictionary key of the socket. This is typically the directory HAProxy serves certificates from. :param str command: String with the HAProxy command. For a list of possible commands, see the `haproxy documentation`_ :raises IOError if an error occurs and it's not errno.EAGAIN or errno.EINTR .. _haproxy documentation: http://haproxy.tech-notes.net/9-2-unix-socket-commands/ """ # Empty buffer first, it's possible that other commands have been fired # to the same socket, we don't want the response to those commands in # our response string. # FIXME: This would be nice, but is tricky because the socket seems to # close if the recv call times out. Otherwise the socket stays open but # the recv call is blocking... # If this problem occurs, the easiest way is probably to open a socket # each time we want to communicate... # while True: # try: # chunk = self.socks[socket_key].recv(SOCKET_BUFFER_SIZE) # if not chunk: # break # except IOError as err: # if err.errno not in (errno.EAGAIN, errno.EINTR): # raise # else: # break # Send command with ocsp_except_handle(): try: self.socks[socket_key].sendall((command + "\n").encode()) except BrokenPipeError: # Try to re-open the socket. If that doesn't work, that will # raise a :exc:`~ocspd.core.exceptions.SocketError` LOG.warning("Re-opening socket %s", socket_key) self.socks[socket_key].close() self._open_socket(socket_key, self.socket_paths[socket_key]) # Try again, if this results in a BrokenPipeError *again*, it # will be caught by ocsp_except_handle self.socks[socket_key].sendall((command + "\n").encode()) buff = StringIO() # Get new response. while True: try: chunk = self.socks[socket_key].recv(SOCKET_BUFFER_SIZE) if chunk: d_chunk = chunk.decode('ascii') buff.write(d_chunk) # TODO: Find out what happens if several threads are # talking to HAProxy on this socket if '> ' in d_chunk: break else: break except IOError as err: if err.errno not in (errno.EAGAIN, errno.EINTR): raise # Strip *all* \n, > and space characters from the end response = buff.getvalue().strip('\n> ') buff.close() LOG.debug("Received HAProxy response '%s'", response) return response