Skip to content

Commit

Permalink
bugfix [issue #27] zookeeper分配workerId多节点同时启动时存在并发安全问题
Browse files (browse the repository at this point in the history)
update 优化部分代码
  • Loading branch information
simonalong committed Mar 27, 2024
1 parent e213d0d commit 797b499
Show file tree
Hide file tree
Showing 13 changed files with 219 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,24 @@
import com.simonalong.neo.Neo;
import com.simonalong.neo.NeoMap;
import com.simonalong.neo.TableMap;
import com.simonalong.neo.util.LocalDateTimeUtil;
import com.simonalong.neo.util.TimeUtils;
import lombok.extern.slf4j.Slf4j;

import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.LocalDate;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static com.simonalong.butterfly.sequence.UuidConstant.*;
import static com.simonalong.butterfly.sequence.UuidConstant.DISTRIBUTE_SERVER;
import static com.simonalong.butterfly.sequence.UuidConstant.HEART_TIME;
import static com.simonalong.butterfly.sequence.UuidConstant.MAX_WORKER_SIZE;
import static com.simonalong.butterfly.sequence.UuidConstant.KEEP_NODE_EXIST_TIME;

import static com.simonalong.butterfly.worker.db.DbConstant.DB_LOG_PRE;
import static com.simonalong.butterfly.worker.db.DbConstant.UUID_TABLE;

Expand Down Expand Up @@ -74,7 +77,7 @@ private void init() {

@Override
public Long getLastExpireTime() {
return LocalDateTimeUtil.timestampToLong(uuidGeneratorDO.getLastExpireTime());
return TimeUtils.timestampToLong(uuidGeneratorDO.getLastExpireTime());
}

@Override
Expand All @@ -93,7 +96,7 @@ private UuidGeneratorDO generateUuidGeneratorDo(Long id, Integer workerId) {
uuidGeneratorDO.setId(id);
uuidGeneratorDO.setWorkId(workerId);
uuidGeneratorDO.setNamespace(namespace);
uuidGeneratorDO.setLastExpireTime(LocalDateTimeUtil.longToTimestamp(afterHour()));
uuidGeneratorDO.setLastExpireTime(TimeUtils.longToTimestamp(afterHour()));
uuidGeneratorDO.setUid(uidKey);
uuidGeneratorDO.setProcessId(processId);
uuidGeneratorDO.setIp(ip);
Expand Down Expand Up @@ -143,10 +146,10 @@ private void refreshNodeInfo() {
long lastExpireTime = afterHour();
UuidGeneratorDO newGenerate = new UuidGeneratorDO();
newGenerate.setId(uuidGeneratorDO.getId());
newGenerate.setLastExpireTime(LocalDateTimeUtil.longToTimestamp(lastExpireTime));
newGenerate.setLastExpireTime(TimeUtils.longToTimestamp(lastExpireTime));
newGenerate = neo.update(UUID_TABLE, newGenerate);
if (null != newGenerate && null != newGenerate.getId()) {
uuidGeneratorDO.setLastExpireTime(LocalDateTimeUtil.longToTimestamp(lastExpireTime));
uuidGeneratorDO.setLastExpireTime(TimeUtils.longToTimestamp(lastExpireTime));
}
} catch (Throwable e) {
log.error("刷新节点信息异常:", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@
import lombok.Data;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.MediaType;
import okhttp3.HttpUrl;

import java.io.IOException;
import java.io.Serializable;
Expand Down
26 changes: 26 additions & 0 deletions butterfly-allocator/butterfly-allocator-zookeeper/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,32 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>


<!-- Test-scoped logging stack: the SLF4J API plus three Logback modules.
     Every entry below uses scope "test".
     Versions are taken from the slf4j.version and logback.version properties, which are
     not defined in the visible part of this file. -->
<!-- NOTE(review): logback-classic normally brings in logback-core and slf4j-api
     transitively; confirm the explicit entries are needed (e.g. to pin versions). -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
<scope>test</scope>
</dependency>
<!-- NOTE(review): logback-access is aimed at HTTP access logging in servlet containers;
     confirm that the tests in this module actually require it. -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-access</artifactId>
<version>${logback.version}</version>
<scope>test</scope>
</dependency>
<!-- SLF4J binding used when running tests. -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public interface ZkConstant {
/**
* 机器节点的左前缀
*/
String WORKER_NODE = "/worker";
String WORKER_NODE = "/worker_";
/**
* 每个业务中的配置节点路径
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
import com.alibaba.fastjson.JSON;
import com.simonalong.butterfly.sequence.exception.ButterflyException;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.Watcher;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -256,7 +261,6 @@ public void process(WatchedEvent event) {
log.info(ZK_LOG_PRE + "---------------------start-------------------");
Event.KeeperState state = event.getState();
Event.EventType type = event.getType();
String path = event.getPath();

log.info(ZK_LOG_PRE + "receive Watcher notify");
log.info(ZK_LOG_PRE + "connect status:\t" + state.toString());
Expand Down Expand Up @@ -368,52 +372,44 @@ public Boolean nodeExist(String nodePath) {
try {
return null != this.zookeeper.exists(nodePath, false);
} catch (KeeperException | InterruptedException e) {
log.error(ZK_LOG_PRE + "judge node exist fail", e);
log.error(ZK_LOG_PRE + "judge node exist fail, path=" + nodePath, e);
}
return false;
}

/**
* 分布式锁
* 分布式锁,尝试加锁
* <ul>
* <li>说明:修改为添加永久节点;临时节点存在时间配置不准确问题,进而导致正好业务没执行完就失效了,这里不排队,不做通用分布式锁,因此这里改用永久节点,在业务的正常运行中任何异常情况可以执行完删除;如果是业务异常退出情况,则手动删除也不错</li>
* </ul>
*
* @param lockPath 锁路径
* @param callable 加锁成功后的处理
* @param <T> 类型
* @return 对象
*
* @return true:成功;false:失败
*/
public <T> T distributeLock(String lockPath, Callable<T> callable) {
public Boolean distributeTryLock(String lockPath, Callable<Boolean> callable) {
// 添加永久节点
if (!addPersistentNode(lockPath)) {
return false;
}
try {
// 添加分布式锁
if (!addEphemeralNode(lockPath)) {
throw new RuntimeException("加锁失败");
}

return callable.call();
} catch (Exception e) {
throw new RuntimeException("执行异常", e);
} catch (Throwable e) {
return false;
} finally {
if (nodeExist(lockPath)) {
deleteNode(lockPath);
}
}
}

/**
* 分布式锁
*
* @param lockPath 锁路径
* @param runnable 加锁成功后的处理
*/
public void distributeLock(String lockPath, Runnable runnable) {
public void distributeTryLock(String lockPath, Runnable runnable) {
if (!addPersistentNode(lockPath)) {
return;
}
try {
// 添加分布式锁
if (!addEphemeralNode(lockPath)) {
throw new RuntimeException("加锁失败");
}

runnable.run();
} catch (Exception e) {
throw new RuntimeException("执行异常", e);
} finally {
if (nodeExist(lockPath)) {
deleteNode(lockPath);
Expand All @@ -428,17 +424,24 @@ private Boolean createNode(String node, String data, CreateMode createMode) {
try {
if (null == this.zookeeper.exists(node, false)) {
String realPath = zookeeper.create(node, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);
log.info(ZK_LOG_PRE + "node create success, Path: " + realPath);
return true;
if (null != this.zookeeper.exists(node, false)) {
log.info(ZK_LOG_PRE + "node create success, Path: " + realPath);
return true;
} else {
log.info(ZK_LOG_PRE + "node(" + node + ") has existed");
return false;
}
} else {
log.info(ZK_LOG_PRE + "node(" + node + ") has existed");
return false;
}
} catch (KeeperException e) {
log.error(ZK_LOG_PRE + "node(" + node + ") create fail");
} catch (KeeperException.NodeExistsException e) {
log.warn(ZK_LOG_PRE + "node(" + node + ") has existed", e);
} catch (InterruptedException e) {
log.error(ZK_LOG_PRE + "node(" + node + ") create fail");
log.error(ZK_LOG_PRE + "node(" + node + ") create fail", e);
Thread.currentThread().interrupt();
} catch (Throwable e) {
log.error(ZK_LOG_PRE + "node(" + node + ") create fail", e);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import lombok.extern.slf4j.Slf4j;

import static com.simonalong.butterfly.sequence.UuidConstant.MAX_WORKER_SIZE;
import static com.simonalong.butterfly.worker.zookeeper.ZkConstant.*;
import static com.simonalong.butterfly.worker.zookeeper.ZkConstant.SESSION_NODE;
import static com.simonalong.butterfly.worker.zookeeper.ZkConstant.ZK_LOG_PRE;

/**
* @author shizi
Expand All @@ -20,11 +21,11 @@ public class DefaultWorkerIdAllocator implements WorkerIdAllocator {
/**
* 当前分配的索引(就是workerId)
*/
private Integer index;
private String namespace;
private Integer workerId;
private final String namespace;
protected ZookeeperClient zkClient;
private WorkerNodeHandler workerNodeHandler;
private ConfigNodeHandler configNodeHandler;
private final WorkerNodeHandler workerNodeHandler;
private final ConfigNodeHandler configNodeHandler;

public DefaultWorkerIdAllocator(String namespace, ZookeeperClient zkClient, WorkerNodeHandler workerNodeHandler, ConfigNodeHandler configNodeHandler) {
this.namespace = namespace;
Expand All @@ -36,12 +37,12 @@ public DefaultWorkerIdAllocator(String namespace, ZookeeperClient zkClient, Work

@Override
public Integer getWorkerId() {
return index;
return workerId;
}

@Override
public String getWorkerNodePath() {
return getNodePathWithIndex(getWorkerId());
return ZkNodeHelper.getWorkerPath(namespace, workerId);
}

/**
Expand All @@ -58,14 +59,14 @@ private void init() {
zkClient.reconnect().registerDisconnectCallback(this::init);

// 初始化索引
index = getWorkId(workerNodeHandler.getUidKey().hashCode());
workerId = getWorkId(workerNodeHandler.getUidKey().hashCode());

// 初始化节点信息
initNode();
}

private void initNode() {
if (findNode(index)) {
if (findNode(workerId)) {
return;
}
// 没有找到可用worker节点,则扩充worker节点
Expand All @@ -80,17 +81,25 @@ private void initNode() {
*/
private Boolean findNode(final Integer index) {
log.info(ZK_LOG_PRE + " find one node to create session, workerId = " + index);
String workerNodePathTem = getNodePathWithIndex(index);
String workerNodePathTem = ZkNodeHelper.getWorkerPath(namespace, index);
if (zkClient.nodeExist(workerNodePathTem)) {
if (addSessionNode(workerNodePathTem)) {
savePath(index);
return true;
// 添加分布式锁
Boolean result = zkClient.distributeTryLock(ZkNodeHelper.getSessionCreateLock(namespace, index), () -> {
if (addSessionNode(workerNodePathTem)) {
setWorkerId(index);
return true;
}
return false;
});

if (result) {
return true;
}
}

// 如果转了一圈都没有找到,则考虑扩容
Integer nextIndex = getWorkId(index + 1);
if (nextIndex.equals(this.index)) {
if (nextIndex.equals(this.workerId)) {
return false;
}
return findNode(nextIndex);
Expand Down Expand Up @@ -120,9 +129,9 @@ private Boolean addSessionNode(String workerPath) {

// 节点过期:创建session节点
if (null != lastTime && lastTime < System.currentTimeMillis()) {
return zkClient.distributeLock(ZkNodeHelper.getSessionCreateLock(namespace), () -> createSession(workerPath));
return createSession(workerPath);
} else {
// 节点未过期:如果是自己,则可以使用
// 节点未过期:如果是自己,则可以使用;否则创建
if (null != uidKeyTem && workerNodeEntity.getUidKey().equals(workerNodeHandler.getUidKey())) {
return createSession(workerPath);
}
Expand Down Expand Up @@ -162,7 +171,7 @@ private void expandWorker() {
throw new ButterflyException("当前最大值" + maxMachineNum + "已到机器最大值");
}

zkClient.distributeLock(ZkNodeHelper.getBizExpandLock(namespace), () -> {
zkClient.distributeTryLock(ZkNodeHelper.getBizExpandLock(namespace), () -> {
// 扩容
if (innerExpand(maxMachineNum)) {
log.info(ZK_LOG_PRE + " expand success, ready to find one node to create session");
Expand All @@ -173,11 +182,11 @@ private void expandWorker() {
}

private Boolean innerExpand(Integer maxMachineNum) {
String leftPath = getNodePathWithPre();
for (int index = maxMachineNum; index < maxMachineNum * 2; index++) {
String workerPath = leftPath + index;
// 添加永久节点
zkClient.addPersistentNode(workerPath);
if (!zkClient.addPersistentNode(ZkNodeHelper.getWorkerPath(namespace, index))){
log.warn(ZK_LOG_PRE + "add persistent node error");
}
}

log.debug(ZK_LOG_PRE + " maxMachineNum * 2 have created finished");
Expand All @@ -186,8 +195,8 @@ private Boolean innerExpand(Integer maxMachineNum) {
return true;
}

private void savePath(Integer workerId) {
this.index = workerId;
private void setWorkerId(Integer workerId) {
this.workerId = workerId;
}

/**
Expand All @@ -211,17 +220,4 @@ private Integer getWorkId(Integer index) {
private Integer getMaxMachineNum() {
return Math.toIntExact(MAX_WORKER_SIZE);
}

/**
* 获取要分配的节点的带索引的路径
*
* @param workerId 节点索引
*/
private String getNodePathWithIndex(final Integer workerId) {
return getNodePathWithPre() + workerId;
}

private String getNodePathWithPre() {
return ROOT_PATH + "/" + namespace + WORKER_NODE + "_";
}
}
Loading

0 comments on commit 797b499

Please sign in to comment.