参考資料
https://spotify.github.io/scio/examples/TemplateExample.scala.html
Hello World
build.sbt
name := "app"
version := "0.1"
scalaVersion := "2.13.7"

// Single source of truth for framework versions. Keep Beam in step with the
// Beam release this Scio version was built against.
val scioVersion = "0.11.1"
val beamVersion = "2.33.0"

// NOTE(review): presumably required for Confluent-hosted artifacts pulled in
// transitively by Scio modules; confirm before removing.
resolvers += "confluent" at "https://packages.confluent.io/maven/"

libraryDependencies ++= Seq(
  "com.spotify" %% "scio-core" % scioVersion,
  "com.spotify" %% "scio-test" % scioVersion % "test",
  "com.spotify" %% "scio-google-cloud-platform" % scioVersion,
  // Direct runner for local runs; Dataflow runner for --runner=DataflowRunner.
  "org.apache.beam" % "beam-runners-direct-java" % beamVersion,
  "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion,
  "ch.qos.logback" % "logback-classic" % "1.2.6",
  "org.slf4j" % "slf4j-api" % "1.7.32"
)
import com.spotify.scio.ContextAndArgs
import java.time.Instant
import com.spotify.scio.bigtable._
import com.spotify.scio.pubsub._
import com.spotify.scio.coders.Coder
import com.spotify.scio.values.SCollection
import com.google.bigtable.v2.Mutation
import com.google.protobuf.ByteString
import com.spotify.scio.bigtable.Mutations
import org.apache.beam.runners.dataflow.DataflowRunner
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
import org.apache.beam.sdk.options.Validation.Required
import org.apache.beam.sdk.options.{Description, PipelineOptions, PipelineOptionsFactory, StreamingOptions, ValueProvider}

/** Minimal streaming Scio pipeline: reads strings from a Pub/Sub subscription and prints each one. */
object App {

  /** Pipeline options for this job; only adds the standard streaming flag. */
  trait Options extends PipelineOptions with StreamingOptions {}

  /** Pub/Sub subscription the job consumes. */
  private val Subscription = "projects/foobar-project/subscriptions/hello_sub"

  /**
   * Entry point. Parses command-line pipeline options, marks the pipeline as
   * streaming, wires the Pub/Sub source, and runs (or, with --templateLocation,
   * stages) the job.
   *
   * @param args Beam/Scio command-line arguments, e.g. --runner, --project, --region
   */
  def main(args: Array[String]): Unit = {
    println("hello")
    // Space-separate so the echoed arguments stay readable.
    println(args.mkString(" "))

    // Register before parsing so the factory knows about `Options`.
    PipelineOptionsFactory.register(classOf[Options])
    val (sc, _) = ContextAndArgs(args)

    // Set streaming on the context's OWN options. Previously a second, detached
    // options object was built and flagged, which the ScioContext never saw.
    sc.optionsAs[Options].setStreaming(true)

    val inputIO = PubsubIO.readStrings().fromSubscription(Subscription)
    // println runs on the workers; on Dataflow the output lands in worker logs.
    sc.customInput("input", inputIO).map(t => println(t))
    sc.run()
  }
}
コマンド（テンプレートの作成とジョブの起動）
# Build the pipeline and stage it as a classic Dataflow template in GCS.
# --region alone selects the location; the former --zone=asia-northeast1 passed a
# region name where a zone is expected (and `zone` is deprecated in favor of
# `workerZone`), so it is omitted.
sbt "run --project=foobar-project \
  --runner=DataflowRunner \
  --region=asia-northeast1 \
  --stagingLocation=gs://foobar-project-dataflow/stg \
  --templateLocation=gs://foobar-project-dataflow/tmp-hello"

# Launch a job named "hello" from the staged template.
gcloud dataflow jobs run hello \
  --region=asia-northeast1 \
  --gcs-location=gs://foobar-project-dataflow/tmp-hello