Zero Trust Architecture Principles for Backend Engineers

Zero trust operates on one principle: "never trust, always verify." Traditional perimeter security assumed everything inside the network was safe. Zero trust assumes the network is hostile regardless

Introduction#

Zero trust operates on one principle: “never trust, always verify.” Traditional perimeter security assumed everything inside the network was safe. Zero trust assumes the network is hostile regardless of location — inside or outside the corporate perimeter. This shift matters as workloads move to cloud, employees work remotely, and lateral movement after breaches becomes the primary attack vector.

Core Principles#

  1. Verify explicitly: authenticate and authorize every request based on all available data points (identity, location, device, service context).
  2. Use least privilege access: grant minimum required permissions, time-limited where possible.
  3. Assume breach: design for containment. Minimize blast radius. Log everything.

Identity-Based Access (Not Network-Based)#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# OLD: "is this request coming from the internal network?"
def is_internal_request(request: Request) -> bool:
    client_ip = request.client.host
    return ipaddress.ip_address(client_ip).is_private

# ZERO TRUST: verify identity regardless of network location
async def verify_service_identity(request: Request) -> ServiceIdentity:
    # Verify mTLS client certificate
    cert = request.state.client_cert  # extracted by load balancer
    if not cert:
        raise AuthError("No client certificate presented")

    # Validate cert chain against internal CA
    verify_certificate_chain(cert, internal_ca_bundle)

    # Extract service identity from cert's SPIFFE ID
    spiffe_id = extract_spiffe_id(cert)
    # e.g., "spiffe://cluster.local/ns/production/sa/payment-service"

    return ServiceIdentity(
        service=spiffe_id.service_account,
        namespace=spiffe_id.namespace,
        trust_domain=spiffe_id.trust_domain,
    )

SPIFFE/SPIRE: Workload Identity#

SPIFFE (Secure Production Identity Framework For Everyone) provides cryptographic identities to workloads.

1
2
3
4
5
6
7
8
9
# SPIRE server: issues SVIDs (SPIFFE Verifiable Identity Documents) to workloads
# SPIRE agent: runs on each node, attests workloads and delivers SVIDs

# Kubernetes SPIRE agent annotation
apiVersion: v1
kind: Pod
metadata:
  annotations:
    spiffe.io/spiffeid: "spiffe://example.com/ns/production/sa/payment-service"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Workload API: retrieve SVID (X.509 certificate)
import pyspiffe

workload_api = pyspiffe.WorkloadApiClient("unix:///run/spire/sockets/agent.sock")
svid = workload_api.fetch_x509_svid()

# Use SVID for mTLS
import ssl
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.load_verify_locations(cadata=svid.bundle.x509_authorities_pem)
context.load_cert_chain(
    certfile=svid.leaf.cert_pem,
    keyfile=svid.leaf.private_key_pem
)

Service-to-Service Authorization#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# Policy engine: what service A can do to service B
from dataclasses import dataclass

@dataclass
class ServicePolicy:
    source_service: str
    target_service: str
    allowed_methods: set[str]
    allowed_paths: set[str]

POLICIES = [
    ServicePolicy(
        source_service="api-gateway",
        target_service="payment-service",
        allowed_methods={"POST"},
        allowed_paths={"/v1/charges", "/v1/refunds"},
    ),
    ServicePolicy(
        source_service="payment-service",
        target_service="audit-service",
        allowed_methods={"POST"},
        allowed_paths={"/v1/events"},
    ),
]

def is_authorized(
    source: str,
    target: str,
    method: str,
    path: str
) -> bool:
    for policy in POLICIES:
        if (policy.source_service == source
                and policy.target_service == target
                and method in policy.allowed_methods
                and path in policy.allowed_paths):
            return True
    return False

Open Policy Agent (OPA) for Policy Decisions#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Rego policy: service-to-service authorization
package service_auth

default allow = false

allow {
    # Source service is authenticated
    input.source_service != ""

    # Policy grants access
    policy := data.policies[_]
    policy.source == input.source_service
    policy.target == input.target_service
    policy.method == input.method
    startswith(input.path, policy.path_prefix)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# FastAPI: OPA authorization middleware
import httpx
from fastapi import Request, HTTPException

OPA_URL = "http://opa:8181/v1/data/service_auth/allow"

async def authorize_request(request: Request, source_service: str) -> None:
    input_data = {
        "input": {
            "source_service": source_service,
            "target_service": "payment-service",
            "method": request.method,
            "path": request.url.path,
        }
    }
    async with httpx.AsyncClient() as client:
        resp = await client.post(OPA_URL, json=input_data, timeout=0.1)
        if not resp.json().get("result", False):
            raise HTTPException(status_code=403, detail="Access denied by policy")

Network Micro-Segmentation#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# Kubernetes Network Policy: zero-trust networking
# Default: deny all
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: default-deny
  namespace: production
spec:
  podSelector: {}
  policyTypes: [Ingress, Egress]
---
# Explicit allow: only payment-service can reach database
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: allow-payment-to-db
spec:
  podSelector:
    matchLabels:
      app: postgres
  ingress:
  - from:
    - podSelector:
        matchLabels:
          app: payment-service
    ports:
    - port: 5432

Comprehensive Audit Logging#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import structlog
from datetime import datetime, timezone

log = structlog.get_logger()

async def audit_log(
    request: Request,
    source_identity: str,
    action: str,
    resource: str,
    outcome: str,
    reason: str = "",
) -> None:
    log.info(
        "access_decision",
        timestamp=datetime.now(timezone.utc).isoformat(),
        source=source_identity,
        target_service="payment-service",
        action=action,
        resource=resource,
        outcome=outcome,
        reason=reason,
        request_id=request.headers.get("X-Request-ID"),
        source_ip=request.client.host,
    )

Every access decision — allowed or denied — should be logged with enough context to reconstruct what happened during an incident investigation.

Conclusion#

Zero trust requires identity for every call (mTLS or JWT), explicit authorization policies (OPA, Kubernetes NetworkPolicy), micro-segmented network access, and comprehensive audit logs. The shift from network-perimeter thinking to identity-based thinking changes how you design both internal APIs and network policies. Start with micro-segmentation (Network Policies) and mTLS (a service mesh), then layer in fine-grained authorization policies.

Contents