PySpark for HPC: some success but a work in progress

Update 25.3.2021: I haven't ended up using PySpark as much as I might have hoped or expected, but I think this post has some interesting ideas in it all the same!

Introduction

Recently, I published the paper Revisiting the envelope approximation: gravitational waves from bubble collisions. This work compares two methods for computing the gravitational wave power spectrum from colliding bubbles from a phase transition in the early universe. It turned out to be an ideal example of a physics simulation in which the MapReduce algorithm can be applied.

Physics background and motivation

Normally, when I am doing numerical simulations of physics the system can be treated with domain decomposition methods - 3D and 4D lattice Monte Carlo simulations, and real-time simulations of physics in the early universe usually involve scalar fields \(\Phi(\mathbf{x})\), gauge fields, represented by parallel transporters (mini-Wilson lines) \(U_i(\mathbf{x})\) and also fermions \(\Psi(\mathbf{x})\). Normally, these fields exist everywhere in space and time; when doing a simulation they exist only inside our box (3D or 4D euclidean) and only on discrete points (or parallel transporters between those points). If a classical field theory, the box represents a state at a given time \(t\) and is evolved to the next timestep \(t+\delta t\) by some symplectic method. For quantum field theories, the box represents a configuration sampled from the Euclidean field theory with weight \(e^{-S[\Phi]}\). Parallelising these simulations means dividing the box into smaller boxes and sharing those out between different nodes, exchanging information every timestep (or Monte Carlo update) about the boundaries of each smaller box.

For simulating a first order phase transition, one can use a real-time lattice simulation. Several bubbles expand in a big box - but the number of points the box can contain is limited: either your computer will run out of memory, or you will run out of nodes to share the box across, or communicating the state of each smaller box to its neighbours will overwhelm the time you spend actually doing stuff. That gives you limited dynamic range


Slice through a simulation of a first order phase transition. Slice through a simulation of a first-order phase transition, such as that which might have happened when the Higgs became massive.


The envelope approximation is different

The envelope approximation is a way of computing the gravitational wave power spectrum from first-order phase transitions. To summarise briefly: in the Standard Model of particle physics, the Higgs particle got its mass in a gentle process, termed a crossover. However, we know the Standard Model is incomplete, and many extensions of it allow for situations where this happened in a much more violent manner. It is conceivable that bubbles of massive Higgs phase nucleated and grew, until the Higgs was massive in the whole universe. As the bubbles collided, they would produce a characteristic signature in gravitational waves, which we might be able to detect in the next couple of decades.

There's lots of jargon in there (and here is a recent summary on the state of the art, for the interested reader), but all you need to know is that, for a given simulation configuration the envelope approximation is a set of integrals that can be done in parallel (one per bubble), with the total gravitational wave power computed by summing the individual contributions from each integral.

Here is a perfect MapReduce problem: for each frequency \(\omega\) of the power spectrum we want to calculate map each thread to a different bubble \(n\), do the integrals for that bubble in splendid isolation, then reduce the total to compute the gravitational wave power \(\Omega_\text{gw}(\omega)\). The good news is a lot of the hard work is not strongly \(\omega\) dependent, so we can speed things up by giving each thread the same bubble \(n\) each time. This is what my code does, and it works pretty well. At the top level, it uses either MPI4py or PySpark. In fact, because this is such a natural MapReduce-style problem, the pyspark approach looks more natural! Below the top most layer of code, the actual calculations trivially parallelise and so there is no difference.

But what about fault tolerance?

Aha, this is the tricky one. Both [Py]Spark and MapReduce more generally, are claimed to have greater fault tolerance. Assuming the input data are available, each MapReduce step can be restarted easily enough - and indeed, in Spark this is automatic. However, the approach I outlined above does not benefit from this because the 'cached' data produced at one \(\omega\) is not stored as a Resilient Distributed Dataset, just a set of standard Python dictionaries. Restarting one of the threads will lose this cached data and slow things down.

And will you use MapReduce more in the future?

Absolutely. The diffusion equation example in Jonathan Dursi's post HPC is dying, and MPI is killing it was a revelation for me (code example is at the bottom). It's unlikely that performance of my simulation codes (in carefully written C, with MPI communication) would ever be reproduced using Spark with an interpreted language, but for a student wanting to do a small project and write their own code from scratch, I'd strongly recommend using it. Personally, I'll be sizing up every future project where I need to write a new code or heavily modify an existing one and asking myself "should I write this using a MapReduce-type approach?".

Afterword

I'm not finishing experimenting with my envelope approximation code. I plan to make it public on BitBucket soon (if you want to work with it right away, give me an email), and I also hope to modify the code so that the 'cache' data I mentioned to functions as an RDD. If and when these things happen, I'll update this blogpost.