Apache ZeppelinでSparkの勉強 ②MinIOに保存しているCSVファイルをリードしてみた

sparkのプログラムからMinIOに保存しているファイルにアクセスできると便利なので、設定してみました。Sparkインタープリタの設定に以下を追加します。

        "spark.hadoop.fs.s3a.endpoint": {
          "name": "spark.hadoop.fs.s3a.endpoint",
          "value": "http://minio:9000",
          "type": "string",
          "description": "minio endpoint"
        },
        "spark.hadoop.fs.s3a.access.key": {
          "name": "spark.hadoop.fs.s3a.access.key",
          "value": "youraccesskey",
          "type": "string",
          "description": "minio access key"
        },
        "spark.hadoop.fs.s3a.secret.key": {
          "name": "spark.hadoop.fs.s3a.secret.key",
          "value": "yoursecretkey",
          "type": "string",
          "description": "minio secret key"
        },
        "spark.hadoop.fs.s3a.path.style.access": {
          "name": "spark.hadoop.fs.s3a.path.style.access",
          "value": true,
          "type": "checkbox",
          "description": "use path style access"
        },
        "spark.hadoop.fs.s3a.impl": {
          "name": "spark.hadoop.fs.s3a.impl",
          "value": "org.apache.hadoop.fs.s3a.S3AFileSystem",
          "type": "string",
          "description": "s3a implemtent class"
        },
        "spark.jars.packages": {
          "name": "spark.jars.packages",
          "value": "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.1034",
          "type": "string",
          "description": "Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths. The coordinates should be groupId:artifactId:version. If spark.jars.ivySettings is given artifacts will be resolved according to the configuration in the file, otherwise artifacts will be searched for in the local maven repo, then maven central and finally any additional remote repositories given by the command-line option --repositories."
        }

MinIOサーバのエンドポイントや認証で必要となる情報などを設定しています。あと、最後のspark.jars.packagesで必要なパッケージをMaven Repositoryから取得しています。

例えばZeppelin上で以下を実行すると、MinIO上にあるCSVファイルを読み込んでDataFrameを生成します。

%spark

val df = spark.read
    .option("multiLine", "true")
    .option("delimiter", ",")
    .option("quote", "\"")
    .option("escape", "\"")
    .csv("s3a://dataset/articles.csv")
    .toDF("url", "title","publish_date", "article_body")
    .dropDuplicates("url")
    .orderBy(desc("publish_date"))
    .limit(100)

このCSVはurlカラムのURLにある記事データを集めたものですが、同じURLの行があるため、dropDuplicatesで重複をなくしてます。また、orderByによって投稿日時(publish_date)が最近のもの順に並べなおして、さらにlimitで最大100件までに制限してます。

Comments

Copied title and URL