AWS Lambda Patterns: Cold Starts, Concurrency, and Best Practices

Introduction#

AWS Lambda runs code in response to events — S3 uploads, API Gateway requests, SQS messages, DynamoDB streams — without managing servers. Understanding cold starts, concurrency limits, memory/CPU tradeoffs, and proper error handling separates reliable Lambda-based systems from fragile ones.

Cold Starts#

import boto3
import time

# Code outside the handler runs ONCE during cold start (initialization)
# Reused on subsequent warm invocations

# Move expensive initialization here — NOT inside the handler
print("Cold start: initializing...")
db_client = boto3.client("dynamodb")  # connection pool initialized once
ssm = boto3.client("ssm")

# Fetch config once at cold start
response = ssm.get_parameter(Name="/app/db-url", WithDecryption=True)
DB_URL = response["Parameter"]["Value"]

def handler(event, context):
    # This runs on every invocation
    start = time.time()

    # Handler is fast because db_client is already initialized
    result = db_client.get_item(
        TableName="orders",
        Key={"id": {"S": event["order_id"]}},
    )

    print(f"Duration: {(time.time()-start)*1000:.1f}ms")
    return result.get("Item", {})

# Cold start adds ~100ms-1s depending on:
# - Runtime (Python/Node: ~100ms, Java: ~1s+)
# - Package size (keep dependencies minimal)
# - VPC attachment (historically +1-2s for ENI creation; largely
#   eliminated since the 2019 Hyperplane ENI improvements)
# - Memory (more memory = more CPU = faster init)
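The init-once behavior is easy to observe with a module-level flag (a hypothetical sketch, not an AWS API): module scope runs once per container, so the flag is True only on the first invocation.

```python
# Hypothetical sketch: detect cold vs. warm invocations with a module-level flag.
# Module scope executes once per container, so the flag flips after the first call.
_is_cold = True

def handler(event, context):
    global _is_cold
    cold = _is_cold
    _is_cold = False  # every later invocation in this container is warm
    return {"cold_start": cold}
```

Logging this flag is a cheap way to measure your real cold-start rate in CloudWatch.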

Provisioned Concurrency#

# serverless.yml or CloudFormation: eliminate cold starts for critical functions

# serverless.yml
functions:
  api:
    handler: handler.main
    memorySize: 512
    timeout: 30
    provisionedConcurrency: 5  # keep 5 warm instances always-on
    environment:
      STAGE: production

# Trade-off: provisioned concurrency costs money even when idle
# Use for: user-facing APIs where cold start latency is unacceptable
# Don't use for: background processing, batch jobs, infrequent events
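Outside the framework, the same setting can be applied with the AWS CLI; it attaches to a published version or alias, never to $LATEST. The function name and alias below are placeholders.

```shell
# Attach provisioned concurrency to an alias ("live" is a hypothetical alias name)
aws lambda put-provisioned-concurrency-config \
    --function-name api \
    --qualifier live \
    --provisioned-concurrent-executions 5

# Check provisioning status (instances take a minute to reach READY)
aws lambda get-provisioned-concurrency-config \
    --function-name api \
    --qualifier live
```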

SQS Trigger: Batch Processing#

from typing import Any
import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def handler(event: dict, context: Any) -> dict:
    """
    SQS trigger: process messages in batch.
    Return failed item identifiers so SQS can retry them.
    """
    batch_item_failures = []

    for record in event["Records"]:
        message_id = record["messageId"]

        try:
            # Parse inside the try: malformed JSON counts as a per-item failure
            body = json.loads(record["body"])
            process_message(body)
            logger.info("Processed message %s", message_id)
        except Exception as e:
            logger.error("Failed to process %s: %s", message_id, e)
            # Report this item as failed — SQS will retry it
            # Other items in the batch can still succeed
            batch_item_failures.append({"itemIdentifier": message_id})

    return {"batchItemFailures": batch_item_failures}

def process_message(body: dict) -> None:
    # Your business logic here
    order_id = body["order_id"]
    logger.info("Processing order %s", order_id)
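Returning batchItemFailures only takes effect if the event source mapping is configured to report partial failures; otherwise Lambda retries the entire batch on any error. One way to enable it via the CLI (the UUID is a placeholder for your mapping's ID):

```shell
# Enable partial batch responses on an existing SQS event source mapping
aws lambda update-event-source-mapping \
    --uuid "<mapping-uuid>" \
    --function-response-types ReportBatchItemFailures
```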

Lambda Layers for Shared Dependencies#

# Create a Lambda layer for common Python dependencies
mkdir -p layer/python
# Note: boto3 ships with the Lambda runtime; include it in the layer only to pin a version
pip install psycopg2-binary boto3 pydantic -t layer/python/

# Package the layer
cd layer && zip -r ../dependencies-layer.zip python/

# Upload via AWS CLI
aws lambda publish-layer-version \
    --layer-name python-dependencies \
    --zip-file fileb://dependencies-layer.zip \
    --compatible-runtimes python3.11 python3.12

# Reference in function (SAM template)
# Properties:
#   Layers:
#     - !Ref DependenciesLayer

Environment Variables and Secrets#

import os
import boto3
import json
from functools import lru_cache

# Never hardcode secrets in Lambda code or environment variables (visible in console)
# Use Secrets Manager or Parameter Store

@lru_cache(maxsize=None)
def get_secret(secret_name: str) -> dict:
    """Cached secret retrieval — fetched once per Lambda container lifetime."""
    client = boto3.client("secretsmanager")
    response = client.get_secret_value(SecretId=secret_name)
    return json.loads(response["SecretString"])

def get_db_connection():
    secrets = get_secret(os.environ["DB_SECRET_ARN"])
    # Use secrets["host"], secrets["password"], etc.
    # connect_to_db is your own connection helper (e.g., a psycopg2.connect wrapper)
    return connect_to_db(secrets)

# Lambda execution role must have:
# secretsmanager:GetSecretValue on the specific secret ARN
# This is preferable to environment variables for sensitive data
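The effect of lru_cache here can be illustrated without AWS: a hypothetical stand-in fetch function runs only once per container, no matter how many invocations ask for the secret.

```python
from functools import lru_cache

calls = {"count": 0}  # counts actual fetches, standing in for Secrets Manager calls

@lru_cache(maxsize=None)
def get_secret(secret_name: str) -> dict:
    # Hypothetical stand-in for client.get_secret_value
    calls["count"] += 1
    return {"host": "db.internal", "password": "hunter2"}

# Two "invocations" in the same container trigger only one underlying fetch
first = get_secret("/app/db")
second = get_secret("/app/db")
```

The cache lives as long as the container does, so rotated secrets are picked up on the next cold start; add a TTL around the cache if you need faster rotation.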

Idempotency#

import json
import time

import boto3
from boto3.dynamodb.conditions import Attr

dynamodb = boto3.resource("dynamodb")
idempotency_table = dynamodb.Table("lambda-idempotency")

def ensure_idempotent(execution_id: str, ttl_seconds: int = 86400) -> bool:
    """
    Return True if this is a new execution (proceed).
    Return False if this execution was already processed.
    """
    expiry = int(time.time()) + ttl_seconds

    try:
        idempotency_table.put_item(
            Item={
                "execution_id": execution_id,
                "processed_at": int(time.time()),
                "ttl": expiry,
            },
            ConditionExpression=Attr("execution_id").not_exists(),
        )
        return True  # new execution
    except dynamodb.meta.client.exceptions.ConditionalCheckFailedException:
        return False  # already processed

def handler(event, context):
    # process_message: your business logic (as in the SQS section above)
    # SQS message ID makes a good idempotency key
    for record in event["Records"]:
        execution_id = f"sqs-{record['messageId']}"

        if not ensure_idempotent(execution_id):
            print(f"Skipping duplicate message {execution_id}")
            continue

        process_message(json.loads(record["body"]))
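The conditional-write semantics can be sketched locally, with an in-memory dict standing in for the DynamoDB table, to show why a duplicate is rejected until its TTL expires:

```python
import time

# In-memory stand-in for the DynamoDB idempotency table (illustration only)
_seen: dict[str, int] = {}

def ensure_idempotent_local(execution_id: str, ttl_seconds: int = 86400) -> bool:
    """Mirror of the conditional put: succeed only if the key is unseen or expired."""
    now = int(time.time())
    expiry = _seen.get(execution_id)
    if expiry is not None and expiry > now:
        return False  # already processed and not yet expired
    _seen[execution_id] = now + ttl_seconds
    return True
```

In the real table, DynamoDB's TTL feature deletes expired items for you; enable it on the `ttl` attribute so the table does not grow without bound.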

Lambda Best Practices Summary#

Architecture:
- Keep functions single-purpose and small
- Use SQS with batch size > 1 for throughput; report batchItemFailures
- Avoid VPC attachment unless you need private resources; when attached, reach AWS services via VPC endpoints (PrivateLink)
- Use EventBridge for scheduled triggers, not cron hacks

Performance:
- Initialize connections and load config outside the handler
- Use Provisioned Concurrency for latency-sensitive functions
- Right-size memory: more memory = more CPU = faster execution
  (Lambda charges by GB*ms; 2x memory often costs same if 2x faster)
- Keep deployment package small (layers for dependencies)

Reliability:
- Make handlers idempotent (Lambda can invoke multiple times on failure)
- Set SQS visibility timeout to at least 6x the function timeout (AWS guidance)
- Use DLQ for failed events
- Report partial batch failures (batchItemFailures)

Observability:
- Structure logs as JSON (CloudWatch Logs Insights queries them)
- Use Lambda Powertools for tracing, logging, metrics
- Set up CloudWatch alarms on errors and duration P99
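The JSON-logging bullet can be satisfied with the stdlib alone; a minimal sketch (Lambda Powertools' Logger does this and more, with request IDs and cold-start flags):

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line for Logs Insights."""
    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "message": record.getMessage(),
            "logger": record.name,
        })

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("orders")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info("order processed")
```

One object per line matters: CloudWatch Logs Insights auto-discovers fields only when each log event parses as a single JSON document.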

Conclusion#

Lambda’s strength is event-driven, stateless computation without infrastructure management. Cold starts are manageable with proper initialization patterns and provisioned concurrency for latency-sensitive paths. SQS integration with partial batch failure reporting enables robust message processing. Always design handlers to be idempotent — Lambda’s at-least-once delivery guarantee means duplicate invocations are expected. Keep packages small, initialize expensive resources outside the handler, and use Secrets Manager rather than environment variables for sensitive configuration.
