背景 之前写了一个一致性 hash 的实现,在线上跑得挺稳定的。不过在极端压力的情况下还是有 bug。这次趁着双 11 把这个问题给 fix 了,性能提升也非常明显(可以跑一下后面的测试代码),主要归功于 Google Guava 中的实现。
package name.chengchao.myConsistentHash;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import com.google.common.collect.ImmutableList;
import com.google.common.hash.Hashing;

/**
 * {@link ConsistentHash} implementation backed by Guava's
 * {@link Hashing#consistentHash(long, int)}.
 *
 * <p>Guava maps a hash code to a bucket index in {@code [0, buckets)} and only
 * guarantees minimal remapping when buckets are added or removed <em>at the
 * end</em>. This class therefore keeps nodes in a stable order: a new node is
 * appended, and a removed node's slot is filled by the last node so that the
 * bucket count shrinks from the end.
 *
 * <p>Thread-safety: mutations are serialized by {@code synchronized}; lookups
 * are lock-free and read an immutable snapshot published through a
 * {@code volatile} field.
 */
public class ConsistentHashV5 implements ConsistentHash {

    /** Membership index for duplicate/absence checks; guarded by {@code this}. */
    private final Set<String> nodeSet = new HashSet<String>();

    /** Nodes in bucket order (index == bucket number); guarded by {@code this}. */
    private final List<String> nodeOrder = new ArrayList<String>();

    /**
     * Immutable snapshot of {@link #nodeOrder} used by lookups. Declared
     * {@code volatile} so that a snapshot published by a mutator is visible to
     * threads calling {@link #dispatchToNode(String)} without locking.
     */
    volatile ImmutableList<String> immutableNodeList;

    /**
     * Creates a dispatcher over the given nodes. Duplicate names are ignored;
     * the first occurrence determines the node's bucket position.
     *
     * @param nodeList initial nodes; validated as non-empty by {@code Assert.notEmpty}
     */
    public ConsistentHashV5(List<String> nodeList) {
        Assert.notEmpty(nodeList, "nodeList can not be empty!");
        for (String node : nodeList) {
            if (nodeSet.add(node)) {
                nodeOrder.add(node);
            }
        }
        immutableNodeList = ImmutableList.copyOf(nodeOrder);
    }

    /**
     * Picks the node responsible for {@code target}.
     *
     * @param target key to dispatch; validated by {@code Assert.hasText}
     * @return the node name at the bucket chosen by Guava's consistent hash
     * @throws IllegalArgumentException from Guava if every node has been removed
     *         (bucket count must be positive)
     */
    @Override
    public String dispatchToNode(String target) {
        Assert.hasText(target, "target can not be blank!");
        long targetHashcode = HashCodeUtils.hashString(target);
        // Read the volatile field exactly once so size() and get() use the same snapshot
        // even if a concurrent add/remove publishes a new list in between.
        ImmutableList<String> snapshot = immutableNodeList;
        int index = Hashing.consistentHash(targetHashcode, snapshot.size());
        return snapshot.get(index);
    }

    /**
     * Adds a node if it is not already present. The node is appended as the
     * last bucket, which is the only growth pattern for which Guava promises
     * that existing keys either stay put or move to the new bucket.
     *
     * @param node node name; validated by {@code Assert.hasText}
     */
    @Override
    public synchronized void addNode(String node) {
        Assert.hasText(node, "node can not be blank!");
        if (!nodeSet.add(node)) {
            return;
        }
        nodeOrder.add(node);
        immutableNodeList = ImmutableList.copyOf(nodeOrder);
    }

    /**
     * Removes a node if present. The last node is moved into the vacated slot
     * so the bucket count shrinks from the end; only keys that mapped to the
     * removed node or to the former last bucket are re-dispatched, instead of
     * every key behind the removed index.
     *
     * @param node node name; validated by {@code Assert.hasText}
     */
    @Override
    public synchronized void removeNode(String node) {
        Assert.hasText(node, "node can not be blank!");
        if (!nodeSet.remove(node)) {
            return;
        }
        int vacated = nodeOrder.indexOf(node);
        int last = nodeOrder.size() - 1;
        nodeOrder.set(vacated, nodeOrder.get(last));
        nodeOrder.remove(last);
        immutableNodeList = ImmutableList.copyOf(nodeOrder);
    }

    /**
     * Always returns {@code 0}.
     * NOTE(review): presumably because this implementation keeps no hash
     * circle / virtual nodes - confirm against the {@code ConsistentHash} contract.
     */
    @Override
    public int getHashCircleCount() {
        return 0;
    }
}
package name.chengchao.myConsistentHash;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.lang3.RandomStringUtils;

/**
 * Manual test/benchmark harness for {@link ConsistentHashV5}, with
 * {@code ConsistentHashV4} as the comparison baseline. Results are printed to
 * standard output; nothing is asserted.
 */
public class ConsistentHashTestV5 {

    /** Number of threads that concurrently remove the same node. */
    public final static int number = 2;

    /** Number of dispatch tasks submitted per round in the multithread test. */
    public final static int dispatchNumber = 5;

    public static ExecutorService executorService = Executors.newFixedThreadPool(number);

    public static ExecutorService dispatchExecutorService = Executors.newFixedThreadPool(dispatchNumber);

    /** Builds the node names {@code node-0 .. node-(nodeCount-1)}. */
    private static List<String> buildNodeList(int nodeCount) {
        List<String> nodeList = new ArrayList<String>();
        for (int i = 0; i < nodeCount; i++) {
            nodeList.add("node-" + i);
        }
        return nodeList;
    }

    /**
     * @param nodeCount number of nodes to create
     * @return a {@link ConsistentHashV5} over {@code node-0 .. node-(nodeCount-1)}
     */
    public static ConsistentHash buildDispatchHashV5(int nodeCount) {
        return new ConsistentHashV5(buildNodeList(nodeCount));
    }

    /**
     * @param nodeCount number of nodes to create
     * @return a {@code ConsistentHashV4} over {@code node-0 .. node-(nodeCount-1)}
     */
    public static ConsistentHash buildDispatchHashV4(int nodeCount) {
        return new ConsistentHashV4(buildNodeList(nodeCount));
    }

    public static void main(String[] args) throws Exception {
        test_perfermance();
    }

    /**
     * Prints where a fixed key is dispatched before and after removing and
     * adding nodes, so the amount of remapping can be inspected by eye.
     */
    public static void test_consistent() {
        ConsistentHash dispatchHash = buildDispatchHashV5(100);
        for (int i = 0; i < 3; i++) {
            System.out.println(dispatchHash.dispatchToNode("127.0.0.1"));
        }

        System.out.println("==========remove node 10");
        dispatchHash.removeNode("node-10");
        System.out.println(dispatchHash.dispatchToNode("127.0.0.1"));

        System.out.println("==========remove node 99");
        dispatchHash.removeNode("node-99");
        System.out.println(dispatchHash.dispatchToNode("127.0.0.1"));

        // The label previously said "99" although node-101 is the node being added.
        System.out.println("==========add node 101");
        dispatchHash.addNode("node-101");
        System.out.println(dispatchHash.dispatchToNode("127.0.0.1"));

        System.out.println("==========add node 200");
        dispatchHash.addNode("node-200");
        System.out.println(dispatchHash.dispatchToNode("127.0.0.1"));
    }

    /**
     * Prints the per-node distribution of 10000 random keys for the initial
     * node set, after removing one node, and after adding one node.
     */
    public static void test_balance() {
        int count = 1;
        int nodeCount = 5;
        long start = System.currentTimeMillis();
        for (int i = 0; i < count; i++) {
            System.out.println("*************start************");
            ConsistentHash dispatchHash = buildDispatchHashV5(nodeCount);
            testDispatch(dispatchHash, 10000);

            System.out.println("==remove node==");
            dispatchHash.removeNode("node-0");
            testDispatch(dispatchHash, 10000);

            System.out.println("==add node==");
            dispatchHash.addNode("node-add-0");
            testDispatch(dispatchHash, 10000);
            System.out.println("**************end*************");
        }
        long end = System.currentTimeMillis();
        System.out.println("耗时(ms):" + (end - start));
    }

    /**
     * Times 1000 rounds of "build a 20-node dispatcher and dispatch one key"
     * for V4 and then V5, printing the elapsed milliseconds of each.
     */
    public static void test_perfermance() {
        int count = 1000;
        int nodeCount = 20;

        long start = System.currentTimeMillis();
        for (int i = 0; i < count; i++) {
            ConsistentHash dispatchHash = buildDispatchHashV4(nodeCount);
            dispatchHash.dispatchToNode("127.0.0.1");
        }
        long end = System.currentTimeMillis();
        System.out.println("V4 耗时(ms):" + (end - start));

        start = System.currentTimeMillis();
        for (int i = 0; i < count; i++) {
            ConsistentHash dispatchHash = buildDispatchHashV5(nodeCount);
            dispatchHash.dispatchToNode("127.0.0.1");
        }
        end = System.currentTimeMillis();
        System.out.println("V5 耗时(ms):" + (end - start));
    }

    /**
     * Dispatches {@code taskNum} random 20-character alphanumeric keys and
     * prints how many landed on each node.
     *
     * @param dispatchHash dispatcher under test
     * @param taskNum number of random keys to dispatch
     */
    public static void testDispatch(ConsistentHash dispatchHash, int taskNum) {
        Map<String, Long> map = new HashMap<String, Long>();
        for (int i = 0; i < taskNum; i++) {
            String target = RandomStringUtils.randomAlphanumeric(20);
            String node = dispatchHash.dispatchToNode(target);
            Long count = map.get(node);
            if (count == null) {
                count = 0L;
            }
            count++;
            map.put(node, count);
        }
        for (Map.Entry<String, Long> entry : map.entrySet()) {
            System.out.println(entry.getKey() + ":" + entry.getValue());
        }
    }

    /**
     * Stress test: for each of 50 rounds, {@link #number} threads released by
     * a barrier remove the same node at once, then {@link #dispatchNumber}
     * dispatch tasks run against the shared dispatcher.
     *
     * <p>Both executors are shut down when this method finishes so their
     * worker threads do not keep the JVM alive; the method is therefore meant
     * to be run once per process.
     *
     * @throws Exception if the calling thread is interrupted while sleeping
     */
    public static void test_multithread() throws Exception {
        final ConsistentHash consistentHash = buildDispatchHashV5(100);
        try {
            for (int i = 0; i < 50; i++) {
                final CyclicBarrier barrier = new CyclicBarrier(number);
                // Must match the "node-<i>" names produced by buildDispatchHashV5;
                // the former "node<i>" never matched, so nothing was ever removed.
                final String nodeName = "node-" + i;
                for (int j = 0; j < number; j++) {
                    executorService.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                barrier.await();
                            } catch (InterruptedException e) {
                                // Preserve the interrupt for the pool instead of swallowing it.
                                Thread.currentThread().interrupt();
                                e.printStackTrace();
                            } catch (BrokenBarrierException e) {
                                e.printStackTrace();
                            }
                            consistentHash.removeNode(nodeName);
                            System.out.println("remove:" + nodeName);
                        }
                    });
                }

                System.out.println("sleep 1s");
                Thread.sleep(1000);

                for (int j = 0; j < dispatchNumber; j++) {
                    dispatchExecutorService.execute(new Runnable() {
                        @Override
                        public void run() {
                            String nodeStr = consistentHash.dispatchToNode(RandomStringUtils.random(10));
                            System.out.println("dispatch done,node:" + nodeStr);
                        }
                    });
                }
            }
        } finally {
            // Already-submitted tasks still run; afterwards the worker threads exit.
            executorService.shutdown();
            dispatchExecutorService.shutdown();
        }
    }
}