目录
(一)Hash
哈希表是一种基本的数据结构,其思想是利用Hash函数来支持快速的【插入和搜索】,这是哈希表的第一个重要概念。本文从哈希表开始说起,是为数据去重问题提供最原始的思路。该模块不涉及任何复杂算法,或者是Java中的实现方法,仅从最简单的角度进行讲解,便于初学者快速理解。
既然说起了【插入和搜索】,那么哈希表的相关原理就离不开这两个场景,与插入对应的,是哈希函数;与搜索对应的,是存储桶。哈希表的原理,即是通过哈希函数,将随机的数字,固定的映射到一个存储桶里。例如当我们插入一个新的键值时,会通过哈希函数映射,查询该键值应该分配到哪个存储桶里,并进行对应的存储操作;当我们搜索一个键值时,使用相同的哈希函数,找到对应的存储桶,并只在这个存储桶里进行搜索。
如下图所示,假设哈希函数是y = x % 5:
说到这里,哈希函数是原始的“分而治之”思想的实践者,天然的应该应用到对应的高性能场景中。哈希函数的设计并没有标准方法,最完美的情况下,键值和存储桶之间能做到严格的一对一映射。
然而,在大多数情况下,哈希函数并不完美,不可避免的会遇到冲突的问题。例如上文提到的y = x % 5,1987和2这两个键值都被分配到了存储桶2中。在这里,哈希表就引入了第二个重要概念【冲突解决】。
冲突解决算法需要解决如下三个问题:
(1)同一个存储桶下的键值应该如何组织?
(2)如果某个存储桶中的键值超过了最大存储数量,应如何解决?
(3)如何在同一个存储桶中快速定位目标值?
因此,哈希表需要有一个“N”的概念,即每个存储桶能够存储的最大键值。
讲到这里,对于基本的哈希表概念就大致了解了,接下来我们实现两个最简单的数据结构:HashSet和HashMap。
先说HashSet,实现如下:
class MyHashSet {
private final int MAX_LEN = 100000;
private List<Integer>[] set;
private int getIndex(int key) {
return key % MAX_LEN;
}
private int getPos(int key, int index) {
List<Integer> temp = set[index];
if (temp == null) {
return -1;
}
for (int i = 0; i < temp.size(); ++i) {
if (temp.get(i) == key) {
return i;
}
}
return -1;
}
public MyHashSet() {
set = (List<Integer>[])new ArrayList[MAX_LEN];
}
public void add(int key) {
int index = getIndex(key);
int pos = getPos(key, index);
if (pos < 0) {
if (set[index] == null) {
set[index] = new ArrayList<Integer>();
}
set[index].add(key);
}
}
public void remove(int key) {
int index = getIndex(key);
int pos = getPos(key, index);
if (pos >= 0) {
set[index].remove(pos);
}
}
public boolean contains(int key) {
int index = getIndex(key);
int pos = getPos(key, index);
return pos >= 0;
}
}
再说HashMap,实现如下:
import javafx.util.Pair;
class MyHashMap {
private final int MAX_LEN = 100000;
private List<Pair<Integer, Integer>>[] map;
private int getIndex(int key) {
return key % MAX_LEN;
}
private int getPos(int key, int index) {
List<Pair<Integer, Integer>> temp = map[index];
if (temp == null) {
return -1;
}
for (int i = 0; i < temp.size(); ++i) {
if (temp.get(i).getKey() == key) {
return i;
}
}
return -1;
}
public MyHashMap() {
map = (List<Pair<Integer, Integer>>[])new ArrayList[MAX_LEN];
}
public void put(int key, int value) {
int index = getIndex(key);
int pos = getPos(key, index);
if (pos < 0) {
if (map[index] == null) {
map[index] = new ArrayList<Pair<Integer, Integer>>();
}
map[index].add(new Pair(key, value));
} else {
map[index].set(pos, new Pair(key, value));
}
}
public int get(int key) {
int index = getIndex(key);
int pos = getPos(key, index);
if (pos < 0) {
return -1;
} else {
return map[index].get(pos).getValue();
}
}
public void remove(int key) {
int index = getIndex(key);
int pos = getPos(key, index);
if (pos >= 0) {
map[index].remove(pos);
}
}
}
以上内容很基础,但对于理解接下来的内容很有帮助。
(二)BitMap
理解了Hash的基本概念,接下来就引入第二个概念:BitMap。简单讲:BitMap就是用一个bit位来标记某个元素对应的键值。用哈希表的概念看,就是将每个bit位当作存储桶,位移作哈希函数,“N”值设定为1。
例如:我们目前有16个bit位:0000000000000000(标号从0开始),还有一个数字集合{1,2,5,7,11},那么这16个bit位就可以表示为:0110010100010000。如下图所示:
再举例:一台32位机器上的自然数总共有4294967295个,如果用一个bit来存放一个整数,1代表存在,0代表不存在,那么把全部自然数存储在内存只要4294967295 / (8 * 1024 * 1024) ≈ 512MB,如果存储在文件中,需要约20G的容量。
以下的实现方式需要自行补充Java位运算的相关知识:
因为在Java中,最小的数据类型为byte(8位),因此这里用byte举例。这里我们需要拿到两个位置:一个是数字在每个byte中的位置,另一个是在byte[]数组中的位置。因此计算公式如下:
所处于数组位置:outerIndex = num >> 3 (相当于除以8取整)
在byte中位置:innerIndex = num & 7 (相当于mod8)
在byte中标记位置:0x01 << innerIndex(位移)
更新byte:bitsMap[outerIndex] | (0x01 << innerIndex) (或运算更新bit位)
相关实现代码如下:
public class BitMap {
private static byte[] bitsMap;
public BitMap(long length) {
bitsMap = new byte[(byte) (length >> 3) + ((length & 7) > 0 ? 1 : 0)];
}
public int get(long num) {
byte data = bitsMap[(byte) (num >> 3)];
byte innerIndex = (byte) (num & 7);
return data >> innerIndex & 0x01;
}
public void put(long num) {
byte outerIndex = (byte) (num >> 3);
byte innerIndex = (byte) (num & 7);
bitsMap[outerIndex] = (byte) (bitsMap[outerIndex] | (0x01 << innerIndex));
}
public static void main(String[] args) {
BitMap bitMap = new BitMap(101);
bitMap.put(33);
System.out.println(bitMap.get(0));
System.out.println(bitMap.get(33));
System.out.println(bitMap.get(100));
}
}
结果:0、1、0
是不是与HashMap的实现方式有雷同呢?
(三)BitSet
理解了BitMap,接下来直接看Java.util中的BitSet方法,就容易多了,基本的原理都是相通的。接下来就拿出一部分源码来看:
public void set(int bitIndex) {
if (bitIndex < 0)
throw new IndexOutOfBoundsException("bitIndex < 0: " + bitIndex);
int wordIndex = wordIndex(bitIndex);
expandTo(wordIndex);
words[wordIndex] |= (1L << bitIndex); // Restores invariants
checkInvariants();
}
以Set方法为例:先是通过wordIndex方法获得outerIndex的位置,“>> ADDRESS_BITS_PER_WORD”这里设定的值是6,也就是2的6次方,对应的就是Java中Long的数据类型。再是通过expandTo方法来判断是否超过了当前words数组,如果存不下,则新增一个Long数组。最后是“1L << bitIndex”更新下标,与BitMap中的位移方式相同。
Get方法的实现方式类似,详细的实现代码可以看一下Java源码。
(四)BloomFilter
布隆过滤器本质上是一种数据结构,通过巧妙的概率型数据结构,以较小的存储空间,换取高效的插入和去重查询操作。但布隆过滤器的结果是概率性的,并不准确。
理解了BitSet的概念,我们就可以通过BitSet的数据结构,来实现基本的BloomFilterInMemory方法:
import java.util.BitSet;
public class BloomFilterInMemory {
protected BitSet bloom;
public BloomFilterInMemory(int size) {
bloom = new BitSet(size);
}
public synchronized boolean addElement(int element) {
boolean added = false;
if (!getBit(element)) {
added = true;
setBit(element, true);
}
return added;
}
public synchronized void clear() {
bloom.clear();
}
public synchronized boolean contains(int element) {
if (!getBit(element)) {
return false;
}
return true;
}
protected boolean getBit(int index) {
return bloom.get(index);
}
protected void setBit(int index, boolean to) {
bloom.set(index, to);
}
public synchronized boolean isEmpty() {
return bloom.isEmpty();
}
}
(五)Flink
实时数据去重是一种比较常见的近似场景,通常有以下三种实现方式:
1. 通过布隆过滤器;
2. 通过内嵌数据库(如RocksDB);
3. 引入外部数据库(如Redis)。
第一种方式是近似统计,第二、三种方式统计的就相对准确。假如使用场景有很多额外的因素,例如反作弊会对后续数据进行修正,那么还是推荐通过布隆过滤器的方式来进行统计,只需要去重的指标是整型(字符串去重对哈希函数的设计要求较高)。
实时统计中,我们最常遇到的是对用户进行去重,假设用户是登陆状态,那么可以获得用户的ID,这种ID通常是数据库自增型的,那么就很适合布隆过滤器的使用场景。
以上一篇文章的WordCount程序实例,实现一个新的DeduplicateFlatMapFunction即可:
public class DeduplicateFlatMapFunction implements FlatMapFunction<Integer, Tuple2<Integer, Integer>> {
private static final int DEFAULT_SIZE = 1000000;
private volatile BloomFilterInMemory bloomFilter;
@Override
public void flatMap(Integer value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
if (bloomFilter == null) {
bloomFilter = new BloomFilterInMemory(DEFAULT_SIZE);
}
if (!bloomFilter.contains(value)) {
bloomFilter.addElement(value);
out.collect(new Tuple2<>(value, 1));
}
}
}