Wait-Free Coordination - ZooKeeper (Part 4)

Implementing Primitives with Zookeeper

Learner101
20 min read · Jun 20, 2024
1. Config Management

In distributed applications, ZooKeeper is commonly used for dynamic configuration management. Let’s look at how this works, using ZooKeeper’s watch mechanism to ensure that processes always have the most recent configuration information.

Scenario Explanation

  1. ZooKeeper Znode (zc):
  • Configuration is stored in a znode named zc within ZooKeeper.

2. Process Initialization:

  • When processes start up, they initialize by obtaining their configuration from zc.
  • This is achieved by reading zc with the watch flag set to true. Setting the watch flag means that if zc is updated in the future, the process will receive a notification from ZooKeeper.

3. Handling Configuration Updates:

  • If the configuration stored in zc is updated at any point:
  • ZooKeeper triggers notifications to all processes that are watching zc.
  • Processes that have set the watch flag will be informed of the update.

Example Sequence:

  • Initial State: zc contains configuration data config_v1.
  • Process A starts up and reads zc with the watch flag set to true, obtaining config_v1.
  • Process B starts up similarly, also reading zc with the watch flag set to true.
  • Update: An administrator updates zc to config_v2.

Notification Handling:

  • ZooKeeper notifies both Process A and Process B of the update to zc.
  • If there are further updates to zc before Process A or Process B has re-read it, they do not receive additional notifications: a watch is a one-time trigger and is only re-armed when the process reads zc again.
  • This is sufficient, because when a process does react it reads the latest configuration (config_v2 or later), so it never acts on stale intermediate versions even if multiple changes occur in quick succession.

Implementation with ZooKeeper API

Here’s how a process might implement this primitive using ZooKeeper’s Java API:

import java.nio.charset.StandardCharsets;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ConfigWatcher implements Watcher {
    private static final String ZNODE_PATH = "/config";
    private final ZooKeeper zk;

    public ConfigWatcher(String zkHost) throws Exception {
        // Register this object as the session's default watcher.
        this.zk = new ZooKeeper(zkHost, 3000, this);
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeDataChanged) {
            try {
                // Get the updated configuration data; watch=true re-registers
                // the watch so we are also notified of the next change.
                byte[] newData = zk.getData(ZNODE_PATH, true, null);
                String newConfig = new String(newData, StandardCharsets.UTF_8);
                System.out.println("Updated configuration: " + newConfig);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void start() throws Exception {
        // Initial read, with a watch set for future changes.
        byte[] initData = zk.getData(ZNODE_PATH, true, null);
        String initialConfig = new String(initData, StandardCharsets.UTF_8);
        System.out.println("Initial configuration: " + initialConfig);
        // Keep the process alive so watch notifications can be delivered.
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws Exception {
        String zkHost = "localhost:2181";
        ConfigWatcher configWatcher = new ConfigWatcher(zkHost);
        configWatcher.start();
    }
}
  • Initialization (start method):
  • The ConfigWatcher opens a connection to ZooKeeper (zk) and reads the initial configuration from the /config znode (the zc of our scenario).
  • Because the read is done with the watch flag set, the process will be notified of any future change to the znode.
  • Watch Handling (process method):
  • The process method is invoked when a watch event occurs (e.g., NodeDataChanged).
  • It retrieves the updated configuration data (newConfig) and prints it.
  • Since that read again sets the watch flag, the watch is re-registered and the process keeps being notified of subsequent changes.

2. Rendezvous Primitive

In distributed systems, the concept of rendezvous is used when processes need to dynamically discover and coordinate with each other without knowing the specific details upfront. ZooKeeper provides a mechanism to facilitate this through its znodes and watch mechanism. Let’s explore how this works with an example:

Example Scenario: Master-Worker Coordination using ZooKeeper Rendezvous

  1. Setup:
  • Imagine a scenario where a client needs to start a master process and several worker processes dynamically.
  • The client creates a rendezvous znode (zr) in ZooKeeper to facilitate coordination between these processes.

2. Client’s Role:

  • The client initializes the system by creating the znode zr in ZooKeeper.
  • It passes the full pathname of zr as a startup parameter to both the master and worker processes.

3. Master Process:

  • When the master process starts up, it fills in zr with the information about addresses and ports it is using.
  • This information could include network addresses, ports, or any other configuration data necessary for workers to connect to the master.

4. Worker Processes:

  • Worker processes, started by a scheduler or independently, also receive the full pathname of zr as a startup parameter.
  • Each worker process reads zr with the watch flag set to true. If zr has not yet been filled in by the master (i.e., it's empty or incomplete), the worker process waits to be notified by ZooKeeper when zr is updated.

5. Coordination via ZooKeeper:

  • ZooKeeper ensures that once zr is updated by the master process, all worker processes watching zr will receive a notification.
  • Upon receiving the notification, worker processes can read the updated information from zr and proceed with connecting to the master using the provided addresses and ports.

6. Ephemeral Nodes:

  • Optionally, zr can be created as an ephemeral node. Ephemeral nodes in ZooKeeper are automatically deleted when the client session ends (e.g., when the client disconnects or terminates).
  • If zr is ephemeral, both master and worker processes can also watch for the deletion of zr. This allows them to clean up resources or take appropriate action when the client that created zr ends its session.
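
If zr is ephemeral, the deletion watch can be set with an exists call. A minimal sketch, assuming the same /rendezvous path used in the outline below (the class and method names are just for illustration):

import java.io.IOException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class RendezvousDeletionWatch {
    private static final String ZNODE_PATH = "/rendezvous"; // assumed path for illustration

    public static void watchForDeletion(ZooKeeper zk)
            throws KeeperException, InterruptedException {
        // exists() sets a watch even if the znode is still present; the watcher
        // fires once when the znode is deleted (or when its data changes).
        zk.exists(ZNODE_PATH, event -> {
            if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                System.out.println(ZNODE_PATH + " was deleted; cleaning up resources");
                // ... release connections, stop work, etc.
            }
        });
    }

    public static void main(String[] args)
            throws IOException, KeeperException, InterruptedException {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, event -> { });
        watchForDeletion(zk);
        Thread.sleep(Long.MAX_VALUE); // keep the process alive to receive the notification
    }
}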

Implementation Considerations

Here’s a simplified outline of how this could be implemented using ZooKeeper’s Java API:

import java.nio.charset.StandardCharsets;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class RendezvousExample {
    private static final String ZNODE_PATH = "/rendezvous";
    private ZooKeeper zk;

    public RendezvousExample(String zkHost) throws Exception {
        this.zk = new ZooKeeper(zkHost, 3000, null);
    }

    public void startMasterProcess(String masterInfo) throws KeeperException, InterruptedException {
        // The master writes its address/port information to zr.
        zk.create(ZNODE_PATH, masterInfo.getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("Master process has written to " + ZNODE_PATH);
    }

    public void startWorkerProcess() throws KeeperException, InterruptedException {
        // The worker reads zr with a watch set; in this example the master has
        // already created zr, so the read succeeds immediately.
        byte[] data = zk.getData(ZNODE_PATH, event -> {
            if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
                System.out.println("Worker process detected change in " + ZNODE_PATH);
                try {
                    // Read the updated information and proceed.
                    byte[] newData = zk.getData(ZNODE_PATH, false, null);
                    String updatedInfo = new String(newData, StandardCharsets.UTF_8);
                    System.out.println("Updated information from zr: " + updatedInfo);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, null);

        String initialInfo = new String(data, StandardCharsets.UTF_8);
        System.out.println("Worker process started with initial information from zr: " + initialInfo);
    }

    public static void main(String[] args) throws Exception {
        String zkHost = "localhost:2181";
        RendezvousExample example = new RendezvousExample(zkHost);
        // Start the master process.
        example.startMasterProcess("MasterInfo: localhost:5000");
        // Start (simulated) worker processes.
        example.startWorkerProcess();
        example.startWorkerProcess();
        // Simulate some activity.
        Thread.sleep(5000);
        // Close the ZooKeeper connection.
        example.zk.close();
    }
}

Explanation of the Example:

  • Master Process:
  • startMasterProcess method creates zr in ZooKeeper with the master's information (MasterInfo: localhost:5000 in this case).
  • Worker Process:
  • startWorkerProcess method reads zr with a watch set to true. It prints the initial information from zr and sets a watch for future changes.
  • Upon detecting a change in zr, it reads the updated information and prints it.
  • Execution:
  • When executed, the master process writes to zr, and worker processes (simulated here) start and read from zr.
  • Worker processes are notified when zr is updated by the master process, ensuring they get the latest information dynamically.

3. Group Membership Primitive

In distributed systems, group membership management is a critical task, often facilitated by ZooKeeper using its znodes and ephemeral nodes. Let’s break down how group membership can be implemented using ZooKeeper with an example:

Scenario:

Imagine a scenario where multiple processes (nodes) need to join and leave a group dynamically. These processes might be part of a distributed application where they collaborate or coordinate based on their membership in the group.

Implementation Steps:

  1. Designating the Group Znode (zg):
  • First, a designated znode (zg) is created in ZooKeeper to represent the group. This znode acts as a parent node under which each member of the group will create ephemeral child nodes.

2. Joining the Group:

  • When a process wants to join the group, it creates an ephemeral znode under zg.
  • If the process has a unique identifier, it uses this identifier as the name of its child znode under zg.
  • If identifiers are not unique, or simply to guarantee uniqueness, the process can use the SEQUENTIAL flag when creating the znode, in which case ZooKeeper appends a monotonically increasing sequence number to produce a unique name.

3. Adding Process Information:

  • The child znode created by the process can contain information relevant to that process, such as its address, ports, or any other metadata needed for group coordination.

4. Automatic Removal on Failure:

  • If a process fails or terminates abnormally, ZooKeeper automatically removes the ephemeral znode associated with that process from zg.
  • This ensures that the group membership accurately reflects the current active processes.

5. Obtaining Group Information:

  • To obtain information about the group membership, processes can simply list the children of zg.
  • Each child znode represents an active member of the group, and its data can provide details about that member.

6. Monitoring Changes:

  • Processes interested in monitoring changes in group membership can set a watch on zg.
  • When changes occur (i.e., when a new member joins or leaves the group), ZooKeeper notifies the process that has set the watch.
  • Upon receiving a notification, the process can refresh its view of the group by listing the children of zg again, ensuring it always has up-to-date information.
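
Here is a minimal sketch of these steps using ZooKeeper’s Java API. The /group path, the member-name prefix, and the metadata payload are assumptions for illustration rather than fixed conventions:

import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class GroupMembership {
    private static final String GROUP_PATH = "/group"; // zg in the description above
    private final ZooKeeper zk;

    public GroupMembership(ZooKeeper zk) {
        this.zk = zk;
    }

    // Join the group: create an ephemeral sequential child under zg.
    public String join(String memberId, String metadata)
            throws KeeperException, InterruptedException {
        return zk.create(GROUP_PATH + "/" + memberId + "-",
                metadata.getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL); // unique name + auto-removal on session end
    }

    // List current members and re-register a watch for membership changes.
    public List<String> members() throws KeeperException, InterruptedException {
        return zk.getChildren(GROUP_PATH, event -> {
            if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
                try {
                    // Refresh the view; this call also sets the next watch.
                    System.out.println("Group changed: " + members());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

Because the znodes are ephemeral, a member that crashes simply disappears from the listing, which is exactly the automatic removal described in step 4.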

Example:

Let’s illustrate with an example where processes A, B, and C join a group managed by ZooKeeper:

  1. Initialization:
  • ZooKeeper client connects to the ZooKeeper ensemble.
  • It creates a znode /group (let's call it zg) to manage group membership.

2. Joining the Group:

  • Process A wants to join the group. It creates an ephemeral znode under /group, say /group/A.
  • Process B also joins and creates /group/B.
  • Process C joins with /group/C.

3. Membership Information:

  • Any process can list the children of /group to see who is currently in the group (A, B, C).

4. Monitoring Changes:

  • Process D sets a watch on /group.
  • If Process B leaves the group (its session ends or it explicitly deletes its znode), ZooKeeper notifies Process D.
  • Process D can then refresh its view of the group by listing the children of /group, now seeing only A and C.

5. Automatic Cleanup:

  • If Process B crashes or terminates unexpectedly, ZooKeeper removes /group/B automatically.
  • This ensures that only active, currently running processes are listed under /group.

4. Simple Locking Primitive

Implementing locks using ZooKeeper is a common use case, even though ZooKeeper itself is not a dedicated lock service. Let’s delve into how locks can be implemented with ZooKeeper, addressing both the basic implementation and the subsequent improvements to overcome certain limitations.

Simple Lock Implementation

  1. Lock Representation:
  • A lock is represented by a znode in ZooKeeper.

2. Acquiring the Lock:

  • To acquire a lock, a client attempts to create a designated znode with the EPHEMERAL flag set.
  • If the creation succeeds, the client holds the lock because ephemeral nodes are automatically removed when the client’s session ends.

3. Handling Lock Contention:

  • If the create operation fails (i.e., the znode already exists), the client sets a watch on the znode.
  • It waits to be notified when the current lock holder releases the lock by deleting the znode, or when the holder’s session ends and ZooKeeper removes the ephemeral znode automatically.

4. Releasing the Lock:

  • A client releases the lock explicitly by deleting the znode associated with the lock.
  • Alternatively, if the client session ends unexpectedly, ZooKeeper removes the ephemeral znode automatically, thereby releasing the lock.

5. Contention Issues:

  • Herd Effect: If multiple clients are waiting to acquire the lock and it becomes available, every waiting client is notified and races to create the znode, even though only one of them can succeed.
  • Exclusive Locking: This basic implementation only supports exclusive locking, meaning only one client can hold the lock at a time.
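
A minimal sketch of this basic scheme in Java is shown below. The /simple-lock path and the latch-based waiting are illustrative choices, not the only way to structure it:

import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class SimpleLock {
    private static final String LOCK_PATH = "/simple-lock"; // assumed lock znode
    private final ZooKeeper zk;

    public SimpleLock(ZooKeeper zk) {
        this.zk = zk;
    }

    // Block until this client manages to create the ephemeral lock znode.
    public void lock() throws KeeperException, InterruptedException {
        while (true) {
            try {
                zk.create(LOCK_PATH, new byte[0],
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                return; // creation succeeded: we hold the lock
            } catch (KeeperException.NodeExistsException e) {
                // Someone else holds the lock: watch the znode and wait for its deletion.
                CountDownLatch deleted = new CountDownLatch(1);
                if (zk.exists(LOCK_PATH, event -> {
                    if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                        deleted.countDown();
                    }
                }) == null) {
                    continue; // the holder released the lock between create() and exists(); retry
                }
                deleted.await(); // herd effect: every waiter wakes up here
            }
        }
    }

    public void unlock() throws KeeperException, InterruptedException {
        zk.delete(LOCK_PATH, -1); // -1 ignores the znode version
    }
}

Note how every waiting client blocks on the very same znode; this is exactly where the herd effect described above comes from.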

Improved Locking Primitives

The provided locking scheme using ZooKeeper addresses the herd effect and enhances the basic lock implementation to manage contention more efficiently. Let’s break down how this scheme works and why it mitigates the herd effect:

Locking Mechanism Explanation

Lock Acquisition (Lock operation):

  1. Create Ephemeral Sequential Node (n):
  • Each client attempting to acquire the lock creates an ephemeral sequential znode (n) under the lock znode (l). The SEQUENTIAL flag makes ZooKeeper append a monotonically increasing sequence number to the znode name, giving the contenders a total order.

2. Get Children (C):

  • Clients retrieve the list of children (C) under the lock znode (l), sorted by their creation order.

3. Check Lowest Sequence Number:

  • If the client’s node (n) has the lowest sequence number in C, it acquires the lock immediately and exits.

4. Watch Previous Znode (p):

  • If not the lowest, the client identifies the znode (p) just before its own in the ordered list C.
  • It sets a watch on p so that it is notified when p is deleted.

5. Wait for Event:

  • The client waits for the watch event on p. This ensures that the client is notified immediately when the lock becomes available (i.e., p is deleted) without polling or timeouts.

6. Retry if Needed:

  • Upon receiving the watch event, the client repeats the process from step 2 to check whether its znode (n) now has the lowest sequence number in C. If so, it acquires the lock; otherwise, it sets a watch on its new predecessor and continues waiting.

Lock Release (Unlock operation):

  1. Delete Node (n):
  • To release the lock, the client simply deletes its ephemeral sequential znode (n).
  • ZooKeeper automatically cleans up the znode if the client session ends unexpectedly, ensuring the lock is released properly.
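
Here is a sketch of this herd-free lock in Java, following the steps above. The /lock parent path and the lock- prefix are assumptions, and the parent znode is expected to exist already:

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class SequentialLock {
    private static final String LOCK_ROOT = "/lock"; // the l znode; assumed to exist
    private final ZooKeeper zk;
    private String myNode; // full path of our lock-NNNNNNNNNN znode

    public SequentialLock(ZooKeeper zk) {
        this.zk = zk;
    }

    public void lock() throws KeeperException, InterruptedException {
        // 1. Create an ephemeral sequential znode n under l.
        myNode = zk.create(LOCK_ROOT + "/lock-", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        String myName = myNode.substring(LOCK_ROOT.length() + 1);

        while (true) {
            // 2. Get the children C of l, sorted by sequence number.
            List<String> children = zk.getChildren(LOCK_ROOT, false);
            Collections.sort(children);

            // 3. If n is the lowest znode in C, we hold the lock.
            int myIndex = children.indexOf(myName);
            if (myIndex == 0) {
                return;
            }

            // 4. Otherwise watch p, the znode just before n, and wait for its deletion.
            String previous = LOCK_ROOT + "/" + children.get(myIndex - 1);
            CountDownLatch gone = new CountDownLatch(1);
            if (zk.exists(previous, event -> {
                if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                    gone.countDown();
                }
            }) == null) {
                continue; // p already vanished; re-check immediately
            }
            gone.await(); // 5. wait for the event, then 6. retry from step 2
        }
    }

    public void unlock() throws KeeperException, InterruptedException {
        zk.delete(myNode, -1); // releasing the lock wakes only the next waiter
    }
}

Only the client that was watching the deleted predecessor wakes up, which is what eliminates the herd effect discussed next.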

Advantages of This Locking Scheme

  1. Elimination of Herd Effect:
  • Each client watches the znode immediately preceding its own in the sequence. When this znode is deleted (indicating the lock is released), only that specific client wakes up to check if it can now acquire the lock. This prevents multiple clients from simultaneously vying for the lock when unnecessary.

2. No Polling or Timeouts:

  • Clients do not need to poll or implement timeout mechanisms. They rely solely on ZooKeeper’s watch mechanism to be notified of lock availability changes.

3. Visibility and Debugging:

  • The structure allows easy visibility into lock contention by browsing ZooKeeper data. This helps in diagnosing and debugging locking issues, such as detecting which clients are waiting for locks or which znodes hold active locks.

Example Scenario

Let’s illustrate this with an example involving three clients (A, B, C) trying to acquire a lock:

  1. Client A creates znode l/lock-0000000001.
  2. Client B creates znode l/lock-0000000002.
  3. Client C creates znode l/lock-0000000003.

Assume Client A acquires the lock because l/lock-0000000001 has the lowest sequence number. Client B sets a watch on l/lock-0000000001 (the znode just before its own), and Client C sets a watch on l/lock-0000000002.

  • If Client A releases the lock by deleting l/lock-0000000001, Client B receives a watch event and attempts to acquire the lock (by checking if l/lock-0000000002 is now the lowest).
  • If Client B acquires the lock, Client C waits for l/lock-0000000002 to be deleted, then tries to acquire the lock.

This sequential waiting and acquisition process ensures that only one client attempts to acquire the lock at any given moment, thereby eliminating the herd effect.

Read Write Lock

These procedures outline how read and write locks can be implemented using ZooKeeper. Let’s break down each procedure step by step:

Write Lock Procedure:

1 n = create(l + "/write-", EPHEMERAL|SEQUENTIAL)
2 C = getChildren(l, false)
3 if n is lowest znode in C, exit
4 p = znode in C ordered just before n
5 if exists(p, true) wait for event
6 goto 2

Explanation:

  1. Create Write Lock Znode:
  • n = create(l + “/write-”, EPHEMERAL|SEQUENTIAL): Client creates an ephemeral sequential znode under the path /lock with the prefix /lock/write-. The sequential flag ensures that each znode gets a unique, sequentially increasing suffix.

2. Get Children Znodes:

  • C = getChildren(l, false): Client retrieves the list of children znodes under the path /lock.

3. Check if Lowest Znode:

  • if n is lowest znode in C, exit: Client checks if the znode n it just created is the lowest (earliest) znode in the list C.
  • If it is the lowest, the client has successfully acquired the write lock and can proceed with its critical section.

4. Wait for Previous Znode (p):

  • p = znode in C ordered just before n: Client identifies the znode p in list C that is ordered just before n.
  • if exists(p, true) wait for event: If the znode p exists, the client waits for a watch event indicating that p has been deleted.

5. Retry:

  • goto 2: The client retries from step 2 after receiving the watch event for znode p. This keeps the client in the queue until it is its turn to acquire the write lock.

Read Lock Procedure:

1 n = create(l + "/read-", EPHEMERAL|SEQUENTIAL)
2 C = getChildren(l, false)
3 if no write znodes lower than n in C, exit
4 p = write znode in C ordered just before n
5 if exists(p, true) wait for event
6 goto 3

Explanation:

  1. Create Read Lock Znode:
  • n = create(l + “/read-”, EPHEMERAL|SEQUENTIAL): Client creates an ephemeral sequential znode under the path /lock with the prefix /lock/read-.

2. Get Children Znodes:

  • C = getChildren(l, false): Client retrieves the list of children znodes under the path /lock.

3. Check for Lower Write Znodes:

  • if no write znodes lower than n in C, exit: Client checks if there are no write lock znodes (/lock/write-*) with sequence numbers lower than n.
  • If there are no lower write locks, the client can proceed with acquiring the read lock.

4. Wait for Previous Write Znode (p):

  • p = write znode in C ordered just before n: Client identifies the write lock znode p in list C that is ordered just before n.
  • if exists(p, true) wait for event: If the znode p exists, the client waits for a watch event indicating that p has been deleted.

5. Retry:

  • goto 3: The client retries from step 3 after receiving the watch event for znode p. This ensures the client waits until it can safely acquire the read lock without conflicting with existing write locks.
  • Write Lock Procedure: Ensures exclusive access for write operations by waiting for all preceding write locks to be released.
  • Read Lock Procedure: Allows concurrent access for read operations but waits for conflicting write locks to be released before proceeding.

These procedures demonstrate how clients can implement locking mechanisms using ZooKeeper to ensure coordinated access to shared resources in a distributed system. Each client follows a protocol to acquire locks in a sequential order based on the creation of ephemeral sequential znodes, ensuring orderly access and preventing contention.
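
The pseudocode above can be translated into Java along the following lines. This is only a sketch under the same assumptions as before (a pre-existing /lock parent and the write-/read- prefixes); the two procedures differ only in which preceding znode a client waits for:

import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ReadWriteLock {
    private static final String LOCK_ROOT = "/lock"; // l in the pseudocode; assumed to exist
    private final ZooKeeper zk;

    public ReadWriteLock(ZooKeeper zk) {
        this.zk = zk;
    }

    // Write lock: wait until n is the lowest znode of any kind under l.
    public String writeLock() throws KeeperException, InterruptedException {
        String n = zk.create(LOCK_ROOT + "/write-", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        waitForTurn(n, false);
        return n;
    }

    // Read lock: wait only until no write znode has a lower sequence number than n.
    public String readLock() throws KeeperException, InterruptedException {
        String n = zk.create(LOCK_ROOT + "/read-", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        waitForTurn(n, true);
        return n;
    }

    public void unlock(String n) throws KeeperException, InterruptedException {
        zk.delete(n, -1);
    }

    private void waitForTurn(String n, boolean readersIgnoreReaders)
            throws KeeperException, InterruptedException {
        long mySeq = sequenceOf(n.substring(LOCK_ROOT.length() + 1));
        while (true) {
            List<String> children = zk.getChildren(LOCK_ROOT, false);

            // Find p: the closest preceding znode we must wait for.
            String blocker = null;
            long blockerSeq = -1;
            for (String child : children) {
                long seq = sequenceOf(child);
                if (seq >= mySeq) continue;                                      // not ahead of us
                if (readersIgnoreReaders && child.startsWith("read-")) continue; // readers don't block readers
                if (seq > blockerSeq) {
                    blocker = child;
                    blockerSeq = seq;
                }
            }
            if (blocker == null) return; // no one blocking us: lock acquired

            CountDownLatch gone = new CountDownLatch(1);
            if (zk.exists(LOCK_ROOT + "/" + blocker, event -> {
                if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                    gone.countDown();
                }
            }) != null) {
                gone.await(); // wait for p to be deleted, then re-check
            }
        }
    }

    // Sequence numbers are the 10-digit suffix appended by the SEQUENTIAL flag.
    private static long sequenceOf(String name) {
        return Long.parseLong(name.substring(name.lastIndexOf('-') + 1));
    }
}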

Example Scenario:

Imagine a distributed system where several clients (Client A, Client B, Client C, Client D, and Client E) need access to a shared resource, such as a database, and they use ZooKeeper for coordination. Here’s how they implement read and write locks:

Write Lock Procedure:

  1. Client E Requests a Write Lock: Client E wants to perform a critical write operation on the shared resource.
  2. Create Write Lock Znode:
  • Client E creates an ephemeral sequential znode under the path /lock with the prefix /lock/write-. Let’s say it creates /lock/write-0000000001.

3. Check for Lowest Znode:

  • Client E retrieves the list of children znodes (C) under /lock. Let’s assume the current list is: /lock/read-0000000002, /lock/read-0000000003, /lock/write-0000000001, /lock/read-0000000004.
  • Client E checks whether its own znode (/lock/write-0000000001) is the lowest (earliest) in the list C.
  • In our example, /lock/write-0000000001 has the lowest sequence number of all the znodes in C, so Client E proceeds.

4. Acquire Write Lock:

  • Client E has acquired the write lock and can proceed with its critical write operation on the shared resource.

Read Lock Procedure:

  1. Client D Requests a Read Lock:
  • Client D wants to perform a read operation on the shared resource.

2. Create Read Lock Znode:

  • Client D creates an ephemeral sequential znode under the path /lock with the prefix /lock/read-. Let’s say it creates /lock/read-0000000002.

3. Check for Lower Write Znodes:

  • Client D retrieves the list of children znodes (C) under /lock. Assuming the current list is: /lock/read-0000000001, /lock/read-0000000002, /lock/write-0000000003, /lock/read-0000000004.
  • Client D checks if there are any write lock znodes (/lock/write-*) with sequence numbers lower than its own read znode (/lock/read-0000000002).
  • In our example, the only write znode, /lock/write-0000000003, has a higher sequence number than /lock/read-0000000002, so nothing blocks Client D.

4. Acquire Read Lock:

  • Client D has acquired the read lock and can proceed with its read operation on the shared resource.

Lock Release and Coordination:

  • Write Lock Release: When Client E finishes its write operation, it deletes /lock/write-0000000001, allowing the next waiting client (if any) to acquire the write lock.
  • Read Lock Handling: Clients acquiring read locks (Client D in this example) only need to wait for preceding write locks to release, ensuring that they can safely read consistent data.

5. Double Barrier Primitive

A double barrier in distributed systems is a synchronization primitive used to coordinate the start and end of a computation among multiple processes. It ensures that all participating processes:

  1. Begin the computation at the same time: No process starts until all processes are ready.
  2. End the computation together: No process finishes until all processes have completed their work.

This coordination is essential in scenarios where processes need to wait for each other to ensure that computations are performed correctly and efficiently.

How Double Barrier Works with ZooKeeper

ZooKeeper provides a way to implement a double barrier using znodes (ZooKeeper nodes). The steps involved in using a double barrier are as follows:

Setting Up the Double Barrier

  1. Barrier Znode Creation: A designated znode (let’s call it /barrier) is created to represent the barrier. This znode will have child znodes representing the participating processes.
  2. Barrier Threshold: A threshold is defined, which is the number of processes that need to join the barrier before they can start the computation.

Entering the Barrier

Each process performs the following steps to join the barrier:

  1. Create a Child Znode: Each process creates an ephemeral sequential znode under /barrier. For example, /barrier/process-0000000001, /barrier/process-0000000002, etc.
  2. Check Barrier Condition: The process retrieves the list of child znodes under /barrier and checks if the number of children znodes is equal to or exceeds the threshold.
  3. Wait if Condition Not Met: If the number of children znodes is less than the threshold, the process sets a watch on the barrier znode and waits to be notified when more processes join.
  4. Start Computation: Once the number of children znodes meets or exceeds the threshold, all processes can start their computation simultaneously.

Exiting the Barrier

Each process performs the following steps to leave the barrier:

  1. Complete Computation: Each process finishes its assigned task.
  2. Delete Child Znode: After completing the computation, each process deletes its child znode under /barrier.
  3. Check Exit Condition: The process retrieves the list of remaining child znodes under /barrier. If there are still child znodes present, the process sets a watch to wait for other processes to finish.
  4. Exit Barrier: Once all child znodes are deleted, indicating that all processes have finished their computation, the processes can proceed beyond the barrier.
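
A sketch of both phases in Java, assuming a pre-existing /barrier parent znode and a threshold supplied by the caller (the class and method names are illustrative):

import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class DoubleBarrier {
    private static final String BARRIER_PATH = "/barrier"; // assumed to exist
    private final ZooKeeper zk;
    private final int threshold;
    private String myNode;

    public DoubleBarrier(ZooKeeper zk, int threshold) {
        this.zk = zk;
        this.threshold = threshold;
    }

    // Enter: register under /barrier, then wait until enough processes have joined.
    public void enter() throws KeeperException, InterruptedException {
        myNode = zk.create(BARRIER_PATH + "/process-", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        while (true) {
            CountDownLatch changed = new CountDownLatch(1);
            List<String> children = zk.getChildren(BARRIER_PATH, event -> {
                if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
                    changed.countDown();
                }
            });
            if (children.size() >= threshold) {
                return; // barrier reached: everyone may start the computation
            }
            changed.await(); // wait for another process to join, then re-check
        }
    }

    // Leave: remove our znode, then wait until all participants have finished.
    public void leave() throws KeeperException, InterruptedException {
        zk.delete(myNode, -1);
        while (true) {
            CountDownLatch changed = new CountDownLatch(1);
            List<String> children = zk.getChildren(BARRIER_PATH, event -> {
                if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
                    changed.countDown();
                }
            });
            if (children.isEmpty()) {
                return; // all processes have left the barrier
            }
            changed.await(); // wait for another process to finish, then re-check
        }
    }
}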

Detailed Example

Let’s consider an example where we have four processes (P1, P2, P3, and P4) and the threshold is set to four.

  1. Processes Join the Barrier:
  • P1 creates /barrier/process-0000000001.
  • P2 creates /barrier/process-0000000002.
  • P3 creates /barrier/process-0000000003.
  • P4 creates /barrier/process-0000000004.

2. Processes Check Barrier Condition:

  • Each process checks the number of children znodes under /barrier.
  • When P4 creates its znode, the number of children znodes is four, which meets the threshold.
  • All processes are now allowed to start their computation.

3. Processes Complete Computation:

  • P1 completes its task and deletes /barrier/process-0000000001.
  • P2 completes its task and deletes /barrier/process-0000000002.
  • P3 completes its task and deletes /barrier/process-0000000003.
  • P4 completes its task and deletes /barrier/process-0000000004.

4. Processes Check Exit Condition:

  • Each process checks the number of remaining children znodes.
  • Once all znodes are deleted, indicating all processes have finished, they can all proceed beyond the barrier.

Why Use Double Barriers?

Double barriers are particularly useful in scenarios where:

  1. Coordinated Start: Processes need to start at the same time to ensure consistent state or input conditions.
  2. Coordinated Finish: Processes must wait for each other to finish to ensure that no process proceeds with incomplete or partial data.

This synchronization mechanism prevents race conditions, ensures data consistency, and helps in achieving coordinated operations in distributed systems.

The double barrier ensures synchronization at both the start and end of a computation phase, making it a powerful tool for managing distributed processes. ZooKeeper’s ephemeral sequential znodes and watches provide a robust way to implement this synchronization, ensuring that processes can efficiently and correctly coordinate their activities.

Application Using Zookeeper

1. The Fetching Service (FS) in the Yahoo! Crawler Using ZooKeeper

Overview

The Fetching Service (FS) is a critical part of Yahoo!’s web crawler, responsible for fetching billions of web documents. It employs a master-slave architecture where master processes control multiple page-fetching (fetcher) processes.

ZooKeeper’s Role in FS

ZooKeeper is used in FS for several key functions, ensuring robustness, availability, and efficient management of the fetching tasks.

  1. Configuration Management:
  • Primitives Used: Configuration Metadata
  • Function: The master processes store configuration information in ZooKeeper znodes. Fetcher processes read this configuration to understand which pages to fetch and other operational parameters.
  • Example: When a fetcher starts, it reads a znode /fetch/config that contains the URLs to be fetched and the fetching parameters.

2. Status and Health Monitoring:

  • Primitives Used: Status Updates
  • Function: Fetcher processes write back their status and health information to ZooKeeper. This allows the master to monitor the health of fetchers and take corrective actions if a fetcher is unresponsive or fails.
  • Example: Each fetcher creates an ephemeral znode like /fetch/status/fetcher-1 and periodically updates it with status information such as running, completed, or failed.

3. Failure Recovery:

  • Primitives Used: Leader Election
  • Function: In the event of a master process failure, ZooKeeper’s leader election mechanism is used to elect a new master. This ensures the fetching service remains operational despite individual process failures.
  • Example: Master candidates participate in leader election by creating ephemeral sequential znodes such as /fetch/masters/leader-. The candidate whose znode has the lowest sequence number acts as the leader.

4. Decoupling Clients from Servers:

  • Primitives Used: Service Discovery
  • Function: Fetcher processes and other clients discover which servers are healthy by reading their status from ZooKeeper. This allows dynamic redirection to operational servers without hardcoding server addresses.
  • Example: Clients check the znodes under /fetch/status/ to find active and healthy fetcher processes. If fetcher-2 is down, clients can redirect their requests to another active fetcher by checking znodes like /fetch/status/fetcher-3.
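
A minimal sketch of the leader-election pattern described above, using ephemeral sequential znodes under a path such as /fetch/masters (the path follows the example; the class and callback names are assumptions):

import java.util.Collections;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class LeaderElection {
    private static final String ELECTION_PATH = "/fetch/masters"; // assumed to exist
    private final ZooKeeper zk;
    private String myNode;

    public LeaderElection(ZooKeeper zk) {
        this.zk = zk;
    }

    // Each candidate creates leader-NNNNNNNNNN; the lowest sequence number is the leader.
    public void volunteer(Runnable onElected) throws KeeperException, InterruptedException {
        myNode = zk.create(ELECTION_PATH + "/leader-", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        checkLeadership(onElected);
    }

    private void checkLeadership(Runnable onElected)
            throws KeeperException, InterruptedException {
        List<String> candidates = zk.getChildren(ELECTION_PATH, false);
        Collections.sort(candidates);
        String myName = myNode.substring(ELECTION_PATH.length() + 1);
        int myIndex = candidates.indexOf(myName);

        if (myIndex == 0) {
            onElected.run(); // we are the lowest znode: act as the master
            return;
        }

        // Not the leader: watch the candidate just ahead of us to avoid a herd effect.
        String predecessor = ELECTION_PATH + "/" + candidates.get(myIndex - 1);
        if (zk.exists(predecessor, event -> {
            if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                try {
                    checkLeadership(onElected); // predecessor gone: re-evaluate
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }) == null) {
            checkLeadership(onElected); // predecessor vanished before we could watch it
        }
    }
}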

Example Workflow in FS Using ZooKeeper

  1. Initialization:
  • A master process writes its configuration in a znode /fetch/config.
  • Fetcher processes read the configuration from this znode when they start.

2. Fetchers Start and Report Status:

  • Each fetcher creates an ephemeral znode /fetch/status/fetcher-X and starts fetching URLs.
  • Fetchers periodically update their znodes with their status (e.g., /fetch/status/fetcher-1 = running).

3. Master Monitors Fetchers:

  • The master process reads the status znodes to monitor fetchers.
  • If a fetcher’s znode disappears, the master knows the fetcher has failed and can redistribute its tasks.

4. Leader Election in Case of Master Failure:

  • If the master process fails, the ephemeral znode it created (e.g., /fetch/masters/leader-0000000001) disappears.
  • The remaining candidates are notified of the deletion, and the one with the next-lowest sequence number (e.g., /fetch/masters/leader-0000000002) takes over as the new master.

5. Service Discovery and Client Requests:

  • Clients looking to interact with the fetchers check the status znodes under /fetch/status/.
  • They direct their requests to fetchers whose status indicates they are healthy and running.
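
As a small illustration of step 2 of this workflow, a fetcher might register and refresh its status znode along these lines (the /fetch/status path and the status strings follow the example above, while the class and method names are assumptions):

import java.nio.charset.StandardCharsets;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class FetcherStatus {
    private final ZooKeeper zk;
    private final String statusPath; // e.g. /fetch/status/fetcher-1

    public FetcherStatus(ZooKeeper zk, String fetcherId) {
        this.zk = zk;
        this.statusPath = "/fetch/status/" + fetcherId;
    }

    // Register this fetcher: the ephemeral znode disappears if the fetcher dies.
    public void register() throws KeeperException, InterruptedException {
        zk.create(statusPath, "starting".getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    }

    // Called periodically to publish the fetcher's current state (running, completed, failed).
    public void updateStatus(String status) throws KeeperException, InterruptedException {
        zk.setData(statusPath, status.getBytes(StandardCharsets.UTF_8), -1);
    }
}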

Benefits of Using ZooKeeper in FS

  • Failure Recovery: Automatic leader election ensures minimal disruption in case of master failures.
  • High Availability: Fetcher processes can quickly discover and communicate with healthy servers.
  • Scalability: Decoupling clients from servers allows the system to scale without significant reconfiguration.
  • Resilience: Continuous monitoring and status updates enable proactive management and fault tolerance.

ZooKeeper’s role in the Fetching Service highlights its capabilities in managing configuration, monitoring status, enabling leader election, and facilitating dynamic service discovery, all of which are crucial for robust distributed systems.

2. Katta: A Distributed Indexer Using ZooKeeper for Coordination

Katta is a distributed indexer that leverages ZooKeeper to manage coordination among its components. It uses a shard-based architecture to distribute indexing tasks across multiple servers. Here’s a detailed explanation of how Katta utilizes ZooKeeper, with examples to illustrate the process.

Key Components of Katta

  1. Master Server: Responsible for assigning shards to slave servers and tracking their progress.
  2. Slave Servers: Perform the actual indexing work on the assigned shards.
  3. Shards: Smaller, manageable pieces of the overall dataset that are distributed among slave servers.

ZooKeeper’s Role in Katta

ZooKeeper plays a crucial role in Katta by managing:

  1. Group Membership: Tracking the status of slave servers and the master.
  2. Leader Election: Handling failover for the master server.
  3. Configuration Management: Tracking and propagating shard assignments to slave servers.

Detailed Process with Examples

1. Group Membership

  • Slave Status Tracking:
  • Each slave server registers itself in ZooKeeper by creating an ephemeral znode under /katta/slaves.
  • Example: A slave server slave1 starts and creates a znode /katta/slaves/slave1.
  • The master server periodically checks this directory to get the list of active slaves.
  • Master Status Tracking:
  • The current master server creates an ephemeral znode /katta/master to indicate its active status.
  • Example: When master1 is active, it creates /katta/master/master1.

2. Leader Election

  • Handling Master Failover:
  • When the current master server fails, its ephemeral znode /katta/master/master1 is automatically deleted by ZooKeeper.
  • Other potential masters participate in leader election by creating ephemeral sequential znodes like /katta/masters/candidate-0000000001.
  • The candidate whose znode has the lowest sequence number becomes the new master.
  • Example: If candidate-0000000002 is the next lowest znode after candidate-0000000001, candidate-0000000002 becomes the new master.

3. Configuration Management

  • Shard Assignment:
  • The master server assigns shards to slaves and records these assignments in ZooKeeper.
  • Example: The master assigns shard1 to slave1 and creates a znode /katta/assignments/shard1 with data pointing to slave1.
  • Slave servers read their assignments from ZooKeeper.
  • Propagating Shard Assignments:
  • If a slave server fails, its ephemeral znode is deleted, and the master reassigns the shards that were assigned to the failed slave.
  • Example: If slave1 fails, the master detects the deletion of /katta/slaves/slave1 and reassigns shard1 to slave2 by updating /katta/assignments/shard1.
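
A sketch of how the master’s side of this reassignment could look (the paths follow the examples above; the round-robin rebalancing and the helper names are illustrative assumptions):

import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class KattaMasterSketch {
    private static final String SLAVES_PATH = "/katta/slaves";
    private static final String ASSIGNMENTS_PATH = "/katta/assignments";
    private final ZooKeeper zk;

    public KattaMasterSketch(ZooKeeper zk) {
        this.zk = zk;
    }

    // Watch the slave registry; when membership changes, recompute shard assignments.
    public void watchSlaves() throws KeeperException, InterruptedException {
        List<String> slaves = zk.getChildren(SLAVES_PATH, event -> {
            if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
                try {
                    watchSlaves(); // refresh the view and re-register the watch
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        rebalance(slaves);
    }

    // Point each shard's assignment znode at a live slave (round-robin for illustration).
    private void rebalance(List<String> liveSlaves)
            throws KeeperException, InterruptedException {
        if (liveSlaves.isEmpty()) {
            return; // nothing to assign to
        }
        List<String> shards = zk.getChildren(ASSIGNMENTS_PATH, false);
        int i = 0;
        for (String shard : shards) {
            String owner = liveSlaves.get(i++ % liveSlaves.size());
            // setData with version -1 overwrites the current assignment unconditionally.
            zk.setData(ASSIGNMENTS_PATH + "/" + shard,
                    owner.getBytes(StandardCharsets.UTF_8), -1);
        }
    }
}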

Example Scenario

Initial Setup

  1. Master Initialization:
  • The master server master1 creates /katta/master/master1.

2. Slave Registration:

  • slave1 and slave2 start and create /katta/slaves/slave1 and /katta/slaves/slave2.

Shard Assignment

  1. Master Assigns Shards:
  • The master assigns shard1 to slave1 and shard2 to slave2.
  • Creates /katta/assignments/shard1 with data slave1.
  • Creates /katta/assignments/shard2 with data slave2.

Slave Failure

  1. Handling Slave Failure:
  • slave1 fails, and its znode /katta/slaves/slave1 is deleted.
  • The master detects this and reassigns shard1 to slave2 by updating /katta/assignments/shard1.

Master Failure

  1. Handling Master Failure:
  • master1 fails, and its znode /katta/master/master1 is deleted.
  • The remaining candidates detect the deletion, and the next candidate in sequence, candidate-0000000002, becomes the new master.
  • The new master takes over the responsibility of assigning and managing shards.

Benefits of Using ZooKeeper in Katta

  • Fault Tolerance: Automatic failover for both master and slave servers ensures high availability.
  • Scalability: Dynamic tracking and reassignment of shards allow Katta to handle varying workloads efficiently.
  • Consistency: ZooKeeper’s reliable synchronization mechanisms ensure that all servers have a consistent view of the system state.

By leveraging ZooKeeper for coordination, Katta can efficiently manage a distributed indexing system that is robust, fault-tolerant, and scalable.
