
Working with S3

One of the most common operations when working with Amazon S3 (Amazon Simple Storage Service) is pulling data from S3 to local, as well as pushing data from local to S3. We can use the AWS command line tool to achieve this:

# e.g. from s3 to local; --recursive is needed when copying a directory
aws s3 cp <s3 path> <local path> --recursive
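
The same command also covers the push direction, copying data from local to S3:

# e.g. from local to s3, again with --recursive for directories
aws s3 cp <local path> <s3 path> --recursive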

We'll also demonstrate how to use boto3 to perform these kinds of operations in Python.

In [1]:
%load_ext watermark
%load_ext autoreload
%autoreload 2

import os
import json
import boto3
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from time import perf_counter
from typing import List

%watermark -a 'Ethen' -d -u -v -iv
Author: Ethen

Last updated: 2023-05-19

Python implementation: CPython
Python version       : 3.8.10
IPython version      : 8.8.0

pandas : 1.5.2
pyarrow: 10.0.1
boto3  : 1.26.49
numpy  : 1.23.2
json   : 2.0.9

In [2]:
# replace these top level configuration, especially s3 region and bucket
region_name = "us-east-1"
s3_bucket = ""
s3_json_path = "ethenliu/test.json"
s3_dir = "ethenliu/data"
local_dir = "ethenliu/data"
s3_parquet_path = os.path.join(s3_dir, "test.parquet")

s3_client = boto3.client("s3", region_name=region_name)

Suppose we have a Python object in memory; one option is to use the client's put_object method to save it as a JSON file.

In [3]:
# json.dumps can't serialize a numpy array directly, we need to convert it to a list first
prediction = {
    "ids": [1, 2],
    "embeddings": np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]).tolist()
}
prediction
Out[3]:
{'ids': [1, 2], 'embeddings': [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]}
In [4]:
response = s3_client.put_object(
    Body=json.dumps(prediction),
    Bucket=s3_bucket,
    Key=s3_json_path
)
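
To sanity check the upload, here's a minimal sketch of reading the same object back with the client's get_object method and parsing the JSON payload:

response = s3_client.get_object(Bucket=s3_bucket, Key=s3_json_path)
# Body is a streaming object, read it fully before handing it to json
loaded_prediction = json.loads(response["Body"].read())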

Upload and Download Parquet Files

All of this is well and good until we work with large Python objects, where we'll encounter errors such as EntityTooLarge.

Directly copied from S3's documentation:

  • Upload an object in a single operation by using the AWS SDKs, REST API, or AWS CLI – With a single PUT operation, you can upload a single object up to 5 GB in size.
  • Upload an object in parts by using the AWS SDKs, REST API, or AWS CLI – Using the multipart upload API operation, you can upload a single large object, up to 5 TB in size.

Fortunately, we can rely on the upload_file method: boto3 will automatically use multipart upload under the hood, without us having to worry about the lower level multipart upload functions. The following code chunk shows how to save our Python object as a parquet file and upload it to S3, as well as how to download files from S3 to local and read them into a pandas DataFrame.

In [5]:
def save_as_parquet_to_s3(data, s3_bucket: str, s3_path: str, verbose: bool = False):
    """Saves the dictionary as a parquet file and push it to s3.    
    """
    file_name = os.path.split(s3_path)[-1]
    pa_table = pa.table(data)
    pq.write_table(pa_table, file_name)

    s3_client.upload_file(Filename=file_name, Bucket=s3_bucket, Key=s3_path)
    os.remove(file_name)
    if verbose:
        print(pa_table)
        print("Finish writing {} to s3://{}/{}".format(file_name, s3_bucket, s3_path))

    return
In [6]:
# convert the 2d numpy array into a list of 1d numpy arrays, as pyarrow supports saving 1d numpy arrays
embeddings = np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
prediction = {
    "ids": [1, 2],
    "embeddings": [embedding for embedding in embeddings]
}
save_as_parquet_to_s3(prediction, s3_bucket, s3_parquet_path)
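
If we need finer control over the multipart behavior mentioned above, upload_file also accepts a transfer configuration. Below is a minimal sketch, where the threshold, chunk size, and concurrency values are illustrative, and the file name / key are assumed to match the upload example above:

from boto3.s3.transfer import TransferConfig

# use multipart upload for objects larger than 64 MB, with 8 threads
transfer_config = TransferConfig(
    multipart_threshold=64 * 1024 * 1024,
    multipart_chunksize=64 * 1024 * 1024,
    max_concurrency=8
)
s3_client.upload_file(
    Filename="test.parquet",  # illustrative local file name
    Bucket=s3_bucket,
    Key=s3_parquet_path,
    Config=transfer_config
)
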
In [7]:
from joblib import Parallel, delayed


def download_files_from_s3(
    s3_bucket: str,
    s3_path: str,
    local_path: str,
    n_jobs: int = 4
) -> List[str]:
    """
    Download files under a s3 bucket & path to a specified local path.
    n_jobs configures the number of threads used for downloading files in parallel.

    Returns
    -------
    download_paths :
        Local paths of the files that were downloaded.
    """
    os.makedirs(local_path, exist_ok=True)

    def download_file(object_key, local_path, s3_client, s3_bucket):
        download_path = os.path.join(local_path, os.path.split(object_key)[-1])
        s3_client.download_file(s3_bucket, object_key, download_path)
        return download_path

    # use clients instead of resource, as they are thread safe
    # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/clients.html#multithreading-or-multiprocessing-with-clients
    s3_client = boto3.client("s3")

    # leverage paginator to avoid list object's limitation of returning 1000 objects at a time
    # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/paginators.html
    paginator = s3_client.get_paginator("list_objects_v2")
    page_iterator = paginator.paginate(Bucket=s3_bucket, Prefix=s3_path)

    parallel = Parallel(n_jobs=n_jobs, backend="threading")
    page_download_paths = []
    for page in page_iterator:
        object_keys = [content["Key"] for content in page["Contents"] if "_SUCCESS" not in content["Key"]]
        download_paths = parallel(
            delayed(download_file)(object_key, local_path, s3_client, s3_bucket)
            for object_key in object_keys
        )
        page_download_paths.extend(download_paths)

    return page_download_paths
In [8]:
if not os.path.exists(local_dir):
    print("download files from s3")
    start = perf_counter()
    files = download_files_from_s3(s3_bucket, s3_dir, local_dir)
    end = perf_counter()
    print("download files from s3 elapsed: ", end - start)

df = pd.read_parquet(local_dir)
df
Out[8]:
ids embeddings
0 1 [1.0, 2.0, 3.0]
1 2 [4.0, 5.0, 6.0]

Instead of downloading our parquet files to disk first, we can also read them directly into memory by wrapping the bytes object in a pyarrow BufferReader, followed by read_table.

In [9]:
def list_s3_object(s3_bucket: str, s3_dir: str):
    s3 = boto3.resource("s3")
    bucket = s3.Bucket(s3_bucket)

    objects = bucket.objects.filter(Prefix=s3_dir)
    objects = [obj for obj in objects if "_SUCCESS" not in obj.key]
    return objects
In [10]:
def read_s3_parquet_object(s3_object):
    """
    https://stackoverflow.com/questions/58061225/read-a-parquet-bytes-object-in-python
    """
    body = s3_object.get()["Body"].read()
    reader = pa.BufferReader(body)
    table = pq.read_table(reader)
    df = table.to_pandas()
    return df
In [11]:
s3_objects = list_s3_object(s3_bucket, s3_dir)
df_list = [read_s3_parquet_object(s3_object) for s3_object in s3_objects]
df = pd.concat(df_list, ignore_index=True)
df
Out[11]:
ids embeddings
0 1 [1.0, 2.0, 3.0]
1 2 [4.0, 5.0, 6.0]
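
As an aside, for reading a single parquet file we could also let pyarrow talk to S3 directly through its built-in filesystem layer instead of going through boto3. A minimal sketch, assuming a pyarrow version that ships the pyarrow.fs module:

from pyarrow import fs

s3_fs = fs.S3FileSystem(region=region_name)
# pyarrow expects "<bucket>/<key>" style paths, without the s3:// scheme
table = pq.read_table(f"{s3_bucket}/{s3_parquet_path}", filesystem=s3_fs)
df = table.to_pandas()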

Additional Tips

When conducting heavy write operations, we might encounter "SlowDown: Please reduce your request rate" related errors. Although Amazon S3 has announced performance improvements, these might not be enough for our use case. When confronted with these types of situations, the two most common suggestions are to: 1. add retries. 2. add prefixes/partitions.

When encountering a 503 Slow Down error, or other errors that we suspect are not caused by our client, we can leverage the built-in retry with exponential backoff and jitter capability [3] [4]. Meaning, after we receive a server or throttling related error, we wait progressively longer, with some noise added, between each retry. For example, we can explicitly configure our boto3 client with the maximum retry attempts as well as the retry mode (the algorithm used for conducting the retry).

from botocore.config import Config

config = Config(
    retries={
        "max_attempts": 10,
        "mode": "standard"
    }
)
s3_client = boto3.client("s3", config=config)

If the new limits and adjusted retries still prove to be insufficient, prefixes would need to be used. A prefix is any string between a bucket name and an object name, for example:

  • bucket/1/file
  • bucket/2/file

Prefixes of the object file would be: /1/, /2/. In this example, if we spread writes evenly across both prefixes, we can achieve double the throughput.

import random

# we can configure the number of partitions based on our use case;
# s3_path and file_name are placeholders for the target s3 prefix and local file
num_partition = 10
random_part = str(random.randint(1, num_partition))
key = os.path.join(s3_path, random_part, file_name)
s3_client.upload_file(Filename=file_name, Bucket=s3_bucket, Key=key)

The official documentation also has further suggestions on optimizing S3 performance [2].

Reference

  • [1] Blog: How to use Boto3 to upload files to an S3 Bucket?
  • [2] AWS Documentation: Best practices design patterns: optimizing Amazon S3 performance
  • [3] Boto3 Documentation: Retries
  • [4] AWS Documentation: Error retries and exponential backoff in AWS
  • [5] Blog: How to use Boto3 to download multiple files from S3 in parallel?