From 03cd4f98c7bd0c862c5424b637932834d2f5ec34 Mon Sep 17 00:00:00 2001 From: qixiaobo Date: Fri, 21 Aug 2020 11:30:00 +0800 Subject: [PATCH] support multi snowflake instance in one project Why SnowflakeZookeeperHolder use PREFIX_ZK_PATH static? Cannot use more than one SnowFlake instance in one project #138 --- .../leaf/snowflake/SnowflakeIDGenImpl.java | 9 +++-- .../snowflake/SnowflakeZookeeperHolder.java | 40 ++++++++++--------- 2 files changed, 28 insertions(+), 21 deletions(-) diff --git a/leaf-core/src/main/java/com/sankuai/inf/leaf/snowflake/SnowflakeIDGenImpl.java b/leaf-core/src/main/java/com/sankuai/inf/leaf/snowflake/SnowflakeIDGenImpl.java index b4f813b2..fddc1ce1 100644 --- a/leaf-core/src/main/java/com/sankuai/inf/leaf/snowflake/SnowflakeIDGenImpl.java +++ b/leaf-core/src/main/java/com/sankuai/inf/leaf/snowflake/SnowflakeIDGenImpl.java @@ -2,6 +2,7 @@ import com.google.common.base.Preconditions; import com.sankuai.inf.leaf.IDGen; +import com.sankuai.inf.leaf.common.PropertyFactory; import com.sankuai.inf.leaf.common.Result; import com.sankuai.inf.leaf.common.Status; import com.sankuai.inf.leaf.common.Utils; @@ -30,10 +31,11 @@ public boolean init() { private long sequence = 0L; private long lastTimestamp = -1L; private static final Random RANDOM = new Random(); + private final String snowflakeName; public SnowflakeIDGenImpl(String zkAddress, int port) { //Thu Nov 04 2010 09:42:54 GMT+0800 (中国标准时间) - this(zkAddress, port, 1288834974657L); + this(zkAddress, port, 1288834974657L, PropertyFactory.getProperties().getProperty("leaf.name")); } /** @@ -41,11 +43,12 @@ public SnowflakeIDGenImpl(String zkAddress, int port) { * @param port snowflake监听端口 * @param twepoch 起始的时间戳 */ - public SnowflakeIDGenImpl(String zkAddress, int port, long twepoch) { + public SnowflakeIDGenImpl(String zkAddress, int port, long twepoch, String snowflakeName) { this.twepoch = twepoch; + this.snowflakeName = snowflakeName; Preconditions.checkArgument(timeGen() > twepoch, "Snowflake not support twepoch gt currentTime"); final String ip = Utils.getIp(); - SnowflakeZookeeperHolder holder = new SnowflakeZookeeperHolder(ip, String.valueOf(port), zkAddress); + SnowflakeZookeeperHolder holder = new SnowflakeZookeeperHolder(ip, String.valueOf(port), zkAddress, snowflakeName); LOGGER.info("twepoch:{} ,ip:{} ,zkAddress:{} port:{}", twepoch, ip, zkAddress, port); boolean initFlag = holder.init(); if (initFlag) { diff --git a/leaf-core/src/main/java/com/sankuai/inf/leaf/snowflake/SnowflakeZookeeperHolder.java b/leaf-core/src/main/java/com/sankuai/inf/leaf/snowflake/SnowflakeZookeeperHolder.java index 76e85e85..8befc0bf 100644 --- a/leaf-core/src/main/java/com/sankuai/inf/leaf/snowflake/SnowflakeZookeeperHolder.java +++ b/leaf-core/src/main/java/com/sankuai/inf/leaf/snowflake/SnowflakeZookeeperHolder.java @@ -11,7 +11,6 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Maps; -import com.sankuai.inf.leaf.common.*; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.zookeeper.CreateMode; @@ -31,39 +30,44 @@ public class SnowflakeZookeeperHolder { private String zk_AddressNode = null;//保存自身的key ip:port-000000001 private String listenAddress = null;//保存自身的key ip:port private int workerID; - private static final String PREFIX_ZK_PATH = "/snowflake/" + PropertyFactory.getProperties().getProperty("leaf.name"); - private static final String PROP_PATH = System.getProperty("java.io.tmpdir") + File.separator + PropertyFactory.getProperties().getProperty("leaf.name") + "/leafconf/{port}/workerID.properties"; - private static final String PATH_FOREVER = PREFIX_ZK_PATH + "/forever";//保存所有数据持久的节点 - private String ip; - private String port; - private String connectionString; + private final String prefixZkPath; + private final String propPath; + private final String pathForever; + private final String ip; + private final String port; + private final String connectionString; + private final String snowflakeName; private long lastUpdateTime; - public SnowflakeZookeeperHolder(String ip, String port, String connectionString) { + public SnowflakeZookeeperHolder(String ip, String port, String connectionString, String snowflakeName) { this.ip = ip; this.port = port; this.listenAddress = ip + ":" + port; this.connectionString = connectionString; + this.snowflakeName = snowflakeName; + prefixZkPath = "/snowflake/" + snowflakeName; + propPath = System.getProperty("java.io.tmpdir") + File.separator + snowflakeName + "/leafconf/{port}/workerID.properties"; + pathForever = prefixZkPath + "/forever";//保存所有数据持久的节点 } public boolean init() { try { CuratorFramework curator = createWithOptions(connectionString, new RetryUntilElapsed(1000, 4), 10000, 6000); curator.start(); - Stat stat = curator.checkExists().forPath(PATH_FOREVER); + Stat stat = curator.checkExists().forPath(pathForever); if (stat == null) { //不存在根节点,机器第一次启动,创建/snowflake/ip:port-000000000,并上传数据 zk_AddressNode = createNode(curator); //worker id 默认是0 updateLocalWorkerID(workerID); //定时上报本机时间给forever节点 - ScheduledUploadData(curator, zk_AddressNode); + scheduledUploadData(curator, zk_AddressNode); return true; } else { Map nodeMap = Maps.newHashMap();//ip:port->00001 Map realNode = Maps.newHashMap();//ip:port->(ipport-000001) //存在根节点,先检查是否有属于自己的根节点 - List keys = curator.getChildren().forPath(PATH_FOREVER); + List keys = curator.getChildren().forPath(pathForever); for (String key : keys) { String[] nodeKey = key.split("-"); realNode.put(nodeKey[0], key); @@ -72,7 +76,7 @@ public boolean init() { Integer workerid = nodeMap.get(listenAddress); if (workerid != null) { //有自己的节点,zk_AddressNode=ip:port - zk_AddressNode = PATH_FOREVER + "/" + realNode.get(listenAddress); + zk_AddressNode = pathForever + "/" + realNode.get(listenAddress); workerID = workerid;//启动worder时使用会使用 if (!checkInitTimeStamp(curator, zk_AddressNode)) { throw new CheckLastTimeException("init timestamp check error,forever node timestamp gt this node time"); @@ -96,7 +100,7 @@ public boolean init() { LOGGER.error("Start node ERROR {}", e); try { Properties properties = new Properties(); - properties.load(new FileInputStream(new File(PROP_PATH.replace("{port}", port + "")))); + properties.load(new FileInputStream(new File(propPath.replace("{port}", port + "")))); workerID = Integer.valueOf(properties.getProperty("workerID")); LOGGER.warn("START FAILED ,use local node file properties workerID-{}", workerID); } catch (Exception e1) { @@ -108,14 +112,14 @@ public boolean init() { } private void doService(CuratorFramework curator) { - ScheduledUploadData(curator, zk_AddressNode);// /snowflake_forever/ip:port-000000001 + scheduledUploadData(curator, zk_AddressNode);// /snowflake_forever/ip:port-000000001 } - private void ScheduledUploadData(final CuratorFramework curator, final String zk_AddressNode) { + private void scheduledUploadData(final CuratorFramework curator, final String zk_AddressNode) { Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { - Thread thread = new Thread(r, "schedule-upload-time"); + Thread thread = new Thread(r, snowflakeName + "-schedule-upload-time"); thread.setDaemon(true); return thread; } @@ -144,7 +148,7 @@ private boolean checkInitTimeStamp(CuratorFramework curator, String zk_AddressNo */ private String createNode(CuratorFramework curator) throws Exception { try { - return curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(PATH_FOREVER + "/" + listenAddress + "-", buildData().getBytes()); + return curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(pathForever + "/" + listenAddress + "-", buildData().getBytes()); } catch (Exception e) { LOGGER.error("create node error msg {} ", e.getMessage()); throw e; @@ -187,7 +191,7 @@ private Endpoint deBuildData(String json) throws IOException { * @param workerID */ private void updateLocalWorkerID(int workerID) { - File leafConfFile = new File(PROP_PATH.replace("{port}", port)); + File leafConfFile = new File(propPath.replace("{port}", port)); boolean exists = leafConfFile.exists(); LOGGER.info("file exists status is {}", exists); if (exists) {