scio

参考資料

https://spotify.github.io/scio/examples/TemplateExample.scala.html

Hello World

build.sbt

// sbt build definition for the Scio "hello world" pipeline.
name := "app"

version := "0.1"

scalaVersion := "2.13.7"

// Keep the Scio artifacts and the Beam runner artifacts on matching versions.
val scioVersion = "0.11.1"
val beamVersion = "2.33.0"

resolvers += "confluent" at "https://packages.confluent.io/maven/"

libraryDependencies ++= Seq(
  "com.spotify" %% "scio-core" % scioVersion,
  "com.spotify" %% "scio-test" % scioVersion % Test,
  "com.spotify" %% "scio-google-cloud-platform" % scioVersion,
  "org.apache.beam" % "beam-runners-direct-java" % beamVersion,
  "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion,
  "ch.qos.logback" % "logback-classic" % "1.2.6",
  "org.slf4j" % "slf4j-api" % "1.7.32"
)

src/main/scala/App.scala

import com.spotify.scio.ContextAndArgs

import java.time.Instant
import com.spotify.scio.bigtable._
import com.spotify.scio.pubsub._
import com.spotify.scio.coders.Coder
import com.spotify.scio.values.SCollection
import com.google.bigtable.v2.Mutation
import com.google.protobuf.ByteString
import com.spotify.scio.bigtable.Mutations
import org.apache.beam.runners.dataflow.DataflowRunner
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
import org.apache.beam.sdk.options.Validation.Required
import org.apache.beam.sdk.options.{Description, PipelineOptions, PipelineOptionsFactory, StreamingOptions, ValueProvider}

/**
 * Minimal streaming "hello world" pipeline: reads strings from a Pub/Sub
 * subscription and prints each received message with `println`.
 */
object App {

  /** Pipeline options for this job; adds nothing on top of Beam's `StreamingOptions`. */
  trait Options extends PipelineOptions with StreamingOptions

  /** Fully qualified Pub/Sub subscription the pipeline reads from. */
  private val InputSubscription = "projects/foobar-project/subscriptions/hello_sub"

  /**
   * Entry point.
   *
   * @param args command-line arguments; pipeline options (e.g. `--runner`,
   *             `--project`) are parsed by `ContextAndArgs`
   */
  def main(args: Array[String]): Unit = {
    println("hello")
    // Echo the raw arguments, space-separated so individual flags stay readable.
    println(args.mkString(" "))

    val (sc, _) = ContextAndArgs(args)

    // Enable streaming on the options owned by the ScioContext itself. Building a
    // second PipelineOptions instance from `args` and mutating that one would not
    // affect the pipeline that `sc.run()` executes.
    sc.optionsAs[Options].setStreaming(true)

    val inputIO = PubsubIO.readStrings().fromSubscription(InputSubscription)
    sc.customInput("input", inputIO).map(message => println(message))

    sc.run()
  }
}

コマンド

# Build the pipeline and stage it as a Dataflow template at --templateLocation.
# No --zone is passed: "asia-northeast1" is a region, not a zone, and --region
# already selects where the job runs.
sbt "run --project=foobar-project \
--runner=DataflowRunner \
--region=asia-northeast1 \
--stagingLocation=gs://foobar-project-dataflow/stg \
--templateLocation=gs://foobar-project-dataflow/tmp-hello \
"

gcloud dataflow jobs run hello \
    --region=asia-northeast1 \
    --gcs-location=gs://foobar-project-dataflow/tmp-hello \