r"""Tools for working with `Parallel Python`_.
.. note:: You must install `Parallel Python`_ separately for this to
wget http://www.parallelpython.com/downloads/pp/pp-1.6.0.tar.bz2
tar -jxvf pp-1.6.0.tar.bz2
cd pp-1.6.0
python setup.py install
.. _Parallel Python: http://www.parallelpython.com/
Presently, you must be able to ssh to the hosts without a password
(setup a key using ssh-keygen). The list of hosts to use can be
specified manually, or though the `pp_hosts` variable in the
`~/.mmfrc.py` initialization file. If defined, this will be used to
populate the :attr:`_HOSTS` list. It should contain a list of strings
with the host names, or a list of tuples `(hostname, n_cpu)` where
`n_cpu` is an integer specifying how many cores are on the host (`None`
for autodetection) or tuples `(hostname, n_cpu, working_dir)` where
`working_dir` is the directory to change to on the remote machine
before launching the `ppserver`.
.. todo:: Only launch remote servers when needed.
.. todo:: Maybe use the pexpect module to facilitate passwords and
logging in to the remote servers. Also, provide some diagnostics
if the spawn fails. Also, could add a convenience method that
would try to login to all hosts to make sure that it is possible
first before running.
.. todo:: Monitor remote connections and respawn ppservers if any die.
from __future__ import division, absolute_import
__all__ = ['check_hosts', 'kill_hosts', 'Task', 'decorate_logger', 'ServerMMF',
import os
import sys
import re
import warnings
import time
import logging
import subprocess
import socket
import Queue
import random
import mmf.archive
_HOSTS = [] # Set in ~/.mmfrc.py
pp = None
# If this is run as a script, then sys.path[0] will be the current
# directory and this because the names clash. We temporarily remove
# it.
_path = sys.path[0]
sys.path[0] = None
import pp
# Don't timeout on long running jobs!
pp.pptransport.TRANSPORT_SOCKET_TIMEOUT = None
except ImportError, msg:
["Could not import Parallel Python module `pp`: %s" % (str(msg),),
"(Code will be executed in serial...)"]))
sys.path[0] = _path
del _path
import threading
except ImportError:
import dummy_threading as threading
import pexpect
except ImportError:
pexpect = None
# Test Example
def zeta(p, N0, N1):
r"""Sums the terms `N0 <= n < N1` of the zeta function."""
tot = 0.0
for n in xrange(N0, N1):
tot += (1./(n**2 + p))
return tot
def run_example(hosts=_HOSTS,
slice_size=1000000, jobs=20):
r"""Example of how to use the :class:`Server` class to compute the
zeta function.
>>> import logging
>>> logger = logging.getLogger()
>>> logger.setLevel(logging.FATAL)
>>> from numpy import pi
>>> res = run_example(slice_size=100000)
Submitting task 1-100001
Submitting task 100001-200001
Submitting task 200001-300001
Submitting task 300001-400001
Submitting task 400001-500001
Submitting task 500001-600001
Submitting task 600001-700001
Submitting task 700001-800001
Submitting task 800001-900001
Submitting task 900001-1000001
Submitting task 1000001-1100001
Submitting task 1100001-1200001
Submitting task 1200001-1300001
Submitting task 1300001-1400001
Submitting task 1400001-1500001
Submitting task 1500001-1600001
Submitting task 1600001-1700001
Submitting task 1700001-1800001
Submitting task 1800001-1900001
Submitting task 1900001-2000001
Processing task zeta(N0=1,p=0,N1=100001)
Debug listener started ...
Processing task zeta(N0=100001,p=0,N1=200001)
Debug listener started ...
Processing task zeta(N0=200001,p=0,N1=300001)
Processing task zeta(N0=300001,p=0,N1=400001)
Processing task zeta(N0=400001,p=0,N1=500001)
Processing task zeta(N0=500001,p=0,N1=600001)
Processing task zeta(N0=600001,p=0,N1=700001)
Processing task zeta(N0=700001,p=0,N1=800001)
Processing task zeta(N0=800001,p=0,N1=900001)
Processing task zeta(N0=900001,p=0,N1=1000001)
Processing task zeta(N0=1000001,p=0,N1=1100001)
Processing task zeta(N0=1100001,p=0,N1=1200001)
Processing task zeta(N0=1200001,p=0,N1=1300001)
Processing task zeta(N0=1300001,p=0,N1=1400001)
Processing task zeta(N0=1400001,p=0,N1=1500001)
Processing task zeta(N0=1500001,p=0,N1=1600001)
Processing task zeta(N0=1600001,p=0,N1=1700001)
Processing task zeta(N0=1700001,p=0,N1=1800001)
Processing task zeta(N0=1800001,p=0,N1=1900001)
Processing task zeta(N0=1900001,p=0,N1=2000001)
>>> res - pi**2/6
if hosts is not None:
hosts = tuple(hosts)
server = ServerMMF(hosts=hosts, remote_timeout=10, async=True)
for n in xrange(jobs):
N0 = 1 + slice_size*n
N1 = 1 + slice_size*(n+1)
print("Submitting task %i-%i" % (N0, N1))
server.submit(zeta, p=0, N0=N0, N1=N1)
tot = 0.0
for task in server.finished_tasks():
print("Processing task %s" % str(task))
tot += task.result
# Kill the server and all workers: we won't submit more jobs.
return tot
[docs]class Task(object):
[docs] def __init__(self, job, server, f, *args, **kwargs):
self.job = job
self.server = server
self.f = f
self.args = args
self.kwargs = kwargs
[docs] def finished(self):
return self.job.finished
[docs] def result(self):
res = self.job()
if self.server.archive_out:
# Unpack archived output.
_d = {}
if res is not None:
exec res in _d
except Exception, e:
e.args = e.args + (res,)
res = _d['result']
return res
def __str__(self):
sargs = list(map(str, self.args))
skwargs = ["%s=%s" % (k, str(self.kwargs[k])) for k in self.kwargs]
return "%s(%s)" % (self.f.func_name, ",".join(sargs + skwargs))
def _call(vargs, archive_in, archive_out):
r"""This wraps the original call so that we can process the
arguments. This seems to have to be a function rather than a
vargs : (f, args, kwargs)
`f` is the function to call, `args` is the argument list,
`kwargs` is the keyword argument dictionary.
archive_in, archive_out : bool
If these are `True` then use :class:`mmf.archive.Archive` to
package the arguments and/or result.
if archive_in:
_d = {}
exec vargs in _d
f = _d['f']
args = _d['args']
kwargs = _d['kwargs']
f, args, kwargs = vargs
result = f(*args, **kwargs)
if archive_out:
a = mmf.archive.Archive()
return str(a)
return result
[docs]class decorate_logger(object):
r"""Gives a `write` and `flush` method to the logger so that it
can be used by pexpect."""
[docs] def __init__(self, logger):
self.logger = logger
#self.handler = logging.StreamHandler()
# "%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
# give the logger the methods required by pexpect
logger.write = self.write
logger.flush = self.flush
[docs] def write(self, *args, **kwargs):
r"""Write method for logging handler."""
content = args[0] #pexpect only use one arg AFAIK
if content in [' ', '', '\n', '\r', '\r\n']:
return # don't log empty lines
for eol in ['\r\n', '\r', '\n']:
# remove ending EOL, the logger will add it anyway
content = re.sub('\%s$' % eol, '', content)
# call the logger info method with the reworked content
return self.logger.info(content)
[docs] def flush(self):
r"""Flush method: does nothing"""
[docs]class ServerMMF(object):
r"""Server object."""
[docs] def __init__(self, hosts=(), remote_timeout=30,
async=True, use_pp=True, nice=10, port=None,
ssh_tunnel=True, archive_in=False,
working_dir=".", use_local=True):
if port is None:
port = 60000
# Make hosts list unique
_hosts = []
for host in hosts:
if host not in _hosts:
hosts = tuple(hosts)
# Extract nprocs and working dir:
_hosts = []
_nprocs = []
_working_dirs = []
for host in hosts:
_working_dir = working_dir
if isinstance(host, tuple):
_host = host[0]
_nproc = host[1]
if 2 < len(host):
_working_dir = host[2]
_host = host
_nproc = 1
self.hosts = tuple(zip(_hosts, _nprocs, _working_dirs))
# Setup loggers for each host
self.logging_index = 0
if len(self.hosts) < 10:
self._fmt = "%i"
elif len(self.hosts) < 100:
self._fmt = "%2i"
elif len(self.hosts) < 1000:
self._fmt = "%3i"
elif len(self.hosts) < 10000:
self._fmt = "%4i"
self.logger = self.get_logger('localhost')
self.loggers = {}
for host in self.hosts:
self.loggers[host[0]] = self.get_logger(host[0])
self.async = async
self.archive_in = archive_in
self.archive_out = archive_out
self.use_pp = (pp is not None) and use_pp
if self.use_pp:
self.threads = []
self.local_ports = set()
self.nice = nice
self.remote_timeout = remote_timeout
self.secret = str(random.random())
self.ssh_procs = []
self.port = port
self.finished = threading.Event()
self.ssh_tunnel = ssh_tunnel
ppservers = self.start_remote_servers(self.hosts)
ncpus = 0
if use_local:
ncpus = 'autodetect'
self.pp_server = pp.Server(ppservers=ppservers,
self.tasks = []
[docs] def get_logger(self, host):
fmt = "mmf.async.pp.%s" % (self._fmt % (self.logging_index,))
logger = logging.getLogger(fmt)
logger.level = logging.DEBUG
if 0 == self.logging_index:
self.logger = logger
"Log messages use the following key to host mapping")
self.logger.info("'%s:' -> '%s'" % (fmt, host))
self.logging_index += 1
return logger
[docs] def submit(self, f, *args, **kwargs):
if self.use_pp:
if self.archive_in:
vargs = (f, args, kwargs)
a = mmf.archive.Archive()
a.insert(f=f, args=args, kwargs=kwargs)
vargs = str(a)
vargs = (f, args, kwargs)
_args = (vargs, self.archive_in, self.archive_out)
modules = ()
if self.archive_in or self.archive_out:
# This will make 'mmf.archive' available in the global
# ppworker scope. Note that we need both to get the
# desired effect: see ppworker.import_module.
modules = ('mmf.archive','mmf')
job = self.pp_server.submit(_call, _args, modules=modules)
job = self._MyJob(f, *args, **kwargs)
self.tasks.append(Task(job, self, f, *args, **kwargs))
[docs] def start_remote_servers(self, hosts):
r"""Tries to start a remote server on each specified host
using ssh.
ppservers = []
for host in hosts:
if host == socket.gethostname():
# Don't use ourself!
thread = ManageRemoteServer(host=host,
if thread.local_host is None:
# Could not start external process
# Should test that host started okay. Should I: ppserver
# does check this before connecting... Might need a
# timeout here so maybe we should just let ppserver do it
# all. A: But what if one is turned off? M: Good point.
return tuple(ppservers)
[docs] def finished_tasks(self):
r"""Iterate over all tasks, blocking until they are
if not self.async:
# Synchronous processing of tasks in the order they were
# submitted
for task in self.tasks:
yield task
# Asynchronous processing of tasks in the order they finish.
first_task = True
while self.tasks:
if first_task:
first_task = False
time.sleep(2) # Pause a bit to allow jobs to finish
while True:
finished = [task for task in self.tasks
if task.finished]
if not finished:
for task in finished:
yield task
[docs] def destroy(self):
if self.use_pp:
class _MyJob(object):
r"""Dummy job class that just executes the job locally without any
parallel processing for debugging purposes."""
def __init__(self, f, *args, **kwargs):
self.result = f(*args, **kwargs)
self.finished = True
def __call__(self):
return self.result
[docs]class ManageRemoteServer(threading.Thread):
r"""This class will spawn a remote server, and then start a
`ppserver` process on it. The output will be directed to the
There are two options for starting the remote server:
1) ssh to the remote host and launch the ppserver.py and bind
it to the specified port of that server. The is the standard
way of setting up the server, but allows anyone to connect
and run code. This can be mitigated somewhat by starting
the server with a "secret" (sent through the SSH
connection, so it is not compromised), but won't work if
the remote host is firewalled. Here is how this would go::
here$ ssh host1 ppserver.py -s <secret> -t 30 -p 60000 &
here$ ssh host2 ppserver.py -s <secret> -t 30 -p 60000 &
One would connect from here using the `<secret>` and
connecting to `host1:60000` and `host2:60000` etc. (but so
could anyone else who found out `<secret>`.)
2) Use an ssh tunnel to map a local port here to the local
port on the server and then bind it to a local port. In
this way, only local processes on the remote host can
connect (and we can act as a local process by using the
tunnel). Again, processes on the remote host can use the
server, so we use the secret, but presumable these are
fairly well trusted.
In this case we will have to use a sequence of ports here:
one for each remote host. Here is how this would go::
here$ ssh -tL 60000:localhost:60000 host1 \
ppserver.py -i localhost -s <secret> -t 30 -p 60000 &
here$ ssh -tL 60001:localhost:60000 host2 \
ppserver.py -i localhost -s <secret> -t 30 -p 60000 &
Passing `localhost` as the interface (`-i`) to the server
binds it to port `60000` on that server and allows it to
only accept local connections to that port. (I.e. if we
try to connect to `host1:60000` as before it will act as if
there is nothing listening.) Now we can connect to
`localhost:60000` and `localhost:60001`. These connections
will be tunnelled over the ssh connection and the remote
computers will interpret these as local connections. Only
process running on the remote machine, or those who have
tunnelled in can use this."""
[docs] def __init__(self, host, server, logger):
self.logger = logger
self.host, self.nproc, self.working_dir = host
self.server = server
self.ssh_options = []
_option = ["-o", "ExitOnForwardFailure=yes"]
DEVNULL = os.open(os.devnull, os.O_RDWR)
if 0 == subprocess.call(["ssh"] + _option + ["-V"],
self.proc, self.local_host = self.ssh_connect()
[docs] def run(self):
r"""Run the remote server."""
self.logger.info("Starting server on %s:%s" % (self.host,
if pexpect is None:
self.logger.info("Server on %s terminated" % (self.host, ))
[docs] def ssh_connect(self, max_tries=100):
r"""Return `(proc, local_host)` after initiating an ssh
connection to the specified host.
max_tries : int
Maximum number of local ports to try.
proc : spawn
pexpect spawn class representing process (or `None` if
not started.
local_host : string
Address the ppserver should use to connect to remote
workers. This is either a remote address, or a local
address that has be tunnelled to the remote host.
connect_msg = "py_mmf_connected"
server_command = [
"cd %s;" % (self.working_dir,),
"echo %s;" % (connect_msg,),
"nice -n %i" % (self.server.nice,),
"-t %i" % (self.server.remote_timeout,),
"-s %s" % (self.server.secret,)]
if self.nproc is None:
# Autodetect
server_command.append("-w %i" % (self.nproc,))
ssh_args = []
if self.server.ssh_tunnel:
server_command.append("-i localhost -p %(port)i")
ssh_args.extend(["-L", "%(local_port)i:localhost:%(port)i"])
host = "localhost"
host = self.host
# This forces pseudo-terminal allocation so that the kill
# signal will kill the ppserver process.
proc = None
local_host = None
local_port = self.server.port
while local_port in self.server.local_ports: local_port += 1
# We start an interactive shell so that the .bashrc files will
# be processed to define the path.
args = ssh_args + ["\"bash -i -c '%s'\"" % (" ".join(server_command),)]
if pexpect is None:
# Just launch and hope it works
args.insert(0, "ssh")
args = [_s % dict(port=self.server.port,
for _s in args]
self.logger.debug("Trying command '%s'" % (" ".join(args),))
proc = subprocess.Popen(args, stdout=subprocess.PIPE)
local_host = "%s:%i" % (host, local_port)
local_host = None
while local_port < self.server.port + max_tries:
if local_port in self.server.local_ports:
local_port += 1
cmd = ("ssh " + " ".join(args)) % dict(
self.logger.debug("Trying command '%s'" % (cmd,))
proc = pexpect.spawn(cmd,
index = proc.expect([
"bind: Address already in use",
if 0 == index:
# Success!
self.command = cmd
local_host = "%s:%i" % (host, local_port)
elif 1 == index:
"Could not bind to local port %i... Trying again."
% (local_port,))
# Try with a new port
local_port += 1
elif 2 == index:
"Server process closed prematurely on host %s"
% (self.host,))
elif 3 == index:
# Consider prompting for user interaction here.
self.logger.error("Server timeout on host %s"
% (self.host,))
if local_host is None:
self.logger.error("Could not start server on host %s"
% (self.host,))
proc = None
return proc, local_host
[docs]def kill_hosts(hosts=None):
if hosts is None:
hosts = _HOSTS
hostnames = []
for host in hosts:
if isinstance(host, tuple):
host = host[0]
queue = Queue.Queue()
for host in hostnames:
proc = subprocess.Popen(["ssh", host, "killall ppserver.py"],
[docs]def check_hosts(hosts=None, ps=False):
r"""Log in to each host interactively to make sure that you
if hosts is None:
hosts = _HOSTS
hostnames = []
for host in hosts:
if isinstance(host, tuple):
host = host[0]
for host in hostnames:
proc = subprocess.Popen(["ping", "-q", "-o", "-t 1", host],
remote_command = 'echo "$(hostname)" Okay.; who'
if ps:
remote_command = remote_command + '; ps -u mforbes'
for host in hostnames:
if True or pexpect is None:
sys.stdout.write("Trying %s... " % (host,))
proc = subprocess.Popen(
["ssh", host, remote_command],
except KeyboardInterrupt, err:
sys.stdout.write("Interrupted. FAIL!\n")
if __name__ == '__main__':
import mmf.async.pp as pp # To get hosts.
logger = logging.getLogger()
from numpy import pi
hosts = pp._HOSTS
print("Running on hosts %s" % (str(hosts), ))
res = pp.run_example(hosts=hosts, slice_size=1000000)
print("Zeta(%g) = %g +- %g" % ( 0, res, abs(res - pi**2/6,)))