概要
SparkとHadoopの関係を理解するため、それぞれの大まかな情報をまとめる。結論からいうと、SparkはHadoopエコシステムとは独立したもので、HadoopのMapReduceの苦手なイテラティブな計算やインタラクティブなクエリなどで優れたパフォーマンスを発揮する分散データ処理を可能にするフレームワークである。
Hadoop
Hadoop core
- Apache Hadoop(Hadoop core)はHadoopの中核技術であるHadoop Distributed FileSystem(HDFS)とHadoop MapReduce、及びそれらを利用するためのユーティリティ群で構成される
HDFS
- データをたくさんのコンピュータに分散して保管する仕組みである
- ファイルを固定のサイズのブロックで分割して管理する
- HDFSではNameNode、DataNodeと呼ばれるサーバを用意している
- NameNodeはどのブロックがどのコンピュータに配置されているかを管理しており、クラスタ全体でデータがどのように分散されているのかを管理する
- DataNodeはばらばらになったデータの断片を実際に保管している
- DataNodeは定期的に、NameNode宛にハートビートを送っており、これをもとにNameNodeはクラスタ全体の状況を更新/把握する
- 各ブロックを別々のコンピュータに複数個保管することで耐障害性を確保している
- NameNodeは単一障害点である
- Hadoopではデータの更新はできない仕様となっているので、更新する場合は更新したい部分を書き換えた新しいデータをHadoopクラスタ内に作成し、元のデータを削除する
MapReduce
- 大規模データを効率的に分散処理するためのプログラミングモデルである
- JobTrackerはどのブロックがどのコンピュータで処理しているかを管理しており、多くのコンピュータが並列で処理を行っている状況を把握し、処理が効率よくもれなく実施されるように調整する
- TaskTrackerは1つ1つのブロックを処理するプロセス(タスク)を管理する
- JobTrackerはTaskTrackerからのハートビートからTaskTrackerの状況を管理する
- Hadoopクライアントはジョブの実行をJobTrackerに依頼し、JobTrackerはその要求をもとに各コンピュータにタスクを割り振る
- DataNodeとTaskTrackerは同じコンピュータ上に同居するように構成する
- JobTrackerがTaskTrackerの異常を検知すると失われたのと同じブロックを持つコンピュータに自動的にタスクを割り当てなおすが、タスク再割り当てを繰り返しても状況が回復しない場合にはジョブ全体を中断する
- JobTrackerも単一障害点である
- MapReduceでは処理をMapとReduceの2つのフェーズに分ける
- 基本的にMapがデータの抜き出し、Reduceがデータの加工や集計を担当する
- MapReduceでは並列処理における同期を1回しか行わない。同期フェーズのことをShuffleという。これはMapフェーズの後、Reduceフェーズの前に自動的に行われる。
- MapフェーズではHDFSにブロックとして保管されているぶつ切りのデータを読み込んで、その中身を確認する。ブロック単位にMapのためのタスク(Mapperタスク)を起動し、データローカリティを最大限活用して処理を実施する
- Mapperタスクにはブロックをレコード単位に分解したデータが順に渡される。Mapフェーズでは各レコードに何らかの処理を実施するようプログラムを記述し、処理結果はキーバリューの対として出力する。バリューとして何を設定すべきかは、後続のReduceフェーズを考えながら決める。キーとバリューは次のShuffleフェーズに引き渡される。
- ShuffleはMapフェーズの最後に自動的に始まるので特別に処理を書く必要はない。指定できるのは取りまとめのルール(Partitioner)とキーの並び替えのルール(Comparator)の2つ。
- Partitionerはキーの値を受け取りそれが何番目のReducerタスクに渡されるべきなのかを判定する(指定しなければキーのハッシュ値から決める)
- Comparatorは2つのキーの値を受け取り、大小関係を判定する(指定しなければキーのバイナリ表現での比較により判定)
- Shuffleフェーズの前半ではネットワーク通信は行わない。まずMapperタスクのローカルのメモリとハードディスク上でPartitionerに基づいた仕分けを行い、その中をComparatorに従って並べ替える。これによってMapperから出力されたキーバリューがキーごとにひとまとまりに並ぶ。ここまで処理し終わったらMapperタスクはTaskTrackerを通じてJobTrackerに自身の処理が終了したことを通知する
- ReducerタスクはJobTrackerに対し、担当分のデータがどこのTaskTrackerに存在するのかを定期的に問い合わせる。処理すべきデータを見つけると、ReducerタスクはTaskTrackerからデータをコピーし、自身のメモリかローカルハードディスクに保管する。データのコピーは複数のTaskTrackerに対して同時並行で行う。担当分のデータをすべてコピーしたらそれらを1つのデータとして取りまとめる(並び順を保ったままにする)。
- ReducerタスクはShuffleフェーズで必要なデータがそろったら、キーごとのバリューのリストを順番に読み込んで処理を実施し、最終的な結果をHDFSに出力する
- Reducerで行う処理の内容に応じてバリューに何を設定すべきかが決まる。並列処理の主たるロジックはたいていの場合はReducerに記述する。設計の際にはReducerでどういう処理をするのかを先に決めることが多い。
MapReduceにおける並列処理のパターン
- 大きくはレコード並列(各レコードを完全に別々に処理可能で同期が不要。Mapフェーズのみで完結する)、キー並列(同一キーのレコード同士について処理を同期)、関連有=処理結果がキーの順番に依存(Partitioner,Comparatorのカスタマイズが必要)、という3つに分類される
- Emptyパターンは読み込んだデータをそのまま出力する。レコード並列に分類される。
- Editパターンは読み込んだデータを加工し、出力する。レコード並列に分類される。
- Limitパターンはデータを件数で絞り込む。レコード並列に分類される。
- SAMPLEパターンはデータを割合で絞り込む。レコード並列に分類される。
- FILTERパターンはデータを条件で絞り込む。レコード並列に分類される
- SPLITパターンは条件に該当したデータをそれぞれの出力先に出力する。レコード並列に分類される。
- MERGEパターンは複数のデータを1つのデータにまとめる。レコード並列に分類される。
- DISTINCTパターンはデータに重複したレコードがある場合にそれを除去する。キー並列に分類される。
- UNIONパターンは複数のデータを1つのデータにまとめる。ただし結果は重複は除去する。キー並列に分類される。
- GROUPパターンはキーをもとにデータをグループ化する。データ内に同じキーを持つレコードが複数ある場合はそれらを1つのグループとしてまとめる。キー並列に分類される。
- JOINパターンは複数のデータのそれぞれのレコードについて同じキーを持つ者同士を結合する。キー並列に分類される。
- SORTパターンはキーの順序に従いデータを並べ替える。関連有に分類される。
- CROSSパターンは複数のデータに含まれるすべてのレコードそれぞれのすべての組み合わせを作成する。3つの分類には含まれない。
Spark
Sparkとは何なのか
- 高速かつ汎用的であることを目標に設計されたクラスタコンピューティングプラットフォーム
- MapReduceのモデルを拡張し、インタラクティブなクエリやストリーム処理を含むより多くの処理の演算処理を効率的にサポート
- バッチ、インタラクティブなアルゴリズム、ストリーミングなど幅広い処理をカバーしており、さまざまな種類の処理を組み合わせることが可能
- 演算をオンメモリで行う機能があるが、ディスク上で動作する複雑なアプリケーションの場合でもMapReduceよりも効率が良い
- Python,Java,Scala,SQL,そして多彩な組み込みライブラリによって,シンプルなAPIが提供されている
- SparkはHadoopクラスタがなくても独立して動作することができ、HadoopのHDFSやYARNを使用せずに、スタンドアロンモードや他のクラスタマネージャ(例えば、Apache MesosやKubernetes)を使用して実行することが可能
- SparkはHadoopクラスタ上で実行させることもでき、Cassandraを含む、任意のHadoopのデータソースにアクセスできる
Sparkのスタック
Spark Core
- タスクスケジューリング、メモリ管理、障害回復、ストレージシステムとのやり取り等のコンポーネントを含む、Sparkの基本的な機能がある
- 耐障害性分散データセット(RDD)を定義しているAPIの本拠地でもある。
- RDDは多くのコンピュートノードにまたがって分散配置されているアイテムのコレクションを表現するもので、並列に処理可能である
- コレクションを構築して操作するためのAPIを大量に提供
Spark SQL
- 構造化データを扱うためのパッケージ。SQL,Hive Query Language(HQL)と呼ばれるHive用のSQLの変種を使ってデータに対するクエリを実行できる。
- Hiveのテーブル、Parquet、JSONを含む、多くのデータソースをサポート
- RDDをPython,Java,Scalaから扱う場合にサポートされているプログラムからのデータ操作を開発者がSQLと組み合わせることができるようになるので、SQLを複雑な分析と結合できるようになる
Spark Streaming
- データのライブストリームの処理を実現するコンポーネント
- Spark CoreのRDD APIとほぼぴったり一致するデータストリームの操作のためのAPIを提供
- APIの下位層では、Spark Coreと同等のフォールトトレランス、スループット、スケーラビリティを持つように設計
MLlib
- 一般的な機械学習の機能を含むライブラリ
- 分類、回帰、クラスタリング、協調フィルタリングを含む複数の種類の機械学習のアルゴリズムを提供
- モデルの評価、データのインポートといった支援機能ももつ
- 一般的な勾配降下最適化アルゴリズムを含む、低レベルのMLプリミティブ群も提供。
- これらのメソッド群はすべてクラスタに対してスケールアウトできるように設計されている
GraphX
- グラフに対して並列に演算処理を実行するライブラリ
- Spark RDD APIを拡張したものであり、有効グラフを生成し、それぞれの端点や辺に任意の属性を与えることがか可能
- グラフを操作するための様々な演算子、グラフアルゴリズムのライブラリも提供
クラスタマネージャ
- Sparkは内部的には数1000台に及ぶ演算ノードまで効率的にスケールアウト可能
- Sparkはさまざまなクラスタマネージャ上で動作する
- HadoopのYARN、Apache Mesos、Kubernetes、Spark自身が持つStandalone Schedulerと呼ばれるシンプルなクラスタマネージャに対応
Sparkのストレージ層
- Hadoop分散ファイルシステム(HDFS)やHadoopのAPIがサポートしているその他のストレージシステム(ローカルファイルシステム、S3,Cassandra、Hive、HBaseなどを含む)に保存された任意のファイルから分散データセットを生成可能
- SparkはHadoopを必要としない。単に、SparkはHadoopのAPIを実装しているストレージシステムをサポートしているというだけ。
- データソースとしてテキストファイル、SequenceFile、Avro、Parquet、あるいはその他のHadoopのInputFormatをサポートしている
SparkのPythonおよびScalaシェル
- Sparkにはインタラクティブシェルが付属しており、それを使ってアドホックなデータ解析を行うことができる
- Sparkのシェルでは多くのマシン上のディスクやメモリに分散配置されているデータを扱うことができ、処理の分散についてはSparkが自動的に面倒を見てくれる
- ワーカーノード群のメモリにデータをロードできるので、多くの分散処理は数秒以内に実行可能
- Python、Scalaのシェルを提供しており、これらはクラスタへの接続をサポートするよう拡張されている
- Sparkでは演算処理を分散コレクションに対する処理の並びとして表現していく。この処理は自動的にクラスタ内で並列化される。これらのコレクションは耐障害性分散データセット、あるいはRDDと呼ばれる。
Sparkの中核となっている概念
- Sparkのアプリケーションにはクラスタ上で複数の並列操作を起動するドライバプログラム(例えばSparkシェル)が含まれる
- ドライバプログラムはアプリケーションのmain関数を持ち、クラスタ上の分散データセットを定義してから、それらに対して操作を適用する
- ドライバプログラムはSparkContextオブジェクトを通じてSparkにアクセスする
- SparkContextは演算クラスタへの接続を表現する。シェルの中ではSparkContextはscという変数として自動的に生成される
- SparkContextがあれば、それを使ってRDDを構築できる
- RDDに対する操作を実行するために、ドライバプログラムはエクゼキュータと呼ばれるノードの数を管理する
- SparkのAPIの多くの中心的な動作は、演算子に渡された関数(例えばfilterのような関数ベースの操作)をクラスタ上で並列に実行することである。すなわち、Sparkは自動的に関数を選択し、それをエクゼキュータノードに転送する
スタンドアローンのアプリケーション
- Sparkをスタンドアローンアプリケーションで使う場合は自分でSparkContextを初期化する必要がある
- Sparkをリンクするには、Java,Scalaの場合はMavenの依存関係をspark-coreアーティファクトに設定する
- PythonのスクリプトはSparkに含まれているbin/spark-submitスクリプトを使って実行する。このスクリプトはSparkのPython APIが動作する環境をセットアップする
- java,scalaのアプリケーションのビルドの際にSpark Coreへの依存性はprovidedとする。これはワーカーのクラスパス上にspark-coreのJARが存在しているので、ビルドの時点ではインクルードしないようにするためである。アプリケーションのjarはspark-submitスクリプトを使って実行する。
Comments