Wait-Free Coordination - ZooKeeper (Part 5)

ZooKeeper Implementation

Learner101
15 min read · Jun 20, 2024

High Availability and Fault Tolerance in ZooKeeper

ZooKeeper ensures high availability and fault tolerance through a distributed architecture that replicates data across multiple servers (nodes) in a ZooKeeper ensemble. Let’s break down the key components and processes involved in maintaining high availability and fault tolerance.

Key Components and Processes

  1. ZooKeeper Ensemble:
  • An ensemble is a group of ZooKeeper servers that collectively manage the data and provide the service.
  • The ensemble typically consists of a leader and multiple followers.

2. Data Replication:

  • The ZooKeeper data is replicated across all servers in the ensemble.
  • Each server maintains a complete in-memory replica of the data tree.
  • Data replication ensures that if one or more servers fail, other servers can continue to provide the service without interruption.

3. Request Handling:

  • Read Requests:
  • Handled by the server the client is connected to.
  • The server reads the state of its local replica and returns the result.
  • Write Requests:
  • Require coordination among all servers in the ensemble.
  • Handled using an agreement protocol (atomic broadcast).

4. Agreement Protocol:

  • Ensures consistency and coordination among servers for write requests.
  • Involves the following steps:
  • A server receives a write request and forwards it to the leader.
  • The leader generates a proposal for the state change and sends it to all followers.
  • Followers agree on the proposal, and once a majority agrees, the state change is committed.

5. Leader and Followers:

  • Leader:
  • Coordinates write operations.
  • Generates proposals for state changes.
  • Followers:
  • Receive proposals from the leader.
  • Agree on the state changes by participating in the consensus protocol.

6. Data Storage and Logging:

  • In-Memory Database:
  • Each server stores the entire data tree in memory for fast access.
  • Write-Ahead Log (Replay Log):
  • Logs updates to disk before applying them to the in-memory database.
  • Ensures recoverability by allowing the system to replay committed operations.
  • Periodic Snapshots:
  • Periodically generate snapshots of the in-memory database.
  • Helps in faster recovery in case of server restarts.
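
To make the ensemble and replication ideas concrete from the client's side, here is a minimal sketch using ZooKeeper's standard Java client. The hostnames, port, and session timeout are placeholders: the connect string lists several ensemble members, and the client library transparently moves the session to another listed server if the one it is attached to fails.

```java
import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class EnsembleConnection {
    public static void main(String[] args) throws IOException, InterruptedException {
        // Connect string listing several ensemble members (placeholder hostnames).
        // If the server this session attaches to fails, the client library
        // transparently reconnects to another server from the list.
        String connectString = "zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181";

        CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper(connectString, 15000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // Session-level events (connected, disconnected, expired) arrive here.
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connected.countDown();
                }
            }
        });
        connected.await();   // block until the session is established
        System.out.println("session id: 0x" + Long.toHexString(zk.getSessionId()));
        zk.close();
    }
}
```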

Example Scenario

Step-by-Step Process for a Write Request

  1. Client Sends Write Request:
  • A client connects to a ZooKeeper server (Server A) and sends a write request to create or update a znode.
  2. Server A Forwards Request to Leader:
  • Server A prepares the request and forwards it to the leader (Server B) for coordination.
  3. Leader Proposes State Change:
  • The leader (Server B) generates a proposal for the state change and sends it to all followers (Servers C, D, E).
  4. Followers Agree on Proposal:
  • Followers receive the proposal and participate in the consensus protocol.
  • Once a majority of followers agree, they send their agreement back to the leader.
  5. Commit State Change:
  • Upon receiving agreement from a majority, the leader commits the state change to its in-memory database and updates the write-ahead log. The leader then notifies all followers to commit the state change.
  6. Followers Commit State Change:
  • Followers commit the state change to their in-memory databases and update their write-ahead logs.
  7. Client Receives Confirmation:
  • Once the state change is committed, the leader sends a confirmation back to Server A, which then responds to the client.
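
From the client's perspective, this whole forward-to-leader and quorum-agreement sequence is hidden behind a single blocking call. A small sketch with the standard Java client (the path and data are placeholders, and the session is assumed to be already connected):

```java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class WriteExample {
    // zk is an already-connected session (see the connection sketch earlier).
    static void writeZnode(ZooKeeper zk) throws KeeperException, InterruptedException {
        // create() and setData() return only after the change has been forwarded
        // to the leader, agreed on by a quorum, and committed.
        if (zk.exists("/example", false) == null) {
            zk.create("/example", "initial".getBytes(),
                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        // Version -1 means "any version", i.e. an unconditional update.
        Stat stat = zk.setData("/example", "newData".getBytes(), -1);
        System.out.println("committed, new version = " + stat.getVersion());
    }
}
```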

Step-by-Step Process for a Read Request

  1. Client Sends Read Request
  • A client connects to a ZooKeeper server (Server C) and sends a read request.

2. Server C Reads Local Replica:

  • Server C reads the requested data from its local in-memory replica of the ZooKeeper database.

3. Client Receives Data:

  • Server C returns the data to the client.
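
The read path is correspondingly simple on the client side. A hedged sketch, again assuming an already-connected session and a placeholder path:

```java
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ReadExample {
    // zk is an already-connected session.
    static void readZnode(ZooKeeper zk) throws KeeperException, InterruptedException {
        Stat stat = new Stat();
        // Served entirely from the in-memory replica of the server this session
        // is attached to; no quorum round-trip is involved.
        byte[] data = zk.getData("/example", false, stat);
        System.out.println(new String(data) + " (version " + stat.getVersion() + ")");
    }
}
```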

ZooKeeper Request Processor and Transaction Management

ZooKeeper ensures consistency and coordination among servers by using a messaging layer that guarantees atomic broadcast of messages. This ensures that the local replicas of the ZooKeeper database on each server never diverge, even though some servers might have processed more transactions than others at any given time. Here’s a detailed explanation of how the request processor works, with an example.

Key Concepts

  1. Atomic Messaging Layer:
  • Guarantees that all servers receive messages (transactions) in the same order.
  • Ensures consistency across replicas.

2. Idempotent Transactions:

  • Transactions are designed to be idempotent, meaning that applying the same transaction multiple times has the same effect as applying it once.

3. Transaction Creation:

  • When the leader server receives a write request, it calculates the future state of the system after the write is applied.
  • The leader then transforms the write request into a transaction that captures this new state.

4. Conditional Operations:

  • Some operations, like setData, can be conditional, requiring certain conditions (e.g., version numbers) to be met before applying the change.
  • The leader calculates if these conditions will be met in the future state and generates the appropriate transaction.
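
The sketch below illustrates this idea. The types are purely illustrative and do not mirror ZooKeeper's internal classes; they only model how a leader could turn a conditional setData request into either a state-capturing transaction or an error transaction by checking the condition against the computed future state.

```java
// Illustrative types only; these do not mirror ZooKeeper's internal classes.
record FutureZnode(byte[] data, int version) {}
record SetDataRequest(String path, byte[] data, int expectedVersion) {}

interface Txn {}
record SetDataTxn(String path, byte[] data, int newVersion, long mtime) implements Txn {}
record ErrorTxn(String path, String reason) implements Txn {}

class LeaderRequestProcessor {
    // futureState already includes the effect of outstanding, not-yet-applied transactions.
    Txn toTxn(SetDataRequest req, FutureZnode futureState) {
        if (req.expectedVersion() != -1 && req.expectedVersion() != futureState.version()) {
            // The condition fails against the future state: emit an error transaction.
            return new ErrorTxn(req.path(), "version mismatch");
        }
        // The condition holds: capture the resulting state as an idempotent transaction.
        return new SetDataTxn(req.path(), req.data(),
                              futureState.version() + 1, System.currentTimeMillis());
    }
}
```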

Example Scenario

Let’s walk through an example where a client sends a conditional setData request to update a znode.

  1. Initial Setup:
  • Assume there is a znode /example with the current version number 5 and some data.

2. Client Sends Conditional setData Request

  • A client sends a setData request to update the data of /example but only if the version number matches 5 (current version).

3. Leader Receives the Request:

  • The leader server receives this request.
  • There might be outstanding transactions that have not yet been applied to the database, so the leader calculates the future state considering these transactions.

4. Future State Calculation:

  • The leader checks the conditional requirement (version number 5) against the future state.
  • If the condition is met (i.e., the version number in the future state is still 5), the leader proceeds to generate a setDataTXN.

5. Transaction Generation:

  • setDataTXN:
  • If the version number matches, the leader creates a setDataTXN transaction which includes:
  • The new data to be set.
  • The new version number (incremented by 1).
  • Updated timestamps.
  • errorTXN:
  • If the condition is not met (e.g., version number does not match), the leader generates an errorTXN indicating the failure.

6. Broadcasting the Transaction:

  • The leader broadcasts the generated transaction (either setDataTXN or errorTXN) to all followers using the atomic messaging layer.

7. Applying the Transaction:

  • Each server (leader and followers) applies the transaction to their local replica of the ZooKeeper database.
  • If a setDataTXN is applied, the data of /example is updated, the version number is incremented to 6, and timestamps are updated.
  • If an errorTXN is applied, no changes are made to /example.

Detailed Example

Let’s consider the following sequence of operations:

  • Current State:
  • /example znode:
  • Data: “oldData”
  • Version: 5
  • Client Request:
  • Request: setData("/example", "newData", 5)
  • Leader Calculation:
  • Leader calculates the future state and sees that the version number 5 condition will be met.
  • Generated Transaction:
  • setDataTXN:
  • Data: “newData”
  • Version: 6
  • Timestamps: updated
  • Broadcast and Apply:
  • The leader broadcasts the setDataTXN to followers.
  • Each server applies the setDataTXN:
  • /example znode:
  • Data: “newData”
  • Version: 6
  • Timestamps: updated

If instead the condition was not met (e.g., the current version was already 6 due to another update):

  • Client Request:
  • Request: setData("/example", "newData", 5)
  • Leader Calculation:
  • Leader calculates the future state and sees that the version number condition is not met.
  • Generated Transaction:
  • errorTXN indicating version mismatch.
  • Broadcast and Apply:
  • The leader broadcasts the errorTXN to followers.
  • Each server applies the errorTXN without changing /example.

This mechanism ensures that all servers maintain a consistent view of the data and handle conditional updates correctly, maintaining the integrity and synchronization of the distributed system.
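
From the client side, the same version check surfaces as a conditional setData call that either succeeds or fails with a version error. A sketch using the standard Java client (path and data are placeholders; the session is assumed connected and /example is assumed to exist):

```java
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ConditionalSetData {
    // zk is an already-connected session; /example is assumed to exist.
    static void updateIfVersionMatches(ZooKeeper zk) throws KeeperException, InterruptedException {
        Stat stat = new Stat();
        byte[] current = zk.getData("/example", false, stat);   // e.g. "oldData", version 5
        System.out.println("read " + new String(current) + " at version " + stat.getVersion());
        try {
            // Succeeds only if the znode is still at the version we just read.
            Stat after = zk.setData("/example", "newData".getBytes(), stat.getVersion());
            System.out.println("updated, version is now " + after.getVersion());
        } catch (KeeperException.BadVersionException e) {
            // Another client updated /example first; the leader turned our request
            // into an error transaction and the write was rejected.
            System.out.println("conflict: version changed, re-read and retry");
        }
    }
}
```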

Atomic Broadcast in ZooKeeper Using Zab Protocol

ZooKeeper uses the Zab (ZooKeeper Atomic Broadcast) protocol to ensure that all state changes (write requests) are consistently replicated across all servers in the ensemble. Here’s a detailed explanation of how this works, with an example.

Key Concepts

  1. Leader and Followers:
  • ZooKeeper ensemble has a leader and multiple followers.
  • All write requests are forwarded to the leader.
  • The leader is responsible for executing the request and broadcasting the state change to all followers.

2. Atomic Broadcast:

  • Zab ensures that all changes are delivered in the same order to all servers.
  • Zab uses a majority quorum (simple majority) for deciding on proposals. This means ZooKeeper can tolerate up to f server failures in an ensemble of 2f + 1 servers.

3. Pipeline of Requests:

  • ZooKeeper keeps a pipeline of requests to achieve high throughput.
  • Thousands of requests may be in different stages of the pipeline at any given time.

4. Order Guarantees:

  • Zab provides strong order guarantees, ensuring that:
  • Changes broadcast by a leader are delivered in the order they were sent.
  • All changes from previous leaders are delivered to a new leader before it broadcasts its own changes.

5. Transport and Logging:

  • TCP is used for transport, maintaining message order.
  • The leader chosen by Zab is also the ZooKeeper leader.
  • Proposals are logged as a write-ahead log for the in-memory database to avoid writing messages twice to disk.

6. Redelivery During Recovery:

  • During normal operation, Zab delivers messages exactly once and in order.
  • During recovery, Zab may redeliver messages, but since transactions are idempotent, multiple deliveries are acceptable as long as they are in order.

Example Scenario

Let’s walk through an example to illustrate how this works in practice.

  1. Initial Setup:
  • Assume a ZooKeeper ensemble with 5 servers (S1, S2, S3, S4, S5).
  • Server S1 is the leader, and the others are followers.

2. Client Sends Write Request:

  • A client sends a write request to update a znode /example to the ZooKeeper ensemble.
  • The request is forwarded to the leader, S1.

3. Leader Executes and Proposes State Change:

  • S1 executes the request and calculates the new state of /example.
  • S1 creates a proposal for the state change and logs it to its write-ahead log.

4. Broadcasting the Proposal:

  • S1 broadcasts the proposal to all followers (S2, S3, S4, S5) using Zab.

5. Followers Acknowledge the Proposal:

  • Each follower logs the proposal to its write-ahead log.
  • Each follower sends an acknowledgment (ACK) back to S1.

6. Achieving Quorum:

  • Once S1 receives ACKs from a majority of followers (at least 3 out of 5), it considers the proposal committed.

7. Commit the State Change:

  • S1 commits the state change to its in-memory database and sends a response to the client.
  • S1 broadcasts a commit message to all followers.
  • Each follower commits the state change to its in-memory database.

Detailed Example

Let’s consider a write request to update the data of znode /example from "oldData" to "newData".

  • Current State:
  • /example znode:
  • Data: “oldData”
  • Version: 5
  • Client Request:
  • Request: setData("/example", "newData")
  • Leader Execution and Proposal:
  • S1 executes the request, updates the data, and creates a proposal.
  • Proposal: Change /example data to "newData".
  • Broadcasting the Proposal:
  • S1 broadcasts the proposal to S2, S3, S4, and S5.
  • Follower Acknowledgments:
  • S2, S3, and S4 log the proposal and send ACKs to S1.
  • S5 is slow or down and does not respond immediately.
  • Quorum Achieved:
  • S1 receives ACKs from S2, S3, and S4 (majority).
  • S1 commits the state change and sends a response to the client.
  • Committing the State Change:
  • S1 commits the new data “newData” to /example.
  • S1 broadcasts a commit message to S2, S3, S4, and S5.
  • S2, S3, and S4 commit the state change to their in-memory databases.
  • S5 eventually receives the commit message and commits the change.
  • Final State:
  • /example znode:
  • Data: “newData”
  • Version: 6

This mechanism ensures that all servers in the ensemble have a consistent view of the data and can tolerate failures while maintaining high availability and throughput.
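
The quorum rule at the heart of this flow is easy to state in code. The sketch below is not Zab itself, only the counting logic: a proposal is committed once floor(n/2) + 1 servers, the leader included, have acknowledged it.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified illustration of the quorum rule; this is not ZooKeeper's actual
// Zab implementation, only the counting logic.
class ProposalTracker {
    private final int ensembleSize;              // e.g. 5 servers -> tolerates 2 failures
    private final Set<String> acks = new HashSet<>();

    ProposalTracker(int ensembleSize) {
        this.ensembleSize = ensembleSize;
    }

    // Called when a server (leader included) logs the proposal and ACKs it.
    boolean ack(String serverId) {
        acks.add(serverId);
        return isCommitted();
    }

    // Committed once a strict majority (floor(n/2) + 1) has acknowledged.
    boolean isCommitted() {
        return acks.size() >= ensembleSize / 2 + 1;
    }

    public static void main(String[] args) {
        ProposalTracker p = new ProposalTracker(5);
        p.ack("S1");                             // leader's own ack
        p.ack("S2");
        System.out.println(p.ack("S3"));         // true: 3 of 5 is a majority, S4/S5 not needed
    }
}
```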

3. Replicated Database and Fuzzy Snapshots

ZooKeeper maintains a replicated database across all servers in its ensemble. Each server keeps an in-memory copy of the ZooKeeper state, which includes all znodes and their data. This state needs to be recovered if a server crashes and restarts.

Fuzzy Snapshots

Snapshot Process:

  • Fuzzy Snapshot: ZooKeeper takes periodic snapshots of its state to facilitate recovery after crashes. These snapshots are called fuzzy because they capture the state of znodes at different points in time during the snapshot process.
  • Snapshot Mechanism: ZooKeeper performs a depth-first traversal of its data tree to take a snapshot. During this traversal, it reads each znode’s data and metadata atomically and writes them to disk.
  • No Locking: Unlike traditional database snapshots that lock the state to ensure consistency during snapshot creation, ZooKeeper’s fuzzy snapshots do not lock the state. This means that the snapshot may capture intermediate states of znodes as it traverses the tree.

Example Scenario

Let’s consider an example to illustrate how fuzzy snapshots work:

  1. Initial State:
  • ZooKeeper data tree has two nodes: /foo and /goo.
  • /foo has value f1 and version 1.
  • /goo has value g1 and version 1.

2. State Changes Arrive:

  • The following state changes arrive:
  • SetDataTXN, /foo, f2, 2
  • SetDataTXN, /goo, g2, 2
  • SetDataTXN, /foo, f3, 3

3. Resulting State After Changes:

  • After processing these changes:
  • /foo has value f3 and version 3.
  • /goo has value g2 and version 2.

4. Fuzzy Snapshot Process:

  • During the fuzzy snapshot creation:
  • ZooKeeper traverses the tree and captures states of /foo and /goo.
  • Due to the nature of fuzzy snapshots, it might capture:
  • /foo with value f3 and version 3.
  • /goo with value g1 and version 1 (assuming it captured an intermediate state).

5. Crash and Recovery:

  • Suppose the ZooKeeper server crashes after taking this fuzzy snapshot.
  • Upon recovery, the server reloads its state from the last fuzzy snapshot taken.
  • The server applies the state changes recorded in its transaction log (write-ahead log, Zab) since the snapshot was taken.
  • These changes (SetDataTXN operations) are reapplied in order:
  • SetDataTXN, /foo, f2, 2
  • SetDataTXN, /goo, g2, 2
  • SetDataTXN, /foo, f3, 3
  • As these changes are idempotent (reapplying them does not change the final state if already applied), ZooKeeper ensures that the final state matches the intended state before the crash.
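
A minimal sketch of why this replay is safe (the types are illustrative, not ZooKeeper internals): each SetDataTXN carries the resulting value and version, so applying it on top of a fuzzy snapshot, once or repeatedly, converges to the same final state.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of idempotent replay over a fuzzy snapshot; types are illustrative.
class FuzzyReplay {
    record SetDataTxn(String path, String value, int version) {}
    record Znode(String value, int version) {}

    static void apply(Map<String, Znode> tree, SetDataTxn txn) {
        // Overwrite with the recorded result; nothing is incremented or appended.
        tree.put(txn.path(), new Znode(txn.value(), txn.version()));
    }

    public static void main(String[] args) {
        // Fuzzy snapshot: /foo already reflects a later change, /goo an earlier one.
        Map<String, Znode> tree = new HashMap<>();
        tree.put("/foo", new Znode("f3", 3));
        tree.put("/goo", new Znode("g1", 1));

        List<SetDataTxn> log = List.of(
                new SetDataTxn("/foo", "f2", 2),
                new SetDataTxn("/goo", "g2", 2),
                new SetDataTxn("/foo", "f3", 3));

        log.forEach(txn -> apply(tree, txn));    // replay everything since the snapshot
        System.out.println(tree);                // final state: /foo -> f3 v3, /goo -> g2 v2
    }
}
```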

Summary

ZooKeeper’s use of fuzzy snapshots allows it to efficiently capture and restore the state of its distributed database after server crashes. Despite potentially capturing intermediate states during snapshot creation, ZooKeeper guarantees eventual consistency by applying idempotent state changes in order during recovery. This approach ensures that the ZooKeeper service remains highly available and resilient to server failures while maintaining data integrity and consistency across the ensemble.

4. Client-Server Interactions

Write Request Processing and Notifications

  1. Write Request Processing:
  • When a client sends a write request to ZooKeeper (e.g., create, setData, delete), the request is processed by the ZooKeeper ensemble.
  • Each write request is received and processed by a single leader server (elected via ZooKeeper’s consensus protocol), ensuring linearizable consistency across the ensemble.

2. Sending Notifications:

  • After a write request is successfully processed and committed by the leader, the server sends notifications to all clients that have watches set on the znode or path affected by the write operation.
  • Notifications inform clients that the state of a znode has changed, which allows clients to take appropriate actions based on these changes.

3. Clearing Notifications:

  • Upon sending out notifications, the server clears these watches. This means that once a notification is sent, the watch is removed from the server’s watch list for that client.
  • This action prevents redundant notifications from being sent for the same event and ensures that each watch triggers exactly one notification event.

4. Order of Processing:

  • ZooKeeper servers process write requests in strict order. This ensures that writes are applied sequentially and consistently across all replicas in the ensemble.
  • By maintaining strict order, ZooKeeper guarantees that notifications are sent in the same order as the corresponding write operations. This sequential processing prevents race conditions and ensures that clients observe changes in a predictable manner.

5. Local Handling of Notifications:

  • Notifications are handled locally by each server. Only the server that a client is currently connected to tracks and triggers notifications for that client.
  • If a client has watches set on multiple znodes across different servers in the ensemble, each server independently manages and triggers notifications for watches associated with the znodes it manages.

Example Scenario:

Let’s illustrate this with an example:

  • Initial State:
  • Client A is connected to ZooKeeper and sets a watch on znode /data.
  • Client B performs a write operation (setData) to update /data from value1 to value2.
  • Processing Steps:
  1. Client B sends a setData request to ZooKeeper.
  2. The request is processed by the leader server (let’s say S1), which assigns a zxid (zxid1) to this write operation.
  3. S1 applies the write to its local state and sends notifications to all clients, including Client A, that have watches set on /data.
  4. Client A receives a notification that /data has been updated, triggering its watch event handler.
  • Notification Handling:
  • Once S1 sends the notification to Client A, it clears the watch associated with /data for Client A.
  • If Client A needs to continue monitoring /data, it must explicitly set another watch after processing the notification.
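
In client code, this one-shot behavior is why watch handlers typically re-read the znode, which also re-registers the watch. A sketch with the standard Java client (the znode path is a placeholder; the session is assumed connected):

```java
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class DataWatchExample {
    // Reads /data and leaves a one-shot watch behind; when the notification fires,
    // the watch has already been cleared, so the handler re-reads and re-registers.
    static void watchData(ZooKeeper zk) throws KeeperException, InterruptedException {
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeDataChanged) {
                    try {
                        watchData(zk);   // set a fresh watch, since the old one is gone
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        byte[] data = zk.getData("/data", watcher, new Stat());
        System.out.println("current value: " + new String(data));
    }
}
```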

Benefits of This Approach:

  • Consistency: Clients receive notifications in strict order corresponding to the sequence of write operations. This ensures consistent views of data across distributed clients.
  • Efficiency: Clearing watches after sending notifications prevents redundant notifications and reduces unnecessary network traffic.
  • Reliability: By processing writes sequentially and handling notifications locally, ZooKeeper maintains reliability and guarantees that clients are promptly informed of changes in the state of watched znodes.

Read Requests and Local Processing

  1. Local Processing:
  • ZooKeeper servers handle read requests locally. This means that when a client issues a read operation (like getData), the server doesn't need to consult other servers in the ensemble.
  • The server retrieves data directly from its own local replica of the ZooKeeper state, which is maintained in memory for fast access.

2. Local Replica:

  • Each ZooKeeper server maintains an in-memory replica of the entire ZooKeeper state tree.
  • This local replica is synchronized with replicas on other servers using ZooKeeper’s atomic broadcast protocol (Zab) to ensure consistency across the ensemble.

Zxid Tagging

  1. Zxid (ZooKeeper Transaction ID):
  • A zxid is a monotonically increasing number that the leader assigns to every state-changing transaction it broadcasts; read responses are tagged with the highest zxid the serving server has applied.
  • It uniquely identifies each transaction and helps in establishing a total order of transactions across the ZooKeeper ensemble.

2. Why Zxid Tagging is Needed:

  • Consistency and Ordering: By tagging read requests with zxids, ZooKeeper ensures that clients receive data based on a consistent view of the state. This zxid reflects the state of the ZooKeeper tree at the time of the read.
  • Partial Order: ZooKeeper uses zxids to establish a partial order of read requests relative to write requests. This means that a read operation will reflect all writes that were committed with zxids less than or equal to the zxid of the read operation itself.
  • Idempotency: Because transactions are idempotent and every read response is tagged with a zxid, a read retried due to network issues or timeouts still reflects the state up to that zxid, keeping the client’s view consistent.

Partial Order of Read Requests

  1. Partial Order:
  • ZooKeeper does not guarantee strict linearizability (total order) of read operations with respect to write operations across the entire ensemble.
  • Instead, it guarantees a partial order where each read operation is tagged with a zxid that corresponds to the last transaction seen by the server.
  • Clients can observe changes up to a specific zxid, but they might not see the effects of the very latest write if the write has not been fully replicated to all servers in the ensemble.

Example Scenario:

Let’s illustrate with an example:

  • Initial State:
  • ZooKeeper ensemble has three servers: S1, S2, S3.
  • Client sends a write operation to set /node to value1.
  • The leader (say, S1) processes the write and assigns it a zxid, say zxid1.
  • Read Request:
  • Client sends a read request to retrieve the value of /node.
  • The request is directed to S2.
  • S2 reads its local replica and tags the read response with the zxid zxid1, which was the last transaction it applied.
  • Partial Order Example:
  • If another client sends a write operation (setData) to update /node to value2 with zxid zxid2, subsequent read requests to S2 might reflect value1 (up to zxid1) until S2 applies zxid2.
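
Clients can actually see part of this bookkeeping: the Stat structure returned with every read carries the zxids of the transactions that created and last modified the znode. A small sketch (the path is a placeholder; the session is assumed connected):

```java
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ZxidInspection {
    // zk is an already-connected session; /node is a placeholder path.
    static void readWithZxid(ZooKeeper zk) throws KeeperException, InterruptedException {
        Stat stat = new Stat();
        byte[] value = zk.getData("/node", false, stat);
        System.out.println("value   = " + new String(value));
        System.out.println("czxid   = 0x" + Long.toHexString(stat.getCzxid()));  // txn that created the znode
        System.out.println("mzxid   = 0x" + Long.toHexString(stat.getMzxid()));  // txn that last modified it
        System.out.println("version = " + stat.getVersion());
    }
}
```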

Conclusion

ZooKeeper’s use of zxids for tagging read requests and establishing partial order ensures that clients receive a consistent view of shared data. This approach balances the need for performance (local processing of reads) with the requirement for consistency and ordering in distributed environments.

Background: Fast Reads and Precedence Order

ZooKeeper is designed for high-performance and distributed coordination, which involves managing shared state among multiple clients. One of the design trade-offs in ZooKeeper is its support for fast reads, which means that read operations are handled locally by each server without waiting for synchronization with other servers. This approach enhances read performance but may lead to returning stale data if a more recent update has been committed but not yet propagated.

Introducing Sync Primitive

To address scenarios where strict precedence order of read operations is necessary, ZooKeeper provides a sync primitive. This primitive ensures that a client can synchronize its view of a znode’s state with the most recent updates, even if those updates have not yet been fully propagated across the ZooKeeper ensemble.

How Sync Works:

  1. Asynchronous Execution:
  • Sync is executed asynchronously by a client, meaning it does not block the client’s thread. Instead, the client issues a sync request to the ZooKeeper ensemble.

2. Ordered Execution by Leader:

  • The sync operation is ordered by the leader server after all pending writes to its local replica have been applied. This ensures that all changes before the sync operation are reflected in the client’s view.

3. Implementation Details:

  • Leader-Based Algorithm: ZooKeeper uses a leader-based algorithm for coordination. The leader coordinates the sync operation by placing it at the end of the queue of requests between itself and the server executing the sync call.
  • Null Transaction: If there are no pending transactions that require committing, the leader may issue a null transaction. This null transaction is used to mark a point in the transaction log and allows the sync operation to be ordered effectively.

4. Avoiding Broadcast Overhead:

  • Unlike atomic broadcast protocols used for write operations, sync does not require atomic broadcast across the ensemble. Instead, it leverages the leader’s authority to order the sync operation locally, reducing unnecessary broadcast traffic.

5. Handling Leader Changes:

  • To order sync correctly, the server handling the call must be sure that the current leader is still the leader. If there are pending transactions waiting to commit, it has no reason to doubt the leader; otherwise the leader issues the null transaction described above, which both confirms its leadership and gives the sync a point to be ordered behind.

6. Timeout Mechanism:

  • ZooKeeper employs timeouts to detect leader failures. If a leader fails or becomes unresponsive, the system triggers a leader election process to select a new leader. This mechanism ensures continuous availability and reliability of the ZooKeeper ensemble.

Example Scenario:

Let’s illustrate how sync works with an example:

  • Initial State:
  • Client A performs a write operation (setData) to update znode /node1 from value1 to value2.
  • Client B, which requires the latest value of /node1, issues a read operation immediately after Client A's write.
  • Scenario Without Sync:
  • Client B performs a read operation on /node1 and may receive value1 instead of value2 if the read operation is handled before the write operation is fully propagated.
  • Scenario with Sync:
  • Client B issues a sync operation followed by the read operation on /node1.
  • The sync operation ensures that all updates (including Client A’s write) are processed and applied before Client B’s read operation is executed.
  • Client B receives value2, reflecting the most recent update to /node1.
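
In practice, the “sync then read” pattern looks like the sketch below with the standard Java client (the path is a placeholder; the session is assumed connected): the read is issued only after the sync callback fires, so it reflects everything committed before that point.

```java
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class SyncThenRead {
    // sync() is asynchronous; waiting for its callback before reading gives the
    // "see everything committed before this point" behavior described above.
    static byte[] syncRead(ZooKeeper zk, String path) throws KeeperException, InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        zk.sync(path, new AsyncCallback.VoidCallback() {
            @Override
            public void processResult(int rc, String p, Object ctx) {
                done.countDown();   // the connected server has caught up with the leader
            }
        }, null);
        done.await();
        return zk.getData(path, false, new Stat());   // now reflects prior committed writes
    }
}
```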

Benefits of Sync Primitive:

  • Guaranteed Precedence Order: Ensures that read operations reflect the most recent updates, maintaining consistency in distributed applications where strict order of operations is crucial.
  • Efficient and Scalable: Utilizes leader-based ordering and avoids unnecessary broadcast traffic, optimizing performance without compromising consistency.
  • Flexibility: Allows applications to choose between fast reads (for performance-sensitive operations) and sync (for operations requiring strict consistency), based on their specific requirements.

In conclusion, ZooKeeper’s sync primitive provides a flexible mechanism for ensuring strict precedence order of read operations in distributed coordination scenarios, balancing performance with consistency based on application needs.
