CountMinSketch是一種計數器,用來統計一個元素的計數,它能夠以一個非常小的空間統計大量元素的計數,同時保證高的性能及準確性。
與布隆過濾器類似,由於它是基於概率的,因此它所統計的計數是有一定概率存在誤差的,也就是可能會比真實的計數大。比如一個元素實際的計數是10,但是計算器的計算結果可能比10大。因此適合能夠容忍計數存在一定誤差的場景,比如社交網絡中推文的訪問次數。
它一秒能夠進行上百萬次操作(主要取決於哈希函數的速度),並且如果我們每天有一個長度為100億的數據流需要進行計數,計數值允許的誤差範圍是100,允許的錯誤率是0.1%,計數器大小是32位,只需要7.2GB內存,這完全可以單機進行計數。
原理
數據結構
CountMinSketch計數器的數據結構是一個二維數組,每一個元素都是一個計數器,計數器可以使用一個數值類型進行表示,比如無符號int:
增加計數
每個元素會通過不同的哈希函數映射到每一行的某個位置,並增加對應位置上的計數:
估算計數
估算計數也是如上圖流程,根據哈希映射到每一行的對應位置,然後讀取所有行的計數,返回其中最小的一個。
返回最小的一個是因為其他其他元素也可能會映射到自身所映射位置上面,導致計數比真實計數大,因此最小的一個計數最可能是真實計數:
比如上圖元素123映射到了元素abc第一行的相同位置,因此這個位置的計數累加了元素abc和元素123的計數和。但是只要我們取三行裡面最小的一個計數,那麼就能容忍這種情況。
當然,如果一個元素的每一行的對應位置都被其他元素所映射,那麼這個估算的計數就會比真實計數大。
哈希函數
CountMinSketch計數器裡面的哈希函數需要是彼此獨立且均勻分布(類似於哈希表的哈希函數),而且需要儘可能的快,比如murmur3就是一個很好的選擇。
CountMinSketch計數器的性能嚴重依賴於哈希函數的性能,而一般哈希函數的性能則依賴於輸入串(一般為字節數組)的長度,因此為了提高CountMinSketch計數器的性能建議減少輸入串的長度。
下面是一個簡單的性能測試,單位是字節,可以看到時間的消耗隨著元素的增大基本是線性增長的:
pkg: github.com/jiaxwu/gommon/counter/cm
cpu: Intel(R) Core(TM) i5-10210U CPU @ 1.60GHz
BenchmarkAddAndEstimate/1-8 2289142 505.9 ns/op 1.98 MB/s 0 B/op 0 allocs/op
BenchmarkAddAndEstimate/2-8 2357380 513.7 ns/op 3.89 MB/s 0 B/op 0 allocs/op
BenchmarkAddAndEstimate/4-8 2342382 496.9 ns/op 8.05 MB/s 0 B/op 0 allocs/op
BenchmarkAddAndEstimate/8-8 2039792 499.7 ns/op 16.01 MB/s 0 B/op 0 allocs/op
BenchmarkAddAndEstimate/16-8 2350281 526.8 ns/op 30.37 MB/s 0 B/op 0 allocs/op
BenchmarkAddAndEstimate/32-8 2558060 444.3 ns/op 72.03 MB/s 0 B/op 0 allocs/op
BenchmarkAddAndEstimate/64-8 2540272 459.5 ns/op 139.29 MB/s 0 B/op 0 allocs/op
BenchmarkAddAndEstimate/128-8 1919720 538.6 ns/op 237.67 MB/s 0 B/op 0 allocs/op
BenchmarkAddAndEstimate/256-8 1601738 720.6 ns/op 355.28 MB/s 0 B/op 0 allocs/op
BenchmarkAddAndEstimate/512-8 950584 1599 ns/op 320.18 MB/s 0 B/op 0 allocs/op
BenchmarkAddAndEstimate/1024-8 363592 3169 ns/op 323.17 MB/s 0 B/op 0 allocs/op
BenchmarkAddAndEstimate/2048-8 187500 5888 ns/op 347.81 MB/s 0 B/op 0 allocs/op
BenchmarkAddAndEstimate/4096-8 130425 8825 ns/op 464.15 MB/s 0 B/op 0 allocs/op
BenchmarkAddAndEstimate/8192-8 67198 17460 ns/op 469.18 MB/s 0 B/op 0 allocs/op
數組大小、哈希函數數量、錯誤範圍、錯誤率
數組大小、哈希函數數量、錯誤範圍和錯誤率之間是互相影響的,如果我們想減少錯誤率和錯誤範圍,則需要更大的數組和更多的哈希函數。但是我們很難直觀的計算出這些參數,還好有兩個公式可以幫助我們計算出準確的數值:
在我們可以確定我們的數據流大小和能夠容忍的錯誤範圍和錯誤率的情況下,我們可以根據下面公式計算數組大小和哈希函數數量:
n = 數據流大小
m = 數組大小
k = 哈希函數數量
eRange = 錯誤範圍(ErrorRange)
eRate = 錯誤率(ErrorRate)
ceil() = 向上取整操作
E = 2.718281828459045(自然常數)
m = ceil(E/(eRange/n))
k = ceil(ln(1/eRate))
應用
TopK(海量數據計數器)
對於海量數據流中頻率最高的K個數,如果使用常規的map<key, uint>,由於內存大小限制,一般情況下單機無法完成計算,需要把數據路由到多台機器上進行計數。
而如果我們使用CountMinSketch則能夠在單機情況下處理大量的數據,比如開頭所提到對於一個長度為100億的數據流進行計數,只需要7.2GB內存。這個計數結果可能存在一定誤差,不過我們可以在這個基礎上再進行過濾。
TinyLFU
TinyLFU是一個緩存淘汰策略,它裡面有LFU策略的思想,LFU是一個基於訪問頻率的淘汰策略,因此需要統計每個元素被訪問的次數。如果對每個元素使用一個獨立的計數器,那麼這個成本會很大,而且對於一個緩存淘汰策略來說,我們並不需要這個計數器非常大且非常準確。
因此TinyLFU使用一個計數器長度為4位的CountMinSketch計數器統計每個元素的頻率,減少計數所消耗的內存空間,同時還引入了計數衰減機制避免某些之前熱門但是當前已經很少被訪問的元素很難被淘汰。
實現
這裡給出一個Golang的泛型實現,這個實現支持uint8、uint16、uint32、uint64等基本類型計數器,實際上還可以實現比如長度為2bit、4bit、6bit的計數器,但是代碼會稍微複雜一點(特別是非2的次方的計數器)。
數據結構
這裡的數據結構核心是一個k*m的二維數組counters,k是哈希函數數量,m是數組每一行的長度;countersLen其實就是m;Hashs是哈希函數列表;maxCount是當前類型的最大值,比如uint8就是255,下面的計算需要用到它。
type Counter[T constraints.Unsigned] struct {
counters [][]T
countersLen uint64 // 計數器長度
hashs []*hash.hash // 哈希函數列表
maxCount T // 最大計數值
}
初始化
我們首先使用上面提到的兩個公式計算數組每一行長度和哈希函數的數量,然後初始化哈希函數列表和二維數組。
// 創建一個計數器
// size:數據流大小
// errorRange:計數值誤差範圍(會超過真實計數值)
// errorRate:錯誤率
func New[T constraints.Unsigned](size uint64, errorRange T, errorRate float64) *Counter[T] {
// 計數器長度
countersLen := uint64(math.Ceil(math.E / (float64(errorRange) / float64(size))))
// 哈希個數
hashsCnt := int(math.Ceil(math.Log(1.0 / errorRate)))
hashs := make([]*hash.Hash, hashsCnt)
counters := make([][]T, hashsCnt)
for i := 0; i < hashsCnt; i++ {
hashs[i] = hash.New()
counters[i] = make([]T, countersLen)
}
return &Counter[T]{
counters: counters,
countersLen: countersLen,
hashs: hashs,
maxCount: T(0) - 1,
}
}
增加計數
對於一個元素,我們需要把它根據每個哈希函數計算出它在每一行數組的位置,然後增加對應位置計數器的計數值。
這裡需要注意的是,計數值可能會溢出,因此我們首先判斷是否溢出,如果溢出則設置為最大值。
// 增加元素的計數
func (c *Counter[T]) Add(b []byte, val T) {
for i, h := range c.hashs {
index := h.Sum64(b) % c.countersLen
if c.counters[i][index]+val <= c.counters[i][index] {
c.counters[i][index] = c.maxCount
} else {
c.counters[i][index] += val
}
}
}
估算計數
同增加計數原理,把元素根據哈希函數映射到每一行數組的對應位置,然後選擇所有行中最小的那個計數值。
// 估算元素的計數
func (c *Counter[T]) Estimate(b []byte) T {
minCount := c.maxCount
for i, h := range c.hashs {
index := h.Sum64(b) % c.countersLen
count := c.counters[i][index]
if count == 0 {
return 0
}
minCount = mmath.Min(minCount, count)
}
return minCount
}
作者:jiaxwu
連結:https://juejin.cn/post/7142896490061496334
來源:稀土掘金