None
One of the most common operations when working with Amazon S3 (Amazon Simple Storage Service) is to pull data from s3 to local as well as push data from local to s3. We can use aws command line tool to achieve this:
# e.g. from s3 to local, add --recursive if it's a directory
aws s3 cp <s3 path> <local path> --recursive
We'll also demonstrate how to use boto3
to perform these kind of operations in Python.
%load_ext watermark
%load_ext autoreload
%autoreload 2
import os
import json
import boto3
import shutil
import fnmatch
import numpy as np
import pandas as pd
import pyarrow as pa
import awswrangler as wr
import pyarrow.parquet as pq
from joblib import Parallel, delayed
from time import perf_counter
from typing import List
%watermark -a 'Ethen' -d -u -v -iv
# replace these top level configuration, especially s3 region and bucket
region_name = "us-east-1"
# placeholder value: the bucket name must be replaced with a bucket you have
# read/write access to before running any of the cells below
s3_bucket = "your-bucket-name"
# module-level client reused by the upload helpers defined later in this file
s3_client = boto3.client("s3", region_name=region_name)
Suppose we have a python object in memory, one option is to use client's put_object
method and save it as a json file.
# json.dumps can't serialize a numpy array directly, so we convert it to (nested) lists first
prediction = {
    "ids": [1, 2],
    "embeddings": np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]).tolist()
}
# bare expression: shows the dictionary when this runs as a notebook cell
prediction
# object key (path inside the bucket) the json document is stored under
s3_json_path = "ethenliu/data/test.json"
# upload the json-serialized dictionary with a single put_object call
response = s3_client.put_object(
    Body=json.dumps(prediction),
    Bucket=s3_bucket,
    Key=s3_json_path
)
All of this is well and good until we work with some large python objects, which we'll encounter errors such as entity being too large.
Directly copied from S3's documentation
- Upload an object in a single operation by using the AWS SDKs, REST API, or AWS CLI – With a single PUT operation, you can upload a single object up to 5 GB in size.
- Upload an object in parts by using the AWS SDKs, REST API, or AWS CLI – Using the multipart upload API operation, you can upload a single large object, up to 5 TB in size.
Fortunately, we can rely on upload_file
method, boto3 will automatically use multipart upload underneath the hood without us having to worry about lower level functions related to multipart upload. The following code chunk shows how to save our python object as a parquet file and upload it to s3 as well as downloading files from s3 to local and reading it as a pandas dataframe.
def save_as_parquet_to_s3(data, s3_bucket: str, s3_path: str, verbose: bool = False):
    """
    Saves the data as a parquet file and pushes it to s3.

    The parquet file is written to a temporary directory, uploaded with the
    module-level ``s3_client``'s ``upload_file`` and the temporary copy is
    removed afterwards, whether or not the upload succeeded.

    Parameters
    ----------
    data :
        Input handed to ``pa.table``, e.g. a dictionary mapping column names
        to equal-length columns.

    s3_bucket :
        Name of the destination bucket.

    s3_path :
        Destination object key. Its last path component is used as the name
        of the temporary local file.

    verbose :
        Whether to print the pyarrow table and a completion message.

    Raises
    ------
    ValueError
        If ``s3_path`` has no file name component (e.g. it ends with "/").
    """
    import tempfile

    file_name = os.path.split(s3_path)[-1]
    if not file_name:
        raise ValueError("s3_path must end with a file name, got: {!r}".format(s3_path))

    pa_table = pa.table(data)

    # stage the file in a throwaway directory instead of the current working directory:
    # this avoids overwriting (and then deleting) an unrelated local file that happens
    # to share the same name, and guarantees cleanup even when the upload raises
    with tempfile.TemporaryDirectory() as tmp_dir:
        local_file = os.path.join(tmp_dir, file_name)
        pq.write_table(pa_table, local_file)
        s3_client.upload_file(Filename=local_file, Bucket=s3_bucket, Key=s3_path)

    if verbose:
        print(pa_table)
        print("Finish writing {} to s3://{}/{}".format(file_name, s3_bucket, s3_path))

    return
# pyarrow supports saving 1d numpy arrays, so the 2d array is split
# into a list that holds one 1d array per row
embeddings = np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
prediction = {"ids": [1, 2], "embeddings": list(embeddings)}

# destination object key for the parquet file
s3_parquet_path = "ethenliu/data/test.parquet"
save_as_parquet_to_s3(prediction, s3_bucket, s3_parquet_path)
def download_files_from_s3(
    s3_bucket: str,
    s3_path: str,
    local_path: str,
    pattern: str = "*.parquet",
    n_jobs: int = 2
) -> List[str]:
    """
    Download files under a s3 bucket & path to a specified local path.

    Every object is saved directly under ``local_path`` using the last path
    component of its key as the file name.

    Parameters
    ----------
    s3_bucket :
        Name of the bucket to download from.

    s3_path :
        Key prefix, only objects whose key starts with it are listed.

    local_path :
        Local directory the files are written to, created if it doesn't exist.

    pattern :
        ``fnmatch`` style pattern matched against the full object key.

    n_jobs :
        Configures the number of threads used for downloading files in parallel.

    Returns
    -------
    download_paths :
        list of files that were downloaded their corresponding local path.
        Empty if no object matched.
    """
    os.makedirs(local_path, exist_ok=True)

    def download_file(object_key, local_path, s3_client, s3_bucket):
        download_path = os.path.join(local_path, os.path.split(object_key)[-1])
        s3_client.download_file(s3_bucket, object_key, download_path)
        return download_path

    # use clients instead of resource, as they are thread safe
    # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/clients.html#multithreading-or-multiprocessing-with-clients
    s3_client = boto3.client("s3")

    # leverage paginator to avoid list object's limitation of returning 1000 objects at a time
    # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/paginators.html
    paginator = s3_client.get_paginator("list_objects_v2")
    page_iterator = paginator.paginate(Bucket=s3_bucket, Prefix=s3_path)

    parallel = Parallel(n_jobs=n_jobs, backend="threading")
    page_download_paths = []
    for page in page_iterator:
        # the "Contents" entry is missing from the response when nothing matches the prefix,
        # hence .get; keys ending with "/" are directory placeholders that have no file name
        # to download to
        object_keys = [
            content["Key"]
            for content in page.get("Contents", [])
            if not content["Key"].endswith("/") and fnmatch.fnmatch(content["Key"], pattern)
        ]
        if not object_keys:
            continue

        download_paths = parallel(
            delayed(download_file)(object_key, local_path, s3_client, s3_bucket)
            for object_key in object_keys
        )
        page_download_paths.extend(download_paths)

    return page_download_paths
# key prefix to download from, and the local directory the files are saved to
s3_dir = "ethenliu/data"
local_dir = "ethenliu/data"
# only download (and time the download) when the local directory doesn't exist yet
if not os.path.exists(local_dir):
    print("download files from s3")
    start = perf_counter()
    files = download_files_from_s3(s3_bucket, s3_dir, local_dir)
    end = perf_counter()
    print("download files from s3 elapsed: ", end - start)

# read the local directory as a single pandas dataframe
df = pd.read_parquet(local_dir)
# bare expression: shows the dataframe when this runs as a notebook cell
df
We can also upload an entire local directory to s3, and remove the local copy once complete.
def upload_s3(local_path: str, s3_path: str):
    """
    Upload every file under a local directory to s3, then delete the local directory.

    The directory layout below ``local_path`` is preserved underneath the
    destination key prefix. The local directory is only removed after all
    uploads have completed, an upload error leaves it untouched.

    Parameters
    ----------
    local_path :
        Local directory to upload, it is walked recursively.

    s3_path :
        Destination in the form ``s3://<bucket>/<key prefix>``, the leading
        ``s3://`` is optional.
    """
    s3_client = boto3.client("s3")

    # only strip the scheme when it is a prefix, not when the text appears inside the key
    scheme = "s3://"
    if s3_path.startswith(scheme):
        s3_path = s3_path[len(scheme):]

    bucket, _, key = s3_path.partition("/")
    key = key.rstrip("/")
    for root, _dirs, files in os.walk(local_path):
        for file_name in files:
            local_file_path = os.path.join(root, file_name)
            relative_path = os.path.relpath(local_file_path, local_path)
            # s3 keys are always "/" delimited, whereas os.path uses the operating
            # system's separator (a backslash on windows)
            relative_key = relative_path.replace(os.sep, "/")
            s3_file_path = f"{key}/{relative_key}" if key else relative_key
            s3_client.upload_file(local_file_path, bucket, s3_file_path)

    shutil.rmtree(local_path)
# the destination is passed as a full s3:// uri, which upload_s3 splits into bucket and key prefix
s3_dir = f"s3://{s3_bucket}/ethenliu/data_upload"
# uploads everything under local_dir; upload_s3 removes local_dir once it is done
upload_s3(local_dir, s3_dir)
Instead of downloading our parquet files to disk first, we can also read it directly into memory by wrapping the bytes object in a pyarrow BufferReader
followed with read_table
.
def list_s3_objects(s3_bucket: str, s3_path: str, pattern: str = "*.parquet"):
    """
    Fetch every object under a s3 bucket & path whose key matches the pattern.

    Parameters
    ----------
    s3_bucket :
        Name of the bucket to list.

    s3_path :
        Key prefix, only objects whose key starts with it are listed.

    pattern :
        ``fnmatch`` style pattern matched against the full object key.

    Returns
    -------
    s3_objects :
        list of ``get_object`` responses, one per matching key. Empty if
        no object matched.
    """
    s3_client = boto3.client("s3")

    # leverage paginator to avoid list object's limitation of returning 1000 objects at a time
    # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/paginators.html
    paginator = s3_client.get_paginator("list_objects_v2")
    page_iterator = paginator.paginate(Bucket=s3_bucket, Prefix=s3_path)

    s3_objects = []
    for page in page_iterator:
        # the "Contents" entry is missing from the response when nothing matches the prefix
        for content in page.get("Contents", []):
            if fnmatch.fnmatch(content["Key"], pattern):
                obj = s3_client.get_object(Bucket=s3_bucket, Key=content["Key"])
                s3_objects.append(obj)

    return s3_objects
def read_s3_parquet_object(s3_object):
    """
    Read the "Body" of a s3 object holding parquet data into a pandas dataframe,
    entirely in memory.

    https://stackoverflow.com/questions/58061225/read-a-parquet-bytes-object-in-python
    """
    # pull the whole payload into memory and expose it to pyarrow as a readable buffer
    raw_bytes = s3_object["Body"].read()
    return pq.read_table(pa.BufferReader(raw_bytes)).to_pandas()
# key prefix holding the parquet files
s3_dir = "ethenliu/data/"
# fetch every matching object, then parse each one into its own dataframe
s3_objects = list_s3_objects(s3_bucket, s3_dir)
df_list = [read_s3_parquet_object(s3_object) for s3_object in s3_objects]
# stack the per-object dataframes into one, discarding their individual indices
df = pd.concat(df_list, ignore_index=True)
# bare expression: shows the dataframe when this runs as a notebook cell
df
The upload and download functionality is technically quite generic and allows for us to use it as part of any object. If uploading/reading parquet file is all we need, then we could instead leverage awswrangler.
# destination for the awswrangler round trip
path = f"s3://{s3_bucket}/ethenliu/data_awswrangler/test.parquet"
# write the dataframe straight to s3, without creating a local file ourselves
# NOTE(review): dataset=True is combined with a path that ends in a file name; confirm
# against the awswrangler documentation whether the path is then treated as a single
# file or as a dataset prefix
wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True
)
# read what we just wrote back into a dataframe
df = wr.s3.read_parquet(path)
print(df.shape)
# bare expression: shows the first rows when this runs as a notebook cell
df.head()
When conducting heavy write operations, we might encounter slow down: please reduce your request rate related errors. Although Amazon S3 has announced performance improvements, it might not be enough for our use case. When countered with these type of situations, the two most common suggestions would be to: 1. add retry. 2. add prefixes/partitions.
When encountering 503 slown down error or other errors that we suspect to be not from our client, we could leverage built-in retry with exponential backoff and jitter capability [3] [4]. Meaning after we receive a server or throttling related error, we use progressively longer wait time with some noise added to it between each retries. For example, we can explicitly configure our boto3 client with the retry max attempt as well as retry mode (algorithm that's used for conducting the retry).
from botocore.config import Config
# retry settings: maximum number of attempts and the retry mode to use
config = Config(
    retries={
        "max_attempts": 10,
        "mode": "standard"
    }
)
# rebinds the module-level client to one that uses the retry settings above
s3_client = boto3.client("s3", config=config)
If then new limits and adjusting retry still prove to be insufficient. Prefixes would need to be used, which is adding any string between a bucket name and an object name, for example:
Prefixes of the object file
would be: /1/
, /2/
. In this example, if we spread write across all 2 prefixes evenly, we can achieve double the throughput.
import random
# we can configure the partition base on use case
num_partition = 10
# pick one of the partitions at random, randint includes both end points (1 to num_partition)
random_part = str(random.randint(1, num_partition))
# insert the random partition between the base path and the file name to form the object key
# NOTE(review): s3_path and file_name are not assigned at module level in the code shown
# here, they need to be defined before running this snippet
key = os.path.join(s3_path, random_part, file_name)
s3_client.upload_file(Filename=file_name, Bucket=s3_bucket, Key=key)
The offical documentation also has further suggestions on optimizing S3 performance [2].