Distributed Systems Theory
Distributed systems fail in ways single machines never do—partial failures, network partitions, clock skew, and split-brain leaders. This chapter covers the theory senior engineers need: consensus, locks, transactions, clocks, failure detection, and conflict-free replicated data types—with enough depth to survive L6 deep-dives.
Fallacies of distributed computing
Peter Deutsch's eight fallacies are assumptions that cause production outages when treated as truths. Every distributed system design should explicitly challenge each one.
| # | Fallacy | Reality | Design implication |
|---|---|---|---|
| 1 | The network is reliable | Packets drop, switches fail, AZs partition | Retries with backoff, circuit breakers, idempotency keys |
| 2 | Latency is zero | Cross-AZ: 1–5 ms; cross-region: 50–200 ms | Batch requests, async where possible, regional data placement |
| 3 | Bandwidth is infinite | 10 Gbps links saturate; egress costs money | Compress payloads, delta sync, CDN offload |
| 4 | The network is secure | MITM, BGP hijacks, insider threats | TLS everywhere, mTLS service mesh, zero trust |
| 5 | Topology doesn't change | Nodes join/leave, autoscaling, deploys | Service discovery, health checks, gossip membership |
| 6 | There is one administrator | Multiple teams, on-call rotations, config drift | GitOps, IaC, audit logs, blast radius limits |
| 7 | Transport cost is zero | Serialization, encryption, NAT traversal cost CPU | Binary protocols (Protobuf), connection pooling |
| 8 | The network is homogeneous | Mixed hardware, cloud + on-prem, mobile clients | Graceful degradation, capability negotiation |
Fallacy #1 kills the most systems. Synchronous RPC chains assume the network works—one slow downstream blocks the entire call tree. Design for partial failure: timeouts, bulkheads, and async boundaries.
Open deep-dives with "I'd design assuming the network is unreliable—timeouts on every RPC, idempotent handlers, and no distributed transactions across services." Shows maturity immediately.
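The "retries with backoff" and "idempotency keys" implications from the table can be sketched together. This is a minimal illustration, not a production client: `TransientError`, `call_with_retries`, and the `rpc(payload, idempotency_key)` calling convention are all hypothetical names chosen for the example.

```python
import random
import time
import uuid


class TransientError(Exception):
    """Hypothetical stand-in for a timeout or connection reset."""


def call_with_retries(rpc, payload, max_attempts=5, base_delay=0.1,
                      max_delay=2.0, sleep=time.sleep, rng=random.random):
    # One idempotency key is reused for every attempt, so the server can
    # deduplicate a retry whose earlier attempt actually succeeded.
    idempotency_key = str(uuid.uuid4())
    for attempt in range(max_attempts):
        try:
            return rpc(payload, idempotency_key)
        except TransientError:
            if attempt == max_attempts - 1:
                raise  # retry budget exhausted: surface the failure
            # Exponential backoff with full jitter, capped at max_delay.
            cap = min(max_delay, base_delay * 2 ** attempt)
            sleep(rng() * cap)
```

The jitter spreads retries from many clients over time, which avoids synchronized retry storms against a recovering service.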
Consensus
Consensus algorithms let unreliable nodes agree on a single value or ordered log despite failures. Foundation for leader election, distributed locks, and replicated state machines.
Paxos
Leslie Lamport's Paxos (first submitted in 1990, published in 1998) is the theoretical foundation. Proposers, acceptors, and learners coordinate in two phases (prepare + accept) to agree on a value. It is correct but notoriously difficult to implement and reason about. Google Chubby and Spanner's Paxos groups build on it directly; ZooKeeper's ZAB protocol is a closely related atomic-broadcast design rather than Paxos itself.
Raft — leader election + log replication
Raft (2014) decomposes consensus into leader election, log replication, and safety—designed to be understandable. Used by etcd, Consul, TiKV, and KRaft (Kafka metadata).
stateDiagram-v2
[*] --> Follower
Follower --> Candidate: election timeout
Candidate --> Leader: wins majority vote
Candidate --> Follower: discovers higher term
Leader --> Follower: discovers higher term
Follower --> Follower: receives AppendEntries from leader
note right of Leader
Client writes go to leader
Leader appends to log
Replicates to majority
Commits then applies
end note
| Property | Paxos | Raft |
|---|---|---|
| Understandability | Notoriously difficult | Designed for teaching; "Understandable Consensus" |
| Leader | Multi-Paxos uses stable leader | Strong leader model; all client writes via leader |
| Log | Can have gaps in early formulations | Contiguous committed log; strong ordering |
| Production use | Chubby, Spanner | etcd, Consul, KRaft, TiKV, CockroachDB |
Raft log replication (simplified)
- Client sends command to leader
- Leader appends entry to its log, assigns term + index
- Leader sends AppendEntries RPC to followers
- Entry committed once replicated on majority
- Leader applies to state machine; responds to client
- Followers apply committed entries in order
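The "committed once replicated on majority" step above can be sketched as a small function. This is for illustration only and is not a usable Raft implementation; `advance_commit_index` and its parameter layout are hypothetical, loosely following the `matchIndex` bookkeeping described in the Raft paper.

```python
def advance_commit_index(match_index, leader_last_index, log_terms,
                         current_term, commit_index):
    """Return the new commit index for a Raft leader.

    match_index: follower name -> highest log index known replicated there.
    log_terms:   log_terms[i] is the term of entry i (index 0 is a dummy).
    """
    # The leader always holds its own entries, so include its last index.
    replicated = sorted(list(match_index.values()) + [leader_last_index],
                        reverse=True)
    quorum = len(replicated) // 2 + 1
    # The quorum-th highest value is stored on at least a majority of nodes.
    candidate = replicated[quorum - 1]
    # Raft only commits by replica counting for entries of the current term.
    if candidate > commit_index and log_terms[candidate] == current_term:
        return candidate
    return commit_index
```

With five nodes and follower progress of 7, 5, 3, and 2 behind a leader at index 7, index 5 is the highest entry present on three nodes, so it is the one that becomes committed.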
Never roll your own consensus. Edge cases in leader election, log truncation, and network partitions take years to get right. Use etcd, Consul, or managed coordination (DynamoDB conditional writes for simpler cases). Rolling your own is a top interview red flag.
Raft requires majority quorum for commit: 3 nodes tolerate 1 failure, 5 nodes tolerate 2. Even numbers (4, 6) waste a node—always use odd counts. Leader election uses randomized timeouts (150–300 ms) to reduce split-vote probability.
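The quorum arithmetic behind the odd-count advice is easy to check directly. A minimal sketch (the function names are made up for this example):

```python
def majority(n):
    """Smallest number of nodes that forms a majority of n."""
    return n // 2 + 1


def tolerated_failures(n):
    """Nodes that can fail while a majority is still reachable."""
    return n - majority(n)


for n in range(3, 8):
    print(f"{n} nodes: quorum={majority(n)}, tolerates={tolerated_failures(n)}")
```

Going from 3 to 4 nodes raises the quorum from 2 to 3 but still tolerates only one failure, which is why the extra node buys nothing.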
Distributed locks
Locks coordinate exclusive access across nodes—for leader election, cron deduplication, or resource serialization. Naive Redis locks fail silently under process pauses and clock drift.
Redis SETNX
Basic pattern: SET key unique_token NX EX 30. The key is set only if it does not already exist, with a TTL so the lock cannot deadlock if the holder crashes. Release with a Lua script that compares the token before deleting, which prevents deleting another holder's lock.
-- Safe lock release (Redis Lua)
-- KEYS[1] = lock key, ARGV[1] = the unique token set at acquire time
if redis.call("GET", KEYS[1]) == ARGV[1] then
  return redis.call("DEL", KEYS[1])
else
  return 0
end
Redlock critique
Redis Redlock algorithm (Salvatore Sanfilippo) acquires locks on N independent Redis masters with quorum. Martin Kleppmann's critique (2016): if the lock holder pauses (GC, VM migration) longer than TTL, another process can acquire the lock—both believe they hold it. Fencing tokens are required: monotonically increasing token passed to storage; storage rejects writes with stale tokens.
| Mechanism | Coordination | Fencing support | Best for |
|---|---|---|---|
| Redis SETNX | Single Redis node | Manual token in app logic | Low-stakes dedup; cache stampede mutex |
| Redlock | Quorum across N Redis nodes | Still needs app-side fencing | Contested; prefer ZK/etcd for correctness-critical |
| ZooKeeper | Ephemeral sequential znodes | Sequential number acts as fence | Leader election, job scheduling (Curator recipes) |
| etcd | Lease + transaction (Raft-backed) | Revision number for fencing | K8s coordination, distributed config, locks |
L6 answer: "Redis lock for best-effort mutex (cache rebuild). For financial exclusivity, etcd lease with fencing token checked by the database; reject writes where token < max_seen_token. Never assume TTL locks are mutually exclusive under GC pause."
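The fencing check described above can be sketched with an in-memory stand-in for the storage layer. `FencedStore` and `StaleTokenError` are hypothetical names; in a real system the comparison would live inside the database or storage service, and the token would come from the lock service (for example an etcd revision).

```python
class StaleTokenError(Exception):
    """Raised when a write carries a token older than one already seen."""


class FencedStore:
    def __init__(self):
        self.max_seen_token = 0
        self.data = {}

    def write(self, key, value, token):
        # Reject writes from a holder whose lock has since been handed on.
        if token < self.max_seen_token:
            raise StaleTokenError(f"token {token} < {self.max_seen_token}")
        self.max_seen_token = token
        self.data[key] = value


store = FencedStore()
store.write("balance", 100, token=34)      # current holder
try:
    store.write("balance", 90, token=33)   # paused holder wakes up late
except StaleTokenError as exc:
    print("rejected:", exc)
```

The paused holder still believes it owns the lock, but the storage layer refuses its write because a higher token has already been observed.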
Distributed transactions
ACID across microservices is an anti-pattern at scale. Modern systems use sagas, outbox patterns, and eventual consistency instead of two-phase commit.
Two-Phase Commit (2PC)
Coordinator sends prepare to all participants → all vote commit/abort → coordinator sends commit/abort. Blocking: if the coordinator crashes after prepare, participants that voted commit must hold their locks until it recovers. Rarely used across microservices in practice.
sequenceDiagram
participant C as Coordinator
participant A as Service A
participant B as Service B
C->>A: prepare
C->>B: prepare
A-->>C: vote commit
B-->>C: vote commit
C->>A: commit
C->>B: commit
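The prepare/vote/commit exchange can be sketched in a few lines. This is a toy, in-process model under the assumption that nothing crashes; `Participant` and `two_phase_commit` are hypothetical names, and the blocking failure mode is exactly what the sketch leaves out.

```python
class Participant:
    def __init__(self, name, can_commit=True):
        self.name = name
        self.can_commit = can_commit
        self.state = "init"

    def prepare(self):
        # Phase 1: vote, and hold resources (locks) if voting to commit.
        self.state = "prepared" if self.can_commit else "aborted"
        return self.can_commit

    def commit(self):
        self.state = "committed"

    def abort(self):
        self.state = "aborted"


def two_phase_commit(participants):
    # Phase 1: collect a vote from every participant.
    votes = [p.prepare() for p in participants]
    # Phase 2: commit only on a unanimous yes, otherwise abort everywhere.
    if all(votes):
        for p in participants:
            p.commit()
        return True
    for p in participants:
        p.abort()
    return False
```

A single "no" vote aborts every participant, including those that were ready to commit.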
Three-Phase Commit (3PC)
Adds a pre-commit phase to reduce blocking: a participant that times out before receiving pre-commit aborts, while one that times out after pre-commit goes ahead and commits. Still rarely used; network partitions can cause inconsistency.
Saga pattern
| Style | Orchestration | Choreography |
|---|---|---|
| Control flow | Central saga orchestrator directs each step | Services react to events; no central brain |
| Visibility | Easy to trace state in one place | Harder to debug; event graph spread across services |
| Coupling | Orchestrator knows all services | Loose coupling; services know only their events |
| Compensation | Orchestrator triggers rollback steps explicitly | Each service listens for failure events and compensates |
| Example | Temporal workflow, custom saga service | Kafka events: OrderPlaced → PaymentProcessed → ShipmentScheduled |
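The orchestration column can be sketched as a loop that remembers which compensations to run. This is a minimal in-process illustration; `run_saga` and the step names are hypothetical, and a real orchestrator would persist its progress so it can resume after a crash.

```python
def run_saga(steps):
    """Run (action, compensation) pairs; undo completed steps on failure."""
    completed = []
    for action, compensate in steps:
        try:
            action()
        except Exception:
            # Compensate in reverse order, newest completed step first.
            for undo in reversed(completed):
                undo()
            return False
        completed.append(compensate)
    return True


log = []


def charge_card():
    raise RuntimeError("card declined")


ok = run_saga([
    (lambda: log.append("reserve stock"), lambda: log.append("release stock")),
    (charge_card, lambda: log.append("refund")),
    (lambda: log.append("schedule shipment"), lambda: log.append("cancel shipment")),
])
print(ok, log)
```

Only steps that completed are compensated: the failed payment step needs no refund, and the shipment step never runs.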
Transactional Outbox
Problem: how to atomically update DB and publish to Kafka? Outbox pattern: write business row + outbox row in same DB transaction. Separate relay process (Debezium CDC or polling publisher) reads outbox and publishes to message bus. Guarantees at-least-once delivery without 2PC across DB and broker.
flowchart LR
App[Order Service]
DB[(Orders DB\n+ outbox table)]
Relay[Outbox Relay\nDebezium / Poller]
Kafka[Kafka]
App -->|single TX| DB
Relay -->|read outbox| DB
Relay -->|publish| Kafka
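A polling-publisher version of the outbox can be sketched with SQLite standing in for the orders database. The table layout, `place_order`, `relay_once`, and the `publish` callback are assumptions made for this example; a real relay would publish to a broker such as Kafka.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, item TEXT)")
conn.execute(
    "CREATE TABLE outbox (id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "topic TEXT, payload TEXT, published INTEGER DEFAULT 0)"
)


def place_order(conn, order_id, item):
    # One transaction: the business row and the outbox row commit together
    # or not at all.
    with conn:
        conn.execute("INSERT INTO orders (id, item) VALUES (?, ?)",
                     (order_id, item))
        conn.execute("INSERT INTO outbox (topic, payload) VALUES (?, ?)",
                     ("OrderPlaced", json.dumps({"order_id": order_id})))


def relay_once(conn, publish):
    rows = conn.execute(
        "SELECT id, topic, payload FROM outbox WHERE published = 0 ORDER BY id"
    ).fetchall()
    for row_id, topic, payload in rows:
        # A crash between publish and the UPDATE causes a re-publish on the
        # next pass, so delivery is at-least-once and consumers must dedupe.
        publish(topic, payload)
        with conn:
            conn.execute("UPDATE outbox SET published = 1 WHERE id = ?",
                         (row_id,))
    return len(rows)
```

Because the event row only exists if the order row committed, the relay can never publish an event for an order that was rolled back.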
2PC gives atomicity but blocks and doesn't scale across service boundaries. Saga gives eventual consistency with explicit compensation; design for idempotent steps and duplicate event handling. Default to saga + outbox for microservices; reserve 2PC for transactions inside one database system (for example across its own shards, or XA resources owned by a single service).
"I avoid 2PC across services; it blocks and couples availability. Order flow: saga with choreography via Kafka, each step idempotent, compensating transactions on failure. Outbox ensures the event is published if and only if the DB write commits, with at-least-once delivery."
Clocks
Time is not universal in distributed systems. NTP drift, leap seconds, and VM clock jumps make "last write wins" dangerous without careful clock semantics.
| Clock type | Mechanism | Ordering guarantee | Used by |
|---|---|---|---|
| Physical (wall clock) | NTP-synchronized UTC timestamp | None across nodes—drift ±ms to seconds | Logs, TTL, human-readable timestamps |
| Lamport clock | Counter incremented on send/receive | Happens-before for causally related events | Distributed debugging, basic ordering |
| Vector clock | Vector of counters per node | Detects concurrent writes (conflicts) | Dynamo, Riak conflict detection, CRDT foundations |
| HLC (Hybrid Logical Clock) | Physical time + logical counter | Causal + approximate wall time | CockroachDB, MongoDB causal consistency |
| TrueTime | GPS + atomic clocks; uncertainty interval | External consistency via commit wait | Google Spanner |
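The vector clock row is the one most often asked about, so here is a minimal sketch of how comparison detects concurrent writes. Clocks are plain dictionaries from node name to counter; the function names are made up for this example.

```python
def vc_increment(clock, node):
    """Return a copy of clock with node's counter advanced by one."""
    updated = dict(clock)
    updated[node] = updated.get(node, 0) + 1
    return updated


def vc_merge(a, b):
    """Element-wise maximum, applied when a node receives a message."""
    return {k: max(a.get(k, 0), b.get(k, 0)) for k in a.keys() | b.keys()}


def vc_compare(a, b):
    keys = a.keys() | b.keys()
    a_le_b = all(a.get(k, 0) <= b.get(k, 0) for k in keys)
    b_le_a = all(b.get(k, 0) <= a.get(k, 0) for k in keys)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"      # a happened-before b
    if b_le_a:
        return "after"       # b happened-before a
    return "concurrent"      # neither dominates: a conflict to resolve


write_a = vc_increment({}, "A")                          # {"A": 1}
write_b = vc_increment({}, "B")                          # {"B": 1}
merged = vc_increment(vc_merge(write_a, write_b), "A")   # {"A": 2, "B": 1}
print(vc_compare(write_a, write_b), vc_compare(write_a, merged))
```

A Lamport clock would assign both initial writes the value 1 and could not tell that they were concurrent; the per-node vector is what makes the conflict visible.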
TrueTime and Spanner
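Spanner assigns every transaction a commit timestamp from TrueTime, whose clock reading is a bounded uncertainty interval [earliest, latest] rather than a single instant. Before reporting a commit, Spanner waits out the uncertainty window (commit wait) so that timestamp order matches real-time order. This provides external consistency (linearizability) at the cost of added commit latency; the original Spanner paper reports clock uncertainty of a few milliseconds (generally under about 7 ms), and the wait scales with it.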
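Commit wait can be illustrated with a simulated clock. This is a toy model, not Spanner's implementation: `FakeTrueTime`, `commit_with_wait`, and the integer-microsecond time base are all assumptions made so the example is deterministic.

```python
from dataclasses import dataclass


@dataclass
class TTInterval:
    earliest: int
    latest: int


class FakeTrueTime:
    """Simulated clock in integer microseconds with fixed uncertainty."""

    def __init__(self, epsilon_us):
        self.epsilon_us = epsilon_us
        self.true_now_us = 0

    def now(self):
        return TTInterval(self.true_now_us - self.epsilon_us,
                          self.true_now_us + self.epsilon_us)

    def advance(self, delta_us):
        self.true_now_us += delta_us


def commit_with_wait(tt, step_us=1000):
    # Pick a timestamp no earlier than any possible value of "now".
    commit_ts = tt.now().latest
    waited_us = 0
    # Commit wait: block until commit_ts is definitely in the past.
    while tt.now().earliest <= commit_ts:
        tt.advance(step_us)
        waited_us += step_us
    return commit_ts, waited_us


ts, waited = commit_with_wait(FakeTrueTime(epsilon_us=4000))
print(ts, waited)
```

In this model the wait is roughly twice the uncertainty, which is why tighter clock bounds translate directly into lower commit latency.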
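Never use System.currentTimeMillis() for ordering across nodes. Wall clocks can jump backward after an NTP correction or a VM snapshot restore. Use database-generated sequence IDs or logical clocks for ordering; ULIDs embed a wall-clock timestamp, so treat them as roughly time-sortable identifiers rather than a causal order. Physical clocks are fine for TTL and metrics only.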
Leader election patterns
Many distributed components need exactly one active leader—scheduler, write coordinator, or singleton worker. Election must handle split-brain and fail over quickly without duplicate leaders.
| Pattern | Mechanism | Failover time | Examples |
|---|---|---|---|
| Raft built-in | Followers timeout → candidate → leader on majority | ~150–300 ms + election | etcd, KRaft, Consul |
| ZooKeeper ephemeral sequential | Lowest sequence ephemeral znode = leader | Session timeout (client-negotiated, typically several seconds) | Kafka (legacy), Hadoop, Curator LeaderSelector |
| Kubernetes Lease | coordination.k8s.io/v1 Lease object; renew before expiry | Lease duration (15–60s typical) | controller-manager, custom operators |
| Bully algorithm | Highest-ID node wins; election on leader failure | O(n²) messages in the worst case | Academic; rarely production |
| Ring election | Token passed in logical ring | O(n) traversal | Academic; rarely production |
flowchart TB
N1[Node 1\nFollower]
N2[Node 2\nLEADER]
N3[Node 3\nFollower]
ETCD[(etcd\nRaft quorum)]
N2 -->|heartbeat lease| ETCD
N1 & N3 -->|watch leader key| ETCD
N2 -.->|fails| X[leader key expires]
X --> N1
X --> N3
N1 -->|wins election| ETCD
N1 -->|becomes leader| N1
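The lease-based flow in the diagram can be sketched with an in-memory lease record. `LeaseStore` is a hypothetical stand-in for an etcd lease or a Kubernetes Lease object; the real systems perform the same check atomically on the server, and time is passed in explicitly here to keep the example deterministic.

```python
class LeaseStore:
    """Single leader key with an expiry, updated by compare-and-set."""

    def __init__(self):
        self.holder = None
        self.expires_at = 0.0

    def try_acquire(self, node, now, ttl):
        # A node may take the lease if it is free, expired, or already its
        # own (a renewal). Otherwise the current leader keeps it.
        if self.holder is None or now >= self.expires_at or self.holder == node:
            self.holder = node
            self.expires_at = now + ttl
            return True
        return False


lease = LeaseStore()
print(lease.try_acquire("node-2", now=0, ttl=15))    # node-2 becomes leader
print(lease.try_acquire("node-1", now=5, ttl=15))    # refused: lease is live
print(lease.try_acquire("node-2", now=10, ttl=15))   # renewal by the leader
print(lease.try_acquire("node-1", now=25, ttl=15))   # expired: node-1 takes over
```

Failover time is bounded by the lease duration, which matches the table: a shorter TTL fails over faster but makes a paused leader more likely to lose the lease spuriously.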
Leader election ≠ load balancing. Leaders coordinate writes; followers can still serve reads (Raft) or stand by idle. For work distribution across nodes, use consistent hashing or a queue—not leader election.
Failure detection
You cannot distinguish "slow" from "dead" with certainty—only probability. Failure detectors trade false positives (declaring alive nodes dead) against detection latency.
| Method | How it works | Strength | Weakness |
|---|---|---|---|
| Heartbeat + timeout | Periodic ping; suspect dead after N missed intervals | Simple; ubiquitous (K8s liveness, Redis sentinel) | Fixed timeout wrong for variable latency (GC pause = false positive) |
| Phi accrual failure detector | Models arrival distribution of heartbeats; outputs suspicion level φ | Adapts to network jitter; used in Akka, Cassandra | More complex; needs tuning of thresholds |
| SWIM (Scalable Weakly-consistent Infection-style Membership) | Random peer probing + indirect ping; disseminates via gossip | Constant per-node probe load; updates spread in O(log n) gossip rounds; scalable to thousands of nodes | Eventually consistent membership view |
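The suspicion level φ from the table can be sketched under a simplifying assumption. The detector below assumes exponentially distributed heartbeat gaps, so φ reduces to the elapsed time divided by the mean gap times ln 10; the original formulation fits a normal distribution instead. `PhiAccrualDetector` is a hypothetical class, not the API of Akka or Cassandra.

```python
import math
from collections import deque


class PhiAccrualDetector:
    def __init__(self, window=100):
        self.intervals = deque(maxlen=window)  # recent heartbeat gaps
        self.last_heartbeat = None

    def heartbeat(self, now):
        if self.last_heartbeat is not None:
            self.intervals.append(now - self.last_heartbeat)
        self.last_heartbeat = now

    def phi(self, now):
        if not self.intervals:
            return 0.0
        mean = sum(self.intervals) / len(self.intervals)
        elapsed = now - self.last_heartbeat
        # Exponential assumption: P(gap > elapsed) = exp(-elapsed / mean),
        # and phi = -log10 of that probability.
        return elapsed / (mean * math.log(10))


detector = PhiAccrualDetector()
for t in range(11):                 # heartbeats at t = 0, 1, ..., 10
    detector.heartbeat(float(t))
print(detector.phi(11.0), detector.phi(30.0))
```

A caller picks a threshold (for example, suspect the peer once φ exceeds 8) instead of a fixed timeout, so the same setting adapts when heartbeat gaps get longer or noisier.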
Timeout tuning
- Too aggressive — GC pauses trigger false failover, split-brain, cascading restarts
- Too lenient — real failures take minutes to detect; extended downtime
- Rule of thumb — timeout > p99.9 peer response time × 3–5; account for GC (Java: 1–5s for full GC)
- Separate liveness from slowness — health check "can respond" vs SLA "responds in <100ms"
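The rule of thumb above is simple arithmetic and can be sketched as a helper. `suggest_timeout_ms` is a hypothetical function, and taking the larger of the scaled p99.9 and the expected GC pause is one interpretation of "account for GC", not an established formula.

```python
def suggest_timeout_ms(latencies_ms, multiplier=3, gc_pause_ms=0):
    """Timeout from observed peer latencies: p99.9 x multiplier, GC-aware."""
    if not latencies_ms:
        raise ValueError("need at least one latency sample")
    ordered = sorted(latencies_ms)
    # Nearest-rank p99.9, in integer arithmetic to avoid float rounding.
    rank = (999 * len(ordered) + 999) // 1000
    p999 = ordered[rank - 1]
    # Assumption: the timeout should never be shorter than a known GC pause.
    return max(p999 * multiplier, gc_pause_ms)


samples = list(range(1, 1001))      # 1 ms .. 1000 ms, one sample each
print(suggest_timeout_ms(samples))
print(suggest_timeout_ms(samples, multiplier=5, gc_pause_ms=5000))
```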
HashiCorp Consul uses SWIM (via its Serf library) for member failure detection and gossip-based dissemination. Apache Cassandra is the best-known production user of phi accrual. Kubernetes uses simple heartbeat-style probes; tune failureThreshold and periodSeconds per workload.
Gossip protocol
Gossip (epidemic) protocols spread information probabilistically—each node periodically shares state with random peers. Eventually every node converges without central coordination.
flowchart LR
A[Node A\nstate v3] -->|push/pull| B[Node B]
B -->|push/pull| C[Node C]
C -->|push/pull| D[Node D]
D -->|push/pull| A
B -.->|anti-entropy| D
Properties
- O(log n) convergence — information spreads exponentially with rounds
- Fault tolerant — no single point of failure; works with partial connectivity
- Eventually consistent — nodes may have stale views temporarily
- Bandwidth efficient — small periodic messages vs full mesh broadcast
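The O(log n) convergence property can be observed with a small simulation. This sketch assumes push-only gossip with a fanout of one and a seeded random generator for repeatability; `gossip_rounds` is a made-up name, and real protocols add pull and anti-entropy on top.

```python
import random


def gossip_rounds(n, seed=0):
    """Rounds until all n nodes have heard a rumor started at node 0."""
    rng = random.Random(seed)
    informed = {0}
    rounds = 0
    while len(informed) < n:
        rounds += 1
        # Each node informed at the start of the round pushes to one random
        # peer (which may already know the rumor).
        for _ in list(informed):
            informed.add(rng.randrange(n))
    return rounds


for size in (16, 256, 4096):
    print(size, gossip_rounds(size))
```

The informed set can at most double per round, so the count can never beat log2(n); in practice it lands within a small constant factor of that, growing far more slowly than n itself.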
Use cases
| System | Gossip carries |
|---|---|
| Cassandra | Cluster membership, schema versions, failure suspicion |
| Consul | Member list, health status, coordinate updates (Serf library) |
| Redis Cluster | Slot map, node failures (cluster bus gossip) |
| Dynamo (2007 paper; DynamoDB's internals differ) | Membership changes and token ownership |
Gossip is wrong when you need immediate consistency—use Raft/etcd for authoritative state. Gossip excels for membership, health dissemination, and soft state that self-heals on convergence.
CRDTs — conflict-free replicated data types
CRDTs are data structures that merge concurrent updates without coordination—mathematically guaranteed to converge. Power collaborative apps where offline edits and partition tolerance matter more than strong consistency.
Two families
| Family | Mechanism | Examples | Trade-off |
|---|---|---|---|
| State-based (CvRDT) | Merge full state with join function | G-Counter, PN-Counter, G-Set | Large state transfer on sync |
| Operation-based (CmRDT) | Broadcast commutative operations | Op-based counters, op-based OR-Set, RGA | Requires reliable causal delivery |
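The state-based family is easiest to see with a G-Counter, whose join function is a per-node maximum. A minimal sketch (the class layout is an assumption for this example):

```python
class GCounter:
    """Grow-only counter: one slot per node, merged by per-slot max."""

    def __init__(self, node_id):
        self.node_id = node_id
        self.counts = {}

    def increment(self, amount=1):
        if amount < 0:
            raise ValueError("G-Counter can only grow")
        # A node only ever writes to its own slot.
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + amount

    def value(self):
        return sum(self.counts.values())

    def merge(self, other):
        # Join: commutative, associative, and idempotent.
        for node, count in other.counts.items():
            self.counts[node] = max(self.counts.get(node, 0), count)


a, b = GCounter("a"), GCounter("b")
a.increment(3)
b.increment(2)
a.merge(b)
b.merge(a)
a.merge(b)          # repeated merges are harmless
print(a.value(), b.value())
```

Because merging is idempotent, replicas can exchange full state as often as they like and in any order, which is what removes the need for coordination.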
Common CRDT types
| CRDT | Behavior | Use case |
|---|---|---|
| G-Counter | Grow-only counter per node; merge = max per slot | View counts, analytics (eventual accuracy) |
| PN-Counter | Increment + decrement counters | Inventory with adds/removes |
| LWW-Register | Last-write-wins by timestamp | Simple key-value; tombstone deletes |
| OR-Set | Observed-remove set; add wins over remove if concurrent | Collaborative tag lists, participant sets |
| RGA / LSEQ | Sequence CRDT for text/list positions | Collaborative text editing |
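The "add wins over remove if concurrent" behavior of an OR-Set comes from tagging every add with a unique identifier. The sketch below is a simple state-based variant that keeps removed tags as tombstones; `ORSet` is a hypothetical class, and production implementations use more compact encodings.

```python
import uuid


class ORSet:
    def __init__(self):
        self.added = set()     # (element, unique tag) pairs
        self.removed = set()   # tombstones: pairs that were observed and removed

    def add(self, element):
        self.added.add((element, uuid.uuid4().hex))

    def remove(self, element):
        # Remove only the tags this replica has observed so far.
        self.removed |= {(e, t) for (e, t) in self.added if e == element}

    def merge(self, other):
        self.added |= other.added
        self.removed |= other.removed

    def elements(self):
        return {e for (e, _) in self.added - self.removed}


a, b = ORSet(), ORSet()
a.add("tag")
b.merge(a)
b.remove("tag")     # b removes the add it has seen
a.add("tag")        # concurrently, a adds "tag" again with a fresh tag
a.merge(b)
b.merge(a)
print(a.elements(), b.elements())
```

The concurrent add survives because its tag was never observed by the remove, so both replicas converge on a set that still contains the element.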
Real-world use cases
Figma uses a CRDT-inspired design with custom conflict resolution for multiplayer design editing: multiple designers move objects simultaneously, clients apply edits locally without taking a central lock, and a server orders and relays the changes. Offline edits sync on reconnect.
Notion applies CRDT-inspired techniques for block-level collaborative editing—each block is independently mergeable; operational transforms handle rich text within blocks. Mobile offline mode queues operations that merge on sync.
flowchart TB
subgraph clientA [Designer A — offline]
A1[Move icon x+10]
A2[Add rectangle]
end
subgraph clientB [Designer B — online]
B1[Move icon y+5]
B2[Change color blue]
end
Merge[CRDT merge\ncommutative join]
State[Converged document\nboth edits preserved]
A1 & A2 --> Merge
B1 & B2 --> Merge
Merge --> State
Google Docs uses OT (Operational Transformation) rather than CRDTs: similar goal, different math. Apple Notes is commonly reported to use CRDTs for syncing offline edits. Redis Enterprise (Active-Active) provides CRDT-based data types for multi-master replication. Automerge is an open-source CRDT library for JSON documents.
"Collaborative editor: CRDT or OT for conflict-free merge, WebSocket for real-time sync, optional central server for persistence. Trade strong consistency for availability under partition—users prefer merged edits over error dialogs. Figma/Notion proved this at scale."
If asked "design Google Docs": mention OT/CRDT for text merge, WebSocket fan-out, version vectors for causality, and S3/database for snapshot persistence. Don't propose single-leader locking for every keystroke.