Building Dynamo from scratch in Go: Part 5
Part 5: Membership, Failure Detection, and Replica Synchronization
This series
- Part 1: The Big Picture
- Part 2: Partitioning with Consistent Hashing
- Part 3: Vector Clocks and Conflict Resolution
- Part 4: Request Coordination, Quorums, and Failure Handling
- Part 5: Membership, Failure Detection, and Replica Synchronization (this Part)
- Part 6: Production Concerns - RPC, Resilience, Performance, and Testing
This is Part 5 of a 6-part series walking through a from-scratch Go implementation of Amazon's Dynamo. In Part 4 we built the distributed read/write path with quorum semantics, read repair, and hinted handoff. But we hand-waved over two critical questions: how do nodes discover each other, and how do diverged replicas get back in sync? This Part answers both.
Outline
- Three Missing Pieces
- Gossip Protocol
- Phi Accrual Failure Detection
- Merkle Trees and Anti-Entropy
- The Background Loops
- How It All Ties Together
- What's Next
Three Missing Pieces
At the end of Part 4, our system could fan out reads and writes to multiple replicas, handle timeouts, and store hints for failed nodes. But several things were quietly assumed:
Membership - The coordinator looked up node addresses from a membership table, but we never explained how that table was populated. How does a node joining the cluster discover the other nodes? How do all nodes converge on the same view of cluster membership?
Failure detection - The coordinator skipped nodes with Status != StatusAlive, but we never explained how a node gets marked as suspected or dead. How do you tell the difference between "this node is slow" and "this node has crashed"?
Background synchronization - Read repair (Part 4) fixes stale replicas opportunistically during reads. But what about keys that are never read? Over time, replicas can drift arbitrarily far apart. How do you detect and repair this drift efficiently, without scanning every key?
This Part introduces three subsystems that address these problems: a gossip protocol for membership, a phi accrual failure detector, and Merkle tree-based anti-entropy.
The Node struct grows by two fields:
type Node struct {
// ... all fields from Parts 1-4 ...
failDetector *membership.FailureDetector // <- NEW
antiEntropy *synchronization.AntiEntropy // <- NEW
}
And Config gains:
type Config struct {
// ... existing fields ...
GossipInterval time.Duration // <- NEW (default: 1s)
AntiEntropyInterval time.Duration // <- NEW (default: 60s)
}
Gossip Protocol
Gossip is how Dynamo maintains a decentralized, eventually consistent view of cluster membership without any central authority. The protocol is simple: every second, each node picks a random peer and exchanges membership information. Over multiple rounds, updates propagate through the entire cluster - like a rumor spreading through a crowd.
The Member and Membership Structs
Each node maintains a local membership table:
// pkg/membership/gossip.go
type Membership struct {
localID string
address string
members map[string]*Member // nodeID -> member info
config types.Config
rpcClient *rpc.Client
failureDetector *FailureDetector
mu sync.RWMutex
}
type Member struct {
NodeID string
Address string
Status MemberStatus // Alive, Suspected, Dead
Heartbeat uint64 // monotonically increasing counter
Tokens []uint32 // ring positions (from Part 2)
Timestamp time.Time // last known update time
}
type MemberStatus int
const (
StatusAlive MemberStatus = iota
StatusSuspected
StatusDead
)
The Heartbeat field is the key mechanism. Each node increments its own heartbeat counter every gossip round. When you receive a member record with a higher heartbeat than what you have locally, you know it's fresher information - so you adopt it. This gives us a simple, decentralized rule for merging membership views: higher heartbeat wins.
The Tokens field carries the node's ring positions (from Part 2). This is how new nodes learn where existing nodes sit on the consistent hash ring - the membership table is the gossip medium for ring topology.
One Gossip Round
func (m *Membership) Gossip() {
// Increment our own heartbeat
m.mu.Lock()
local := m.members[m.localID]
if local != nil {
local.Heartbeat++
local.Timestamp = time.Now()
}
m.mu.Unlock()
// Select a random peer
peer := m.selectRandomPeer()
if peer == "" {
return
}
// Exchange membership info
m.gossipWith(peer)
}
Three steps. First, bump our own heartbeat - this is how other nodes know we're still alive. Second, pick a random peer. Third, exchange information with that peer.
Peer selection is simple randomization:
func (m *Membership) selectRandomPeer() string {
m.mu.RLock()
defer m.mu.RUnlock()
if len(m.members) <= 1 {
return "" // no one to gossip with
}
peers := make([]string, 0, len(m.members)-1)
for id := range m.members {
if id != m.localID {
peers = append(peers, id)
}
}
if len(peers) == 0 {
return ""
}
return peers[rand.Intn(len(peers))]
}
The randomness is important. Deterministic peer selection (like round-robin) would create predictable communication patterns that could lead to uneven propagation. Random selection ensures that, probabilistically, updates reach all nodes in O(log N) gossip rounds - even in large clusters.
The Exchange: Push-Pull Gossip
The actual exchange sends our full membership table to the peer and receives theirs:
func (m *Membership) gossipWith(peerID string) {
if m.rpcClient == nil {
return
}
m.mu.RLock()
peer := m.members[peerID]
if peer == nil {
m.mu.RUnlock()
return
}
peerAddress := peer.Address
memberDTOs := m.buildMemberDTOs()
m.mu.RUnlock()
// RPC exchange
ctx, cancel := context.WithTimeout(context.Background(),
m.config.GetRequestTimeout())
defer cancel()
resp, err := m.rpcClient.Gossip(ctx, peerAddress, m.localID, memberDTOs)
if err != nil {
return // peer might be down - failure detector handles this
}
// Merge their view into ours
m.mergeMembers(resp.Members)
}
This is push-pull gossip: we push our view to the peer and pull theirs back. Both sides merge the received information. This is more efficient than push-only gossip because a single round allows bidirectional information flow.
The merge rule is the higher-heartbeat-wins rule:
func (m *Membership) mergeMembers(members []rpc.MemberDTO) {
m.mu.Lock()
defer m.mu.Unlock()
for _, dto := range members {
existing := m.members[dto.NodeID]
if existing == nil || dto.Heartbeat > existing.Heartbeat {
m.members[dto.NodeID] = &Member{
NodeID: dto.NodeID,
Address: dto.Address,
Status: MemberStatus(dto.Status),
Heartbeat: dto.Heartbeat,
Tokens: dto.Tokens,
Timestamp: dto.Timestamp,
}
// Record heartbeat for failure detection
if m.failureDetector != nil && dto.NodeID != m.localID {
m.failureDetector.RecordHeartbeat(dto.NodeID)
}
}
}
}
A new member (one we haven't seen before) is always added. An existing member is updated only if the incoming heartbeat is higher - this prevents stale information from overwriting fresher data. Every successful merge also records a heartbeat with the failure detector, which feeds into the phi accrual algorithm we'll see next.
The test suite verifies these properties:
func TestGossipHeartbeatIncrement(t *testing.T) {
config := newMockConfig()
m := membership.NewMembership("node1", "localhost:8000", config)
m.AddMember(&membership.Member{
NodeID: "node1", Status: membership.StatusAlive, Heartbeat: 1,
// ...
})
m.Gossip()
got := m.GetMember("node1")
// got.Heartbeat == 2
m.Gossip()
got = m.GetMember("node1")
// got.Heartbeat == 3
}
Joining a Cluster
When a brand-new node starts, it doesn't know about any other nodes. It bootstraps by contacting one or more seed nodes - well-known addresses that are configured ahead of time:
func (n *Node) Join(seeds []string) error {
if len(seeds) == 0 {
return nil // first node in cluster
}
for _, seed := range seeds {
members, err := n.membership.SyncWithSeed(seed)
if err != nil {
continue
}
// Add discovered members to our ring
for _, member := range members {
if member.NodeID != n.id {
n.ring.AddNodeWithTokens(member.NodeID, member.Tokens)
n.membership.AddMember(member)
}
}
return nil
}
return fmt.Errorf("failed to join cluster: no seed reachable")
}
SyncWithSeed is essentially a one-shot gossip exchange - it sends our membership (just ourselves) and receives the seed's full membership table. After this initial sync, the regular gossip loop takes over and keeps the membership converging.
The seed node simultaneously learns about the joining node through the same exchange. Within a few gossip rounds, every node in the cluster knows about the new member.
On the Receiving Side
When a node receives a gossip message via RPC, the Node.HandleGossip method processes it:
func (n *Node) HandleGossip(members []rpc.MemberDTO) []rpc.MemberDTO {
for _, m := range members {
existing := n.membership.GetMember(m.NodeID)
if existing == nil || m.Heartbeat > existing.Heartbeat {
member := &membership.Member{
NodeID: m.NodeID, Address: m.Address,
Status: membership.MemberStatus(m.Status),
Heartbeat: m.Heartbeat, Tokens: m.Tokens,
Timestamp: m.Timestamp,
}
n.membership.AddMember(member)
// New node? Add to ring
if existing == nil && m.NodeID != n.id {
n.ring.AddNodeWithTokens(m.NodeID, m.Tokens)
}
}
}
// Return our membership list
localMembers := n.membership.GetAllMembers()
result := make([]rpc.MemberDTO, len(localMembers))
// ... convert to DTOs ...
return result
}
This is where gossip updates the ring. When a node learns about a previously unknown member, it adds that member's tokens to the consistent hash ring. From that point on, the ring correctly routes keys to the new node. Membership and partitioning stay in sync through the same gossip mechanism - there's no separate protocol for ring management.
Phi Accrual Failure Detection
Simple timeout-based failure detection has a fundamental problem: you have to pick a fixed timeout. Too short, and you'll falsely mark healthy-but-slow nodes as dead. Too long, and genuinely crashed nodes take ages to be detected. In a distributed system where network conditions vary, no single timeout works well.
The phi accrual failure detector, based on the paper by Hayashibara et al., takes a statistical approach. Instead of a binary "alive or dead" decision, it outputs a continuous suspicion level (phi, φ) that represents how likely it is that a node has failed, based on the historical pattern of its heartbeats.
Heartbeat History
The detector tracks the inter-arrival times of heartbeats for each node:
// pkg/membership/failure_detector.go
type HeartbeatHistory struct {
intervals []time.Duration // recent inter-arrival times
maxSize int // rolling window size (default: 100)
lastArrival time.Time
mu sync.Mutex
}
type FailureDetector struct {
membership *Membership
timeout time.Duration // fallback timeout
phiThreshold float64 // suspicion threshold (default: 8)
phiDeadThreshold float64 // dead threshold (default: 16)
histories map[string]*HeartbeatHistory
suspects map[string]time.Time
deadNodes map[string]time.Time
callbacks []StatusChangeCallback
mu sync.RWMutex
}
Each time a heartbeat arrives (via gossip merge), the detector records the interval since the previous heartbeat:
func (fd *FailureDetector) RecordHeartbeat(nodeID string) {
fd.mu.Lock()
defer fd.mu.Unlock()
history := fd.histories[nodeID]
if history == nil {
history = &HeartbeatHistory{
intervals: make([]time.Duration, 0, 100),
maxSize: 100,
}
fd.histories[nodeID] = history
}
history.mu.Lock()
defer history.mu.Unlock()
now := time.Now()
if !history.lastArrival.IsZero() {
interval := now.Sub(history.lastArrival)
history.intervals = append(history.intervals, interval)
if len(history.intervals) > history.maxSize {
history.intervals = history.intervals[1:]
}
}
history.lastArrival = now
}
Over time, the detector builds a statistical model of each node's heartbeat pattern - "Node B's heartbeats typically arrive every 1.0 ± 0.2 seconds."
Computing Phi
The phi calculation asks: "given the historical distribution of heartbeat intervals, how anomalous is the current silence?" Mathematically, it uses the CDF of a normal distribution fitted to the observed intervals:
func (fd *FailureDetector) calculatePhi(nodeID string, now time.Time) float64 {
history := fd.histories[nodeID]
if history == nil || len(history.intervals) < 2 {
return fd.calculateSimplePhi(nodeID, now) // fallback
}
// Compute mean and stddev of inter-arrival times
var sum time.Duration
for _, interval := range history.intervals {
sum += interval
}
mean := float64(sum) / float64(len(history.intervals))
var varianceSum float64
for _, interval := range history.intervals {
diff := float64(interval) - mean
varianceSum += diff * diff
}
stdDev := math.Sqrt(varianceSum / float64(len(history.intervals)))
if stdDev < float64(time.Millisecond*10) {
stdDev = float64(time.Millisecond * 10) // floor to avoid division issues
}
// How long since last heartbeat?
timeSinceLast := float64(now.Sub(history.lastArrival))
// Phi = -log10(1 - CDF(timeSinceLast))
y := (timeSinceLast - mean) / (stdDev * math.Sqrt(2))
cdf := 0.5 * (1 + math.Erf(y))
// Floor the tail probability so phi stays finite. Note that clamping
// cdf at 0.9999 would cap phi at 4, below phiThreshold; a floor of
// 1e-18 lets phi climb to 18, past phiDeadThreshold.
tail := 1 - cdf
if tail < 1e-18 {
tail = 1e-18
}
return -math.Log10(tail)
}
The intuition: if heartbeats typically arrive every 1 second with stddev 0.1s, then 1.2 seconds of silence gives phi around 1.6 - mildly suspicious. At 1.5 seconds phi is already above 6, and by 2 seconds the tail probability is so tiny that phi sails past both thresholds - almost certainly a failure.
The logarithmic scale is key. Phi = 1 means "there's a 10% chance this is a false positive." Phi = 2 means 1%. Phi = 3 means 0.1%. The threshold is configurable - the default phiThreshold = 8 means we suspect a node only when there's less than a 0.000001% chance the silence is normal.
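Plugging numbers into the formula makes the scale tangible. This standalone sketch - the `phi` helper is hypothetical, but mirrors the math in calculatePhi - recomputes phi for the 1-second / 0.1-second example at a few silence durations:

```go
package main

import (
	"fmt"
	"math"
)

// phi computes the suspicion level for a given silence, assuming
// heartbeat intervals are normally distributed with the given mean
// and standard deviation (all values in seconds).
func phi(silence, mean, stdDev float64) float64 {
	y := (silence - mean) / (stdDev * math.Sqrt(2))
	cdf := 0.5 * (1 + math.Erf(y))
	tail := 1 - cdf
	if tail < 1e-18 {
		tail = 1e-18 // keep phi finite; caps it at 18
	}
	return -math.Log10(tail)
}

func main() {
	// Heartbeats normally arrive every 1.0s with stddev 0.1s.
	for _, s := range []float64{1.0, 1.2, 1.5, 5.0} {
		fmt.Printf("silence %.1fs: phi = %.2f\n", s, phi(s, 1.0, 0.1))
	}
}
```

Phi rises slowly near the expected interval and then explodes: a silence just past the mean barely registers, while a few extra seconds of silence pushes phi well past both thresholds.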
For nodes with insufficient heartbeat history (fewer than 2 samples), the detector falls back to a simple timeout-based calculation:
func (fd *FailureDetector) calculateSimplePhi(nodeID string, now time.Time) float64 {
member := fd.membership.GetMember(nodeID)
if member == nil {
return fd.phiDeadThreshold + 1
}
elapsed := now.Sub(member.Timestamp)
ratio := float64(elapsed) / float64(fd.timeout)
if ratio <= 1 {
return ratio * fd.phiThreshold / 2
}
overRatio := ratio - 1
return fd.phiThreshold/2 + overRatio*fd.phiThreshold
}
Status Transitions
The CheckFailures method runs periodically and uses phi to update each node's status:
func (fd *FailureDetector) CheckFailures() {
now := time.Now()
members := fd.membership.GetAllMembers()
for _, member := range members {
if member.NodeID == fd.membership.localID {
continue // never suspect ourselves
}
phi := fd.calculatePhi(member.NodeID, now)
var newStatus MemberStatus
if phi >= fd.phiDeadThreshold { // default: 16
newStatus = StatusDead
} else if phi >= fd.phiThreshold { // default: 8
newStatus = StatusSuspected
} else {
newStatus = StatusAlive
}
if newStatus != member.Status {
fd.updateStatus(member, newStatus, now)
}
}
}
Two thresholds, three states. phiThreshold (8) triggers suspicion - the coordinator will still attempt to reach suspected nodes but is prepared for failure. phiDeadThreshold (16) marks the node as dead - the coordinator skips it entirely, and hinted handoff takes over for missed writes.
Status transitions fire callbacks, which other subsystems can use for reactive behavior:
fd.RegisterCallback(func(nodeID string, oldStatus, newStatus MemberStatus) {
// e.g., trigger data re-replication when a node goes Dead
})
Recovery works naturally: when a crashed node restarts and begins sending heartbeats again, fresh intervals push phi back down below the thresholds, and CheckFailures transitions it back to Alive.
func TestFailureDetectorRecovery(t *testing.T) {
// ... setup with node2 marked dead ...
// Simulate recovery with fresh heartbeats
for i := 0; i < 5; i++ {
fd.RecordHeartbeat("node2")
time.Sleep(10 * time.Millisecond)
}
fd.CheckFailures()
// fd.IsDead("node2") == false - recovered!
}
Merkle Trees and Anti-Entropy
Read repair (Part 4) fixes stale replicas when keys are read. But what about keys that are never read, or keys where the divergence happened before read repair was in place? Over time, replicas can drift apart through missed writes, failed hinted handoffs, or simply because a node was down during a write quorum.
Anti-entropy is the background process that comprehensively detects and repairs these divergences. The core challenge is efficiency: comparing every key-value pair between two replicas is O(N) in the number of keys, which is expensive. Merkle trees bring this down to O(log N) by enabling hierarchical comparison - you only descend into subtrees that differ.
The Merkle Tree
A Merkle tree is a binary tree where each leaf holds the hash of a single key-value pair, and each internal node holds the hash of its children's hashes. The root hash is a fingerprint of the entire dataset - if two replicas have the same root hash, their data is identical.
// pkg/synchronization/merkle.go
type MerkleTree struct {
root *MerkleNode
leaves map[string]*MerkleNode
keyRange KeyRange
}
type MerkleNode struct {
Hash string
Left *MerkleNode
Right *MerkleNode
KeyRange KeyRange
IsLeaf bool
Key string // only for leaves
Value []byte // only for leaves
}
type KeyRange struct {
Start string
End string
}
Building the Tree
Construction sorts the keys, creates leaf nodes, then builds internal nodes bottom-up:
func (mt *MerkleTree) Build(data map[string][]byte) {
mt.leaves = make(map[string]*MerkleNode)
// Sort keys for deterministic tree structure
keys := make([]string, 0, len(data))
for k := range data {
keys = append(keys, k)
}
sort.Strings(keys)
// Create leaf nodes
leafNodes := make([]*MerkleNode, 0, len(keys))
for _, key := range keys {
value := data[key]
hash := computeHash(append([]byte(key), value...))
leaf := &MerkleNode{
Hash: hash, IsLeaf: true, Key: key, Value: value,
}
mt.leaves[key] = leaf
leafNodes = append(leafNodes, leaf)
}
// Build tree bottom-up
mt.root = mt.buildTree(leafNodes, mt.keyRange)
}
The recursive buildTree splits nodes into left and right halves and combines their hashes:
func (mt *MerkleTree) buildTree(nodes []*MerkleNode,
keyRange KeyRange) *MerkleNode {
if len(nodes) == 0 {
return &MerkleNode{Hash: computeHash([]byte("empty")),
KeyRange: keyRange}
}
if len(nodes) == 1 {
return nodes[0]
}
mid := len(nodes) / 2
left := mt.buildTree(nodes[:mid], /* left range */)
right := mt.buildTree(nodes[mid:], /* right range */)
combinedHash := computeHash([]byte(left.Hash + right.Hash))
return &MerkleNode{
Hash: combinedHash, Left: left, Right: right,
KeyRange: keyRange,
}
}
Hashing uses SHA-256:
func computeHash(data []byte) string {
hash := sha256.Sum256(data)
return hex.EncodeToString(hash[:])
}
Finding Differences
The power of Merkle trees is in comparison. Two replicas exchange root hashes - if they match, the replicas are in sync and nothing more needs to happen. If they differ, they recursively compare child hashes to narrow down which keys actually diverge:
func (mt *MerkleTree) FindDifferences(localNode, remoteNode *MerkleNode) []string {
if localNode == nil || remoteNode == nil {
return []string{}
}
// Hashes match -> this subtree is identical
if localNode.Hash == remoteNode.Hash {
return []string{}
}
// Reached a leaf -> this specific key differs
if localNode.IsLeaf {
return []string{localNode.Key}
}
// Recurse into children
differences := []string{}
if localNode.Left != nil && remoteNode.Left != nil {
differences = append(differences,
mt.FindDifferences(localNode.Left, remoteNode.Left)...)
}
if localNode.Right != nil && remoteNode.Right != nil {
differences = append(differences,
mt.FindDifferences(localNode.Right, remoteNode.Right)...)
}
return differences
}
If a replica has 1 million keys and only 3 have diverged, the tree comparison visits roughly 3 × log₂(1,000,000) ≈ 60 nodes instead of all million. That's the O(log N) efficiency at work.
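To make that claim concrete, here's a standalone toy - separate from the pkg/synchronization code, with hypothetical `build`/`diff` helpers - that builds two 1,024-key trees differing in a single key and counts how many hash comparisons the FindDifferences-style traversal performs:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
)

// A stripped-down Merkle node: just enough to count comparisons.
type node struct {
	hash        string
	left, right *node
	key         string
	leaf        bool
}

func hashBytes(b []byte) string {
	h := sha256.Sum256(b)
	return hex.EncodeToString(h[:])
}

// build constructs a balanced tree over sorted keys, hashing
// key+value at the leaves and child hashes at internal nodes.
func build(keys []string, data map[string]string) *node {
	if len(keys) == 1 {
		k := keys[0]
		return &node{hash: hashBytes([]byte(k + data[k])), key: k, leaf: true}
	}
	mid := len(keys) / 2
	l := build(keys[:mid], data)
	r := build(keys[mid:], data)
	return &node{hash: hashBytes([]byte(l.hash + r.hash)), left: l, right: r}
}

// diff returns differing keys, incrementing visited per comparison.
func diff(a, b *node, visited *int) []string {
	*visited++
	if a.hash == b.hash {
		return nil // identical subtree: stop descending
	}
	if a.leaf {
		return []string{a.key}
	}
	out := append([]string{}, diff(a.left, b.left, visited)...)
	return append(out, diff(a.right, b.right, visited)...)
}

// compareOneDiff builds two n-key datasets differing in one key and
// reports the differing keys plus the number of comparisons made.
func compareOneDiff(n int) ([]string, int) {
	d1, d2 := map[string]string{}, map[string]string{}
	keys := make([]string, 0, n)
	for i := 0; i < n; i++ {
		k := fmt.Sprintf("key-%04d", i)
		keys = append(keys, k)
		d1[k], d2[k] = "same", "same"
	}
	sort.Strings(keys)
	d2[keys[n/2]] = "diverged" // a single differing key
	visited := 0
	diffs := diff(build(keys, d1), build(keys, d2), &visited)
	return diffs, visited
}

func main() {
	diffs, visited := compareOneDiff(1024)
	fmt.Printf("differing keys: %v\n", diffs)
	fmt.Printf("hash comparisons: %d (tree has %d nodes)\n", visited, 2*1024-1)
}
```

For 1,024 keys the traversal touches about 21 of the 2,047 nodes - the root plus two children per level down the one mismatching path - rather than every leaf.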
The test suite verifies this:
func TestMerkleTreeFindDifferences(t *testing.T) {
tree1 := synchronization.NewMerkleTree(keyRange)
tree1.Build(map[string][]byte{
"key1": []byte("value1"),
"key2": []byte("value2"),
})
tree2 := synchronization.NewMerkleTree(keyRange)
tree2.Build(map[string][]byte{
"key1": []byte("value1"),
"key2": []byte("different"), // <- only this differs
})
diffs := tree1.FindDifferences(tree1.GetRoot(), tree2.GetRoot())
// diffs contains "key2"
}
func TestMerkleTreeNoDifferences(t *testing.T) {
// Same data on both -> diffs is empty
}
The Anti-Entropy Process
The AntiEntropy struct orchestrates periodic replica synchronization:
// pkg/synchronization/anti_entropy.go
type AntiEntropy struct {
nodeID string
storage types.Storage
ring types.Ring
membership *membership.Membership
config types.Config
rpcClient *rpc.Client
trees map[KeyRange]*MerkleTree // cached trees
mu sync.RWMutex
}
Each round of anti-entropy follows a clear pipeline:
func (ae *AntiEntropy) Run() {
// 1. What key ranges am I responsible for?
ranges := ae.getResponsibleRanges()
for _, keyRange := range ranges {
// 2. Build (or retrieve cached) Merkle tree for this range
tree := ae.getOrBuildTree(keyRange)
// 3. Pick a random replica for this range
replica := ae.selectReplica(keyRange)
if replica == "" {
continue
}
// 4. Sync with that replica
ae.syncWithReplica(replica, keyRange, tree)
}
}
Step 1: Determine responsible ranges. Each node is responsible for the key ranges around its token positions on the ring. The getResponsibleRanges method derives these from the node's tokens.
Step 2: Build or cache the Merkle tree. Building a tree requires scanning all keys in a range, so trees are cached and only rebuilt when data changes (invalidated after syncs that modify data).
func (ae *AntiEntropy) getOrBuildTree(keyRange KeyRange) *MerkleTree {
ae.mu.Lock()
defer ae.mu.Unlock()
tree, exists := ae.trees[keyRange]
if exists {
return tree
}
data := ae.getRangeData(keyRange)
tree = NewMerkleTree(keyRange)
tree.Build(data)
ae.trees[keyRange] = tree
return tree
}
Step 3: Select a random replica. For each key range, the anti-entropy system picks a random alive replica from the preference list - not a dead one (no point trying to sync with a node that's down).
Step 4: Sync with the replica. This is where the Merkle tree comparison happens:
func (ae *AntiEntropy) syncWithReplica(replicaID string, keyRange KeyRange,
localTree *MerkleTree) {
member := ae.membership.GetMember(replicaID)
if member == nil || member.Status != membership.StatusAlive {
return
}
// Send our root hash to the replica
ctx, cancel := context.WithTimeout(context.Background(),
ae.config.GetRequestTimeout())
defer cancel()
resp, err := ae.rpcClient.Sync(ctx, member.Address, &rpc.SyncRequest{
KeyRange: rpc.KeyRange{Start: keyRange.Start, End: keyRange.End},
TreeRoot: []byte(localTree.GetRootHash()),
})
if err != nil {
return
}
// If root hashes match, replicas are in sync
if string(resp.TreeRoot) == localTree.GetRootHash() {
return
}
// Pull differing keys from the replica
for _, key := range resp.Differences {
ae.syncKey(replicaID, key)
}
// Push our differing keys to the replica
localDiffs := ae.findLocalDifferences(keyRange, resp.TreeRoot)
for _, key := range localDiffs {
ae.pushKeyToReplica(replicaID, key)
}
}
The sync is bidirectional. We pull keys the replica has that we're missing (syncKey), and we push keys we have that the replica is missing (pushKeyToReplica). Each individual key sync uses vector clock comparison to ensure we never overwrite newer data with older data:
func (ae *AntiEntropy) syncKey(replicaID string, key string) {
member := ae.membership.GetMember(replicaID)
if member == nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(),
ae.config.GetRequestTimeout())
defer cancel()
// Fetch from replica
remoteValues, err := ae.rpcClient.GetValues(ctx, member.Address, key)
if err != nil {
return
}
// Get local values
localValues, _ := ae.storage.Get(key)
// Only store remote values that are newer or concurrent
for _, remoteVal := range remoteValues {
shouldStore := true
for _, localVal := range localValues {
cmp := localVal.VectorClock.Compare(remoteVal.VectorClock)
if cmp == versioning.After || cmp == versioning.Equal {
shouldStore = false // we already have this or newer
break
}
}
if shouldStore {
ae.storage.Put(key, remoteVal)
}
}
// Invalidate cached tree since data changed
ae.InvalidateTree(KeyRange{Start: "", End: "\xff"})
}
The Background Loops
All three subsystems - gossip, failure detection, and anti-entropy - run as background goroutines launched by Node.Start:
func (n *Node) Start() error {
// ... RPC server, ring setup (from Part 4) ...
// Gossip: exchange membership every second
n.wg.Add(1)
go n.gossipLoop()
// Failure detection: check for failed nodes every second
n.wg.Add(1)
go n.failureDetectionLoop()
// Anti-entropy: sync replicas every 60 seconds
n.wg.Add(1)
go n.antiEntropyLoop()
// Hinted handoff: deliver hints every 10 seconds (from Part 4)
// ...
return nil
}
Each loop follows the same pattern - a ticker, a select on the ticker and stopCh, and a call to the subsystem:
func (n *Node) gossipLoop() {
defer n.wg.Done()
ticker := time.NewTicker(n.config.GossipInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
n.membership.Gossip()
case <-n.stopCh:
return
}
}
}
func (n *Node) failureDetectionLoop() {
defer n.wg.Done()
ticker := time.NewTicker(n.config.GossipInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
n.failDetector.CheckFailures()
case <-n.stopCh:
return
}
}
}
func (n *Node) antiEntropyLoop() {
defer n.wg.Done()
ticker := time.NewTicker(n.config.AntiEntropyInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
n.antiEntropy.Run()
case <-n.stopCh:
return
}
}
}
Note the different intervals. Gossip and failure detection run every second - they're cheap operations (a single RPC exchange, a scan through the member list). Anti-entropy runs every 60 seconds - it's more expensive (builds Merkle trees, potentially transfers data) and doesn't need to run as frequently because read repair handles the common case of stale replicas.
How It All Ties Together
Let's trace through a failure scenario to see all three subsystems cooperating:
Node C crashes. It stops sending heartbeats.
Gossip propagates. Over the next few rounds, other nodes notice that their gossip exchanges no longer update Node C's heartbeat counter. Its heartbeat stays frozen.
Failure detection triggers. The phi accrual detector sees that Node C's heartbeat inter-arrival times have stopped. Phi rises past phiThreshold (8) - Node C is marked Suspected. A few seconds later, phi exceeds phiDeadThreshold (16) - Node C is marked Dead.
Writes route around. The coordinator (Part 4) checks member status before sending writes. It skips Node C and uses hinted handoff to store data on a healthy substitute.
Node C recovers. It restarts, calls Join to re-sync membership, and begins gossiping again. Fresh heartbeats arrive. Phi drops. CheckFailures transitions Node C back to Alive.
Hinted handoff delivers. The hinted handoff loop detects that Node C is alive again and delivers the stored hints, bringing it up to date for the keys that were written during its downtime.
Anti-entropy catches the rest. For any keys that hinted handoff missed (e.g., hints that expired or nodes that didn't store hints), the next anti-entropy round detects the divergence via Merkle tree comparison and syncs the missing keys.
No single subsystem handles the entire failure recovery - they cooperate. Gossip propagates the membership change, failure detection makes the routing decision, hinted handoff provides fast recovery for recent writes, and anti-entropy serves as the comprehensive safety net.
What's Next
The core distributed system is now complete - we have partitioning, versioning, quorum-based coordination, membership, failure detection, and replica synchronization. In the final Part, Part 6, we'll focus on the production infrastructure that makes the system robust and observable: the RPC layer (HTTP/JSON protocol, circuit breakers, retry with exponential backoff), admission control for protecting foreground latency, latency-based coordinator selection, metrics and monitoring, the pluggable storage engine implementations beyond memory, and the testing strategy.
The full source code is available at github.com/tripab/toy-dynamo. The gossip protocol and failure detector live in pkg/membership/, and Merkle trees and anti-entropy live in pkg/synchronization/.