Development - Scala Spark Client

Supported Environments

Spark integration is currently supported only in on-prem and AWS environments.

Installation

You can download the jar from the link below.

Quick Start

Project Setup

Folders

/cannerflow-client-test
├── lib
│   └── cannerflow-spark-client-assembly_<version>.jar
├── src/main/scala/test
│   └── QuickStart.scala
├── project
│   └── build.properties
└── build.sbt

Please make sure the downloaded jar is placed in /lib.
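The folder layout above also lists project/build.properties, which pins the sbt version used for the build, but the guide does not show its contents. A minimal example looks like the following (the exact sbt version is an assumption; any sbt 1.x from the same era as Scala 2.12.12 and Spark 2.4.5 should work):

```properties
# project/build.properties — pins the sbt launcher version for this project
sbt.version=1.3.13
```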

/build.sbt
name := "cannerflow-client-test"
version := "0.1"
scalaVersion := "2.12.12"
mainClass in (Compile, run) := Some("test.QuickStart")
libraryDependencies += "com.cannerflow" % "cannerflow-spark-client" % "1.0.0" from "file:///<path-to-client-test>/cannerflow-client-test/lib/cannerflow-spark-client-assembly_<version>.jar"
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.5"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"

Count Example

If you are running CannerFlow in distributed mode on AWS, you need to create an AWS user with permission to access CannerFlow's S3 bucket, then use that user's <AWS_ACCESS_KEY_ID> and <AWS_SECRET_ACCESS_KEY> in the code.

If you are running in standalone mode, please obtain <AWS_ACCESS_KEY_ID> and <AWS_SECRET_ACCESS_KEY> from your administrator.
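Rather than pasting <AWS_ACCESS_KEY_ID> and <AWS_SECRET_ACCESS_KEY> directly into source code, a common pattern is to read them from environment variables at startup. A minimal sketch (the object and helper names here are illustrative, not part of the CannerFlow client):

```scala
// Illustrative helper: read required credentials from the environment
// instead of hardcoding them, and fail fast if one is missing.
object SparkCredentials {
  def requiredEnv(name: String): String =
    sys.env.getOrElse(name, throw new IllegalArgumentException(s"$name is not set"))

  def main(args: Array[String]): Unit = {
    val accessKey = requiredEnv("AWS_ACCESS_KEY_ID")
    val secretKey = requiredEnv("AWS_SECRET_ACCESS_KEY")
    // Pass these values into the SparkSession builder instead of literals:
    //   .config("spark.hadoop.fs.s3a.access.key", accessKey)
    //   .config("spark.hadoop.fs.s3a.secret.key", secretKey)
    println("credentials loaded")
  }
}
```

This keeps secrets out of version control and lets the same build run in different environments.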

/src/main/scala/test/QuickStart.scala
package test

import org.apache.spark.sql.SparkSession
import com.cannerflow.Cannerflow

object QuickStart {
  val TOKEN = "<CANNERFLOW_PERSONAL_ACCESS_TOKEN>"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      // If you're using standalone mode, specify the object storage endpoint URL.
      // If you're using SSL and run into a certificate issue, add
      // `-Dcom.amazonaws.sdk.disableCertChecking=true` to work around it.
      //.config("spark.hadoop.fs.s3a.endpoint", "https://<my-endpoint-name>:9000")
      .config("spark.hadoop.fs.s3a.access.key", "<AWS_ACCESS_KEY_ID>")
      .config("spark.hadoop.fs.s3a.secret.key", "<AWS_SECRET_ACCESS_KEY>")
      .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
      .master("local[*]")
      .getOrCreate()

    // entrypoint is the hostname of CannerFlow
    val entrypoint = "https://test.cannerflow.your.host.com"
    val workspace = "<workspace ID>"
    val cannerflow = Cannerflow(entrypoint, TOKEN, workspace, spark)

    // Execute a SQL query on CannerFlow and get a `DataFrame` back
    val df = cannerflow.genQuery("select * from lineitem")
    println("----------------------")
    println("df count: " + df.count())
    println("----------------------")
  }
}