Blue Collar Bioinformatics

Note: new posts have moved to http://bcb.io/. Please look there for the latest updates and comments.

Parallel upload to Amazon S3 with python, boto and multiprocessing

with 41 comments

One challenge with moving analysis pipelines to cloud resources like Amazon EC2 is figuring out the logistics of transferring files. Biological data is big; with the rapid adoption of new machines like the HiSeq and decreasing sequencing costs, the data transfer question isn’t going away soon. The use of Amazon in bioinformatics was brought up during a recent discussion on the BioStar question and answer site. Deepak’s answer highlighted the role of parallelizing uploads and downloads to ease this transfer burden. Here I describe a method to improve upload speed by splitting the transfer over multiple processing cores.

Amazon Simple Storage Service (S3) provides relatively inexpensive cloud storage with their reduced redundancy storage option. S3, and all of Amazon’s cloud services, are accessible directly from Python using boto. By using boto’s multipart upload support, coupled with Python’s built-in multiprocessing module, I’ll demonstrate maximizing transfer speeds to make uploading data less painful. The script is available from GitHub and requires the latest boto from GitHub (2.0b5 or later).

Parallel upload with multiprocessing

The overall process uses boto to connect to an S3 upload bucket, initialize a multipart transfer, split the file into multiple pieces, and then upload these pieces in parallel over multiple cores. Each processing core is passed the identifiers needed to reconnect to the transfer: the multipart upload identifier (mp.id), the S3 file key name (mp.key_name) and the S3 bucket name (mp.bucket_name).

import boto

# Connect to S3 and set up the multipart transfer in the target bucket
conn = boto.connect_s3()
bucket = conn.lookup(bucket_name)
mp = bucket.initiate_multipart_upload(s3_key_name, reduced_redundancy=use_rr)
# Farm each file piece out to a separate core, then signal completion
with multimap(cores) as pmap:
    for _ in pmap(transfer_part, ((mp.id, mp.key_name, mp.bucket_name, i, part)
                                  for (i, part) in
                                  enumerate(split_file(tarball, mb_size, cores)))):
        pass
mp.complete_upload()

The split_file function uses the unix split command to divide the file into sections, each of which will be uploaded separately.

def split_file(in_file, mb_size, split_num=5):
    # Name the pieces after the S3 key so they are easy to find and clean up
    prefix = os.path.join(os.path.dirname(in_file),
                          "%sS3PART" % (os.path.basename(s3_key_name)))
    # Aim for roughly two pieces per core, capping each piece at 250Mb
    split_size = int(min(mb_size / (split_num * 2.0), 250))
    if not os.path.exists("%saa" % prefix):
        cl = ["split", "-b%sm" % split_size, in_file, prefix]
        subprocess.check_call(cl)
    return sorted(glob.glob("%s*" % prefix))

The multiprocessing aspect is managed using a contextmanager. The initial multiprocessing pool is set up, using a specified number of cores, and configured to allow keyboard interrupts. We then return a lazy map function (imap) which can be used just like Python’s standard map. This transparently divides the function calls for each file part over all available cores. Finally, the pool is cleaned up when the map is finished running.

import contextlib
import multiprocessing
from multiprocessing.pool import IMapIterator

@contextlib.contextmanager
def multimap(cores=None):
    """Provide a parallel map function, cleaning up the pool when finished."""
    if cores is None:
        cores = max(multiprocessing.cpu_count() - 1, 1)
    # Patch IMapIterator.next with a long timeout so keyboard interrupts work
    def wrapper(func):
        def wrap(self, timeout=None):
            return func(self, timeout=timeout if timeout is not None else 1e100)
        return wrap
    IMapIterator.next = wrapper(IMapIterator.next)
    pool = multiprocessing.Pool(cores)
    yield pool.imap
    pool.terminate()

The actual work of transferring each portion of the file is done using two functions. The helper function, mp_from_ids, uses the bucket name, file key name and multipart upload id to reconstitute a multipart upload object inside each worker process:

def mp_from_ids(mp_id, mp_keyname, mp_bucketname):
    conn = boto.connect_s3()
    bucket = conn.lookup(mp_bucketname)
    mp = boto.s3.multipart.MultiPartUpload(bucket)
    mp.key_name = mp_keyname
    mp.id = mp_id
    return mp

This object, together with the number of the file part and the file part itself, is used to transfer that section of the file. The file part is removed after a successful upload.

@map_wrap
def transfer_part(mp_id, mp_keyname, mp_bucketname, i, part):
    mp = mp_from_ids(mp_id, mp_keyname, mp_bucketname)
    print " Transferring", i, part
    with open(part) as t_handle:
        mp.upload_part_from_file(t_handle, i+1)
    os.remove(part)

When all sections, distributed over all processors, are finished, the multipart upload is signaled complete and Amazon finishes the process. Your file is now available on S3.
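
One caveat: if the transfer fails partway through, the pieces already uploaded remain parked on S3, and are billed, until the multipart upload is completed or cancelled. Below is a rough sketch of guarding against this by wrapping the upload loop from above; it shows the idea rather than the exact structure of the script.

try:
    with multimap(cores) as pmap:
        for _ in pmap(transfer_part, ((mp.id, mp.key_name, mp.bucket_name, i, part)
                                      for (i, part) in
                                      enumerate(split_file(tarball, mb_size, cores)))):
            pass
    mp.complete_upload()
except (KeyboardInterrupt, Exception):
    # Abort the transfer so the already-uploaded pieces are cleaned up on S3
    mp.cancel_upload()
    raise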

Parallel download

Download speeds can be maximized by utilizing any of several existing parallelized download accelerators.

Combine these with the uploader to build up a cloud analysis workflow: move your data to S3, run a complex analysis pipeline on EC2, push the results back to S3, and then download them to local machines. Please share other tips and tricks you use to deal with Amazon file transfer in the comments.
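
If you would rather handle downloads from Python as well, the same idea works in reverse using S3’s support for ranged GET requests. The sketch below is not part of the upload script: the parallel_download helper and its piece naming are made up for illustration, and it assumes the key is larger than the number of pieces.

import os
import shutil
import multiprocessing

import boto

def _download_range(args):
    """Fetch one byte range of an S3 key into its own temporary file."""
    bucket_name, key_name, start, end, out_fname = args
    # Each worker opens its own S3 connection, mirroring the upload approach
    key = boto.connect_s3().lookup(bucket_name).get_key(key_name)
    key.get_contents_to_filename(out_fname,
                                 headers={"Range": "bytes=%s-%s" % (start, end)})
    return out_fname

def parallel_download(bucket_name, key_name, out_file, cores=4):
    """Download an S3 key in byte-range pieces over multiple cores, then join them."""
    key = boto.connect_s3().lookup(bucket_name).get_key(key_name)
    piece = key.size // cores + 1
    ranges = [(bucket_name, key_name, i * piece,
               min((i + 1) * piece - 1, key.size - 1),
               "%s.part%s" % (out_file, i))
              for i in range(cores)]
    pool = multiprocessing.Pool(cores)
    part_files = pool.map(_download_range, ranges)
    pool.close()
    pool.join()
    with open(out_file, "wb") as out_handle:
        for part_file in part_files:
            with open(part_file, "rb") as in_handle:
                shutil.copyfileobj(in_handle, out_handle)
            os.remove(part_file)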

Written by Brad Chapman

April 10, 2011 at 1:27 pm

Posted in analysis


41 Responses


  1. Awesome Brad :-!

    But now I wonder:

    1) In practice, what’s the “upload speedup” of using this approach? 1.2X? 2X?

    2) Are you aware of any similar techniques for plain old rsync or other “pre-cloud” transfer protocols?

    Thanks!
    Roman

    Roman Valls

    April 10, 2011 at 2:10 pm

  2. A similar approach using Ruby, multipart upload and separate threads to run in parallel:

    http://blog.vicecity.co.uk/post/4425574978/multipart-uploads-fog-threads-win

    Brad Chapman

    April 11, 2011 at 3:26 pm

  3. Hi Brad,

    I was implementing parallel multipart uploads with boto this weekend as well and wrote FileChunkIO (https://bitbucket.org/fabian/filechunkio/overview). Using that, you don’t need to split the files upfront and delete them afterwards. Instead, you just create FileChunkIO instances using the original file, tell them the offset and the amount of bytes and pass them to boto’s upload_part_from_file method.
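
    Roughly like this, leaving out the multiprocessing wrapper for clarity; the 50Mb piece size is an arbitrary choice, in_file is the path to the original file, and mp is the multipart upload object from the post:

    import os
    import math

    from filechunkio import FileChunkIO

    piece_size = 50 * 1024 * 1024  # 50Mb pieces; S3 requires at least 5Mb per part (except the last)
    total_size = os.path.getsize(in_file)
    piece_count = int(math.ceil(total_size / float(piece_size)))
    for i in range(piece_count):
        offset = i * piece_size
        piece_bytes = min(piece_size, total_size - offset)
        with FileChunkIO(in_file, "r", offset=offset, bytes=piece_bytes) as fp:
            mp.upload_part_from_file(fp, i + 1)
    mp.complete_upload()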

    Best,
    Fabian

    Fabian Topfstedt

    April 17, 2011 at 8:59 am

  4. […] and thought about how low the memory footprint and disk usage could be. Both Mitch Garnaat and Brad Chapman used the unix split command to create chunks first, doubling the disk usage. Others were creating […]

  5. Thanks Brad ! I was having a look at this one:

    http://www.psc.edu/networking/projects/hpn-ssh/

    Which basically offloads cipher calculations onto multiple threads… not sure if it actually performs file splitting and transfer, but the speed differences seem quite impressive.

    The drawback is that it is an sshd patch, too intrusive/inconvenient for production (non-root) systems :-/

    Roman Valls

    April 21, 2011 at 8:08 am

  6. Here is the error message I got with debug=2. Any clue?


    header: Date: Mon, 25 Apr 2011 03:04:33 GMT
    header: Content-Type: application/xml
    header: Transfer-Encoding: chunked
    header: Server: AmazonS3
    Traceback (most recent call last):
      File "./s3multi.py", line 135, in <module>
        main(*args, **kwargs)
      File "./s3multi.py", line 40, in main
        _multipart_upload(bucket, s3_key_name, transfer_file, mb_size, use_rr)
      File "./s3multi.py", line 98, in _multipart_upload
        mp = bucket.initiate_multipart_upload(s3_key_name, reduced_redundancy=use_rr)
    AttributeError: Bucket instance has no attribute 'initiate_multipart_upload'

    player

    April 24, 2011 at 10:06 pm

  7. Sorry, the Ubuntu version is boto 1.9. Too old.

    player

    April 24, 2011 at 10:29 pm

  8. Hi Brad — this script is awesome. Thanks! Regarding the parallel download software you mentioned: can you further elaborate on how you use that with S3? I’m not sure how to pass my credentials along to download files from S3 using them.

    Anonymous

    June 30, 2011 at 8:35 am

    • Glad this helps. For downloading private datasets, you can get S3 to give you a temporary URL with your access key included. For instance with boto, do:


      >>> import boto
      >>> con = boto.connect_s3()
      >>> bucket = con.get_bucket("chapmanb")
      >>> item = bucket.get_key("example.fastq")
      >>> item.generate_url(20)  # URL valid for 20 seconds
      https://chapmanb.s3.amazonaws.com/example.fastq?Signature=kxnEJkHurDbKz8JT0QwiCvk7UX4%3D&Expires=1309455559&AWSAccessKeyId=XXX

      Brad Chapman

      June 30, 2011 at 12:45 pm

  9. […] Parallel upload to Amazon S3 with python, boto and multiprocessing […]

  10. Howdy. This may be an ignorant question but … how long does it take to create an EBS volume from the S3 bucket? I’m getting the impression that it is extremely quick. Is that correct? ‘Cause there’s no way to compute directly from the bucket — data have to be accessed from an EBS, right?

    Cheers and thx for the great posts!

    Yannick Pouliot

    September 11, 2011 at 8:18 pm

    • Yannick;
      I use S3 as a long-term repository of files and copy over specific files needed for an analysis when an EBS store is created. There is some setup time, but it’s pretty fast since all the data is already at Amazon. You can also treat S3 as a filesystem with Fuse using s3fs:

      http://code.google.com/p/s3fs/wiki/FuseOverAmazon

      Brad Chapman

      September 12, 2011 at 8:10 am

  11. Hello Brad, thank you very much for this. We are running into a little problem in production with this, because the upload is actually too fast, consuming all of our server’s bandwidth. Is there a way to throttle the upload speed?

    Thomas Söhngen

    November 13, 2012 at 7:42 am

  12. One thing occurs to me – it looks like you’re unnecessarily modifying multiprocessing’s IMapIterator.next on every use of the multimap context manager. Is this a transformation that can just occur once, or is there something more going on which requires IMapIterator.next to be ‘rewrapped’ before the creation of each pool?

    Matt Wodrich

    November 26, 2012 at 9:02 am

    • Matt;
      This hack allows keyboard interrupts during multiprocessing pools. It’s been a while since I’ve revisited this but the fix is tame, setting a timeout:


      # Fix keyboard interrupts when using multiprocessing.pool.imap().
      # Usage:
      # import fix_multiprocessing.py
      from multiprocessing.pool import IMapIterator
      def wrapper(func):
          def wrap(self, timeout=None):
              # Note: the timeout of 1 googol seconds introduces a rather subtle
              # bug for Python scripts intended to run many times the age of the universe.
              return func(self, timeout=timeout if timeout is not None else 1e100)
          return wrap
      IMapIterator.next = wrapper(IMapIterator.next)

      http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

      I include it in the context handler purely to keep the high level usage clean.
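
      If you prefer to patch only once, a small guard at import time should also work; this is an untested variant rather than what the script does:

      from multiprocessing.pool import IMapIterator

      if not getattr(IMapIterator.next, "_interrupt_safe", False):
          def wrapper(func):
              def wrap(self, timeout=None):
                  return func(self, timeout=timeout if timeout is not None else 1e100)
              wrap._interrupt_safe = True
              return wrap
          IMapIterator.next = wrapper(IMapIterator.next)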

      Brad Chapman

      November 26, 2012 at 3:08 pm

  13. Hello Brad,

    This script has been super helpful. I just hit a small snag and was wondering if you could point me in the right direction.

    I am getting this error on the upload of files under 150MB.

    boto.exception.S3ResponseError: S3ResponseError: 400 Bad Request
    EntityTooSmall: Your proposed upload is smaller than the minimum allowed size (ETag 3e0fd0d0b964805b0cc4a0cc1bd38246, MinSizeAllowed 5242880, ProposedSize 3145728)

    Basically the split is creating a file under 5MB for files 150MB or smaller. What would I need to change to make sure the split doesn’t create a file under 5MB?

    Thanks in advance

    Dale

    December 4, 2012 at 3:15 pm

    • Dale;
      Glad to hear this is useful and thanks for letting me know about the issue. I updated the script to require splits of at least 5Mb which seems to avoid the problem:

      https://github.com/chapmanb/cloudbiolinux/blob/master/utils/s3_multipart_upload.py
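
      The change boils down to giving the split size a 5Mb floor, roughly:

      split_size = int(max(min(mb_size / (split_num * 2.0), 250), 5))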

      Let me know if you run into any more issues at all.

      Brad Chapman

      December 5, 2012 at 11:08 am

      • Thanks for the quick reply Brad. Great work. I was investigating further yesterday and realized that it should have worked even with the less-than-5MB file. Based on the way multipart upload works, the last part can be any size. Perhaps the last part wasn’t properly marked as such; I was planning more detailed research today. But your fix eliminates the problem either way!

        Thanks!

        Dale

        December 5, 2012 at 11:21 am

        • Also… Just a side note, we added two lines to the main section of the script to delete the file after upload.

          print "Deleting Transfer File"
          os.remove(transfer_file)

          We use the script to upload files for backup/archiving and the local copy isn’t needed. This has more than doubled our upload performance to S3; we were able to upload 2TB overnight from one server.

          Thanks!!!

          Dale

          December 5, 2012 at 2:27 pm

          • Dale;
            Thanks — really glad this works well for you. I think the complaint from AWS came when individual file parts were less than 5Mb. I’d never run into this since I didn’t have enough parts on a small enough file, but I was able to replicate it with a test file, so hopefully it’ll work fine going forward.

            I’ll leave out the automatic removal part from the main script as I don’t want to accidentally delete anyone’s local files. Thanks again.

            Brad Chapman

            December 6, 2012 at 10:47 am

  14. I thought I would give this a try and got this error:

    Traceback (most recent call last):
      File "s3multipart.py", line 137, in <module>
        main(*args, **kwargs)
      File "s3multipart.py", line 42, in main
        _multipart_upload(bucket, s3_key_name, transfer_file, mb_size, use_rr)
      File "s3multipart.py", line 104, in _multipart_upload
        enumerate(split_file(tarball, mb_size, cores)))):
      File "s3multipart.py", line 119, in wrap
        return func(self, timeout=timeout if timeout is not None else 1e100)
      File "C:\Python27\Lib\multiprocessing\pool.py", line 626, in next
        raise value
    boto.exception.S3ResponseError: S3ResponseError: 400 Bad Request

    RequestTimeout: Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed. (RequestId B339A1790809C9C6, HostId qaSs7Ds1TQwBVKf4Vysw8R43IVOQ7x6ZUDFRpHrKXL32AY+h8FJvzkGFLL3unA+I)

    Adam

    December 29, 2012 at 5:26 pm

    • Adam;
      Apologies, I missed this comment. WordPress had a rash of spam e-mail over the holidays and this got buried in cleaning up those.

      The error looks like you’re having trouble connecting to AWS. The script uses boto under the covers, so it would be worth debugging to see if you can connect to S3 at all from your machine:

      http://docs.pythonboto.org/en/latest/boto_config_tut.html

      If so, is this a persistent error or a one-off? I can’t reproduce it on my end, so hopefully this helps.

      Brad Chapman

      January 3, 2013 at 11:13 am

        • Thanks Brad for looking into it. Sorry to hear about the spam. I tried out a handful of competing multipart scripts and this is the only one I had any trouble connecting with, but I agree the message sure looks like something is having trouble connecting. This script looked the best out of all of them, so I figured I would post my feedback just in case you had any ideas, but it’s not a big deal. I’ll assume it’s probably just my firewall blocking the unique method this script is using.

        Adam

        January 3, 2013 at 1:29 pm

          • Dear Adam, were you able to resolve this problem? The script works fine on my dev machine but gives the same error on the staging server. Thanks

          Prabhat

          September 3, 2013 at 11:56 am

          • No, not sure why this one fails and all the others work. I just ended up using another script.

            Adam Lane

            September 3, 2013 at 2:50 pm

  15. Hello Again Brad,

    I found a new issue which I am hoping you can help with. It seems that if the bucket name is under 9 characters long the script returns an error.

    Traceback (most recent call last):
      File "/usr/local/exelate/s3upload/s3_multipart_upload.py", line 141, in <module>
        main(*args, **kwargs)
      File "/usr/local/exelate/s3upload/s3_multipart_upload.py", line 42, in main
        cores)
      File "/usr/local/exelate/s3upload/s3_multipart_upload.py", line 101, in _multipart_upload
        mp = bucket.initiate_multipart_upload(s3_key_name, reduced_redundancy=use_rr)
    AttributeError: 'NoneType' object has no attribute 'initiate_multipart_upload'

    I have replicated this error with any bucket names under 9 characters. Any assistance would be appreciated.

    Thanks

    Dale

    January 28, 2013 at 8:43 pm

    • Further testing helped me realize that the issue occurs when there are capital letters in the name of the bucket, for example helpME or justforIT.

      Thanks

      Dale

      January 28, 2013 at 8:47 pm

      • Dale;
        Thanks for the report. The first issue you identified was that the script assumes the upload bucket exists. I pushed a fix that creates the bucket if not found.

        The uppercase issue appears to be a check that boto places on buckets:

        BotoClientError: Bucket names cannot contain upper-case characters when using either the sub-domain or virtual hosting calling format.

        It sounds like the recommended approach is lowercase bucket names:

        http://support.rightscale.com/15-References/Dashboard_Help_Text/s3

        Thanks again for the report and hope the fix works for you.

        Brad Chapman

        January 30, 2013 at 5:55 am

  16. Brad

    Is there a way to add to this script the ability to only update changed files? Then it would only upload newer files in parallel and delete those no longer existing at the source.

    Perhaps by verifying file size or timestamps, similar to the rsync --update option?

    Flix Johns

    May 13, 2013 at 11:35 am

  17. hi brad-

    Thanks for the blog post. I posted 2 issues on GitHub relating to this script; I hope my pull request helps. However, I am unable to get the multipart upload to complete successfully. Any advice would be greatly appreciated.

    thanks
    greg

    barchard

    May 17, 2013 at 1:48 pm

  18. Surely upload would be IO-bound rather than CPU-bound — does parallelising the upload really help?

    Anonymous

    July 26, 2013 at 6:26 am

