A Practical Introduction to Apache Beam + Kotlin Development
Hi there, this is @silverbirder. The second season of Re:Zero has started 👏. Recently I've been using Apache Beam + Kotlin at work. Both were new to me, so in this article I'll write up what I learned ✍️.
A sample repository is included below.
https://github.com/silverbirder/apache-beam-kotlin-example/tree/master/src/main/kotlin
What is Apache Beam?
https://www.st-hakky-blog.com/entry/2020/04/29/172220
**Apache Beam is a unified model for building data pipelines that handles both batch and streaming processing in a single pipeline.** (Batch + Stream → Beam)
The SDK is available for Java, Python, and Go (experimental). The environment that executes a pipeline is called a runner; available runners include Cloud Dataflow, Apache Flink, and Apache Spark.
Streaming workloads tend to be bottlenecked by server capacity. Running on Cloud Dataflow, a managed GCP service, takes much of that burden away.
If you want rich analysis libraries such as those for machine learning, choose Python; if you want type-safe development, choose Java.
This time I chose Java, and I write the code in Kotlin, a more modern language that runs on the JVM.
Setup
The software versions are as follows:
$ java -version
openjdk version "1.8.0_252"
OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_252-b09)
OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.252-b09, mixed mode)
I use IntelliJ IDEA as the IDE, with its bundled Kotlin SDK (1.3.72).
$ git clone https://github.com/silverbirder/apache-beam-kotlin-example.git && cd apache-beam-kotlin-example
$ ./gradlew build
Pipeline Processing Overview
1. Read the input data (input → PCollection)
2. Transform the input data (PCollection → PTransform → PCollection)
3. Write out the processed data (PCollection → output)
Think of PCollection as a single dataset.
The rest of this article uses WordCount, a common sample program, as the running example.
The original is the [official Apache Beam WordCount](https://github.com/apache/beam/blob/master/examples/kotlin/src/main/java/org/apache/beam/examples/). WordCount simply extracts the words from the input text and counts them.
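Before looking at the Beam version, it may help to see what WordCount computes using nothing but ordinary Kotlin collections. This is only a rough sketch and does not use Apache Beam at all: `wordCount` is a hypothetical helper name, the tokenizer regex is my own assumption, and a plain `List` stands in for a PCollection (a real PCollection is distributed and may be unbounded). The comments map each step to the three pipeline stages listed earlier.

```kotlin
// Plain-Kotlin model of WordCount (no Apache Beam involved).
// `wordCount` is a hypothetical helper name used only for this sketch.
fun wordCount(lines: List<String>): List<String> {
    // Stage 1 (input): `lines` plays the role of the first PCollection.
    // Stage 2 (transform): split lines into words, then count each word.
    val counts: Map<String, Int> = lines
        .flatMap { it.split(Regex("[^\\p{L}]+")) } // tokenizer pattern is an assumption
        .filter { it.isNotEmpty() }
        .groupingBy { it }
        .eachCount()
    // Stage 3 (output): format each entry as a line of text.
    return counts.map { (word, n) -> "$word: $n" }
}

fun main() {
    wordCount(listOf("hi there", "hi sue", "hi")).forEach(::println)
}
```

The Beam pipeline has the same shape; the difference is that each step becomes a PTransform that a runner can distribute.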
The main code is below. To run it, launch it in debug mode from the IDE. (I'll skip the details here; see the Makefile 🙇‍♂️)
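import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.TextIO
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.MapElements
import org.apache.beam.sdk.values.PDone
// WordCountOptions and FormatAsTextFn are not shown in this excerpt.
@JvmStatic
fun main(args: Array<String>) {
    val options = PipelineOptionsFactory.fromArgs(*args).withValidation().`as`(WordCountOptions::class.java)
    runWordCount(options)
}
@JvmStatic
fun runWordCount(options: WordCountOptions) {
    // Create an (empty) pipeline
    val p = Pipeline.create(options)
    // Input: read the text file into a PCollection
    p.apply("ReadLines", TextIO.read().from(options.inputFile))
        // Transform the PCollection with PTransforms
        .apply(CountWords())
        .apply(MapElements.via(FormatAsTextFn()))
        // Output: write the PCollection to a text file
        .apply<PDone>("WriteCounts", TextIO.write().to(options.output))
    // Run the pipeline and wait for it to finish
    p.run().waitUntilFinish()
}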
PTransform
Below are code samples for PTransform, the core of Apache Beam.
ParDo
ParDo applies a function you write (a DoFn) to every element of a PCollection. It is the most flexible way to express a processing step.
// A transformation implemented as a composite PTransform
class CountWords : PTransform<PCollection<String>, PCollection<KV<String, Long>>>() {
    override fun expand(lines: PCollection<String>): PCollection<KV<String, Long>> {
        // Split each line into words
        val words = lines.apply(ParDo.of(ExtractWordsFn()))
        // Count the occurrences of each word
        val wordCounts = words.apply(Count.perElement())
        return wordCounts
    }
}
class ExtractWordsFn : DoFn<String, String>() {
    @ProcessElement
    fun processElement(@Element element: String, receiver: DoFn.OutputReceiver<String>) {
        // ... (omitted)
    }
}
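What makes ParDo flexible is that the function can emit zero, one, or many outputs for each input element. Here is a small plain-Kotlin sketch of that idea. It is not the Beam API: `parDo` and `extractWords` are hypothetical names, the `emit` callback only imitates the role of `OutputReceiver`, and the whitespace split is my assumption rather than the omitted body of `ExtractWordsFn`.

```kotlin
// Plain-Kotlin model of ParDo: a per-element function that may emit 0..n outputs.
// `parDo` and `extractWords` are hypothetical names; this is not the Beam API.
fun <I, O> parDo(input: List<I>, fn: (element: I, emit: (O) -> Unit) -> Unit): List<O> {
    val output = mutableListOf<O>()
    for (element in input) {
        fn(element) { output.add(it) } // the callback plays the role of OutputReceiver
    }
    return output
}

// Emits one output per non-empty word; an empty line emits nothing.
fun extractWords(line: String, emit: (String) -> Unit) {
    line.split(" ").filter { it.isNotEmpty() }.forEach(emit)
}

fun main() {
    println(parDo<String, String>(listOf("hi there", "", "bob"), ::extractWords)) // [hi, there, bob]
}
```

Three input lines become three output words here, but only by coincidence: the first line emits two words and the empty line emits none.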
GroupByKey
GroupByKey groups a PCollection of key-value pairs (KV) by key.
import java.lang.Iterable as JavaIterable
// PCollection<KV<String, Long>>
val wordCounts = words.apply(Count.perElement())
// PCollection<KV<String, JavaIterable<Long>>>
val groupByWord = wordCounts.apply(GroupByKey.create<String, Long>()) as PCollection<KV<String, JavaIterable<Long>>>
Kotlin's own Iterable did not work here, so Java's Iterable (java.lang.Iterable, imported as JavaIterable) must be used.
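To picture what the grouped result looks like, here is a plain-Kotlin sketch of the same idea. It is not the Beam API: `groupByKey` is a hypothetical helper, and `Pair` and `Map` stand in for `KV` and the grouped PCollection.

```kotlin
// Plain-Kotlin model of GroupByKey (not the Beam API): collect the values of each key.
// `groupByKey` is a hypothetical helper name.
fun <K, V> groupByKey(pairs: List<Pair<K, V>>): Map<K, List<V>> =
    pairs.groupBy({ it.first }, { it.second })

fun main() {
    val pairs = listOf("hi" to 1L, "bob" to 1L, "hi" to 1L)
    println(groupByKey(pairs)) // {hi=[1, 1], bob=[1]}
}
```

Each key appears once in the result, paired with every value that was attached to it, which is why the Beam type becomes a KV of a key and an Iterable of values.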
Flatten
Flatten merges multiple PCollections of the same type into a single PCollection.
// PCollection<KV<String, Long>>
val wordCounts = words.apply(Count.perElement())
// PCollectionList<KV<String, Long>>
val wordCountsDouble = PCollectionList.of(wordCounts).and(wordCounts)
// PCollection<KV<String, Long>>
val flattenWordCount = wordCountsDouble.apply(Flatten.pCollections())
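One point that is easy to miss: Flatten only concatenates, it does not merge entries that share a key. Since the sample flattens the same collection with itself, every element ends up in the result twice. A plain-Kotlin sketch of that behaviour (not the Beam API; `flattenCollections` is a hypothetical name and a list of lists stands in for a PCollectionList):

```kotlin
// Plain-Kotlin model of Flatten (not the Beam API): merge several datasets into one.
// `flattenCollections` is a hypothetical helper name.
fun <T> flattenCollections(collections: List<List<T>>): List<T> = collections.flatten()

fun main() {
    val wordCounts = listOf("hi" to 5L, "bob" to 2L)
    // The list of lists plays the role of a PCollectionList.
    val flattened = flattenCollections(listOf(wordCounts, wordCounts))
    println(flattened.size) // 4: duplicates are kept, nothing is merged by key
}
```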
Combine
Combine aggregates the elements of a PCollection. There are two forms: one combines the values of each key (per key), and the other combines all elements of the PCollection (globally). The following sample is the per-key form.
// PCollection<KV<String, Long>>
val wordCounts = words.apply(Count.perElement())
// PCollection<KV<String, Long>>
val sumWordsByKey = wordCounts.apply(Sum.longsPerKey())
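The difference between the two forms is easier to see side by side. The following is a plain-Kotlin sketch, not the Beam API; `sumPerKey` and `sumGlobally` are hypothetical helper names. In Beam, the corresponding built-in transforms are `Sum.longsPerKey()` and `Sum.longsGlobally()`.

```kotlin
// Plain-Kotlin model of the two Combine forms (not the Beam API).
// `sumPerKey` and `sumGlobally` are hypothetical helper names.
fun sumPerKey(pairs: List<Pair<String, Long>>): Map<String, Long> =
    pairs.groupBy({ it.first }, { it.second }).mapValues { (_, values) -> values.sum() }

fun sumGlobally(values: List<Long>): Long = values.sum()

fun main() {
    val pairs = listOf("hi" to 5L, "bob" to 2L, "hi" to 5L)
    println(sumPerKey(pairs))                     // {hi=10, bob=2}
    println(sumGlobally(pairs.map { it.second })) // 12
}
```

The per-key form yields one result for each key, while the global form reduces the whole dataset to a single value.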
Partition
Partition splits a PCollection into a fixed number of partitions of your choosing.
// PCollection<KV<String, Long>>
val wordCounts = words.apply(Count.perElement())
// PCollectionList<KV<String, Long>> holding 10 partitions
val wordCountsPartitions = wordCounts.apply(Partition.of(10, PartitionFunc()))
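The sample does not show `PartitionFunc`, so here is a plain-Kotlin sketch of what a partition function has to do: map each element to an index between 0 and the number of partitions minus one. This is not the Beam API, and hashing the word is just one possible rule that I picked for illustration; `partitionFor` and `partition` are hypothetical helpers. In Beam, this logic would live in the `partitionFor(elem, numPartitions)` method of a `Partition.PartitionFn` implementation.

```kotlin
// Plain-Kotlin model of Partition (not the Beam API).
// `partitionFor` maps an element to an index in 0 until numPartitions.
fun partitionFor(word: String, numPartitions: Int): Int =
    Math.floorMod(word.hashCode(), numPartitions) // floorMod keeps the index non-negative

// `partition` is a hypothetical helper: the list of lists stands in for a PCollectionList.
fun <T> partition(input: List<T>, numPartitions: Int, fn: (T, Int) -> Int): List<List<T>> {
    val parts = List(numPartitions) { mutableListOf<T>() }
    for (element in input) parts[fn(element, numPartitions)].add(element)
    return parts
}

fun main() {
    val parts = partition(listOf("hi", "there", "sue", "bob"), 10, ::partitionFor)
    println(parts.size)           // 10
    println(parts.flatten().size) // 4
}
```

Every element lands in exactly one partition, and some partitions may stay empty.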
Streaming and Windowing
The pipelines shown so far run as batch jobs. Batch handles bounded (finite) data, while streaming handles unbounded (infinite) data. To process unbounded data, windowing cuts it into finite chunks that can be processed one at a time.
For streaming, the code looks like this:
import org.apache.beam.sdk.transforms.Watch.Growth.afterTimeSinceNewOutput
import org.apache.beam.sdk.transforms.windowing.FixedWindows
import org.apache.beam.sdk.transforms.windowing.Window
import org.joda.time.Duration.standardMinutes
import org.joda.time.Duration.standardSeconds
@JvmStatic
fun main(args: Array<String>) {
    val options = PipelineOptionsFactory.fromArgs(*args).withValidation().`as`(WordCountOptions::class.java)
    runWordCount(options)
}
@JvmStatic
fun runWordCount(options: WordCountOptions) {
    val p = Pipeline.create(options)
    p.apply("ReadLines",
        TextIO
            .read()
            .from("./src/main/kotlin/*.json")
            // Keep watching the pattern passed to from() for new files (the input becomes unbounded).
            // Poll every 10 seconds; stop if no new file appears for 5 minutes.
            .watchForNewFiles(standardSeconds(10), afterTimeSinceNewOutput(standardMinutes(5)))
    )
        // Window every 30 seconds (cuts the unbounded data into finite chunks)
        .apply(Window.into<String>(FixedWindows.of(standardSeconds(30))))
        .apply(CountWords())
        .apply(MapElements.via(FormatAsTextFn()))
        .apply<PDone>("WriteCounts", TextIO.write().to(options.output).withWindowedWrites().withNumShards(1))
    p.run().waitUntilFinish()
}
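To get a feel for what the 30-second fixed windows do, here is a plain-Kotlin sketch of the assignment rule. It is not the Beam API: `windowStart` is a hypothetical helper, timestamps are plain milliseconds, and I assume windows aligned to time zero with no offset.

```kotlin
// Plain-Kotlin model of fixed windows (not the Beam API): every timestamp belongs
// to exactly one window [start, start + size). `windowStart` is a hypothetical name.
fun windowStart(timestampMillis: Long, sizeMillis: Long): Long =
    timestampMillis - Math.floorMod(timestampMillis, sizeMillis)

fun main() {
    val size = 30_000L // 30-second windows, as in the streaming example
    // Events at 5 s, 29 s and 31 s: the first two share a window, the third starts a new one.
    val events = listOf(5_000L to "hi", 29_000L to "bob", 31_000L to "hi")
    val perWindow = events.groupBy({ windowStart(it.first, size) }, { it.second })
    println(perWindow) // {0=[hi, bob], 30000=[hi]}
}
```

Once elements are grouped like this, each window is a finite dataset, so a transform such as CountWords can produce one result per window.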
Test Code
You can also write tests for Apache Beam pipelines. Sample code is below.
To test a pipeline, build it on a TestPipeline.
import org.apache.beam.sdk.testing.PAssert
import org.apache.beam.sdk.testing.TestPipeline
fun countWordsTest() {
    // Arrange
    val p: Pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false)
    val input: PCollection<String> = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of())
    val output: PCollection<KV<String, Long>> = input.apply(CountWords())
    // Assert: PAssert adds its checks to the pipeline, so register it before running
    PAssert.that<KV<String, Long>>(output).containsInAnyOrder(COUNTS_ARRAY)
    // Act: the assertions are evaluated while the pipeline runs
    p.run().waitUntilFinish()
}
companion object {
    val WORDS: List<String> = listOf(
        "hi there", "hi", "hi sue bob",
        "hi sue", "", "bob hi"
    )
    val COUNTS_ARRAY = listOf(
        KV.of("hi", 5L),
        KV.of("there", 1L),
        KV.of("sue", 2L),
        KV.of("bob", 2L)
    )
}
End
Apache Beam also offers features such as side inputs and additional outputs. I'll keep working at it until I can use Beam fluently!
Now let's watch Re:Zero 2nd season 👍