Skip to content

Commit

Permalink
Backport some code simplification from #910
Browse files Browse the repository at this point in the history
  • Loading branch information
guizmaii committed Jun 8, 2023
1 parent be52214 commit 665b95e
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ private[consumer] object Runloop {
restartStreamsOnRebalancing: Boolean,
partitionsHub: Hub[Take[Throwable, PartitionAssignment]],
consumerSettings: ConsumerSettings
): ZIO[Scope, Throwable, Runloop] =
): URIO[Scope, Runloop] =
for {
_ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized))
commandQueue <- ZIO.acquireRelease(Queue.bounded[RunloopCommand](commandQueueSize))(_.shutdown)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
import zio.kafka.consumer.{ ConsumerSettings, Subscription }
import zio.stream.{ Stream, Take, UStream, ZStream }
import zio.{ durationInt, Hub, RIO, Ref, Scope, Task, UIO, ZIO, ZLayer }
import zio.{ durationInt, Hub, Ref, Scope, UIO, ZIO, ZLayer }

private[internal] sealed trait RunloopState
private[internal] object RunloopState {
Expand Down Expand Up @@ -37,23 +37,22 @@ private[internal] object RunloopState {
private[consumer] final class RunloopAccess private (
runloopStateRef: Ref.Synchronized[RunloopState],
partitionHub: Hub[Take[Throwable, PartitionAssignment]],
makeRunloop: Task[RunloopState.Started],
makeRunloop: UIO[RunloopState.Started],
diagnostics: Diagnostics
) {
private def runloop(shouldStartIfNot: Boolean): Task[RunloopState] =
private def runloop(shouldStartIfNot: Boolean): UIO[RunloopState] =
runloopStateRef.updateSomeAndGetZIO { case RunloopState.NotStarted if shouldStartIfNot => makeRunloop }
private def withRunloopZIO[R, A](shouldStartIfNot: Boolean)(f: Runloop => RIO[R, A]): RIO[R, A] =
private def withRunloopZIO[A](shouldStartIfNot: Boolean)(f: Runloop => UIO[A]): UIO[A] =
runloop(shouldStartIfNot).flatMap {
case RunloopState.NotStarted => ZIO.unit.asInstanceOf[RIO[R, A]]
case RunloopState.Stopped => ZIO.unit.asInstanceOf[UIO[A]]
case RunloopState.NotStarted => ZIO.unit.asInstanceOf[UIO[A]]
case RunloopState.Started(runloop) => f(runloop)
case RunloopState.Stopped => ZIO.unit.asInstanceOf[RIO[R, A]]
}

/**
* No need to call `Runloop::stopConsumption` if the Runloop has not been started or has been stopped.
*
* Note:
* 1. The `.orDie` is just here for compilation. It cannot happen.
* 1. We do a 100 retries waiting 10ms between each to roughly take max 1s before to stop to retry. We want to avoid
* an infinite loop. We need this recursion because if the user calls `stopConsumption` before the Runloop is
* started, we need to wait for it to be started. Can happen if the user starts a consuming session in a forked
Expand All @@ -63,7 +62,7 @@ private[consumer] final class RunloopAccess private (
def stopConsumption(retry: Int = 100, initialCall: Boolean = true): UIO[Unit] = {
@inline def next: UIO[Unit] = stopConsumption(retry - 1, initialCall = false)

runloop(shouldStartIfNot = false).orDie.flatMap {
runloop(shouldStartIfNot = false).flatMap {
case RunloopState.Stopped => ZIO.unit
case RunloopState.Started(runloop) => runloop.stopConsumption
case RunloopState.NotStarted =>
Expand All @@ -87,7 +86,7 @@ private[consumer] final class RunloopAccess private (
// starts the Runloop if not already started
_ <- withRunloopZIO(shouldStartIfNot = true)(_.addSubscription(subscription))
_ <- ZIO.addFinalizer {
withRunloopZIO(shouldStartIfNot = false)(_.removeSubscription(subscription)).orDie <*
withRunloopZIO(shouldStartIfNot = false)(_.removeSubscription(subscription)) <*
diagnostics.emit(Finalization.SubscriptionFinalized)
}
} yield stream
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package zio.kafka.consumer.internal

import zio.{ Executor, Scope, ZIO }
import zio.{ Executor, Scope, URIO, ZIO }

import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicLong
Expand All @@ -9,9 +9,9 @@ private[consumer] object RunloopExecutor {

private val counter: AtomicLong = new AtomicLong(0)

private val newSingleThreadedExecutor: ZIO[Scope, Throwable, Executor] =
private val newSingleThreadedExecutor: URIO[Scope, Executor] =
ZIO.acquireRelease {
ZIO.attempt {
ZIO.succeed {
val javaExecutor =
Executors.newSingleThreadExecutor { runnable =>
new Thread(runnable, s"zio-kafka-runloop-thread-${counter.getAndIncrement()}")
Expand All @@ -21,6 +21,6 @@ private[consumer] object RunloopExecutor {
}
} { case (_, executor) => ZIO.attempt(executor.shutdown()).orDie }.map(_._1)

val newInstance: ZIO[Scope, Throwable, Executor] = newSingleThreadedExecutor
val newInstance: URIO[Scope, Executor] = newSingleThreadedExecutor

}

0 comments on commit 665b95e

Please sign in to comment.