Apache ZeppelinでSparkの勉強 ①Podmanでコンテナ起動してランダムフォレストを試す

Apache Sparkの勉強のため、Jupyter NotebookのようなUIでSparkのプログラムを実行できるApache Zeppelinをpodman(windows)で試してみました。以下はdocker-compose.ymlです。

version: '3.8'

services:
  zeppelin:
    image: apache/zeppelin:0.11.2
    container_name: zeppelin
    ports:
      - "8080:8080"
    environment:
      - ZEPPELIN_ADDR=0.0.0.0
      - ZEPPELIN_PORT=8080
      - ZEPPELIN_NOTEBOOK_DIR=/notebook
      - ZEPPELIN_LOG_DIR=/logs
      - SPARK_HOME=/spark
    volumes:
      - ./notebook:/notebook
      - ./logs:/logs
      - ./deps/spark-3.5.1-bin-hadoop3:/spark
      - ./spark-data:/data
    depends_on:
      - spark

  spark:
    image: bitnami/spark:3.5.1
    container_name: spark
    environment:
      - SPARK_MODE=master
      - SPARK_MASTER_HOST=spark
    ports:
      - "7077:7077"
      - "8081:8081"
    volumes:
      - ./spark-data:/data

  spark-worker-1:
    image: bitnami/spark:3.5.1
    container_name: spark-worker-1
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
    depends_on:
      - spark
    ports:
      - "8082:8081"
    volumes:
      - ./spark-data:/data

  spark-worker-2:
    image: bitnami/spark:3.5.1
    container_name: spark-worker-2
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
    depends_on:
      - spark
    ports:
      - "8083:8081"
    volumes:
      - ./spark-data:/data

spark-dataの下に入出力するファイルを置く想定です。また、depsの下にはZeppelinから使用するSparkをダウンロードして展開しています。使用するSparkのバージョンが3.5.1となっていますが、Zeppelinのソースコードをみたところ、サポートしているバージョンは3.2.0~4未満のようなので、とりあえずこのバージョンにしてます。ワーカー(executor instance)の数としては2つにしています。

podman-compose up -dで起動しますが、Sparkインタープリタの設定を永続化したかったので、以下のように設定を変更するためのバッチファイルを用意しました。オリジナルの設定ファイルはzeppelinコンテナ内の/opt/zeppelin/conf/interpreter.jsonにあるので、これを最初にホストにコピーしておいて、必要に応じて編集してこのバッチファイルを実行する、という使い方になります。直接マウントするとなぜかエラーが出てしまうので、コンテナに毎回コピーするようにしてます。

podman machine ssh "podman cp /mnt/c/zeppelin/interpreter.json zeppelin:/opt/zeppelin/conf/interpreter.json"
podman-compose restart zeppelin

以下はカスタマイズした設定ファイルの一部の例です。

        "spark.master": {
          "name": "spark.master",
          "value": "spark://spark:7077",
          "type": "string",
          "description": "Spark master uri. local | yarn-client | yarn-cluster | spark master address of standalone mode, ex) spark://master_host:7077"
        },
        "spark.executor.instances": {
          "name": "spark.executor.instances",
          "value": "2",
          "type": "number",
          "description": "The number of executors for static allocation."
        },
        "spark.jars": {
          "name": "spark.jars",
          "value": "file:///data/jar/myjar.jar",
          "type": "string",
          "description": "Comma-separated list of jars to include on the driver and executor classpaths. Globs are allowed."
        },

spark.masterにはstandalone modeで起動しているspark masterを指定しています(デフォルトだとlocal[*]になる)。spark.executor.instancesでは実際に実行するときに使用するワーカー数を指定してます。spark.jarsはプログラムから使用するjar(ホスト側のspark-data/jarの下に置いている)を指定しています(myjar.jarは自作したプログラム)。カンマ区切りで複数指定可能です。

コンテナを起動すると、http://localhost:8080をブラウザで開けばZeppelinの画面が表示されます。create new noteでノートブックを作成し、セルにプログラムを入力して実行すれば、結果がブラウザ上で確認できます。

例えば、以下はIrisのデータセット(こちらから入手して最初のカラム名の行を"SepalLength","SepalWidth","PetalLength","PetalWidth","Name"に変更したものを使用)をランダムフォレストで分類するモデルを学習、評価するサンプルプログラム(scala)です。

%spark
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorAssembler}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// Irisデータセットのパス
val dataFilePath = "/data/iris.csv"

// データを読み込む
val data = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load(dataFilePath)

// 特徴量をベクトルにまとめる
val assembler = new VectorAssembler().setInputCols(Array("SepalLength", "SepalWidth", "PetalLength", "PetalWidth")).setOutputCol("features")
val assembledData = assembler.transform(data)

// ターゲット変数(label)を数値に変換する
val indexer = new StringIndexer().setInputCol("Name").setOutputCol("indexedLabel").fit(assembledData)
val indexedData = indexer.transform(assembledData)

// データを訓練データとテストデータに分割する
val Array(trainingData, testData) = indexedData.randomSplit(Array(0.7, 0.3))

// ランダムフォレスト分類器を作成
val rf = new RandomForestClassifier().setLabelCol("indexedLabel").setFeaturesCol("features")

// パラメータグリッドを作成
val paramGrid = new ParamGridBuilder().addGrid(rf.numTrees, Array(10, 20, 30)).addGrid(rf.maxDepth, Array(5, 10)).build()

// モデル評価用のEvaluatorを作成
val evaluator = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")

// クロスバリデーションを設定
val cv = new CrossValidator().setEstimator(rf).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(3)

// モデルを訓練
val model = cv.fit(trainingData)

// テストデータで予測を行う
val predictions = model.transform(testData)

// 予測結果の評価
val accuracy = evaluator.evaluate(predictions)

println(s"Test Accuracy = $accuracy")

実行が成功したらTest Accuracy = 0.9428571428571428のように表示されると思います(数値は毎回異なるようです)。ちなみにうちのPC(Ryzen 7 3700X 8コア/16スレッド、32GBメモリ)だと1分14秒かかりました。あと、4つもコンテナを起動しているのでメモリをかなり消費するようです。

Comments

Copied title and URL