Serverless & Edge Computing Architecture
Master serverless platforms and edge computing to build globally distributed, event-driven applications. Learn how to architect systems that scale to zero, run at the edge, and optimize for cost and performance.
📚 Essential Resources
📖 Must-Read Books & Guides
- Serverless Architectures on AWS - Peter Sbarski
- Building Serverless Applications - Sam Kroonenburg
- Learning Serverless - Jason Katzer
- Edge Computing - O'Reilly report
- Serverless Design Patterns - Brian Zambrano
🎥 Video Resources
- AWS re:Invent Serverless - AWS talks
- Serverless Framework - Official channel
- Edge Computing Explained - IBM Technology
- Cloudflare TV - Workers & edge computing
- Vercel YouTube - Edge functions
🎓 Courses & Training
- AWS Lambda Deep Dive - A Cloud Guru
- Serverless Stack - Full-stack serverless
- Edge Computing Course - Coursera
- Cloudflare Workers Training - Official docs
- Azure Functions University - Free course
📰 Blogs & Articles
- Serverless Blog - Serverless Framework
- AWS Compute Blog - Lambda updates
- Cloudflare Blog - Edge computing insights
- Vercel Blog - Edge & Next.js
- Jeremy Daly's Blog - Serverless expert
🔧 Essential Tools & Platforms
- Serverless Framework - Deployment framework
- AWS SAM - AWS toolkit
- Architect - Serverless framework
- SST - Full-stack serverless
- Workers CLI (Wrangler) - Cloudflare
💬 Communities & Forums
- Serverless Stack Discord - SST community
- r/serverless - Reddit community
- ServerlessConf - Conference community
- FaaS Community - OpenFaaS
- Edge Computing Forum - Industry forum
🏆 Platform Resources
- AWS Lambda Docs - Official documentation
- Google Cloud Functions - GCP serverless
- Azure Functions Docs - Azure serverless
- Cloudflare Workers Docs - Edge computing
- Deno Deploy - Edge runtime
Serverless Fundamentals
Understanding Serverless Architecture
Serverless Computing Spectrum:
Traditional → VMs → Containers → Functions → Edge Functions
More Control More Abstraction
Higher Ops Lower Ops
Fixed Costs Usage-Based
Functions as a Service (FaaS)
# AWS Lambda function example
import json
import boto3
from typing import Dict, Any
import asyncio
from dataclasses import dataclass
@dataclass
class LambdaContext:
function_name: str
function_version: str
invoked_function_arn: str
memory_limit_in_mb: int
aws_request_id: str
log_group_name: str
log_stream_name: str
def get_remaining_time_in_millis(self) -> int:
"""Get remaining execution time"""
pass
def lambda_handler(event: Dict[str, Any], context: LambdaContext) -> Dict[str, Any]:
"""
AWS Lambda handler with best practices
"""
# Initialize clients outside handler for connection reuse
global dynamodb_client
if 'dynamodb_client' not in globals():
dynamodb_client = boto3.client('dynamodb')
try:
# Parse event
request_body = json.loads(event.get('body', '{}'))
user_id = request_body.get('user_id')
# Business logic
result = process_user_request(user_id)
# Return API Gateway response format
return {
'statusCode': 200,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
},
'body': json.dumps({
'message': 'Success',
'data': result,
'request_id': context.aws_request_id
})
}
except Exception as e:
# Error handling with structured logging
print(json.dumps({
'level': 'ERROR',
'message': str(e),
'request_id': context.aws_request_id,
'event': event
}))
return {
'statusCode': 500,
'body': json.dumps({'error': 'Internal server error'})
}
# Async Lambda with Python 3.9+
async def async_lambda_handler(event: Dict[str, Any], context: LambdaContext):
"""
Async Lambda for concurrent operations
"""
# Concurrent API calls
tasks = [
fetch_user_data(event['user_id']),
fetch_preferences(event['user_id']),
fetch_recommendations(event['user_id'])
]
user_data, preferences, recommendations = await asyncio.gather(*tasks)
return {
'user': user_data,
'preferences': preferences,
'recommendations': recommendations
}
Serverless Patterns
1. Event-Driven Processing
# SQS trigger for batch processing
def sqs_batch_processor(event: Dict[str, Any], context: LambdaContext):
"""
Process SQS messages in batches
"""
successful_messages = []
failed_messages = []
for record in event['Records']:
try:
# Parse SQS message
message_body = json.loads(record['body'])
# Process message
process_message(message_body)
successful_messages.append(record['messageId'])
except Exception as e:
failed_messages.append({
'itemIdentifier': record['messageId']
})
print(f"Failed to process message: {e}")
# Return partial batch failure
return {
'batchItemFailures': failed_messages
}
# DynamoDB Streams processor
def dynamodb_stream_processor(event: Dict[str, Any], context: LambdaContext):
"""
Process DynamoDB change events
"""
for record in event['Records']:
event_name = record['eventName']
if event_name == 'INSERT':
# Handle new item
new_item = record['dynamodb']['NewImage']
index_in_elasticsearch(new_item)
elif event_name == 'MODIFY':
# Handle updates
old_item = record['dynamodb']['OldImage']
new_item = record['dynamodb']['NewImage']
sync_changes(old_item, new_item)
elif event_name == 'REMOVE':
# Handle deletions
old_item = record['dynamodb']['OldImage']
remove_from_elasticsearch(old_item)
2. API Gateway Integration
# serverless.yml configuration
service: api-service
provider:
name: aws
runtime: python3.9
memorySize: 1024
timeout: 30
environment:
STAGE: ${opt:stage, 'dev'}
TABLE_NAME: ${self:service}-${opt:stage, 'dev'}
# Function-level permissions
iam:
role:
statements:
- Effect: Allow
Action:
- dynamodb:Query
- dynamodb:GetItem
- dynamodb:PutItem
Resource:
- arn:aws:dynamodb:${aws:region}:*:table/${self:provider.environment.TABLE_NAME}
functions:
api:
handler: handler.api
events:
- http:
path: /{proxy+}
method: ANY
cors: true
authorizer:
type: COGNITO_USER_POOLS
authorizerId:
Ref: ApiGatewayAuthorizer
websocket-connect:
handler: websocket.connect
events:
- websocket:
route: $connect
authorizer:
name: auth
identitySource:
- 'route.request.querystring.token'
scheduled-task:
handler: tasks.cleanup
events:
- schedule:
rate: rate(1 hour)
enabled: true
input:
action: cleanup_expired
resources:
Resources:
# API Gateway custom domain
CustomDomain:
Type: AWS::ApiGateway::DomainName
Properties:
DomainName: api.example.com
CertificateArn: ${self:custom.certificateArn}
Container-Based Serverless
AWS Fargate
# ECS Task Definition for Fargate
task_definition = {
"family": "api-service",
"networkMode": "awsvpc",
"requiresCompatibilities": ["FARGATE"],
"cpu": "512",
"memory": "1024",
"containerDefinitions": [{
"name": "api",
"image": "123456789.dkr.ecr.us-east-1.amazonaws.com/api:latest",
"portMappings": [{
"containerPort": 8080,
"protocol": "tcp"
}],
"environment": [
{"name": "ENV", "value": "production"},
{"name": "PORT", "value": "8080"}
],
"logConfiguration": {
"logDriver": "awslogs",
"options": {
"awslogs-group": "/ecs/api-service",
"awslogs-region": "us-east-1",
"awslogs-stream-prefix": "ecs"
}
},
"healthCheck": {
"command": ["CMD-SHELL", "curl -f http://localhost:8080/health || exit 1"],
"interval": 30,
"timeout": 5,
"retries": 3
}
}]
}
# Auto-scaling configuration
scaling_policy = {
"ServiceNamespace": "ecs",
"ResourceId": "service/cluster-name/service-name",
"ScalableDimension": "ecs:service:DesiredCount",
"PolicyType": "TargetTrackingScaling",
"TargetTrackingScalingPolicyConfiguration": {
"TargetValue": 70.0,
"PredefinedMetricSpecification": {
"PredefinedMetricType": "ECSServiceAverageCPUUtilization"
},
"ScaleOutCooldown": 60,
"ScaleInCooldown": 180
}
}
Google Cloud Run
# Cloud Run service with Eventarc
import os
from flask import Flask, request
from google.cloud import pubsub_v1
import json
app = Flask(__name__)
@app.route('/', methods=['POST'])
def handle_pubsub():
"""Handle Pub/Sub push messages"""
envelope = request.get_json()
if not envelope:
return 'Bad Request', 400
# Decode Pub/Sub message
pubsub_message = envelope['message']
data = json.loads(
base64.b64decode(pubsub_message['data']).decode('utf-8')
)
# Process message
process_event(data)
return '', 204
@app.route('/jobs/<job_id>', methods=['POST'])
def handle_job(job_id):
"""Long-running job handler"""
# Cloud Run can handle up to 60 minute requests
result = process_long_running_job(job_id)
return json.dumps(result), 200
if __name__ == '__main__':
# Cloud Run sets PORT environment variable
port = int(os.environ.get('PORT', 8080))
app.run(host='0.0.0.0', port=port)
Resources:
Edge Computing
Edge Platforms Overview
// Cloudflare Workers example
addEventListener('fetch', event => {
event.respondWith(handleRequest(event.request))
})
async function handleRequest(request) {
// KV storage for edge data
const cache = caches.default
const cacheKey = new Request(request.url, request)
// Check cache
let response = await cache.match(cacheKey)
if (!response) {
// A/B testing at the edge
const variant = selectVariant(request)
// Modify request headers
const modifiedRequest = new Request(request)
modifiedRequest.headers.set('X-Variant', variant)
// Fetch from origin
response = await fetch(modifiedRequest)
// Cache for 5 minutes
response = new Response(response.body, response)
response.headers.set('Cache-Control', 'max-age=300')
event.waitUntil(cache.put(cacheKey, response.clone()))
}
return response
}
// Durable Objects for stateful edge computing
export class RateLimiter {
constructor(state, env) {
this.state = state
this.env = env
}
async fetch(request) {
const ip = request.headers.get('CF-Connecting-IP')
const key = `ratelimit:${ip}`
// Get current count
let count = (await this.state.storage.get(key)) || 0
count++
// Update count with TTL
await this.state.storage.put(key, count, {
expirationTtl: 60 // 1 minute window
})
if (count > 100) {
return new Response('Rate limit exceeded', { status: 429 })
}
return new Response('OK')
}
}
Edge Functions Architecture
// Vercel Edge Functions
import { NextRequest, NextResponse } from 'next/server'
import { geolocation, ipAddress } from '@vercel/edge'
export const config = {
runtime: 'edge',
regions: ['iad1', 'sfo1'], // Deploy to specific regions
}
export default async function handler(req: NextRequest) {
// Geolocation at the edge
const geo = geolocation(req)
const ip = ipAddress(req)
// Feature flags based on location
const features = getFeatureFlags(geo.country)
// Edge-side rendering
const html = await renderPage({
locale: geo.country,
features,
userIP: ip
})
return new NextResponse(html, {
headers: {
'content-type': 'text/html',
'cache-control': 'max-age=60, stale-while-revalidate=86400',
'x-geo-country': geo.country || 'unknown'
}
})
}
// Edge middleware for authentication
export async function middleware(request: NextRequest) {
// Verify JWT at the edge
const token = request.cookies.get('auth-token')
if (!token) {
return NextResponse.redirect(new URL('/login', request.url))
}
try {
const payload = await verifyJWT(token)
// Add user context to headers
const requestHeaders = new Headers(request.headers)
requestHeaders.set('x-user-id', payload.userId)
return NextResponse.next({
request: {
headers: requestHeaders,
},
})
} catch (error) {
return NextResponse.redirect(new URL('/login', request.url))
}
}
WebAssembly at the Edge
// Rust compiled to WASM for edge computing
use wasm_bindgen::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
struct ImageTransform {
width: u32,
height: u32,
format: String,
quality: u8,
}
#[wasm_bindgen]
pub async fn handle_request(req: web_sys::Request) -> Result<web_sys::Response, JsValue> {
// Parse request URL
let url = web_sys::Url::new(&req.url())?;
let params = parse_query_params(&url);
// Fetch original image
let image_response = fetch_image(¶ms.src).await?;
let image_data = image_response.array_buffer().await?;
// Transform image using WASM
let transformed = transform_image(
&image_data,
ImageTransform {
width: params.width,
height: params.height,
format: params.format,
quality: params.quality,
}
)?;
// Return optimized image
Ok(web_sys::Response::new_with_opt_buffer_source_and_init(
Some(&transformed),
web_sys::ResponseInit::new()
.status(200)
.headers(&headers)
)?)
}
#[wasm_bindgen]
pub fn transform_image(data: &[u8], transform: ImageTransform) -> Result<Vec<u8>, JsValue> {
// High-performance image processing in WASM
let img = image::load_from_memory(data)
.map_err(|e| JsValue::from_str(&e.to_string()))?;
let resized = img.resize(
transform.width,
transform.height,
image::imageops::FilterType::Lanczos3
);
// Encode to requested format
let mut output = Vec::new();
match transform.format.as_str() {
"webp" => {
resized.write_to(&mut output, image::ImageOutputFormat::WebP)
.map_err(|e| JsValue::from_str(&e.to_string()))?;
}
"avif" => {
// AVIF encoding logic
}
_ => {
resized.write_to(&mut output, image::ImageOutputFormat::Jpeg(transform.quality))
.map_err(|e| JsValue::from_str(&e.to_string()))?;
}
}
Ok(output)
}
Resources:
Serverless Databases
DynamoDB for Serverless
# DynamoDB single-table design
import boto3
from boto3.dynamodb.conditions import Key, Attr
from typing import List, Dict, Optional
import uuid
from datetime import datetime
class ServerlessDataStore:
def __init__(self, table_name: str):
self.dynamodb = boto3.resource('dynamodb')
self.table = self.dynamodb.Table(table_name)
def create_user(self, email: str, name: str) -> Dict:
"""Create user with single-table design"""
user_id = str(uuid.uuid4())
timestamp = datetime.utcnow().isoformat()
# User entity
user_item = {
'PK': f'USER#{user_id}',
'SK': f'USER#{user_id}',
'GSI1PK': f'USER_EMAIL#{email}',
'GSI1SK': f'USER#{user_id}',
'Type': 'User',
'UserId': user_id,
'Email': email,
'Name': name,
'CreatedAt': timestamp,
'UpdatedAt': timestamp
}
# Email lookup entity
email_item = {
'PK': f'EMAIL#{email}',
'SK': f'EMAIL#{email}',
'Type': 'EmailLookup',
'UserId': user_id
}
# Transactional write
self.table.transact_write_items(
TransactItems=[
{'Put': {'Item': user_item, 'TableName': self.table.table_name}},
{'Put': {'Item': email_item, 'TableName': self.table.table_name,
'ConditionExpression': 'attribute_not_exists(PK)'}}
]
)
return user_item
def get_user_with_posts(self, user_id: str) -> Dict:
"""Get user and their posts in single query"""
response = self.table.query(
KeyConditionExpression=Key('PK').eq(f'USER#{user_id}') &
Key('SK').begins_with('USER#') |
Key('SK').begins_with('POST#'),
ScanIndexForward=False
)
user = None
posts = []
for item in response['Items']:
if item['Type'] == 'User':
user = item
elif item['Type'] == 'Post':
posts.append(item)
return {
'user': user,
'posts': posts
}
def list_posts_by_date(self, limit: int = 20,
last_evaluated_key: Optional[Dict] = None) -> Dict:
"""List posts using GSI for date sorting"""
query_params = {
'IndexName': 'GSI2',
'KeyConditionExpression': Key('GSI2PK').eq('POSTS') &
Key('GSI2SK').begins_with('DATE#'),
'ScanIndexForward': False,
'Limit': limit
}
if last_evaluated_key:
query_params['ExclusiveStartKey'] = last_evaluated_key
return self.table.query(**query_params)
Serverless SQL Options
# Neon Serverless Postgres
import asyncpg
import os
from contextlib import asynccontextmanager
class ServerlessPostgres:
def __init__(self):
self.database_url = os.environ['DATABASE_URL']
self.pool = None
async def init_pool(self):
"""Initialize connection pool with serverless optimizations"""
self.pool = await asyncpg.create_pool(
self.database_url,
min_size=0, # Scale to zero
max_size=3, # Limited connections for serverless
max_inactive_connection_lifetime=10, # Quick cleanup
command_timeout=30,
server_settings={
'jit': 'off', # Disable JIT for cold starts
'plan_cache_mode': 'force_generic_plan'
}
)
@asynccontextmanager
async def get_connection(self):
"""Get connection with automatic retry"""
if not self.pool:
await self.init_pool()
async with self.pool.acquire() as conn:
yield conn
async def execute_with_retry(self, query: str, *args, retries: int = 3):
"""Execute query with connection retry for serverless"""
for attempt in range(retries):
try:
async with self.get_connection() as conn:
return await conn.fetch(query, *args)
except asyncpg.exceptions.ConnectionDoesNotExistError:
if attempt == retries - 1:
raise
await asyncio.sleep(0.1 * (2 ** attempt))
Serverless Orchestration
Step Functions
# AWS Step Functions definition
step_function_definition = {
"Comment": "Order processing workflow",
"StartAt": "ValidateOrder",
"States": {
"ValidateOrder": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:validate-order",
"Next": "CheckInventory",
"Retry": [{
"ErrorEquals": ["States.TaskFailed"],
"IntervalSeconds": 2,
"MaxAttempts": 3,
"BackoffRate": 2.0
}],
"Catch": [{
"ErrorEquals": ["ValidationError"],
"Next": "OrderFailed"
}]
},
"CheckInventory": {
"Type": "Parallel",
"Branches": [
{
"StartAt": "CheckWarehouse1",
"States": {
"CheckWarehouse1": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:check-inventory",
"Parameters": {
"warehouse": "us-east-1",
"items.$": "$.items"
},
"End": true
}
}
},
{
"StartAt": "CheckWarehouse2",
"States": {
"CheckWarehouse2": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:check-inventory",
"Parameters": {
"warehouse": "us-west-2",
"items.$": "$.items"
},
"End": true
}
}
}
],
"Next": "ProcessPayment"
},
"ProcessPayment": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
"Parameters": {
"FunctionName": "process-payment",
"Payload": {
"orderId.$": "$.orderId",
"amount.$": "$.totalAmount",
"taskToken.$": "$$.Task.Token"
}
},
"TimeoutSeconds": 300,
"Next": "ShipOrder"
},
"ShipOrder": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:ship-order",
"Next": "OrderComplete"
},
"OrderComplete": {
"Type": "Succeed"
},
"OrderFailed": {
"Type": "Fail",
"Error": "OrderProcessingFailed",
"Cause": "Order could not be processed"
}
}
}
# Lambda function for long-running tasks
def process_payment_handler(event, context):
"""Handle payment with callback to Step Functions"""
task_token = event['taskToken']
try:
# Process payment
payment_result = process_payment_async(
order_id=event['orderId'],
amount=event['amount']
)
# Send success callback
step_functions_client.send_task_success(
taskToken=task_token,
output=json.dumps({
'paymentId': payment_result['id'],
'status': 'completed'
})
)
except Exception as e:
# Send failure callback
step_functions_client.send_task_failure(
taskToken=task_token,
error='PaymentFailed',
cause=str(e)
)
Performance Optimization
Cold Start Optimization
# Minimize cold starts
import json
import os
# Global initialization - happens once per container
print("Cold start initialization")
# Lazy loading for heavy dependencies
_heavy_client = None
def get_heavy_client():
global _heavy_client
if _heavy_client is None:
import heavy_dependency
_heavy_client = heavy_dependency.Client()
return _heavy_client
# Pre-warm connections
if os.environ.get('PREWARM_CONNECTIONS'):
import boto3
dynamodb = boto3.client('dynamodb')
# Make a dummy request to establish connection
try:
dynamodb.describe_table(TableName='dummy')
except:
pass
def optimized_handler(event, context):
"""Optimized Lambda handler"""
# Use provisioned concurrency for critical functions
# Keep function warm with CloudWatch Events
# Minimize package size
# Use Lambda Layers for dependencies
# Enable HTTP keep-alive
return {
'statusCode': 200,
'body': json.dumps({'status': 'success'})
}
# Runtime optimization
class LambdaOptimizer:
@staticmethod
def minimize_deployment_package():
"""Reduce package size for faster cold starts"""
optimizations = [
"Remove unnecessary files (__pycache__, tests, docs)",
"Use Lambda Layers for large dependencies",
"Compile Python packages without debug symbols",
"Strip binaries with 'strip' command",
"Use ARM architecture (Graviton2) for better price/performance"
]
return optimizations
@staticmethod
def configure_memory():
"""Optimal memory configuration"""
return {
'CPU-bound': 3008, # Maximum CPU allocation
'IO-bound': 1024, # Balanced for I/O operations
'Minimal': 512 # Cost-optimized for simple tasks
}
Edge Caching Strategies
// Intelligent edge caching
class EdgeCacheManager {
constructor() {
this.cache = caches.default
this.analytics = new EdgeAnalytics()
}
async handleRequest(request) {
const cacheKey = this.generateCacheKey(request)
// Check cache with varying TTLs
let response = await this.cache.match(cacheKey)
if (response) {
// Validate cache freshness
const age = Date.now() - new Date(response.headers.get('date')).getTime()
const maxAge = this.calculateMaxAge(request)
if (age < maxAge) {
// Serve from cache
this.analytics.recordCacheHit()
return response
}
// Stale-while-revalidate
const staleResponse = response.clone()
// Async revalidation
this.revalidateCache(request, cacheKey)
return staleResponse
}
// Cache miss - fetch from origin
response = await this.fetchFromOrigin(request)
// Intelligent caching based on response
if (this.shouldCache(request, response)) {
await this.cacheResponse(cacheKey, response.clone())
}
return response
}
calculateMaxAge(request) {
// Dynamic TTL based on content type and patterns
const url = new URL(request.url)
if (url.pathname.match(/\.(jpg|png|gif|webp)$/)) {
return 86400 * 30 // 30 days for images
} else if (url.pathname.match(/\.(js|css)$/)) {
return 86400 * 7 // 7 days for assets
} else if (url.pathname.includes('/api/')) {
return 60 // 1 minute for API responses
}
return 300 // 5 minutes default
}
async revalidateCache(request, cacheKey) {
try {
const response = await fetch(request)
if (response.ok) {
await this.cache.put(cacheKey, response)
}
} catch (error) {
console.error('Revalidation failed:', error)
}
}
}
Monitoring and Observability
Distributed Tracing
# OpenTelemetry for serverless
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.aws_lambda import AwsLambdaInstrumentor
# Initialize tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
# Configure OTLP exporter
otlp_exporter = OTLPSpanExporter(
endpoint="otel-collector:4317",
insecure=True
)
# Add batch processor for efficiency
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
# Auto-instrument Lambda
AwsLambdaInstrumentor().instrument()
def traced_handler(event, context):
"""Lambda handler with distributed tracing"""
with tracer.start_as_current_span("process_request") as span:
# Add custom attributes
span.set_attribute("request.id", event.get('requestId'))
span.set_attribute("user.id", event.get('userId'))
try:
# Trace external calls
with tracer.start_as_current_span("database_query"):
result = query_database(event['query'])
with tracer.start_as_current_span("cache_lookup"):
cached = check_cache(event['key'])
return {
'statusCode': 200,
'body': json.dumps(result)
}
except Exception as e:
span.record_exception(e)
span.set_status(trace.Status(trace.StatusCode.ERROR))
raise
Metrics and Alerting
# CloudFormation for serverless monitoring
Resources:
LambdaErrorAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: !Sub '${AWS::StackName}-lambda-errors'
MetricName: Errors
Namespace: AWS/Lambda
Statistic: Sum
Period: 60
EvaluationPeriods: 2
Threshold: 5
ComparisonOperator: GreaterThanThreshold
Dimensions:
- Name: FunctionName
Value: !Ref LambdaFunction
AlarmActions:
- !Ref SNSTopic
LambdaDurationAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: !Sub '${AWS::StackName}-lambda-duration'
MetricName: Duration
Namespace: AWS/Lambda
Statistic: Average
Period: 300
EvaluationPeriods: 2
Threshold: 3000 # 3 seconds
ComparisonOperator: GreaterThanThreshold
CustomMetrics:
Type: AWS::Logs::MetricFilter
Properties:
FilterPattern: '[timestamp, level="ERROR", ...]'
LogGroupName: !Sub '/aws/lambda/${LambdaFunction}'
MetricTransformations:
- MetricName: ApplicationErrors
MetricNamespace: CustomApp
MetricValue: '1'
DefaultValue: 0
Cost Optimization
Serverless Cost Management
# Cost optimization strategies
class ServerlessCostOptimizer:
def __init__(self):
self.cloudwatch = boto3.client('cloudwatch')
self.lambda_client = boto3.client('lambda')
def analyze_function_costs(self, function_name: str, days: int = 30):
"""Analyze Lambda function costs"""
# Get invocation metrics
invocations = self.get_metric_statistics(
function_name, 'Invocations', days
)
# Get duration metrics
duration = self.get_metric_statistics(
function_name, 'Duration', days
)
# Get function configuration
config = self.lambda_client.get_function_configuration(
FunctionName=function_name
)
memory_mb = config['MemorySize']
# Calculate costs
total_invocations = sum(i['Sum'] for i in invocations)
avg_duration_ms = sum(d['Average'] for d in duration) / len(duration)
# Compute GB-seconds
gb_seconds = (memory_mb / 1024) * (avg_duration_ms / 1000) * total_invocations
# Pricing (varies by region)
request_cost = total_invocations * 0.0000002 # $0.20 per 1M requests
compute_cost = gb_seconds * 0.0000166667 # $0.0000166667 per GB-second
return {
'total_invocations': total_invocations,
'average_duration_ms': avg_duration_ms,
'gb_seconds': gb_seconds,
'estimated_cost': {
'requests': request_cost,
'compute': compute_cost,
'total': request_cost + compute_cost
},
'recommendations': self.generate_recommendations(
memory_mb, avg_duration_ms, total_invocations
)
}
def generate_recommendations(self, memory_mb, avg_duration_ms, invocations):
"""Generate cost optimization recommendations"""
recommendations = []
# Memory optimization
if avg_duration_ms < 100 and memory_mb > 512:
recommendations.append({
'type': 'memory_reduction',
'action': 'Reduce memory to 512MB',
'estimated_savings': '40%'
})
# Provisioned concurrency analysis
if invocations > 1000000: # 1M+ invocations per month
recommendations.append({
'type': 'provisioned_concurrency',
'action': 'Consider provisioned concurrency for consistent performance',
'estimated_impact': 'Eliminate cold starts'
})
# Architecture recommendations
if avg_duration_ms > 5000:
recommendations.append({
'type': 'architecture',
'action': 'Consider moving to container-based solution (Fargate)',
'reason': 'Long-running functions may be more cost-effective as containers'
})
return recommendations
Interview Preparation
System Design Questions
- Design a global serverless API with sub-100ms latency
- Build an event-driven image processing pipeline
- Create a serverless data analytics platform
- Design a multi-region edge application
Implementation Challenges
- Implement request coalescing at the edge
- Build a serverless WebSocket system
- Create a distributed rate limiter
- Design stateful serverless workflows
Optimization Problems
- Minimize cold starts in a high-traffic system
- Optimize costs for a serverless application
- Implement edge caching with personalization
- Build a serverless CI/CD pipeline
Essential Resources
Documentation
Books
Tools
- 🔧 Serverless Framework
- 🔧 AWS SAM
- 🔧 Architect
- 🔧 SST
Communities
Remember: Serverless and edge computing represent the future of distributed systems. Master these technologies to build applications that scale infinitely while minimizing operational overhead.