r"""Tools for working with `Parallel Python`_.

.. note:: You must install `Parallel Python`_ separately for this to
   work::

      wget http://www.parallelpython.com/downloads/pp/pp-1.6.0.tar.bz2
      tar -jxvf pp-1.6.0.tar.bz2
      cd pp-1.6.0
      python setup.py install

.. _Parallel Python: http://www.parallelpython.com/

Presently, you must be able to ssh to the hosts without a password
(set up a key using ssh-keygen).  The list of hosts to use can be
specified manually, or through the `pp_hosts` variable in the
`~/.mmfrc.py` initialization file.  If defined, this will be used to
populate the :attr:`_HOSTS` list.  It should contain a list of strings
with the host names, or a list of tuples `(hostname, n_cpu)` where
`n_cpu` is an integer specifying how many cores are on the host (`None`
for autodetection), or tuples `(hostname, n_cpu, working_dir)` where
`working_dir` is the directory to change to on the remote machine
before launching the `ppserver`.
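
For example (the host names here are hypothetical), one might set::

    pp_hosts = [
        'host1',                      # one cpu on host1
        ('host2', 4),                 # four cpus on host2
        ('host3', None, '/scratch'),  # autodetect cpus, run in /scratch
    ]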

.. todo:: Only launch remote servers when needed.
.. todo:: Maybe use the pexpect module to facilitate passwords and
   logging in to the remote servers.  Also, provide some diagnostics
   if the spawn fails.  Also, could add a convenience method that
   would try to login to all hosts to make sure that it is possible
   first before running.
.. todo:: Monitor remote connections and respawn ppservers if any die.
   
"""
from __future__ import division, absolute_import

__all__ = ['check_hosts', 'kill_hosts', 'Task', 'decorate_logger', 'ServerMMF',
           'ManageRemoteServer']
import os
import sys
import re
import warnings
import time
import logging
import subprocess
import socket

import random

import mmf.archive

_HOSTS = []                     # Set in ~/.mmfrc.py

pp = None
# If this is run as a script, then sys.path[0] will be the current
# directory, and the import below would fail because the names clash.
# We temporarily remove it.
_path = sys.path[0]
sys.path[0] = None
try:
    import pp
    # Don't timeout on long running jobs!
    pp.pptransport.TRANSPORT_SOCKET_TIMEOUT = None
except ImportError, msg:
    warnings.warn(
        "\n".join(
            ["Could not import Parallel Python module `pp`: %s" % (str(msg),),
             "(Code will be executed in serial...)"]))
sys.path[0] = _path
del _path

try:
    import threading
except ImportError:
    import dummy_threading as threading

try:
    import pexpect
except ImportError:
    pexpect = None

######################################################################
# Test Example
def zeta(p, N0, N1):
    r"""Sums the terms `N0 <= n < N1` of the zeta function."""
    tot = 0.0
    for n in xrange(N0, N1):
        tot += (1./(n**2 + p))
    return tot
    
def run_example(hosts=_HOSTS,
                slice_size=1000000, jobs=20):
    r"""Example of how to use the :class:`Server` class to compute the
    zeta function.

    Examples
    --------
    >>> import logging
    >>> logger = logging.getLogger()
    >>> logger.setLevel(logging.FATAL)
    >>> from numpy import pi
    >>> res = run_example(slice_size=100000)
    Submitting task 1-100001
    Submitting task 100001-200001
    Submitting task 200001-300001
    Submitting task 300001-400001
    Submitting task 400001-500001
    Submitting task 500001-600001
    Submitting task 600001-700001
    Submitting task 700001-800001
    Submitting task 800001-900001
    Submitting task 900001-1000001
    Submitting task 1000001-1100001
    Submitting task 1100001-1200001
    Submitting task 1200001-1300001
    Submitting task 1300001-1400001
    Submitting task 1400001-1500001
    Submitting task 1500001-1600001
    Submitting task 1600001-1700001
    Submitting task 1700001-1800001
    Submitting task 1800001-1900001
    Submitting task 1900001-2000001
    Processing task zeta(N0=1,p=0,N1=100001)
    Debug listener started ...
    Processing task zeta(N0=100001,p=0,N1=200001)
    Debug listener started ...
    Processing task zeta(N0=200001,p=0,N1=300001)
    Processing task zeta(N0=300001,p=0,N1=400001)
    Processing task zeta(N0=400001,p=0,N1=500001)
    Processing task zeta(N0=500001,p=0,N1=600001)
    Processing task zeta(N0=600001,p=0,N1=700001)
    Processing task zeta(N0=700001,p=0,N1=800001)
    Processing task zeta(N0=800001,p=0,N1=900001)
    Processing task zeta(N0=900001,p=0,N1=1000001)
    Processing task zeta(N0=1000001,p=0,N1=1100001)
    Processing task zeta(N0=1100001,p=0,N1=1200001)
    Processing task zeta(N0=1200001,p=0,N1=1300001)
    Processing task zeta(N0=1300001,p=0,N1=1400001)
    Processing task zeta(N0=1400001,p=0,N1=1500001)
    Processing task zeta(N0=1500001,p=0,N1=1600001)
    Processing task zeta(N0=1600001,p=0,N1=1700001)
    Processing task zeta(N0=1700001,p=0,N1=1800001)
    Processing task zeta(N0=1800001,p=0,N1=1900001)
    Processing task zeta(N0=1900001,p=0,N1=2000001)
    >>> res - pi**2/6
    -4.99999...e-07
    """
    if hosts is not None:
        hosts = tuple(hosts)
    server = ServerMMF(hosts=hosts, remote_timeout=10, async=True)
    for n in xrange(jobs):
        N0 = 1 + slice_size*n
        N1 = 1 + slice_size*(n+1)
        print("Submitting task %i-%i" % (N0, N1))
        server.submit(zeta, p=0, N0=N0, N1=N1)

    tot = 0.0
    for task in server.finished_tasks():
        print("Processing task %s" % str(task))
        tot += task.result

    # Kill the server and all workers: we won't submit more jobs.
    server.destroy()
    return tot

class Task(object):
    r"""A submitted job, together with the function and arguments that
    produced it."""
    def __init__(self, job, server, f, *args, **kwargs):
        self.job = job
        self.server = server
        self.f = f
        self.args = args
        self.kwargs = kwargs

    @property
    def finished(self):
        return self.job.finished

    @property
    def result(self):
        res = self.job()
        if self.server.archive_out:
            # Unpack archived output.
            _d = {}
            if res is not None:
                try:
                    exec res in _d
                except Exception, e:
                    e.args = e.args + (res,)
                    raise
                res = _d['result']
        return res

    def __str__(self):
        sargs = list(map(str, self.args))
        skwargs = ["%s=%s" % (k, str(self.kwargs[k]))
                   for k in self.kwargs]
        return "%s(%s)" % (self.f.func_name, ",".join(sargs + skwargs))

def _call(vargs, archive_in, archive_out):
    r"""This wraps the original call so that we can process the
    arguments.  This seems to have to be a function rather than a
    method.

    Parameters
    ----------
    vargs : (f, args, kwargs)
       `f` is the function to call, `args` is the argument list,
       `kwargs` is the keyword argument dictionary.
    archive_in, archive_out : bool
       If these are `True`, then use :class:`mmf.archive.Archive` to
       package the arguments and/or result.
    """
    if archive_in:
        _d = {}
        exec vargs in _d
        f = _d['f']
        args = _d['args']
        kwargs = _d['kwargs']
    else:
        f, args, kwargs = vargs
    result = f(*args, **kwargs)
    if archive_out:
        a = mmf.archive.Archive()
        a.insert(result=result)
        return str(a)
    else:
        return result
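
# For reference, the archive round trip used by `_call` and
# `Task.result` amounts to the following sketch (the same
# :class:`mmf.archive.Archive` calls as above, shown with a toy value):
#
#     a = mmf.archive.Archive()
#     a.insert(result=[1, 2, 3])
#     s = str(a)          # Executable string representation.
#     _d = {}
#     exec s in _d        # Executing the string rebuilds the names.
#     assert _d['result'] == [1, 2, 3]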

class decorate_logger(object):
    r"""Gives a `write` and `flush` method to the logger so that it
    can be used by pexpect."""
    def __init__(self, logger):
        self.logger = logger
        #self.handler = logging.StreamHandler()
        #self.handler.setFormatter(logging.Formatter(
        #    "%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
        #self.logger.addHandler(self.handler)

        # Give the logger the methods required by pexpect.
        logger.write = self.write
        logger.flush = self.flush

    def write(self, *args, **kwargs):
        r"""Write method for logging handler."""
        content = args[0]       # pexpect only uses one arg AFAIK
        if content in [' ', '', '\n', '\r', '\r\n']:
            return              # Don't log empty lines.
        for eol in ['\r\n', '\r', '\n']:
            # Remove the trailing EOL; the logger will add it anyway.
            content = re.sub('\%s$' % eol, '', content)
        # Call the logger info method with the reworked content.
        return self.logger.info(content)

    def flush(self):
        r"""Flush method: does nothing."""
        pass
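
# Typical use, as in :meth:`ServerMMF.get_logger` and
# :meth:`ManageRemoteServer.ssh_connect` below: decorate a logger and
# hand it to pexpect as a logfile.  A minimal sketch:
#
#     logger = logging.getLogger("mmf.async.pp.0")
#     decorate_logger(logger)
#     proc = pexpect.spawn(cmd, logfile=logger)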

class ServerMMF(object):
    r"""Server object."""
    def __init__(self, hosts=(), remote_timeout=30, async=True,
                 use_pp=True, nice=10, port=None, ssh_tunnel=True,
                 archive_in=False, archive_out=False,
                 working_dir=".", use_local=True):
        if port is None:
            port = 60000

        # Make hosts list unique.
        _hosts = []
        for host in hosts:
            if host not in _hosts:
                _hosts.append(host)
        hosts = tuple(_hosts)

        # Extract nprocs and working dirs:
        _hosts = []
        _nprocs = []
        _working_dirs = []
        for host in hosts:
            _working_dir = working_dir
            if isinstance(host, tuple):
                _host = host[0]
                _nproc = host[1]
                if 2 < len(host):
                    _working_dir = host[2]
            else:
                _host = host
                _nproc = 1
            _hosts.append(_host)
            _nprocs.append(_nproc)
            _working_dirs.append(_working_dir)
        self.hosts = tuple(zip(_hosts, _nprocs, _working_dirs))

        # Setup loggers for each host.
        self.logging_index = 0
        if len(self.hosts) < 10:
            self._fmt = "%i"
        elif len(self.hosts) < 100:
            self._fmt = "%2i"
        elif len(self.hosts) < 1000:
            self._fmt = "%3i"
        else:
            self._fmt = "%4i"
        self.logger = self.get_logger('localhost')
        self.loggers = {}
        for host in self.hosts:
            self.loggers[host[0]] = self.get_logger(host[0])

        self.async = async
        self.archive_in = archive_in
        self.archive_out = archive_out
        self.use_pp = (pp is not None) and use_pp
        if self.use_pp:
            self.threads = []
            self.local_ports = set()
            self.nice = nice
            self.remote_timeout = remote_timeout
            self.secret = str(random.random())
            self.ssh_procs = []
            self.port = port
            self.finished = threading.Event()
            self.ssh_tunnel = ssh_tunnel
            ppservers = self.start_remote_servers(self.hosts)
            ncpus = 0
            if use_local:
                ncpus = 'autodetect'
            self.pp_server = pp.Server(ppservers=ppservers,
                                       secret=self.secret, ncpus=ncpus)
        self.tasks = []

    def get_logger(self, host):
        fmt = "mmf.async.pp.%s" % (self._fmt % (self.logging_index,))
        logger = logging.getLogger(fmt)
        logger.level = logging.DEBUG
        decorate_logger(logger)
        if 0 == self.logging_index:
            self.logger = logger
            self.logger.info(
                "Log messages use the following key to host mapping")
        self.logger.info("'%s:' -> '%s'" % (fmt, host))
        self.logging_index += 1
        return logger

    def submit(self, f, *args, **kwargs):
        if self.use_pp:
            if self.archive_in:
                a = mmf.archive.Archive()
                a.insert(f=f, args=args, kwargs=kwargs)
                vargs = str(a)
            else:
                vargs = (f, args, kwargs)
            _args = (vargs, self.archive_in, self.archive_out)
            modules = ()
            if self.archive_in or self.archive_out:
                # This will make 'mmf.archive' available in the global
                # ppworker scope.  Note that we need both to get the
                # desired effect: see ppworker.import_module.
                modules = ('mmf.archive', 'mmf')
            job = self.pp_server.submit(_call, _args, modules=modules)
        else:
            job = self._MyJob(f, *args, **kwargs)
        self.tasks.append(Task(job, self, f, *args, **kwargs))

    def start_remote_servers(self, hosts):
        r"""Try to start a remote server on each specified host using
        ssh."""
        ppservers = []
        for host in hosts:
            if host[0] == socket.gethostname():
                # Don't use ourself!
                continue
            thread = ManageRemoteServer(host=host, server=self,
                                        logger=self.loggers[host[0]])
            if thread.local_host is None:
                # Could not start the external process.
                # Should test that host started okay.  Should I:
                # ppserver does check this before connecting...  Might
                # need a timeout here so maybe we should just let
                # ppserver do it all.  A: But what if one is turned
                # off?  M: Good point.
                pass
            else:
                ppservers.append(thread.local_host)
                thread.start()
                self.threads.append(thread)
        return tuple(ppservers)

    def finished_tasks(self):
        r"""Iterate over all tasks, blocking until they are finished."""
        if not self.async:
            # Synchronous processing of tasks in the order they were
            # submitted.
            for task in self.tasks:
                yield task
        else:
            # Asynchronous processing of tasks in the order they
            # finish.
            first_task = True
            while self.tasks:
                if first_task:
                    first_task = False
                else:
                    time.sleep(2)  # Pause a bit to allow jobs to finish.
                while True:
                    finished = [task for task in self.tasks
                                if task.finished]
                    if not finished:
                        break
                    for task in finished:
                        self.tasks.remove(task)
                        yield task

    def destroy(self):
        if self.use_pp:
            self.pp_server.destroy()
            self.finished.set()

    class _MyJob(object):
        r"""Dummy job class that just executes the job locally without
        any parallel processing, for debugging purposes."""
        def __init__(self, f, *args, **kwargs):
            self.result = f(*args, **kwargs)
            self.finished = True

        def __call__(self):
            return self.result

class ManageRemoteServer(threading.Thread):
    r"""This class will spawn a remote server, and then start a
    `ppserver` process on it.  The output will be directed to the
    logger.

    There are two options for starting the remote server:

    1) ssh to the remote host and launch `ppserver.py`, binding it to
       the specified port on that host.  This is the standard way of
       setting up the server, but it allows anyone to connect and run
       code.  This can be mitigated somewhat by starting the server
       with a "secret" (sent through the SSH connection, so it is not
       compromised), but it won't work if the remote host is
       firewalled.  Here is how this would go::

          here$ ssh host1 ppserver.py -s <secret> -t 30 -p 60000 &
          here$ ssh host2 ppserver.py -s <secret> -t 30 -p 60000 &
          ...

       One would connect from here using the `<secret>`, connecting to
       `host1:60000` and `host2:60000` etc. (but so could anyone else
       who found out `<secret>`).

    2) Use an ssh tunnel to map a local port here to the port that the
       server binds on the remote host.  In this way, only local
       processes on the remote host can connect (and we can act as a
       local process by using the tunnel).  Again, processes on the
       remote host can use the server, so we use the secret, but
       presumably these are fairly well trusted.  In this case we will
       have to use a sequence of ports here: one for each remote host.
       Here is how this would go::

          here$ ssh -tL 60000:localhost:60000 host1 \
                ppserver.py -i localhost -s <secret> -t 30 -p 60000 &
          here$ ssh -tL 60001:localhost:60000 host2 \
                ppserver.py -i localhost -s <secret> -t 30 -p 60000 &
          ...

       Passing `localhost` as the interface (`-i`) to the server binds
       it to port `60000` on that host and allows it to accept only
       local connections to that port.  (I.e. if we try to connect to
       `host1:60000` as before, it will act as if there is nothing
       listening.)  Now we can connect to `localhost:60000` and
       `localhost:60001`.  These connections will be tunnelled over
       the ssh connections, and the remote computers will interpret
       them as local connections.  Only processes running on the
       remote machine, or those who have tunnelled in, can use this.
    """
    def __init__(self, host, server, logger):
        self.logger = logger
        threading.Thread.__init__(self)
        self.host, self.nproc, self.working_dir = host
        self.server = server

        # Use ExitOnForwardFailure if this version of ssh supports it.
        self.ssh_options = []
        _option = ["-o", "ExitOnForwardFailure=yes"]
        DEVNULL = os.open(os.devnull, os.O_RDWR)
        if 0 == subprocess.call(["ssh"] + _option + ["-V"],
                                stdout=DEVNULL, stderr=DEVNULL):
            self.ssh_options.extend(_option)
        os.close(DEVNULL)

        self.proc, self.local_host = self.ssh_connect()

    def run(self):
        r"""Run the remote server."""
        self.logger.info("Starting server on %s:%s"
                         % (self.host, self.working_dir))
        self.server.finished.wait()
        if pexpect is None:
            self.proc.kill()
        else:
            self.proc.close(force=True)
        self.logger.info("Server on %s terminated" % (self.host,))

    def ssh_connect(self, max_tries=100):
        r"""Return `(proc, local_host)` after initiating an ssh
        connection to the specified host.

        Parameters
        ----------
        max_tries : int
           Maximum number of local ports to try.

        Returns
        -------
        proc : spawn
           pexpect spawn class representing the process (or `None` if
           it could not be started).
        local_host : string
           Address the ppserver should use to connect to remote
           workers.  This is either a remote address, or a local
           address that has been tunnelled to the remote host.
        """
        connect_msg = "py_mmf_connected"
        server_command = [
            "cd %s;" % (self.working_dir,),
            "echo %s;" % (connect_msg,),
            "nice -n %i" % (self.server.nice,),
            "ppserver.py",
            "-t %i" % (self.server.remote_timeout,),
            "-s %s" % (self.server.secret,)]
        if self.nproc is None:
            # Autodetect
            server_command.append("-a")
        else:
            server_command.append("-w %i" % (self.nproc,))

        ssh_args = []
        ssh_args.append(self.host)
        if self.server.ssh_tunnel:
            server_command.append("-i localhost -p %(port)i")
            ssh_args.extend(self.ssh_options)
            ssh_args.extend(["-L", "%(local_port)i:localhost:%(port)i"])
            host = "localhost"
        else:
            host = self.host

        # This forces pseudo-terminal allocation so that the kill
        # signal will kill the ppserver process.
        ssh_args.append("-t")

        proc = None
        local_host = None
        local_port = self.server.port
        while local_port in self.server.local_ports:
            local_port += 1

        # We start an interactive shell so that the .bashrc files will
        # be processed to define the path.
        args = ssh_args + ["\"bash -i -c '%s'\""
                           % (" ".join(server_command),)]
        if pexpect is None:
            # Just launch and hope it works.
            args.insert(0, "ssh")
            args = [_s % dict(port=self.server.port,
                              local_port=local_port)
                    for _s in args]
            self.logger.debug("Trying command '%s'" % (" ".join(args),))
            proc = subprocess.Popen(args, stdout=subprocess.PIPE)
            local_host = "%s:%i" % (host, local_port)
        else:
            local_host = None
            while local_port < self.server.port + max_tries:
                if local_port in self.server.local_ports:
                    local_port += 1
                    continue
                cmd = ("ssh " + " ".join(args)) % dict(
                    port=self.server.port, local_port=local_port)
                self.logger.debug("Trying command '%s'" % (cmd,))
                proc = pexpect.spawn(cmd, logfile=self.logger, timeout=5)
                index = proc.expect([connect_msg,
                                     "bind: Address already in use",
                                     pexpect.EOF,
                                     pexpect.TIMEOUT])
                if 0 == index:
                    # Success!
                    self.command = cmd
                    local_host = "%s:%i" % (host, local_port)
                    break
                elif 1 == index:
                    self.logger.warning(
                        "Could not bind to local port %i..."
                        " Trying again." % (local_port,))
                    # Try with a new port.
                    local_port += 1
                    continue
                elif 2 == index:    # EOF
                    self.logger.error(
                        "Server process closed prematurely on host %s"
                        % (self.host,))
                    break
                elif 3 == index:    # TIMEOUT
                    # Consider prompting for user interaction here.
                    self.logger.error("Server timeout on host %s"
                                      % (self.host,))
                    break
        if local_host is None:
            self.logger.error("Could not start server on host %s"
                              % (self.host,))
            if proc is not None:
                proc.close(force=True)
            proc = None
        else:
            self.server.local_ports.add(local_port)
        return proc, local_host

def kill_hosts(hosts=None):
    r"""Kill any `ppserver.py` processes running on the specified
    hosts (default :attr:`_HOSTS`)."""
    if hosts is None:
        hosts = _HOSTS
    hostnames = []
    for host in hosts:
        if isinstance(host, tuple):
            host = host[0]
        hostnames.append(host)
    for host in hostnames:
        subprocess.Popen(["ssh", host, "killall ppserver.py"],
                         stdout=sys.stdout, stdin=sys.stdin)

def check_hosts(hosts=None, ps=False):
    r"""Log in to each host interactively to make sure that you can."""
    if hosts is None:
        hosts = _HOSTS
    hostnames = []
    for host in hosts:
        if isinstance(host, tuple):
            host = host[0]
        hostnames.append(host)
    for host in hostnames:
        proc = subprocess.Popen(["ping", "-q", "-o", "-t", "1", host],
                                stdout=sys.stdout, stdin=sys.stdin)
        proc.wait()
    remote_command = 'echo "$(hostname)" Okay.; who'
    if ps:
        remote_command = remote_command + '; ps -u mforbes'
    for host in hostnames:
        try:
            sys.stdout.write("Trying %s... " % (host,))
            sys.stdout.flush()
            proc = subprocess.Popen(["ssh", host, remote_command],
                                    stdout=sys.stdout, stdin=sys.stdin)
            proc.wait()
        except KeyboardInterrupt, err:
            sys.stdout.write("Interrupted. FAIL!\n")
            sys.stdout.flush()

if __name__ == '__main__':
    import mmf.async.pp as pp       # To get hosts.
    logger = logging.getLogger()
    logging.basicConfig(level=logging.INFO)
    #logger.addHandler(logging.StreamHandler())
    from numpy import pi
    hosts = pp._HOSTS
    print("Running on hosts %s" % (str(hosts),))
    res = pp.run_example(hosts=hosts, slice_size=1000000)
    print("Zeta(%g) = %g +- %g" % (0, res, abs(res - pi**2/6)))