確率的データ構造入門:Bloom FilterとHyperLogLogのPython実装

確率的データ構造(Bloom Filter・HyperLogLog)の理論と仕組みをわかりやすく解説し、Pythonによるスクラッチ実装を紹介します。

確率的データ構造とは

大規模データを扱う場面では、すべての要素を正確に記録しようとするとメモリが膨大になります。確率的データ構造(Probabilistic Data Structure)は、わずかな誤差を許容する代わりに、メモリ使用量を大幅に削減するデータ構造の総称です。

確率的データ構造の基本的なトレードオフは次のとおりです。

観点通常のデータ構造確率的データ構造
正確性100%正確わずかな誤差を許容
メモリ使用量データ量に比例データ量に対して極めて小さい
クエリ速度\(O(1)\)〜\(O(n)\)\(O(k)\)(ハッシュ回数)
要素の削除可能基本的に不可(例外あり)

本記事では、代表的な確率的データ構造である Bloom Filter(集合への所属判定)と HyperLogLog(ユニーク要素数の推定)を取り上げ、理論的な背景とともに Python によるスクラッチ実装を紹介します。

Bloom Filter

Bloom Filter は、ある要素が集合に含まれるかどうかを高速に判定するための確率的データ構造です。1970 年に Burton Howard Bloom が提案しました。

仕組み

Bloom Filter は以下の 2 つの要素で構成されます。

  • ビット配列: サイズ \(m\) の配列(すべて 0 で初期化)
  • ハッシュ関数: \(k\) 個の独立なハッシュ関数 \(h_1, h_2, \dots, h_k\)(各関数は \(\{0, 1, \dots, m-1\}\) の値を返す)

挿入操作(add): 要素 \(x\) を追加するとき、\(k\) 個のハッシュ関数を適用し、対応するビットをすべて 1 にします。

操作ビット配列の変化
初期状態[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
“apple” を追加[0, 1, 0, 0, 1, 0, 0, 1, 0, 0] (位置 1,4,7)
“banana” を追加[0, 1, 1, 0, 1, 0, 1, 1, 0, 0] (位置 2,6 追加)

検索操作(contains): 要素 \(x\) が含まれるかを判定するとき、\(k\) 個のハッシュ値に対応するビットを確認します。

  • すべてのビットが 1 → 「おそらく含まれる」(偽陽性の可能性あり)
  • 1 つでもビットが 0 → 「確実に含まれない」(偽陰性は発生しない)

この性質が Bloom Filter の最大の特徴です。偽陰性(False Negative)は絶対に発生しませんが、偽陽性(False Positive)は発生し得ます

偽陽性率の理論

\(n\) 個の要素を挿入した後の偽陽性率は、次の式で近似されます。

\[P(fp) = \left(1 - e^{-kn/m}\right)^k \tag{1}\]

ここで \(m\) はビット配列のサイズ、\(k\) はハッシュ関数の個数、\(n\) は挿入済み要素数です。

偽陽性率を最小化する最適なハッシュ関数の個数は次のとおりです。

\[k_{\text{opt}} = \frac{m}{n} \ln 2 \tag{2}\]

このとき偽陽性率は次のように簡略化されます。

\[P(fp)_{\min} = \left(\frac{1}{2}\right)^k = (0.6185)^{m/n} \tag{3}\]

Python 実装

import hashlib
import math


class BloomFilter:
    """Bloom Filter のスクラッチ実装"""

    def __init__(self, expected_items: int, fp_rate: float = 0.01):
        """
        :param expected_items: 挿入予定の要素数
        :param fp_rate: 許容する偽陽性率(デフォルト 1%)
        """
        # 最適なビット配列サイズ: m = -n*ln(p) / (ln2)^2
        self.size = self._optimal_size(expected_items, fp_rate)
        # 最適なハッシュ関数の個数: k = (m/n) * ln2
        self.hash_count = self._optimal_hash_count(self.size, expected_items)
        self.bit_array = [0] * self.size
        self.item_count = 0

    @staticmethod
    def _optimal_size(n: int, p: float) -> int:
        """最適なビット配列サイズを計算"""
        m = -n * math.log(p) / (math.log(2) ** 2)
        return int(math.ceil(m))

    @staticmethod
    def _optimal_hash_count(m: int, n: int) -> int:
        """最適なハッシュ関数の個数を計算"""
        k = (m / n) * math.log(2)
        return int(math.ceil(k))

    def _hashes(self, item: str) -> list[int]:
        """k 個のハッシュ値を生成(double hashing 方式)"""
        h1 = int(hashlib.md5(item.encode()).hexdigest(), 16)
        h2 = int(hashlib.sha256(item.encode()).hexdigest(), 16)
        return [(h1 + i * h2) % self.size for i in range(self.hash_count)]

    def add(self, item: str) -> None:
        """要素を追加"""
        for pos in self._hashes(item):
            self.bit_array[pos] = 1
        self.item_count += 1

    def contains(self, item: str) -> bool:
        """要素が含まれるか判定(偽陽性あり、偽陰性なし)"""
        return all(self.bit_array[pos] == 1 for pos in self._hashes(item))

    def false_positive_rate(self) -> float:
        """現在の理論上の偽陽性率を計算"""
        n = self.item_count
        m = self.size
        k = self.hash_count
        return (1 - math.exp(-k * n / m)) ** k

    def memory_bytes(self) -> int:
        """おおよそのメモリ使用量(バイト)"""
        return self.size // 8 + 1


# --- 使用例 ---
if __name__ == "__main__":
    bf = BloomFilter(expected_items=10000, fp_rate=0.01)
    print(f"ビット配列サイズ: {bf.size:,} bits ({bf.memory_bytes():,} bytes)")
    print(f"ハッシュ関数の個数: {bf.hash_count}")

    # 単語リストを追加
    words = [f"word_{i}" for i in range(10000)]
    for w in words:
        bf.add(w)

    # 追加済み要素の検索(偽陰性は発生しない)
    fn_count = sum(1 for w in words if not bf.contains(w))
    print(f"\n偽陰性数: {fn_count} (常に 0)")

    # 未追加要素の検索(偽陽性の測定)
    test_words = [f"test_{i}" for i in range(10000)]
    fp_count = sum(1 for w in test_words if bf.contains(w))
    print(f"偽陽性数: {fp_count} / {len(test_words)}")
    print(f"実測偽陽性率: {fp_count / len(test_words):.4f}")
    print(f"理論偽陽性率: {bf.false_positive_rate():.4f}")

    # メモリ比較
    actual_set = set(words)
    import sys
    set_size = sys.getsizeof(actual_set)
    print(f"\n--- メモリ比較 ---")
    print(f"Bloom Filter: {bf.memory_bytes():,} bytes")
    print(f"Python set:   {set_size:,} bytes")
    print(f"削減率: {(1 - bf.memory_bytes() / set_size) * 100:.1f}%")

実行結果の例

ビット配列サイズ: 95,851 bits (11,982 bytes)
ハッシュ関数の個数: 7
偽陰性数: 0 (常に 0)
偽陽性数: 107 / 10000
実測偽陽性率: 0.0107
理論偽陽性率: 0.0101

--- メモリ比較 ---
Bloom Filter: 11,982 bytes
Python set:   524,504 bytes
削減率: 97.7%

10,000 要素を格納した場合、Python の set と比較してメモリ使用量が約 97% 削減されていることがわかります。

実用例

用途説明
Web クローラ訪問済み URL の重複チェック(Google Bigtable で採用)
データベース(LSM-Tree)存在しないキーのディスク読み取りを回避(LevelDB, RocksDB)
スペルチェッカ辞書に含まれない単語の高速検出
ネットワークルータパケットフィルタリング、キャッシュ判定
CDN / キャッシュキャッシュに存在するか否かの事前判定

HyperLogLog

HyperLogLog は、集合の**カーディナリティ(ユニーク要素数)**を極めて少ないメモリで推定するアルゴリズムです。2007 年に Flajolet らが提案しました。

仕組み

HyperLogLog の基本的なアイデアは、ハッシュ値の先頭の連続するゼロビットの最大数を観測することで、ユニーク要素数を推定するというものです。

直感的な理解: コインを投げ続けて、最初に表が出るまでの回数を記録するとします。1 回目で表が出る確率は \(1/2\)、2 回目まで裏が続く確率は \(1/4\)、\(r\) 回続く確率は \(1/2^r\) です。多数の試行で最大 \(r\) 回連続した裏を観測した場合、おおよそ \(2^r\) 回の試行があったと推定できます。

LogLog の改良: 単一のレジスタでは分散が大きいため、HyperLogLog ではハッシュ値の先頭 \(p\) ビットを使って \(m = 2^p\) 個のレジスタに分割し(確率的平均化)、推定精度を向上させます。

各レジスタ \(M[j]\) には、そのレジスタに割り当てられた要素のハッシュ値における先頭ゼロビットの最大数 + 1 を記録します。

推定式は次のとおりです。

\[E = \alpha_m \cdot m^2 \cdot \left(\sum_{j=1}^{m} 2^{-M[j]}\right)^{-1} \tag{4}\]

ここで \(\alpha_m\) はバイアス補正係数で、次のように定義されます。

\[\alpha_m = \left(m \int_0^{\infty} \left(\log_2 \left(\frac{2+u}{1+u}\right)\right)^m du\right)^{-1} \tag{5}\]

実用上は次の近似値がよく使われます。

\(m\)\(\alpha_m\)
160.673
320.697
640.709
\(\geq 128\)\(0.7213 / (1 + 1.079 / m)\)

HyperLogLog の標準誤差は次の式で表されます。

\[\sigma = \frac{1.04}{\sqrt{m}} \tag{6}\]

\(m = 2^{14} = 16384\) レジスタの場合、標準誤差は約 0.81% となります。

Python 実装

import hashlib
import math


class HyperLogLog:
    """HyperLogLog のスクラッチ実装"""

    def __init__(self, precision: int = 14):
        """
        :param precision: レジスタ数を決定する精度パラメータ p
                          レジスタ数 m = 2^p(デフォルト p=14 → 16384 レジスタ)
        """
        self.p = precision
        self.m = 1 << precision  # 2^p
        self.registers = [0] * self.m
        self.alpha = self._compute_alpha(self.m)

    @staticmethod
    def _compute_alpha(m: int) -> float:
        """バイアス補正係数を計算"""
        if m == 16:
            return 0.673
        elif m == 32:
            return 0.697
        elif m == 64:
            return 0.709
        else:
            return 0.7213 / (1 + 1.079 / m)

    def _hash(self, item: str) -> int:
        """要素を 64 ビットハッシュ値に変換"""
        h = hashlib.sha256(item.encode()).hexdigest()
        return int(h[:16], 16)  # 先頭 64 ビットを使用

    @staticmethod
    def _leading_zeros(value: int, max_bits: int) -> int:
        """ハッシュ値の先頭ゼロビット数 + 1 を返す"""
        if value == 0:
            return max_bits + 1
        count = 1
        for i in range(max_bits - 1, -1, -1):
            if value & (1 << i):
                break
            count += 1
        return count

    def add(self, item: str) -> None:
        """要素を追加"""
        h = self._hash(item)
        # 先頭 p ビットでレジスタのインデックスを決定
        j = h >> (64 - self.p)
        # 残りのビットで先頭ゼロ数を計算
        remaining = h & ((1 << (64 - self.p)) - 1)
        rank = self._leading_zeros(remaining, 64 - self.p)
        self.registers[j] = max(self.registers[j], rank)

    def count(self) -> int:
        """カーディナリティ(ユニーク要素数)を推定"""
        # 調和平均による推定
        indicator = sum(2.0 ** (-r) for r in self.registers)
        estimate = self.alpha * self.m * self.m / indicator

        # 小さな値の補正(Linear Counting)
        if estimate <= 2.5 * self.m:
            zeros = self.registers.count(0)
            if zeros > 0:
                estimate = self.m * math.log(self.m / zeros)

        # 大きな値の補正
        if estimate > (1 << 32) / 30.0:
            estimate = -(1 << 32) * math.log(1 - estimate / (1 << 32))

        return int(estimate)

    def merge(self, other: "HyperLogLog") -> "HyperLogLog":
        """2 つの HyperLogLog を統合"""
        if self.p != other.p:
            raise ValueError("精度パラメータが一致しません")
        merged = HyperLogLog(self.p)
        merged.registers = [
            max(a, b) for a, b in zip(self.registers, other.registers)
        ]
        return merged

    def standard_error(self) -> float:
        """標準誤差を返す"""
        return 1.04 / math.sqrt(self.m)

    def memory_bytes(self) -> int:
        """おおよそのメモリ使用量(バイト)"""
        # 各レジスタは最大 6 ビットで十分(値は最大 64-p+1)
        return self.m * 6 // 8


# --- 使用例 ---
if __name__ == "__main__":
    hll = HyperLogLog(precision=14)
    true_count = 1_000_000

    for i in range(true_count):
        hll.add(f"user_{i}")

    estimated = hll.count()
    error = abs(estimated - true_count) / true_count * 100

    print(f"真のユニーク数: {true_count:,}")
    print(f"推定ユニーク数: {estimated:,}")
    print(f"誤差: {error:.2f}%")
    print(f"理論標準誤差: {hll.standard_error() * 100:.2f}%")

    # メモリ比較
    import sys
    actual_set = {f"user_{i}" for i in range(true_count)}
    set_size = sys.getsizeof(actual_set)
    hll_size = hll.memory_bytes()
    print(f"\n--- メモリ比較 ---")
    print(f"HyperLogLog: {hll_size:,} bytes")
    print(f"Python set:  {set_size:,} bytes")
    print(f"削減率: {(1 - hll_size / set_size) * 100:.1f}%")

    # merge のデモ
    hll_a = HyperLogLog(precision=14)
    hll_b = HyperLogLog(precision=14)
    for i in range(500_000):
        hll_a.add(f"user_{i}")
    for i in range(300_000, 800_000):
        hll_b.add(f"user_{i}")
    merged = hll_a.merge(hll_b)
    print(f"\n--- merge デモ ---")
    print(f"HLL_A のユニーク数: {hll_a.count():,} (真値: 500,000)")
    print(f"HLL_B のユニーク数: {hll_b.count():,} (真値: 500,000)")
    print(f"merge 後のユニーク数: {merged.count():,} (真値: 800,000)")

実行結果の例

真のユニーク数: 1,000,000
推定ユニーク数: 1,007,822
誤差: 0.78%
理論標準誤差: 0.81%

--- メモリ比較 ---
HyperLogLog: 12,288 bytes
Python set:  33,554,656 bytes
削減率: 99.96%

--- merge デモ ---
HLL_A のユニーク数: 498,653 (真値: 500,000)
HLL_B のユニーク数: 501,247 (真値: 500,000)
merge 後のユニーク数: 802,134 (真値: 800,000)

100 万件のユニーク要素に対して、わずか約 12 KB のメモリで誤差 1% 未満の推定が可能です。Python の set と比較して、メモリ使用量が 99.96% 削減されています。

実用例

用途説明
Redis PFCOUNTHyperLogLog を組み込みでサポート、12 KB でカーディナリティ推定
Web アナリティクスユニークビジター数のリアルタイム集計
ネットワーク監視フロー内のユニーク IP アドレス数の推定
データベースクエリ最適化COUNT(DISTINCT ...) の高速近似
分散システムmerge 可能なため、各ノードで独立に計算し統合できる

比較表

特性Bloom FilterHyperLogLogPython set
用途集合の所属判定カーディナリティ推定汎用集合操作
メモリ(10 万要素)約 120 KB約 12 KB約 4 MB
正確性偽陽性あり(率は調整可能)標準誤差 0.81%(\(p=14\))100% 正確
要素の追加\(O(k)\)\(O(1)\)平均 \(O(1)\)
クエリ\(O(k)\)「含まれるか?」\(O(m)\)「何個あるか?」\(O(1)\) 各種操作
削除不可(Counting BF なら可能)不可可能
統合(merge)OR 演算で可能max 演算で可能union で可能
偽陰性なしN/AN/A

確率的データ構造は、正確性が 100% でなくても許容できる場面で、メモリ効率と速度の大幅な改善を実現します。用途に応じて適切なデータ構造を選択することが重要です。

関連記事

参考文献

  • Bloom, B. H. (1970). “Space/time trade-offs in hash coding with allowable errors.” Communications of the ACM, 13(7), 422-426.
  • Flajolet, P., Fusy, E., Gandouet, O., & Meunier, F. (2007). “HyperLogLog: the analysis of a near-optimal cardinality estimation algorithm.” Discrete Mathematics and Theoretical Computer Science, AH, 137-156.
  • Broder, A., & Mitzenmacher, M. (2004). “Network applications of Bloom filters: A survey.” Internet Mathematics, 1(4), 485-509.

関連ツール