ZooKeeper实现分布式锁

什么是分布式锁

以前在项目较小时,单机即可处理很多业务。但随着用户增长,单机已经无法满足当今业务。解决方案可能是上集群,但是在多个 JVM 中都有同一个变量。假设多个请求分到不同的 JVM 中,都对变量进行修改,这就造成变量可能会的不正确。

为了防止分布式系统中的多个进程之间相互干扰,需要一个分布式协调技术,这个技术便是分布式锁。

分布式锁和我们平常使用的锁类似

  • 排他,只允许一个线程占用
  • 可重入性,不可重入会造成死锁
  • 具备失效机制,万一执行过程崩溃,也会有自动过期机制

ZooKeeper 分布式锁原理

ZooKeeper实现分布式锁是基于ZooKeeper数据存储结构,ZooKeeper的数据存储结构类似于一棵树。其中的节点叫Znode

Znode有四种类型,分别为持久节点(PERSISTENT)持久顺序节点(PERSISTENT_SEQUENTIAL)临时节点(EPHEMERAL)临时顺序节点(EPHEMERAL_SEQUENTIAL)

ZooKeeper实现分布式锁原理是依据临时顺序节点(EPHEMERAL_SEQUENTIAL)来实现的。下面我们来学习一个ZooKeeper是如何利用临时顺序节点实现分布式锁的。

首先,我们创建一个持久节点Lock,当客户端想要拿到锁时,需要创建一个临时节点lock1

ZooKeeper客户端拿锁

这样,client-1创建了临时节点lock1并拿到了锁。这时,假设又来了个client-2想要拿锁。

ZooKeeper锁被占有

client-2也在Lock下创建了临时节点lock2,遍历Lock判断自己的lock2前面是否还有节点,如果没有,说明自己是第一个,就顺利拿锁。如果有则表明锁被人拿了,client-2将会注册一个Watcher去监听lock1是否存在。

这时,又来了一个client-3想要拿锁,在Lock下创建了一个lock3节点,结果也发现自己不是最前面的一个。便会注册一个Watcher去监听上一个节点,也就是lock2判断其是存在。

ZooKeeper client-3拿锁失败

client-1的业务执行完毕,可以释放锁了。执行完毕后,client-1调用节点删除的指令,将临时节点lock1删除。

ZooKeeper client-1准备释放锁

ZooKeeper client-1释放锁

由于client-2一直在监听着lock1,当lock1释放锁删除节点后,顺理成章的拿到了锁。而lock3还在监听着lock2,等着它释放锁。

ZooKeeper client-2拿到锁

上面是正常业务执行完毕的情况下释放锁。假设,执行到中途,客户端崩溃了,与ZooKeeper断开了连接,又会如何呢?

临时节点的特性:仅当创建者会话有效时才得以保存,如果客户端崩溃了,那创建的会话是无效的,那样与客户端相关联的节点也会被删除。

ZooKeeper client-2崩溃了

监听lock2的客户端client-3发现节点lock2被删除,那样他也可以拿到锁了。

ZooKeeper client-3拿到锁

最后,client-3也释放锁,整个分布式锁原理就这样结束了。

ZooKeeper client-3释放锁

使用 ZooKeeper 实现分布式锁

学习完了ZooKeeper分布式锁原理,下面们我们来使用ZooKeeper实现实现实现分布式锁。为了快速示例,这里使用Spring Boot来进行编写。

依赖包

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.8.0</version>
</dependency>

yml配置文件

1
2
3
4
5
6
7
8
9
10
11
12
#zookeeper地址
zookeeper.server=192.168.79.128:2181,192.168.79.128:2182,192.168.79.128:2183
#锁文件路径
zookeeper.lockPath=/springboot/test
#重试间隔时间
zookeeper.elapsedTimeMs=5000
#session超时时间
zookeeper.sessionTimeoutMs=60000
#重试次数
zookeeper.retryCount=5
#连接超时时间
zookeeper.connectionTimeoutMs=5000

ZooKeeper配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Component
@ConfigurationProperties(prefix = "zookeeper")
@Data
public class ZookeeperProperties {
/**
* zookeeper 地址:ip
*/
private String server;
/**
* 加锁路径
*/
private String lockPath;
/**
* session超时时间
*/
private Integer sessionTimeoutMs;
/**
* 连接超时时间
*/
private Integer connectionTimeoutMs;
/**
* 重试次数
*/
private Integer retryCount;
/**
* 重试间隔时间
*/
private Integer elapsedTimeMs;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Component
public class ZookeeperConfig {
@Autowired
ZookeeperProperties zookeeperProperties;

/**
* 配置 Zookeeper 客户端,构建连接
* @return CuratorFramework 对象
*/
@Bean
public CuratorFramework curatorFramework() {
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(
zookeeperProperties.getServer(),
zookeeperProperties.getSessionTimeoutMs(),
zookeeperProperties.getConnectionTimeoutMs(),
new RetryNTimes(zookeeperProperties.getRetryCount(), zookeeperProperties.getElapsedTimeMs()));
curatorFramework.start();
return curatorFramework;
}
/**
* @param curatorFramework 分布式锁对象
* @return
*/
@Bean
public InterProcessMutex interProcessMutex(CuratorFramework curatorFramework) {
return new InterProcessMutex(curatorFramework,zookeeperProperties.getLockPath());
}
}

使用注解的方式进行解耦

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 分布式做注解
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockAnnotation {
/**
* @return 锁等待时间
*/
long time() default 20;
/**
* 时间类型 可选秒,毫秒,时等
* @return
*/
TimeUnit util() default TimeUnit.SECONDS;
}

AOP拦截器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 分布式锁注解
*/
@Aspect
@Order(5)
@Component
public class LockIntercept {
@Autowired
private InterProcessMutex interProcessMutex;

@Around("@annotation(lockAnnotation)")
public void lockHandler(ProceedingJoinPoint joinPoint, LockAnnotation lockAnnotation ) {
//判断是否拿到锁
boolean acquire = false;
try {
//自定义时间
acquire = interProcessMutex.acquire(lockAnnotation.time(), lockAnnotation.util());
//拿锁成功则进行业务
if (acquire) {
Object proceed = joinPoint.proceed();
}
} catch (Throwable throwable) {
throwable.printStackTrace();
} finally {
//拿到锁的才进行释放
if (acquire) {
try {
interProcessMutex.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}

业务接口和实现类

这里的业务仅仅只做一个计数

1
2
3
4
5
6
7
8
9
public interface ZookeeperLockService {

/**
* 业务
* @return
* @throws Exception
*/
Boolean lockAcquireTimeOut() throws Exception;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Service
public class ZookeeperLockServiceImpl implements ZookeeperLockService {

@Autowired
CuratorFramework curatorFramework;
/**
* 计数
*/
int a =0;
/**
* 计数业务
* @return false
*/

@Override
//分布式锁注解
@LockAnnotation
public Boolean lockAcquireTimeOut() {
System.out.println(a++);
return false;
}
}

测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@RunWith(SpringRunner.class)
@SpringBootTest
public class GentleApplicationTests {

@Autowired
ZookeeperLockService zookeeperLockService;

/**
* 200 个线程测试分布式锁
* @throws InterruptedException
*/
@Test
public void contextLoads() throws InterruptedException {
//闭锁
CountDownLatch countDownLatch = new CountDownLatch(200);
ExecutorService executorService = Executors.newCachedThreadPool();
//200 个线程进行测试
for (int i = 0; i <200; i++) {
executorService.submit(() -> {
try {
//调用业务
zookeeperLockService.lockAcquireTimeOut();
} catch (Exception e) {
e.printStackTrace();
}
countDownLatch.countDown();
});
}
//等待完成
countDownLatch.await();
}
}

测试结果:

1
2
3
4
5
6
7
8
9
191
192
193
194
195
196
197
198
199

总结

ZooKeeper分布式锁相对来说还是比较简单的,可能在性能方面比Redis实现的分布式锁要差一些(频繁创建和删除节点),但胜在ZooKeeper的实现比较简单。

Powered by AppBlog.CN     浙ICP备14037229号

Copyright © 2012 - 2021 APP开发技术博客 All Rights Reserved.

访客数 : | 访问量 :