Blue Collar Bioinformatics

Note: new posts have moved to Please look there for the latest updates and comments

Archive for the ‘analysis’ Category

Extending the GATK for custom variant comparisons using Clojure

with 2 comments

The Genome Analysis Toolkit (GATK) is a full-featured library for dealing with next-generation sequencing data. The open-source Java code base, written by the Genome Sequencing and Analysis Group at the Broad Institute, exposes a Map/Reduce framework allowing developers to code custom tools taking advantage of support for: BAM Alignment files through Picard, BED and other interval file formats through Tribble, and variant data in VCF format.

Here I’ll show how to utilize the GATK API from Clojure, a functional, dynamic programming language that targets the Java Virtual Machine. We’ll:

  • Write a GATK walker that plots variant quality scores using the Map/Reduce API.
  • Create a custom annotation that adds a mean neighboring base quality metric using the GATK VariantAnnotator.
  • Use the VariantContext API to parse and access variant information in a VCF file.

The Clojure variation library is freely available and is part of a larger project to provide variant assessment capabilities for the Archon Genomics XPRIZE competition.

Map/Reduce GATK Walker

GATK’s well documented Map/Reduce API eases the development of custom programs for processing BAM and VCF files. The presentation from Eli Lilly is a great introduction to developing your own custom GATK Walkers in Java. Here we’ll follow a similar approach to code these in Clojure.

We’ll start by defining a simple Java base class that extends the base GATK walker and defines an output and input variable. The output is a string specifying the output file to write to, and the input is any type of variant file the GATK accepts. Here we’ll be dealing with VCF input files:

public abstract class BaseVariantWalker extends RodWalker {
  public String out;

  public StandardVariantContextInputArgumentCollection invrns = new StandardVariantContextInputArgumentCollection();

This base class is all the Java we need. We implement the remaining walker in Clojure and will walk through the fully annotated source in sections. To start, we import the base walker we wrote and extend this to generate a Java class, which the GATK will pick up and make available as a command line walker:

(ns bcbio.variation.vcfwalker
  (:import [bcbio.variation BaseVariantWalker])
   :name bcbio.variation.vcfwalker.VcfSimpleStatsWalker
   :extends bcbio.variation.BaseVariantWalker))

Since this is a Map/Reduce framework, we first need to implement the map function. GATK passes this function a tracker, used to retrieve the actual variant call values and a context which describes the current location. We use the invrns argument we defined in Java to reference the input VCF file supplied on the commandline. Finally, we extract the quality score from each VariantContext and return those. This map function produces a stream of quality scores from the input VCF file:

(defn -map
  [this tracker ref context]
  (if-not (nil? tracker)
    (for [vc (map from-vc
                    (.getValues tracker (.variants (.invrns this))
                                (.getLocation context)))]
      (-> vc :genotypes first :qual))))

For the reduce part, we take the stream of quality scores and plot a histogram. In the GATK this happens in 3 functions: reduceInit starts the reduction step and creates a list to add the quality scores to, reduce collects all of the quality scores into this list, and onTraversalDone plots a histogram of these scores using the Incanter statistical library:

(defn -reduceInit

(defn -reduce
  [this cur coll]
  (if-not (nil? cur)
    (vec (flatten [coll cur]))

(defn -onTraversalDone
  [this result]
  (doto (icharts/histogram result
                           :x-label "Variant quality"
                           :nbins 50)
    (icore/save (.out this) :width 500 :height 400)))

We’ve implemented a full GATK walker in Clojure, taking advantage of existing Clojure plotting libraries. To run this, compile the code into a jarfile and run like a standard GATK tool:

$ lein uberjar
$ java -jar bcbio.variation-0.0.1-SNAPSHOT-standalone.jar -T VcfSimpleStats
  -r test/data/grch37.fa --variant test/data/gatk-calls.vcf --out test.png

which produces a plot of quality score distributions:

GATK walker quality scores

Custom GATK Annotation

GATK’s Variant Annotator is a useful way to add metrics information to a file of variants. These metrics allow filtering and prioritization of variants, either by variant quality score recalibration or hard filtering. We can add new annotation metrics by inheriting from GATK Java interfaces. Here we’ll implement Mean Neighboring Base Quality (NBQ), a metric from the Atlas2 variation suite that assesses the quality scores in a region surrounding a variation.

We start walking through the full implementation by again defining a generated Java class that inherits from a GATK interface. In this case, InfoFieldAnnotation:

(ns bcbio.variation.annotate.nbq
  (:import [org.broadinstitute.sting.gatk.walkers.annotator.interfaces.InfoFieldAnnotation]
           [org.broadinstitute.sting.utils.codecs.vcf VCFInfoHeaderLine VCFHeaderLineType])
  (:require [incanter.stats :as istats])
   :name bcbio.variation.annotate.nbq.MeanNeighboringBaseQuality
   :extends org.broadinstitute.sting.gatk.walkers.annotator.interfaces.InfoFieldAnnotation))

The annotate function does the work of calculating the mean quality score. We define functions that use the GATK API to:

  • Retrieve the pileup at the current position.
  • Get the neighbor qualities from a read at a position.
  • Combine the qualities for all reads in a pileup.

With these three functions, we can use the Clojure threading macro to cleanly organize the steps of the operation as we retrieve the pileup, get the qualities and calculate the mean:

(defn -annotate
  [_ _ _ _ contexts _]
  (letfn [(get-pileup [context]
            (if (.hasExtendedEventPileup context)
              (.getExtendedEventPileup context)
              (.getBasePileup context)))
          (neighbor-qualities [[offset read]]
            (let [quals (-> read .getBaseQualities vec)]
              (map #(nth quals % nil) (range (- offset flank-bp) (+ offset flank-bp)))))
          (pileup-qualities [pileup]
            (map neighbor-qualities (map vector (.getOffsets pileup) (.getReads pileup))))]
    {"NBQ" (->> contexts
                (map get-pileup)
                (map pileup-qualities)
                (remove nil?)
                (format "%.2f"))}))

With this in place we can now run this directly using the standard GATK command line arguments. As before, we create a jar file with the new annotator, and then pass the name as a desired annotation when running the VariantAnnotator, producing a VCF file with NBQ annotations:

$ lein uberjar
$ java -jar bcbio.variation-0.0.1-SNAPSHOT-standalone.jar -T VariantAnnotator
   -A MeanNeighboringBaseQuality -R test/data/GRCh37.fa -I test/data/aligned-reads.bam
   --variant test/data/gatk-calls.vcf -o annotated-file.vcf

Access VCF variant information

In addition to extending the GATK through walkers and annotations you can also utilize the extensive API directly, taking advantage of parsers and data structures to handle common file formats. Using Clojure’s Java interoperability, the variantcontext module provides a high level API to parse and extract information from VCF files. To loop through a VCF file and print the location, reference allele and called alleles for each variant we:

  • Open a VCF source providing access to the underlying file inside a with-open statement to ensure closing of the resource.
  • Parse the VCF source, returning an iterator of VariantContext maps for each variant in the file.
  • Extract values from the map: the chromosome, start, reference allele and called alleles for the first genotype.
(use 'bcbio.variation.variantcontext)

(with-open [vcf-source (get-vcf-source "test/data/gatk-calls.vcf")]
  (doseq [vc (parse-vcf vcf-source)]
    (println (:chr vc) (:start vc) (:ref-allele vc)
             (-> vc :genotypes first :alleles)))

This produces:

MT 73 #<Allele G*> [#<Allele A> #<Allele A>]
MT 150 #<Allele T*> [#<Allele C> #<Allele C>]
MT 152 #<Allele T*> [#<Allele C> #<Allele C>]
MT 195 #<Allele C*> [#<Allele T> #<Allele T>]

I hope this tour provides some insight into the powerful tools that can be rapidly built by leveraging the GATK from Clojure. The full library contains a range of additional functionality including normalization of complex MNPs and support for phased haplotype comparisons.

Written by Brad Chapman

March 4, 2012 at 8:13 pm

Posted in analysis

Tagged with , , ,

Making next-generation sequencing analysis pipelines easier with BioCloudCentral and Galaxy integration

with 15 comments

My previous post described running an automated exome pipeline using CloudBioLinux and CloudMan, and generated incredibly useful feedback. Comments and e-mails pointed out potential points of confusion for new users deploying the process on custom data. I also had the chance to get hands on with researchers running CloudBioLinux and CloudMan during the AWS Genomics Event (talk slides are available).

The culmination of all this feedback are two new development projects from the CloudBioLinux community, aimed at making it easier to run custom analysis pipelines:

  • BioCloudCentral — A web service that launches CloudBioLinux and CloudMan clusters on Amazon Web Services hardware. This removes all of the manual steps involved in setting up security groups and launching a CloudBioLinux instance. A user only needs to sign up for an AWS account; BioCloudCentral takes care of everything else.

  • A custom Galaxy integrated front-end to next-generation sequencing pipelines. A jQuery UI wizard interface manages the intake of sequences and specification of parameters. It runs an automated backend processing pipeline with the structured input data, uploading results into Galaxy data libraries for additional analysis.

Special thanks are due to Enis Afgan for his help building these tools. He provided his boto expertise to the BioCloudCentral Amazon interaction, and generalized CloudMan to support the additional flexibility and automation on display here.

This post describes using these tools to start a CloudMan instance, create an SGE cluster and run a distributed variant calling analysis, all from the browser. The behind the scene details described earlier are available: the piepline uses a CloudBioLinux image containing a wide variety of bioinformatics software and you can use ssh or an NX graphical client to connect to the instance. This is the unique approach behind CloudBioLinux and CloudMan: they provide an open framework for building automated, easy-to-use workflows.

BioCloudCentral — starting a CloudBioLinux instance

To get started, sign up for an Amazon Web services account. This gives you access to on demand computing where you pay per hour of usage. Once signed up, you will need your Access Key ID and Secret Access Key from the Amazon security credentials page.

With these, navigate to BioCloudCentral and fill out the simple entry form. In addition to your access credentials, enter your choice of a name used to identify the cluster, and your choice of password to access the CloudMan web interface and the cluster itself via ssh or NX.

Clicking submit launches a CloudBioLinux server on Amazon. Be careful, since you are now paying per hour for your machine; remember to shut it down when finished.

Before leaving the monitoring page, you want to download a pre-formatted user-data file; this allows you to later start the same CloudMan instance directly from the Amazon web services console.

CloudMan — managing the cluster

The monitoring page on BioCloudCentral provides links directly to the CloudMan web interface. On the welcome page, start a shared CloudMan instance with this identifier:


This shared instance contains the custom Galaxy interface we will use, along with FASTQ sequence files for demonstration purposes. CloudMan will start up the filesystem, SGE, PostgreSQL and Galaxy. Once launched, you can use the CloudMan interface to add additional machines to your cluster for processing.

Galaxy pipeline interface — running the analysis

This Galaxy instance is a fork of the main codebase containing a custom pipeline interface in addition to all of the standard Galaxy tools. It provides an intuitive way to select FASTQ files for processing. Login with the demonstration account (user:; password: example) and load FASTQ files along with target and bait BED files into your active history. Then work through the pipeline wizard step by step to start an analysis:

The Galaxy interface builds a configuration file describing the parameters and inputs, and submits this to the backend analysis server. This server kicks off processing, distributing the analysis across the SGE cluster. For the test data, processing will take approximately 4 hours on a cluster with a single additional work node (Large instance type).

Galaxy — retrieving and displaying results

The analysis pipeline uploads the finalized results into Galaxy data libraries. For this demonstration, the example user has results from a previous run in the data library so you don’t need to wait for the analysis to finish. This folder contains alignment data in BAM format, coverage information in BigWig format, a VCF file of variant calls, a tab separate file with predicted variant effects, and a PDF file of summary information. After importing these into your active Galaxy history, you can perform additional analysis on the data, including visualization in the UCSC genome browser:

As a reminder, don’t forget to terminate your cluster when finished. You can do this either from the CloudMan web interface or the Amazon console.

Analysis pipeline details and extending this work

The backend analysis pipeline is a freely available set of Python modules included on the CloudBioLinux AMI. The pipeline closely follows current best practice variant detection recommendations from the Broad GATK team:

The pipeline framework design is general, allowing incorporation of alternative aligners or variant calling algorithms.

We hope that in addition to being directly useful, this framework can fit within the work environments of other developers. The flexible toolkit used is: CloudBioLinux with open source bioinformatics libraries, CloudMan with a managed SGE cluster, Galaxy with a custom pipeline interface, and finally Python to parallelize and manage the processing. We invite you to fork and extend any of the different components. Thank you again to everyone for the amazing feedback on the analysis pipeline and CloudBioLinux.

Written by Brad Chapman

November 29, 2011 at 8:50 pm

Posted in analysis

Tagged with , , ,

Distributed exome analysis pipeline with CloudBioLinux and CloudMan

with 19 comments

A major challenge in building analysis pipelines for next-generation sequencing data is combining a large number of processing steps in a flexible, scalable manner. Current best-practice software needs to be installed and configured alongside the custom code to chain individual programs together. Scaling to handle increasing throughput requires running that custom code on a wide variety of parallel architectures, from single multicore machines to heterogeneous clusters.

Establishing community resources that meet the challenges of building these pipelines ensures that bioinformatics programmers can share the burden of building large scale systems. Two open-source efforts which aim at providing this type of architecture are:

  • CloudBioLinux — A community effort to create shared images filled with bioinformatics software and libraries, using an automated build environment.

  • CloudMan — Uses CloudBioLinux as a platform to build a full SGE cluster environment. Written by Enis Afgan and the Galaxy Team, CloudMan is used to provide a ready-to-run, dynamically scalable version of Galaxy on Amazon AWS.

Here we combine CloudBioLinux software with a CloudMan SGE cluster to build a fully automated pipeline for processing high throughput exome sequencing data:

  • The underlying analysis software is from CloudBioLinux.
  • CloudMan provides an SGE cluster managed via a web front end.
  • RabbitMQ is used for communication between cluster nodes.
  • An automated pipeline, written in Python, organizes parallel processing across the cluster.

Below are instructions for starting a cluster on Amazon EC2 resources to run an exome sequencing pipeline that processes FASTQ sequencing reads, producing fully annotated variant calls.

Start cluster with CloudBioLinux and CloudMan

Start in the Amazon web console, a convenient front end for managing EC2 servers. The first step is to follow the CloudMan setup instructions to create an Amazon account and set up appropriate security groups and user data. The wiki page contains detailed screencasts. Below is a short screencast showing how to boot your CloudBioLinux specific CloudMan server:

Once this is booted, proceed to the CloudMan web interface on the server and startup an instance from this shared identifier:


This screencast shows all of the details, including starting an additional node on the SGE cluster:

Configure AMQP messaging

Edit: The AMQP messaging steps have now been full automated so the configuration steps in this section are no longer required. Skip down to the ‘Run Analysis’ section to start processing the data immediately.

With your server booted and ready to run, the next step is to configure RabbitMQ messaging to communicate between nodes on your cluster. In the AWS console, find the external and internal hostname of the head machine. Start by opening an ssh connection to the machine with the external hostname:

$ ssh -i your-keypair

Edit the /export/data/galaxy/universe_wsgi.ini configuration file to add the internal hostname. After editing, the AMQP section will look like:

host = ip-10-125-10-182.ec2.internal
port = 5672
userid = biouser
password = tester

Finally, add the user and virtual host to the running RabbitMQ server on the master node with 3 commands:

$ sudo rabbitmqctl add_user biouser tester
creating user "biouser" ...
$ sudo rabbitmqctl add_vhost bionextgen
creating vhost "bionextgen" ...
$ sudo rabbitmqctl set_permissions -p bionextgen biouser ".*" ".*" ".*"
setting permissions for user "biouser" in vhost "bionextgen" ...

Run analysis

With messaging in place, we are ready to run the analysis. /export/data contains a ready to run example exome analysis, with FASTQ input files in /export/data/exome_example/fastq and configuration information in /export/data/exome_example/config. Start the fully automated pipeline with a single command:

 $ cd /export/data/work
 $ /export/data/galaxy/post_process.yaml
                                   /export/data/exome_example/config/run_info.yaml starts processing servers on each of the cluster nodes, using SGE for scheduling. Then a top level analysis server runs, splitting the FASTQ data across the nodes at each step of the process:

  • Alignment with BWA
  • Preparation of merged alignment files with Picard
  • Recalibration and realignment with GATK
  • Variant calling with GATK
  • Assessment of predicted variant effects with snpEff
  • Preparation of summary PDFs for each sample with read details from FastQC alongside alignment, hybrid selection and variant calling statistics from Picard

Monitor the running process

The example data is from a human chromosome 22 hybrid selection experiment. While running, you can keep track of the progress in several ways. SGEs qstat command will tell you where the analysis servers are running on the cluster:

$ qstat
ob-ID  prior   name   user  state submit/start at   queue
1 0.55500 nextgen_an ubuntu  r  08/14/2011 18:16:32
2 0.55500 nextgen_an ubuntu  r  08/14/2011 18:16:32
3 0.55500 automated_ ubuntu  r  08/14/2011 18:16:47

Listing files in the working directory will show our progress:

$ cd /export/data/work
$ ls -lh
drwxr-xr-x 2 ubuntu ubuntu 4.0K 2011-08-13 21:09 alignments
-rw-r--r-- 1 ubuntu ubuntu 2.0K 2011-08-13 21:17
drwxr-xr-x 2 ubuntu ubuntu   33 2011-08-13 20:43 log
-rw-r--r-- 1 ubuntu ubuntu  15K 2011-08-13 21:17
-rw-r--r-- 1 ubuntu ubuntu  15K 2011-08-13 21:17
drwxr-xr-x 8 ubuntu ubuntu  102 2011-08-13 21:06 tmp

The files that end with .o* are log files from each of the analysis servers and provide detailed information about the current state of processing at each server:

$ less
INFO: nextgen_pipeline: Processing sample: Test replicate 2; lane
  8; reference genome hg19; researcher ; analysis method SNP calling
INFO: nextgen_pipeline: Aligning lane 8_100326_FC6107FAAXX with bwa aligner
INFO: nextgen_pipeline: Combining and preparing wig file [u'', u'Test replicate 2']
INFO: nextgen_pipeline: Recalibrating [u'', u'Test replicate 2'] with GATK

Retrieve results

The processing pipeline results in numerous intermediate files. These take up a lot of disk space and are not necessary after processing is finished. The final step in the process is to extract the useful files for visualization and further analysis:

$ /export/data/galaxy/post_process.yaml

For each sample, this script copies:

  • A BAM file with aligned sequeneces and original FASTQ data
  • A realigned and recalibrated BAM file, ready for variant calling
  • Variant calls in VCF format.
  • A tab delimited file of predicted variant effects.
  • A PDF summary file containing alignment, variant calling and hybrid selection statistics.

into an output directory for the flowcell: /export/data/galaxy/storage/100326_FC6107FAAXX:

$ ls -lh /export/data/galaxy/storage/100326_FC6107FAAXX/7
-rw-r--r-- 1 ubuntu ubuntu  38M 2011-08-19 20:50 7_100326_FC6107FAAXX.bam
-rw-r--r-- 1 ubuntu ubuntu  22M 2011-08-19 20:50 7_100326_FC6107FAAXX-coverage.bigwig
-rw-r--r-- 1 ubuntu ubuntu  72M 2011-08-19 20:51 7_100326_FC6107FAAXX-gatkrecal.bam
-rw-r--r-- 1 ubuntu ubuntu 109K 2011-08-19 20:51 7_100326_FC6107FAAXX-snp-effects.tsv
-rw-r--r-- 1 ubuntu ubuntu 827K 2011-08-19 20:51 7_100326_FC6107FAAXX-snp-filter.vcf
-rw-r--r-- 1 ubuntu ubuntu 1.6M 2011-08-19 20:50 7_100326_FC6107FAAXX-summary.pd

As suggested by the name, the script can also integrate the data into a Galaxy instance if desired. This allows biologists to perform further data analysis, including visual inspection of the alignments in the UCSC browser.

Learn more

All components of the pipeline are open source and part of community projects. CloudMan, CloudBioLinux and the pipeline are customized through YAML configuration files. Combined with the CloudMan managed SGE cluster, the pipeline can be applied in parallel to any number of samples.

The overall goal is to share the automated infrastructure work that moves samples from sequencing to being ready for analysis. This allows biologists more rapid access to the processed data, focusing attention on the real work: answering scientific questions.

If you’d like to hear more about CloudBioLinux, CloudMan and the exome sequencing pipeline, I’ll be discussing it at the AWS Genomics Event in Seattle on September 22nd.

Written by Brad Chapman

August 19, 2011 at 5:33 pm

Summarizing next-gen sequencing variation statistics with Hadoop using Cascalog

with 3 comments

Improvements in next-generation sequencing technology are leading to ever increasing amounts of sequencing data. With this additional throughput comes the demand for algorithms and approaches that can easily scale. Hadoop offers an open source framework for batch processing large files. This post describes using Cascalog, a Hadoop query language written in Clojure, to investigate quality statistics for variant calling in deeply sequenced regions.

Biological question

The goal is to improve a variation calling algorithm for next-generation sequencing data. We have a densely sequenced region, where each position has thousands of potential base calls. Each position may be a single base, or a mix of of majority and minority variants. We are filtering variants on 3 metrics of quality:

  • Quality score — The sequencing technology’s assessment of the correctness of a base.
  • K-mer score — An estimate of the uniqueness of the region surrounding the base call position, built using khmer. Unique regions are more likely to be sequencing artifacts, while common regions are more likely to be real.
  • Mapping score — The aligner’s estimate of the reliability of the read alignment.

Each read and position is in a tab delimited file that looks like:

951     G       31      0.0515130211584 198

The training data has a set of known variable positions, and details about how the current variant calling algorithm did at each position:

951     T       false_positive  0.7
953     A       true_positive   4.0

We wanted to generate summary statistics at each position of interest, and look for additional criteria that could be built into the calling algorithm.

Writing cascalog queries

Cascalog is based on the Datalog rule language, a subset of Prolog. You describe the rules of a system and the query optimizer figures out how best to satisfy them; it requires a change of mindset from the more standard approach that you need to write detailed instructions about what to do.

Cascalog provides a high level of abstraction over Hadoop and Map-Reduce, so you focus entirely on writing the query. This post from Antonio Piccolboni compares several Hadoop languages; the post provides a nice side-by-side example of the brevity you can achieve with Cascalog.

The main query defines the outputs, retrieves input data from the snpdata and location target files described above, provides a count of reads at each position and base of interest, then averages the kmer, quality and mapping score metrics described earlier:

(defn calc-snpdata-stats [snpdata targets]
  (??<- [?chr ?pos ?base ?count ?avg-score ?avg-kmer-pct
         ?avg-qual ?avg-map ?type]
        (snpdata ?chr ?pos ?base ?qual ?kmer-pct ?map-score)
        (targets ?chr ?pos ?base ?type)
        (ops/count ?count)
        (ops/avg ?kmer-pct :> ?avg-kmer-pct)
        (ops/avg ?qual :> ?avg-qual)
        (ops/avg ?map-score :> ?avg-map)
        (combine-score ?kmer-pct ?qual ?map-score :> ?score)
        (ops/avg ?score :> ?avg-score)))

A big advantage of Cascalog is that it is just Clojure, so you can write custom queries in a full-featured language. The last two lines of the query define a custom score and its average at a position. The custom score is a linear combination of the min-max normalized scores:

(defn min-max-norm [score minv maxv]
  (let [trunc-score-max (if (< score maxv) score maxv)
        trunc-score (if (> trunc-score-max minv) trunc-score-max minv)]
    (/ (- trunc-score minv) (- maxv minv))))

(defmapop combine-score [kmer-pct qual map-score]
  (+ (min-max-norm kmer-pct 1e-5 0.10)
     (min-max-norm qual 4.0 35.0)
     (min-max-norm map-score 0.0 250.0)))

The final part of the code involves parsing the files and producing the snpdata and targets inputs to the query. That code splits each line in the input file and assigns the parts to the variables of interest:

(defmapop parse-snpdata-line [line]
  (let [[space pos base qual kmer-pct map-score] (split line #"\t")]
    [space (Integer/parseInt pos) base (Integer/parseInt qual)
     (Float/parseFloat kmer-pct) (Integer/parseInt map-score)]))

(defn snpdata-from-hfs [dir]
    (<- [?chr ?pos ?base ?qual ?kmer-pct ?map-score]
        (source ?line)
        (parse-snpdata-line ?line :> ?chr ?pos ?base ?qual
                                     ?kmer-pct ?map-score))))

Running on Hadoop

The full project is available on GitHub. To run on a configured Hadoop system, you build the code, copy your input files to HDFS, then run:

% lein deps
% lein uberjar
% hadoop fs -mkdir /tmp/snp-assess/data
% hadoop fs -mkdir /tmp/snp-assess/positions
% hadoop fs -put your_variation_data.tsv /tmp/snp-assess/data
% hadoop fs -put positions_of_interest.tsv /tmp/snp-assess/positions
% hadoop jar snp-assess-0.0.1-SNAPSHOT-standalone.jar
             snp_assess.core /tmp/snp-assess/data /tmp/snp-assess/positions

The same code can also run locally without Hadoop. This is extremely useful for testing and development, or for smaller datasets that do not require the distributed power of Hadoop:

% lein deps
% lein run :snp-data /directory/of/varation/data /directory/of/positions

Both approaches generate tabular output with our positions, counts, scores and average metrics:

| 951 | T |  3 | 0.9 | 2.0e-04 | 24.7 | 55.7  | false_positive |
| 953 | A | 10 | 1.5 | 1.6e-02 | 23.1 | 175.5 | true_positive  |

Overview and additional projects

Cascalog provided an easy to use abstraction on top of Hadoop, which enabled exploration of densely mapped next-generation sequencing reads for variant detection. The code is free of scaling specific details, and instead focuses purely on the data of interest.

Another example of Cascalog in a biological setting is the answer I wrote to Pierre’s question on BioStar, dealing with overlapping genomic segments within Hadoop. The code is available from GitHub as an additional starting point for getting oriented with Hadoop and Cascalog.

Written by Brad Chapman

July 4, 2011 at 9:25 pm

Posted in analysis

Tagged with , , , ,

Bioinformatics jobs at Harvard School of Public Health

with 3 comments

I’ve recently moved positions to the bioinformatics core at Harvard School of Public Health. It’s a great place to do science, with plenty of researchers doing interesting work and actively looking for bioinformatics collaborators. The team, working alongside members of the Hide Lab, is passionate about open source work. Both qualities made it a great fit for my interests and experience.

My new group is currently hiring bioinformatics researchers. The work involves interacting collaboratively with a research group to understand their biological problem, creatively attacking the mountains of data underlying the research question, and presenting the results back in an intuitive fashion. On the programming side, it’s an opportunity to combine existing published toolkits with your own custom algorithms and approaches. On the biology side, you should be passionate and interested in thinking of novel ways to advance our understanding of the problems. Practically, all of this work will involve a wide range of technologies and approaches; I expect plenty of next-generation sequencing data and lots of learning about the best ways to scale analyses.

Our other goal is to build re-usable tools for the larger research community. We work extensively with analysis frameworks like Galaxy and open standards like ISA-Tab. We hope to extract the common parts from disparate experiments to build abstractions that help get new analyses done quicker. Tool building also involves automating and deploying analysis pipelines in a way that allows biologists to run them directly. By democratizing analyses and presenting results to researchers at a high level they can directly interact with, science is accelerated and the world becomes an awesomer place.

So if you enjoy the work I write about here, and have always secretly wanted to sit in an office right next to me, now is your big chance (no stalkers, please). If this sounds of interest, please get in touch and I’d be happy to pass along more details.

Written by Brad Chapman

April 10, 2011 at 2:25 pm

Posted in analysis

Tagged with , ,

Parallel upload to Amazon S3 with python, boto and multiprocessing

with 41 comments

One challenge with moving analysis pipelines to cloud resources like Amazon EC2 is figuring out the logistics of transferring files. Biological data is big; with the rapid adoption of new machines like the HiSeq and decreasing sequencing costs, the data transfer question isn’t going away soon. The use of Amazon in bioinformatics was brought up during a recent discussion on the BioStar question answer site. Deepak’s answer highlighted the role of parallelizing uploads and downloads to ease this transfer burden. Here I describe a method to improve upload speed by splitting over multiple processing cores.

Amazon Simple Storage System (S3) provides relatively inexpensive cloud storage with their reduced redundancy storage option. S3, and all of Amazon’s cloud services, are accessible directly from Python using boto. By using boto’s multipart upload support, coupled with Python’s built in multiprocessing module, I’ll demonstrate maximizing transfer speeds to make uploading data less painful. The script is available from GitHub and requires the latest boto from GitHub (2.0b5 or better).

Parallel upload with multiprocessing

The overall process uses boto to connect to an S3 upload bucket, initialize a multipart transfer, split the file into multiple pieces, and then upload these pieces in parallel over multiple cores. Each processing core is passed a set of credentials to identify the transfer: the multipart upload identifier (, the S3 file key name (mp.key_name) and the S3 bucket name (mp.bucket_name).

import boto

conn = boto.connect_s3()
bucket = conn.lookup(bucket_name)
mp = bucket.initiate_multipart_upload(s3_key_name, reduced_redundancy=use_rr)
with multimap(cores) as pmap:
    for _ in pmap(transfer_part, ((, mp.key_name, mp.bucket_name, i, part)
                                  for (i, part) in
                                  enumerate(split_file(tarball, mb_size, cores)))):

The split_file function uses the unix split command to divide the file into sections, each of which will be uploaded separately.

def split_file(in_file, mb_size, split_num=5):
    prefix = os.path.join(os.path.dirname(in_file),
                          "%sS3PART" % (os.path.basename(s3_key_name)))
    split_size = int(min(mb_size / (split_num * 2.0), 250))
    if not os.path.exists("%saa" % prefix):
        cl = ["split", "-b%sm" % split_size, in_file, prefix]
    return sorted(glob.glob("%s*" % prefix))

The multiprocessing aspect is managed using a contextmanager. The initial multiprocessing pool is setup, using a specified number of cores, and configured to allow keyboard interrupts. We then return a lazy map function (imap) which can be used just like Python’s standard map. This transparently divides the function calls for each file part over all available cores. Finally, the pool is cleaned up when the map is finished running.

def multimap(cores=None):
    if cores is None:
        cores = max(multiprocessing.cpu_count() - 1, 1)
    def wrapper(func):
        def wrap(self, timeout=None):
            return func(self, timeout=timeout if timeout is not None else 1e100)
        return wrap = wrapper(
    pool = multiprocessing.Pool(cores)
    yield pool.imap

The actual work of transferring each portion of the file is done using two functions. The helper function, mp_from_ids, uses the id information about the bucket, file key and multipart upload id to reconstitute a multipart upload object:

def mp_from_ids(mp_id, mp_keyname, mp_bucketname):
    conn = boto.connect_s3()
    bucket = conn.lookup(mp_bucketname)
    mp = boto.s3.multipart.MultiPartUpload(bucket)
    mp.key_name = mp_keyname = mp_id
    return mp

This object, together with the number of the file part and the file itself, are used to transfer that section of the file. The file part is removed after successful upload.

def transfer_part(mp_id, mp_keyname, mp_bucketname, i, part):
    mp = mp_from_ids(mp_id, mp_keyname, mp_bucketname)
    print " Transferring", i, part
    with open(part) as t_handle:
        mp.upload_part_from_file(t_handle, i+1)

When all sections, distributed over all processors, are finished, the multipart upload is signaled complete and Amazon finishes the process. Your file is now available on S3.

Parallel download

Download speeds can be maximized by utilizing several existing parallelized accelerators:

Combine these with the uploader to build up a cloud analysis workflow: move your data to S3, run a complex analysis pipeline on EC2, push the results back to S3, and then download them to local machines. Please share other tips and tricks you use to deal with Amazon file transfer in the comments.

Written by Brad Chapman

April 10, 2011 at 1:27 pm

Posted in analysis

Tagged with , , ,

Gene Ontology analysis with Python and Bioconductor

with 9 comments

Last post we discussed extracting differentially expressed genes from a set of short read data. The output of this analysis is a large list of genes, and the next challenge is to try and extract interesting patterns from the results. Are genes in particular families over-represented? Are there a large number of genes with a particular function? These patterns can help direct your future experiments and provide an understandable grouping for biologists examining the data.

The Gene Ontology (GO) project provides a standardized set of terms describing the molecular function of genes. We will use the topGO package from the Bioconductor project to identify over-represented GO terms from a set of differentially expressed genes. Python will be used to prepare the data, utilizing rpy2 to call R for the statistical analysis.

The first step is to parse input files describing the differentially expressed genes and the mapping of gene names to GO terms. For the example we will use a simple CSV file from our previous analysis and an equally simple file describing the gene to GO mapping. For real life work, you will need to get the GO mapping file for your organism; for instance, here is the Arabidopsis GO mapping from TAIR.

The input file of differentially expressed genes is parsed into a dictionary where the keys are gene names and the values are associated p-values:

import csv

def parse_input_csv(in_handle):
    reader = csv.reader(in_handle) # header
    all_genes = dict()
    for (gene_name, _, _, pval) in reader:
        all_genes[gene_name] = float(pval)
    return all_genes

The GO mapping input file is parsed into two dictionaries: one mapping genes to GO terms for the GO analysis, and the second mapping GO terms to genes for displaying the results of the analysis:

import collections

def parse_go_map_file(in_handle, genes_w_pvals):
    gene_list = genes_w_pvals.keys()
    gene_to_go = collections.defaultdict(list)
    go_to_gene = collections.defaultdict(list)
    for line in in_handle:
        parts = line.split("\t")
        gene_id = parts[0]
        go_id = parts[1].strip()
        if gene_id in gene_list:
    return dict(gene_to_go), dict(go_to_gene)

Next we set up our associated R environment for GO analysis. This involves loading the topGO library and creating a function to classify genes as either differentially expressed, or not. We do this based on the p-value, classifying genes with a p-value of 0.01 or less as differentially expressed:

import rpy2.robjects as robjects

gene_pval = 1e-2
    topDiffGenes = function(allScore) {
      return (allScore < %s)
''' % gene_pval)

Now it’s time to run the analysis. The paramters are defined as a dictionary which initializes a topGOdata object:

  • ontology — the type of GO ontology to analyze; molecular function (MF) is examined here.
  • annot — how the GO terms are annotated; we specify they will be identified by a mapping of genes to GO ids, passed as the gene2GO argument.
  • geneSelectionFun — the function we defined above to determine if genes are differentially expressed.
  • allGenes — an R named vector where the names are genes and the values are p-values; we use a utility function to convert a python dictionary to the named vector.
  • gene2GO — an R named vector mapping gene names to associated GO terms.

The initialized object is run using Fisher classic analysis. Several different analysis methods and test statistics are available in the package. The resulting scores are extracted and a summary table of the results is generated:

def _dict_to_namedvector(init_dict):
    """Call R to create a named vector from an input dictionary.
    return robjects.r.c(**init_dict)

go_term_type = "MF"
topgo_method = "classic" # choice of classic, elim, weight
params = {"ontology" : go_term_type,
          "annot" : robjects.r["annFUN.gene2GO"],
          "geneSelectionFun" : robjects.r["topDiffGenes"],
          "allGenes" : _dict_to_namedvector(gene_vals),
          "gene2GO" : _dict_to_namedvector(gene_to_go)
go_data ="topGOdata", **params)
results = robjects.r.runTest(go_data, algorithm=topgo_method,
scores = robjects.r.score(results)
score_names = scores.getnames()
num_summarize = min(100, len(score_names))
results_table = robjects.r.GenTable(go_data, elimFisher=results,
        orderBy="elimFisher", topNodes=num_summarize)

The scores and results table are used to generate a list of over-represented GO terms with their associated p-values and descriptions:

# extract term names from the topGO summary dataframe
ids_to_terms = dict()
for index, go_id in enumerate(results_table[GO_ID_INDEX]):
    ids_to_terms[go_id] = results_table[TERM_INDEX][index]
go_terms = []
# convert the scores and results information info terms to return
for index, item in enumerate(scores):
    if item < go_pval:
        go_id = score_names[index]
        go_terms.append((item, go_id, ids_to_terms.get(go_id, "")))

Finally, we print out the resulting overexpressed GO terms along with associated genes:

def print_go_info(go_terms, go_term_type, go_to_gene):
    for final_pval, go_id, go_term in go_terms:
        genes = []
        for check_go in [go_id] + get_go_children(go_id, go_term_type):
            genes.extend(go_to_gene.get(check_go, []))
        genes = sorted(list(set(genes)))
        print "-> %s (%s) : %0.4f" % (go_id, go_term, final_pval)
        for g in genes:
            print g

One tricky part of the post-analysis processing is finding all of the genes associated with an identified GO id. The GO terminology is hierarchical, so a resulting GO identifier like GO:0042578 in the example results file can represent both that term and more specific terms. A nice way to illustrate this for yourself is to look at the AmiGO browser display for the term.

The result is that genes associated with an over-represented term will come from both the identified term and more specific terms. Bioconductor has an interface to the GO database, so we can extract all of the relevant terms by moving down the GO tree collecting the more specific children, given the parent term:

def get_go_children(go_term, go_term_type):
    child_map = robjects.r["GO%sCHILDREN" % (go_term_type)]
    children = []
    to_check = [go_term]
    while len(to_check) > 0:
        new_children = []
        for check_term in to_check:
            new_children.extend(list(robjects.r.get(check_term, child_map)))
        new_children = list(set([c for c in new_children if c]))
        to_check = new_children
    children = list(set(children))
    return children

The full script pulls all of this code together into a working example. You will need to update the parsers to match your input and GO mapping file, but otherwise the code can plug directly into full genome analyses of differential expression data.

Written by Brad Chapman

October 18, 2009 at 11:03 am

Differential expression analysis with Bioconductor and Python

with 10 comments

This post demonstrates performing differential expression analysis of short read sequencing data using a combination of Python and the R statistical language. Python is used a glue language to manipulate and prepare count data from short read sequencing. R and the Bioconductor package are used to perform the statistical analysis. The excellent rpy2 package connection Python and R.

Scientists often need to compare expression results from multiple experimental conditions. This can be done with microarray analysis and the wide variety of associated statistical tests, but many researchers are now utilizing short read sequencing. Experiments identify transcript prevalence through digital gene expression under multiple conditions, and we are interested in extracting statistically meaningful differences from the results. Similarly, we may be interested in differences resulting from short RNA discovery or other next generation sequencing applications.

Short read differential expression analysis differs from microarray analyses as it is based on counts instead of intensity scores. The edgeR package in Bioconductor fits count data to a statistical model considering sample to sample variation, and performs Fisher’s exact test to provide p-values associated with changes in expression between samples.

Here we will consider the case of a two experiment analysis. A simple Example CSV file has counts for 5 different genes under 2 conditions and total count information for the full experiment. We read the file into a dictionary of counts keyed by each condition and the gene name:

import csv
import collections

def read_count_file(in_file):
    """Read count information from a simple CSV file into a dictionary.
    counts = collections.defaultdict(dict)
    with open(in_file) as in_handle:
        reader = csv.reader(in_handle)
        header =
        conditions = header[1:]
        for parts in reader:
            region_name = parts[0]
            region_counts = [float(x) for x in parts[1:]]
            for ci, condition in enumerate(conditions):
                counts[condition][region_name] = region_counts[ci]
    return dict(counts)

The dictionary is organized into NumPy matrices; NumPy is a powerful numerical package for Python which integrates smoothly with our rpy2 interface to import directly into R. Here we organize our conditions and genes and then push the count data into a matrix where the columns are conditions and the rows are genes; each item is a count value. This is returned along with associated data:

  • groups — A list of experimental groups, which can be used to analyze replicates. Here, two groups with a single replicate each are defined.
  • sizes — The total number of counts for each experiment. This is extracted from the “Total” row of the CSV count file, and will be used for normalization of during the statistical analysis.

import numpy

def get_conditions_and_genes(work_counts): 
    conditions = work_counts.keys()
    all_genes = []
    for c in conditions:
    all_genes = list(set(all_genes))
    sizes = [work_counts[c]["Total"] for c in conditions]
    return conditions, all_genes, sizes
def edger_matrices(work_counts):
    conditions, all_genes, sizes = get_conditions_and_genes(work_counts)
    assert len(sizes) == 2
    groups = [1, 2]
    data = []
    final_genes = []
    for g in all_genes:
        cur_row = [int(work_counts[c][g]) for c in conditions]
        if sum(cur_row) > 0:
    return (numpy.array(data), numpy.array(groups), numpy.array(sizes),
            conditions, final_genes)

The organized data is now ready to be pushed directly into an edgeR Bioconductor analysis using the rpy2 interface. Three R functions are called: the data matrices are organized into a DGEList object, this object is passed to deDGE which does the actual digital gene expression analysis, and finally topTags is called to retrieve a vector of differential expression p-values. The vector is translated into a Python object from which we extract the p-values and re-organize them by the initial gene indexes. The ordered p-values are then returned.

import rpy2.robjects as robjects
import rpy2.robjects.numpy2ri

def run_edger(data, groups, sizes, genes):
    params = {'group' : groups, 'lib.size' : sizes}
    dgelist = robjects.r.DGEList(data, **params)
    ms = robjects.r.deDGE(dgelist, doPoisson=True)
    tags = robjects.r.topTags(ms, pair=groups, n=len(genes))
    indexes = [int(t) - 1 for t in tags.rownames()]
    pvals = list(tags.r['adj.P.Val'][0])
    assert len(indexes) == len(pvals)
    pvals_w_index = zip(indexes, pvals)
    assert len(pvals_w_index) == len(indexes)
    return [p for i,p in pvals_w_index]

The final results are written to a CSV file with our ordered genes, conditions and p-value probabilities. Each gene is associated with the initial count data and p-values, the genes are sorted by p-value with the most differentially expressed genes placed first, and the ordered information is written out:

def write_outfile(outfile, genes, conditions, work_counts, probs):
    with open(outfile, "w") as out_handle:
        writer = csv.writer(out_handle)
        writer.writerow(["Region"] +
                ["%s count" % c for c in conditions] + ["edgeR p-value"])
        out_info = []
        for i, gene in enumerate(genes):
            counts = [int(work_counts[c][gene]) for c in conditions]
            out_info.append((probs[i], [gene] + counts))
        [writer.writerow(start + [prob]) for prob, start in out_info]

Our example output file shows the results. The full script is available for you to use and customize for your own analyses. This example demonstrates the power of the rpy2 interface. Input and output data is happily manipulated in a familiar language such as Python, and can be seamlessly integrated with excellent statistical computation in Bioconductor and other specialized R packages.

Written by Brad Chapman

September 13, 2009 at 8:28 pm

Organization of literature using PubMed related articles

leave a comment »

When dealing with a long list of journal articles, what is the best method to organize them? I was confronted with this problem in designing an interface where users would pick specific papers and retrieve results tied to them. Presenting them as the raw list was unsatisfying; it is fine for users who know exactly what articles they want, but naive users would have a lot of difficulty finding relevant articles. Even for power users, a better classification system could help reveal items they may not have known about.

The approach I took was to group together papers based on similarity. The NCBI PubMed literature database has links to related articles, which it exposes programmatically though EUtils. Using the Biopython Entrez interface, the first step is to retrieve a dictionary of related IDs for each starting article, ordered by relevance:

def _get_elink_related_ids(self, pmids):
    pmid_related = {}
    for pmid in pmids:
        handle = Entrez.elink(dbform='pubmed', db='pubmed', id=pmid)
        record =
        cur_ids = []
        for link_dict in record[0]['LinkSetDb'][0]['Link']:
            cur_ids.append((int(link_dict.get('Score', 0)),
        local_ids = [x[1] for x in cur_ids if x[1] in pmids]
        if pmid in local_ids:
        pmid_related[pmid] = local_ids
    return pmid_related

Trying to group directly based on this dictionary will often result in one large group, since many of the articles may be linked together through a few common articles. For instance, a review may be related to several other papers in non-overlapping areas. To make the results as useful as possible we define a maximum and minimum group size, and a two parameters to filter the related lists:

  • overrep_thresh: The percentage of papers an item is related to out of all papers being grouped; the threshold sets a maximum number of papers that can be related. For instance, a value of .25 means that a journal will be related to 25% or less of the total papers.
  • related_max: The number of related papers to use. The best related articles go into the grouping.

These parameters define a filter for our dictionary of related articles:

def _filter_related(self, inital_dict, overrep_thresh, related_max):
    final_dict = {}
    all_vals = reduce(operator.add, inital_dict.values())
    for item_id, item_vals in inital_dict.items():
        final_vals = [val for val in item_vals if 
            float(all_vals.count(val)) / len(inital_dict) <= overrep_thresh&#93;
        final_dict&#91;item_id&#93; = final_vals&#91;:related_max&#93;
    return final_dict

The filtered list is grouped using a generalized version of the <code>examine_paralogs</code> function used in an earlier post to group together <a href="">location and duplication information</a>. Sets combine any groups with overlapping articles:

def _groups_from_related_dict(self, related_dict):
    cur_groups = []
    all_base = related_dict.keys()
    for base_id, cur_ids in related_dict.items():
        overlap = set(cur_ids) & set(all_base)
        if len(overlap) > 0:
            new_group = set(overlap | set([base_id]))
            is_unique = True
            for exist_i, exist_group in enumerate(cur_groups):
                if len(new_group & exist_group) > 0:
                    update_group = new_group | exist_group
                    cur_groups[exist_i] = update_group
                    is_unique = False
            if is_unique:
    return [list(g) for g in cur_groups]

With this list, we want to extract the groups and their articles that fit in our grouping criteria, the minimum and maximum size:

def _collect_new_groups(self, pmid_related, groups):
final_groups = []
for group_items in groups:
final_items = [i for i in group_items if pmid_related.has_key(i)]
if (len(final_items) >= self._min_group and
len(final_items) <= self._max_group): final_groups.append(final_items) for item in final_items: del pmid_related[item] final_related_dict = {} for pmid, related in pmid_related.items(): final_related = [r for r in related if pmid_related.has_key(r)] final_related_dict[pmid] = final_related return final_groups, final_related_dict [/sourcecode]

Utilizing these functions, the main algorithm steps through a series of increasingly less stringent parameters picking out groups which fall into our thresholds. Closely related journal articles are grouped first; more general papers with less association will be placed in groups in later rounds:

def get_pmid_groups(self, pmids):
pmid_related = self._get_elink_related_ids(pmids)
filter_params = self._filter_params[:]
final_groups = []
while len(pmid_related) > 0:
if len(filter_params) == 0:
raise ValueError(“Ran out of parameters before finding groups”)
cur_thresh, cur_related = filter_params.pop(0)
while 1:
filt_related = self._filter_related(pmid_related, cur_thresh,
groups = self._groups_from_related_dict(filt_related)
new_groups, pmid_related = self._collect_new_groups(
pmid_related, groups)
if len(new_groups) == 0:
if len(pmid_related) < self._max_group: final_groups.append(pmid_related.keys()) pmid_related = {} return final_groups [/sourcecode]

The full code wrapped up into a class is available from the GitHub repository.

This is one approach to automatically grouping a large list of literature to make interesting items more discoverable. With the work being done on full text indexing, the data underlying resources such as iHOP might be used to do these groupings even more effectively using similar algorithms.

Written by Brad Chapman

February 13, 2009 at 6:19 am