Building Dynamo from scratch in Go: Part 4
Request Coordination, Quorums, and Failure Handling
This series
- Part 1: The Big Picture
- Part 2: Partitioning with Consistent Hashing
- Part 3: Vector Clocks and Conflict Resolution
- Part 4: Request Coordination, Quorums, and Failure Handling (this post)
- Part 5: Membership, Failure Detection, and Replica Synchronization
- Part 6: Production Concerns - RPC, Resilience, Performance, and Testing
This is Part 4 of a 6-part series walking through a from-scratch Go implementation of Amazon's Dynamo. In previous posts, we built a storage engine (Part 1), a consistent hash ring (Part 2), and a vector clock versioning layer (Part 3). Now we wire them together into a fully distributed read/write path with quorum semantics, read repair, and hinted handoff.
Outline
- What Changes in This Part
- The Coordinator
- The Write Path
- The Read Path
- Quorum Math: N, R, and W
- Local vs. Remote: The writeToNode / readFromNode Split
- Read Repair
- Hinted Handoff
- Delete as Tombstone
- The Context: Read-Modify-Write
- Evolving Node Lifecycle
- The Full Picture So Far
- What's Next
What Changes in This Part
This is the post where our Dynamo implementation becomes a distributed system. Up to now, we've had the pieces - a ring that knows which nodes own which keys, and a versioning layer that can track causality and detect conflicts. But every read and write still happened locally. There was no mechanism to fan out a write to multiple replicas, wait for acknowledgments, or handle node failures.
The Coordinator is the component that changes all of that. It sits between the public API (Node.Get, Node.Put, Node.Delete) and the storage layer, orchestrating every operation as a distributed state machine. This is also where the N/R/W quorum parameters come into play, where hinted handoff keeps writes available during failures, and where read repair opportunistically fixes stale replicas.
The Node struct grows substantially in this post:
type Node struct {
id string
address string
config *Config
storage storage.Storage
ring *ring.Ring
coordinator *Coordinator // NEW
membership *membership.Membership // NEW
hintedHoff *replication.HintedHandoff // NEW
rpcClient *rpc.Client // NEW
rpcServer *rpc.Server // NEW
stopCh chan struct{}
wg sync.WaitGroup // NEW
mu sync.RWMutex
}
And Config gains the core distributed systems parameters:
type Config struct {
// ... existing fields from Parts 1-3 ...
N int // NEW: replication factor
R int // NEW: read quorum
W int // NEW: write quorum
RequestTimeout time.Duration // NEW
HintedHandoffEnabled bool // NEW
HintTimeout time.Duration // NEW
ReadRepairEnabled bool // NEW
}
The defaults set N=3, R=2, W=2 - the paper's recommended "balanced" configuration. Let's see how these parameters drive the entire request path.
The Coordinator
The Coordinator is a surprisingly lean struct:
// pkg/dynamo/coordinator.go
type Coordinator struct {
node *Node
}
func NewCoordinator(node *Node) *Coordinator {
return &Coordinator{node: node}
}
Just a pointer back to the node. All the complexity is in its methods, not its state. The coordinator doesn't cache anything - every operation starts fresh by consulting the ring for the preference list, then fanning out to replicas. This is intentional: a stateless coordinator means no stale caches, no invalidation bugs, no coordination state to recover after a crash.
The Write Path
Let's trace through Coordinator.Put step by step. This is the most important method in the entire system - it's where Dynamo's design philosophy (always-writeable, eventually consistent) becomes concrete code.
func (c *Coordinator) Put(ctx context.Context, key string, value []byte,
context *Context) error {
Step 1: Find the Preference List
preferenceList := c.node.ring.GetPreferenceList(key, c.node.config.N)
if len(preferenceList) == 0 {
return ErrNodeNotFound
}
The ring returns the N nodes responsible for this key (see Part 2). For N=3, that's the coordinator plus the next two distinct physical nodes walking clockwise around the ring. This list is ordered - the first node is the primary owner, and the rest are replicas.
Step 2: Generate a New Version
var newClock *versioning.VectorClock
if context != nil && context.VectorClock != nil {
newClock = context.VectorClock.Copy()
} else {
newClock = versioning.NewVectorClock()
}
newClock.Increment(c.node.id)
newClock.Prune(c.node.config.VectorClockMaxSize)
versioned := versioning.VersionedValue{
Data: value,
VectorClock: newClock,
}
If the caller provides a Context (from a previous Get), we build on its vector clock - this is the read-modify-write pattern that maintains causal ordering. If no context is provided (a blind write), we start with a fresh clock. Either way, the coordinator increments its own counter in the clock and prunes if the clock has grown too large (see Part 3).
Step 3: Fan Out Writes to N Replicas
type response struct {
nodeID string
err error
}
responses := make(chan response, len(preferenceList))
for _, nodeID := range preferenceList {
go func(nid string) {
err := c.writeToNode(nid, key, versioned)
responses <- response{nodeID: nid, err: err}
}(nodeID)
}
Every replica gets a concurrent write request. The buffered channel collects results as they arrive. This is the fan-out - all N writes happen in parallel, and we don't wait for them sequentially.
Step 4: Wait for W Acknowledgments
acks := 0
failedNodes := make([]string, 0)
timeout := time.After(c.node.config.RequestTimeout)
for acks < c.node.config.W && len(failedNodes) < len(preferenceList) {
select {
case resp := <-responses:
if resp.err == nil {
acks++
} else {
failedNodes = append(failedNodes, resp.nodeID)
}
case <-timeout:
if acks == 0 {
return ErrTimeout
}
return ErrWriteQuorumFailed
case <-ctx.Done():
return ctx.Err()
}
}
This is the quorum logic. We wait until W replicas have acknowledged the write, or until we run out of time. The select statement multiplexes across three channels: replica responses, the timeout, and the caller's context (for cancellation).
A few subtleties worth noting. The loop also tracks failed nodes - if all N nodes fail, we bail out even before the timeout. The timeout differentiates between "zero acks" (ErrTimeout - total failure) and "some acks but not enough" (ErrWriteQuorumFailed - partial failure). And the ctx.Done() case means the caller can cancel a slow operation without waiting for the full timeout.
Step 5: Handle Failures with Hinted Handoff
// The wait loop can exit early when every replica has failed; without
// this check we would report success with zero acknowledgments.
if acks < c.node.config.W {
return ErrWriteQuorumFailed
}
if c.node.config.HintedHandoffEnabled && len(failedNodes) > 0 {
c.handleHintedHandoff(key, versioned, failedNodes, preferenceList)
}
return nil
}
If some replicas failed but we still got W acks (enough for success), the write succeeded - but the failed nodes missed the update. Hinted handoff records these missed writes for later delivery. We'll dive into the details shortly.
The Read Path
The read path mirrors the write path structurally, but adds reconciliation and read repair:
func (c *Coordinator) Get(ctx context.Context, key string) (*GetResult, error) {
// 1. Get preference list
preferenceList := c.node.ring.GetPreferenceList(key, c.node.config.N)
// 2. Fan out reads to all N replicas
type response struct {
values []versioning.VersionedValue
nodeID string
err error
}
responses := make(chan response, len(preferenceList))
for _, nodeID := range preferenceList {
go func(nid string) {
values, err := c.readFromNode(nid, key)
responses <- response{values: values, nodeID: nid, err: err}
}(nodeID)
}
// 3. Wait for R successful responses
allValues := make([]versioning.VersionedValue, 0)
successCount := 0
failures := 0
timeout := time.After(c.node.config.RequestTimeout)
for successCount < c.node.config.R && failures < len(preferenceList) {
select {
case resp := <-responses:
if resp.err != nil {
failures++
continue
}
// An empty reply still counts toward the read quorum: the
// replica answered; it just has no value for this key.
allValues = append(allValues, resp.values...)
successCount++
case <-timeout:
if successCount == 0 {
return nil, ErrTimeout
}
return nil, ErrReadQuorumFailed
case <-ctx.Done():
return nil, ctx.Err()
}
}
if successCount < c.node.config.R {
// Too many replicas failed outright to ever reach R.
return nil, ErrReadQuorumFailed
}
So far, very similar to the write path: fan out to N, wait for R. But now we have allValues - a pile of versioned values from R different replicas, potentially containing duplicates, ancestors, and concurrent versions.
// 4. Reconcile concurrent versions
concurrent := versioning.ReconcileConcurrent(allValues)
result := &GetResult{
Values: concurrent,
Context: c.buildContext(concurrent),
}
// 5. Async read repair
if c.node.config.ReadRepairEnabled {
go c.readRepair(key, concurrent, preferenceList)
}
return result, nil
}
ReconcileConcurrent (from Part 3) strips out dominated versions, leaving only concurrent ones. The Context is built by merging all their vector clocks - if the application writes back a reconciled value using this context, the new version will dominate all the versions it resolved.
Read repair is launched asynchronously - it doesn't block the response to the client. We'll cover it in detail below.
Quorum Math: N, R, and W
The relationship between N, R, and W determines the system's consistency and availability characteristics. The key invariant is:
R + W > N -> read and write quorums overlap
-> at least one node in the read quorum has the latest write
-> strong consistency (under normal conditions)
The default N=3, R=2, W=2 satisfies this (2 + 2 > 3). Any read quorum of 2 and any write quorum of 2 must share at least one node in a set of 3.
Different tuning choices serve different workloads:
N=3, R=2, W=2 -> Balanced. The paper's recommendation for most services.
N=3, R=1, W=1 -> Highest availability. Reads and writes succeed with a
single node. The shopping cart use case.
N=3, R=1, W=3 -> Fast reads. Every replica must acknowledge writes, but
reads are instant from any one replica.
N=3, R=3, W=1 -> Fast writes. Writes need only one ack, but reads must
contact all replicas to find the latest.
When R + W ≤ N, reads and writes can succeed without overlapping - meaning a read might miss the latest write. This gives higher availability (operations succeed even when more nodes are down) at the cost of potentially stale reads.
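The overlap invariant is small enough to check exhaustively. The standalone sketch below (not part of the repo) enumerates every possible read quorum and write quorum over N nodes and reports whether they always share a member:

```go
package main

import (
	"fmt"
	"math/bits"
)

// subsets returns every k-element subset of {0..n-1}, encoded as a bitmask.
func subsets(n, k int) []int {
	var out []int
	for m := 0; m < 1<<n; m++ {
		if bits.OnesCount(uint(m)) == k {
			out = append(out, m)
		}
	}
	return out
}

// alwaysOverlap reports whether every possible R-node read quorum shares
// at least one node with every possible W-node write quorum out of N nodes.
func alwaysOverlap(n, r, w int) bool {
	for _, rs := range subsets(n, r) {
		for _, ws := range subsets(n, w) {
			if rs&ws == 0 {
				return false
			}
		}
	}
	return true
}

func main() {
	fmt.Println(alwaysOverlap(3, 2, 2)) // true: R+W > N, quorums always intersect
	fmt.Println(alwaysOverlap(3, 1, 1)) // false: R+W <= N, a read can miss the latest write
}
```

Running it for N=3 confirms the table above: R=2/W=2 always overlaps, while R=1/W=1 does not - exactly the stale-read window the shopping-cart configuration accepts.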
Local vs. Remote: The writeToNode / readFromNode Split
Every read and write fans out to N nodes, but one of those nodes is (usually) the local node. The coordinator handles this with a fast path:
func (c *Coordinator) writeToNode(nodeID string, key string,
value versioning.VersionedValue) error {
if nodeID == c.node.id {
// Local write - direct storage call
return c.node.storage.Put(key, value)
}
// Remote write - RPC to the other node
member := c.node.membership.GetMember(nodeID)
if member == nil || member.Status != membership.StatusAlive {
return ErrNodeNotFound
}
ctx, cancel := context.WithTimeout(context.Background(),
c.node.config.RequestTimeout)
defer cancel()
resp, err := c.node.rpcClient.Put(ctx, member.Address, key, value)
if err != nil {
return err
}
if !resp.Success {
return ErrWriteFailed
}
return nil
}
For the local node, it's a direct storage.Put call - no serialization, no network, no timeout. For remote nodes, it looks up the node's address from the membership table (more on membership in Part 5), then makes an RPC call with a timeout.
The read side is symmetric:
func (c *Coordinator) readFromNode(nodeID string, key string) (
[]versioning.VersionedValue, error) {
if nodeID == c.node.id {
return c.node.storage.Get(key)
}
member := c.node.membership.GetMember(nodeID)
if member == nil || member.Status != membership.StatusAlive {
return nil, ErrNodeNotFound
}
ctx, cancel := context.WithTimeout(context.Background(),
c.node.config.RequestTimeout)
defer cancel()
return c.node.rpcClient.GetValues(ctx, member.Address, key)
}
This local/remote split is why the coordinator needs access to the membership - it must know each node's network address and health status to make RPC calls.
Read Repair
Read repair is an optimization that piggybacks on normal reads to fix stale replicas. When a read contacts multiple replicas and finds that some have older data than others, it pushes the latest version to the stale ones.
func (c *Coordinator) readRepair(key string,
latest []versioning.VersionedValue,
preferenceList []string) {
for _, nodeID := range preferenceList {
if nodeID == c.node.id {
continue
}
current, err := c.readFromNode(nodeID, key)
if err != nil {
continue
}
if c.hasStaleData(current, latest) {
for _, v := range latest {
c.writeToNode(nodeID, key, v)
}
}
}
}
The method re-reads from each replica (yes, this is extra work - that's why it runs asynchronously) and compares what each has against the reconciled result. If a replica is stale, it gets updated.
The hasStaleData method is where vector clocks earn their keep:
func (c *Coordinator) hasStaleData(current, latest []versioning.VersionedValue) bool {
if len(current) == 0 && len(latest) > 0 {
return true
}
if len(latest) == 0 {
return false
}
for _, l := range latest {
hasLatestOrNewer := false
for _, curr := range current {
ordering := curr.VectorClock.Compare(l.VectorClock)
if ordering == versioning.Equal || ordering == versioning.After {
hasLatestOrNewer = true
break
}
if ordering == versioning.Concurrent && curr.Equals(&l) {
hasLatestOrNewer = true
break
}
}
if !hasLatestOrNewer {
return true
}
}
return false
}
For each version in latest, we check whether the replica has that version or something newer. If the replica is missing any of the latest versions, it's stale and needs repair. The Concurrent && Equals check handles the case where two versions have the same data and vector clock - they're the same version, just stored on different nodes.
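To see the rule in miniature, here is a standalone sketch that uses plain map[string]int clocks instead of the versioning package (and so deliberately ignores the Concurrent/Equals subtlety above): a replica is stale if none of its versions is equal to or a descendant of a given latest version.

```go
package main

import "fmt"

// hasOrNewer reports whether clock a is equal to or a descendant of b:
// a must carry every counter in b at the same value or higher.
// (The real versioning package from Part 3 also distinguishes Concurrent.)
func hasOrNewer(a, b map[string]int) bool {
	for node, bc := range b {
		if a[node] < bc {
			return false
		}
	}
	return true
}

// stale reports whether a replica holding the versions in `have` is
// missing the version identified by the `latest` clock.
func stale(have []map[string]int, latest map[string]int) bool {
	for _, h := range have {
		if hasOrNewer(h, latest) {
			return false // replica has this version or something newer
		}
	}
	return true
}

func main() {
	latest := map[string]int{"node1": 2, "node2": 1}
	fmt.Println(stale([]map[string]int{{"node1": 1}}, latest))             // true: replica is behind
	fmt.Println(stale([]map[string]int{{"node1": 2, "node2": 1}}, latest)) // false: up to date
}
```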
Read repair is a best-effort, opportunistic mechanism. It reduces the work that the background anti-entropy process (Part 5) needs to do, but doesn't replace it. Failed repair attempts are simply ignored - the anti-entropy system will catch them eventually.
Hinted Handoff
Hinted handoff is how Dynamo maintains write availability when nodes are temporarily down. The idea: if a write is destined for Node C but Node C is unreachable, store the data on another healthy node (say, Node D) along with a hint that says "this data actually belongs to Node C." When Node C comes back, Node D delivers the data.
The Hint struct captures everything needed for later delivery:
// pkg/replication/hinted_handoff.go
type Hint struct {
ForNode string // the intended recipient
Key string
Value versioning.VersionedValue
Timestamp time.Time
}
And HintedHandoff manages the hint store:
type HintedHandoff struct {
nodeInfo types.NodeInfo
membership *membership.Membership
storage types.Storage
config types.Config
rpcClient *rpc.Client
hints map[string][]*Hint // target nodeID -> pending hints
mu sync.RWMutex
}
Storing Hints
When the coordinator detects failed nodes after a write, it distributes hints to healthy nodes:
func (c *Coordinator) handleHintedHandoff(key string,
value versioning.VersionedValue,
failedNodes []string, preferenceList []string) {
// Find healthy nodes (those that didn't fail)
healthyNodes := make([]string, 0)
for _, nodeID := range preferenceList {
isHealthy := true
for _, failed := range failedNodes {
if nodeID == failed {
isHealthy = false
break
}
}
if isHealthy {
healthyNodes = append(healthyNodes, nodeID)
}
}
// Store a hint on a healthy node for each failed node
for _, failedNode := range failedNodes {
if len(healthyNodes) > 0 {
hintNode := healthyNodes[0]
hint := &replication.Hint{
ForNode: failedNode,
Key: key,
Value: value,
Timestamp: time.Now(),
}
c.node.hintedHoff.StoreHint(hintNode, hint)
healthyNodes = append(healthyNodes[1:], hintNode) // rotate to spread hints
}
}
}
The hint is stored on a healthy node's local hint queue - not in the main storage. This is important: hints are metadata about missed writes, not regular data. They live in a separate map and are cleaned up after successful delivery.
Delivering Hints
A background loop periodically checks if any hinted nodes have come back to life:
func (h *HintedHandoff) DeliverHints() {
h.mu.Lock()
defer h.mu.Unlock()
for nodeID, hints := range h.hints {
if len(hints) == 0 {
continue
}
if !h.isNodeAlive(nodeID) {
continue // still down, try again later
}
// Deliver each hint
delivered := make([]bool, len(hints))
for i, hint := range hints {
if h.deliverHint(nodeID, hint) {
delivered[i] = true
}
}
// Remove successfully delivered hints
remaining := make([]*Hint, 0)
for i, hint := range hints {
if !delivered[i] {
remaining = append(remaining, hint)
}
}
h.hints[nodeID] = remaining
}
}
The loop is conservative: it only attempts delivery to nodes that the membership system considers alive. If delivery fails (maybe the node crashed again), the hint stays in the queue for the next round. One simplification worth flagging: the loop holds h.mu across the RPC calls, so a slow delivery blocks concurrent StoreHint calls - a production version would snapshot the pending hints under the lock and deliver outside it.
The deliverHint method sends the data via RPC:
func (h *HintedHandoff) deliverHint(nodeID string, hint *Hint) bool {
if h.rpcClient == nil {
return false
}
member := h.membership.GetMember(nodeID)
if member == nil {
return false
}
ctx, cancel := context.WithTimeout(context.Background(),
h.config.GetRequestTimeout())
defer cancel()
resp, err := h.rpcClient.DeliverHint(ctx, member.Address,
h.nodeInfo.GetID(), hint.Key, hint.Value)
if err != nil {
return false
}
return resp.Success
}
The receiving node handles the hint by simply storing the value in its local storage - the value carries its vector clock, so the storage layer's reconciliation logic (from Part 3) correctly handles it whether the value is new, an ancestor, or concurrent with existing data.
Sloppy Quorum
Hinted handoff enables what the paper calls a sloppy quorum. In a strict quorum, the W nodes that acknowledge a write must be from the key's preference list - the specific nodes assigned by the consistent hash ring. In a sloppy quorum, any healthy node can stand in for a failed preference-list node.
This means writes can succeed even when multiple preference-list nodes are down, as long as enough total nodes are reachable. The hint ensures the data eventually reaches its intended home. The trade-off is that until the hints are delivered, reads might not find the latest version on the "correct" nodes - but the system remains available for writes, which is Dynamo's primary design goal.
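The node-selection side of a sloppy quorum can be sketched in isolation. Everything below is illustrative - sloppyTargets, walkOrder, and alive are stand-in names, not identifiers from the repo - but it shows the core move: keep walking the ring past the strict preference list, and pair each stand-in with the failed node it covers.

```go
package main

import "fmt"

type standIn struct {
	Node    string // healthy node that accepts the write
	ForNode string // failed preference-list node it covers (the hint target)
}

// sloppyTargets picks n write targets from walkOrder (the clockwise ring
// walk from the key's position), substituting healthy nodes beyond the
// strict preference list for any failed members.
func sloppyTargets(walkOrder []string, alive map[string]bool, n int) ([]string, []standIn) {
	var targets []string
	var hints []standIn
	next := n // first ring position beyond the strict preference list
	for _, node := range walkOrder[:n] {
		if alive[node] {
			targets = append(targets, node)
			continue
		}
		// node is down: keep walking clockwise for a healthy stand-in,
		// and pair it with a hint so the data finds its way home later.
		for next < len(walkOrder) {
			cand := walkOrder[next]
			next++
			if alive[cand] {
				targets = append(targets, cand)
				hints = append(hints, standIn{Node: cand, ForNode: node})
				break
			}
		}
	}
	return targets, hints
}

func main() {
	ring := []string{"A", "B", "C", "D", "E"}
	alive := map[string]bool{"A": true, "B": false, "C": true, "D": true, "E": true}
	targets, hints := sloppyTargets(ring, alive, 3)
	fmt.Println(targets) // [A D C]
	fmt.Println(hints)   // [{D B}]
}
```

With B down, D steps in to keep the write at three replicas, and the hint records that D's copy really belongs to B.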
Delete as Tombstone
The Delete method follows the same quorum write path as Put, but writes a tombstone instead of a regular value:
func (c *Coordinator) Delete(ctx context.Context, key string,
context *Context) error {
preferenceList := c.node.ring.GetPreferenceList(key, c.node.config.N)
// Generate new version for the tombstone
var newClock *versioning.VectorClock
if context != nil && context.VectorClock != nil {
newClock = context.VectorClock.Copy()
} else {
newClock = versioning.NewVectorClock()
}
newClock.Increment(c.node.id)
newClock.Prune(c.node.config.VectorClockMaxSize)
tombstone := versioning.VersionedValue{
Data: nil,
VectorClock: newClock,
IsTombstone: true,
}
// ... same fan-out, quorum wait, and hinted handoff as Put ...
}
This is the same code structure as Put - fan out to N replicas, wait for W acks, hinted handoff for failures. The only difference is the value being written: a VersionedValue with nil data and IsTombstone: true. Because the tombstone participates in normal vector clock reconciliation, it correctly supersedes the version it intends to delete without interfering with concurrent writes from other nodes (as we saw in Part 3's tombstone tests).
The Context: Read-Modify-Write
The Context type ties the read and write paths together:
type Context struct {
VectorClock *versioning.VectorClock
}
type GetResult struct {
Values []versioning.VersionedValue
Context *Context
}
When a Get returns multiple concurrent versions, the Context contains the merge of all their vector clocks:
func (c *Coordinator) buildContext(values []versioning.VersionedValue) *Context {
if len(values) == 0 {
return &Context{VectorClock: versioning.NewVectorClock()}
}
merged := values[0].VectorClock.Copy()
for _, v := range values[1:] {
merged = merged.Merge(v.VectorClock)
}
return &Context{VectorClock: merged}
}
When the application writes back using this context, the new version's vector clock dominates all the concurrent versions it resolved. This is the complete read-modify-write cycle:
// Read - might get concurrent versions
result, _ := node.Get(ctx, "key1")
// Application resolves conflict
merged := myMergeLogic(result.Values)
// Write back with the context from the read
// The merged clock ensures this version supersedes all parents
node.Put(ctx, "key1", merged, result.Context)
The coordinator test suite verifies this cycle works end-to-end:
func TestCoordinatorSequentialWrites_VersionsIncrement(t *testing.T) {
node, _ := startTestNode(t, "coord-seq-1")
defer node.Stop()
ctx := context.Background()
var lastCtx *dynamo.Context
for i := 1; i <= 5; i++ {
node.Put(ctx, "counter", []byte(fmt.Sprintf("v%d", i)), lastCtx)
result, _ := node.Get(ctx, "counter")
lastCtx = result.Context
}
result, _ := node.Get(ctx, "counter")
// After 5 sequential writes exactly one version survives:
// each write dominates the previous one.
if len(result.Values) != 1 {
t.Fatalf("expected 1 surviving version, got %d", len(result.Values))
}
if counter := result.Values[0].VectorClock.Versions["coord-seq-1"]; counter != 5 {
t.Fatalf("expected vector clock counter 5, got %d", counter)
}
}
Evolving Node Lifecycle
With all these new components, NewNode and Start grow significantly:
func NewNode(id, address string, config *Config) (*Node, error) {
// ... config validation, storage init, ring init (from Parts 1-2) ...
// Initialize membership (tracks cluster members)
node.membership = membership.NewMembership(id, address, config)
// Initialize coordinator (handles distributed reads/writes)
node.coordinator = NewCoordinator(node)
// Initialize hinted handoff (stores writes for failed nodes)
node.hintedHoff = replication.NewHintedHandoff(node, node.membership,
node.storage, config)
// Initialize RPC (inter-node communication)
node.rpcClient = rpc.NewClient(config.RequestTimeout)
node.rpcServer = rpc.NewServer(address, node)
return node, nil
}
And Start now launches the RPC server and a background loop:
func (n *Node) Start() error {
// Start RPC server
if err := n.rpcServer.Start(); err != nil {
return fmt.Errorf("failed to start RPC server: %w", err)
}
// Add self to ring
n.ring.AddNode(n.id, n.config.VirtualNodes)
// Start hinted handoff delivery loop
if n.config.HintedHandoffEnabled {
n.wg.Add(1)
go n.hintedHandoffLoop()
}
return nil
}
The hinted handoff loop runs every 10 seconds, attempting to deliver any stored hints:
func (n *Node) hintedHandoffLoop() {
defer n.wg.Done()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
n.hintedHoff.DeliverHints()
case <-n.stopCh:
return
}
}
}
The sync.WaitGroup ensures Stop waits for all background goroutines to finish before closing the storage engine. The stopCh channel signals all loops to exit.
The public API now delegates to the coordinator:
func (n *Node) Get(ctx context.Context, key string) (*GetResult, error) {
return n.coordinator.Get(ctx, key)
}
func (n *Node) Put(ctx context.Context, key string, value []byte,
context *Context) error {
return n.coordinator.Put(ctx, key, value, context)
}
func (n *Node) Delete(ctx context.Context, key string,
context *Context) error {
return n.coordinator.Delete(ctx, key, context)
}
The Full Picture So Far
Let's step back and trace a complete write through the system as it now stands. A client calls node.Put(ctx, "user:123", []byte("Alice"), nil):
- Node.Put delegates to Coordinator.Put.
- The coordinator asks the ring: "who are the 3 nodes responsible for user:123?" - say, [node1, node3, node2].
- It creates a new vector clock {node1: 1} and wraps the value in a VersionedValue.
- It launches 3 goroutines, each calling writeToNode. The one for node1 (local) calls storage.Put directly. The others make RPC calls.
- It waits for 2 acknowledgments (W=2). Say node1 (local) and node3 (remote) respond; node2 is slow.
- The write succeeds. If node2 had failed entirely, a hint would be stored for it.
A subsequent read of the same key:
- Node.Get delegates to Coordinator.Get.
- Preference list: [node1, node3, node2]. Read from all 3, wait for 2.
- Say node1 returns {data: "Alice", clock: {node1: 1}} and node3 returns the same.
- ReconcileConcurrent strips duplicates - one version survives.
- Result returned to the client. Async read repair checks node2 - if it's stale, it gets updated.
What's Next
We now have a working distributed key-value store with quorum-based reads and writes, vector clock versioning, hinted handoff for failure tolerance, and read repair for opportunistic consistency. But two major pieces are missing: how do nodes discover each other and detect failures? And how do replicas that have drifted apart (beyond what read repair can catch) get back in sync?
In Part 5, we'll build the gossip protocol for membership management, a phi accrual failure detector that adapts to network conditions, and Merkle tree-based anti-entropy for comprehensive replica synchronization.
The full source code is available at github.com/tripab/toy-dynamo. The coordinator lives in pkg/dynamo/coordinator.go, hinted handoff in pkg/replication/hinted_handoff.go.