Introduction#
S3 is one of the most versatile AWS services — object storage, static website hosting, data lake foundation, event trigger source, and artifact store. Beyond basic put/get, S3 has patterns that solve common production problems: presigned URLs for direct browser uploads, multipart upload for large files, lifecycle policies for cost management, and event notifications for asynchronous processing pipelines.
Presigned URLs: Direct Browser Uploads#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import boto3
from botocore.exceptions import ClientError
import uuid
s3 = boto3.client("s3", region_name="us-east-1")
def generate_upload_url(
    bucket: str,
    user_id: str,
    filename: str,
    content_type: str,
    max_size_bytes: int = 10 * 1024 * 1024,  # 10MB
    expiry_seconds: int = 3600,
) -> dict:
    """Generate a presigned POST for direct client-to-S3 upload.

    Args:
        bucket: Target S3 bucket name.
        user_id: Owner's id; used as a key prefix for per-user isolation.
        filename: Client-supplied name; only its (sanitized) extension is kept.
        content_type: MIME type the client must send; enforced by the policy.
        max_size_bytes: Maximum allowed object size (policy condition).
        expiry_seconds: Lifetime of the presigned POST.

    Returns:
        Dict with the POST url, the form fields the client must echo back,
        the generated object key, and the expiry in seconds.
    """
    # Keep only an alphanumeric extension: the raw filename is untrusted and
    # could contain "/" (e.g. "a.b/c"), which would inject path segments into
    # the object key and escape the user's prefix.
    ext = filename.rsplit(".", 1)[-1] if "." in filename else ""
    ext = "".join(ch for ch in ext if ch.isalnum()).lower()
    suffix = f".{ext}" if ext else ""  # no trailing "." when there is no extension
    key = f"uploads/{user_id}/{uuid.uuid4()}{suffix}"
    # Presigned POST (rather than presigned PUT) so the signed policy can
    # enforce both the content type and a size range server-side.
    response = s3.generate_presigned_post(
        Bucket=bucket,
        Key=key,
        Fields={
            "Content-Type": content_type,
        },
        Conditions=[
            {"Content-Type": content_type},               # exact-match condition
            ["content-length-range", 1, max_size_bytes],  # enforce size limit
        ],
        ExpiresIn=expiry_seconds,
    )
    return {
        "upload_url": response["url"],
        "fields": response["fields"],
        "key": key,
        "expires_in": expiry_seconds,
    }
# Usage from a FastAPI endpoint:
from fastapi import FastAPI, Depends, HTTPException  # HTTPException was missing

app = FastAPI()

# Allow-list of MIME types; anything else is rejected before signing.
ALLOWED_CONTENT_TYPES = {"image/jpeg", "image/png", "image/webp", "application/pdf"}


@app.post("/upload-url")
async def get_upload_url(
    filename: str,
    content_type: str,
    current_user: User = Depends(get_current_user),
):
    """Issue a presigned POST that the authenticated user can upload with."""
    if content_type not in ALLOWED_CONTENT_TYPES:
        raise HTTPException(400, "Content type not allowed")
    return generate_upload_url(
        bucket="my-app-uploads",
        user_id=str(current_user.id),
        filename=filename,
        content_type=content_type,
    )
Client-Side Upload with Presigned POST#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Browser-side: upload directly to S3 (no proxy through your server)
async function uploadFile(file, presignedData) {
  const body = new FormData();
  // Echo every presigned-POST field into the form ahead of the file.
  Object.entries(presignedData.fields).forEach(([name, value]) => {
    body.append(name, value);
  });
  // The file entry goes last in the form.
  body.append("file", file);

  const response = await fetch(presignedData.upload_url, {
    method: "POST",
    body,
  });
  if (!response.ok) {
    throw new Error(`Upload failed: ${response.status}`);
  }
  return presignedData.key;
}
Multipart Upload for Large Files#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import boto3
from boto3.s3.transfer import TransferConfig
s3 = boto3.client("s3")
s3_resource = boto3.resource("s3")
def upload_large_file(
    file_path: str,
    bucket: str,
    key: str,
    part_size_mb: int = 8,
) -> None:
    """Upload a large file via boto3's managed multipart transfer, with progress.

    Args:
        file_path: Local path of the file to upload.
        bucket: Destination bucket.
        key: Destination object key.
        part_size_mb: Size of each multipart chunk in MB.
    """
    import os  # local import: original snippet used os without importing it

    config = TransferConfig(
        multipart_threshold=8 * 1024 * 1024,  # switch to multipart above 8MB
        max_concurrency=4,                    # parallel part uploads
        multipart_chunksize=part_size_mb * 1024 * 1024,
        use_threads=True,
    )
    total_size = os.path.getsize(file_path)
    uploaded = 0

    def progress(bytes_transferred: int) -> None:
        # Invoked from transfer worker threads; aggregate and redraw one line.
        nonlocal uploaded
        uploaded += bytes_transferred
        pct = (uploaded / total_size) * 100
        print(f"\rUploading: {pct:.1f}%", end="", flush=True)

    # NOTE: boto3 injects upload_file on the *client* (and on Bucket/Object
    # sub-resources), not on the bare ServiceResource — the original call on
    # s3_resource would raise AttributeError, so use the client here.
    s3.upload_file(
        file_path,
        bucket,
        key,
        Config=config,
        Callback=progress,
        ExtraArgs={"StorageClass": "STANDARD"},
    )
    print()  # newline after the in-place progress line
# Manual multipart upload (for streaming data without a known size)
def stream_multipart_upload(bucket: str, key: str, data_generator):
    """Upload a stream of data of unknown total size using multipart upload.

    S3 requires every part EXCEPT the last to be at least 5MB. The original
    check (`len(chunk) < 5MB and part_number > 1`) wrongly rejected a small
    *final* part and wrongly exempted a small non-final first part. We hold
    one chunk back so a part is only validated/uploaded once we know another
    chunk follows it — the held-back final part may be any size.

    Args:
        bucket: Destination bucket.
        key: Destination object key.
        data_generator: Iterable yielding bytes chunks.
    """
    MIN_PART = 5 * 1024 * 1024  # S3 minimum for all parts except the last

    mpu = s3.create_multipart_upload(Bucket=bucket, Key=key)
    upload_id = mpu["UploadId"]
    parts = []

    def _upload_part(part_number, chunk):
        # Upload one part and record its ETag for the completion call.
        response = s3.upload_part(
            Body=chunk,
            Bucket=bucket,
            Key=key,
            PartNumber=part_number,
            UploadId=upload_id,
        )
        parts.append({"PartNumber": part_number, "ETag": response["ETag"]})
        print(f"Uploaded part {part_number}")

    try:
        pending = None   # one-chunk lookahead buffer
        part_number = 0
        for chunk in data_generator:
            if pending is not None:
                # `pending` is not the last part, so the 5MB floor applies.
                part_number += 1
                if len(pending) < MIN_PART:
                    raise ValueError(f"Part {part_number} too small")
                _upload_part(part_number, pending)
            pending = chunk
        if pending is not None:
            # Final part: S3 allows any size here.
            _upload_part(part_number + 1, pending)
        s3.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={"Parts": parts},
        )
        print(f"Completed multipart upload to s3://{bucket}/{key}")
    except Exception:
        # Abort so the incomplete upload stops accruing storage charges.
        s3.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
        raise
S3 Event Notifications → Lambda#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# Lambda function triggered by S3 ObjectCreated events
import json
import boto3
from urllib.parse import unquote_plus
s3 = boto3.client("s3")
def handler(event, context):
    """Lambda entry point for S3 event notifications.

    Logs every record in the batch and hands newly created objects under the
    uploads/ prefix to process_upload().
    """
    for rec in event["Records"]:
        s3_info = rec["s3"]
        bucket = s3_info["bucket"]["name"]
        # S3 URL-encodes keys in the event payload (spaces become '+'),
        # so decode before using the key against the API.
        key = unquote_plus(s3_info["object"]["key"])
        size = s3_info["object"]["size"]
        event_type = rec["eventName"]
        print(f"Event: {event_type} for s3://{bucket}/{key} ({size} bytes)")
        is_upload = key.startswith("uploads/")
        is_create = event_type.startswith("ObjectCreated")
        if is_upload and is_create:
            process_upload(bucket, key)
def process_upload(bucket: str, key: str) -> None:
    """Download an uploaded object, process it, and store the result.

    Relies on extract_metadata() and update_file_record(), defined elsewhere.

    Args:
        bucket: Source bucket from the event.
        key: Decoded object key (expected under the "uploads/" prefix).
    """
    # Download to /tmp (Lambda has 512MB /tmp by default)
    local_path = f"/tmp/{key.split('/')[-1]}"
    s3.download_file(bucket, key, local_path)
    # Process: generate thumbnail, extract metadata, virus scan, etc.
    metadata = extract_metadata(local_path)
    # Save processed result. str.replace would rewrite EVERY "uploads/"
    # occurrence in the key (e.g. "uploads/u1/uploads/x"); only the leading
    # prefix should change.
    if key.startswith("uploads/"):
        result_key = "processed/" + key[len("uploads/"):]
    else:
        result_key = key
    s3.upload_file(local_path, bucket, result_key, ExtraArgs={
        "Metadata": {k: str(v) for k, v in metadata.items()}
    })
    # Update database
    update_file_record(key, result_key, metadata)
Lifecycle Policies for Cost Management#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# Terraform: S3 lifecycle policy
resource "aws_s3_bucket_lifecycle_configuration" "uploads" {
  bucket = aws_s3_bucket.uploads.id

  # Age-based tiering for all objects, then expiry at the compliance horizon.
  # NOTE(review): recent AWS provider versions expect a filter (even an empty
  # one) on each rule — confirm against the pinned provider version.
  rule {
    id     = "transition-to-intelligent-tiering"
    status = "Enabled"

    # After 30 days, let S3 pick the access tier automatically.
    transition {
      days          = 30
      storage_class = "INTELLIGENT_TIERING"
    }

    transition {
      days          = 90
      storage_class = "GLACIER_IR" # Instant retrieval
    }

    transition {
      days          = 365
      storage_class = "DEEP_ARCHIVE" # Cheapest, 12hr retrieval
    }

    expiration {
      days = 2555 # 7 years, for compliance
    }
  }

  # Abandoned multipart uploads keep billing for their parts until aborted;
  # sweep them after a week.
  rule {
    id     = "delete-incomplete-multipart"
    status = "Enabled"

    abort_incomplete_multipart_upload {
      days_after_initiation = 7 # clean up abandoned uploads
    }
  }

  # Scratch objects under tmp/ live for one day only.
  rule {
    id     = "delete-temp-files"
    status = "Enabled"

    filter {
      prefix = "tmp/"
    }

    expiration {
      days = 1
    }
  }
}
S3 Select: Query Object Contents#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import json
def query_csv_in_s3(bucket: str, key: str, sql: str) -> list[dict]:
    """Run a SQL query directly on a CSV object via S3 Select (no download).

    Args:
        bucket: Bucket holding the CSV object.
        key: Object key of the CSV file.
        sql: S3 Select SQL expression (FROM S3Object).

    Returns:
        One dict per result row, parsed from the newline-delimited JSON output.
    """
    response = s3.select_object_content(
        Bucket=bucket,
        Key=key,
        ExpressionType="SQL",
        Expression=sql,
        InputSerialization={
            "CSV": {"FileHeaderInfo": "USE"},  # first row supplies column names
            "CompressionType": "NONE",
        },
        OutputSerialization={"JSON": {"RecordDelimiter": "\n"}},
    )
    records: list[dict] = []
    # The event stream may split one JSON record across consecutive "Records"
    # events, so buffer partial lines instead of parsing each chunk alone
    # (the original per-chunk parse raised JSONDecodeError on large results).
    buffer = ""
    for event in response["Payload"]:
        if "Records" in event:
            buffer += event["Records"]["Payload"].decode("utf-8")
            # Everything before the final newline is complete; keep the tail.
            *complete, buffer = buffer.split("\n")
            for line in complete:
                if line:
                    records.append(json.loads(line))
    if buffer.strip():
        records.append(json.loads(buffer))
    return records
# Instead of downloading a 10GB CSV, query just the rows you need:
# (S3 Select bills for bytes scanned + returned, not for the whole object.)
results = query_csv_in_s3(
    bucket="data-lake",
    key="events/2025/01/events.csv",
    sql="SELECT * FROM S3Object WHERE status = 'error' LIMIT 100",
)
Conclusion#
S3’s presigned POST pattern removes your API server from the upload path — clients upload directly to S3 at the bandwidth they negotiate with AWS, rather than routing gigabytes through your fleet. Multipart upload is required for objects above 5GB (the single-PUT limit), recommended by AWS for anything above about 100MB, and improves reliability for smaller files too through parallel, retryable part transfers. S3 event notifications to Lambda enable async processing pipelines that scale automatically with upload volume. Lifecycle policies are free cost optimizations that should be configured on every bucket containing data with predictable access patterns.