Skip to main content

Serverless & Edge Computing Architecture

Master serverless platforms and edge computing to build globally distributed, event-driven applications. Learn how to architect systems that scale to zero, run at the edge, and optimize for cost and performance.

📚 Essential Resources

📖 Must-Read Books & Guides

🎥 Video Resources

🎓 Courses & Training

📰 Blogs & Articles

🔧 Essential Tools & Platforms

💬 Communities & Forums

🏆 Platform Resources

Serverless Fundamentals

Understanding Serverless Architecture

Serverless Computing Spectrum:

Traditional → VMs → Containers → Functions → Edge Functions
More Control More Abstraction
Higher Ops Lower Ops
Fixed Costs Usage-Based

Functions as a Service (FaaS)

# AWS Lambda function example
import json
import boto3
from typing import Dict, Any
import asyncio
from dataclasses import dataclass

@dataclass
class LambdaContext:
function_name: str
function_version: str
invoked_function_arn: str
memory_limit_in_mb: int
aws_request_id: str
log_group_name: str
log_stream_name: str

def get_remaining_time_in_millis(self) -> int:
"""Get remaining execution time"""
pass

def lambda_handler(event: Dict[str, Any], context: LambdaContext) -> Dict[str, Any]:
"""
AWS Lambda handler with best practices
"""
# Initialize clients outside handler for connection reuse
global dynamodb_client
if 'dynamodb_client' not in globals():
dynamodb_client = boto3.client('dynamodb')

try:
# Parse event
request_body = json.loads(event.get('body', '{}'))
user_id = request_body.get('user_id')

# Business logic
result = process_user_request(user_id)

# Return API Gateway response format
return {
'statusCode': 200,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
},
'body': json.dumps({
'message': 'Success',
'data': result,
'request_id': context.aws_request_id
})
}

except Exception as e:
# Error handling with structured logging
print(json.dumps({
'level': 'ERROR',
'message': str(e),
'request_id': context.aws_request_id,
'event': event
}))

return {
'statusCode': 500,
'body': json.dumps({'error': 'Internal server error'})
}

# Async Lambda with Python 3.9+
async def async_lambda_handler(event: Dict[str, Any], context: LambdaContext):
"""
Async Lambda for concurrent operations
"""
# Concurrent API calls
tasks = [
fetch_user_data(event['user_id']),
fetch_preferences(event['user_id']),
fetch_recommendations(event['user_id'])
]

user_data, preferences, recommendations = await asyncio.gather(*tasks)

return {
'user': user_data,
'preferences': preferences,
'recommendations': recommendations
}

Serverless Patterns

1. Event-Driven Processing

# SQS trigger for batch processing
def sqs_batch_processor(event: Dict[str, Any], context: LambdaContext):
"""
Process SQS messages in batches
"""
successful_messages = []
failed_messages = []

for record in event['Records']:
try:
# Parse SQS message
message_body = json.loads(record['body'])

# Process message
process_message(message_body)

successful_messages.append(record['messageId'])
except Exception as e:
failed_messages.append({
'itemIdentifier': record['messageId']
})
print(f"Failed to process message: {e}")

# Return partial batch failure
return {
'batchItemFailures': failed_messages
}

# DynamoDB Streams processor
def dynamodb_stream_processor(event: Dict[str, Any], context: LambdaContext):
"""
Process DynamoDB change events
"""
for record in event['Records']:
event_name = record['eventName']

if event_name == 'INSERT':
# Handle new item
new_item = record['dynamodb']['NewImage']
index_in_elasticsearch(new_item)

elif event_name == 'MODIFY':
# Handle updates
old_item = record['dynamodb']['OldImage']
new_item = record['dynamodb']['NewImage']
sync_changes(old_item, new_item)

elif event_name == 'REMOVE':
# Handle deletions
old_item = record['dynamodb']['OldImage']
remove_from_elasticsearch(old_item)

2. API Gateway Integration

# serverless.yml configuration
service: api-service

provider:
name: aws
runtime: python3.9
memorySize: 1024
timeout: 30
environment:
STAGE: ${opt:stage, 'dev'}
TABLE_NAME: ${self:service}-${opt:stage, 'dev'}

# Function-level permissions
iam:
role:
statements:
- Effect: Allow
Action:
- dynamodb:Query
- dynamodb:GetItem
- dynamodb:PutItem
Resource:
- arn:aws:dynamodb:${aws:region}:*:table/${self:provider.environment.TABLE_NAME}

functions:
api:
handler: handler.api
events:
- http:
path: /{proxy+}
method: ANY
cors: true
authorizer:
type: COGNITO_USER_POOLS
authorizerId:
Ref: ApiGatewayAuthorizer

websocket-connect:
handler: websocket.connect
events:
- websocket:
route: $connect
authorizer:
name: auth
identitySource:
- 'route.request.querystring.token'

scheduled-task:
handler: tasks.cleanup
events:
- schedule:
rate: rate(1 hour)
enabled: true
input:
action: cleanup_expired

resources:
Resources:
# API Gateway custom domain
CustomDomain:
Type: AWS::ApiGateway::DomainName
Properties:
DomainName: api.example.com
CertificateArn: ${self:custom.certificateArn}

Container-Based Serverless

AWS Fargate

# ECS Task Definition for Fargate
task_definition = {
"family": "api-service",
"networkMode": "awsvpc",
"requiresCompatibilities": ["FARGATE"],
"cpu": "512",
"memory": "1024",
"containerDefinitions": [{
"name": "api",
"image": "123456789.dkr.ecr.us-east-1.amazonaws.com/api:latest",
"portMappings": [{
"containerPort": 8080,
"protocol": "tcp"
}],
"environment": [
{"name": "ENV", "value": "production"},
{"name": "PORT", "value": "8080"}
],
"logConfiguration": {
"logDriver": "awslogs",
"options": {
"awslogs-group": "/ecs/api-service",
"awslogs-region": "us-east-1",
"awslogs-stream-prefix": "ecs"
}
},
"healthCheck": {
"command": ["CMD-SHELL", "curl -f http://localhost:8080/health || exit 1"],
"interval": 30,
"timeout": 5,
"retries": 3
}
}]
}

# Auto-scaling configuration
scaling_policy = {
"ServiceNamespace": "ecs",
"ResourceId": "service/cluster-name/service-name",
"ScalableDimension": "ecs:service:DesiredCount",
"PolicyType": "TargetTrackingScaling",
"TargetTrackingScalingPolicyConfiguration": {
"TargetValue": 70.0,
"PredefinedMetricSpecification": {
"PredefinedMetricType": "ECSServiceAverageCPUUtilization"
},
"ScaleOutCooldown": 60,
"ScaleInCooldown": 180
}
}

Google Cloud Run

# Cloud Run service with Eventarc
import os
from flask import Flask, request
from google.cloud import pubsub_v1
import json

app = Flask(__name__)

@app.route('/', methods=['POST'])
def handle_pubsub():
"""Handle Pub/Sub push messages"""
envelope = request.get_json()
if not envelope:
return 'Bad Request', 400

# Decode Pub/Sub message
pubsub_message = envelope['message']
data = json.loads(
base64.b64decode(pubsub_message['data']).decode('utf-8')
)

# Process message
process_event(data)

return '', 204

@app.route('/jobs/<job_id>', methods=['POST'])
def handle_job(job_id):
"""Long-running job handler"""
# Cloud Run can handle up to 60 minute requests
result = process_long_running_job(job_id)

return json.dumps(result), 200

if __name__ == '__main__':
# Cloud Run sets PORT environment variable
port = int(os.environ.get('PORT', 8080))
app.run(host='0.0.0.0', port=port)

Resources:

Edge Computing

Edge Platforms Overview

// Cloudflare Workers example
addEventListener('fetch', event => {
event.respondWith(handleRequest(event.request))
})

async function handleRequest(request) {
// KV storage for edge data
const cache = caches.default
const cacheKey = new Request(request.url, request)

// Check cache
let response = await cache.match(cacheKey)

if (!response) {
// A/B testing at the edge
const variant = selectVariant(request)

// Modify request headers
const modifiedRequest = new Request(request)
modifiedRequest.headers.set('X-Variant', variant)

// Fetch from origin
response = await fetch(modifiedRequest)

// Cache for 5 minutes
response = new Response(response.body, response)
response.headers.set('Cache-Control', 'max-age=300')

event.waitUntil(cache.put(cacheKey, response.clone()))
}

return response
}

// Durable Objects for stateful edge computing
export class RateLimiter {
constructor(state, env) {
this.state = state
this.env = env
}

async fetch(request) {
const ip = request.headers.get('CF-Connecting-IP')
const key = `ratelimit:${ip}`

// Get current count
let count = (await this.state.storage.get(key)) || 0
count++

// Update count with TTL
await this.state.storage.put(key, count, {
expirationTtl: 60 // 1 minute window
})

if (count > 100) {
return new Response('Rate limit exceeded', { status: 429 })
}

return new Response('OK')
}
}

Edge Functions Architecture

// Vercel Edge Functions
import { NextRequest, NextResponse } from 'next/server'
import { geolocation, ipAddress } from '@vercel/edge'

export const config = {
runtime: 'edge',
regions: ['iad1', 'sfo1'], // Deploy to specific regions
}

export default async function handler(req: NextRequest) {
// Geolocation at the edge
const geo = geolocation(req)
const ip = ipAddress(req)

// Feature flags based on location
const features = getFeatureFlags(geo.country)

// Edge-side rendering
const html = await renderPage({
locale: geo.country,
features,
userIP: ip
})

return new NextResponse(html, {
headers: {
'content-type': 'text/html',
'cache-control': 'max-age=60, stale-while-revalidate=86400',
'x-geo-country': geo.country || 'unknown'
}
})
}

// Edge middleware for authentication
export async function middleware(request: NextRequest) {
// Verify JWT at the edge
const token = request.cookies.get('auth-token')

if (!token) {
return NextResponse.redirect(new URL('/login', request.url))
}

try {
const payload = await verifyJWT(token)

// Add user context to headers
const requestHeaders = new Headers(request.headers)
requestHeaders.set('x-user-id', payload.userId)

return NextResponse.next({
request: {
headers: requestHeaders,
},
})
} catch (error) {
return NextResponse.redirect(new URL('/login', request.url))
}
}

WebAssembly at the Edge

// Rust compiled to WASM for edge computing
use wasm_bindgen::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct ImageTransform {
width: u32,
height: u32,
format: String,
quality: u8,
}

#[wasm_bindgen]
pub async fn handle_request(req: web_sys::Request) -> Result<web_sys::Response, JsValue> {
// Parse request URL
let url = web_sys::Url::new(&req.url())?;
let params = parse_query_params(&url);

// Fetch original image
let image_response = fetch_image(&params.src).await?;
let image_data = image_response.array_buffer().await?;

// Transform image using WASM
let transformed = transform_image(
&image_data,
ImageTransform {
width: params.width,
height: params.height,
format: params.format,
quality: params.quality,
}
)?;

// Return optimized image
Ok(web_sys::Response::new_with_opt_buffer_source_and_init(
Some(&transformed),
web_sys::ResponseInit::new()
.status(200)
.headers(&headers)
)?)
}

#[wasm_bindgen]
pub fn transform_image(data: &[u8], transform: ImageTransform) -> Result<Vec<u8>, JsValue> {
// High-performance image processing in WASM
let img = image::load_from_memory(data)
.map_err(|e| JsValue::from_str(&e.to_string()))?;

let resized = img.resize(
transform.width,
transform.height,
image::imageops::FilterType::Lanczos3
);

// Encode to requested format
let mut output = Vec::new();
match transform.format.as_str() {
"webp" => {
resized.write_to(&mut output, image::ImageOutputFormat::WebP)
.map_err(|e| JsValue::from_str(&e.to_string()))?;
}
"avif" => {
// AVIF encoding logic
}
_ => {
resized.write_to(&mut output, image::ImageOutputFormat::Jpeg(transform.quality))
.map_err(|e| JsValue::from_str(&e.to_string()))?;
}
}

Ok(output)
}

Resources:

Serverless Databases

DynamoDB for Serverless

# DynamoDB single-table design
import boto3
from boto3.dynamodb.conditions import Key, Attr
from typing import List, Dict, Optional
import uuid
from datetime import datetime

class ServerlessDataStore:
def __init__(self, table_name: str):
self.dynamodb = boto3.resource('dynamodb')
self.table = self.dynamodb.Table(table_name)

def create_user(self, email: str, name: str) -> Dict:
"""Create user with single-table design"""
user_id = str(uuid.uuid4())
timestamp = datetime.utcnow().isoformat()

# User entity
user_item = {
'PK': f'USER#{user_id}',
'SK': f'USER#{user_id}',
'GSI1PK': f'USER_EMAIL#{email}',
'GSI1SK': f'USER#{user_id}',
'Type': 'User',
'UserId': user_id,
'Email': email,
'Name': name,
'CreatedAt': timestamp,
'UpdatedAt': timestamp
}

# Email lookup entity
email_item = {
'PK': f'EMAIL#{email}',
'SK': f'EMAIL#{email}',
'Type': 'EmailLookup',
'UserId': user_id
}

# Transactional write
self.table.transact_write_items(
TransactItems=[
{'Put': {'Item': user_item, 'TableName': self.table.table_name}},
{'Put': {'Item': email_item, 'TableName': self.table.table_name,
'ConditionExpression': 'attribute_not_exists(PK)'}}
]
)

return user_item

def get_user_with_posts(self, user_id: str) -> Dict:
"""Get user and their posts in single query"""
response = self.table.query(
KeyConditionExpression=Key('PK').eq(f'USER#{user_id}') &
Key('SK').begins_with('USER#') |
Key('SK').begins_with('POST#'),
ScanIndexForward=False
)

user = None
posts = []

for item in response['Items']:
if item['Type'] == 'User':
user = item
elif item['Type'] == 'Post':
posts.append(item)

return {
'user': user,
'posts': posts
}

def list_posts_by_date(self, limit: int = 20,
last_evaluated_key: Optional[Dict] = None) -> Dict:
"""List posts using GSI for date sorting"""
query_params = {
'IndexName': 'GSI2',
'KeyConditionExpression': Key('GSI2PK').eq('POSTS') &
Key('GSI2SK').begins_with('DATE#'),
'ScanIndexForward': False,
'Limit': limit
}

if last_evaluated_key:
query_params['ExclusiveStartKey'] = last_evaluated_key

return self.table.query(**query_params)

Serverless SQL Options

# Neon Serverless Postgres
import asyncpg
import os
from contextlib import asynccontextmanager

class ServerlessPostgres:
def __init__(self):
self.database_url = os.environ['DATABASE_URL']
self.pool = None

async def init_pool(self):
"""Initialize connection pool with serverless optimizations"""
self.pool = await asyncpg.create_pool(
self.database_url,
min_size=0, # Scale to zero
max_size=3, # Limited connections for serverless
max_inactive_connection_lifetime=10, # Quick cleanup
command_timeout=30,
server_settings={
'jit': 'off', # Disable JIT for cold starts
'plan_cache_mode': 'force_generic_plan'
}
)

@asynccontextmanager
async def get_connection(self):
"""Get connection with automatic retry"""
if not self.pool:
await self.init_pool()

async with self.pool.acquire() as conn:
yield conn

async def execute_with_retry(self, query: str, *args, retries: int = 3):
"""Execute query with connection retry for serverless"""
for attempt in range(retries):
try:
async with self.get_connection() as conn:
return await conn.fetch(query, *args)
except asyncpg.exceptions.ConnectionDoesNotExistError:
if attempt == retries - 1:
raise
await asyncio.sleep(0.1 * (2 ** attempt))

Serverless Orchestration

Step Functions

# AWS Step Functions definition
step_function_definition = {
"Comment": "Order processing workflow",
"StartAt": "ValidateOrder",
"States": {
"ValidateOrder": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:validate-order",
"Next": "CheckInventory",
"Retry": [{
"ErrorEquals": ["States.TaskFailed"],
"IntervalSeconds": 2,
"MaxAttempts": 3,
"BackoffRate": 2.0
}],
"Catch": [{
"ErrorEquals": ["ValidationError"],
"Next": "OrderFailed"
}]
},
"CheckInventory": {
"Type": "Parallel",
"Branches": [
{
"StartAt": "CheckWarehouse1",
"States": {
"CheckWarehouse1": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:check-inventory",
"Parameters": {
"warehouse": "us-east-1",
"items.$": "$.items"
},
"End": true
}
}
},
{
"StartAt": "CheckWarehouse2",
"States": {
"CheckWarehouse2": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:check-inventory",
"Parameters": {
"warehouse": "us-west-2",
"items.$": "$.items"
},
"End": true
}
}
}
],
"Next": "ProcessPayment"
},
"ProcessPayment": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
"Parameters": {
"FunctionName": "process-payment",
"Payload": {
"orderId.$": "$.orderId",
"amount.$": "$.totalAmount",
"taskToken.$": "$$.Task.Token"
}
},
"TimeoutSeconds": 300,
"Next": "ShipOrder"
},
"ShipOrder": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:ship-order",
"Next": "OrderComplete"
},
"OrderComplete": {
"Type": "Succeed"
},
"OrderFailed": {
"Type": "Fail",
"Error": "OrderProcessingFailed",
"Cause": "Order could not be processed"
}
}
}

# Lambda function for long-running tasks
def process_payment_handler(event, context):
"""Handle payment with callback to Step Functions"""
task_token = event['taskToken']

try:
# Process payment
payment_result = process_payment_async(
order_id=event['orderId'],
amount=event['amount']
)

# Send success callback
step_functions_client.send_task_success(
taskToken=task_token,
output=json.dumps({
'paymentId': payment_result['id'],
'status': 'completed'
})
)
except Exception as e:
# Send failure callback
step_functions_client.send_task_failure(
taskToken=task_token,
error='PaymentFailed',
cause=str(e)
)

Performance Optimization

Cold Start Optimization

# Minimize cold starts
import json
import os

# Global initialization - happens once per container
print("Cold start initialization")

# Lazy loading for heavy dependencies
_heavy_client = None

def get_heavy_client():
global _heavy_client
if _heavy_client is None:
import heavy_dependency
_heavy_client = heavy_dependency.Client()
return _heavy_client

# Pre-warm connections
if os.environ.get('PREWARM_CONNECTIONS'):
import boto3
dynamodb = boto3.client('dynamodb')
# Make a dummy request to establish connection
try:
dynamodb.describe_table(TableName='dummy')
except:
pass

def optimized_handler(event, context):
"""Optimized Lambda handler"""
# Use provisioned concurrency for critical functions
# Keep function warm with CloudWatch Events

# Minimize package size
# Use Lambda Layers for dependencies
# Enable HTTP keep-alive

return {
'statusCode': 200,
'body': json.dumps({'status': 'success'})
}

# Runtime optimization
class LambdaOptimizer:
@staticmethod
def minimize_deployment_package():
"""Reduce package size for faster cold starts"""
optimizations = [
"Remove unnecessary files (__pycache__, tests, docs)",
"Use Lambda Layers for large dependencies",
"Compile Python packages without debug symbols",
"Strip binaries with 'strip' command",
"Use ARM architecture (Graviton2) for better price/performance"
]
return optimizations

@staticmethod
def configure_memory():
"""Optimal memory configuration"""
return {
'CPU-bound': 3008, # Maximum CPU allocation
'IO-bound': 1024, # Balanced for I/O operations
'Minimal': 512 # Cost-optimized for simple tasks
}

Edge Caching Strategies

// Intelligent edge caching
class EdgeCacheManager {
constructor() {
this.cache = caches.default
this.analytics = new EdgeAnalytics()
}

async handleRequest(request) {
const cacheKey = this.generateCacheKey(request)

// Check cache with varying TTLs
let response = await this.cache.match(cacheKey)

if (response) {
// Validate cache freshness
const age = Date.now() - new Date(response.headers.get('date')).getTime()
const maxAge = this.calculateMaxAge(request)

if (age < maxAge) {
// Serve from cache
this.analytics.recordCacheHit()
return response
}

// Stale-while-revalidate
const staleResponse = response.clone()

// Async revalidation
this.revalidateCache(request, cacheKey)

return staleResponse
}

// Cache miss - fetch from origin
response = await this.fetchFromOrigin(request)

// Intelligent caching based on response
if (this.shouldCache(request, response)) {
await this.cacheResponse(cacheKey, response.clone())
}

return response
}

calculateMaxAge(request) {
// Dynamic TTL based on content type and patterns
const url = new URL(request.url)

if (url.pathname.match(/\.(jpg|png|gif|webp)$/)) {
return 86400 * 30 // 30 days for images
} else if (url.pathname.match(/\.(js|css)$/)) {
return 86400 * 7 // 7 days for assets
} else if (url.pathname.includes('/api/')) {
return 60 // 1 minute for API responses
}

return 300 // 5 minutes default
}

async revalidateCache(request, cacheKey) {
try {
const response = await fetch(request)
if (response.ok) {
await this.cache.put(cacheKey, response)
}
} catch (error) {
console.error('Revalidation failed:', error)
}
}
}

Monitoring and Observability

Distributed Tracing

# OpenTelemetry for serverless
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.aws_lambda import AwsLambdaInstrumentor

# Initialize tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Configure OTLP exporter
otlp_exporter = OTLPSpanExporter(
endpoint="otel-collector:4317",
insecure=True
)

# Add batch processor for efficiency
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# Auto-instrument Lambda
AwsLambdaInstrumentor().instrument()

def traced_handler(event, context):
"""Lambda handler with distributed tracing"""
with tracer.start_as_current_span("process_request") as span:
# Add custom attributes
span.set_attribute("request.id", event.get('requestId'))
span.set_attribute("user.id", event.get('userId'))

try:
# Trace external calls
with tracer.start_as_current_span("database_query"):
result = query_database(event['query'])

with tracer.start_as_current_span("cache_lookup"):
cached = check_cache(event['key'])

return {
'statusCode': 200,
'body': json.dumps(result)
}
except Exception as e:
span.record_exception(e)
span.set_status(trace.Status(trace.StatusCode.ERROR))
raise

Metrics and Alerting

# CloudFormation for serverless monitoring
Resources:
LambdaErrorAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: !Sub '${AWS::StackName}-lambda-errors'
MetricName: Errors
Namespace: AWS/Lambda
Statistic: Sum
Period: 60
EvaluationPeriods: 2
Threshold: 5
ComparisonOperator: GreaterThanThreshold
Dimensions:
- Name: FunctionName
Value: !Ref LambdaFunction
AlarmActions:
- !Ref SNSTopic

LambdaDurationAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: !Sub '${AWS::StackName}-lambda-duration'
MetricName: Duration
Namespace: AWS/Lambda
Statistic: Average
Period: 300
EvaluationPeriods: 2
Threshold: 3000 # 3 seconds
ComparisonOperator: GreaterThanThreshold

CustomMetrics:
Type: AWS::Logs::MetricFilter
Properties:
FilterPattern: '[timestamp, level="ERROR", ...]'
LogGroupName: !Sub '/aws/lambda/${LambdaFunction}'
MetricTransformations:
- MetricName: ApplicationErrors
MetricNamespace: CustomApp
MetricValue: '1'
DefaultValue: 0

Cost Optimization

Serverless Cost Management

# Cost optimization strategies
class ServerlessCostOptimizer:
def __init__(self):
self.cloudwatch = boto3.client('cloudwatch')
self.lambda_client = boto3.client('lambda')

def analyze_function_costs(self, function_name: str, days: int = 30):
"""Analyze Lambda function costs"""
# Get invocation metrics
invocations = self.get_metric_statistics(
function_name, 'Invocations', days
)

# Get duration metrics
duration = self.get_metric_statistics(
function_name, 'Duration', days
)

# Get function configuration
config = self.lambda_client.get_function_configuration(
FunctionName=function_name
)

memory_mb = config['MemorySize']

# Calculate costs
total_invocations = sum(i['Sum'] for i in invocations)
avg_duration_ms = sum(d['Average'] for d in duration) / len(duration)

# Compute GB-seconds
gb_seconds = (memory_mb / 1024) * (avg_duration_ms / 1000) * total_invocations

# Pricing (varies by region)
request_cost = total_invocations * 0.0000002 # $0.20 per 1M requests
compute_cost = gb_seconds * 0.0000166667 # $0.0000166667 per GB-second

return {
'total_invocations': total_invocations,
'average_duration_ms': avg_duration_ms,
'gb_seconds': gb_seconds,
'estimated_cost': {
'requests': request_cost,
'compute': compute_cost,
'total': request_cost + compute_cost
},
'recommendations': self.generate_recommendations(
memory_mb, avg_duration_ms, total_invocations
)
}

def generate_recommendations(self, memory_mb, avg_duration_ms, invocations):
"""Generate cost optimization recommendations"""
recommendations = []

# Memory optimization
if avg_duration_ms < 100 and memory_mb > 512:
recommendations.append({
'type': 'memory_reduction',
'action': 'Reduce memory to 512MB',
'estimated_savings': '40%'
})

# Provisioned concurrency analysis
if invocations > 1000000: # 1M+ invocations per month
recommendations.append({
'type': 'provisioned_concurrency',
'action': 'Consider provisioned concurrency for consistent performance',
'estimated_impact': 'Eliminate cold starts'
})

# Architecture recommendations
if avg_duration_ms > 5000:
recommendations.append({
'type': 'architecture',
'action': 'Consider moving to container-based solution (Fargate)',
'reason': 'Long-running functions may be more cost-effective as containers'
})

return recommendations

Interview Preparation

System Design Questions

  1. Design a global serverless API with sub-100ms latency
  2. Build an event-driven image processing pipeline
  3. Create a serverless data analytics platform
  4. Design a multi-region edge application

Implementation Challenges

  1. Implement request coalescing at the edge
  2. Build a serverless WebSocket system
  3. Create a distributed rate limiter
  4. Design stateful serverless workflows

Optimization Problems

  1. Minimize cold starts in a high-traffic system
  2. Optimize costs for a serverless application
  3. Implement edge caching with personalization
  4. Build a serverless CI/CD pipeline

Essential Resources

Documentation

Books

Tools

Communities

Remember: Serverless and edge computing represent the future of distributed systems. Master these technologies to build applications that scale infinitely while minimizing operational overhead.