
Apache Beam + Kotlin Development Practical Introduction

Hi there, Re:Zero 2nd season has started 👏. I'm @silverbirder. Recently, I've been using Apache Beam + Kotlin at work. I didn't know either technology, so I'll write up what I learned in this article ✍️.

A sample repository is included below.

https://github.com/silverbirder/apache-beam-kotlin-example/tree/master/src/main/kotlin

What is Apache Beam?

https://www.st-hakky-blog.com/entry/2020/04/29/172220

**Apache Beam is a data pipeline framework that can express both Batch and Streaming as a single pipeline.** (Batch + Stream → Beam)

Language choices include Java, Python, and Go (experimental). The environment that executes the pipeline is called a runner; runners include Cloud Dataflow, Apache Flink, and Apache Spark.

Streaming processing tends to be bottlenecked by server capacity. Using Cloud Dataflow, a managed service on GCP, takes that problem off your hands.

If you want to use rich analysis libraries, such as those for machine learning, choose Python; if you want type-safe development, choose Java.

In this case, I chose the Java SDK, but will write the code in Kotlin, which allows a more modern style.

Setup

The software versions are as follows:

$ java -version
openjdk version "1.8.0_252"
OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_252-b09)
OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.252-b09, mixed mode)

I use IntelliJ IDEA as the IDE, with the Kotlin SDK (1.3.72) set up.

$ git clone https://github.com/silverbirder/apache-beam-kotlin-example.git && cd apache-beam-kotlin-example
$ ./gradlew build

Pipeline Processing Overview

1. input data (input → PCollection)
2. transform the input data (PCollection → PTransform → PCollection)
3. output the processed data (PCollection → output)

Think of PCollection as a single dataset.
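In plain Kotlin terms, with no Beam involved, the three stages boil down to something like the following sketch (the function name and the hardcoded input are illustrative, not Beam API):

```kotlin
// A pipeline in miniature: input → transform → output,
// using plain Kotlin collections in place of PCollections
fun runMiniPipeline(lines: List<String>): List<String> {
    // 2. transform: lines → words → counts (what a PTransform chain does)
    val counts = lines
        .flatMap { it.split(" ") }
        .groupingBy { it }
        .eachCount()
    // 3. output: format the results (in Beam, this would be written to a sink)
    return counts.map { (word, count) -> "$word: $count" }
}

// 1. input: a hardcoded stand-in for a real data source
val result = runMiniPipeline(listOf("hello beam", "hello kotlin"))
```

Beam's value is that the same shape of code scales to distributed, batch or streaming execution.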

Let's walk through this using the common WordCount sample.

This is based on the [Apache Beam official WordCount](https://github.com/apache/beam/blob/master/examples/kotlin/src/main/java/org/apache/beam/examples/). WordCount simply extracts words from sentences and counts them.

The main code is here. When you want to run it, debug it from the IDE. (This area is omitted. See the Makefile for details 🙇‍♂️)

@JvmStatic
fun main(args: Array<String>) {
    val options = PipelineOptionsFactory.fromArgs(*args).withValidation().`as`(WordCountOptions::class.java)
    runWordCount(options)
}

@JvmStatic
fun runWordCount(options: WordCountOptions) {
    // create pipeline (empty)
    val p = Pipeline.create(options)

    // input data from Text file → PCollection
    p.apply("ReadLines", TextIO.read().from(options.inputFile))

        // transform PCollection with PTransform
        .apply(CountWords())
        .apply(MapElements.via(FormatAsTextFn()))

        // output data (PCollection) to Text file
        .apply<PDone>("WriteCounts", TextIO.write().to(options.output))

    // run the pipeline
    p.run().waitUntilFinish()
}

PTransform

The following are code samples for PTransform, the core of Apache Beam.

ParDo

ParDo lets you process each element of a PCollection however you like. It is the most flexible way to write a transform.

    // Transformation process by PTransform
    public class CountWords : PTransform<PCollection<String>, PCollection<KV<String, Long>>>() {
        override fun expand(lines: PCollection<String>): PCollection<KV<String, Long>> {

            // Split a sentence into words
            val words = lines.apply(ParDo.of(ExtractWordsFn()))

            // Count the split words
            val wordCounts = words.apply(Count.perElement())
            return wordCounts
        }
    }

    public class ExtractWordsFn : DoFn<String, String>() {
        @ProcessElement
        fun processElement(@Element element: String, receiver: DoFn.OutputReceiver<String>) {
            // Split the line on runs of non-letter characters
            // and emit each non-empty word downstream
            for (word in element.split(Regex("[^\\p{L}]+"))) {
                if (word.isNotEmpty()) {
                    receiver.output(word)
                }
            }
        }
    }
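
The per-element logic inside a DoFn is just ordinary code. As a Beam-free sketch, the word extraction above amounts to this (the function name is mine, for illustration):

```kotlin
// What ExtractWordsFn emits per element: split on runs of
// non-letter characters and drop empty results
fun extractWords(line: String): List<String> =
    line.split(Regex("[^\\p{L}]+")).filter { it.isNotEmpty() }
```

ParDo then applies this logic to every element of the PCollection in parallel.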

GroupByKey

A Key-Value (KV) PCollection is grouped by key.

import java.lang.Iterable as JavaIterable

// PCollection<KV<String, Long>>
val wordCounts = words.apply(Count.perElement())

// PCollection<KV<String, JavaIterable<Long>>>
val groupByWord = wordCounts.apply(GroupByKey.create<String, Long>()) as PCollection<KV<String, JavaIterable<Long>>>

Kotlin's own Iterable does not work here, so Java's Iterable (imported as JavaIterable) must be used.
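
The grouping semantics themselves are simple. Sketched with plain Kotlin collections (the JavaIterable issue is Beam-specific and doesn't arise here; the function name is illustrative):

```kotlin
// What GroupByKey does to a KV PCollection: collect all values
// that share a key into one iterable per key
fun <K, V> groupByKey(pairs: List<Pair<K, V>>): Map<K, List<V>> =
    pairs.groupBy({ it.first }, { it.second })
```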

Flatten

Combines multiple PCollections into a single PCollection.

// PCollection<KV<String, Long>>
val wordCounts = words.apply(Count.perElement())

// PCollectionList<KV<String, Long>>
val wordCountsDouble = PCollectionList.of(wordCounts).and(wordCounts)

// PCollection<KV<String, Long>>
val flattenWordCount = wordCountsDouble.apply(Flatten.pCollections())
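
Note that Flatten is a concatenation, not a set union, so duplicate elements survive. In plain Kotlin terms (function name illustrative):

```kotlin
// What Flatten.pCollections does: concatenate every input collection
// into one, keeping duplicates
fun <T> flatten(collections: List<List<T>>): List<T> =
    collections.flatMap { it }
```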

Combine

Combines elements of a PCollection. There are two styles: combining the values behind each key (after grouping by key), and combining an entire PCollection into a single value. The following is a per-key sample.

// PCollection<KV<String, Long>>
val wordCounts = words.apply(Count.perElement())

// PCollection<KV<String, Long>>
val sumWordsByKey = wordCounts.apply(Sum.longsPerKey())
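
The per-key combine reduces all values sharing a key down to one value. A Beam-free sketch of those semantics (function name illustrative):

```kotlin
// What a per-key sum does: group values by key,
// then fold each key's values into a single total
fun sumPerKey(counts: List<Pair<String, Long>>): Map<String, Long> =
    counts.groupingBy { it.first }
        .fold(0L) { acc, (_, value) -> acc + value }
```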

Partition

Splits a PCollection into any number of partitions.

// PCollection<KV<String, Long>>
val wordCounts = words.apply(Count.perElement())

// PCollectionList<KV<String, Long>>, split into 10 partitions
val partitionedWordCounts = wordCounts.apply(Partition.of(10, PartitionFunc()))
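
The partition function decides which of the n buckets each element lands in. Sketched in plain Kotlin (names illustrative):

```kotlin
// What Partition.of(n, fn) does: route every element
// to one of n buckets based on the partition function
fun <T> partition(elements: List<T>, n: Int, fn: (T) -> Int): List<List<T>> {
    val buckets = List(n) { mutableListOf<T>() }
    for (e in elements) buckets[fn(e)].add(e)
    return buckets
}
```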

Streaming and Windowing

Running a pipeline as-is is called batch execution. Batch is used for finite data, while streaming is used for infinite data. To process infinite data, windowing is used to cut the infinite stream into finite chunks.
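
A fixed window simply assigns each timestamped element to the interval containing its timestamp. The idea behind 30-second fixed windows, Beam-free (function name illustrative):

```kotlin
// Which fixed window an element falls into: the start of the
// size-second interval containing its timestamp
fun windowStart(timestampSec: Long, sizeSec: Long = 30): Long =
    (timestampSec / sizeSec) * sizeSec
```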

For streaming processing, the code looks like this:

    @JvmStatic
    fun main(args: Array<String>) {
        val options = PipelineOptionsFactory.fromArgs(*args).withValidation().`as`(WordCountOptions::class.java)
        runWordCount(options)
    }

    @JvmStatic
    fun runWordCount(options: WordCountOptions) {
        val p = Pipeline.create(options)
        p.apply("ReadLines",
                TextIO
                    .read()
                    .from("./src/main/kotlin/*.json")
                    // monitor for files matching the pattern (input is unbounded)
                    // check every 10 seconds; stop if no new output for 5 minutes
                    .watchForNewFiles(standardSeconds(10), afterTimeSinceNewOutput(standardMinutes(5)))
            )

            // Window every 30 seconds (cut the infinite data into finite chunks)
            .apply(Window.into<String>(FixedWindows.of(standardSeconds(30))))
            .apply(CountWords())
            .apply(MapElements.via(FormatAsTextFn()))
            .apply<PDone>("WriteCounts", TextIO.write().to(options.output).withWindowedWrites().withNumShards(1))
        p.run().waitUntilFinish()
    }

Test Code

You can also write test code for Apache Beam. Sample code is here.

By building the pipeline with TestPipeline, you can test it.

import org.apache.beam.sdk.testing.TestPipeline

    fun countWordsTest() {
        // Arrange
        val p: Pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false)
        val input: PCollection<String> = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of())
        val output: PCollection<KV<String, Long>>? = input.apply(CountWords())

        // Assert (PAssert must be attached before the pipeline runs)
        PAssert.that<KV<String, Long>>(output).containsInAnyOrder(COUNTS_ARRAY)

        // Act
        p.run()
    }

    companion object {
        val WORDS: List<String> = listOf(
            "hi there", "hi", "hi sue bob",
            "hi sue", "", "bob hi"
        )
        val COUNTS_ARRAY = listOf(
            KV.of("hi", 5L), KV.of("there", 1L),
            KV.of("sue", 2L), KV.of("bob", 2L)
        )
    }

End

Apache Beam also offers Side inputs and Additional outputs. I'll keep at it so I can use those comfortably too!

Now let's watch Re:Zero 2nd season 👍

If it was helpful, support me with a ☕!
