『ビッグデータを支える技術』を読んだ データインジェスチョンについて

2020-02-01 / [database] [book]

ビッグデータを支える技術―刻々とデータが脈打つ自動化の世界

仕事で広告配信のログを扱っているので、周辺ノウハウをつけるために読んだ。 仕事場で実物を見ていたので読んだ内容を体験と照合することでかなり楽しく読めた。

4章のデータインジェスチョンのところがとても面白かったので感想でも書く。 (というか全部面白かったけど全部は書けないので)

データインジェスチョン

ビッグデータにおいてデータをストレージまで配送することをデータインジェスチョンと呼ぶ。 代表的なアプローチとして バルク型ストリーミング型 がある。

バルク

生データから分散ストレージへの転送を担うノードをETLサーバと呼ぶ。ETLサーバはデータを抽出して保存し、次に転送タスクを実行する。 自分が思いつくものだとEmbulkなどが上げられる。 データ量に応じて転送の頻度を変える必要がある。通常ワークフロー管理ツールなどから実行する。

転送の信頼性が大事ならバルクがよい。ストレージからストレージへのデータの移動が基本にあるためやり直しが効き、失敗にも強い。

ストリーミング

まだどこにも保存されていないデータはまず保存できるところに転送する必要があるのでストリーミングが必要になる。

高頻度に小さなデータを保存する性質からNoSQLストレージが適していることが多い。またはメッセージキューやメッセージブローカなどの中継システ厶を置くこともある。 バッファ機能を持つメッセージ配送のひとつにfluentdがある。メッセージブローカの実装例としてKafakaがある。

ストリーミングで保存する時系列データの最適化

メッセージが届いた時間と処理される時間の違い。

  • 生成された時間を イベント時間 を呼ぶ
  • 到着した時間を プロセス時間 と呼ぶ

スマホだとオンラインになるまで時間が開くことがあるため、メッセージの生成から到着に数日遅延があることもある。

通常BI分析などで時間ベースの分析を行う場合はイベント時間ベースにする。プロセス時間はあくまでもシステムにメッセージが到着した時間であり、データ転送を中継するシステムの遅延などが多分に含まれている。そのためプロセス時間は分析者にとって意味のある時刻ではない。

このようにメッセージは遅れて届く。 遅れてきたデータを気にせずに分散ストレージに保存するとどうなるか。 例えば 2020-01-01 のデータが下記ように遅れのせいで2つのパーティションに保存されるとする。

  • 2020-01-01 中に到着したものはパーティションAに保存
  • 2020-01-03 に遅れて到着したものはパーティションBに保存

となると2020-01-01のデータをスキャンしたい場合に2つのパーティションにまたがってスキャンが必要になる。 このパーティションがネットワーク的にも離れた場所にあるとするとそのコストは深刻になる。

このように「2020-01-01のデータ」という検索の述語に対して局所性が効くようにするデータ保存の方法を 述語プッシュダウン という。 Cassandraなどの時系列インデックスを持つデータベースではイベント時間をベースに並べ替えをしておくことで検索を効率化できる。 これにより短い範囲でのリアルタイム集計は素早くできるようになる。

感想

データインジェスチョンの方式あれこれ

考えてみるとバルクとストリーミングは特に対立する手法ではなく組み合わせて使うものであるように読めた。

バルク方式ではそもそも転送元のデータが別のストレージに格納されていると仮定しており、バルク方式の信頼性はそこに依っている。 一方のストリーミング方式ではデータソースがストレージに永続化されていないものをまず最初に永続化するべくメッセージブローカなどに送信する。まだ永続されていないデータを永続化するのがストリーミングであるため、ここで障害発生して失われたデータを元に戻すのは難しい。

それぞれの方式が前提としている転送元データの状態は異なることから、これらは直交している手法のように思う。 データインジェスチョンにおいては、例えばストリーミング方式をフロントエンドとして使って、バルク方式をバックエンドとすることで大体どんなデータソースにも対応ができそうな感じにはなる。

述語プッシュダウンの話

初めて聞いた言葉だったので最初は意味を捉え損なったが、分散システムにおいて局所性を使った最適化であると咀嚼し直して納得した。 時間を軸に検索を行う時系列データベースであれば、同じ時間帯のデータは近いストレージブロックに格納されて欲しい。 そうするためにデータを時系列にソートしてから挿入することで、「近似したデータは物理的に近いところに格納」が実現できる。 分散システムにおいてもメモリアクセスの局所性みたいな性質があらわれるのだな、となんだか感心した。

前述のようにメッセージが遅れて到着した時にはかえってこれが厄介になる。 せっかくソートして保存したデータに遅れたメッセージを割り込ませるべく、対象領域のデータを再ソートして格納しなければならないからである。 データのソートと再格納は通常高いコストを伴うので、あまりに遅れてきたメッセージは無視することで不要な再格納を回避するとのこと。

あまり時系列データベースの実装を知らないのだけれど、こういう話を知ると実際にそういう実装になっているのか確認したくなる。 Cassandraは大きすぎるのでもう少し小さいデータベースがいいかな。。。