AWS S3 Patterns: Presigned URLs, Multipart Upload, and Event Processing

S3 is one of the most versatile AWS services — object storage, static website hosting, data lake foundation, event trigger source, and artifact store. Beyond basic put/get, S3 has patterns that solve common production problems.

Introduction#

S3 is one of the most versatile AWS services — object storage, static website hosting, data lake foundation, event trigger source, and artifact store. Beyond basic put/get, S3 has patterns that solve common production problems: presigned URLs for direct browser uploads, multipart upload for large files, lifecycle policies for cost management, and event notifications for asynchronous processing pipelines.

Presigned URLs: Direct Browser Uploads#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import boto3
from botocore.exceptions import ClientError
import uuid

# Module-level client: created once per process and reused across requests.
s3 = boto3.client("s3", region_name="us-east-1")

def generate_upload_url(
    bucket: str,
    user_id: str,
    filename: str,
    content_type: str,
    max_size_bytes: int = 10 * 1024 * 1024,  # 10MB
    expiry_seconds: int = 3600,
) -> dict:
    """Generate a presigned POST for direct client-to-S3 upload.

    Args:
        bucket: Target S3 bucket.
        user_id: Used as a key prefix so users cannot collide with or
            overwrite each other's objects.
        filename: Original client filename; only its extension is kept.
        content_type: MIME type the client must send (enforced by S3).
        max_size_bytes: Upper size bound enforced by S3 at upload time.
        expiry_seconds: How long the presigned POST remains valid.

    Returns:
        Dict with the POST ``upload_url``, the form ``fields`` the client
        must include, the generated object ``key``, and ``expires_in``.
    """
    # Never trust the client filename: keep only the extension and build a
    # UUID-based key under the user's prefix.
    ext = filename.rsplit(".", 1)[-1] if "." in filename else ""
    # BUG FIX: previously a filename without an extension produced a key
    # ending in a bare "." — only append the suffix when one exists.
    suffix = f".{ext}" if ext else ""
    key = f"uploads/{user_id}/{uuid.uuid4()}{suffix}"

    # Presigned POST (vs presigned PUT) lets S3 itself enforce the
    # Content-Type and content-length conditions server-side.
    response = s3.generate_presigned_post(
        Bucket=bucket,
        Key=key,
        Fields={
            "Content-Type": content_type,
        },
        Conditions=[
            {"Content-Type": content_type},
            ["content-length-range", 1, max_size_bytes],  # enforce size limit
        ],
        ExpiresIn=expiry_seconds,
    )

    return {
        "upload_url": response["url"],
        "fields": response["fields"],
        "key": key,
        "expires_in": expiry_seconds,
    }

# Usage from a FastAPI endpoint:
# BUG FIX: HTTPException was used below without being imported.
from fastapi import FastAPI, Depends, HTTPException

app = FastAPI()

@app.post("/upload-url")
async def get_upload_url(
    filename: str,
    content_type: str,
    current_user: User = Depends(get_current_user),
):
    """Issue a short-lived presigned POST for the authenticated user."""
    # Server-side allowlist: never let the client pick an arbitrary type.
    if content_type not in {"image/jpeg", "image/png", "image/webp", "application/pdf"}:
        raise HTTPException(400, "Content type not allowed")

    return generate_upload_url(
        bucket="my-app-uploads",
        user_id=str(current_user.id),
        filename=filename,
        content_type=content_type,
    )

Client-Side Upload with Presigned POST#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Browser-side: send the file straight to S3, bypassing the app server.
async function uploadFile(file, presignedData) {
  const formData = new FormData();

  // S3 requires the presigned fields to precede the file entry.
  Object.entries(presignedData.fields).forEach(([name, value]) => {
    formData.append(name, value);
  });

  // The "file" entry must be the last field in the form.
  formData.append("file", file);

  const response = await fetch(presignedData.upload_url, {
    method: "POST",
    body: formData,
  });

  if (!response.ok) {
    throw new Error(`Upload failed: ${response.status}`);
  }

  return presignedData.key;
}

Multipart Upload for Large Files#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import os

import boto3
from boto3.s3.transfer import TransferConfig

# Shared module-level client and resource — create once and reuse.
s3 = boto3.client("s3")
s3_resource = boto3.resource("s3")

def upload_large_file(
    file_path: str,
    bucket: str,
    key: str,
    part_size_mb: int = 8,
) -> None:
    """Upload a large file using managed multipart transfer with progress.

    Args:
        file_path: Local path of the file to upload.
        bucket: Destination bucket.
        key: Destination object key.
        part_size_mb: Size of each multipart chunk, in megabytes.
    """
    config = TransferConfig(
        multipart_threshold=8 * 1024 * 1024,  # switch to multipart above 8MB
        max_concurrency=4,                    # parallel part uploads
        multipart_chunksize=part_size_mb * 1024 * 1024,
        use_threads=True,
    )

    total_size = os.path.getsize(file_path)
    uploaded = 0

    def progress(bytes_transferred: int) -> None:
        # Invoked by the transfer manager with the delta, not the total.
        nonlocal uploaded
        uploaded += bytes_transferred
        pct = (uploaded / total_size) * 100
        print(f"\rUploading: {pct:.1f}%", end="", flush=True)

    # BUG FIX: upload_file is injected on the S3 *client* (and on Bucket/
    # Object resources), not on the service resource — the original
    # s3_resource.upload_file call raises AttributeError.
    s3.upload_file(
        file_path,
        bucket,
        key,
        Config=config,
        Callback=progress,
        ExtraArgs={"StorageClass": "STANDARD"},
    )
    print()  # newline after the \r progress line

# Manual multipart upload (for streaming data without a known size)
def stream_multipart_upload(bucket: str, key: str, data_generator) -> None:
    """Upload a stream of unknown total size via the multipart API.

    Chunks from ``data_generator`` are coalesced into parts of at least
    5MB, because S3 rejects any part smaller than that *except the last*.
    On any failure the upload is aborted so orphaned parts don't accrue
    storage charges.

    Args:
        bucket: Destination bucket.
        key: Destination object key.
        data_generator: Iterable yielding ``bytes`` chunks of any size.
    """
    MIN_PART_SIZE = 5 * 1024 * 1024  # S3 minimum for all parts but the last

    mpu = s3.create_multipart_upload(Bucket=bucket, Key=key)
    upload_id = mpu["UploadId"]
    parts = []

    def _upload_part(body: bytes, part_number: int) -> None:
        # Upload one part and record its ETag for the completion call.
        response = s3.upload_part(
            Body=body,
            Bucket=bucket,
            Key=key,
            PartNumber=part_number,
            UploadId=upload_id,
        )
        parts.append({"PartNumber": part_number, "ETag": response["ETag"]})
        print(f"Uploaded part {part_number}")

    try:
        # BUG FIX: the original raised on any sub-5MB chunk after the
        # first, which wrongly rejected a valid (small) FINAL chunk — the
        # 5MB minimum applies to every part *except the last*. Buffer
        # chunks instead, flushing whenever >= 5MB has accumulated.
        buffer = bytearray()
        part_number = 1
        for chunk in data_generator:
            buffer.extend(chunk)
            if len(buffer) >= MIN_PART_SIZE:
                _upload_part(bytes(buffer), part_number)
                part_number += 1
                buffer.clear()

        if buffer or not parts:
            # Flush the remainder as the last part; also covers an empty
            # stream, which still needs one part to complete the upload.
            _upload_part(bytes(buffer), part_number)

        s3.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={"Parts": parts},
        )
        print(f"Completed multipart upload to s3://{bucket}/{key}")

    except Exception:
        # Abort so S3 garbage-collects the already-uploaded parts.
        s3.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
        raise

S3 Event Notifications → Lambda#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# Lambda function triggered by S3 ObjectCreated events
import json
import boto3
from urllib.parse import unquote_plus

s3 = boto3.client("s3")

def handler(event, context):
    """Entry point for S3 event notifications delivered to Lambda."""
    for record in event["Records"]:
        s3_info = record["s3"]
        bucket = s3_info["bucket"]["name"]
        # Keys arrive URL-encoded (spaces become '+'); decode before use.
        key = unquote_plus(s3_info["object"]["key"])
        size = s3_info["object"]["size"]
        event_type = record["eventName"]

        print(f"Event: {event_type} for s3://{bucket}/{key} ({size} bytes)")

        is_upload_key = key.startswith("uploads/")
        is_create_event = event_type.startswith("ObjectCreated")
        if is_upload_key and is_create_event:
            process_upload(bucket, key)

def process_upload(bucket: str, key: str) -> None:
    """Download an uploaded object, process it, and store the result."""
    # Lambda only guarantees writable space under /tmp (512MB by default).
    filename = key.rsplit("/", 1)[-1]
    local_path = f"/tmp/{filename}"
    s3.download_file(bucket, key, local_path)

    # Process: generate thumbnail, extract metadata, virus scan, etc.
    metadata = extract_metadata(local_path)

    # Mirror the object under processed/ with its metadata attached.
    result_key = key.replace("uploads/", "processed/")
    extra_args = {
        "Metadata": {name: str(value) for name, value in metadata.items()}
    }
    s3.upload_file(local_path, bucket, result_key, ExtraArgs=extra_args)

    # Update database
    update_file_record(key, result_key, metadata)

Lifecycle Policies for Cost Management#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# Terraform: S3 lifecycle policy
resource "aws_s3_bucket_lifecycle_configuration" "uploads" {
  bucket = aws_s3_bucket.uploads.id

  rule {
    id     = "transition-to-intelligent-tiering"
    status = "Enabled"

    # FIX: AWS provider v4+ requires each rule to declare a filter (or
    # prefix); an empty filter applies the rule to every object.
    filter {}

    transition {
      days          = 30
      storage_class = "INTELLIGENT_TIERING"
    }

    transition {
      days          = 90
      storage_class = "GLACIER_IR"  # Instant retrieval
    }

    transition {
      days          = 365
      storage_class = "DEEP_ARCHIVE"  # Cheapest, 12hr retrieval
    }

    expiration {
      days = 2555  # 7 years, for compliance
    }
  }

  rule {
    id     = "delete-incomplete-multipart"
    status = "Enabled"

    # FIX: filter is required here too (rule applies bucket-wide).
    filter {}

    abort_incomplete_multipart_upload {
      days_after_initiation = 7  # clean up abandoned uploads
    }
  }

  rule {
    id     = "delete-temp-files"
    status = "Enabled"

    filter {
      prefix = "tmp/"
    }

    expiration {
      days = 1
    }
  }
}

S3 Select: Query Object Contents#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import json

def query_csv_in_s3(bucket: str, key: str, sql: str) -> list[dict]:
    """Run a SQL query against a CSV object in S3 without downloading it.

    Uses S3 Select, which filters server-side and streams back only the
    matching rows as newline-delimited JSON.
    """
    response = s3.select_object_content(
        Bucket=bucket,
        Key=key,
        ExpressionType="SQL",
        Expression=sql,
        InputSerialization={
            "CSV": {"FileHeaderInfo": "USE"},
            "CompressionType": "NONE",
        },
        OutputSerialization={"JSON": {"RecordDelimiter": "\n"}},
    )

    # The payload is an event stream; only "Records" events carry data.
    rows: list[dict] = []
    for event in response["Payload"]:
        payload = event.get("Records")
        if payload is None:
            continue
        text = payload["Payload"].decode("utf-8")
        rows.extend(
            json.loads(line) for line in text.strip().split("\n") if line
        )
    return rows

# Instead of downloading a 10GB CSV, query just the rows you need:
# NOTE(review): AWS stopped onboarding new S3 Select customers in 2024 —
# verify the feature is available on your account before relying on it.
results = query_csv_in_s3(
    bucket="data-lake",
    key="events/2025/01/events.csv",
    sql="SELECT * FROM S3Object WHERE status = 'error' LIMIT 100",
)

Conclusion#

S3’s presigned POST pattern removes your API server from the upload path — clients upload directly to S3 at the bandwidth they negotiate with AWS, rather than routing gigabytes through your fleet. Multipart upload is mandatory for objects above 5GB (the single-PUT limit) and recommended above 100MB, improving throughput and reliability through parallel part transfers. S3 event notifications to Lambda enable async processing pipelines that scale automatically with upload volume. Lifecycle policies are free cost optimizations that should be configured on every bucket containing data with predictable access patterns.

Contents