Watcher
먼저 zookeeper Watcher에 대해 알아보자
watcher는 leader election에서 가장 핵심이 되는 기능이며, 데이터가 변경되었을 때, 해당 watch를 설정한 클라이언트에게 보내지는 일회성 트리거(one-time-trigger)로 정의하고 있음
즉, zookeeper는 주기적으로 확인하는 polling 방식이 아니라 event기반의 wathcer방식을 사용하고 있음
Watcher의 특징
- one-time-trigger
- 데이터가 변경되어 이벤트가 클라이언트에게 전송되면, 그 watcher는 즉시 삭제됨
- 순서 보장
- 이벤트는 비동기적으로 클라이언트에게 전송되며, 클라이언트는 변경된 데이터를 실제로 보기 전에, watch이벤트를 받게 된다는 것을 보장함
- 즉, 변경된 것을 먼저 파악하고, 노드의 변경된 데이터를 실제로 확인함
실습을 통해 확인해보자
target_znode를 감시하는 코드를 작성해보자
package org.example;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.quorum.Leader;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
public class LeaderElection implements Watcher {
private static final String ZOOKEEPER_ADDRESS = "localhost:2181";
private ZooKeeper zooKeeper;
private static final int SESSION_TIMEOUT = 3000;
//리더 선출 관련 데이터만 모아두는 상위 폴더를 지정한거임
private static final String ELECTION_NAMESPACE="/election";
private String currentZnodename;
private static final String TARGET_ZNODE = "/target_znode";
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
~~~~
}
public void volunteerForLeadership() throws KeeperException, InterruptedException {
~~~
}
public void electionLeader() throws KeeperException, InterruptedException {
~~~
}
public void connectToZooKeeper() throws IOException {
this.zooKeeper = new ZooKeeper(ZOOKEEPER_ADDRESS,SESSION_TIMEOUT, this);
}
public void run() throws InterruptedException {
synchronized (this) {
// 누군가 깨울 때까지(notify) 무한정 대기합니다.
// 프로그램이 바로 종료되지 않게 해줍니다.
this.wait();
}
}
public void close() throws InterruptedException {
zooKeeper.close();
}
public void watchTargetZnode() throws KeeperException, InterruptedException {
Stat stat = zooKeeper.exists(TARGET_ZNODE,this);
if(stat == null) {
return;
}
byte[] data = zooKeeper.getData(TARGET_ZNODE,this,stat);
List<String> childern = zooKeeper.getChildren(TARGET_ZNODE,this);
System.out.println("Data : "+new String(data) + "childern : " + childern);
}
@Override
public void process(WatchedEvent event) {
switch (event.getType()){
case None:
if(event.getState() == Event.KeeperState.SyncConnected){
System.out.println("Connected to ZooKeeper Successfully");
}
//주키퍼와 연결이 끊어지면 메인 스레드를 깨워서 run메서드 탈출할 수 있게 해줌
else {
synchronized (zooKeeper){
System.out.println("Disconnected from Zookeeper event");
zooKeeper.notifyAll();
}
}
case NodeDeleted:
System.out.println(TARGET_ZNODE+"deleted");
break;
case NodeCreated:
System.out.println(TARGET_ZNODE+"created");
break;
case NodeDataChanged:
System.out.println(TARGET_ZNODE+"changed");
break;
case NodeChildrenChanged:
System.out.println(TARGET_ZNODE+"childrenchanged");
break;
}
try{
watchTargetZnode();
}catch (KeeperException | InterruptedException e){
}
}
}

노드의 data가 성공적으로 변했음을 알 수 있음
- 데이터 내용이 변경된 내용으로 출력됨
- dataVersion이 1로 변경됨
- 트랜잭션 ID인 mZxid(수정된 번호)가 cZxid(create된 번호)보다 큰 값이 나타남
Herd Effect
이벤트 하나에 여러 개의 프로세스가 동시에 깨어나는 현상을 말함
ex)
리더 노드가 죽게되면, 나머지 노드들은 대장이 죽은 알림을 보내는 이벤트를 수행하고,
해당 노드들 자신이 리더인지 확인하는 getChildern 요청을 보냄
이때 주키퍼 서버가 많은 요청을 처리하다가 병목이 발생하는 현상을 말함
이러한 현상을 해결하기 위해 sequence Node 전략을 사용함
즉 c_0000... , c_0000...1과 같은 번호를 가지고 있으며, 바로 앞의 번호만을 감시하는 방식을 사용함
String znodeFullpath = zooKeeper.create(znodePrefix, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
CreateMode.EPHEMERAL_SEQUENTIAL이렇게 값을 설정했기에, 세션이 끊어지면 주키퍼는 알아서 노드를 삭제를 함
만약 PERSISTENT로 설정을 통해 영구 노드로 리더를 선출했으면 죽은 리더가 계속 자리를 차지하고, 새로운 리더를 뽑을 수 없는 좀비 상태가 발생하게 됨
public class LeaderElection implements Watcher {
private static final String ZOOKEEPER_ADDRESS = "localhost:2181";
private ZooKeeper zooKeeper;
private static final int SESSION_TIMEOUT = 3000;
//리더 선출 관련 데이터만 모아두는 상위 폴더를 지정한거임
private static final String ELECTION_NAMESPACE="/election";
private String currentZnodename;
private static final String TARGET_ZNODE = "/target_znode";
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
~~
}
public void volunteerForLeadership() throws KeeperException, InterruptedException {
// election폴더 안에 c_로 시작하는 후보자를 만드는거임
String znodePrefix = ELECTION_NAMESPACE + "/c_";
String znodeFullpath = zooKeeper.create(znodePrefix, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("znode namea" + znodeFullpath);
this.currentZnodename = znodeFullpath.replace(ELECTION_NAMESPACE+"/", "");
}
public void electionLeader() throws KeeperException, InterruptedException {
String predecessorZnodeName = "";
Stat predecessorStat = null;
while(predecessorStat == null){
List<String> children = zooKeeper.getChildren(ELECTION_NAMESPACE, false);
Collections.sort(children);
String smallestChild = children.get(0);
if(smallestChild.equals(currentZnodename)) {
System.out.println("Leader");
return;
}
else{
System.out.println("it is not leader");
int predecessorIndex = Collections.binarySearch(children, currentZnodename)-1;
predecessorZnodeName = children.get(predecessorIndex);
predecessorStat = zooKeeper.exists(ELECTION_NAMESPACE+ "/" + predecessorZnodeName, this);
}
}
System.out.println("watching znode"+predecessorZnodeName);
System.out.println();
}
public void connectToZooKeeper() throws IOException {
this.zooKeeper = new ZooKeeper(ZOOKEEPER_ADDRESS,SESSION_TIMEOUT, this);
}
public void run() throws InterruptedException {
synchronized (this) {
// 누군가 깨울 때까지(notify) 무한정 대기합니다.
// 프로그램이 바로 종료되지 않게 해줍니다.
this.wait();
}
}
public void close() throws InterruptedException {
zooKeeper.close();
}
public void watchTargetZnode() throws KeeperException, InterruptedException {
Stat stat = zooKeeper.exists(TARGET_ZNODE,this);
if(stat == null) {
return;
}
byte[] data = zooKeeper.getData(TARGET_ZNODE,this,stat);
List<String> childern = zooKeeper.getChildren(TARGET_ZNODE,this);
System.out.println("Data : "+new String(data) + "childern : " + childern);
}
@Override
public void process(WatchedEvent event) {
switch (event.getType()){
case None:
if(event.getState() == Event.KeeperState.SyncConnected){
System.out.println("Connected to ZooKeeper Successfully");
}
//주키퍼와 연결이 끊어지면 메인 스레드를 깨워서 run메서드 탈출할 수 있게 해줌
else {
synchronized (zooKeeper){
System.out.println("Disconnected from Zookeeper event");
zooKeeper.notifyAll();
}
}
case NodeDeleted:
System.out.println(TARGET_ZNODE+"deleted");
break;
case NodeCreated:
System.out.println(TARGET_ZNODE+"created");
break;
case NodeDataChanged:
System.out.println(TARGET_ZNODE+"changed");
break;
case NodeChildrenChanged:
System.out.println(TARGET_ZNODE+"childrenchanged");
break;
}
try{
watchTargetZnode();
}catch (KeeperException | InterruptedException e){
}
}
}
while문을 써서 race condition을 막아줘야함
ex) 2번째 노드가 1번을 감시하기로 인식을 하는 순간, 1번 서버가 죽는 경우가 발생하면 null 반환이 발생하는데, 이걸 감시하지 못하면 감시 설정이 끝났다고 인식을 하고 현 상태를 그냥 계속 유지하게 되기 때문임

현재 사진은 4번 노드를 나타내는 터미널이며 3번 노드를 바라보고 있음, leader가 아니라고 나옴
하지만 3번 터미널을 죽이고 다시 확인을 해보면

리더가 4번으로 선출되어있음을 알 수 있음
앞서 1,2,3번을 만들때 사용한 서버를 끄고 다시 실행시키면서 4번부터 번호가 붙는 것을 확인했음
부모 노드(/election)은 앞서 사용한 1,2,3번을 기억하고 있기에 추후 재시작시 중복을 막기 위해 4번부터 번호를 할당을 수행하고 있음을 알 수 있음
부모 노드를 지우고 다시 시작하면 1번부터 번호를 다시 할당받을 수 있음
위에 실험에서 6번 터미널이 죽게되면, 다시 4번 터미널이 리더로 선출되는 것을 확인할 수 있음
'Distributed System' 카테고리의 다른 글
| 분산 코디네이터와 분산 알고리즘(1) (0) | 2026.02.02 |
|---|