Blue Collar Bioinformatics

Note: new posts have moved to http://bcb.io/ Please look there for the latest updates and comments

Posts Tagged ‘distributed-computing’

Parallel approaches in next-generation sequencing analysis pipelines

with 2 comments

My last post described a distributed exome analysis pipeline implemented on the CloudBioLinux and CloudMan frameworks. This was a practical introduction to running the pipeline on Amazon resources. Here I’ll describe how the pipeline runs in parallel, specifically diagramming the workflow to identify points of parallelization during lane and sample processing.

Incredible innovation in throughput makes parallel processing critical for next-generation sequencing analysis. When a single HiSeq run can produce 192 samples (2 flowcells x 8 lanes per flowcell x 12 barcodes per lane), the analysis steps quickly become limited by the number of processing cores available.

The heterogeneity of architectures utilized by researchers is a major challenge in building re-usable systems. A pipeline needs to support powerful multi-core servers, clusters and virtual cloud-based machines. The approach we took is to scale at the level of individual samples, lanes and pipelines, exploiting the embarrassingly parallel nature of the computation. An AMQP messaging queue allows for communication between processes, independent of the system architecture. This flexible approach allows the pipeline to serve as a general framework that can be easily adjusted or expanded to incorporate new algorithms and analysis methods.

Process overview — points for parallel implementations

The first level of parallelization occurs during processing of each fastq lane. We split the file into individual barcoded components, followed by alignment and BAM processing. The result is a sorted BAM file for each barcoded sub-sample, given a set of input fastq files:

Initial lane processing
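
As a rough illustration of the barcode-splitting step, the sketch below bins reads from a multiplexed fastq file by matching a fixed-length barcode at the start of each read. It is a simplification for illustration only: the real pipeline handles barcode position, mismatch tolerance and trimming according to the sequencing protocol, and the function and file naming here are hypothetical.

import collections
from itertools import islice

def split_by_barcode(fastq_file, barcodes, out_template):
    """Write reads into per-barcode fastq files; unmatched reads go to 'unmatched'."""
    handles = {}
    counts = collections.defaultdict(int)
    with open(fastq_file) as in_handle:
        while True:
            record = list(islice(in_handle, 4))  # a fastq record is 4 lines
            if not record:
                break
            seq = record[1]
            sample = "unmatched"
            for barcode, name in barcodes.items():
                if seq.startswith(barcode):
                    sample = name
                    break
            if sample not in handles:
                handles[sample] = open(out_template % sample, "w")
            handles[sample].writelines(record)
            counts[sample] += 1
    for handle in handles.values():
        handle.close()
    return dict(counts)

Calling split_by_barcode("lane8.fastq", {"ACGT": "sample1"}, "lane8-%s.fastq") would produce lane8-sample1.fastq plus lane8-unmatched.fastq, ready for per-sample alignment.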

The pipeline merges samples present in barcodes on multiple lanes, producing a single representative BAM file. The next step parallelizes the processing of each alignment file with read quality assessment, preparation for visualization and variant calling:

Sample processing overview
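
Because each merged sample can be processed independently, this stage is embarrassingly parallel. On a single multi-core machine the same structure can be expressed with a plain multiprocessing pool, as in the sketch below; the pipeline itself distributes this work across cluster nodes using the messaging approach described below, and the function names here are placeholders.

import multiprocessing

def process_sample(sample_bam):
    # Placeholder for the per-sample work: quality assessment, preparation
    # for visualization and variant calling on one merged BAM file.
    return sample_bam

def run_samples_in_parallel(sample_bams, cores):
    """Fan independent per-sample processing out across local cores."""
    pool = multiprocessing.Pool(cores)
    try:
        return pool.map(process_sample, sample_bams)
    finally:
        pool.close()
        pool.join()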

The variant calling steps utilize the Genome Analysis Toolkit (GATK) from the Broad Institute. It prepares alignments by recalibrating initial quality scores given the aligned sequences and consistently realigning reads around indels. The Unified Genotyper identifies variants from this prepared alignment file, then uses these variants along with known true sites for assigning quality scores and filtering to a final set of calls:

GATK variant calling details
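
To make these steps concrete, here is a minimal sketch of chaining the realignment and genotyping walkers from Python with subprocess. The arguments shown are the standard GATK command line options for these tools, but base quality recalibration, known-site resources and variant filtering are left out; those are handled by the pipeline’s own wrappers, and the paths and output naming are illustrative.

import subprocess

def realign_and_call(gatk_jar, ref_fasta, in_bam, out_prefix):
    """Indel realignment followed by Unified Genotyper variant calling."""
    targets = "%s-realign.intervals" % out_prefix
    realign_bam = "%s-realign.bam" % out_prefix
    out_vcf = "%s-snp.vcf" % out_prefix
    # Find intervals around indels that benefit from local realignment
    subprocess.check_call(["java", "-jar", gatk_jar, "-T", "RealignerTargetCreator",
                           "-R", ref_fasta, "-I", in_bam, "-o", targets])
    # Realign reads consistently within those intervals
    subprocess.check_call(["java", "-jar", gatk_jar, "-T", "IndelRealigner",
                           "-R", ref_fasta, "-I", in_bam,
                           "-targetIntervals", targets, "-o", realign_bam])
    # Call variants on the prepared alignment
    subprocess.check_call(["java", "-jar", gatk_jar, "-T", "UnifiedGenotyper",
                           "-R", ref_fasta, "-I", realign_bam, "-o", out_vcf])
    return out_vcf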

Subsequent steps include assessment of variant effects using snpEff and haplotype phasing of variants in diploid organism analyses.

Messaging approach to parallel execution

The process diagrams illustrate points of parallel execution for each fastq file and sample analysis. Practically, a top level analysis server manages each of the sub-processes. A command line script, a LIMS system or a specialized Galaxy interface starts this top level process. RabbitMQ messaging facilitates communication between the analysis controller and processing nodes:

Messaging approach
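
The sketch below shows this controller/worker pattern over RabbitMQ using the pika client: the controller publishes a job description onto a queue, and each processing node consumes and handles jobs from that queue. The queue name and message layout are invented for illustration; the pipeline defines its own exchanges, payloads and client library.

import json
import pika

def send_analysis_job(host, queue, job_info):
    """Controller side: publish one processing request onto the work queue."""
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    channel = connection.channel()
    channel.queue_declare(queue=queue, durable=True)
    channel.basic_publish(exchange="", routing_key=queue, body=json.dumps(job_info))
    connection.close()

def run_processing_node(host, queue, handler):
    """Worker side: consume requests and run the analysis handler on each."""
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    channel = connection.channel()
    channel.queue_declare(queue=queue, durable=True)

    def on_message(ch, method, properties, body):
        handler(json.loads(body))
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(queue=queue, on_message_callback=on_message)
    channel.start_consuming()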

In the setup described in my previous post, CloudMan manages this entire process. The web interface controls a pre-configured SGE cluster, and a custom script starts the job on this cluster. However, the general nature of the pipeline architecture allows this to work equally well on multi-core machines or a heterogeneous set of connected machines.

The CloudMan work demonstrates that clusters, especially on-demand virtual images like those available from Amazon, can be a powerful way to scale analyses. Equally important, it provides an open platform to share these pipelines and encourage re-use. The code for the pipeline is available from the bcbio-nextgen GitHub repository.

Written by Brad Chapman

September 10, 2011 at 3:12 pm

Distributed exome analysis pipeline with CloudBioLinux and CloudMan

with 19 comments

A major challenge in building analysis pipelines for next-generation sequencing data is combining a large number of processing steps in a flexible, scalable manner. Current best-practice software needs to be installed and configured alongside the custom code to chain individual programs together. Scaling to handle increasing throughput requires running that custom code on a wide variety of parallel architectures, from single multicore machines to heterogeneous clusters.

Establishing community resources that meet the challenges of building these pipelines ensures that bioinformatics programmers can share the burden of building large scale systems. Two open-source efforts that aim to provide this type of architecture are:

  • CloudBioLinux — A community effort to create shared images filled with bioinformatics software and libraries, using an automated build environment.

  • CloudMan — Uses CloudBioLinux as a platform to build a full SGE cluster environment. Written by Enis Afgan and the Galaxy Team, CloudMan is used to provide a ready-to-run, dynamically scalable version of Galaxy on Amazon AWS.

Here we combine CloudBioLinux software with a CloudMan SGE cluster to build a fully automated pipeline for processing high throughput exome sequencing data:

  • The underlying analysis software is from CloudBioLinux.
  • CloudMan provides an SGE cluster managed via a web front end.
  • RabbitMQ is used for communication between cluster nodes.
  • An automated pipeline, written in Python, organizes parallel processing across the cluster.

Below are instructions for starting a cluster on Amazon EC2 resources to run an exome sequencing pipeline that processes FASTQ sequencing reads, producing fully annotated variant calls.

Start cluster with CloudBioLinux and CloudMan

Start in the Amazon web console, a convenient front end for managing EC2 servers. The first step is to follow the CloudMan setup instructions to create an Amazon account and set up appropriate security groups and user data. The wiki page contains detailed screencasts. Below is a short screencast showing how to boot your CloudBioLinux-specific CloudMan server:

Once this is booted, proceed to the CloudMan web interface on the server and start up an instance from this shared identifier:

cm-b53c6f1223f966914df347687f6fc818/shared/2012-07-23--19-23

This screencast shows all of the details, including starting an additional node on the SGE cluster:


Configure AMQP messaging

Edit: The AMQP messaging steps have now been fully automated, so the configuration steps in this section are no longer required. Skip down to the ‘Run analysis’ section to start processing the data immediately.

With your server booted and ready to run, the next step is to configure RabbitMQ messaging to communicate between nodes on your cluster. In the AWS console, find the external and internal hostname of the head machine. Start by opening an ssh connection to the machine with the external hostname:

$ ssh -i your-keypair ubuntu@ec2-50-19-177-134.compute-1.amazonaws.com

Edit the /export/data/galaxy/universe_wsgi.ini configuration file to add the internal hostname. After editing, the AMQP section will look like:

[galaxy_amqp]
host = ip-10-125-10-182.ec2.internal
port = 5672
userid = biouser
password = tester

Finally, add the user and virtual host to the running RabbitMQ server on the master node with 3 commands:

$ sudo rabbitmqctl add_user biouser tester
creating user "biouser" ...
...done.
$ sudo rabbitmqctl add_vhost bionextgen
creating vhost "bionextgen" ...
...done.
$ sudo rabbitmqctl set_permissions -p bionextgen biouser ".*" ".*" ".*"
setting permissions for user "biouser" in vhost "bionextgen" ...
...done.

Run analysis

With messaging in place, we are ready to run the analysis. /export/data contains a ready-to-run example exome analysis, with FASTQ input files in /export/data/exome_example/fastq and configuration information in /export/data/exome_example/config. Start the fully automated pipeline with a single command:

 $ cd /export/data/work
 $ distributed_nextgen_pipeline.py /export/data/galaxy/post_process.yaml
                                   /export/data/exome_example/fastq
                                   /export/data/exome_example/config/run_info.yaml
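
The run_info.yaml file passed above describes the lanes and samples to process. As a rough sketch only, a single lane entry might look like the following, with values taken from the log output shown later in this post; the example configuration shipped in /export/data/exome_example/config is the authoritative reference for the actual format.

- lane: 8
  description: Test replicate 2
  genome_build: hg19
  analysis: SNP calling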

distributed_nextgen_pipeline.py starts processing servers on each of the cluster nodes, using SGE for scheduling. Then a top level analysis server runs, splitting the FASTQ data across the nodes at each step of the process:

  • Alignment with BWA
  • Preparation of merged alignment files with Picard
  • Recalibration and realignment with GATK
  • Variant calling with GATK
  • Assessment of predicted variant effects with snpEff
  • Preparation of summary PDFs for each sample with read details from FastQC alongside alignment, hybrid selection and variant calling statistics from Picard

Monitor the running process

The example data is from a human chromosome 22 hybrid selection experiment. While running, you can keep track of the progress in several ways. SGE’s qstat command will tell you where the analysis servers are running on the cluster:

$ qstat
job-ID  prior   name   user  state submit/start at   queue
----------------------------------------------------------------------------------
1 0.55500 nextgen_an ubuntu  r  08/14/2011 18:16:32 all.q@ip-10-125-10-182.ec2.int
2 0.55500 nextgen_an ubuntu  r  08/14/2011 18:16:32 all.q@ip-10-86-254-105.ec2.int
3 0.55500 automated_ ubuntu  r  08/14/2011 18:16:47 all.q@ip-10-125-10-182.ec2.int

Listing files in the working directory will show our progress:

$ cd /export/data/work
$ ls -lh
drwxr-xr-x 2 ubuntu ubuntu 4.0K 2011-08-13 21:09 alignments
-rw-r--r-- 1 ubuntu ubuntu 2.0K 2011-08-13 21:17 automated_initial_analysis.py.o11
drwxr-xr-x 2 ubuntu ubuntu   33 2011-08-13 20:43 log
-rw-r--r-- 1 ubuntu ubuntu  15K 2011-08-13 21:17 nextgen_analysis_server.py.o10
-rw-r--r-- 1 ubuntu ubuntu  15K 2011-08-13 21:17 nextgen_analysis_server.py.o9
drwxr-xr-x 8 ubuntu ubuntu  102 2011-08-13 21:06 tmp

The files that end with .o* are log files from each of the analysis servers and provide detailed information about the current state of processing at each server:

$ less nextgen_analysis_server.py.o10
INFO: nextgen_pipeline: Processing sample: Test replicate 2; lane
  8; reference genome hg19; researcher ; analysis method SNP calling
INFO: nextgen_pipeline: Aligning lane 8_100326_FC6107FAAXX with bwa aligner
INFO: nextgen_pipeline: Combining and preparing wig file [u'', u'Test replicate 2']
INFO: nextgen_pipeline: Recalibrating [u'', u'Test replicate 2'] with GATK

Retrieve results

The processing pipeline results in numerous intermediate files. These take up a lot of disk space and are not necessary after processing is finished. The final step in the process is to extract the useful files for visualization and further analysis:

$ upload_to_galaxy.py /export/data/galaxy/post_process.yaml
                      /export/data/exome_example/fastq
                      /export/data/work
                      /export/data/exome_example/config/run_info.yaml

For each sample, this script copies:

  • A BAM file with aligned sequences and original FASTQ data
  • A realigned and recalibrated BAM file, ready for variant calling
  • Variant calls in VCF format
  • A tab delimited file of predicted variant effects
  • A PDF summary file containing alignment, variant calling and hybrid selection statistics

into an output directory for the flowcell, /export/data/galaxy/storage/100326_FC6107FAAXX:

$ ls -lh /export/data/galaxy/storage/100326_FC6107FAAXX/7
-rw-r--r-- 1 ubuntu ubuntu  38M 2011-08-19 20:50 7_100326_FC6107FAAXX.bam
-rw-r--r-- 1 ubuntu ubuntu  22M 2011-08-19 20:50 7_100326_FC6107FAAXX-coverage.bigwig
-rw-r--r-- 1 ubuntu ubuntu  72M 2011-08-19 20:51 7_100326_FC6107FAAXX-gatkrecal.bam
-rw-r--r-- 1 ubuntu ubuntu 109K 2011-08-19 20:51 7_100326_FC6107FAAXX-snp-effects.tsv
-rw-r--r-- 1 ubuntu ubuntu 827K 2011-08-19 20:51 7_100326_FC6107FAAXX-snp-filter.vcf
-rw-r--r-- 1 ubuntu ubuntu 1.6M 2011-08-19 20:50 7_100326_FC6107FAAXX-summary.pdf

As suggested by the name, the script can also integrate the data into a Galaxy instance if desired. This allows biologists to perform further data analysis, including visual inspection of the alignments in the UCSC browser.

Learn more

All components of the pipeline are open source and part of community projects. CloudMan, CloudBioLinux and the pipeline are customized through YAML configuration files. Combined with the CloudMan managed SGE cluster, the pipeline can be applied in parallel to any number of samples.

The overall goal is to share the automated infrastructure work that moves samples from sequencing to being ready for analysis. This allows biologists more rapid access to the processed data, focusing attention on the real work: answering scientific questions.

If you’d like to hear more about CloudBioLinux, CloudMan and the exome sequencing pipeline, I’ll be discussing it at the AWS Genomics Event in Seattle on September 22nd.

Written by Brad Chapman

August 19, 2011 at 5:33 pm

Usage plans for Amazon Web Services research grant

with 7 comments

Amazon Web Services provide an excellent distributed computing infrastructure through their Elastic Compute Cloud (EC2), Elastic Block Storage (EBS) and associated resources. Essentially, they make compute power and storage available on demand at prices that scale with usage. In the past I’ve written about using EC2 for parallel parsing of large files. Generally, I am a big proponent of distributed computing as a solution to problems ranging from job scaling to improving code availability.

One of the challenges in advocating for EC2 at my day-to-day work is the presence of existing computing resources. We have servers and clusters, but how will we scale for future work? Thankfully, we are able to assess the utility of Amazon services for future scaling through their education and research grants. Our group applied and was accepted for a research grant, which we plan to use to develop and distribute next generation sequencing analyses both within our group at Mass General Hospital and in the larger community.

Amazon Machine Images (AMIs) provide an opportunity for the open source bioinformatics community to increase code availability. AMIs are essentially pre-built operating systems with installed programs. By creating AMIs and making them available, a programmer can make their code readily accessible to users and avoid any of the intricacies of installation and configuration. Add this to available data in the form of public data sets and you have a ready-to-go analysis platform with very little overhead. There is already a large set of available AMIs from which to build.

This idea and our thoughts on moving portions of our next generation sequencing analysis to EC2 are fleshed out further in our research grant application, portions of which are included below. We’d love to collaborate with others moving their bioinformatics work to Amazon resources.

Research Background

One broad area of rapid growth in biology research is deep sequencing (or short read) technology. A single lab investigator can produce hundreds of millions of DNA sequences, equivalent in scale to the entire human genome, in a period of days. This DNA sequencing technology is widely available through both on-site facilities as well as through commercial services. Creating scalable analysis methods is a high priority for the entire bioinformatics community; see http://selab.janelia.org/people/eddys/blog/?p=123 for a presentation nicely summarizing the issues. We propose to address the computational bottlenecks resulting from this huge data volume using distributed AWS resources.

An additional aim of our work is to provide tools to biologists looking to solve their data analysis challenges. When the computational portion of a project becomes a time limiting step, we can often speed up the cycling between experiment and analysis by providing researchers with ready to run scripts or web interfaces. However, this is complicated by high usage on shared computational resources and heterogeneous platforms requiring time consuming configuration. Both problems could be ameliorated by scalable EC2 instances with custom configured machine images.

The goals of this grant application are to develop our analysis platform on Amazon’s compute cloud and assess transfer, storage and utilization costs. We currently have internal computational resources ranging from high performance clusters to large memory machines. We believe Amazon’s compute cloud to be an ideal solution as our analysis needs outgrow our current hardware.

Benefits to Amazon and the community

Developing software on AWS architecture presents a move towards a standard platform for bioinformatics research. Our group is invested in the open source community and shares both code and analysis tools. One common hindrance to sharing is the heterogeneity of platforms; code is developed on a local cluster and is not readily generalizable, so it is not shared.

By building public machine images along with reusable source code, a diverse variety of users can readily use our code and tools. As short read sequencing continues to increase in utility and popularity, a practical ready-to-go platform for analyses will encourage many users to adopt parallelization on cloud resources as a research approach. We have begun initial work with this paradigm by developing parsers for large annotation files using MapReduce on EC2.

Having the ability to utilize AWS with your support will help us further develop and disseminate analysis templates for the larger biology community, enabling science both at MGH and elsewhere.

Written by Brad Chapman

September 7, 2009 at 7:42 pm

MapReduce implementation of GFF parsing for Biopython

with 4 comments

I previously wrote up details about starting a GFF parser for Biopython. In addition to incorporating suggestions received on the Biopython mailing list, it has been redesigned for parallel computation using Disco. Disco is an implementation of the distributed MapReduce framework in Erlang and Python. The code is available from the git repository; this post describes the impetus and design behind the MapReduce revision.

The scale of biological analyses is growing quickly thanks to new sequencing technologies. Bioinformatics programmers will need to learn techniques to rapidly analyze extremely large data sets. My coding toolbox has expanded to tackle these problems in two ways. The first is exploring programming languages designed for speed and parallelism, like Haskell. Additionally, I have been learning general techniques for parallelizing programs. Both require re-thinking code design to take advantage of increasingly available multi-processor and clustered architectures.

The MapReduce framework, originally proposed by Google, exemplifies the idea of redesigning code to analyze large data sets in parallel. In short, the programmer writes two functions: map and reduce. The map function handles the raw parsing work; for instance, it parses a line of GFF text and structures the details of interest. The reduce function combines the results from the map parsing, making them available for additional processing outside of the parallel part of the job. Several implementations of MapReduce are in widespread use. Apache’s Hadoop is a mature Java implementation with an underlying distributed file system. Here we utilize Disco, an implementation in Erlang and Python from Nokia Research Center.

The MapReduce GFF parser consists of two standalone functions. The map function takes a line of GFF text and first determines if we should parse it based on a set of limits. This allows the user to only pull items of interest from the GFF file, saving memory and time:

import collections
import simplejson

def _gff_line_map(line, params):
    strand_map = {'+' : 1, '-' : -1, '?' : None, None: None}
    line = line.strip()
    if line and line[0] != "#":  # skip blank lines and comment lines
        parts = line.split('\t')
        should_do = True
        if params.limit_info:
            for limit_name, limit_values in params.limit_info.items():
                cur_id = tuple([parts[i] for i in 
                    params.filter_info[limit_name]])
                if cur_id not in limit_values:
                    should_do = False
                    break

If the GFF line is to be parsed, we use it to build a dictionary with all the details. Additionally, the line is classified as a top level annotation, a standard flat feature with a location, or part of a parent/child nested feature. The results are returned as a dictionary. For the Disco parallel implementation, we use JSON to convert the dictionary into a flattened string:

        if should_do:
            assert len(parts) == 9, line
            gff_parts = [(None if p == '.' else p) for p in parts]
            gff_info = dict()
            # collect all of the base qualifiers for this item
            quals = collections.defaultdict(list)
            if gff_parts[1]:
                quals["source"].append(gff_parts[1])
            if gff_parts[5]:
                quals["score"].append(gff_parts[5])
            if gff_parts[7]:
                quals["phase"].append(gff_parts[7])
            for key, val in [a.split('=') for a in gff_parts[8].split(';')]:
                quals[key].extend(val.split(','))
            gff_info['quals'] = dict(quals)
            gff_info['rec_id'] = gff_parts[0]
            # if we are describing a location, then we are a feature
            if gff_parts[3] and gff_parts[4]:
                gff_info['location'] = [int(gff_parts[3]) - 1,
                        int(gff_parts[4])]
                gff_info['type'] = gff_parts[2]
                gff_info['id'] = quals.get('ID', [''])[0]
                gff_info['strand'] = strand_map[gff_parts[6]]
                # Handle flat features
                if not gff_info['id']:
                    final_key = 'feature'
                # features that have parents need to link so we can pick up
                # the relationship
                elif gff_info['quals'].has_key('Parent'):
                    final_key = 'child'
                # top level features
                else:
                    final_key = 'parent'
            # otherwise, associate these annotations with the full record
            else:
                final_key = 'annotation'
            return [(final_key, (simplejson.dumps(gff_info) if params.jsonify
                else gff_info))]

The advantage of this distinct map function is that it can be run in parallel for any line in the file. To condense the results back into a synchronous world, the reduce function takes the results of the map function and combines them into a dictionary of results:

def _gff_line_reduce(map_results, out, params):
    final_items = dict()
    for gff_type, final_val in map_results:
        send_val = (simplejson.loads(final_val) if params.jsonify else 
                final_val)
        try:
            final_items[gff_type].append(send_val)
        except KeyError:
            final_items[gff_type] = [send_val]
    for key, vals in final_items.items():
        out.add(key, (simplejson.dumps(vals) if params.jsonify else vals))

Finally, the dictionaries of GFF information are converted into Biopython SeqFeatures and attached to SeqRecord objects; the standard object interface is identical to that used for GenBank feature files.
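
As a minimal sketch of that final step, converting a single flat feature might look like the code below, taking one of the dictionaries produced by the map function above. The real code also handles parent/child nesting and record-level annotations, and the helper name here is made up.

from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio.SeqFeature import SeqFeature, FeatureLocation

def feature_to_seqrecord(gff_info):
    """Build a SeqRecord holding one SeqFeature from a parsed GFF dictionary."""
    start, end = gff_info["location"]
    feature = SeqFeature(FeatureLocation(start, end, strand=gff_info["strand"]),
                         type=gff_info["type"], id=gff_info["id"],
                         qualifiers=gff_info["quals"])
    record = SeqRecord(Seq(""), id=gff_info["rec_id"])
    record.features.append(feature)
    return record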

Re-writing the code sped it up by roughly 10% for single-processor work. Splitting up parsing and object creation allowed me to apply some simple speed-ups that contributed to this improvement. The hidden advantage of learning new programming frameworks is that it encourages you to think about familiar problems in different ways.

This implementation is designed to work both in parallel using Disco, and locally on a standard machine. Practically, this means that Disco is not required unless you care about parallelizing the code. Parallel coding may also not be the right approach for a particular problem. For small files, it is more efficient to run the code locally and avoid the overhead involved with making it parallel.
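
To show what that local path can look like, the sketch below wires the two functions together serially, using simple stand-ins for the parameter and output objects; these stand-in classes are illustrative and not part of the parser’s actual interface.

import collections

class LocalParams:
    """Stand-in parameters for a local run: no limits, no JSON round-tripping."""
    limit_info = None
    filter_info = {}
    jsonify = False

class LocalOutput:
    """Collects reduce output through the same add() interface Disco provides."""
    def __init__(self):
        self.items = collections.defaultdict(list)

    def add(self, key, vals):
        self.items[key].extend(vals)

def parse_gff_locally(gff_file):
    """Run the map and reduce functions serially on a single machine."""
    params = LocalParams()
    map_results = []
    with open(gff_file) as in_handle:
        for line in in_handle:
            map_results.extend(_gff_line_map(line, params) or [])
    out = LocalOutput()
    _gff_line_reduce(map_results, out, params)
    return dict(out.items)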

When you suddenly need to apply your small GFF parsing script to many gigabytes of result data, the code will scale accordingly by incorporating Disco. To look at the practical numbers related to this scaling, I plan to follow up on this post with tests using Disco on Amazon’s Elastic Compute Cloud.

Written by Brad Chapman

March 22, 2009 at 10:59 am