A lost partition is no longer fatal (#1252)
Before 2.7.0, a lost partition was treated as a revoked partition. Since
the partition is already assigned to another node, this can lead to
duplicate processing of records.

Zio-kafka 2.7.0 assumes that a lost partition is a fatal event. It leads
to an interrupt in the stream that handles the partition. The other
streams are ended, and the consumer closes with an error. Usually, a
full program restart is needed to resume consuming.

Note that stream processing is not interrupted immediately: the
interrupt is only observed when the stream requests new records.
Unfortunately, we have not found a clean way to interrupt the stream
consumer directly.

Meanwhile, from bug reports (#1233, #1250), we understand that
partitions are usually lost when no records have been received for a
long time.

In conclusion, 1) it is not possible to immediately interrupt user
stream processing, and 2) it is most likely not needed anyway because
the stream is already done processing and awaiting more records.

With this change, a lost partition no longer leads to an interrupt.
Instead, we first drain the stream's internal queue (just to be sure, it
is probably already empty), and then we end the stream gracefully (that
is, without error, like we do with revoked partitions). Other streams
are not affected, and the consumer continues to work.
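
To illustrate the drain-then-end behaviour from the stream's point of view, here is a minimal sketch that uses only the Scala standard library. It is not zio-kafka code: `Item`, `Record`, `End`, `lost` and `drain` are hypothetical stand-ins for the internal queue and its end marker.

```scala
import scala.collection.mutable

object LostPartitionSketch {
  // Hypothetical stand-ins for the elements of the stream's internal queue.
  sealed trait Item
  final case class Record(value: String) extends Item
  case object End extends Item

  /** Discards everything that is queued, then enqueues the end marker.
    * Returns the number of discarded records.
    */
  def lost(queue: mutable.Queue[Item]): Int = {
    val taken = queue.size
    queue.clear()
    queue.enqueue(End)
    taken
  }

  /** A reader that consumes records until it sees the end marker.
    * It returns normally; no exception is involved.
    */
  def drain(queue: mutable.Queue[Item]): List[String] = {
    val out  = List.newBuilder[String]
    var done = false
    while (!done && queue.nonEmpty) {
      queue.dequeue() match {
        case Record(v) => out += v
        case End       => done = true
      }
    }
    out.result()
  }
}
```

In this model, records that were still queued when the partition was lost are never handed to the reader, and the reader finishes as it would for a revoked partition.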

Lost partitions do not affect the features `rebalanceSafeCommits` and
`restartStreamsOnRebalancing`; they do _not_ hold up a rebalance waiting
for commits to complete, and they do _not_ lead to restarts of other
streams.

Clients that want to handle the partition lost event can no longer do so
by handling the failed stream. Instead, they need to create their own
`RebalanceListener` and handle the `onLost` call.
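
A listener of that kind could look roughly like the sketch below. It assumes the three-callback form of `RebalanceListener` in which each callback takes a `Set[TopicPartition]` and returns a `Task[Unit]`, and it assumes `ConsumerSettings.withRebalanceListener`; check both against the zio-kafka version in use, since the callback signatures have differed between releases. `LostPartitionListener`, `describeLost` and `settings` are hypothetical names.

```scala
import org.apache.kafka.common.TopicPartition
import zio._
import zio.kafka.consumer.{ ConsumerSettings, RebalanceListener }

object LostPartitionListener {
  // Hypothetical helper: renders the lost partitions for a log line.
  def describeLost(lost: Set[TopicPartition]): String =
    lost.toList.map(tp => s"${tp.topic}-${tp.partition}").sorted.mkString(", ")

  // Assumed constructor shape: onAssigned, onRevoked, onLost.
  val listener: RebalanceListener = RebalanceListener(
    onAssigned = (_: Set[TopicPartition]) => ZIO.unit,
    onRevoked = (_: Set[TopicPartition]) => ZIO.unit,
    onLost = (lost: Set[TopicPartition]) => ZIO.logWarning(s"Partitions lost: ${describeLost(lost)}")
  )

  // Attaches the listener to existing consumer settings.
  def settings(base: ConsumerSettings): ConsumerSettings =
    base.withRebalanceListener(listener)
}
```

The `onLost` callback here only logs. Any other reaction, such as updating a metric, would go in the same place.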

Fixes #1233 and #1250.
erikvanoosten authored Jul 9, 2024
1 parent ff5b9fc commit a8596f3
Showing 1 changed file with 11 additions and 6 deletions.
@@ -82,12 +82,6 @@ final class PartitionStreamControl private (
   private[internal] def maxPollIntervalExceeded(now: NanoTime): UIO[Boolean] =
     queueInfoRef.get.map(_.deadlineExceeded(now))

-  /** To be invoked when the partition was lost. */
-  private[internal] def lost: UIO[Boolean] = {
-    val lostException = new RuntimeException(s"Partition ${tp.toString} was lost") with NoStackTrace
-    interruptionPromise.fail(lostException)
-  }
-
   /** To be invoked when the stream is no longer processing. */
   private[internal] def halt: UIO[Boolean] = {
     val timeOutMessage = s"No records were polled for more than $maxPollInterval for topic partition $tp. " +
@@ -97,6 +91,17 @@ final class PartitionStreamControl private (
     interruptionPromise.fail(consumeTimeout)
   }

+  /** To be invoked when the partition was lost. It clears the queue and ends the stream. */
+  private[internal] def lost: UIO[Unit] =
+    logAnnotate {
+      for {
+        _ <- ZIO.logDebug(s"Partition ${tp.toString} lost")
+        taken <- dataQueue.takeAll.map(_.size)
+        _ <- dataQueue.offer(Take.end)
+        _ <- ZIO.logDebug(s"Ignored ${taken} records on lost partition").when(taken != 0)
+      } yield ()
+    }
+
   /** To be invoked when the partition was revoked or otherwise needs to be ended. */
   private[internal] def end: ZIO[Any, Nothing, Unit] =
     logAnnotate {
