Zookeeper distributed lock

Most Internet systems are deployed in a distributed manner. Distributed deployment improves performance and efficiency, but it also forces us to solve the problem of data consistency across machines. In a single-machine environment we can solve it with the concurrency API that Java provides (Lock, synchronized, etc.), but it is much more complicated in a distributed (cross-JVM) environment. Common solutions are distributed transactions, distributed locks, and so on.

Implementing distributed locks with Zookeeper

Business scenarios and problems

Consider generating globally unique order numbers in a distributed environment. Because multiple clients cannot synchronize with one another, order numbers generated from timestamps may be duplicated. Possible solutions:

  1. Use distributed locks
  2. Generate the order numbers in advance, store them in Redis, and then take them from Redis
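
The duplication problem can be illustrated with a small sketch. It assumes that every client formats the current time to the second and appends its own local counter, which is what the OrderNumGenerator class later in this article does. The class names TimestampCollisionDemo and Client are hypothetical and exist only for this illustration; each Client instance stands in for a separate JVM.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;

// Hypothetical demo class, not part of the original project
class TimestampCollisionDemo {

    // Each "client" keeps its own counter, as separate JVMs would
    static class Client {
        private int count = 0;

        String nextOrderNumber(Date now) {
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
            return format.format(now) + "-" + ++count;
        }
    }

    // Returns how many distinct order numbers are produced when every
    // client generates one number at the same instant
    static int distinctNumbers(int clients, Date now) {
        Set<String> numbers = new HashSet<>();
        for (int i = 0; i < clients; i++) {
            numbers.add(new Client().nextOrderNumber(now));
        }
        return numbers.size();
    }

    public static void main(String[] args) {
        // Prints 1: the three clients all produce the same order number
        System.out.println(distinctNumbers(3, new Date()));
    }
}
```

Because the counters are not shared, three clients that generate a number within the same second all end up with the same value. This is why some form of global coordination is needed.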

Zookeeper distributed lock based on a node with the same name

Zookeeper's strong consistency guarantees that, even under highly concurrent distributed access, only one client can successfully create a node at a given path. This feature can be used to implement an exclusive lock.

  • Define the lock: a node on Zookeeper represents the lock
  • Acquire the lock: each client calls the create method of ZkClient to create an ephemeral (temporary) node at the lock path; the client whose creation succeeds holds the lock
  • Watch the lock: the clients that failed register a Watcher on the node so that they are notified when it changes
  • Release the lock: the client that holds the lock deletes the ephemeral node when it is done; if that client crashes or its session ends abnormally, Zookeeper deletes the ephemeral node automatically
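
The core of these steps is that creating a node at the same path succeeds for exactly one client. The following is a minimal in-memory sketch of that idea, not a Zookeeper client: the class SameNameNodeDemo is hypothetical, and a ConcurrentHashMap stands in for the Zookeeper namespace, with putIfAbsent playing the role of an atomic create.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory stand-in; it does not talk to Zookeeper
class SameNameNodeDemo {

    // Stand-in for the Zookeeper namespace: path -> owner
    private final ConcurrentHashMap<String, String> nodes = new ConcurrentHashMap<>();

    // Mimics "create": succeeds only if no node exists at the path yet
    boolean create(String path, String owner) {
        return nodes.putIfAbsent(path, owner) == null;
    }

    // Mimics "delete": removes the node so that another client can create it
    void delete(String path) {
        nodes.remove(path);
    }

    // Lets the given number of threads race to create the same path and
    // returns how many of them succeeded
    static int raceForLock(int threads) {
        SameNameNodeDemo zk = new SameNameNodeDemo();
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(threads);
        for (int i = 0; i < threads; i++) {
            String owner = "client-" + i;
            new Thread(() -> {
                try {
                    start.await();
                    if (zk.create("/lock", owner)) {
                        winners.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }
        start.countDown();
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the race to finish", e);
        }
        return winners.get();
    }

    public static void main(String[] args) {
        // Prints 1: only one of the ten threads creates the node
        System.out.println(raceForLock(10));
    }
}
```

In the real implementation the node is ephemeral, so the delete step also happens automatically when the owning session ends. The sketch leaves that part out.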

Implementation:

  1. Create a Lock interface. This is not the Lock provided by the JDK; it is a custom interface that declares only the methods needed here.
package com.ooliuyue.zookeeperlock.zk;

public interface Lock {

    // Acquire the lock, blocking until it is available
    void getLock();

    // Release the lock
    void unLock();

}

 
  2. Create an abstract class AbstracLock that implements the Lock interface (template method pattern)
package com.ooliuyue.zookeeperlock.zk;

/**
* @Auther: ly
* @Date: 2019/4/25 11:31
*/

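public abstract class AbstracLock implements Lock {

   @Override
   public void getLock() {
       // Keep trying until the lock is acquired. A loop avoids the unbounded
       // recursion of calling getLock() again after every failed attempt.
       while (!tryLock()) {
           // The lock is held by another client: block until it is released
           waitLock();
       }
       System.out.println("## Lock ##");
   }

   // Try to acquire the lock once without blocking; return true on success
   public abstract boolean tryLock();

   // Block until the lock may be available again
   public abstract void waitLock();
}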

 
  3. Create the abstract class ZookeeperAbstractLock, which holds the Zookeeper configuration shared by the lock subclasses
package com.ooliuyue.zookeeperlock.zk;

import org.I0Itec.zkclient.ZkClient;

/**
 * @Auther: ly
 * @Date: 2019/4/25 11:41
 */
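// Base class that holds the Zookeeper connection shared by the lock implementations
public abstract class ZookeeperAbstractLock extends AbstracLock {
    // Zookeeper connection address
    private static final String CONNECTSTRING = "127.0.0.1:2181";
    // Zookeeper client; each lock instance opens its own connection
    protected ZkClient zkClient = new ZkClient(CONNECTSTRING);

    // Lock node used by the same-name-node lock
    protected static final String PATH = "/lock";

    // Parent node used by the ephemeral sequential node lock
    protected static final String PATH2 = "/lock2";

}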

 
  4. Implement the distributed lock logic on top of Zookeeper
package com.ooliuyue.zookeeperlock.zk;

import org.I0Itec.zkclient.IZkDataListener;
import java.util.concurrent.CountDownLatch;

/**
 * @Auther: ly
 * @Date: 2019/4/25 11:45
 */

public class ZookeeperDistrbuteLock extends ZookeeperAbstractLock {

    @Override
    public boolean tryLock() {
        try {
            // Only one client can create the ephemeral node at PATH
            zkClient.createEphemeral(PATH);
            System.out.println(" " + PATH + " ");
            return true;
        } catch (Exception e) {
            // Creation failed, typically because the node already exists:
            // another client holds the lock
            return false;
        }
    }

    @Override
    public void waitLock() {
        // Create the latch before subscribing, so that a deletion event
        // can never arrive while the latch is still missing
        final CountDownLatch countDownLatch = new CountDownLatch(1);

        IZkDataListener iZkDataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String path, Object o) throws Exception {
                // Data changes are irrelevant for the lock
            }

            @Override
            public void handleDataDeleted(String path) throws Exception {
                // The lock node was deleted: wake up the waiting thread
                countDownLatch.countDown();
            }
        };
        // Watch the lock node at PATH
        zkClient.subscribeDataChanges(PATH, iZkDataListener);

        // Block only if the lock node still exists
        if (zkClient.exists(PATH)) {
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // Remove the listener before retrying
        zkClient.unsubscribeDataChanges(PATH, iZkDataListener);
    }

    @Override
    public void unLock() {
        // Release the lock by deleting the node, then close the connection
        if (zkClient != null) {
            zkClient.delete(PATH);
            zkClient.close();
            System.out.println(" ");
        }
    }
}

 
  5. Create a test class OrderService that simulates concurrent order generation with 10 threads
package com.ooliuyue.zookeeperlock.zk;

/**
 * @Auther: ly
 * @Date: 2019/4/25 14:39
 */

public class OrderService implements Runnable {

    private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();

    private Lock lock = new ZookeeperDistrbuteLock();



    @Override
    public void run() {
        getNum();
    }

    public void getNum() {
        // Acquire the lock before entering try, so that finally only
        // releases a lock that was actually acquired
        lock.getLock();
        try {
            String number = orderNumGenerator.getNumber();
            System.out.println(Thread.currentThread().getName() + ", ID:" + number);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unLock();
        }
    }



    public static void main(String[] args) {
        System.out.println("## ##");
        for (int i = 0; i < 10 ; i++) {
            new Thread(new OrderService()).start();
        }
    }
}

 
  6. Create the class that generates order numbers
package com.ooliuyue.zookeeperlock.zk;

import java.text.SimpleDateFormat;
import java.util.Date;


/**
 * @Auther: ly
 * @Date: 2019/4/25 14:34
 */

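public class OrderNumGenerator {

    // Shared counter. It is not thread-safe by itself and relies on the
    // caller holding the lock.
    private static int count = 0;

    // Builds an order number from the current time (to the second) and the counter
    public String getNumber() {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
        return simpleDateFormat.format(new Date()) + "-" + ++count;
    }

}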

 

Run the main function. The console output:

## ##
/lock 
## Lock ##
Thread-3, ID:2019-04-28-11-42-53-1
 
/lock 
## Lock ##
Thread-7, ID:2019-04-28-11-42-53-2
 
/lock 
## Lock ##
Thread-9, ID:2019-04-28-11-42-53-3
 
/lock 
## Lock ##
Thread-11, ID:2019-04-28-11-42-53-4
 
/lock 
## Lock ##
Thread-1, ID:2019-04-28-11-42-53-5
 
/lock 
## Lock ##
Thread-15, ID:2019-04-28-11-42-53-6
/lock 
## Lock ##
Thread-17, ID:2019-04-28-11-42-53-7
 
/lock 
## Lock ##
Thread-19, ID:2019-04-28-11-42-53-8
 
/lock 
## Lock ##
Thread-13, ID:2019-04-28-11-42-53-9
 
/lock 
## Lock ##
Thread-5, ID:2019-04-28-11-42-53-10
 


 

The idea is to use the watch mechanism: when the node is deleted, the lock has been released and the waiting threads try to acquire it. This approach has a problem. When the lock is released, all waiting clients are notified and try to acquire the lock at the same time, which causes the herd effect. Performance suffers, but the approach is simple to implement.
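
To get a rough feeling for the cost of the herd effect, the following sketch counts watcher notifications under a simplifying assumption: all clients are already waiting when the first holder releases the lock, and locks are then handed over one at a time. The class HerdEffectDemo and its method names are hypothetical. It compares the scheme above, where every waiter watches the same node, with a scheme in which each waiter watches only one other node.

```java
// Hypothetical demo class that only does the arithmetic; it does not use Zookeeper
class HerdEffectDemo {

    // Every waiter watches the same lock node, so each release notifies
    // all clients that are still waiting
    static int notificationsSameNameNode(int clients) {
        int total = 0;
        for (int waiting = clients - 1; waiting > 0; waiting--) {
            total += waiting;
        }
        return total;
    }

    // Each waiter watches only one other node, so each release notifies
    // at most one client
    static int notificationsOneWatcherEach(int clients) {
        return Math.max(clients - 1, 0);
    }

    public static void main(String[] args) {
        System.out.println(notificationsSameNameNode(10));   // prints 45
        System.out.println(notificationsOneWatcherEach(10)); // prints 9
    }
}
```

Under this assumption the number of notifications grows quadratically with the number of clients in the first scheme and only linearly in the second. Each notification in the first scheme is also followed by a failed create attempt for all but one client.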

Zookeeper distributed lock based on ephemeral sequential nodes

Implementation

  • When multiple processes access a shared resource, each of them creates an ephemeral (temporary) sequential node under a common parent node
  • Each process checks whether its node has the smallest sequence number among all child nodes
  • If it does, the process holds the lock. Otherwise, the process watches the node immediately ahead of its own through the watcher mechanism and blocks; when it is notified that this node has been deleted, it tries to acquire the lock again
  • When releasing the lock, the process deletes its own node
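
The decision in the second and third steps can be sketched without a Zookeeper connection. The example assumes that child names are zero-padded sequence numbers of equal length, so that sorting them as strings matches numeric order. The class PredecessorDemo and the method nodeToWatch are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class; it works on plain lists of child names
class PredecessorDemo {

    // Returns the name of the node to watch, or null if ownName is the
    // smallest node and therefore holds the lock
    static String nodeToWatch(List<String> children, String ownName) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted);
        int index = Collections.binarySearch(sorted, ownName);
        if (index < 0) {
            throw new IllegalArgumentException("own node is not among the children: " + ownName);
        }
        return index == 0 ? null : sorted.get(index - 1);
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList("0000000085", "0000000083", "0000000084");
        System.out.println(nodeToWatch(children, "0000000085")); // prints 0000000084
        System.out.println(nodeToWatch(children, "0000000083")); // prints null
    }
}
```

Watching only the immediate predecessor means that a release wakes up exactly one waiter, the one that is next in line.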

The code is as follows:

package com.ooliuyue.zookeeperlock.zk;

import org.I0Itec.zkclient.IZkDataListener;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
* @Auther: ly
* @Date: 2019/4/26 10:55
*/

public class ZookeeperDistrbuteLock2 extends ZookeeperAbstractLock {

   private String beforePath; // Node immediately ahead of the current node
   private String currentPath; // Node created by this client

   public ZookeeperDistrbuteLock2(){
       // Make sure the persistent parent node exists
       if (!this.zkClient.exists(PATH2)) {
           this.zkClient.createPersistent(PATH2);
       }
   }

   @Override
   public boolean tryLock() {
       // Create the ephemeral sequential node only on the first attempt
       if (currentPath == null || currentPath.length() <= 0) {
           currentPath = this.zkClient.createEphemeralSequential(PATH2 + '/',"lock");
           System.out.println(" ===" + currentPath);
       }
       // Children are returned as bare names such as 0000000004;
       // sort them by sequence number
       List<String> children = this.zkClient.getChildren(PATH2);
       Collections.sort(children);

       // The node with the smallest sequence number holds the lock
       if (currentPath.equals(PATH2 + '/' + children.get(0))) {
           System.out.println(currentPath + "=== ");
           return true;
       }
       // Otherwise remember the node immediately ahead of the current node.
       // Strip the parent prefix instead of relying on a hard-coded offset.
       String currentName = currentPath.substring((PATH2 + '/').length());
       int i = Collections.binarySearch(children, currentName);
       beforePath = PATH2 + '/' + children.get(i - 1);
       return false;
   }

   @Override
   public void waitLock() {
       // Create the latch before subscribing, so that a deletion event
       // can never arrive while the latch is still missing
       final CountDownLatch countDownLatch = new CountDownLatch(1);

       IZkDataListener iZkDataListener = new IZkDataListener() {
           @Override
           public void handleDataChange(String s, Object o) throws Exception {
               // Data changes are irrelevant for the lock
           }

           @Override
           public void handleDataDeleted(String s) throws Exception {
               // The preceding node was deleted: wake up the waiting thread
               countDownLatch.countDown();
           }
       };
       // Watch only the node immediately ahead of the current node
       System.out.println(currentPath + "=== beforPath===" + beforePath);
       this.zkClient.subscribeDataChanges(beforePath,iZkDataListener);

       // Block only if the preceding node still exists
       if (this.zkClient.exists(beforePath)) {
           try {
               countDownLatch.await();
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }

       // Remove the listener before retrying
       this.zkClient.unsubscribeDataChanges(beforePath,iZkDataListener);
   }

   @Override
   public void unLock() {
       // Release the lock by deleting this client's node, then close the connection
       System.out.println(currentPath + "=== ...");
       zkClient.delete(currentPath);
       zkClient.close();
   }
}

 

Run the main function to test. OrderService is the same as before, except that it now uses ZookeeperDistrbuteLock2:

package com.ooliuyue.zookeeperlock.zk;

/**
* @Auther: ly
* @Date: 2019/4/25 14:39
*/

public class OrderService implements Runnable {

   private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();

   private Lock lock = new ZookeeperDistrbuteLock2();



   @Override
   public void run() {
       getNum();
   }

   public void getNum() {
       // Acquire the lock before entering try, so that finally only
       // releases a lock that was actually acquired
       lock.getLock();
       try {
           String number = orderNumGenerator.getNumber();
           System.out.println(Thread.currentThread().getName() + ", ID:" + number);
       } catch (Exception e) {
           e.printStackTrace();
       } finally {
           lock.unLock();
       }
   }



   public static void main(String[] args) {
       System.out.println("## ##");
       for (int i = 0; i < 10 ; i++) {
           new Thread(new OrderService()).start();
       }
   }
}

 

The console output:

## ##

 ===/lock2/0000000083
 ===/lock2/0000000084
/lock2/0000000083=== 
Thread-1, ID:2019-04-26-14-12-37-1
/lock2/0000000083=== ...
 ===/lock2/0000000085
 ===/lock2/0000000086
 ===/lock2/0000000087
/lock2/0000000086=== beforPath===/lock2/0000000085
/lock2/0000000085=== beforPath===/lock2/0000000084
/lock2/0000000084=== beforPath===/lock2/0000000083
/lock2/0000000087=== beforPath===/lock2/0000000086
 ===/lock2/0000000088
/lock2/0000000084=== 
Thread-3, ID:2019-04-26-14-12-37-2
/lock2/0000000084=== ...
 ===/lock2/0000000089
/lock2/0000000089=== beforPath===/lock2/0000000088
 ===/lock2/0000000090
/lock2/0000000090=== beforPath===/lock2/0000000089
 ===/lock2/0000000091
/lock2/0000000091=== beforPath===/lock2/0000000090
/lock2/0000000085=== 
Thread-5, ID:2019-04-26-14-12-37-3
/lock2/0000000085=== ...
 ===/lock2/0000000092
/lock2/0000000086=== 
Thread-7, ID:2019-04-26-14-12-37-4
/lock2/0000000086=== ...
/lock2/0000000087=== 
Thread-9, ID:2019-04-26-14-12-37-5
/lock2/0000000087=== ...
/lock2/0000000088=== 
Thread-11, ID:2019-04-26-14-12-37-6
/lock2/0000000088=== ...
/lock2/0000000089=== 
Thread-13, ID:2019-04-26-14-12-37-7
/lock2/0000000089=== ...
/lock2/0000000090=== 
Thread-15, ID:2019-04-26-14-12-37-8
/lock2/0000000090=== ...
/lock2/0000000091=== 
Thread-17, ID:2019-04-26-14-12-37-9
/lock2/0000000091=== ...
/lock2/0000000092=== 
Thread-19, ID:2019-04-26-14-12-37-10
/lock2/0000000092=== ...

 
