Apache Curator Leader Election

用于Leader选举,也可以用Shared Reentrant Lock来实现。

如果某件事需要由集群中固定的一台机器来做,就可以用此特性来实现;当这台Leader宕机后,会重新选举产生新的Leader。

1. 直接运行LLTest


package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;

import com.collonn.javaUtilMvn.zookeeper.curator.NodeCache.NLClientCreate;
import com.collonn.javaUtilMvn.zookeeper.curator.NodeCache.NLClientDelete;
import com.collonn.javaUtilMvn.zookeeper.curator.NodeCache.NLClientUpdate;

/**
 * Launcher for the leader-election demo: invokes the {@code main} method of each of the
 * four participant classes in turn, in the same JVM.
 */
public class LLTest {
    /**
     * Starts participants 1 through 4, in that order.
     *
     * @param args ignored
     * @throws Exception declared for compatibility with existing callers
     */
    public static void main(String[] args) throws Exception {
        // The participants do not read their arguments; each is handed null.
        final String[] noArgs = null;

        LeaderListener.main(noArgs);
        LeaderListener2.main(noArgs);
        LeaderListener3.main(noArgs);
        LeaderListener4.main(noArgs);
    }
}


package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;

import java.util.List;

/**
 * Demo participant "app1" in a Curator {@link LeaderSelector} election on {@link #C_PATH}.
 *
 * <p>{@link #main(String[])} starts a worker thread that connects to a local ZooKeeper,
 * joins the election, and then parks. While this participant is leader it prints a short
 * countdown and then gives leadership up by returning from {@code takeLeadership}.
 */
public class LeaderListener {
    public static final String C_PATH = "/TestLeader";
    public static final String CHARSET = "UTF-8";
    public static final String APP_NAME = "app1";

    /**
     * Starts the participant on a new (non-daemon) thread and returns immediately.
     *
     * @param args ignored; may be {@code null}
     */
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                participate();
            }
        }).start();
    }

    /**
     * Connects to ZooKeeper, joins the election and blocks until interrupted or the sleep
     * elapses. The selector and the client are always closed on the way out, including
     * when an exception or interrupt ends the wait.
     */
    private static void participate() {
        CuratorFramework client = null;
        try {
            String zookeeperConnectionString = "127.0.0.1:2181";
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
            client.start();

            // Make sure the election's parent znode (C_PATH) exists.
            new EnsurePath(C_PATH).ensure(client.getZookeeperClient());

            LeaderSelector leaderSelector = new LeaderSelector(client, C_PATH, new ElectionListener());
            leaderSelector.start();
            try {
                // Keep this participant alive; the election runs on the selector's own thread.
                Thread.sleep(Integer.MAX_VALUE);
            } finally {
                // Only reached after a successful start(), so close() is legal here.
                leaderSelector.close();
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }

    /** Election callback: holds leadership for a few seconds, printing progress. */
    private static final class ElectionListener implements LeaderSelectorListener {
        /**
         * Called when this participant becomes leader; leadership is held until this
         * method returns.
         *
         * <p>To stay leader for the whole life of the application, block here instead
         * (for example with a very long sleep); once the application dies, the remaining
         * participants elect a new leader.
         *
         * @param client the Curator client the selector was created with
         */
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            try {
                final int holdSeconds = 6;
                int timeMilliSeconds = holdSeconds * 1000;
                System.out.println("======" + APP_NAME + " take leader ship, will do some task, will hold time milli seconds=" + timeMilliSeconds);

                for (int i = 0; i < holdSeconds; i++) {
                    System.out.println("===" + APP_NAME + " sleep " + i);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                // Leadership is being cancelled: restore the flag and return to release it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Connection-state callback; intentionally a no-op in this demo.
         *
         * <p>NOTE(review): Curator's documentation recommends giving up leadership when
         * the connection becomes SUSPENDED or LOST -- confirm against the Curator version
         * in use before relying on this listener outside a demo.
         */
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            // no-op
        }
    }
}


package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;

/**
 * Demo participant "app2" in a Curator {@link LeaderSelector} election on {@link #C_PATH}.
 *
 * <p>{@link #main(String[])} starts a worker thread that connects to a local ZooKeeper,
 * joins the election, and then parks. While this participant is leader it prints a short
 * countdown and then gives leadership up by returning from {@code takeLeadership}.
 */
public class LeaderListener2 {
    public static final String C_PATH = "/TestLeader";
    public static final String CHARSET = "UTF-8";
    public static final String APP_NAME = "app2";

    /**
     * Starts the participant on a new (non-daemon) thread and returns immediately.
     *
     * @param args ignored; may be {@code null}
     */
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                participate();
            }
        }).start();
    }

    /**
     * Connects to ZooKeeper, joins the election and blocks until interrupted or the sleep
     * elapses. The selector and the client are always closed on the way out, including
     * when an exception or interrupt ends the wait.
     */
    private static void participate() {
        CuratorFramework client = null;
        try {
            String zookeeperConnectionString = "127.0.0.1:2181";
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
            client.start();

            // Make sure the election's parent znode (C_PATH) exists.
            new EnsurePath(C_PATH).ensure(client.getZookeeperClient());

            LeaderSelector leaderSelector = new LeaderSelector(client, C_PATH, new ElectionListener());
            leaderSelector.start();
            try {
                // Keep this participant alive; the election runs on the selector's own thread.
                Thread.sleep(Integer.MAX_VALUE);
            } finally {
                // Only reached after a successful start(), so close() is legal here.
                leaderSelector.close();
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }

    /** Election callback: holds leadership for a few seconds, printing progress. */
    private static final class ElectionListener implements LeaderSelectorListener {
        /**
         * Called when this participant becomes leader; leadership is held until this
         * method returns.
         *
         * @param client the Curator client the selector was created with
         */
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            try {
                final int holdSeconds = 6;
                int timeMilliSeconds = holdSeconds * 1000;
                System.out.println("======" + APP_NAME + " take leader ship, will do some task, will hold time milli seconds=" + timeMilliSeconds);

                for (int i = 0; i < holdSeconds; i++) {
                    System.out.println("===" + APP_NAME + " sleep " + i);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                // Leadership is being cancelled: restore the flag and return to release it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Connection-state callback; intentionally a no-op in this demo.
         *
         * <p>NOTE(review): Curator's documentation recommends giving up leadership when
         * the connection becomes SUSPENDED or LOST -- confirm against the Curator version
         * in use before relying on this listener outside a demo.
         */
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            // no-op
        }
    }
}


package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;

/**
 * Demo participant "app3" in a Curator {@link LeaderSelector} election on {@link #C_PATH}.
 *
 * <p>{@link #main(String[])} starts a worker thread that connects to a local ZooKeeper,
 * joins the election, and then parks. While this participant is leader it prints a short
 * countdown and then gives leadership up by returning from {@code takeLeadership}.
 */
public class LeaderListener3 {
    public static final String C_PATH = "/TestLeader";
    public static final String CHARSET = "UTF-8";
    public static final String APP_NAME = "app3";

    /**
     * Starts the participant on a new (non-daemon) thread and returns immediately.
     *
     * @param args ignored; may be {@code null}
     */
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                participate();
            }
        }).start();
    }

    /**
     * Connects to ZooKeeper, joins the election and blocks until interrupted or the sleep
     * elapses. The selector and the client are always closed on the way out, including
     * when an exception or interrupt ends the wait.
     */
    private static void participate() {
        CuratorFramework client = null;
        try {
            String zookeeperConnectionString = "127.0.0.1:2181";
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
            client.start();

            // Make sure the election's parent znode (C_PATH) exists.
            new EnsurePath(C_PATH).ensure(client.getZookeeperClient());

            LeaderSelector leaderSelector = new LeaderSelector(client, C_PATH, new ElectionListener());
            leaderSelector.start();
            try {
                // Keep this participant alive; the election runs on the selector's own thread.
                Thread.sleep(Integer.MAX_VALUE);
            } finally {
                // Only reached after a successful start(), so close() is legal here.
                leaderSelector.close();
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }

    /** Election callback: holds leadership for a few seconds, printing progress. */
    private static final class ElectionListener implements LeaderSelectorListener {
        /**
         * Called when this participant becomes leader; leadership is held until this
         * method returns.
         *
         * @param client the Curator client the selector was created with
         */
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            try {
                final int holdSeconds = 6;
                int timeMilliSeconds = holdSeconds * 1000;
                System.out.println("======" + APP_NAME + " take leader ship, will do some task, will hold time milli seconds=" + timeMilliSeconds);

                for (int i = 0; i < holdSeconds; i++) {
                    System.out.println("===" + APP_NAME + " sleep " + i);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                // Leadership is being cancelled: restore the flag and return to release it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Connection-state callback; intentionally a no-op in this demo.
         *
         * <p>NOTE(review): Curator's documentation recommends giving up leadership when
         * the connection becomes SUSPENDED or LOST -- confirm against the Curator version
         * in use before relying on this listener outside a demo.
         */
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            // no-op
        }
    }
}


package com.collonn.javaUtilMvn.zookeeper.curator.LeaderElection;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;

/**
 * Demo participant "app4" in a Curator {@link LeaderSelector} election on {@link #C_PATH}.
 *
 * <p>{@link #main(String[])} starts a worker thread that connects to a local ZooKeeper,
 * joins the election, and then parks. While this participant is leader it prints a short
 * countdown and then gives leadership up by returning from {@code takeLeadership}.
 */
public class LeaderListener4 {
    public static final String C_PATH = "/TestLeader";
    public static final String CHARSET = "UTF-8";
    public static final String APP_NAME = "app4";

    /**
     * Starts the participant on a new (non-daemon) thread and returns immediately.
     *
     * @param args ignored; may be {@code null}
     */
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                participate();
            }
        }).start();
    }

    /**
     * Connects to ZooKeeper, joins the election and blocks until interrupted or the sleep
     * elapses. The selector and the client are always closed on the way out, including
     * when an exception or interrupt ends the wait.
     */
    private static void participate() {
        CuratorFramework client = null;
        try {
            String zookeeperConnectionString = "127.0.0.1:2181";
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
            client.start();

            // Make sure the election's parent znode (C_PATH) exists.
            new EnsurePath(C_PATH).ensure(client.getZookeeperClient());

            LeaderSelector leaderSelector = new LeaderSelector(client, C_PATH, new ElectionListener());
            leaderSelector.start();
            try {
                // Keep this participant alive; the election runs on the selector's own thread.
                Thread.sleep(Integer.MAX_VALUE);
            } finally {
                // Only reached after a successful start(), so close() is legal here.
                leaderSelector.close();
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }

    /** Election callback: holds leadership for a few seconds, printing progress. */
    private static final class ElectionListener implements LeaderSelectorListener {
        /**
         * Called when this participant becomes leader; leadership is held until this
         * method returns.
         *
         * @param client the Curator client the selector was created with
         */
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            try {
                final int holdSeconds = 6;
                int timeMilliSeconds = holdSeconds * 1000;
                System.out.println("======" + APP_NAME + " take leader ship, will do some task, will hold time milli seconds=" + timeMilliSeconds);

                for (int i = 0; i < holdSeconds; i++) {
                    System.out.println("===" + APP_NAME + " sleep " + i);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                // Leadership is being cancelled: restore the flag and return to release it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Connection-state callback; intentionally a no-op in this demo.
         *
         * <p>NOTE(review): Curator's documentation recommends giving up leadership when
         * the connection becomes SUSPENDED or LOST -- confirm against the Curator version
         * in use before relying on this listener outside a demo.
         */
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            // no-op
        }
    }
}








郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。