
Commit a28f427

Marc Rooding committed: Initial commit

0 parents · commit a28f427

File tree

build.sbt
project/build.properties
src/main/resources/akka-kafka.conf
src/main/resources/application.conf
src/main/scala/me/mrooding/blog/AppConfig.scala
src/main/scala/me/mrooding/blog/Main.scala
src/main/scala/me/mrooding/blog/data/Price.scala
src/main/scala/me/mrooding/blog/json/PriceProtocol.scala
src/main/scala/me/mrooding/blog/kafka/KafkaPartitionsResolver.scala
src/main/scala/me/mrooding/blog/kafka/KafkaPriceConsumerService.scala
src/main/scala/me/mrooding/blog/publish/PriceUpdater.scala

11 files changed: +294 −0 lines changed


build.sbt

Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
name := "akka-streams-kafka-last-n-minutes"

version := "0.1"

scalaVersion := "2.13.1"

scalacOptions := Seq(
  "-encoding", "utf8",
  "-target:jvm-1.8",
  "-feature",
  "-language:implicitConversions",
  "-language:postfixOps",
  "-unchecked",
  "-deprecation",
  "-Xlog-reflective-calls"
)

resolvers ++= Seq(
  Resolver.mavenCentral
)

val akkaVersion = "2.5.23"
val circeVersion = "0.12.3"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-stream" % akkaVersion,
  "com.typesafe.akka" %% "akka-stream-kafka" % "1.0.4",

  "io.circe" %% "circe-core" % circeVersion,
  "io.circe" %% "circe-generic" % circeVersion,
  "io.circe" %% "circe-parser" % circeVersion,

  "com.typesafe.scala-logging" %% "scala-logging" % "3.9.2",

  "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test,
  "org.scalatest" %% "scalatest" % "3.0.8" % Test
)

project/build.properties

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
sbt.version = 1.3.3

src/main/resources/akka-kafka.conf

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
akka.kafka.consumer {
  kafka-clients {
    enable.auto.commit = false
  }
}

src/main/resources/application.conf

Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
# Without this include, ConfigFactory.load() never reads akka-kafka.conf, so the
# enable.auto.commit override above would silently not apply.
include "akka-kafka.conf"

me.mrooding.blog {
  kafka {
    group-id: ${?KAFKA_GROUPID}

    broker-list: ${?KAFKA_BROKERLIST}

    topics {
      price: ${?KAFKA_PRICE_TOPIC}
    }

    # AppConfig reads kafka.producer.buffer-size; without a value here getInt throws
    # at startup. The default and the env var name are assumptions.
    producer {
      buffer-size: 100
      buffer-size: ${?KAFKA_PRODUCER_BUFFER_SIZE}
    }
  }
}
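
Every value above comes from an optional `${?ENV_VAR}` substitution, so a key only exists when the corresponding environment variable is set. A quick way to sanity-check this locally is to layer fallback values under the loaded config; a minimal sketch, with made-up fallback values:

import com.typesafe.config.ConfigFactory

object ConfigCheck extends App {
  // ${?VAR} leaves a key undefined when the env var is absent, so supply local fallbacks
  val fallbacks = ConfigFactory.parseString(
    """me.mrooding.blog.kafka {
      |  group-id: local-group
      |  broker-list: "localhost:9092"
      |  topics.price: prices
      |}""".stripMargin
  )

  val config = ConfigFactory.load().withFallback(fallbacks).resolve()
  println(config.getString("me.mrooding.blog.kafka.broker-list"))
}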

src/main/scala/me/mrooding/blog/AppConfig.scala

Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
package me.mrooding.blog

import akka.kafka.ConsumerSettings
import com.typesafe.config.{Config, ConfigFactory}
import me.mrooding.blog.AppConfig.{KafkaConfig, KafkaTopics}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

class AppConfig(
  config: Config = ConfigFactory.load().getConfig("me.mrooding.blog"),
  akkaKafkaConfig: Config = ConfigFactory.load().getConfig("akka.kafka")
) {
  val kafka = KafkaConfig(
    groupId = config.getString("kafka.group-id"),
    brokerList = config.getString("kafka.broker-list"),
    topics = KafkaTopics(
      price = config.getString("kafka.topics.price")
    ),
    bufferSize = config.getInt("kafka.producer.buffer-size")
  )

  val consumerSettings: ConsumerSettings[String, Array[Byte]] =
    ConsumerSettings(akkaKafkaConfig.getConfig("consumer"), new StringDeserializer, new ByteArrayDeserializer)
      .withBootstrapServers(kafka.brokerList)
      .withGroupId(kafka.groupId)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
}

object AppConfig {
  case class KafkaTopics(price: String)
  case class KafkaConfig(groupId: String, brokerList: String, topics: KafkaTopics, bufferSize: Int)
}
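
Both `Config` constructor parameters have defaults, so `AppConfig` can be exercised in isolation by passing an explicit config; a minimal sketch with hypothetical values (the akka.kafka section still comes from the library's reference.conf):

import com.typesafe.config.ConfigFactory
import me.mrooding.blog.AppConfig

object AppConfigDemo extends App {
  val appConfig = new AppConfig(
    config = ConfigFactory.parseString(
      """kafka {
        |  group-id: test-group
        |  broker-list: "localhost:9092"
        |  topics.price: prices
        |  producer.buffer-size: 100
        |}""".stripMargin
    )
  )

  assert(appConfig.kafka.groupId == "test-group")
  println(appConfig.kafka.brokerList)
}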

src/main/scala/me/mrooding/blog/Main.scala

Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
package me.mrooding.blog

import java.time.{Duration, Instant}

import akka.actor.ActorSystem
import akka.kafka.KafkaConsumerActor
import akka.stream.ActorMaterializer
import com.typesafe.scalalogging.LazyLogging
import me.mrooding.blog.kafka.{KafkaPartitionsResolver, KafkaPriceConsumerService}
import me.mrooding.blog.publish.PriceUpdater

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success, Try}

object Main extends App with LazyLogging {
  implicit val system: ActorSystem = ActorSystem("mune")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  Try(new AppConfig()) match {
    case Success(config) =>
      val kafkaConsumer = system.actorOf(KafkaConsumerActor.props(config.consumerSettings))
      val kafkaPartitionsResolver = new KafkaPartitionsResolver(kafkaConsumer)

      kafkaPartitionsResolver
        .retrievePartitionsForTopic(config.kafka.topics.price)
        .flatMap { partitions =>
          val priceUpdater = system.actorOf(PriceUpdater.props())
          val kafkaRFQConsumerService = new KafkaPriceConsumerService(config, partitions, priceUpdater)

          val timeHorizon = Duration.ofMinutes(5)
          val horizonStart = Instant.now().minus(timeHorizon)

          val consumerServiceDone = kafkaRFQConsumerService.start(horizonStart)

          consumerServiceDone
            .andThen {
              case Success(_) =>
                logger.info("RFQ consumer service has stopped.")
              case Failure(exception) =>
                logger.error("RFQ consumer service has stopped due to error.", exception)
            }
        }
    case Failure(exception) =>
      // Without this case the match is not exhaustive and a config error surfaces as a MatchError
      logger.error("Failed to load configuration.", exception)
      system.terminate()
  }
}

src/main/scala/me/mrooding/blog/data/Price.scala

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
package me.mrooding.blog.data

import java.time.Instant

case class Price(isin: String, price: BigDecimal, timestamp: Instant)

src/main/scala/me/mrooding/blog/json/PriceProtocol.scala

Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
package me.mrooding.blog.json

import io.circe.Decoder
import io.circe.generic.semiauto
import me.mrooding.blog.data.Price

trait PriceProtocol {
  implicit val priceFromKafkaJson: Decoder[Price] = semiauto.deriveDecoder
}

object PriceProtocol extends PriceProtocol
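
`semiauto.deriveDecoder` derives a `Decoder[Price]` keyed on the case-class field names, and circe decodes `timestamp` from an ISO-8601 string and `price` into `BigDecimal` out of the box. A minimal decode sketch with made-up values:

import io.circe.parser.decode
import me.mrooding.blog.data.Price
import me.mrooding.blog.json.PriceProtocol.priceFromKafkaJson

object DecodeDemo extends App {
  val json = """{"isin":"NL0000000000","price":101.25,"timestamp":"2019-11-01T12:00:00Z"}"""
  // Prints Right(Price(NL0000000000,101.25,2019-11-01T12:00:00Z)) when the payload matches the field names
  println(decode[Price](json))
}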

src/main/scala/me/mrooding/blog/kafka/KafkaPartitionsResolver.scala

Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
package me.mrooding.blog.kafka

import akka.actor.ActorRef
import akka.kafka.Metadata
import akka.kafka.Metadata.GetPartitionsFor
import akka.pattern._
import akka.util.Timeout
import org.apache.kafka.common.PartitionInfo

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration._

class KafkaPartitionsResolver(consumer: ActorRef) {
  implicit val timeout: Timeout = Timeout(5 seconds)

  def retrievePartitionsForTopic(topic: String): Future[List[PartitionInfo]] = {
    val partitionsFor = (consumer ? GetPartitionsFor(topic)).mapTo[Metadata.PartitionsFor]

    partitionsFor.flatMap(p => Future.fromTry(p.response))
  }
}
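
The resolver asks the shared `KafkaConsumerActor` for topic metadata and unwraps the `Try` inside the `PartitionsFor` reply into the future. A minimal usage sketch mirroring the wiring in `Main`, assuming the KAFKA_* environment variables are set (the topic name here is made up):

import akka.actor.ActorSystem
import akka.kafka.KafkaConsumerActor
import me.mrooding.blog.AppConfig
import me.mrooding.blog.kafka.KafkaPartitionsResolver

import scala.concurrent.ExecutionContext.Implicits.global

object ResolverDemo extends App {
  implicit val system: ActorSystem = ActorSystem("resolver-demo")

  val config = new AppConfig()
  val consumer = system.actorOf(KafkaConsumerActor.props(config.consumerSettings))

  new KafkaPartitionsResolver(consumer)
    .retrievePartitionsForTopic("prices")
    .foreach(partitions => println(s"prices has ${partitions.size} partition(s)"))
}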

src/main/scala/me/mrooding/blog/kafka/KafkaPriceConsumerService.scala

Lines changed: 95 additions & 0 deletions
@@ -0,0 +1,95 @@
package me.mrooding.blog.kafka

import java.nio.charset.StandardCharsets
import java.time.Instant

import akka.actor.{ActorRef, ActorSystem}
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.kafka.scaladsl.Consumer.Control
import akka.pattern._
import akka.stream.Materializer
import akka.stream.scaladsl.{MergeLatest, Sink, Source}
import akka.util.Timeout
import akka.{Done, NotUsed}
import com.typesafe.scalalogging.LazyLogging
import io.circe.parser.decode
import me.mrooding.blog.AppConfig
import me.mrooding.blog.data.Price
import me.mrooding.blog.publish.PriceUpdater.UpdatePrice
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.{PartitionInfo, TopicPartition}

import scala.collection.SortedMap
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

class KafkaPriceConsumerService(config: AppConfig, partitions: Seq[PartitionInfo], priceUpdater: ActorRef)(
  implicit val system: ActorSystem,
  implicit val materializer: Materializer
) extends LazyLogging {
  import KafkaPriceConsumerService._

  require(partitions.nonEmpty, "Cannot consume without any partitions")

  def start(timeHorizon: Instant): Future[Done] = {
    val consumerSettings = config.consumerSettings
    // One plain source per partition, each starting at the first offset whose timestamp is at or after the horizon
    val kafkaSources: SortedMap[Partition, Source[ConsumerRecord[String, Array[Byte]], Control]] =
      SortedMap(partitions.map { partition =>
        val topicPartition = new TopicPartition(partition.topic(), partition.partition())
        val partitionSubscription = Subscriptions.assignmentOffsetsForTimes(topicPartition, timeHorizon.toEpochMilli)
        Partition(partition.partition()) -> Consumer.plainSource[String, Array[Byte]](consumerSettings, partitionSubscription)
      }: _*)

    implicit val timeout: Timeout = Timeout(10 seconds)
    val publisher = jsonPublisher(rfq => priceUpdater ? UpdatePrice(rfq), parallelism = 1)(system.dispatcher)
    val rfqPublishers: Seq[Source[(Partition, RecordPosition), Control]] = kafkaSources.map {
      case (partition, source) =>
        source.via(publisher).map(position => partition -> position)
    }.toSeq

    val compositeStream: Source[List[(Partition, RecordPosition)], _] = rfqPublishers match {
      case Seq(onlyPartition) => onlyPartition.map(x => List(x))
      // The variadic case also covers exactly two sources, since rest may be empty
      case Seq(first, second, rest @ _*) => Source.combine(first, second, rest: _*)(MergeLatest(_, eagerComplete = true))
    }
    compositeStream
      .log(getClass.getSimpleName)
      .runWith(Sink.ignore)
  }
}

object KafkaPriceConsumerService extends LazyLogging {
  import akka.stream.scaladsl._

  final case class Partition(id: Long) extends AnyVal

  object Partition {
    implicit def orderingById: Ordering[Partition] = Ordering.by(p => p.id)
  }
  final case class RecordPosition(offset: Long, timestamp: Instant)

  def jsonPublisher(publish: Price => Future[Any], parallelism: Int)(
    implicit executionContext: ExecutionContext
  ): Flow[ConsumerRecord[_, Array[Byte]], RecordPosition, NotUsed] =
    Flow[ConsumerRecord[_, Array[Byte]]]
      .map { record =>
        import me.mrooding.blog.json.PriceProtocol.priceFromKafkaJson

        val message = new String(record.value(), StandardCharsets.UTF_8)
        // Name the target type explicitly rather than leaving it to implicit search
        val decodingResult = decode[Price](message)

        decodingResult.left
          .foreach(decodingFailure => logger.warn(s"Failed to parse ConsumerRecord to Price: $message", decodingFailure))

        RecordPosition(record.offset(), Instant.ofEpochMilli(record.timestamp())) -> decodingResult
      }
      .mapAsync(parallelism) {
        case (position, Right(rfq)) =>
          publish(rfq).map(_ => position)
        case (position, _) =>
          /* We logged the decoding failure above. We'll just skip the unprocessable RFQ for now */
          Future.successful(position)
      }
}
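
Because `jsonPublisher` is a plain `Flow`, the decode-then-publish step can be unit-tested without a broker by feeding it a hand-built `ConsumerRecord`; a minimal sketch (topic, offset, and payload are made up, and this short constructor leaves the record timestamp at -1):

import java.nio.charset.StandardCharsets

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import me.mrooding.blog.kafka.KafkaPriceConsumerService.jsonPublisher
import org.apache.kafka.clients.consumer.ConsumerRecord

import scala.concurrent.Future

object PublisherDemo extends App {
  implicit val system: ActorSystem = ActorSystem("publisher-demo")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  val payload =
    """{"isin":"NL0000000000","price":101.25,"timestamp":"2019-11-01T12:00:00Z"}"""
      .getBytes(StandardCharsets.UTF_8)
  val record = new ConsumerRecord[String, Array[Byte]]("prices", 0, 42L, "key", payload)

  // A stubbed publish function stands in for the ask to PriceUpdater
  Source.single(record)
    .via(jsonPublisher(price => Future.successful(price), parallelism = 1))
    .runWith(Sink.head)
    .foreach(position => println(s"Processed record at offset ${position.offset}"))
}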

src/main/scala/me/mrooding/blog/publish/PriceUpdater.scala

Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
package me.mrooding.blog.publish

import akka.Done
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import me.mrooding.blog.data.Price
import me.mrooding.blog.publish.PriceUpdater.UpdatePrice

import scala.concurrent.ExecutionContext

class PriceUpdater()(implicit system: ActorSystem, execContext: ExecutionContext)
    extends Actor
    with ActorLogging {
  override def receive: Receive = {
    case UpdatePrice(price) =>
      log.debug(s"Updating price: $price")
      sender() ! Done
  }
}

object PriceUpdater {

  case class UpdatePrice(price: Price)

  def props()(implicit system: ActorSystem, context: ExecutionContext) =
    Props(new PriceUpdater())
}
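
`PriceUpdater` acknowledges every `UpdatePrice` with `Done`, which is what lets `jsonPublisher` treat the ask as a completion signal. A minimal ask sketch with a made-up price:

import java.time.Instant

import akka.Done
import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeout
import me.mrooding.blog.data.Price
import me.mrooding.blog.publish.PriceUpdater
import me.mrooding.blog.publish.PriceUpdater.UpdatePrice

import scala.concurrent.duration._

object UpdaterDemo extends App {
  implicit val system: ActorSystem = ActorSystem("updater-demo")
  import system.dispatcher
  implicit val timeout: Timeout = Timeout(3.seconds)

  val updater = system.actorOf(PriceUpdater.props())
  val price = Price("NL0000000000", BigDecimal(101.25), Instant.now())

  // The mapTo succeeds because the actor replies with Done
  (updater ? UpdatePrice(price)).mapTo[Done].foreach(_ => println("price update acknowledged"))
}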
