Skip to content

Commit

Permalink
Add Consumer.commit method
Browse files Browse the repository at this point in the history
  • Loading branch information
guizmaii committed Aug 12, 2023
1 parent 1a7a8c7 commit 86498ee
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ final case class CommittableRecord[K, V](
def partition: Int = record.partition()
def timestamp: Long = record.timestamp()

lazy val topicPartition: TopicPartition = new TopicPartition(record.topic(), record.partition())

def offset: Offset =
OffsetImpl(
topic = record.topic(),
Expand Down
5 changes: 5 additions & 0 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ trait Consumer {
timeout: Duration = Duration.Infinity
): Task[Map[TopicPartition, Long]]

def commit(record: CommittableRecord[_, _]): Task[Unit]

/**
* Retrieve the last committed offset for the given topic-partitions
*/
Expand Down Expand Up @@ -436,6 +438,9 @@ private[consumer] final class ConsumerLive private[consumer] (
offs.asScala.map { case (k, v) => k -> v.longValue() }.toMap
}

override def commit(record: CommittableRecord[_, _]): Task[Unit] =
runloopAccess.commit(record)

override def committed(
partitions: Set[TopicPartition],
timeout: Duration = Duration.Infinity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ private[consumer] final class Runloop private (
}
}

private[internal] def commit(record: CommittableRecord[_, _]): Task[Unit] =
commit.apply(Map(record.topicPartition -> record.record.offset()))

private val commit: Map[TopicPartition, Long] => Task[Unit] =
offsets =>
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
import zio.kafka.consumer.{ ConsumerSettings, InvalidSubscriptionUnion, Subscription }
import zio.kafka.consumer.{ CommittableRecord, ConsumerSettings, InvalidSubscriptionUnion, Subscription }
import zio.stream.{ Stream, Take, UStream, ZStream }
import zio.{ Hub, IO, Ref, Scope, UIO, ZIO, ZLayer }
import zio._

private[internal] sealed trait RunloopState
private[internal] object RunloopState {
Expand All @@ -31,10 +31,10 @@ private[consumer] final class RunloopAccess private (
) {

private def withRunloopZIO[E](
requireRunning: Boolean
shouldStartIfNot: Boolean
)(whenRunning: Runloop => IO[E, Unit]): IO[E, Unit] =
runloopStateRef.updateSomeAndGetZIO {
case RunloopState.NotStarted if requireRunning => makeRunloop.map(RunloopState.Started.apply)
case RunloopState.NotStarted if shouldStartIfNot => makeRunloop.map(RunloopState.Started.apply)
}.flatMap {
case RunloopState.NotStarted => ZIO.unit
case RunloopState.Started(runloop) => whenRunning(runloop)
Expand All @@ -44,7 +44,7 @@ private[consumer] final class RunloopAccess private (
/**
* No need to call `Runloop::stopConsumption` if the Runloop has not been started or has been stopped.
*/
def stopConsumption: UIO[Unit] = withRunloopZIO(requireRunning = false)(_.stopConsumption)
def stopConsumption: UIO[Unit] = withRunloopZIO(shouldStartIfNot = false)(_.stopConsumption)

/**
* We're doing all of these things in this method so that the interface of this class is as simple as possible and
Expand All @@ -58,13 +58,16 @@ private[consumer] final class RunloopAccess private (
for {
stream <- ZStream.fromHubScoped(partitionHub)
// starts the Runloop if not already started
_ <- withRunloopZIO(requireRunning = true)(_.addSubscription(subscription))
_ <- withRunloopZIO(shouldStartIfNot = true)(_.addSubscription(subscription))
_ <- ZIO.addFinalizer {
withRunloopZIO(requireRunning = false)(_.removeSubscription(subscription)) <*
withRunloopZIO(shouldStartIfNot = false)(_.removeSubscription(subscription)) <*
diagnostics.emit(Finalization.SubscriptionFinalized)
}
} yield stream

def commit(record: CommittableRecord[_, _]): Task[Unit] =
withRunloopZIO(shouldStartIfNot = false)(_.commit(record))

}

private[consumer] object RunloopAccess {
Expand Down

0 comments on commit 86498ee

Please sign in to comment.