Ketama is an implementation of a consistent hashing algorithm, meaning you can add or remove servers from the memcached pool without causing a complete remap of all keys.
Here’s how it works:
* Take your list of servers (eg: 1.2.3.4:11211, 5.6.7.8:11211, 9.8.7.6:11211)
* Hash each server string to several (100-200) unsigned ints
* Conceptually, these numbers are placed on a circle called the continuum. (imagine a clock face that goes from 0 to 2^32)
* Each number links to the server it was hashed from, so servers appear at several points on the continuum, by each of the numbers they hashed to.
* To map a key->server, hash your key to a single unsigned int, and find the next biggest number on the continuum. The server linked to that number is the correct server for that key.
* If you hash your key to a value near 2^32 and there are no points on the continuum greater than your hash, return the first server in the continuum.
If you then add or remove a server from the list, only a small proportion of keys end up mapping to different servers.
前面介绍了ketama的一些概念和特性,接下来我们废话少说,直接上代码。
具体java实现如下:
public final class KetamaNodeLocator { /** * 存储节点 */ private TreeMap<Long, Node> ketamaNodes; private HashAlgorithm hashAlg; private int numReps = 100; /** * @param nodes 实际服务器的节点 * @param alg 采用的hash算法 * @param nodeCopies 虚节点数量 */ public KetamaNodeLocator(List<Node> nodes, HashAlgorithm alg, int nodeCopies) { hashAlg = alg; ketamaNodes = new TreeMap<Long, Node>(); numReps = nodeCopies; //对所有节点,生成nCopies个虚拟结点 for (Node node : nodes) { //虚拟结点分为四组 for (int i = 0; i < numReps / 4; i++) { //getKeyForNode方法为这组虚拟结点得到惟一名称 byte[] digest = hashAlg.computeMd5(getKeyForNode(node, i)); /** Md5是一个16字节长度的数组,将16字节的数组每四个字节一组, 分别对应一个虚拟结点,这就是为什么上面把虚拟结点分为组的原因*/ for (int h = 0; h < 4; h++) { // Md5编码后,每个虚拟结点对应Md5码16个字节中的4个,组成一个long型数值,做为这个虚拟结点在环中的惟一key long m = hashAlg.hash(digest, h); ketamaNodes.put(m, node); } } } } /** * Get the primary location for the given key * * @param k * 需要从节点上获取值的Key * @return * 存储key值的节点 */ public Node getPrimary(final String k) { byte[] digest = hashAlg.computeMd5(k); Node rv = getNodeForKey(hashAlg.hash(digest, 0)); return rv; } private Node getNodeForKey(long hash) { final Node rv; Long key = hash; if (!ketamaNodes.containsKey(key)) { //得到大于当前key的那个子Map,然后从中取出第一个key,就是大于且离它最近的那个key SortedMap<Long, Node> tailMap = ketamaNodes.tailMap(key); if (tailMap.isEmpty()) { key = ketamaNodes.firstKey(); } else { key = tailMap.firstKey(); } //For JDK1.6 version // key = ketamaNodes.ceilingKey(key); // if (key == null) { // key = ketamaNodes.firstKey(); // } } //如果找到这个节点,直接取节点,返回 rv = ketamaNodes.get(key); return rv; } /** * Returns a uniquely identifying key, suitable for hashing by the * KetamaNodeLocator algorithm. * * @param node The Node to use to form the unique identifier * @param repetition The repetition number for the particular node in * question (0 is the first repetition) * @return The key that represents the specific repetition of the node */ private String getKeyForNode(Node node, int repetition) { return node.getName() + repetition; } }
public enum HashAlgorithm {

    /**
     * MD5-based hash algorithm used by ketama.
     */
    KETAMA_HASH;

    /**
     * Extracts one 32-bit hash position from an MD5 digest.
     *
     * @param digest a 16-byte MD5 digest
     * @param nTime  which 4-byte slice of the digest to use (0-3)
     * @return an unsigned 32-bit value carried in a long
     */
    public long hash(byte[] digest, int nTime) {
        // Assemble four digest bytes, little-endian, into one long value.
        long rv = ((long) (digest[3 + nTime * 4] & 0xFF) << 24)
                | ((long) (digest[2 + nTime * 4] & 0xFF) << 16)
                | ((long) (digest[1 + nTime * 4] & 0xFF) << 8)
                | (digest[0 + nTime * 4] & 0xFF);
        return rv & 0xffffffffL; /* Truncate to 32-bits */
    }

    /**
     * Get the md5 of the given key.
     *
     * @param k the key to digest (encoded as UTF-8)
     * @return the 16-byte MD5 digest of the key
     */
    public byte[] computeMd5(String k) {
        MessageDigest md5;
        try {
            md5 = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("MD5 not supported", e);
        }
        // A freshly obtained MessageDigest is already reset, so the original
        // md5.reset() call was redundant and has been dropped.
        byte[] keyBytes;
        try {
            keyBytes = k.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is mandatory on every JVM, so this cannot happen in practice.
            throw new RuntimeException("Unknown string :" + k, e);
        }
        md5.update(keyBytes);
        return md5.digest();
    }
}
public class Node {

    /** Node identity; typically the machine's ip + port. */
    private final String name;

    public Node(String name) {
        this.name = name;
    }

    /**
     * 可以用机器的ip + port (can be the machine's ip + port).
     *
     * @return the node's unique name
     */
    public String getName() {
        return name;
    }

    @Override
    public String toString() {
        return "name : " + name;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null) return false;
        if (!(obj instanceof Node)) return false;
        Node node = (Node) obj;
        return this.name.equals(node.getName());
    }

    /**
     * Added to honour the equals/hashCode contract: the original class
     * overrode equals only, which breaks Node when it is used as a
     * HashMap/HashSet key (as the distribution test does).
     */
    @Override
    public int hashCode() {
        return name.hashCode();
    }
}
下面是测试代码:
分布平均性测试:测试随机生成的众多key是否会平均分布到各个结点上
public class HashAlgorithmTest { static Random ran = new Random(); /** key's count */ private static final Integer EXE_TIMES = 100000; private static final Integer NODE_COUNT = 5; private static final Integer VIRTUAL_NODE_COUNT = 160; public static void main(String[] args) { HashAlgorithmTest test = new HashAlgorithmTest(); /** Records the times of locating node*/ Map<Node, Integer> nodeRecord = new HashMap<Node, Integer>(); List<Node> allNodes = test.getNodes(NODE_COUNT); KetamaNodeLocator locator = new KetamaNodeLocator(allNodes, HashAlgorithm.KETAMA_HASH, VIRTUAL_NODE_COUNT); //模拟初始化所有的key值 List<String> allKeys = test.getAllStrings(); for (String key : allKeys) { Node node = locator.getPrimary(key); Integer times = nodeRecord.get(node); if (times == null) { nodeRecord.put(node, 1); } else { nodeRecord.put(node, times + 1); } } System.out.println("Nodes count : " + NODE_COUNT + ", Keys count : " + EXE_TIMES + ", Normal percent : " + (float) 100 / NODE_COUNT + "%"); System.out.println("-------------------- boundary ----------------------"); for (Map.Entry<Node, Integer> entry : nodeRecord.entrySet()) { System.out.println("Node name :" + entry.getKey() + " - Times : " + entry.getValue() + " - Percent : " + (float)entry.getValue() / EXE_TIMES * 100 + "%"); } } /** * Gets the mock node by the material parameter * * @param nodeCount * the count of node wanted * @return * the node list */ private List<Node> getNodes(int nodeCount) { List<Node> nodes = new ArrayList<Node>(); for (int k = 1; k <= nodeCount; k++) { Node node = new Node("node" + k); nodes.add(node); } return nodes; } /** * All the keys */ private List<String> getAllStrings() { List<String> allStrings = new ArrayList<String>(EXE_TIMES); for (int i = 0; i < EXE_TIMES; i++) { allStrings.add(generateRandomString(ran.nextInt(50))); } return allStrings; } /** * To generate the random string by the random algorithm * <br> * The char between 32 and 127 is normal char * * @param length * @return */ private String 
generateRandomString(int length) { StringBuffer sb = new StringBuffer(length); for (int i = 0; i < length; i++) { sb.append((char) (ran.nextInt(95) + 32)); } return sb.toString(); } }
节点增删测试:在环上插入N个结点,每个节点nCopies个虚拟结点。随机生成众多key,在增删节点时,测试同一个key选择相同节点的概率
public class HashAlgorithmPercentTest {

    static Random ran = new Random();

    /** key's count — primitive int avoids the autoboxing of the original Integer constants. */
    private static final int EXE_TIMES = 100000;
    private static final int NODE_COUNT = 50;
    private static final int VIRTUAL_NODE_COUNT = 160;

    /** All keys, generated once and shared by the three scenarios below. */
    static List<String> allKeys = getAllStrings();

    public static void main(String[] args) {
        // For each key: [node in normal case, node after adding, node after removing].
        Map<String, List<Node>> map = generateRecord();

        List<Node> allNodes = getNodes(NODE_COUNT);
        System.out.println("Normal case : nodes count : " + allNodes.size());
        call(allNodes, map);

        allNodes = getNodes(NODE_COUNT + 8);
        System.out.println("Added case : nodes count : " + allNodes.size());
        call(allNodes, map);

        allNodes = getNodes(NODE_COUNT - 10);
        System.out.println("Reduced case : nodes count : " + allNodes.size());
        call(allNodes, map);

        int addCount = 0;
        int reduceCount = 0;
        for (Map.Entry<String, List<Node>> entry : map.entrySet()) {
            List<Node> list = entry.getValue();
            if (list.size() == 3) {
                if (list.get(0).equals(list.get(1))) {
                    addCount++;
                }
                if (list.get(0).equals(list.get(2))) {
                    reduceCount++;
                }
            } else {
                System.out.println("It's wrong size of list, key is " + entry.getKey()
                        + ", size is " + list.size());
            }
        }

        System.out.println(addCount + " --- " + reduceCount);
        System.out.println("Same percent in added case : "
                + (float) addCount * 100 / EXE_TIMES + "%");
        System.out.println("Same percent in reduced case : "
                + (float) reduceCount * 100 / EXE_TIMES + "%");
    }

    /** Locates every key on a fresh continuum and appends the chosen node to its record. */
    private static void call(List<Node> nodes, Map<String, List<Node>> map) {
        KetamaNodeLocator locator =
                new KetamaNodeLocator(nodes, HashAlgorithm.KETAMA_HASH, VIRTUAL_NODE_COUNT);
        for (Map.Entry<String, List<Node>> entry : map.entrySet()) {
            Node node = locator.getPrimary(entry.getKey());
            if (node != null) {
                entry.getValue().add(node);
            }
        }
    }

    /** Creates one empty result list per distinct key (random keys may repeat). */
    private static Map<String, List<Node>> generateRecord() {
        Map<String, List<Node>> record = new HashMap<String, List<Node>>(EXE_TIMES);
        for (String key : allKeys) {
            if (!record.containsKey(key)) {
                record.put(key, new ArrayList<Node>());
            }
        }
        return record;
    }

    /**
     * Gets the mock node by the material parameter
     *
     * @param nodeCount the count of node wanted
     * @return the node list
     */
    private static List<Node> getNodes(int nodeCount) {
        List<Node> nodes = new ArrayList<Node>(nodeCount);
        for (int k = 1; k <= nodeCount; k++) {
            nodes.add(new Node("node" + k));
        }
        return nodes;
    }

    /**
     * All the keys
     */
    private static List<String> getAllStrings() {
        List<String> allStrings = new ArrayList<String>(EXE_TIMES);
        for (int i = 0; i < EXE_TIMES; i++) {
            allStrings.add(generateRandomString(ran.nextInt(50)));
        }
        return allStrings;
    }

    /**
     * To generate the random string by the random algorithm
     * <br>
     * The char between 32 and 127 is normal char
     *
     * @param length length of the string to generate
     * @return a random printable-ASCII string
     */
    private static String generateRandomString(int length) {
        // StringBuilder: single-threaded use, no need for StringBuffer's locking.
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append((char) (ran.nextInt(95) + 32));
        }
        return sb.toString();
    }
}