Merging MPI-parallelised Rivet Runs on HPC clusters

When analysing events produced on multiple cores there are two common appraches one could take to merge the Rivet output into a single output file at the end of the run:

File-based merging

The “brute force” approach is to initialise an AnalysisHandler per rank and have each rank write out a YODA file. At the end of the runs the various YODA files can be merged using the rivet-merge script (see additional file-based merging documentation here).

Merging in memory

Disk space is expensive, however, and it might be more attractive to merge the output from each individual rank in memory first, such that only a single file needs writing out at the very end. This can be achieved by using the AnalysisHandler::writeData(stream, format) method to extract the analysis objects from the AnalysisHandler into a byte stream for any given rank, which can then be sent across nodes and gathered on a single rank. Once collected, it’s straightforward to merge the different streams using AnalysisHandlers together with the AnalysisHandler::readData(stream, format, preload) method. Here it is necessary to read in the complete stream by setting the preload flag to false which will load every single analysis object from the stream into the AnalysisHandler. The AnalysisHandler ojects can the be merged sequentially, followed by a reentrant run on the merged AnalysisHandler in order to finalize() and write out the analysis objects to an output file in the end.

In Python, this could look something like this:

from mpi4py import MPI
import rivet, io

def processRank(rank):
  ah = rivet.AnalysisHandler("AH%i" % rank)
  # ... analyse some events ...
  ah.finalize()
  out = io.StringIO()
  ah.writeData(out)
  return out

mpi_comm = MPI.COMM_WORLD
mpi_rank = mpi_comm.Get_rank()
mpi_size = mpi_comm.Get_size()

res = processRank(mpi_rank)
res = res.getvalue().encode("utf-8")
res = mpi_comm.gather(res)

if mpi_rank == 0:
  ahmerge = rivet.AnalysisHandler("AHMERGE")
  ahmerge.readData(res[0], preload = False)
  for stream in res[1:]:
    ahtemp = rivet.AnalysisHandler("AHTEMP")
    ahtemp.readData(stream, preload = False)
    ahmerge.merge(ahtemp)
  ahmerge.finalize()
  ahmerge.writeData("mpi_merged_output.yoda.gz")