
Commit 4b361a8

Modified Scala Getting Started example to work both locally and on MSF (aws-samples#62)
Scala Getting Started example now works both locally and on MSF (downgraded to Scala 2.12).
1 parent 2cd1e71

File tree

6 files changed: +105 −51 lines changed

scala/GettingStarted/README.md (+16 −7)

@@ -3,7 +3,7 @@
 Skeleton project for a basic Flink Java application to run on Amazon Managed Service for Apache Flink.
 
 * Flink version: 1.20
-* Scala version: 3.3.0
+* Scala version: 2.12.20
 * Flink API: DataStream API
 * Language: Java (11)
 
@@ -12,11 +12,13 @@ The project can run both on Amazon Managed Service for Apache Flink, and locally
 The application shows how to get runtime configuration, and sets up a Kinesis Data Stream source and a sink.
 
 ### Pre-requisites
+
 You need to have `sbt` tool installed on you machine to build a Scala project. Use [steps from official guide](https://www.scala-sbt.org/download.html) to do that.
 
 ### Build
+
 - Run `sbt assembly` to build an uber jar
-- Use `target/scala-3.3.0/ScalaGettingStarted-flink_1.20.jar` in your MSF application
+- Use `target/scala-2.12/ScalaGettingStarted-flink_1.20.jar` in your MSF application
 
 ### Runtime configuration
 
@@ -25,12 +27,19 @@ Apache Flink.
 
 The following runtime properties are required:
 
-| Group ID | Key | Description |
-|----------------|---------------|--------------------------|
-| `InputStream0` | `stream.name` | Name of the input stream |
-| `InputStream0` | `aws.region` | Region of the input stream, e.g. `us-east-1` |
+| Group ID        | Key           | Description                                   |
+|-----------------|---------------|-----------------------------------------------|
+| `InputStream0`  | `stream.name` | Name of the input stream                      |
+| `InputStream0`  | `aws.region`  | Region of the input stream, e.g. `us-east-1`  |
+| `OutputStream0` | `stream.name` | Name of the output stream                     |
+| `OutputStream0` | `aws.region`  | Region of the output stream, e.g. `us-east-1` |
+
+
+### Running in IntelliJ
+
+You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.
 
-Runtime Properties are expected in the Group IDs `InputStream0` and `OutputStream0`.
+See [Running examples locally](../../java/running-examples-locally.md) for details.
 
 ### Generating data
 
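For context, a minimal sketch of how a property group like those in the table above is read through the `aws-kinesisanalytics-runtime` library this example depends on. This snippet is illustrative only and not part of the commit; the full wiring appears in `BasicStreamingJob.scala` below.

```scala
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime

object PropertyLookupSketch {
  def main(args: Array[String]): Unit = {
    // On MSF, getApplicationProperties returns the groups configured for the
    // application; "InputStream0" is the group ID from the table above.
    val inputProps = KinesisAnalyticsRuntime.getApplicationProperties.get("InputStream0")
    val streamName = inputProps.getProperty("stream.name")
    val region     = inputProps.getProperty("aws.region")
    println(s"Reading $streamName in $region")
  }
}
```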

scala/GettingStarted/build.sbt (+9 −2)

@@ -4,21 +4,28 @@ lazy val root = (project in file(".")).
   settings(
     name := "getting-started-scala",
     version := "1.0",
-    scalaVersion := "3.3.0",
-    mainClass := Some("com.amazonaws.services.kinesisanalytics.main"),
+    scalaVersion := "2.12.20",
+    mainClass := Some("com.amazonaws.services.msf.BasicStreamingJob"),
     javacOptions ++= Seq("-source", "11", "-target", "11")
   )
 
 val jarName = "ScalaGettingStarted-flink_1.20.jar"
 val flinkVersion = "1.20.0"
 val msfRuntimeVersion = "1.2.0"
 val connectorVersion = "4.3.0-1.19"
+val log4jVersion = "2.17.2"
 
 libraryDependencies ++= Seq(
   "com.amazonaws" % "aws-kinesisanalytics-runtime" % msfRuntimeVersion,
   "org.apache.flink" % "flink-connector-kinesis" % connectorVersion,
   "org.apache.flink" % "flink-streaming-java" % flinkVersion % "provided",
+  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
   "org.apache.flink" % "flink-connector-base" % flinkVersion % "provided",
+  "org.apache.flink" % "flink-clients" % flinkVersion % "provided",
+  // Logging
+  "org.apache.logging.log4j" % "log4j-slf4j-impl" % log4jVersion % "provided",
+  "org.apache.logging.log4j" % "log4j-api" % log4jVersion % "provided",
+  "org.apache.logging.log4j" % "log4j-core" % log4jVersion % "provided",
 )
 
 artifactName := { (_: ScalaVersion, _: ModuleID, _: Artifact) => jarName }
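The `sbt assembly` step in the README relies on the sbt-assembly plugin, which this diff does not touch. For reference, a minimal `project/plugins.sbt` sketch; the plugin version here is an assumption for illustration, not taken from this commit:

```scala
// project/plugins.sbt
// Assumed plugin version; check the repository for the version actually used.
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.5")
```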
scala/GettingStarted/src/main/resources/flink-application-properties-dev.json (new file, +16; path inferred from the classpath resource referenced in BasicStreamingJob.scala)

[
  {
    "PropertyGroupId": "InputStream0",
    "PropertyMap": {
      "aws.region": "us-east-1",
      "stream.name": "ExampleInputStream"
    }
  },
  {
    "PropertyGroupId": "OutputStream0",
    "PropertyMap": {
      "aws.region": "us-east-1",
      "stream.name": "ExampleOutputStream"
    }
  }
]
scala/GettingStarted/src/main/resources/log4j2.properties (new file, +7; path assumed, following the standard sbt resources layout)

rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender

appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

scala/GettingStarted/src/main/scala/com/amazonaws/services/kinesisanalytics/BasicStreamingJob.scala (−42)

This file was deleted.
scala/GettingStarted/src/main/scala/com/amazonaws/services/msf/BasicStreamingJob.scala (new file, +57; path inferred from the package declaration)

package com.amazonaws.services.msf

import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink
import org.apache.flink.streaming.api.environment.{LocalStreamEnvironment, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer

import java.util
import java.util.Properties


object BasicStreamingJob {

  private val streamNameKey = "stream.name"
  private val defaultInputStreamName = "ExampleInputStream"
  private val defaultOutputStreamName = "ExampleOutputStream"
  private val localApplicationPropertiesResource = "/flink-application-properties-dev.json"

  // When running locally (e.g. in IntelliJ), read the runtime properties from the
  // JSON resource file; on MSF, fetch them from the managed runtime.
  private def getApplicationProperties(env: StreamExecutionEnvironment): util.Map[String, Properties] = {
    env match {
      case _: LocalStreamEnvironment =>
        KinesisAnalyticsRuntime.getApplicationProperties(getClass.getResource(localApplicationPropertiesResource).getPath)
      case _ =>
        KinesisAnalyticsRuntime.getApplicationProperties
    }
  }

  // Kinesis source reading from the stream configured in the "InputStream0" group
  private def createSource(env: StreamExecutionEnvironment): FlinkKinesisConsumer[String] = {
    val applicationProperties = getApplicationProperties(env)
    val inputProperties = applicationProperties.get("InputStream0")

    new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName),
      new SimpleStringSchema, inputProperties)
  }

  // Kinesis sink writing to the stream configured in the "OutputStream0" group
  private def createSink(env: StreamExecutionEnvironment): KinesisStreamsSink[String] = {
    val applicationProperties = getApplicationProperties(env)
    val outputProperties = applicationProperties.get("OutputStream0")

    KinesisStreamsSink.builder[String]
      .setKinesisClientProperties(outputProperties)
      .setSerializationSchema(new SimpleStringSchema)
      .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName))
      .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode))
      .build
  }

  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment

    environment
      .addSource(createSource(environment))
      .sinkTo(createSink(environment))
    environment.execute("Flink Streaming Scala Example")
  }
}
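The new job is a straight passthrough from the Kinesis source to the Kinesis sink. As a hypothetical sketch, not part of this commit, record-level processing would slot in between the two, using the Java DataStream API the job is built on:

```scala
// Hypothetical variant of the pipeline in main(): insert a transformation
// between source and sink. `.returns(...)` supplies the result type explicitly,
// since Flink's type extraction can fail on Scala lambdas.
import org.apache.flink.api.common.typeinfo.Types

val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment
  .addSource(createSource(environment))
  .map((record: String) => record.toUpperCase) // example transformation
  .returns(Types.STRING)
  .sinkTo(createSink(environment))
environment.execute("Flink Streaming Scala Example")
```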
