Introduction#
Server-Side Request Forgery (SSRF) occurs when an application fetches a URL supplied by a user and does not restrict which hosts are allowed. An attacker can use this to reach internal services, cloud metadata endpoints, or other restricted infrastructure that would otherwise be inaccessible.
How SSRF Works#
1
2
3
4
| Attacker → "fetch this image: http://169.254.169.254/latest/meta-data/"
Application → makes HTTP request to AWS metadata endpoint
Application → receives and returns IAM credentials
Attacker → uses credentials to access AWS resources
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| # VULNERABLE: fetches any URL the user provides
import httpx
from fastapi import FastAPI
app = FastAPI()
@app.get("/fetch")
async def fetch_url(url: str):
    # Deliberately VULNERABLE demo endpoint: the caller-supplied URL is
    # requested as-is, with no scheme, host, or address checks.
    async with httpx.AsyncClient() as http:
        upstream = await http.get(url)  # SSRF vulnerability
        # Only the first 1000 characters of the upstream body are echoed back.
        preview = upstream.text[:1000]
    return {"content": preview}
# Attacks:
# /fetch?url=http://169.254.169.254/latest/meta-data/ (AWS metadata)
# /fetch?url=http://localhost:6379/ (Redis admin)
# /fetch?url=http://10.0.0.1/admin (internal service)
# /fetch?url=file:///etc/passwd (local file)
|
Prevention: URL Validation#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
| import ipaddress
import socket
import re
from urllib.parse import urlparse
import httpx
ALLOWED_SCHEMES = {"https", "http"}
BLOCKED_DOMAINS = {"localhost", "metadata.google.internal"}

# Address ranges that must never be reachable through a user-supplied URL:
# loopback, private, link-local (cloud metadata) and other non-public space.
BLOCKED_NETWORKS = [
    ipaddress.ip_network("0.0.0.0/8"),       # "This network" (0.0.0.0 can reach local services)
    ipaddress.ip_network("10.0.0.0/8"),      # RFC 1918 private
    ipaddress.ip_network("100.64.0.0/10"),   # RFC 6598 shared address space (CGNAT)
    ipaddress.ip_network("127.0.0.0/8"),     # Loopback
    ipaddress.ip_network("169.254.0.0/16"),  # Link-local (AWS metadata: 169.254.169.254)
    ipaddress.ip_network("172.16.0.0/12"),   # RFC 1918 private
    ipaddress.ip_network("192.168.0.0/16"),  # RFC 1918 private
    ipaddress.ip_network("224.0.0.0/4"),     # IPv4 multicast
    ipaddress.ip_network("240.0.0.0/4"),     # Reserved, includes limited broadcast
    ipaddress.ip_network("::/128"),          # IPv6 unspecified
    ipaddress.ip_network("::1/128"),         # IPv6 loopback
    ipaddress.ip_network("fc00::/7"),        # IPv6 private (unique local)
    ipaddress.ip_network("fe80::/10"),       # IPv6 link-local
    ipaddress.ip_network("ff00::/8"),        # IPv6 multicast
]


class SSRFError(ValueError):
    """Raised when a URL is rejected by the SSRF checks."""


def _blocked_network_for(ip):
    """Return the first BLOCKED_NETWORKS entry containing *ip*, or None.

    IPv4-mapped IPv6 addresses (``::ffff:a.b.c.d``) are also checked in their
    IPv4 form; membership tests across IP versions are always False, so
    without this step such literals would slip past every IPv4 range.
    """
    candidates = [ip]
    mapped = getattr(ip, "ipv4_mapped", None)  # only IPv6 addresses have it
    if mapped is not None:
        candidates.append(mapped)
    for candidate in candidates:
        for network in BLOCKED_NETWORKS:
            if candidate in network:
                return network
    return None


def validate_url(url: str) -> str:
    """Validate *url* against the scheme, hostname and address blocklists.

    Args:
        url: The user-supplied URL to check.

    Returns:
        The same ``url`` string, unchanged, when every check passes.

    Raises:
        SSRFError: If the URL is malformed, uses a scheme outside
            ALLOWED_SCHEMES, has no hostname, names a blocked domain, cannot
            be resolved, or resolves to any address in BLOCKED_NETWORKS.
    """
    try:
        parsed = urlparse(url)
        hostname = parsed.hostname
    except ValueError as exc:
        # e.g. an unbalanced "[" in the netloc; report it as a rejection
        # so callers only need to handle SSRFError.
        raise SSRFError(f"Malformed URL: {exc}") from exc

    if parsed.scheme not in ALLOWED_SCHEMES:
        raise SSRFError(f"Scheme '{parsed.scheme}' not allowed")
    if not hostname:
        raise SSRFError("No hostname in URL")
    # Ignore a trailing root dot so "localhost." is treated like "localhost".
    if hostname.rstrip(".").lower() in BLOCKED_DOMAINS:
        raise SSRFError(f"Hostname '{hostname}' is blocked")

    # Resolve hostname to IP and check against blocked ranges
    try:
        infos = socket.getaddrinfo(hostname, None)
    except (socket.gaierror, UnicodeError) as exc:
        # UnicodeError: the hostname cannot be IDNA-encoded (e.g. label too long).
        raise SSRFError(f"Cannot resolve hostname: {hostname}") from exc

    for info in infos:
        # Drop any "%zone" suffix before parsing the address.
        ip_str = info[4][0].split("%", 1)[0]
        try:
            ip = ipaddress.ip_address(ip_str)
        except ValueError as exc:
            # Fail closed: an address we cannot parse cannot be vetted.
            raise SSRFError(
                f"Cannot validate resolved address: {ip_str}"
            ) from exc
        network = _blocked_network_for(ip)
        if network is not None:
            raise SSRFError(
                f"IP address {ip} is in blocked range {network}"
            )
    return url
async def safe_fetch(url: str) -> str:
    """Validate *url* with ``validate_url`` and return the response body text.

    Raises:
        SSRFError: If ``validate_url`` rejects the URL.
    """
    target = validate_url(url)
    client_options = {
        # Short timeout: a slow internal server must not hang the app.
        "timeout": 10.0,
        # Redirect targets are never validated, so do not follow them.
        "follow_redirects": False,
    }
    async with httpx.AsyncClient(**client_options) as http:
        reply = await http.get(target)
        return reply.text
|
Allowlist Approach (Preferred)#
A blocklist can be bypassed (IPv6 encoding, DNS rebinding, redirects). An allowlist of known-safe domains is more robust.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| import re
# Only allow fetching from known, trusted domains
# Matches each trusted host, with or without a leading "www.", ignoring case.
ALLOWED_HOST_PATTERN = re.compile(
    r'^(www\.)?'
    r'(example\.com|partner-api\.example\.com|cdn\.example\.com)'
    r'$',
    re.IGNORECASE
)


def validate_url_allowlist(url: str) -> str:
    """Accept *url* only if it is HTTPS and its host is on the allowlist.

    Args:
        url: The user-supplied URL to check.

    Returns:
        The same ``url`` string, unchanged, when it passes.

    Raises:
        SSRFError: If the URL is malformed, is not HTTPS, or its hostname
            does not match ALLOWED_HOST_PATTERN.
    """
    try:
        parsed = urlparse(url)
        hostname = parsed.hostname or ""
    except ValueError as exc:
        # Surface parser failures as SSRFError so callers handle one type.
        raise SSRFError(f"Malformed URL: {exc}") from exc

    if parsed.scheme != "https":  # HTTPS only for external URLs
        raise SSRFError("Only HTTPS is allowed")
    # fullmatch, not match: "$" alone would also accept a trailing newline.
    if not ALLOWED_HOST_PATTERN.fullmatch(hostname):
        raise SSRFError(f"Host '{hostname}' is not in the allowlist")
    return url
|
DNS Rebinding Attack#
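A blocklist that checks the resolved IP only during validation can be bypassed via DNS rebinding: the validation lookup returns a valid public IP (passes the check), but the HTTP client then performs its own second lookup, which returns an internal IP (used for the actual request). The fix is to connect to the exact IP address that was validated.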
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| # Protection: resolve DNS once, pin the IP, connect directly
import ssl
async def safe_fetch_pinned(url: str) -> str:
    """Fetch *url* by connecting to the exact IP address that was validated.

    The hostname is resolved once, every resolved address is checked against
    BLOCKED_NETWORKS, and the request is then sent to the IP literal rather
    than the hostname, so the HTTP client performs no second DNS lookup that
    a rebinding attacker could answer differently.

    Args:
        url: The user-supplied URL to fetch.

    Returns:
        The response body as text.

    Raises:
        SSRFError: If ``validate_url`` rejects the URL, the hostname cannot
            be resolved, or any resolved address is blocked.
    """
    validate_url(url)  # initial check
    parsed = urlparse(url)
    hostname = parsed.hostname
    default_port = 443 if parsed.scheme == "https" else 80
    port = parsed.port or default_port

    # Resolve once; the addresses vetted here are the ones we connect to.
    try:
        infos = socket.getaddrinfo(hostname, port, type=socket.SOCK_STREAM)
    except (socket.gaierror, UnicodeError) as exc:
        raise SSRFError(f"Cannot resolve hostname: {hostname}") from exc

    # Fail closed if ANY resolved address is blocked, not just the first.
    for info in infos:
        ip = info[4][0]
        if "%" in ip:
            # Zone-scoped addresses are link-local and cannot be pinned in a URL.
            raise SSRFError(f"Resolved IP {ip} is private")
        ip_obj = ipaddress.ip_address(ip)
        candidates = [ip_obj]
        # Also test the embedded IPv4 address of ::ffff:a.b.c.d literals.
        mapped = getattr(ip_obj, "ipv4_mapped", None)
        if mapped is not None:
            candidates.append(mapped)
        for network in BLOCKED_NETWORKS:
            if any(candidate in network for candidate in candidates):
                raise SSRFError(f"Resolved IP {ip} is private")

    # Connect to the resolved IP, not the hostname (prevents rebinding).
    pinned_ip = infos[0][4][0]
    ip_literal = f"[{pinned_ip}]" if ":" in pinned_ip else pinned_ip
    pinned_url = parsed._replace(netloc=f"{ip_literal}:{port}").geturl()

    # The origin server still needs the real name in the Host header.
    host_header = f"[{hostname}]" if ":" in hostname else hostname
    if port != default_port:
        host_header = f"{host_header}:{port}"

    async with httpx.AsyncClient(follow_redirects=False) as client:
        response = await client.get(
            pinned_url,
            headers={"Host": host_header},  # send correct Host header
            # For HTTPS, use the original hostname for SNI and certificate
            # verification even though the URL carries the IP.
            extensions={"sni_hostname": hostname},
        )
        return response.text
|
In AWS, require IMDSv2 to mitigate metadata endpoint exposure:
1
2
3
4
5
6
7
8
| # Require token-based metadata access (IMDSv2)
# --http-tokens required : metadata requests must carry a session token (IMDSv2).
# --http-endpoint enabled: the metadata endpoint itself stays switched on.
# i-xxxx is a placeholder; substitute the target instance ID.
aws ec2 modify-instance-metadata-options \
--instance-id i-xxxx \
--http-tokens required \
--http-endpoint enabled
# IMDSv2 requires a PUT request first to get a token
# SSRF via simple GET to 169.254.169.254 no longer works
|
Conclusion#
SSRF turns your application into a proxy to internal networks. Use allowlists over blocklists where possible. Always resolve DNS, validate the resulting IP against private/link-local ranges, and connect to that same validated IP so that a second lookup cannot return a different address. Disable automatic redirect following. For cloud environments, enable IMDSv2 to protect instance metadata. In Kubernetes, use Network Policies to block pods from reaching the metadata endpoint.