Skip to content

Commit

Permalink
Reproducer for #852 copied from @erikvanoosten Gist: https://gist.git…
Browse files Browse the repository at this point in the history
  • Loading branch information
guizmaii committed Jun 11, 2023
1 parent 51fa26a commit d09e7ac
Show file tree
Hide file tree
Showing 16 changed files with 569 additions and 287 deletions.
14 changes: 7 additions & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
- name: Install libuv
run: sudo apt-get update && sudo apt-get install -y libuv1-dev
- name: Setup Scala
uses: actions/setup-java@v3.10.0
uses: actions/setup-java@v3.11.0
with:
distribution: temurin
java-version: '8'
Expand All @@ -54,7 +54,7 @@ jobs:
- name: Install libuv
run: sudo apt-get update && sudo apt-get install -y libuv1-dev
- name: Setup Scala
uses: actions/setup-java@v3.10.0
uses: actions/setup-java@v3.11.0
with:
distribution: temurin
java-version: '8'
Expand All @@ -80,7 +80,7 @@ jobs:
- name: Install libuv
run: sudo apt-get update && sudo apt-get install -y libuv1-dev
- name: Setup Scala
uses: actions/setup-java@v3.10.0
uses: actions/setup-java@v3.11.0
with:
distribution: temurin
java-version: ${{ matrix.java }}
Expand All @@ -92,7 +92,7 @@ jobs:
with:
fetch-depth: '0'
- name: Test
run: sbt ++test
run: sbt +test
update-readme:
name: Update README
runs-on: ubuntu-latest
Expand All @@ -106,7 +106,7 @@ jobs:
- name: Install libuv
run: sudo apt-get update && sudo apt-get install -y libuv1-dev
- name: Setup Scala
uses: actions/setup-java@v3.10.0
uses: actions/setup-java@v3.11.0
with:
distribution: temurin
java-version: '8'
Expand Down Expand Up @@ -180,7 +180,7 @@ jobs:
- name: Install libuv
run: sudo apt-get update && sudo apt-get install -y libuv1-dev
- name: Setup Scala
uses: actions/setup-java@v3.10.0
uses: actions/setup-java@v3.11.0
with:
distribution: temurin
java-version: '8'
Expand Down Expand Up @@ -209,7 +209,7 @@ jobs:
- name: Install libuv
run: sudo apt-get update && sudo apt-get install -y libuv1-dev
- name: Setup Scala
uses: actions/setup-java@v3.10.0
uses: actions/setup-java@v3.11.0
with:
distribution: temurin
java-version: '8'
Expand Down
3 changes: 1 addition & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,6 @@ lazy val zioKafkaExample =
.settings(
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % "2.0.13",
"dev.zio" %% "zio-kafka" % "2.3.1",
"dev.zio" %% "zio-kafka-testkit" % "2.3.1" % Test,
"dev.zio" %% "zio-test" % "2.0.13" % Test,
"ch.qos.logback" % "logback-classic" % "1.4.6",
"dev.zio" %% "zio-logging-slf4j2" % "2.1.13",
Expand All @@ -175,6 +173,7 @@ lazy val zioKafkaExample =
// [error] org.scala-lang.modules:scala-collection-compat _3, _2.13
crossScalaVersions -= scala3.value
)
.dependsOn(zioKafka, zioKafkaTestkit)

addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt")
addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import zio.kafka.producer.Producer
import zio.kafka.serde.Serde
import zio.kafka.testkit.Kafka
import zio.kafka.testkit.KafkaTestUtils.{ consumerSettings, minimalConsumer, produceMany, producer }
import zio.{ durationInt, ULayer, ZIO, ZLayer }
import zio.{ ULayer, ZIO, ZLayer }

import java.util.concurrent.TimeUnit
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -55,9 +55,7 @@ class ConsumersComparisonBenchmark extends ZioBenchmark[Env] {
consumerSettings(
clientId = randomThing("client"),
groupId = Some(randomThing("client")),
`max.poll.records` = 1000, // A more production worthy value
runloopTimeout =
1.hour // Absurdly high timeout to avoid the runloop from being interrupted while we're benchmarking other stuff
`max.poll.records` = 1000 // A more production worthy value
)
)

Expand Down
66 changes: 43 additions & 23 deletions zio-kafka-example/src/main/scala/zio/kafka/example/Main.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package zio.kafka.example

import io.github.embeddedkafka.{ EmbeddedK, EmbeddedKafka, EmbeddedKafkaConfig }
import org.apache.kafka.clients.producer.ProducerRecord
import zio._
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.{ Consumer, ConsumerSettings, Subscription }
import zio.kafka.consumer.Consumer.AutoOffsetStrategy
import zio.kafka.consumer.{ Consumer, ConsumerSettings, OffsetBatch, Subscription }
import zio.kafka.producer.{ Producer, ProducerSettings }
import zio.kafka.serde.Serde
import zio.logging.backend.SLF4J

Expand Down Expand Up @@ -41,32 +43,50 @@ object Main extends ZIOAppDefault {

private val topic = "test-topic"

private def consumerLayer(kafka: MyKafka): ZLayer[Any, Throwable, Consumer] = {
val consumerSettings =
ConsumerSettings(kafka.bootstrapServers)
.withPollTimeout(500.millis)
.withGroupId("test")
private val consumerLayer: ZLayer[MyKafka, Throwable, Consumer] =
ZLayer.scoped {
ZIO.serviceWithZIO[MyKafka] { kafka =>
val consumerSettings =
ConsumerSettings(kafka.bootstrapServers)
.withGroupId("group1")
.withOffsetRetrieval(Consumer.OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest))
Consumer.make(consumerSettings)
}
}

ZLayer.make[Consumer](
ZLayer.succeed(consumerSettings),
ZLayer.succeed(Diagnostics.NoOp),
Consumer.live
)
}
private val producerLayer: ZLayer[MyKafka, Throwable, Producer] =
ZLayer.scoped {
ZIO.serviceWithZIO[MyKafka] { kafka =>
val producerSettings = ProducerSettings(kafka.bootstrapServers)
Producer.make(producerSettings)
}
}

override def run: ZIO[ZIOAppArgs with Scope, Any, Any] =
ZIO.addFinalizer(ZIO.logInfo("Stopping app")) *>
(
for {
_ <- ZIO.logInfo(s"Starting app")
kafka <- ZIO.service[MyKafka]
stream = Consumer
.plainStream(Subscription.topics(topic), Serde.string, Serde.string)
.provideLayer(consumerLayer(kafka))
_ <- ZIO.logInfo(s"Consuming messages...")
consumed <- stream.take(1000).tap(r => ZIO.logInfo(s"Consumed record $r")).runCount
_ <- ZIO.logInfo(s"Consumed $consumed records")
_ <- ZIO.logInfo(s"Starting app")
_ <- Producer.produceChunk(
Chunk.fromIterable(1 to 1000).map(n => new ProducerRecord(topic, n, n.toString)),
Serde.int,
Serde.string
)
_ <- Consumer
.plainStream(Subscription.topics(topic), Serde.int, Serde.string)
.take(100)
.groupedWithin(10, 100.millis)
.mapZIOPar(2)(c => ZIO.debug(c.size) as c.map(_.offset))
.map(OffsetBatch.apply)
.debug("Offset")
.mapZIO(_.commit)
.debug("Commit")
.runDrain
_ <- ZIO.logInfo("Ready!")
} yield ()
).provideSomeLayer[ZIOAppArgs with Scope](MyKafka.embedded)

).provide(
MyKafka.embedded,
consumerLayer,
producerLayer
)
}
2 changes: 1 addition & 1 deletion zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -470,5 +470,5 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
)
.provideSomeShared[Scope](
Kafka.embedded
) @@ withLiveClock @@ timeout(2.minutes) @@ sequential
) @@ withLiveClock @@ timeout(3.minutes) @@ sequential
}
Loading

0 comments on commit d09e7ac

Please sign in to comment.