Chaos Engineering: Building Resilient Systems
Master the discipline of chaos engineering to build confidence in your system's ability to withstand turbulent conditions. Learn how to design, implement, and run chaos experiments safely in production.
📚 Essential Resources
📖 Must-Read Books & Papers
- Chaos Engineering - Casey Rosenthal & Nora Jones
- Learning Chaos Engineering - Russ Miles
- Release It! - Michael Nygard
- Principles of Chaos Engineering - Manifesto
- Chaos Engineering: Site Reliability Through Controlled Disruption - Mikolaj Pawlikowski
🎥 Video Resources
- Chaos Engineering at Netflix - Pioneers of chaos
- ChaosConf Videos - Gremlin conference
- Breaking Things on Purpose - Gremlin series
- Principles of Chaos Engineering - Casey Rosenthal
- GameDays at Amazon - Jesse Robbins
🎓 Courses & Training
- Chaos Engineering Fundamentals - Gremlin certification
- Resilience Engineering - Coursera
- Litmus Chaos Training - CNCF project
- AWS Fault Injection - AWS training
- Chaos Toolkit Tutorial - Open source
📰 Blogs & Articles
- Netflix Tech Blog - Chaos Monkey origins
- Gremlin Blog - Chaos engineering insights
- Verica Blog - Resilience engineering
- AWS Architecture Blog - Resilience patterns
- Uber Engineering - Chaos at scale
🔧 Essential Tools & Platforms
- Chaos Monkey - Netflix's tool
- Litmus - CNCF chaos engineering
- Gremlin - Enterprise platform
- Chaos Toolkit - Open source framework
- AWS FIS - Fault Injection Simulator
💬 Communities & Forums
- Chaos Engineering Slack - Community
- r/chaosengineering - Reddit
- Chaos Community Day - Events
- CNCF Chaos Engineering - CNCF SIG
- LinkedIn Group - Professionals
🏆 Practice Resources
- Chaos Engineering Experiments - Awesome list
- GameDay Runbooks - AWS guide
- Failure Fridays - Practice guide
- Chaos Scenarios - Gremlin tutorials
- Kubernetes Chaos - K8s chaos tools
Chaos Engineering Fundamentals
Principles of Chaos
Definition: Chaos Engineering is the discipline of experimenting on a system to build confidence in the system's capability to withstand turbulent conditions in production.
# Core principles implementation
# (BlastRadiusController, ExperimentRunner, Range, and ExperimentAborted are helper classes assumed to be defined elsewhere)
import uuid
from datetime import datetime

class ChaosEngineering:
"""
1. Build a Hypothesis Around Steady State
2. Vary Real-world Events
3. Run Experiments in Production
4. Automate Experiments to Run Continuously
5. Minimize Blast Radius
"""
def __init__(self):
self.steady_state_metrics = {
'error_rate': {'threshold': 0.01, 'current': 0.008},
'latency_p99': {'threshold': 500, 'current': 450},
'throughput': {'threshold': 1000, 'current': 1200}
}
self.blast_radius_controls = BlastRadiusController()
self.experiment_runner = ExperimentRunner()
def define_steady_state(self):
"""Define what 'normal' looks like"""
return {
'business_metrics': {
'orders_per_minute': Range(100, 150),
'conversion_rate': Range(0.02, 0.03),
'revenue_per_hour': Range(10000, 15000)
},
'system_metrics': {
'cpu_usage': Range(0.3, 0.7),
'memory_usage': Range(0.4, 0.8),
'error_rate': Range(0, 0.01),
'latency_p50': Range(10, 50),
'latency_p99': Range(100, 500)
},
'dependencies': {
'database_connections': Range(10, 100),
'cache_hit_rate': Range(0.8, 0.95),
'queue_depth': Range(0, 1000)
}
}
def run_experiment(self, hypothesis):
"""Execute chaos experiment with safety controls"""
experiment = {
'id': str(uuid.uuid4()),
'hypothesis': hypothesis,
'start_time': datetime.utcnow(),
'steady_state_before': self.measure_steady_state(),
'safety_checks': []
}
try:
# Pre-flight checks
if not self.pre_flight_checks():
raise ExperimentAborted("Pre-flight checks failed")
# Start with minimal blast radius
with self.blast_radius_controls.limit(percentage=1):
# Inject failure
self.inject_failure(hypothesis['failure_type'])
# Monitor impact
impact = self.monitor_impact(duration=300) # 5 minutes
# Check if we should continue
if self.should_abort(impact):
raise ExperimentAborted("Impact exceeded thresholds")
# Gradually increase blast radius
for radius in [5, 10, 25, 50]:
with self.blast_radius_controls.limit(percentage=radius):
impact = self.monitor_impact(duration=300)
if self.should_abort(impact):
break
experiment['steady_state_after'] = self.measure_steady_state()
experiment['result'] = 'SUCCESS'
except Exception as e:
experiment['result'] = 'FAILED'
experiment['error'] = str(e)
self.rollback_experiment()
finally:
experiment['end_time'] = datetime.utcnow()
self.record_experiment(experiment)
return experiment
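A minimal usage sketch of the class above (the hypothesis fields shown are hypothetical; only `failure_type` is actually read by `run_experiment`):
chaos = ChaosEngineering()
hypothesis = {
    'description': 'Terminating one API pod keeps the error rate below 1%',
    'failure_type': 'pod_termination',  # hypothetical value consumed by inject_failure()
    'expected_steady_state': chaos.define_steady_state()
}
result = chaos.run_experiment(hypothesis)
print(result['result'], result['id'])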
Chaos Maturity Model
# Chaos maturity levels
class ChaosMaturityModel:
LEVELS = {
0: "Not Started", # No chaos practice yet
1: "In Development", # Testing in dev/staging
2: "Ad Hoc Production", # Manual experiments in prod
3: "Automated Regular", # Scheduled chaos experiments
4: "Continuous Minimal", # Continuous small experiments
5: "Continuous Scaled" # Full chaos engineering culture
}
def assess_maturity(self, organization):
"""Assess chaos engineering maturity"""
score = 0
# Level 1: Basic chaos in non-prod
if organization.has_chaos_tests_in_ci():
score += 1
# Level 2: Production experiments
if organization.runs_gamedays():
score += 1
# Level 3: Automated chaos
if organization.has_automated_chaos_pipeline():
score += 1
# Level 4: Continuous chaos
if organization.runs_continuous_chaos():
score += 1
# Level 5: Advanced chaos culture
if organization.has_chaos_on_call_rotation():
score += 1
return {
'level': score,
'description': self.LEVELS[score],
'next_steps': self.get_next_steps(score)
}
Chaos Experiment Design
Experiment Patterns
# Common chaos experiments
import boto3
import kubernetes.client

class ChaosExperiments:
def __init__(self):
self.kubernetes_client = kubernetes.client.CoreV1Api()
self.aws_client = boto3.client('ec2')
self.network_chaos = NetworkChaos()
async def infrastructure_failures(self):
"""Infrastructure-level chaos experiments"""
experiments = []
# 1. Random instance termination
experiments.append({
'name': 'ec2-instance-termination',
'description': 'Randomly terminate EC2 instances',
'implementation': self.terminate_random_instance,
'blast_radius': {'percentage': 10, 'max_instances': 2},
'safety': ['health_check', 'availability_zone_check']
})
# 2. Availability zone failure
experiments.append({
'name': 'az-failure-simulation',
'description': 'Simulate AZ failure',
'implementation': self.simulate_az_failure,
'blast_radius': {'zones': 1},
'safety': ['multi_az_verification', 'capacity_check']
})
# 3. Disk space exhaustion
experiments.append({
'name': 'disk-space-exhaustion',
'description': 'Fill disk to test space handling',
'implementation': self.exhaust_disk_space,
'blast_radius': {'target_usage': 90},
'safety': ['cleanup_job', 'space_monitor']
})
return experiments
async def application_failures(self):
"""Application-level chaos experiments"""
return [
{
'name': 'memory-leak-simulation',
'description': 'Simulate memory leak',
'implementation': self.inject_memory_leak,
'parameters': {
'leak_rate_mb_per_second': 10,
'max_memory_usage_percent': 80
}
},
{
'name': 'cpu-spike',
'description': 'Sudden CPU usage spike',
'implementation': self.inject_cpu_spike,
'parameters': {
'cpu_percent': 90,
'duration_seconds': 300,
'processes': 4
}
},
{
'name': 'thread-pool-exhaustion',
'description': 'Exhaust application thread pool',
'implementation': self.exhaust_thread_pool,
'parameters': {
'blocked_threads': 100,
'block_duration_seconds': 60
}
}
]
async def network_failures(self):
"""Network chaos experiments"""
return [
{
'name': 'network-latency',
'description': 'Inject network latency',
'implementation': lambda: self.network_chaos.add_latency(
delay_ms=100,
jitter_ms=50,
correlation=0.25
)
},
{
'name': 'packet-loss',
'description': 'Simulate packet loss',
'implementation': lambda: self.network_chaos.add_packet_loss(
loss_percent=5,
correlation=0.25
)
},
{
'name': 'network-partition',
'description': 'Partition network between services',
'implementation': lambda: self.network_chaos.create_partition(
source_service='api',
target_service='database',
bidirectional=True
)
}
]
async def dependency_failures(self):
"""External dependency chaos"""
return [
{
'name': 'database-connection-pool-exhaustion',
'description': 'Exhaust DB connections',
'implementation': self.exhaust_db_connections
},
{
'name': 'cache-flush',
'description': 'Flush cache unexpectedly',
'implementation': self.flush_cache
},
{
'name': 'third-party-api-failure',
'description': 'Simulate third-party outage',
'implementation': self.block_third_party_api
}
]
Safety Controls
# Safety mechanisms for chaos experiments
class ChaosSafetyControls:
def __init__(self):
self.emergency_stop = EmergencyStop()
self.monitors = SafetyMonitors()
self.rollback = RollbackController()
def implement_safety_controls(self, experiment):
"""Comprehensive safety controls"""
controls = {
'pre_conditions': self.check_pre_conditions(),
'abort_conditions': self.define_abort_conditions(),
'monitoring': self.setup_monitoring(),
'rollback_plan': self.create_rollback_plan(experiment)
}
return controls
def check_pre_conditions(self):
"""Pre-flight checks before chaos"""
checks = []
# System health check
checks.append({
'name': 'system_health',
'check': lambda: self.monitors.error_rate() < 0.01,
'required': True
})
# No ongoing incidents
checks.append({
'name': 'no_active_incidents',
'check': lambda: not self.incident_manager.has_active_incidents(),
'required': True
})
# Business hours check (optional)
checks.append({
'name': 'business_hours',
'check': lambda: self.is_within_experiment_window(),
'required': False,
'override_with': 'approval'
})
# Capacity check
checks.append({
'name': 'sufficient_capacity',
'check': lambda: self.capacity_manager.available_capacity() > 0.3,
'required': True
})
return checks
def define_abort_conditions(self):
"""Conditions that trigger experiment abort"""
return [
{
'metric': 'error_rate',
'threshold': 0.05, # 5% error rate
'duration': 60, # sustained for 1 minute
'action': 'abort_immediate'
},
{
'metric': 'latency_p99',
'threshold': 2000, # 2 seconds
'duration': 120, # sustained for 2 minutes
'action': 'abort_graceful'
},
{
'metric': 'revenue_drop',
'threshold': 0.1, # 10% drop
'duration': 300, # sustained for 5 minutes
'action': 'abort_immediate'
},
{
'metric': 'customer_complaints',
'threshold': 10, # 10 complaints
'duration': 600, # within 10 minutes
'action': 'abort_and_communicate'
}
]
def create_rollback_plan(self, experiment):
"""Automated rollback procedures"""
return {
'network_failures': self.rollback_network_changes,
'instance_failures': self.restore_instances,
'application_failures': self.restart_applications,
'data_corruption': self.restore_from_backup,
'configuration_changes': self.revert_configurations
}
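The abort conditions above are declarative; something still has to evaluate them while the experiment runs. A minimal watcher sketch, assuming a `get_metric(name)` callable that returns the current value of each metric:
import time

def watch_abort_conditions(conditions, get_metric, poll_seconds=10):
    """Return the abort action of the first condition whose threshold is
    breached continuously for at least its configured duration."""
    breach_started = {}  # metric name -> timestamp when the breach began
    while True:
        for cond in conditions:
            name = cond['metric']
            if get_metric(name) >= cond['threshold']:
                breach_started.setdefault(name, time.time())
                if time.time() - breach_started[name] >= cond['duration']:
                    return cond['action']  # e.g. 'abort_immediate'
            else:
                breach_started.pop(name, None)  # breach cleared, reset the timer
        time.sleep(poll_seconds)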
Chaos Tools Implementation
Chaos Monkey Implementation
# Custom Chaos Monkey implementation
import random
import asyncio
from datetime import datetime
from kubernetes import client, config
class ChaosMonkey:
def __init__(self, namespace='default', dry_run=False):
config.load_incluster_config() # In-cluster config
self.v1 = client.CoreV1Api()
self.namespace = namespace
self.dry_run = dry_run
self.excluded_labels = {
'chaos-monkey': 'disabled',
'environment': 'production',
'critical': 'true'
}
async def start_chaos(self, interval_minutes=10):
"""Main chaos loop"""
while True:
try:
# Select random chaos action
action = random.choice([
self.terminate_random_pod,
self.inject_network_latency,
self.consume_cpu,
self.fill_disk_space
])
# Execute with safety checks
await self.execute_chaos_action(action)
# Wait for next chaos
await asyncio.sleep(interval_minutes * 60)
except Exception as e:
print(f"Chaos failed: {e}")
await self.alert_on_failure(e)
async def terminate_random_pod(self):
"""Randomly terminate a pod"""
# Get all pods
pods = self.v1.list_namespaced_pod(self.namespace)
# Filter eligible pods
eligible_pods = []
for pod in pods.items:
if self.is_pod_eligible(pod):
eligible_pods.append(pod)
if not eligible_pods:
print("No eligible pods for chaos")
return
# Select victim
victim = random.choice(eligible_pods)
print(f"Terminating pod: {victim.metadata.name}")
if not self.dry_run:
self.v1.delete_namespaced_pod(
name=victim.metadata.name,
namespace=self.namespace,
grace_period_seconds=0
)
# Record chaos event
await self.record_chaos_event({
'action': 'pod_termination',
'target': victim.metadata.name,
'timestamp': datetime.utcnow()
})
def is_pod_eligible(self, pod):
"""Check if pod can be targeted"""
# Check excluded labels
for label, value in self.excluded_labels.items():
if pod.metadata.labels.get(label) == value:
return False
# Don't target single replicas
if self.get_replica_count(pod) <= 1:
return False
# Don't target unhealthy pods
if pod.status.phase != 'Running':
return False
return True
async def inject_network_latency(self):
"""Inject network latency using tc"""
eligible_pods = self.get_eligible_pods()
if not eligible_pods:
return
victim = random.choice(eligible_pods)
# Inject latency using kubectl exec
latency_ms = random.randint(50, 500)
jitter_ms = random.randint(10, 50)
command = [
'tc', 'qdisc', 'add', 'dev', 'eth0', 'root',
'netem', 'delay', f'{latency_ms}ms', f'{jitter_ms}ms'
]
if not self.dry_run:
self.v1.connect_get_namespaced_pod_exec(
victim.metadata.name,
self.namespace,
command=command,
stderr=True,
stdin=False,
stdout=True,
tty=False
)
# Schedule cleanup
asyncio.create_task(
self.cleanup_network_chaos(victim, delay=300)
)
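Run as a long-lived process, for example (namespace, interval, and the initial dry run are illustrative choices):
import asyncio

if __name__ == '__main__':
    monkey = ChaosMonkey(namespace='staging', dry_run=True)  # start with a dry run
    asyncio.run(monkey.start_chaos(interval_minutes=30))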
Litmus Chaos Integration
# Litmus ChaosEngine configuration
apiVersion: litmuschaos.io/v1alpha1
kind: ChaosEngine
metadata:
name: nginx-chaos
namespace: default
spec:
appinfo:
appns: 'default'
applabel: 'app=nginx'
appkind: 'deployment'
engineState: 'active'
chaosServiceAccount: litmus-admin
experiments:
- name: pod-cpu-hog
spec:
components:
env:
- name: CPU_CORES
value: '2'
- name: TOTAL_CHAOS_DURATION
value: '60'
- name: CPU_LOAD
value: '80'
- name: PODS_AFFECTED_PERC
value: '50'
probe:
- name: check-nginx-availability
type: httpProbe
httpProbe/inputs:
url: http://nginx-service
insecureSkipVerify: false
method:
get:
criteria: ==
responseCode: '200'
mode: Continuous
runProperties:
probeTimeout: 2
retry: 1
interval: 1
probePollingInterval: 1
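The ChaosEngine manifest is applied like any other custom resource, either with kubectl apply -f or programmatically. A sketch using the Kubernetes Python client (the chaos-engine.yaml filename is illustrative):
import yaml
from kubernetes import client, config

config.load_kube_config()
custom_api = client.CustomObjectsApi()

with open('chaos-engine.yaml') as f:
    engine = yaml.safe_load(f)

custom_api.create_namespaced_custom_object(
    group='litmuschaos.io',
    version='v1alpha1',
    namespace='default',
    plural='chaosengines',
    body=engine
)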
Gremlin Integration
# Gremlin SDK integration (the client and attack classes below are illustrative; consult Gremlin's official API/SDK documentation for the exact interface)
import random
import gremlin_python
from gremlin_python.config import GremlinAPIConfig
from gremlin_python.attack import GremlinAttack
class GremlinChaosOrchestrator:
def __init__(self, api_key: str):
config = GremlinAPIConfig(api_key=api_key)
self.client = gremlin_python.GremlinAPI(config)
def create_cpu_attack(self, target_percent=50):
"""Create CPU resource attack"""
attack = GremlinAttack(
command={
'type': 'cpu',
'args': [
'--percent', str(target_percent),
'--cores', '2'
]
},
target={
'type': 'Random',
'containers': {
'labels': {
'app': 'web-service'
}
},
'percent': 25 # Target 25% of matching containers
}
)
# Create attack with safety limits
result = self.client.create_attack(
attack=attack,
dry_run=False,
max_duration=300, # 5 minutes max
auto_rollback=True
)
return result
def create_network_attack(self):
"""Create network chaos attack"""
scenarios = [
{
'name': 'Latency Storm',
'attack': {
'type': 'latency',
'args': ['--delay', '200', '--jitter', '50']
}
},
{
'name': 'Packet Loss',
'attack': {
'type': 'packet_loss',
'args': ['--percent', '10', '--corrupt', '5']
}
},
{
'name': 'DNS Failure',
'attack': {
'type': 'dns',
'args': ['--protocol', 'all']
}
}
]
# Run scenario
selected = random.choice(scenarios)
return self.client.create_scenario(selected)
Production Chaos Engineering
Game Days
# Game Day orchestration
import asyncio
import time

class GameDayOrchestrator:
def __init__(self):
self.scenarios = []
self.participants = []
self.observers = []
self.metrics_collector = MetricsCollector()
def plan_game_day(self):
"""Plan comprehensive game day"""
return {
'date': 'Next Friday 10 AM PST',
'duration': '4 hours',
'scenarios': [
{
'time': 'T+0',
'scenario': 'Database Primary Failure',
'expected_behavior': 'Automatic failover within 30s',
'success_criteria': [
'Zero data loss',
'Recovery time < 30 seconds',
'All services remain available'
]
},
{
'time': 'T+1h',
'scenario': 'Region Failure Simulation',
'expected_behavior': 'Traffic shifts to healthy region',
'success_criteria': [
'DNS updates within 5 minutes',
'No customer impact',
'Monitoring alerts fire correctly'
]
},
{
'time': 'T+2h',
'scenario': 'Cascading Failure',
'expected_behavior': 'Circuit breakers prevent cascade',
'success_criteria': [
'Degraded but available service',
'No complete outage',
'Clear customer communication'
]
}
],
'roles': {
'incident_commander': 'Senior SRE',
'scribe': 'Junior SRE',
'communications': 'Product Manager',
'observers': ['Engineering Team', 'Support Team']
}
}
async def execute_scenario(self, scenario):
"""Execute game day scenario"""
print(f"Starting scenario: {scenario['scenario']}")
# Record initial state
initial_metrics = await self.metrics_collector.snapshot()
# Inject failure
failure_injection = await self.inject_failure(scenario['failure_type'])
# Monitor system response
timeline = []
start_time = time.time()
while not self.success_criteria_met(scenario):
current_state = await self.get_system_state()
timeline.append({
'timestamp': time.time() - start_time,
'state': current_state,
'metrics': await self.metrics_collector.snapshot()
})
# Check for timeout
if time.time() - start_time > scenario.get('timeout', 3600):
break
await asyncio.sleep(5) # Check every 5 seconds
# Generate report
return self.generate_scenario_report(scenario, timeline, initial_metrics)
def generate_game_day_report(self, results):
"""Comprehensive game day report"""
return {
'executive_summary': self.generate_executive_summary(results),
'timeline': self.create_incident_timeline(results),
'metrics': {
'availability': self.calculate_availability(results),
'mttr': self.calculate_mttr(results),
'customer_impact': self.assess_customer_impact(results)
},
'findings': self.analyze_findings(results),
'action_items': self.generate_action_items(results),
'improvements': {
'runbooks': self.identify_runbook_gaps(results),
'monitoring': self.identify_monitoring_gaps(results),
'automation': self.identify_automation_opportunities(results)
}
}
Continuous Chaos
# Continuous chaos platform
import asyncio

class ContinuousChaosPlatform:
def __init__(self):
self.scheduler = ChaosScheduler()
self.experiment_store = ExperimentStore()
self.safety_controller = SafetyController()
async def run_continuous_chaos(self):
"""Run chaos experiments continuously"""
while True:
try:
# Select next experiment
experiment = await self.select_next_experiment()
# Check safety conditions
if not await self.safety_controller.is_safe_to_proceed():
await asyncio.sleep(300) # Wait 5 minutes
continue
# Run experiment with minimal blast radius
result = await self.run_minimal_experiment(experiment)
# Learn and adapt
await self.update_chaos_model(result)
# Schedule next experiment
await self.schedule_next_experiment()
except Exception as e:
await self.handle_chaos_failure(e)
async def select_next_experiment(self):
"""Intelligently select next chaos experiment"""
# Get experiment history
history = await self.experiment_store.get_recent_experiments()
# Identify untested failure modes
untested = self.identify_untested_scenarios(history)
# Prioritize based on risk and value
prioritized = self.prioritize_experiments(untested)
# Select with some randomness
return self.select_with_exploration(prioritized)
def create_chaos_schedule(self):
"""Create chaos engineering schedule"""
return {
'continuous': {
'frequency': 'every 30 minutes',
'blast_radius': '1-5%',
'experiments': [
'random_pod_failure',
'network_latency_injection',
'cpu_stress'
]
},
'daily': {
'time': '10:00 AM PST',
'blast_radius': '10%',
'experiments': [
'availability_zone_failure',
'database_failover',
'cache_flush'
]
},
'weekly': {
'day': 'Thursday',
'time': '2:00 PM PST',
'blast_radius': '25%',
'experiments': [
'region_failure',
'dependency_outage',
'data_corruption_recovery'
]
},
'monthly': {
'day': 'Last Friday',
'type': 'Game Day',
'scenarios': [
'complete_datacenter_loss',
'cascading_failure',
'security_incident_response'
]
}
}
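One way to drive the continuous tier of this schedule is a plain asyncio loop over its experiments (a sketch; the experiment dict shape passed to run_minimal_experiment is assumed, and the 1800-second cadence mirrors the 'every 30 minutes' entry above):
import asyncio

async def run_continuous_tier(platform, experiment_names, interval_seconds=1800):
    """Cycle through the continuous-tier experiments on a fixed cadence."""
    while True:
        for name in experiment_names:  # e.g. ['random_pod_failure', 'cpu_stress']
            result = await platform.run_minimal_experiment({'name': name})
            await platform.update_chaos_model(result)
        await asyncio.sleep(interval_seconds)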
Observability for Chaos
Chaos Metrics
# Chaos observability implementation
from prometheus_client import Counter, Gauge, Histogram

class ChaosObservability:
def __init__(self):
self.metrics_client = PrometheusClient()
self.tracing_client = JaegerClient()
self.logging_client = ElasticsearchClient()
def setup_chaos_metrics(self):
"""Define chaos-specific metrics"""
metrics = {
# Experiment metrics
'chaos_experiments_total': Counter(
'chaos_experiments_total',
'Total number of chaos experiments',
['experiment_type', 'result']
),
'chaos_experiment_duration': Histogram(
'chaos_experiment_duration_seconds',
'Duration of chaos experiments',
['experiment_type']
),
'chaos_blast_radius': Gauge(
'chaos_blast_radius_percentage',
'Current blast radius of chaos experiment',
['experiment_type']
),
# Impact metrics
'chaos_impact_error_rate': Gauge(
'chaos_impact_error_rate',
'Error rate during chaos experiment'
),
'chaos_impact_latency': Histogram(
'chaos_impact_latency_seconds',
'Latency impact during chaos'
),
'chaos_impact_availability': Gauge(
'chaos_impact_availability',
'Service availability during chaos'
),
# Safety metrics
'chaos_safety_aborts': Counter(
'chaos_safety_aborts_total',
'Number of experiments aborted by safety controls',
['abort_reason']
),
'chaos_rollbacks': Counter(
'chaos_rollbacks_total',
'Number of chaos rollbacks performed',
['rollback_type']
)
}
return metrics
def create_chaos_dashboard(self):
"""Grafana dashboard for chaos engineering"""
return {
'dashboard': {
'title': 'Chaos Engineering Dashboard',
'panels': [
{
'title': 'Experiment Status',
'type': 'stat',
'targets': [{
'expr': 'sum(rate(chaos_experiments_total[5m])) by (result)'
}]
},
{
'title': 'System Impact',
'type': 'graph',
'targets': [
{
'expr': 'chaos_impact_error_rate',
'legendFormat': 'Error Rate'
},
{
'expr': 'histogram_quantile(0.99, chaos_impact_latency_seconds)',
'legendFormat': 'P99 Latency'
}
]
},
{
'title': 'Safety Controls',
'type': 'graph',
'targets': [{
'expr': 'increase(chaos_safety_aborts_total[1h])'
}]
},
{
'title': 'Experiment Timeline',
'type': 'table',
'targets': [{
'expr': 'chaos_experiment_events'
}]
}
]
}
}
async def trace_chaos_impact(self, experiment_id: str):
"""Distributed tracing for chaos experiments"""
with self.tracing_client.start_span('chaos_experiment') as span:
span.set_tag('experiment.id', experiment_id)
span.set_tag('experiment.type', 'network_partition')
# Trace failure injection
with self.tracing_client.start_span('inject_failure'):
await self.inject_network_partition()
# Trace system response
with self.tracing_client.start_span('monitor_impact'):
impact = await self.monitor_system_impact()
span.set_tag('impact.error_rate', impact['error_rate'])
span.set_tag('impact.latency_increase', impact['latency_increase'])
# Trace recovery
with self.tracing_client.start_span('system_recovery'):
recovery_time = await self.measure_recovery_time()
span.set_tag('recovery.time_seconds', recovery_time)
return span.trace_id
Chaos Engineering Patterns
Circuit Breaker Testing
# Test circuit breaker resilience
import asyncio

class CircuitBreakerChaos:
def __init__(self):
self.circuit_breaker = CircuitBreaker()
self.chaos_injector = ChaosInjector()
async def test_circuit_breaker_behavior(self):
"""Verify circuit breaker responds correctly to failures"""
test_scenarios = [
{
'name': 'Gradual Degradation',
'failure_pattern': self.gradual_failure_increase,
'expected_state_transitions': ['CLOSED', 'OPEN', 'HALF_OPEN', 'CLOSED']
},
{
'name': 'Sudden Failure',
'failure_pattern': self.sudden_complete_failure,
'expected_state_transitions': ['CLOSED', 'OPEN']
},
{
'name': 'Intermittent Failures',
'failure_pattern': self.intermittent_failures,
'expected_state_transitions': ['CLOSED', 'CLOSED', 'CLOSED']
}
]
results = []
for scenario in test_scenarios:
result = await self.run_scenario(scenario)
results.append(result)
return self.analyze_circuit_breaker_resilience(results)
async def gradual_failure_increase(self):
"""Gradually increase failure rate"""
for failure_rate in [0.1, 0.3, 0.5, 0.7, 0.9]:
self.chaos_injector.set_failure_rate(failure_rate)
await asyncio.sleep(60) # Hold for 1 minute
# Record circuit breaker state
state = self.circuit_breaker.get_state()
metrics = self.circuit_breaker.get_metrics()
yield {
'failure_rate': failure_rate,
'circuit_state': state,
'metrics': metrics
}
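For reference, the state machine these scenarios exercise is small. A minimal circuit breaker sketch (thresholds are illustrative) showing the CLOSED → OPEN → HALF_OPEN transitions asserted above:
import time

class SimpleCircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=30):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failure_count = 0
        self.state = 'CLOSED'
        self.opened_at = None

    def call(self, fn, *args, **kwargs):
        if self.state == 'OPEN':
            if time.time() - self.opened_at >= self.reset_timeout:
                self.state = 'HALF_OPEN'  # allow a single trial request
            else:
                raise RuntimeError('circuit open, failing fast')
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            if self.state == 'HALF_OPEN' or self.failure_count >= self.failure_threshold:
                self.state = 'OPEN'
                self.opened_at = time.time()
            raise
        self.failure_count = 0
        self.state = 'CLOSED'
        return result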
Bulkhead Testing
# Test bulkhead isolation
import asyncio

class BulkheadChaos:
def __init__(self):
self.thread_pools = ThreadPoolManager()
self.resource_monitor = ResourceMonitor()
async def test_bulkhead_isolation(self):
"""Verify bulkheads prevent resource exhaustion"""
# Exhaust one bulkhead
await self.exhaust_bulkhead('payment-service')
# Verify other services remain functional
health_checks = await self.check_all_services_health()
# Measure impact radius
impact = {
'affected_services': [],
'unaffected_services': [],
'resource_isolation': {}
}
for service, health in health_checks.items():
if health['status'] == 'healthy':
impact['unaffected_services'].append(service)
else:
impact['affected_services'].append(service)
# Verify resource isolation
for resource in ['cpu', 'memory', 'threads', 'connections']:
isolation = await self.verify_resource_isolation(resource)
impact['resource_isolation'][resource] = isolation
return impact
async def exhaust_bulkhead(self, service_name: str):
"""Exhaust a specific bulkhead's resources"""
bulkhead = self.get_bulkhead(service_name)
# Flood with requests
tasks = []
for i in range(bulkhead.max_concurrent_calls * 2):
task = self.send_slow_request(service_name)
tasks.append(task)
# Wait for bulkhead to fill
await asyncio.gather(*tasks, return_exceptions=True)
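The isolation this test verifies is usually implemented with a bounded concurrency primitive per dependency. A minimal semaphore-based bulkhead sketch (the limit is illustrative):
import asyncio

class Bulkhead:
    """Cap concurrent calls to one dependency so that its saturation cannot
    starve the threads and connections other dependencies rely on."""
    def __init__(self, max_concurrent_calls=25):
        self.max_concurrent_calls = max_concurrent_calls
        self._semaphore = asyncio.Semaphore(max_concurrent_calls)

    async def run(self, coro_factory):
        if self._semaphore.locked():  # pool is full: reject rather than queue indefinitely
            raise RuntimeError('bulkhead full, rejecting call')
        async with self._semaphore:
            return await coro_factory()

# usage sketch: await payment_bulkhead.run(lambda: call_payment_service(order))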
Advanced Chaos Scenarios
Data Store Chaos
# Database and cache chaos experiments
class DataStoreChaos:
def __init__(self):
self.db_chaos = DatabaseChaos()
self.cache_chaos = CacheChaos()
async def database_chaos_scenarios(self):
"""Database-specific chaos experiments"""
scenarios = [
{
'name': 'Replication Lag',
'implementation': self.introduce_replication_lag,
'parameters': {
'lag_seconds': 30,
'affected_replicas': 2
}
},
{
'name': 'Connection Pool Exhaustion',
'implementation': self.exhaust_connection_pool,
'parameters': {
'connections_to_hold': 100,
'hold_duration': 300
}
},
{
'name': 'Slow Queries',
'implementation': self.inject_slow_queries,
'parameters': {
'query_delay_ms': 5000,
'affected_percentage': 10
}
},
{
'name': 'Split Brain',
'implementation': self.simulate_split_brain,
'parameters': {
'partition_duration': 120
}
}
]
return await self.run_scenarios(scenarios)
async def cache_chaos_scenarios(self):
"""Cache-specific chaos experiments"""
scenarios = [
{
'name': 'Cache Invalidation Storm',
'implementation': self.trigger_invalidation_storm,
'parameters': {
'invalidations_per_second': 1000,
'duration': 60
}
},
{
'name': 'Cache Node Failure',
'implementation': self.fail_cache_nodes,
'parameters': {
'nodes_to_fail': 2,
'failure_pattern': 'sequential'
}
},
{
'name': 'Cache Stampede',
'implementation': self.trigger_cache_stampede,
'parameters': {
'concurrent_misses': 1000
}
}
]
return await self.run_scenarios(scenarios)
Multi-Region Chaos
# Multi-region failure scenarios
class MultiRegionChaos:
def __init__(self):
self.regions = ['us-east-1', 'us-west-2', 'eu-west-1', 'ap-southeast-1']
self.traffic_manager = GlobalTrafficManager()
async def region_failure_scenarios(self):
"""Test multi-region resilience"""
scenarios = [
{
'name': 'Single Region Failure',
'implementation': self.fail_single_region,
'validation': self.validate_traffic_failover
},
{
'name': 'Multi-Region Failure',
'implementation': self.fail_multiple_regions,
'validation': self.validate_degraded_service
},
{
'name': 'Region Network Partition',
'implementation': self.partition_regions,
'validation': self.validate_split_brain_prevention
},
{
'name': 'Cross-Region Latency',
'implementation': self.inject_cross_region_latency,
'validation': self.validate_latency_handling
}
]
results = []
for scenario in scenarios:
print(f"Running scenario: {scenario['name']}")
# Record initial state
initial_state = await self.capture_global_state()
# Execute scenario
await scenario['implementation']()
# Monitor failover
failover_metrics = await self.monitor_failover()
# Validate behavior
validation_result = await scenario['validation']()
# Clean up
await self.restore_all_regions()
results.append({
'scenario': scenario['name'],
'failover_time': failover_metrics['time_to_failover'],
'data_loss': failover_metrics['data_loss'],
'availability': failover_metrics['availability_percentage'],
'validation': validation_result
})
return results
Interview Questions
Design Questions
- Design a chaos engineering platform for a microservices architecture
- Build a safe chaos experiment framework for production
- Create a game day planning and execution system
- Design automated chaos experiments for Kubernetes
Implementation Questions
- Implement a circuit breaker with chaos testing
- Build a blast radius controller for chaos experiments
- Create a rollback mechanism for failed experiments
- Implement continuous minimal chaos
Best Practices
- How do you ensure chaos experiments are safe?
- What metrics indicate chaos engineering maturity?
- How to get buy-in for chaos engineering?
- When should you NOT run chaos experiments?
Essential Resources
Books
- 📚 Chaos Engineering - O'Reilly
- 📚 Learning Chaos Engineering
- 📚 Chaos Engineering: System Resiliency in Practice
Tools
- 🔧 Chaos Monkey - Netflix
- 🔧 Litmus - CNCF Chaos
- 🔧 Gremlin - Enterprise chaos
- 🔧 Chaos Toolkit - Open source
Remember: Chaos engineering is about building confidence through controlled experiments. Start small, measure everything, and gradually increase complexity as your systems and teams mature.