Introduction#
Zero trust operates on one principle: “never trust, always verify.” Traditional perimeter security assumed everything inside the network was safe. Zero trust assumes the network is hostile regardless of location, whether inside or outside the corporate perimeter. This shift matters as workloads move to the cloud, employees work remotely, and attackers who get past the perimeter rely on lateral movement to reach high-value systems.
Core Principles#
- Verify explicitly: authenticate and authorize every request based on all available data points (identity, location, device, service context).
- Use least privilege access: grant minimum required permissions, time-limited where possible.
- Assume breach: design for containment. Minimize blast radius. Log everything.
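The least-privilege and time-limit principles can be made concrete with a small sketch. The `Grant` type and `is_permitted` helper are hypothetical names introduced here for illustration, not part of any library. The idea is only that each permission carries an expiry and anything not explicitly granted is denied.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class Grant:
    """A single permission for one principal, valid until `expires_at`."""
    principal: str
    action: str
    resource: str
    expires_at: datetime


def is_permitted(grants, principal, action, resource, now=None):
    """Return True only if an unexpired grant matches exactly (default deny)."""
    now = now or datetime.now(timezone.utc)
    return any(
        g.principal == principal
        and g.action == action
        and g.resource == resource
        and now < g.expires_at
        for g in grants
    )


# Example: a 15-minute write grant issued at a fixed point in time.
issued = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
grants = [
    Grant("deploy-bot", "write", "payments-config", issued + timedelta(minutes=15)),
]
```

Checking the same request five minutes and twenty minutes after `issued` gives different answers, which is the point of time-limited access: the permission lapses without anyone having to revoke it.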
Identity-Based Access (Not Network-Based)#
    import ipaddress

    from fastapi import Request

    # OLD: "is this request coming from the internal network?"
    def is_internal_request(request: Request) -> bool:
        client_ip = request.client.host
        return ipaddress.ip_address(client_ip).is_private

    # ZERO TRUST: verify identity regardless of network location.
    # AuthError, ServiceIdentity, verify_certificate_chain, extract_spiffe_id,
    # and internal_ca_bundle are application-defined and not shown here.
    async def verify_service_identity(request: Request) -> ServiceIdentity:
        # Verify mTLS client certificate (extracted by the load balancer).
        # getattr avoids an AttributeError when no certificate was attached.
        cert = getattr(request.state, "client_cert", None)
        if not cert:
            raise AuthError("No client certificate presented")

        # Validate cert chain against internal CA
        verify_certificate_chain(cert, internal_ca_bundle)

        # Extract service identity from cert's SPIFFE ID,
        # e.g., "spiffe://cluster.local/ns/production/sa/payment-service"
        spiffe_id = extract_spiffe_id(cert)

        return ServiceIdentity(
            service=spiffe_id.service_account,
            namespace=spiffe_id.namespace,
            trust_domain=spiffe_id.trust_domain,
        )
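The step that turns a SPIFFE ID into a service identity is left abstract above. As a rough sketch, assuming the ID has already been read from the certificate's URI SAN and follows the `ns/<namespace>/sa/<service-account>` path layout used in the example, parsing could look like this. `ParsedSpiffeId` and `parse_k8s_spiffe_id` are hypothetical names, and the path layout is a common Kubernetes convention rather than something SPIFFE itself requires.

```python
from dataclasses import dataclass
from urllib.parse import urlparse


@dataclass(frozen=True)
class ParsedSpiffeId:
    trust_domain: str
    namespace: str
    service_account: str


def parse_k8s_spiffe_id(spiffe_id: str) -> ParsedSpiffeId:
    """Parse spiffe://<trust-domain>/ns/<namespace>/sa/<service-account>."""
    parsed = urlparse(spiffe_id)
    if parsed.scheme != "spiffe" or not parsed.netloc:
        raise ValueError(f"not a SPIFFE ID: {spiffe_id!r}")
    if parsed.query or parsed.fragment:
        raise ValueError(f"SPIFFE ID must not carry a query or fragment: {spiffe_id!r}")

    # A well-formed path splits into ["", "ns", <namespace>, "sa", <service-account>].
    parts = parsed.path.split("/")
    if (
        len(parts) != 5
        or parts[0] != ""
        or parts[1] != "ns"
        or parts[3] != "sa"
        or not parts[2]
        or not parts[4]
    ):
        raise ValueError(f"unexpected SPIFFE ID path: {parsed.path!r}")

    return ParsedSpiffeId(
        trust_domain=parsed.netloc,
        namespace=parts[2],
        service_account=parts[4],
    )
```

Rejecting anything that does not match the expected shape, rather than trying to salvage it, keeps the identity check strict: a malformed ID is treated as no identity at all.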
SPIFFE/SPIRE: Workload Identity#
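SPIFFE (Secure Production Identity Framework For Everyone) is a standard for giving workloads cryptographic identities. SPIRE is its reference implementation: it attests workloads and issues them short-lived identity documents.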
    # SPIRE server: issues SVIDs (SPIFFE Verifiable Identity Documents) to workloads
    # SPIRE agent: runs on each node, attests workloads and delivers SVIDs

    # Pod annotation requesting a SPIFFE ID (manifest fragment; the annotation
    # key that is honored depends on how SPIRE workload registration is configured)
    apiVersion: v1
    kind: Pod
    metadata:
      annotations:
        spiffe.io/spiffeid: "spiffe://example.com/ns/production/sa/payment-service"
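    # Workload API: retrieve SVID (X.509 certificate) from the local SPIRE agent.
    # NOTE: client class and attribute names differ between py-spiffe releases;
    # the names used here are illustrative, so check them against your version.
    import ssl
    import tempfile

    import pyspiffe

    workload_api = pyspiffe.WorkloadApiClient("unix:///run/spire/sockets/agent.sock")
    svid = workload_api.fetch_x509_svid()

    # Use SVID for mTLS
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # SVIDs identify peers by a URI SAN, typically not a DNS name, so verify
    # the peer's SPIFFE ID after the handshake instead of the hostname.
    context.check_hostname = False
    context.load_verify_locations(cadata=svid.bundle.x509_authorities_pem)

    # load_cert_chain() only accepts file paths, so write the PEM strings to
    # short-lived temporary files first.
    with tempfile.NamedTemporaryFile("w", suffix=".pem") as cert_file, \
         tempfile.NamedTemporaryFile("w", suffix=".pem") as key_file:
        cert_file.write(svid.leaf.cert_pem)
        key_file.write(svid.leaf.private_key_pem)
        cert_file.flush()
        key_file.flush()
        context.load_cert_chain(certfile=cert_file.name, keyfile=key_file.name)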
Service-to-Service Authorization#
    # Policy engine: what service A can do to service B
    from dataclasses import dataclass

    @dataclass
    class ServicePolicy:
        source_service: str
        target_service: str
        allowed_methods: set[str]
        allowed_paths: set[str]

    POLICIES = [
        ServicePolicy(
            source_service="api-gateway",
            target_service="payment-service",
            allowed_methods={"POST"},
            allowed_paths={"/v1/charges", "/v1/refunds"},
        ),
        ServicePolicy(
            source_service="payment-service",
            target_service="audit-service",
            allowed_methods={"POST"},
            allowed_paths={"/v1/events"},
        ),
    ]

    def is_authorized(
        source: str,
        target: str,
        method: str,
        path: str,
    ) -> bool:
        # Default deny: access is granted only by an explicit matching policy
        for policy in POLICIES:
            if (policy.source_service == source
                    and policy.target_service == target
                    and method in policy.allowed_methods
                    and path in policy.allowed_paths):
                return True
        return False
Open Policy Agent (OPA) for Policy Decisions#
    # Rego policy: service-to-service authorization
    package service_auth

    # Rego v1 syntax (`if` keyword, `:=` defaults); needs a recent OPA release
    import rego.v1

    default allow := false

    allow if {
        # Source service is authenticated
        input.source_service != ""

        # Policy grants access
        policy := data.policies[_]
        policy.source == input.source_service
        policy.target == input.target_service
        policy.method == input.method
        startswith(input.path, policy.path_prefix)
    }
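One detail worth checking with prefix rules such as `startswith(input.path, policy.path_prefix)`: a plain string prefix also matches sibling paths, so a prefix of `/v1/charges` would match `/v1/charges-export` too. The Python sketch below illustrates the difference with a hypothetical helper, `path_within_prefix`, that only matches on a path-segment boundary. It assumes the path has already been normalized (no `..` segments), and the same idea could be expressed in Rego by comparing path segments.

```python
def path_within_prefix(path: str, prefix: str) -> bool:
    """True if `path` equals `prefix` or lies beneath it on a segment boundary."""
    prefix = prefix.rstrip("/")
    return path == prefix or path.startswith(prefix + "/")


# A raw string prefix check accepts a sibling path that merely shares leading text.
raw_match = "/v1/charges-export".startswith("/v1/charges")

# The segment-aware check rejects it but still accepts real children.
sibling_match = path_within_prefix("/v1/charges-export", "/v1/charges")
child_match = path_within_prefix("/v1/charges/123", "/v1/charges")
```

Whether the stricter check is needed depends on how policy prefixes are written; ending every prefix with `/` is another way to get a similar effect.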
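    # FastAPI: OPA authorization check (call it from a dependency or middleware)
    import httpx
    from fastapi import HTTPException, Request

    OPA_URL = "http://opa:8181/v1/data/service_auth/allow"

    async def authorize_request(request: Request, source_service: str) -> None:
        input_data = {
            "input": {
                "source_service": source_service,
                "target_service": "payment-service",
                "method": request.method,
                "path": request.url.path,
            }
        }
        try:
            async with httpx.AsyncClient() as client:
                resp = await client.post(OPA_URL, json=input_data, timeout=0.1)
            resp.raise_for_status()
            allowed = resp.json().get("result", False)
        except (httpx.HTTPError, ValueError) as exc:
            # Fail closed: an unreachable or misbehaving OPA must not grant access
            raise HTTPException(status_code=503, detail="Policy engine unavailable") from exc

        # OPA omits "result" when the rule is undefined, so only an explicit true passes
        if allowed is not True:
            raise HTTPException(status_code=403, detail="Access denied by policy")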
Network Micro-Segmentation#
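    # Kubernetes Network Policy: zero-trust networking
    # Default: deny all ingress and egress for every pod in the namespace
    apiVersion: networking.k8s.io/v1
    kind: NetworkPolicy
    metadata:
      name: default-deny
      namespace: production
    spec:
      podSelector: {}
      policyTypes: [Ingress, Egress]
    ---
    # Explicit allow: only payment-service can reach the database.
    # Because default-deny also blocks egress, payment-service additionally needs
    # an egress rule to postgres (and usually to DNS) for this path to work.
    apiVersion: networking.k8s.io/v1
    kind: NetworkPolicy
    metadata:
      name: allow-payment-to-db
      namespace: production  # must match the namespace of default-deny
    spec:
      podSelector:
        matchLabels:
          app: postgres
      policyTypes: [Ingress]
      ingress:
        - from:
            - podSelector:
                matchLabels:
                  app: payment-service
          ports:
            - protocol: TCP
              port: 5432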
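How the two policies combine may be easier to see in a small model. The Python sketch below is a deliberately simplified approximation of ingress evaluation within one namespace: it ignores namespaces, egress, and IP blocks, and the dictionary layout and function names are made up for this example. It captures two rules: a pod selected by no policy accepts all ingress, and once any policy selects it, only traffic matched by some rule is allowed.

```python
def labels_match(selector: dict, labels: dict) -> bool:
    """An empty selector matches every pod, as `podSelector: {}` does."""
    return all(labels.get(key) == value for key, value in selector.items())


def ingress_allowed(policies: list, src_labels: dict, dst_labels: dict, port: int) -> bool:
    """Simplified ingress decision for one connection within a namespace."""
    selecting = [p for p in policies if labels_match(p["pod_selector"], dst_labels)]
    if not selecting:
        # No policy selects the destination pod: ingress is unrestricted.
        return True
    # Policies are additive: one matching rule in any selecting policy is enough.
    return any(
        labels_match(rule["from"], src_labels) and port in rule["ports"]
        for policy in selecting
        for rule in policy["ingress"]
    )


# Model of the two manifests: a default-deny policy plus one explicit allow.
POLICIES = [
    {"name": "default-deny", "pod_selector": {}, "ingress": []},
    {
        "name": "allow-payment-to-db",
        "pod_selector": {"app": "postgres"},
        "ingress": [{"from": {"app": "payment-service"}, "ports": {5432}}],
    },
]
```

With these policies, `payment-service` reaching `postgres` on port 5432 is allowed, while any other source, port, or destination is denied because `default-deny` selects every pod and contributes no allow rules.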
Comprehensive Audit Logging#
    from datetime import datetime, timezone

    import structlog
    from fastapi import Request

    log = structlog.get_logger()

    async def audit_log(
        request: Request,
        source_identity: str,
        action: str,
        resource: str,
        outcome: str,
        reason: str = "",
    ) -> None:
        log.info(
            "access_decision",
            timestamp=datetime.now(timezone.utc).isoformat(),
            source=source_identity,
            target_service="payment-service",
            action=action,
            resource=resource,
            outcome=outcome,
            reason=reason,
            request_id=request.headers.get("X-Request-ID"),
            # request.client can be None, so guard before reading the address
            source_ip=request.client.host if request.client else None,
        )
Every access decision — allowed or denied — should be logged with enough context to reconstruct what happened during an incident investigation.
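One way to make sure denials are logged as reliably as approvals is to route every check through a single wrapper. The sketch below uses only the standard library. `make_audit_record` and `authorize_and_audit` are hypothetical names, and `sink` stands in for whatever log pipeline is actually used (here it is just a list).

```python
import json
from datetime import datetime, timezone


def make_audit_record(source, action, resource, allowed, reason=""):
    """Build one JSON audit line for an access decision (allowed or denied)."""
    return json.dumps(
        {
            "event": "access_decision",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "source": source,
            "action": action,
            "resource": resource,
            "outcome": "allowed" if allowed else "denied",
            "reason": reason,
        },
        sort_keys=True,
    )


def authorize_and_audit(is_allowed, source, action, resource, sink):
    """Run the check, record the decision in `sink`, then return it."""
    try:
        allowed = bool(is_allowed(source, action, resource))
        reason = "policy match" if allowed else "no matching policy"
    except Exception as exc:
        # A failing policy check is treated as a denial and still recorded.
        allowed, reason = False, f"policy check failed: {exc}"
    sink.append(make_audit_record(source, action, resource, allowed, reason))
    return allowed
```

Because the record is written inside the wrapper, a caller cannot obtain a decision without leaving a trace, including when the policy check itself raises.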
Conclusion#
Zero trust requires identity for every call (mTLS or JWT), explicit authorization policies (for example, OPA), micro-segmented network access (Kubernetes NetworkPolicy), and comprehensive audit logs. The shift from network-perimeter thinking to identity-based thinking changes how you design both internal APIs and network policies. Start with micro-segmentation (Network Policies) and mTLS (a service mesh), then layer in fine-grained authorization policies.