Blue Collar Bioinformatics

Parallel upload to Amazon S3 with python, boto and multiprocessing

One challenge with moving analysis pipelines to cloud resources like Amazon EC2 is figuring out the logistics of transferring files. Biological data is big; with the rapid adoption of new machines like the HiSeq and decreasing sequencing costs, the data transfer question isn’t going away soon. The use of Amazon in bioinformatics was brought up during a recent discussion on the BioStar question-and-answer site. Deepak’s answer highlighted the role of parallelizing uploads and downloads to ease this transfer burden. Here I describe a method to improve upload speed by splitting the transfer over multiple processing cores.

Amazon Simple Storage Service (S3) provides relatively inexpensive cloud storage with its reduced redundancy storage option. S3, and all of Amazon’s cloud services, are accessible directly from Python using boto. By using boto’s multipart upload support, coupled with Python’s built-in multiprocessing module, I’ll demonstrate how to maximize transfer speeds to make uploading data less painful. The script is available from GitHub and requires the latest boto from GitHub (2.0b5 or better).

Parallel upload with multiprocessing

The overall process uses boto to connect to an S3 upload bucket, initialize a multipart transfer, split the file into multiple pieces, and then upload these pieces in parallel over multiple cores. Each processing core is passed a set of credentials to identify the transfer: the multipart upload identifier (mp.id), the S3 file key name (mp.key_name) and the S3 bucket name (mp.bucket_name).

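import boto

conn = boto.connect_s3()
bucket = conn.lookup(bucket_name)
# Start the multipart upload; mp carries the id that ties the parts together.
mp = bucket.initiate_multipart_upload(s3_key_name, reduced_redundancy=use_rr)
with multimap(cores) as pmap:
    # Each work item is a tuple of plain values that can be sent to a worker.
    for _ in pmap(transfer_part, ((mp.id, mp.key_name, mp.bucket_name, i, part)
                                  for (i, part) in
                                  enumerate(split_file(tarball, mb_size, cores)))):
        pass
# Tell S3 that all parts are uploaded so it can assemble the final file.
mp.complete_upload()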

The split_file function uses the unix split command to divide the file into sections, each of which will be uploaded separately.

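import glob
import os
import subprocess

def split_file(in_file, mb_size, split_num=5):
    # s3_key_name is not a parameter; it comes from the enclosing upload code.
    prefix = os.path.join(os.path.dirname(in_file),
                          "%sS3PART" % (os.path.basename(s3_key_name)))
    # Aim for two parts per core, capped at 250 MB per part.
    split_size = int(min(mb_size / (split_num * 2.0), 250))
    # Reuse existing parts if a previous run already split the file.
    if not os.path.exists("%saa" % prefix):
        cl = ["split", "-b%sm" % split_size, in_file, prefix]
        subprocess.check_call(cl)
    return sorted(glob.glob("%s*" % prefix))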
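
To make the part-size arithmetic concrete, here is a small sketch that mirrors the formula in split_file without touching any files. The helper names split_size_mb and expected_parts are hypothetical, introduced only for illustration, and the part count is approximate because it assumes mb_size and the "m" suffix of split use the same unit. The code is written for Python 3.

```python
import math


def split_size_mb(mb_size, split_num=5):
    """Part size in MB, using the same formula as split_file above."""
    return int(min(mb_size / (split_num * 2.0), 250))


def expected_parts(mb_size, split_num=5):
    """Approximate number of pieces for a file of mb_size MB (hypothetical helper)."""
    size = split_size_mb(mb_size, split_num)
    if size < 1:
        # The formula rounds down to 0 for very small files.
        raise ValueError("file too small to split with this formula")
    return int(math.ceil(mb_size / float(size)))


# 1000 MB over 4 cores: two parts per core.
print(split_size_mb(1000, 4), expected_parts(1000, 4))
# 10000 MB over 4 cores: the part size hits the 250 MB cap.
print(split_size_mb(10000, 4), expected_parts(10000, 4))
```

S3 multipart uploads require every part except the last to be at least 5 MB, so for small files (under roughly 50 MB with the default split_num=5) the formula yields parts that are too small. The sketch only flags the zero-size case; a regular single-request upload is the simpler choice for files of that size.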

The multiprocessing aspect is managed using a context manager. The initial multiprocessing pool is set up, using a specified number of cores, and configured to allow keyboard interrupts. We then return a lazy map function (imap) which can be used just like Python’s standard map. This transparently divides the function calls for each file part over all available cores. Finally, the pool is cleaned up when the map is finished running.

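import contextlib
import multiprocessing
from multiprocessing.pool import IMapIterator

@contextlib.contextmanager
def multimap(cores=None):
    if cores is None:
        cores = max(multiprocessing.cpu_count() - 1, 1)
    # Always call next() with a timeout so that Ctrl-C can interrupt the wait.
    def wrapper(func):
        def wrap(self, timeout=None):
            return func(self, timeout=timeout if timeout is not None else 1e100)
        return wrap
    IMapIterator.next = wrapper(IMapIterator.next)
    pool = multiprocessing.Pool(cores)
    try:
        yield pool.imap
    finally:
        # Clean up the workers even if the caller's loop raises.
        pool.terminate()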
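
As a usage sketch, the same pattern can be written for Python 3 and exercised on a toy function. This is not the script's code: the pool_factory parameter is a hypothetical addition that lets the example run on a thread pool, and the IMapIterator timeout workaround is left out.

```python
import contextlib
import multiprocessing
from multiprocessing.pool import ThreadPool


@contextlib.contextmanager
def multimap(cores=None, pool_factory=multiprocessing.Pool):
    """Yield a lazy, order-preserving map backed by a worker pool.

    pool_factory is a hypothetical parameter (not in the original script);
    it accepts any callable that takes a worker count and returns a pool.
    """
    if cores is None:
        cores = max(multiprocessing.cpu_count() - 1, 1)
    pool = pool_factory(cores)
    try:
        yield pool.imap
    finally:
        # Runs even if the body of the with-block raises.
        pool.terminate()


# Threads keep the demo free of process start-up concerns; real uploads
# would use the default multiprocessing.Pool.
with multimap(2, pool_factory=ThreadPool) as pmap:
    lengths = list(pmap(len, ["aa", "b", "cccc"]))
print(lengths)
```

Results come back in input order because imap preserves ordering. When using real worker processes, run the with-block under an `if __name__ == "__main__":` guard on platforms that start workers by launching a fresh interpreter.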

The actual work of transferring each portion of the file is done using two functions. The helper function, mp_from_ids, uses the id information about the bucket, file key and multipart upload id to reconstitute a multipart upload object:

def mp_from_ids(mp_id, mp_keyname, mp_bucketname):
    conn = boto.connect_s3()
    bucket = conn.lookup(mp_bucketname)
    mp = boto.s3.multipart.MultiPartUpload(bucket)
    mp.key_name = mp_keyname
    mp.id = mp_id
    return mp

This object, together with the number of the file part and the file itself, is used to transfer that section of the file. The file part is removed after a successful upload.

@map_wrap
def transfer_part(mp_id, mp_keyname, mp_bucketname, i, part):
    mp = mp_from_ids(mp_id, mp_keyname, mp_bucketname)
    print " Transferring", i, part
    # Open in binary mode; S3 part numbers start at 1, hence i+1.
    with open(part, "rb") as t_handle:
        mp.upload_part_from_file(t_handle, i+1)
    os.remove(part)
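
The map_wrap decorator is not shown in this post. Because imap passes each item to the function as a single argument, a decorator along the following lines would be needed to unpack the (mp_id, mp_keyname, mp_bucketname, i, part) tuple into positional arguments. This is a guess at its shape, not the script's actual definition, and describe_part is a hypothetical stand-in for transfer_part that does no network access.

```python
import functools


def map_wrap(func):
    """Let a multi-argument function accept one tuple, as imap supplies."""
    @functools.wraps(func)
    def wrapper(args):
        return func(*args)
    return wrapper


@map_wrap
def describe_part(mp_id, mp_keyname, mp_bucketname, i, part):
    # S3 part numbers start at 1, hence i + 1 (as in transfer_part).
    return "%s/%s part %d <- %s (upload %s)" % (
        mp_bucketname, mp_keyname, i + 1, part, mp_id)


# Each work item is a single tuple, the same shape the upload loop builds.
# All values here are made up for illustration.
item = ("abc123", "data.tar.gz", "my-bucket", 0, "data.tar.gzS3PARTaa")
print(describe_part(item))
```

Passing only plain values (ids and names) in each tuple, rather than the boto objects themselves, keeps the work items easy to send to worker processes; each worker then rebuilds its own upload object with mp_from_ids.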

When all sections, distributed over all processors, are finished, the multipart upload is signaled complete and Amazon finishes the process. Your file is now available on S3.

Parallel download

Download speeds can be maximized by using existing parallelized download accelerators.

Combine these with the uploader to build up a cloud analysis workflow: move your data to S3, run a complex analysis pipeline on EC2, push the results back to S3, and then download them to local machines. Please share other tips and tricks you use to deal with Amazon file transfer in the comments.

Written by Brad Chapman

April 10, 2011 at 1:27 pm

16 Responses

  1. Awesome Brad :-!

    But now I wonder:

    1) In practice, what’s the “upload speedup” on using this approach ? 1.2X ? 2X ?

    2) Are you aware of any similar techniques for plain old rsync or other “pre-cloud” transfer protocols ?

    Thanks !
    Roman

    Roman Valls

    April 10, 2011 at 2:10 pm

  2. A similar approach using Ruby, multipart upload and separate threads to run in parallel:

    http://blog.vicecity.co.uk/post/4425574978/multipart-uploads-fog-threads-win

    Brad Chapman

    April 11, 2011 at 3:26 pm

  3. Hi Brad,

    I was implementing parallel multipart uploads with boto this weekend as well and wrote FileChunkIO (https://bitbucket.org/fabian/filechunkio/overview). Using that, you don’t need to split the files upfront and delete them afterwards. Instead, you just create FileChunkIO instances using the original file, tell them the offset and the amount of bytes and pass them to boto’s upload_part_from_file method.

    Best,
    Fabian

    Fabian Topfstedt

    April 17, 2011 at 8:59 am

  4. [...] and thought about how low the memory footprint and disk usage could be. Both Mitch Garnaat and Brad Chapman used the unix split command to create chunks first, doubling the disk usage. Others were creating [...]

  5. Thanks Brad ! I was having a look at this one:

    http://www.psc.edu/networking/projects/hpn-ssh/

    Which basically offloads cipher calculations onto multiple threads… not sure if it actually performs file splitting and transfer, but the speed differences seem quite impressive.

    The drawback is that it is an sshd patch, too intrusive/inconvenient for production (non-root) systems :-/

    Roman Valls

    April 21, 2011 at 8:08 am

  6. Here is the error message I got with debug=2. Any clue?

    header: Date: Mon, 25 Apr 2011 03:04:33 GMT
    header: Content-Type: application/xml
    header: Transfer-Encoding: chunked
    header: Server: AmazonS3
    Traceback (most recent call last):
      File "./s3multi.py", line 135, in <module>
        main(*args, **kwargs)
      File "./s3multi.py", line 40, in main
        _multipart_upload(bucket, s3_key_name, transfer_file, mb_size, use_rr)
      File "./s3multi.py", line 98, in _multipart_upload
        mp = bucket.initiate_multipart_upload(s3_key_name, reduced_redundancy=use_rr)
    AttributeError: Bucket instance has no attribute 'initiate_multipart_upload'

    player

    April 24, 2011 at 10:06 pm

  7. Sorry, the Ubuntu version is boto 1.9. Too old.

    player

    April 24, 2011 at 10:29 pm

  8. Hi Brad — this script is awesome. Thanks! Regarding the parallel download software you mentioned. Can you further elaborate on how you use that with S3? I’m not sure how to pass on my credentials to download files from S3 using them.

    Anonymous

    June 30, 2011 at 8:35 am

  10. Howdy. This may be an ignorant question but … how long does it take to create an EBS volume from the S3 bucket? I’m getting the impression that it is extremely quick. Is that correct? ‘Cause there’s no way to compute directly from the bucket — data have to be accessed from an EBS, right?

    Cheers and thx for the great posts!

    Yannick Pouliot

    September 11, 2011 at 8:18 pm

    • Yannick;
      I use S3 as a long-term repository of files and copy over specific files needed for an analysis when an EBS store is created. There is some setup time, but it’s pretty fast since all the data is already at Amazon. You can also treat S3 as a filesystem with Fuse using s3fs:

      http://code.google.com/p/s3fs/wiki/FuseOverAmazon

      Brad Chapman

      September 12, 2011 at 8:10 am

