Numerical derivatives in parallel

My last post was about a fairly esoteric new language feature in python: coroutines. I wasn't reading up on them because they're whiz-bang cool (okay, not only because they're whiz-bang cool) but because I have an actual problem that they might help with. For the triple system, I find I need to solve some very slow numerical optimization problems. That is, the objective function takes seconds to evaluate. It can't be parallelized internally, and numerical minimization is largely a sequential operation: evaluate the function, figure out which way is downhill, go there, repeat. So this code currently uses just one core, and is almost as slow as the highly parallel Markov-Chain Monte Carlo code that does hundreds of times as much work. So obviously it would be nice to find some parallelism. I have a way to do that: parallelize gradient evaluations.

The other reason for this post is to make a point about code robustness. Getting numerical derivatives right is a notoriously difficult process, and getting them both efficient and right is even harder. The package I have been using, MINUIT, has been used by particle physicists for decades, and they have hammered out all sorts of corner cases that a naive implementation might miss. So I'm not going to use a naive implementation, not least because my problem is numerically difficult. This means the code is not simple, and I don't want to try to restructure it manually to be more concurrent; I want the computer to handle the concurrency for me.

MINUIT does its numerical optimization using a "quasi-Newton method". That is, it tries to approximate the function as a quadratic and jump straight to the minimum. The "quasi-" is because it builds the quadratic estimate by taking gradients at successive points and updating an estimate of the Hessian matrix based on the measured gradients. This is a reasonably effective method, and it spends almost all its time evaluating numerical gradients.
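
For concreteness, here is a minimal sketch of the quasi-Newton idea - a plain BFGS update of an inverse-Hessian estimate, with a fixed step scale and no line search. This is emphatically not MINUIT's actual implementation, just the flavour of it:

import numpy as np

def quasi_newton_sketch(grad, x0, n_iter=200, alpha=0.1):
    # Step towards the minimum of the current quadratic model, then refine
    # the inverse-Hessian estimate from how the gradient actually changed.
    x = np.asarray(x0, dtype=float)
    H = np.eye(len(x))               # inverse-Hessian estimate
    g = grad(x)
    for _ in range(n_iter):
        p = -np.dot(H, g)            # Newton-like step on the quadratic model
        x_new = x + alpha*p
        g_new = grad(x_new)
        s, y = x_new - x, g_new - g  # change in position and in gradient
        sy = np.dot(s, y)
        if sy > 1e-12:               # standard BFGS curvature condition
            rho = 1.0/sy
            I = np.eye(len(x))
            H = (np.dot(np.dot(I - rho*np.outer(s, y), H),
                        I - rho*np.outer(y, s))
                 + rho*np.outer(s, s))
        x, g = x_new, g_new
    return x

# toy check: minimize a quadratic whose gradient is A.x - b
A = np.diag([1., 10.])
b = np.array([1., 2.])
print(quasi_newton_sketch(lambda x: np.dot(A, x) - b, np.zeros(2)))  # -> roughly [1.  0.2]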

The function I am using takes 20-30 input parameters and returns a goodness-of-fit value (actually a log-likelihood). So computing the gradient is a matter of computing derivatives independently along 20-30 directions. If this could be done in parallel, a substantial speedup would be possible. Importantly, the (highly non-trivial) MINUIT code is built to permit "analytic" gradient functions to be passed in, so this is a change that can be made without reimplementing the entire algorithm.

The numerical derivative algorithm is in principle simple: in dimension d, choose a step size h and compute (f(x+h)-f(x-h))/(2h). The trick is choosing the right value for h - too large and the function doesn't look like a parabola, too small and round-off error wrecks the result. MINUIT uses several tricks to find the right h. First, if you also have f(x), you can estimate the second derivative, and use it to work out how much truncation error there is in the (worse) first-order estimate (f(x+h)-f(x))/h. This error is a pessimistic estimate of the truncation error in the symmetric calculation, and it can be used to balance truncation error against round-off. Of course, if your initial h was wildly wrong, your corrected value might not be okay either, so you might have to repeat this until it stabilizes. And obviously having a good initial guess matters, so you want to keep around estimates of the gradient and second derivative from previous function evaluations. Plus you have to be careful about floating-point misbehaviour. So the whole function is kind of complicated.
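
To see why h matters so much, here is a toy illustration (mine, not anything from MINUIT): differentiate sin at x=1 with a range of step sizes and watch the error shrink and then grow again as round-off takes over.

import numpy as np

def central_diff(f, x, h):
    # symmetric difference: truncation error ~ h**2, round-off error ~ eps/h
    return (f(x + h) - f(x - h))/(2*h)

exact = np.cos(1.0)
for h in [1e-1, 1e-3, 1e-5, 1e-8, 1e-12]:
    err = abs(central_diff(np.sin, 1.0, h) - exact)
    print("h = %-8g error = %.3e" % (h, err))

With that in mind, here is the whole function, converted from MINUIT code to parallel, coroutine-using form: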

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import concurrent.futures
import asyncio
import time
import numpy as np

@asyncio.coroutine
def pmap(f, xs, loop=None):
    fs = [asyncio.async(f(*x), loop=loop) for x in xs]
    yield from asyncio.wait(fs)
    return [fut.result() for fut in fs]


class Scalar:
    def __init__(self, f,
                 step_initial=1e-8,
                 f_eps=None,
                 max_gradient_steps=3,
                 step_tolerance=0.3,
                 gradient_tolerance=0.05,
                 dtype=float,
                 pool=None):
        self.f = f
        self.x = None
        self.f_x = None
        self.dtype = dtype
        try:
            self.step_initial = self.dtype(step_initial)
        except TypeError:
            self.step_initial = np.asarray(step_initial)
        if f_eps is None:
            self.f_eps = np.finfo(self.dtype).eps
        else:
            self.f_eps = self.dtype(f_eps)
        self.up = 1.
        self.eps = 8*np.finfo(self.dtype).eps
        self.eps2 = 2*np.sqrt(np.finfo(self.dtype).eps)
        self._gradient = None
        self._gradient_steps = None
        self._gradient_2nd = None
        self.max_gradient_steps = max_gradient_steps
        self.step_tolerance = step_tolerance
        self.gradient_tolerance = gradient_tolerance
        if pool is None:
            self.pool = concurrent.futures.ProcessPoolExecutor()
        else:
            self.pool = pool
        self.loop = asyncio.get_event_loop()


    def __call__(self, x):
        if self.x is None or not np.all(x==self.x):
            self.x = x
            self.f_x = self.loop.run_until_complete(self._f(x))
        return self.f_x

    @asyncio.coroutine
    def _f(self, x):
        return (yield from self.loop.run_in_executor(self.pool, self.f, x))

    def _setup_initial_gradient(self):
        # FIXME: make sure dirin doesn't get too small
        dirin = 2*self.step_initial
        if isinstance(dirin, self.dtype):
            dirin = np.array([dirin]*self.n)
        self._gradient_2nd = 2*self.up/dirin**2
        self._gradient_steps = 0.1*dirin
        self._gradient = self._gradient_2nd*dirin

    def gradient(self, x):
        x = np.asarray(x)
        self(x)
        self.n = len(x)
        if self._gradient is None:
            self._setup_initial_gradient()
        if len(self._gradient_steps) != len(x):
            raise ValueError("Dimensionality of input appears to have changed "
                             "from %d to %d" % (len(self._gradient_steps),
                                                len(x)))
        if False:  # flip to True to evaluate each dimension serially (handy for debugging)
            return np.array([
                self.loop.run_until_complete(self._gradient1d(d))
                for d in range(len(x))])
        else:
            return np.array(self.loop.run_until_complete(
                pmap(self._gradient1d,[(d,) for d in range(len(x))])))

    @asyncio.coroutine
    def _gradient1d(self, d):
        dfmin = 8*self.eps2*(np.abs(self.f_x)+self.up)
        vrysml = 8*self.eps**2

        xtf = self.x[d]
        epspri = self.eps2+np.abs(self._gradient[d]*self.eps2)
        stepb4 = 0
        for j in range(self.max_gradient_steps):
            optstp = np.sqrt(dfmin/(np.abs(self._gradient_2nd[d])+epspri))
            step = max(optstp, np.abs(0.1*self._gradient_steps[d]))
            step = min(step, 10*self._gradient_steps[d])
            step = max(step, vrysml)
            step = max(step, 8*np.abs(self.eps2*self.x[d]))
            if np.abs((step-stepb4)/step) < self.step_tolerance:
                break
            self._gradient_steps[d] = step
            stepb4 = step

            x1 = self.x.copy()
            x1[d] = xtf+step
            x2 = self.x.copy()
            x2[d] = xtf-step
            fs1, fs2 = yield from pmap(self._f, [(x1,), (x2,)])

            step = (x1[d]-x2[d])/2 # in case of round-off

            grdb4 = self._gradient[d]
            self._gradient[d] = 0.5*(fs1-fs2)/step
            self._gradient_2nd[d] = (fs1+fs2 - 2.*self.f_x)/step**2

            if (np.abs(grdb4-self._gradient[d])
                /(np.abs(self._gradient[d])+dfmin/step)
                < self.gradient_tolerance):
                break
        return self._gradient[d]

if __name__=='__main__':
    k = np.array([1e-6,1e-3,1,1e3,1e6])
    def f(x):
        return np.sin(np.dot(k,x))
    def df(x):
        return np.cos(np.dot(k,x))*k

    F = Scalar(f)
    for i in range(20):
        print()
        x = np.random.randn(len(k))
        print(F.gradient(x))
        print(df(x))
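
As an aside, the pmap helper at the top is doing the real work: wrap each call in a Task, wait for all of them, and hand back the results in submission order. A standalone toy (with the pmap definition repeated so it runs on its own, on python 3.4):

import asyncio

@asyncio.coroutine
def pmap(f, xs, loop=None):
    # schedule every call as a Task, wait for all, return results in order
    fs = [asyncio.async(f(*x), loop=loop) for x in xs]
    yield from asyncio.wait(fs)
    return [fut.result() for fut in fs]

@asyncio.coroutine
def slow_square(x):
    yield from asyncio.sleep(0.1)   # stand-in for a slow function evaluation
    return x*x

loop = asyncio.get_event_loop()
print(loop.run_until_complete(pmap(slow_square, [(2,), (3,), (4,)])))  # [4, 9, 16]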

Unfortunately, coroutines require python 3.4, while the rest of the system uses python 2. So I did a conversion to a thread-based approach (concurrent.futures is available on python 2 through the futures backport), which was gratifyingly simple:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import concurrent.futures
import time
import numpy as np

def pmap(f, xs, pool):
    ts = [pool.submit(f,*x) for x in xs]
    return [t.result() for t in ts]


class Scalar:
    def __init__(self, f,
                 step_initial=1e-8,
                 f_eps=None,
                 max_gradient_steps=3,
                 step_tolerance=0.3,
                 gradient_tolerance=0.05,
                 dtype=float,
                 pool=None):
        self.f = f
        self.x = None
        self.f_x = None
        self.dtype = dtype
        try:
            self.step_initial = self.dtype(step_initial)
        except TypeError:
            self.step_initial = np.asarray(step_initial)
        if f_eps is None:
            self.f_eps = np.finfo(self.dtype).eps
        else:
            self.f_eps = self.dtype(f_eps)
        self.up = 1.
        self.eps = 8*np.finfo(self.dtype).eps
        self.eps2 = 2*np.sqrt(np.finfo(self.dtype).eps)
        self._gradient = None
        self._gradient_steps = None
        self._gradient_2nd = None
        self.max_gradient_steps = max_gradient_steps
        self.step_tolerance = step_tolerance
        self.gradient_tolerance = gradient_tolerance
        if pool is None:
            self.pool = concurrent.futures.ProcessPoolExecutor()
        else:
            self.pool = pool
        # The gradient threads spend nearly all their time waiting on the
        # process pool, so a generously sized thread pool is cheap and every
        # gradient dimension gets its own worker.
        self.thread_pool = concurrent.futures.ThreadPoolExecutor(65536)


    def __call__(self, x):
        if self.x is None or not np.all(x==self.x):
            self.x = x
            self.f_x = self.pool.submit(self.f,x).result()
        return self.f_x

    def _setup_initial_gradient(self):
        # FIXME: make sure dirin doesn't get too small
        dirin = 2*self.step_initial
        if isinstance(dirin, self.dtype):
            dirin = np.array([dirin]*self.n)
        self._gradient_2nd = 2*self.up/dirin**2
        self._gradient_steps = 0.1*dirin
        self._gradient = self._gradient_2nd*dirin

    def gradient(self, x):
        x = np.asarray(x)
        self(x)
        self.n = len(x)
        if self._gradient is None:
            self._setup_initial_gradient()
        if len(self._gradient_steps) != len(x):
            raise ValueError("Dimensionality of input appears to have changed "
                             "from %d to %d" % (len(self._gradient_steps),
                                                len(x)))
        return np.array(pmap(self._gradient1d,[(d,) for d in range(len(x))],
                             self.thread_pool))

    def _gradient1d(self, d):
        dfmin = 8*self.eps2*(np.abs(self.f_x)+self.up)
        vrysml = 8*self.eps**2

        xtf = self.x[d]
        epspri = self.eps2+np.abs(self._gradient[d]*self.eps2)
        stepb4 = 0
        for j in range(self.max_gradient_steps):
            optstp = np.sqrt(dfmin/(np.abs(self._gradient_2nd[d])+epspri))
            step = max(optstp, np.abs(0.1*self._gradient_steps[d]))
            step = min(step, 10*self._gradient_steps[d])
            step = max(step, vrysml)
            step = max(step, 8*np.abs(self.eps2*self.x[d]))
            if np.abs((step-stepb4)/step) < self.step_tolerance:
                break
            self._gradient_steps[d] = step
            stepb4 = step

            x1 = self.x.copy()
            x1[d] = xtf+step
            x2 = self.x.copy()
            x2[d] = xtf-step
            # Submit both evaluations before waiting, so the +step and -step
            # points run in parallel. (Calling self(x1) here instead would
            # clobber the cached base point and f(x) that the other gradient
            # threads are still using.)
            fut1 = self.pool.submit(self.f, x1)
            fut2 = self.pool.submit(self.f, x2)
            fs1, fs2 = fut1.result(), fut2.result()

            step = (x1[d]-x2[d])/2 # in case of round-off

            grdb4 = self._gradient[d]
            self._gradient[d] = 0.5*(fs1-fs2)/step
            self._gradient_2nd[d] = (fs1+fs2 - 2.*self.f_x)/step**2

            if (np.abs(grdb4-self._gradient[d])
                /(np.abs(self._gradient[d])+dfmin/step)
                < self.gradient_tolerance):
                break
        return self._gradient[d]

if __name__=='__main__':
    k = np.array([1e-6,1e-3,1,1e3,1e6])
    def f(x):
        return np.sin(np.dot(k,x))
    def df(x):
        return np.cos(np.dot(k,x))*k

    F = Scalar(f)
    for i in range(20):
        print()
        x = np.random.randn(len(k))
        print(F.gradient(x))
        print(df(x))


Applying this to the actual triple system problem is a work in progress. There's also the very promising pure-python nested-sampling package nestle, which I would like to convert to use futures to do parallel computation, again because I want to apply it to the triple system. That's a longer-term problem, because my objective function currently crashes if you give it sufficiently-bogus orbital parameters.
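
What I have in mind there is nothing nestle-specific, just the usual futures pattern; a sketch (the names and the toy loglike are placeholders, not nestle's API) would be to farm a batch of likelihood evaluations out to a process pool and collect them in order:

import concurrent.futures
import numpy as np

def loglike(theta):
    # stand-in for the real (slow, internally unparallelizable) objective
    return -0.5*np.sum(theta**2)

def batch_loglike(thetas, pool):
    # submit every point before waiting, so they all run concurrently
    futs = [pool.submit(loglike, t) for t in thetas]
    return np.array([f.result() for f in futs])

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as pool:
        print(batch_loglike(np.random.randn(8, 5), pool))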

2 comments:

Anonymous said...

I'm curious -- is there a reason you wrote your own pmap rather than using the .map method from the PoolExecutor objects?

Unknown said...

Good point. I have my own pmap because asyncio does not provide one(?), and I wrote the coroutine version first. But of course one could simplify the threaded version by using pool.map.
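
Something like this, say (an untested sketch; Executor.map returns results in order, and with a thread pool the lambda needs no pickling):

def pmap(f, xs, pool):
    # unpack each argument tuple and let the executor handle the fan-out
    return list(pool.map(lambda args: f(*args), xs))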