अभिव्यक्ति

Building Dynamo from scratch in Go: Part 4

Request Coordination, Quorums, and Failure Handling

This series

Code for this series

This is Part 4 of a 6-part series walking through a from-scratch Go implementation of Amazon's Dynamo. In previous posts, we built a storage engine (Part 1), a consistent hash ring (Part 2), and a vector clock versioning layer (Part 3). Now we wire them together into a fully distributed read/write path with quorum semantics, read repair, and hinted handoff.

What Changes in This Part

This is the post where our Dynamo implementation becomes a distributed system. Up to now, we've had the pieces - a ring that knows which nodes own which keys, and a versioning layer that can track causality and detect conflicts. But every read and write still happened locally. There was no mechanism to fan out a write to multiple replicas, wait for acknowledgments, or handle node failures.

The Coordinator is the component that changes all of that. It sits between the public API (Node.Get, Node.Put, Node.Delete) and the storage layer, orchestrating every operation as a distributed state machine. This is also where the N/R/W quorum parameters come into play, where hinted handoff keeps writes available during failures, and where read repair opportunistically fixes stale replicas.

The Node struct grows substantially in this post:

type Node struct {
    id      string
    address string
    config  *Config
    storage storage.Storage
    ring    *ring.Ring

    coordinator  *Coordinator                  // NEW
    membership   *membership.Membership        // NEW
    hintedHoff   *replication.HintedHandoff    // NEW
    rpcClient    *rpc.Client                   // NEW
    rpcServer    *rpc.Server                   // NEW

    stopCh  chan struct{}
    wg      sync.WaitGroup                    // NEW
    mu      sync.RWMutex
}

And Config gains the core distributed systems parameters:

type Config struct {
    // ... existing fields from Parts 1-3 ...

    N                    int            // NEW: replication factor
    R                    int            // NEW: read quorum
    W                    int            // NEW: write quorum
    RequestTimeout       time.Duration  // NEW
    HintedHandoffEnabled bool           // NEW
    HintTimeout          time.Duration  // NEW
    ReadRepairEnabled    bool           // NEW
}

The defaults set N=3, R=2, W=2 - the paper's recommended "balanced" configuration. Let's see how these parameters drive the entire request path.
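These constraints are easy to enforce at startup. Here's a minimal validation sketch - Validate and StrictQuorum are illustrative names, not necessarily what the repo uses - that rejects only impossible configurations, since R+W <= N is a legitimate availability-over-consistency choice:

```go
package main

import (
	"fmt"
	"time"
)

// Config carries just the quorum parameters discussed above; the real
// struct has more fields.
type Config struct {
	N              int
	R              int
	W              int
	RequestTimeout time.Duration
}

// Validate is a hypothetical startup check: quorum sizes must fit inside
// the replication factor.
func (c *Config) Validate() error {
	if c.N < 1 || c.R < 1 || c.W < 1 {
		return fmt.Errorf("N, R, W must all be >= 1 (got %d, %d, %d)", c.N, c.R, c.W)
	}
	if c.R > c.N || c.W > c.N {
		return fmt.Errorf("R (%d) and W (%d) cannot exceed N (%d)", c.R, c.W, c.N)
	}
	return nil
}

// StrictQuorum reports whether read and write quorums must overlap.
func (c *Config) StrictQuorum() bool {
	return c.R+c.W > c.N
}

func main() {
	cfg := &Config{N: 3, R: 2, W: 2, RequestTimeout: time.Second}
	fmt.Println(cfg.Validate(), cfg.StrictQuorum()) // <nil> true
}
```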

The Coordinator

The Coordinator is a surprisingly lean struct:

// pkg/dynamo/coordinator.go

type Coordinator struct {
    node *Node
}

func NewCoordinator(node *Node) *Coordinator {
    return &Coordinator{node: node}
}

Just a pointer back to the node. All the complexity is in its methods, not its state. The coordinator doesn't cache anything - every operation starts fresh by consulting the ring for the preference list, then fanning out to replicas. This is intentional: a stateless coordinator means no stale caches, no invalidation bugs, no coordination state to recover after a crash.

The Write Path

Let's trace through Coordinator.Put step by step. This is the most important method in the entire system - it's where Dynamo's design philosophy (always-writeable, eventually consistent) becomes concrete code.

func (c *Coordinator) Put(ctx context.Context, key string, value []byte,
                          context *Context) error {

Step 1: Find the Preference List

    preferenceList := c.node.ring.GetPreferenceList(key, c.node.config.N)
    if len(preferenceList) == 0 {
        return ErrNodeNotFound
    }

The ring returns the N nodes responsible for this key (see Part 2). For N=3, that's the coordinator plus the next two distinct physical nodes walking clockwise around the ring. This list is ordered - the first node is the primary owner, and the rest are replicas.

Step 2: Generate a New Version

    var newClock *versioning.VectorClock
    if context != nil && context.VectorClock != nil {
        newClock = context.VectorClock.Copy()
    } else {
        newClock = versioning.NewVectorClock()
    }
    newClock.Increment(c.node.id)
    newClock.Prune(c.node.config.VectorClockMaxSize)

    versioned := versioning.VersionedValue{
        Data:        value,
        VectorClock: newClock,
    }

If the caller provides a Context (from a previous Get), we build on its vector clock - this is the read-modify-write pattern that maintains causal ordering. If no context is provided (a blind write), we start with a fresh clock. Either way, the coordinator increments its own counter in the clock and prunes if the clock has grown too large (see Part 3).

Step 3: Fan Out Writes to N Replicas

    type response struct {
        nodeID string
        err    error
    }

    responses := make(chan response, len(preferenceList))

    for _, nodeID := range preferenceList {
        go func(nid string) {
            err := c.writeToNode(nid, key, versioned)
            responses <- response{nodeID: nid, err: err}
        }(nodeID)
    }

Every replica gets a concurrent write request. The buffered channel collects results as they arrive. This is the fan-out - all N writes happen in parallel, and we don't wait for them sequentially.

Step 4: Wait for W Acknowledgments

    acks := 0
    failedNodes := make([]string, 0)
    timeout := time.After(c.node.config.RequestTimeout)

    for acks < c.node.config.W && len(failedNodes) < len(preferenceList) {
        select {
        case resp := <-responses:
            if resp.err == nil {
                acks++
            } else {
                failedNodes = append(failedNodes, resp.nodeID)
            }
        case <-timeout:
            if acks == 0 {
                return ErrTimeout
            }
            return ErrWriteQuorumFailed
        case <-ctx.Done():
            return ctx.Err()
        }
    }

This is the quorum logic. We wait until W replicas have acknowledged the write, or until we run out of time. The select statement multiplexes across three channels: replica responses, the timeout, and the caller's context (for cancellation).

A few subtleties worth noting. The loop also tracks failed nodes - if all N nodes fail, we bail out even before the timeout. The timeout differentiates between "zero acks" (ErrTimeout - total failure) and "some acks but not enough" (ErrWriteQuorumFailed - partial failure). And the ctx.Done() case means the caller can cancel a slow operation without waiting for the full timeout.

Step 5: Handle Failures with Hinted Handoff

    if acks < c.node.config.W {
        // The loop can also exit because every replica failed; without
        // this check an all-failed write would be reported as success.
        return ErrWriteQuorumFailed
    }

    if c.node.config.HintedHandoffEnabled && len(failedNodes) > 0 {
        c.handleHintedHandoff(key, versioned, failedNodes, preferenceList)
    }

    return nil
}

If some replicas failed but we still got W acks (enough for success), the write succeeded - but the failed nodes missed the update. Hinted handoff records these missed writes for later delivery. We'll dive into the details shortly.

The Read Path

The read path mirrors the write path structurally, but adds reconciliation and read repair:

func (c *Coordinator) Get(ctx context.Context, key string) (*GetResult, error) {
    // 1. Get preference list
    preferenceList := c.node.ring.GetPreferenceList(key, c.node.config.N)

    // 2. Fan out reads to all N replicas
    type response struct {
        values []versioning.VersionedValue
        nodeID string
        err    error
    }

    responses := make(chan response, len(preferenceList))

    for _, nodeID := range preferenceList {
        go func(nid string) {
            values, err := c.readFromNode(nid, key)
            responses <- response{values: values, nodeID: nid, err: err}
        }(nodeID)
    }

    // 3. Wait for R responses
    allValues := make([]versioning.VersionedValue, 0)
    successCount := 0
    failures := 0
    timeout := time.After(c.node.config.RequestTimeout)

    for successCount < c.node.config.R && failures < len(preferenceList) {
        select {
        case resp := <-responses:
            if resp.err == nil {
                // An empty response (key absent on that replica) still
                // counts toward the read quorum; otherwise a read of a
                // missing key would always run out the timeout.
                allValues = append(allValues, resp.values...)
                successCount++
            } else {
                failures++
            }
        case <-timeout:
            if successCount == 0 {
                return nil, ErrTimeout
            }
            return nil, ErrReadQuorumFailed
        case <-ctx.Done():
            return nil, ctx.Err()
        }
    }
    if successCount < c.node.config.R {
        return nil, ErrReadQuorumFailed
    }

So far, very similar to the write path: fan out to N, wait for R. But now we have allValues - a pile of versioned values from R different replicas, potentially containing duplicates, ancestors, and concurrent versions.

    // 4. Reconcile concurrent versions
    concurrent := versioning.ReconcileConcurrent(allValues)

    result := &GetResult{
        Values:  concurrent,
        Context: c.buildContext(concurrent),
    }

    // 5. Async read repair
    if c.node.config.ReadRepairEnabled {
        go c.readRepair(key, concurrent, preferenceList)
    }

    return result, nil
}

ReconcileConcurrent (from Part 3) strips out dominated versions, leaving only concurrent ones. The Context is built by merging all their vector clocks - if the application writes back a reconciled value using this context, the new version will dominate all the versions it resolved.
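To see why the merged context dominates, here's a toy vector clock - a simplified stand-in for Part 3's types - demonstrating that merge-then-increment supersedes both concurrent parents:

```go
package main

import "fmt"

// clock is a toy vector clock: node ID -> counter.
type clock map[string]int

// merge returns the entrywise max of two clocks.
func merge(a, b clock) clock {
	out := clock{}
	for n, c := range a {
		out[n] = c
	}
	for n, c := range b {
		if c > out[n] {
			out[n] = c
		}
	}
	return out
}

// dominates reports whether a has seen everything b has.
func dominates(a, b clock) bool {
	for n, c := range b {
		if a[n] < c {
			return false
		}
	}
	return true
}

func main() {
	// Two concurrent versions of the same key.
	v1 := clock{"node1": 2}
	v2 := clock{"node2": 1}
	fmt.Println(dominates(v1, v2), dominates(v2, v1)) // false false: concurrent

	// The Get context merges both; the write-back increments the coordinator.
	ctx := merge(v1, v2)
	ctx["node1"]++
	fmt.Println(dominates(ctx, v1), dominates(ctx, v2)) // true true
}
```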

Read repair is launched asynchronously - it doesn't block the response to the client. We'll cover it in detail below.

Quorum Math: N, R, and W

The relationship between N, R, and W determines the system's consistency and availability characteristics. The key invariant is:

R + W > N  ->  read and write quorums overlap
            ->  at least one node in the read quorum has the latest write
            ->  strong consistency (under normal conditions)

The default N=3, R=2, W=2 satisfies this (2 + 2 > 3). Any read quorum of 2 and any write quorum of 2 must share at least one node in a set of 3.

Different tuning choices serve different workloads:

N=3, R=2, W=2  -> Balanced. The paper's recommendation for most services.
N=3, R=1, W=1  -> Highest availability. Reads and writes succeed with a
                  single node. The shopping cart use case.
N=3, R=1, W=3  -> Fast reads. Every replica must acknowledge writes, but
                  reads are instant from any one replica.
N=3, R=3, W=1  -> Fast writes. Writes need only one ack, but reads must
                  contact all replicas to find the latest.

When R + W ≤ N, reads and writes can succeed without overlapping - meaning a read might miss the latest write. This gives higher availability (operations succeed even when more nodes are down) at the cost of potentially stale reads.
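For small N the overlap claim can be checked exhaustively. This brute-force sketch (quorumsOverlap is illustrative, not repo code) enumerates every possible read set and write set as bitmasks:

```go
package main

import "fmt"

// quorumsOverlap brute-forces the claim: for n nodes, does every r-subset
// intersect every w-subset? This holds exactly when r+w > n.
func quorumsOverlap(n, r, w int) bool {
	for rs := 0; rs < 1<<n; rs++ {
		if popcount(rs) != r {
			continue
		}
		for ws := 0; ws < 1<<n; ws++ {
			if popcount(ws) != w {
				continue
			}
			if rs&ws == 0 { // found disjoint read and write quorums
				return false
			}
		}
	}
	return true
}

// popcount counts set bits (nodes in the subset).
func popcount(x int) int {
	c := 0
	for ; x != 0; x &= x - 1 {
		c++
	}
	return c
}

func main() {
	fmt.Println(quorumsOverlap(3, 2, 2)) // true: R+W > N
	fmt.Println(quorumsOverlap(3, 1, 1)) // false: a read can miss a write
}
```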

Local vs. Remote: The writeToNode / readFromNode Split

Every read and write fans out to N nodes, but one of those nodes is (usually) the local node. The coordinator handles this with a fast path:

func (c *Coordinator) writeToNode(nodeID string, key string,
                                  value versioning.VersionedValue) error {
    if nodeID == c.node.id {
        // Local write - direct storage call
        return c.node.storage.Put(key, value)
    }

    // Remote write - RPC to the other node
    member := c.node.membership.GetMember(nodeID)
    if member == nil || member.Status != membership.StatusAlive {
        return ErrNodeNotFound
    }

    ctx, cancel := context.WithTimeout(context.Background(),
                                       c.node.config.RequestTimeout)
    defer cancel()

    resp, err := c.node.rpcClient.Put(ctx, member.Address, key, value)
    if err != nil {
        return err
    }
    if !resp.Success {
        return ErrWriteFailed
    }
    return nil
}

For the local node, it's a direct storage.Put call - no serialization, no network, no timeout. For remote nodes, it looks up the node's address from the membership table (more on membership in Part 5), then makes an RPC call with a timeout.

The read side is symmetric:

func (c *Coordinator) readFromNode(nodeID string, key string) (
                                   []versioning.VersionedValue, error) {
    if nodeID == c.node.id {
        return c.node.storage.Get(key)
    }

    member := c.node.membership.GetMember(nodeID)
    if member == nil || member.Status != membership.StatusAlive {
        return nil, ErrNodeNotFound
    }

    ctx, cancel := context.WithTimeout(context.Background(),
                                       c.node.config.RequestTimeout)
    defer cancel()

    return c.node.rpcClient.GetValues(ctx, member.Address, key)
}

This local/remote split is why the coordinator needs access to the membership - it must know each node's network address and health status to make RPC calls.

Read Repair

Read repair is an optimization that piggybacks on normal reads to fix stale replicas. When a read contacts multiple replicas and finds that some have older data than others, it pushes the latest version to the stale ones.

func (c *Coordinator) readRepair(key string,
                                 latest []versioning.VersionedValue,
                                 preferenceList []string) {
    for _, nodeID := range preferenceList {
        if nodeID == c.node.id {
            continue
        }

        current, err := c.readFromNode(nodeID, key)
        if err != nil {
            continue
        }

        if c.hasStaleData(current, latest) {
            for _, v := range latest {
                c.writeToNode(nodeID, key, v)
            }
        }
    }
}

The method re-reads from each replica (yes, this is extra work - that's why it runs asynchronously) and compares what each has against the reconciled result. If a replica is stale, it gets updated.

The hasStaleData method is where vector clocks earn their keep:

func (c *Coordinator) hasStaleData(current, latest []versioning.VersionedValue) bool {
    if len(current) == 0 && len(latest) > 0 {
        return true
    }
    if len(latest) == 0 {
        return false
    }

    for _, l := range latest {
        hasLatestOrNewer := false
        for _, curr := range current {
            ordering := curr.VectorClock.Compare(l.VectorClock)
            if ordering == versioning.Equal || ordering == versioning.After {
                hasLatestOrNewer = true
                break
            }
            if ordering == versioning.Concurrent && curr.Equals(&l) {
                hasLatestOrNewer = true
                break
            }
        }
        if !hasLatestOrNewer {
            return true
        }
    }
    return false
}

For each version in latest, we check whether the replica has that version or something newer. If the replica is missing any of the latest versions, it's stale and needs repair. The Concurrent && Equals check handles versions whose vector clocks compare as concurrent but whose contents are identical - effectively the same version seen through different clock histories, so no repair is needed.

Read repair is a best-effort, opportunistic mechanism. It reduces the work that the background anti-entropy process (Part 5) needs to do, but doesn't replace it. Failed repair attempts are simply ignored - the anti-entropy system will catch them eventually.

Hinted Handoff

Hinted handoff is how Dynamo maintains write availability when nodes are temporarily down. The idea: if a write is destined for Node C but Node C is unreachable, store the data on another healthy node (say, Node D) along with a hint that says "this data actually belongs to Node C." When Node C comes back, Node D delivers the data.

The Hint struct captures everything needed for later delivery:

// pkg/replication/hinted_handoff.go

type Hint struct {
    ForNode   string                    // the intended recipient
    Key       string
    Value     versioning.VersionedValue
    Timestamp time.Time
}

And HintedHandoff manages the hint store:

type HintedHandoff struct {
    nodeInfo   types.NodeInfo
    membership *membership.Membership
    storage    types.Storage
    config     types.Config
    rpcClient  *rpc.Client
    hints      map[string][]*Hint   // target nodeID -> pending hints
    mu         sync.RWMutex
}

Storing Hints

When the coordinator detects failed nodes after a write, it distributes hints to healthy nodes:

func (c *Coordinator) handleHintedHandoff(key string,
    value versioning.VersionedValue,
    failedNodes []string, preferenceList []string) {

    // Find healthy nodes (those that didn't fail)
    healthyNodes := make([]string, 0)
    for _, nodeID := range preferenceList {
        isHealthy := true
        for _, failed := range failedNodes {
            if nodeID == failed {
                isHealthy = false
                break
            }
        }
        if isHealthy {
            healthyNodes = append(healthyNodes, nodeID)
        }
    }

    // Store a hint on a healthy node for each failed node, cycling
    // round-robin so the hints are spread across the healthy nodes
    for _, failedNode := range failedNodes {
        if len(healthyNodes) == 0 {
            break
        }
        hintNode := healthyNodes[0]
        hint := &replication.Hint{
            ForNode:   failedNode,
            Key:       key,
            Value:     value,
            Timestamp: time.Now(),
        }
        c.node.hintedHoff.StoreHint(hintNode, hint)
        // Move the chosen node to the back so the next hint lands elsewhere
        healthyNodes = append(healthyNodes[1:], hintNode)
    }
}

The hint is stored on a healthy node's local hint queue - not in the main storage. This is important: hints are metadata about missed writes, not regular data. They live in a separate map and are cleaned up after successful delivery.
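StoreHint itself isn't shown in this post. Here's a plausible minimal version, under the assumption that the hint is queued in the local in-memory map (the types below are pared-down stand-ins, and placing the hint on a *remote* hintNode would need an RPC hop to that node's hint store instead):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Minimal stand-ins for the post's types - just enough to sketch StoreHint.
type Hint struct {
	ForNode   string
	Key       string
	Timestamp time.Time
}

type HintedHandoff struct {
	hints map[string][]*Hint // intended recipient -> pending hints
	mu    sync.RWMutex
}

// StoreHint queues a hint, keyed by the node the data ultimately belongs
// to. This sketch assumes hintNode is the local node and queues in memory.
func (h *HintedHandoff) StoreHint(hintNode string, hint *Hint) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.hints[hint.ForNode] = append(h.hints[hint.ForNode], hint)
}

// PendingFor reports how many hints are queued for a node.
func (h *HintedHandoff) PendingFor(nodeID string) int {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return len(h.hints[nodeID])
}

func main() {
	h := &HintedHandoff{hints: map[string][]*Hint{}}
	h.StoreHint("nodeD", &Hint{ForNode: "nodeC", Key: "user:123", Timestamp: time.Now()})
	fmt.Println(h.PendingFor("nodeC")) // 1
}
```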

Delivering Hints

A background loop periodically checks if any hinted nodes have come back to life:

func (h *HintedHandoff) DeliverHints() {
    // Snapshot the queues under the lock, so the slow RPC calls below
    // don't block StoreHint on the write path.
    h.mu.RLock()
    pending := make(map[string][]*Hint, len(h.hints))
    for nodeID, hints := range h.hints {
        if len(hints) > 0 {
            pending[nodeID] = append([]*Hint(nil), hints...)
        }
    }
    h.mu.RUnlock()

    for nodeID, hints := range pending {
        if !h.isNodeAlive(nodeID) {
            continue  // still down, try again later
        }

        // Deliver each hint, outside the lock
        delivered := make(map[*Hint]bool, len(hints))
        for _, hint := range hints {
            if h.deliverHint(nodeID, hint) {
                delivered[hint] = true
            }
        }

        // Remove successfully delivered hints; hints stored concurrently
        // since the snapshot are untouched and survive for the next round
        h.mu.Lock()
        remaining := make([]*Hint, 0, len(h.hints[nodeID]))
        for _, hint := range h.hints[nodeID] {
            if !delivered[hint] {
                remaining = append(remaining, hint)
            }
        }
        h.hints[nodeID] = remaining
        h.mu.Unlock()
    }
}

The loop is conservative: it only attempts delivery to nodes that the membership system considers alive. If delivery fails (maybe the node crashed again), the hint stays in the queue for the next round.

The deliverHint method sends the data via RPC:

func (h *HintedHandoff) deliverHint(nodeID string, hint *Hint) bool {
    if h.rpcClient == nil {
        return false
    }

    member := h.membership.GetMember(nodeID)
    if member == nil {
        return false
    }

    ctx, cancel := context.WithTimeout(context.Background(),
                                       h.config.GetRequestTimeout())
    defer cancel()

    resp, err := h.rpcClient.DeliverHint(ctx, member.Address,
                                         h.nodeInfo.GetID(), hint.Key, hint.Value)
    if err != nil {
        return false
    }
    return resp.Success
}

The receiving node handles the hint by simply storing the value in its local storage - the value carries its vector clock, so the storage layer's reconciliation logic (from Part 3) correctly handles it whether the value is new, an ancestor, or concurrent with existing data.

Sloppy Quorum

Hinted handoff enables what the paper calls a sloppy quorum. In a strict quorum, the W nodes that acknowledge a write must be from the key's preference list - the specific nodes assigned by the consistent hash ring. In a sloppy quorum, any healthy node can stand in for a failed preference-list node.

This means writes can succeed even when multiple preference-list nodes are down, as long as enough total nodes are reachable. The hint ensures the data eventually reaches its intended home. The trade-off is that until the hints are delivered, reads might not find the latest version on the "correct" nodes - but the system remains available for writes, which is Dynamo's primary design goal.
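One way to make that concrete: keep walking the ring past dead nodes until N reachable targets are found. A self-contained sketch - the ordered slice stands in for Part 2's hash ring, and sloppyTargets is an illustrative name, not repo code:

```go
package main

import "fmt"

// sloppyTargets walks an ordered ring of node IDs starting at the key's
// primary position and returns the first n nodes considered alive -
// falling through dead preference-list members to healthy stand-ins.
func sloppyTargets(ring []string, start, n int, alive func(string) bool) []string {
	targets := make([]string, 0, n)
	for i := 0; i < len(ring) && len(targets) < n; i++ {
		node := ring[(start+i)%len(ring)]
		if alive(node) {
			targets = append(targets, node)
		}
	}
	return targets
}

func main() {
	ring := []string{"A", "B", "C", "D", "E"}
	down := map[string]bool{"B": true}
	alive := func(id string) bool { return !down[id] }

	// The strict preference list would be [A B C]; with B down, D stands
	// in and would hold a hint destined for B.
	fmt.Println(sloppyTargets(ring, 0, 3, alive)) // [A C D]
}
```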

Delete as Tombstone

The Delete method follows the same quorum write path as Put, but writes a tombstone instead of a regular value:

func (c *Coordinator) Delete(ctx context.Context, key string,
                             context *Context) error {
    preferenceList := c.node.ring.GetPreferenceList(key, c.node.config.N)

    // Generate new version for the tombstone
    var newClock *versioning.VectorClock
    if context != nil && context.VectorClock != nil {
        newClock = context.VectorClock.Copy()
    } else {
        newClock = versioning.NewVectorClock()
    }
    newClock.Increment(c.node.id)
    newClock.Prune(c.node.config.VectorClockMaxSize)

    tombstone := versioning.VersionedValue{
        Data:        nil,
        VectorClock: newClock,
        IsTombstone: true,
    }

    // ... same fan-out, quorum wait, and hinted handoff as Put ...
}

This is the same code structure as Put - fan out to N replicas, wait for W acks, hinted handoff for failures. The only difference is the value being written: a VersionedValue with nil data and IsTombstone: true. Because the tombstone participates in normal vector clock reconciliation, it correctly supersedes the version it intends to delete without interfering with concurrent writes from other nodes (as we saw in Part 3's tombstone tests).

The Context: Read-Modify-Write

The Context type ties the read and write paths together:

type Context struct {
    VectorClock *versioning.VectorClock
}

type GetResult struct {
    Values  []versioning.VersionedValue
    Context *Context
}

When a Get returns multiple concurrent versions, the Context contains the merge of all their vector clocks:

func (c *Coordinator) buildContext(values []versioning.VersionedValue) *Context {
    if len(values) == 0 {
        return &Context{VectorClock: versioning.NewVectorClock()}
    }

    merged := values[0].VectorClock.Copy()
    for _, v := range values[1:] {
        merged = merged.Merge(v.VectorClock)
    }

    return &Context{VectorClock: merged}
}

When the application writes back using this context, the new version's vector clock dominates all the concurrent versions it resolved. This is the complete read-modify-write cycle:

// Read - might get concurrent versions
result, _ := node.Get(ctx, "key1")

// Application resolves conflict
merged := myMergeLogic(result.Values)

// Write back with the context from the read
// The merged clock ensures this version supersedes all parents
node.Put(ctx, "key1", merged, result.Context)
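myMergeLogic is application code, not part of the coordinator. For the paper's shopping-cart use case, a plausible resolver is a set union of the concurrent carts (the types here are simplified stand-ins; mergeCarts is an illustrative name):

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// mergeCarts unions the items from several concurrent cart versions - the
// resolution strategy the Dynamo paper describes for shopping carts.
// (Deleted items can resurface; that's the acknowledged trade-off of a
// union merge.)
func mergeCarts(versions [][]byte) []byte {
	seen := map[string]bool{}
	for _, v := range versions {
		for _, item := range strings.Split(string(v), ",") {
			if item != "" {
				seen[item] = true
			}
		}
	}
	items := make([]string, 0, len(seen))
	for item := range seen {
		items = append(items, item)
	}
	sort.Strings(items) // deterministic output regardless of replica order
	return []byte(strings.Join(items, ","))
}

func main() {
	// Two concurrent writes: one session added "book", another "pen".
	merged := mergeCarts([][]byte{[]byte("milk,book"), []byte("milk,pen")})
	fmt.Println(string(merged)) // book,milk,pen
}
```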

The coordinator test suite verifies this cycle works end-to-end:

func TestCoordinatorSequentialWrites_VersionsIncrement(t *testing.T) {
    node, _ := startTestNode(t, "coord-seq-1")
    defer node.Stop()

    ctx := context.Background()

    var lastCtx *dynamo.Context
    for i := 1; i <= 5; i++ {
        node.Put(ctx, "counter", []byte(fmt.Sprintf("v%d", i)), lastCtx)
        result, _ := node.Get(ctx, "counter")
        lastCtx = result.Context
    }

    result, _ := node.Get(ctx, "counter")

    // After 5 sequential writes, exactly 1 version survives
    // (each write dominates the previous) and the coordinator's
    // counter in the vector clock has reached 5
    if len(result.Values) != 1 {
        t.Fatalf("expected 1 surviving version, got %d", len(result.Values))
    }
    if counter := result.Values[0].VectorClock.Versions["coord-seq-1"]; counter != 5 {
        t.Fatalf("expected vector clock counter 5, got %d", counter)
    }
}

Evolving Node Lifecycle

With all these new components, NewNode and Start grow significantly:

func NewNode(id, address string, config *Config) (*Node, error) {
    // ... config validation, storage init, ring init (from Parts 1-2) ...

    // Initialize membership (tracks cluster members)
    node.membership = membership.NewMembership(id, address, config)

    // Initialize coordinator (handles distributed reads/writes)
    node.coordinator = NewCoordinator(node)

    // Initialize hinted handoff (stores writes for failed nodes)
    node.hintedHoff = replication.NewHintedHandoff(node, node.membership,
                                                    node.storage, config)

    // Initialize RPC (inter-node communication)
    node.rpcClient = rpc.NewClient(config.RequestTimeout)
    node.rpcServer = rpc.NewServer(address, node)

    return node, nil
}

And Start now launches the RPC server and a background loop:

func (n *Node) Start() error {
    // Start RPC server
    if err := n.rpcServer.Start(); err != nil {
        return fmt.Errorf("failed to start RPC server: %w", err)
    }

    // Add self to ring
    n.ring.AddNode(n.id, n.config.VirtualNodes)

    // Start hinted handoff delivery loop
    if n.config.HintedHandoffEnabled {
        n.wg.Add(1)
        go n.hintedHandoffLoop()
    }

    return nil
}

The hinted handoff loop runs every 10 seconds, attempting to deliver any stored hints:

func (n *Node) hintedHandoffLoop() {
    defer n.wg.Done()
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            n.hintedHoff.DeliverHints()
        case <-n.stopCh:
            return
        }
    }
}

The sync.WaitGroup ensures Stop waits for all background goroutines to finish before closing the storage engine. The stopCh channel signals all loops to exit.

The public API now delegates to the coordinator:

func (n *Node) Get(ctx context.Context, key string) (*GetResult, error) {
    return n.coordinator.Get(ctx, key)
}

func (n *Node) Put(ctx context.Context, key string, value []byte,
                   context *Context) error {
    return n.coordinator.Put(ctx, key, value, context)
}

func (n *Node) Delete(ctx context.Context, key string,
                      context *Context) error {
    return n.coordinator.Delete(ctx, key, context)
}

The Full Picture So Far

Let's step back and trace a complete write through the system as it now stands. A client calls node.Put(ctx, "user:123", []byte("Alice"), nil):

  1. Node.Put delegates to Coordinator.Put.
  2. The coordinator asks the ring: "who are the 3 nodes responsible for user:123?" - say, [node1, node3, node2].
  3. It creates a new vector clock {node1: 1}, wraps the value in a VersionedValue.
  4. It launches 3 goroutines, each calling writeToNode. The one for node1 (local) calls storage.Put directly. The others make RPC calls.
  5. It waits for 2 acknowledgments (W=2). Say node1 (local) and node3 (remote) respond. node2 is slow.
  6. The write succeeds. If node2 had failed entirely, a hint would be stored for it.

A subsequent read of the same key:

  1. Node.Get delegates to Coordinator.Get.
  2. Preference list: [node1, node3, node2]. Read from all 3, wait for 2.
  3. Say node1 returns {data: "Alice", clock: {node1:1}} and node3 returns the same.
  4. ReconcileConcurrent strips duplicates - one version survives.
  5. Result returned to client. Async read repair checks node2 - if it's stale, it gets updated.

What's Next

We now have a working distributed key-value store with quorum-based reads and writes, vector clock versioning, hinted handoff for failure tolerance, and read repair for opportunistic consistency. But two major pieces are missing: how do nodes discover each other and detect failures? And how do replicas that have drifted apart (beyond what read repair can catch) get back in sync?

In Part 5, we'll build the gossip protocol for membership management, a phi accrual failure detector that adapts to network conditions, and Merkle tree-based anti-entropy for comprehensive replica synchronization.

The full source code is available at github.com/tripab/toy-dynamo. The coordinator lives in pkg/dynamo/coordinator.go, hinted handoff in pkg/replication/hinted_handoff.go.