Graceful shutdown of a single subscription #1201
base: master
Conversation
I didn't look at the implementation yet, only docs and tests.
> ## Controlled shutdown
>
> The examples above will keep processing records forever, or until the fiber is interrupted, typically at application shutdown. When interrupted, in-flight records will not be processed fully through all stream stages and offsets may not be committed. For fast shutdown in an at-least-once processing scenario this is fine. zio-kafka also supports a _graceful shutdown_, where the fetching of records for the subscribed topics/partitions is stopped, the streams are ended and all downstream stages are completed, allowing all in-flight messages to be fully processed.
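At a call site, the feature described above could look roughly like this. This is a sketch only: `partitionedStreamWithControl` and its `stream`/`stop` members are the API under discussion in this PR (names and signatures may still change), and `processRecord` is a hypothetical user effect.

```scala
// Sketch: graceful shutdown of a single subscription (proposed API, may change).
ZIO.scoped {
  for {
    streamControl <- Consumer.partitionedStreamWithControl(
                       Subscription.topics("topic150"),
                       Serde.string,
                       Serde.string
                     )
    fiber <- streamControl.stream
               .flatMapPar(n = 8) { case (_, partitionStream) =>
                 partitionStream.mapZIO(record => processRecord(record).as(record.offset))
               }
               .aggregateAsync(Consumer.offsetBatches)
               .mapZIO(_.commit)
               .runDrain
               .fork
    _ <- ZIO.sleep(30.seconds) // ...or wait for a shutdown signal
    _ <- streamControl.stop    // stop fetching, end the streams, let commits complete
    _ <- fiber.join
  } yield ()
}
```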
> allowing all in-flight messages to be fully processed.

I suggest we change these words to:

> allowing commits for in-flight records to complete.

- Let's continue to use the word 'records'.
- Removed the word 'all' because when an interrupt is received, internally queued records are dropped and not passed to the consumer stream.
- The remaining in-flight records have already been passed to the consumer, so processing has already commenced. The only relevant operation from zio-kafka's point of view is that the records can be committed.
```scala
ZIO.scoped {
  for {
    streamControl <- Consumer.partitionedStreamWithControl(Subscription.topics("topic150"), Serde.string, Serde.string)
```
Could this be laid out for smaller screens, please?
```scala
client        <- randomClient

keepProducing <- Ref.make(true)
_             <- produceOne(topic, "key", "value").repeatWhileZIO(_ => keepProducing.get).fork
```
You can also use the test helper method `scheduledProduce`.
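For reference, a sketch of what that could look like, assuming the `scheduledProduce` helper from zio-kafka's test utilities (the exact signature may differ):

```scala
// Sketch: produce a record periodically for the lifetime of the enclosing scope,
// instead of a hand-rolled Ref + repeatWhileZIO loop.
_ <- KafkaTestUtils
       .scheduledProduce(topic, Schedule.fixed(500.millis))
       .runDrain
       .forkScoped
```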
Several resolved review comments on zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala (outdated).
Still need more time to digest this.
Resolved review comments on zio-kafka/src/main/scala/zio/kafka/consumer/SubscriptionStreamControl.scala (outdated).
Hmm, should we instead of this:

```scala
Consumer.runWithGracefulShutdown(
  Consumer.partitionedStreamWithControl(Subscription.topics("topic150"), Serde.string, Serde.string)
) { stream => ... }
```

offer this:

```scala
Consumer.partitionedStreamWithGracefulShutdown(Subscription.topics("topic150"), Serde.string, Serde.string) {
  (stream, _) => stream.flatMapPar(...)
}
```

The second parameter would be the …
If I understand it correctly, the proposal allows for more use cases; with it you can also call …
Well, I mean compared to just the …
If resume after …
Well, in both proposals you can call … I don't think you want to do anything after stop, but it would give you more explicit control over when to stop, instead of stopping when the scope ends. We probably need to decide if we want to add pause/resume in the future. If we do, we should add the …
```diff
 override def plainStream[R, K, V](
   subscription: Subscription,
   keyDeserializer: Deserializer[R, K],
   valueDeserializer: Deserializer[R, V],
-  bufferSize: Int
+  bufferSize: RuntimeFlags
```
This is probably an IDE mistake.
```diff
-  bufferSize: RuntimeFlags
+  bufferSize: Int
```
Yeah, happens to me all the time as well.
Hey :) Thanks for the great work! Here's some initial feedback: I'm not a big fan of the `SubscriptionStreamControl` wrapper. To me, functions/methods returning it should return a tuple instead, i.e. drop:

```scala
SubscriptionStreamControl[Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]]]
```

in favor of:

```scala
(Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]], SubscriptionStreamControl)
```

Made the change in a PR to show/study how, to me, it simplifies things: https://github.com/zio/zio-kafka/pull/1207/files
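As an illustration of how the tuple shape could read at a call site, here is a hypothetical sketch following that proposal (the method name and field names are illustrative, not from the PR):

```scala
// Hypothetical call-site sketch for the tuple-returning variant.
ZIO.scoped {
  for {
    streamAndControl <- Consumer.partitionedStreamWithControl(
                          Subscription.topics("topic150"), Serde.string, Serde.string
                        )
    (stream, control) = streamAndControl
    fiber <- stream
               .flatMapPar(n = 8) { case (_, partitionStream) => partitionStream }
               .runDrain
               .fork
    _ <- control.stop // end the streams gracefully when done
    _ <- fiber.join
  } yield ()
}
```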
```diff
-    } yield stream
+    } yield SubscriptionStreamControl(
+      ZStream.fromQueue(partitionAssignmentQueue),
+      withRunloopZIO(requireRunning = true)(_.stopSubscribedTopicPartitions(subscription)) *> partitionAssignmentQueue
```
```diff
-      withRunloopZIO(requireRunning = true)(_.stopSubscribedTopicPartitions(subscription)) *> partitionAssignmentQueue
+      withRunloopZIO(requireRunning = false)(_.stopSubscribedTopicPartitions(subscription)) *> partitionAssignmentQueue
```
Not sure you want to use `true` here. `true` means: if the runloop is not running, start it and apply the `stopSubscribedTopicPartitions` function. In your case, IMO, if the Runloop isn't running, calling `control.stop` should be a no-op, which will be the case if you use `false` instead.
Also, do we want to execute the `partitionAssignmentQueue.offer(Take.end).ignore` code if the Runloop isn't running? If not, then something like this would be more appropriate:

```scala
withRunloopZIO(requireRunning = false) { runloop =>
  runloop.stopSubscribedTopicPartitions(subscription) *>
    partitionAssignmentQueue.offer(Take.end).ignore
}
```
> In your case, IMO, if the Runloop isn't running, calling the `control.stop` should be a no-op

We need to stop the subscription also when the runloop isn't running, right?
Resolved review comments on zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala (outdated).
Didn't finish my review yet. I still have some parts of the code to explore/understand, but I have to go. I'll finish it later 🙂
Thanks for the feedback, Jules. Agreed about the extra concept that would be unwanted. Check out my latest interface proposal where there is only a …
Still reading the code...
Resolved review comments on zio-kafka/src/main/scala/zio/kafka/consumer/SubscriptionStreamControl.scala (outdated).
```scala
 * @tparam S
 *   Type of the stream returned from [[stream]]
```
Actually, thinking about it more, the trait doesn't really care what S is, or that it is even a stream at all. That means another abstraction might be hidden ('Stoppable'?). Abstracting further is hard though; the definition of `stop` is pretty specific.

I also noticed that the `stop` method is defined in terms of the consumer and is not related to the stream. Should that be the case? Shouldn't this `stop` only relate to the referred `stream`?

I am trying to weigh this form against @guizmaii's proposal in #1207. I am no longer certain which one I like more.
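To make the observation concrete, a minimal sketch of what such a hidden abstraction could look like (the name `Stoppable` and the exact shape are purely illustrative, not part of the PR):

```scala
import zio.UIO

// Illustrative only: the graceful-stop capability on its own...
trait Stoppable {
  /** Ask the underlying resource to wind down gracefully. */
  def stop: UIO[Unit]
}

// ...and the stream-specific control as a pairing of a stream with that
// capability. Note that the trait indeed does not care what S is.
final case class SubscriptionStreamControl[S](stream: S, stop: UIO[Unit]) extends Stoppable
```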
```diff
@@ -70,6 +70,22 @@ trait Consumer {
     valueDeserializer: Deserializer[R, V]
   ): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]]

+  /**
+   * Like [[partitionedAssignmentStream]] but wraps the stream in a construct that ensures graceful shutdown
```
```diff
-   * Like [[partitionedAssignmentStream]] but wraps the stream in a construct that ensures graceful shutdown
+   * Like [[partitionedAssignmentStream]] but wraps the stream in a construct that ensures graceful shutdown.
```
```diff
@@ -93,6 +109,22 @@ trait Consumer {
     valueDeserializer: Deserializer[R, V]
   ): Stream[Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]

+  /**
+   * Like [[partitionedStream]] but wraps the stream in a construct that ensures graceful shutdown
```
```diff
-   * Like [[partitionedStream]] but wraps the stream in a construct that ensures graceful shutdown
+   * Like [[partitionedStream]] but wraps the stream in a construct that ensures graceful shutdown.
```
```scala
/**
 * Like [[plainStream]] but wraps the stream in a construct that ensures graceful shutdown
 */
```
Since this method might be the most attractive way to use zio-kafka, let's extend the documentation a bit. E.g.:

```suggestion
/**
 * Like [[plainStream]] but wraps the stream in a construct that ensures graceful shutdown. During a graceful
 * shutdown the consumer is stopped but the stream can complete processing and commit already fetched records.
 *
 * Example usage:
 * {{{
 *   ... todo ...
 * }}}
 */
```

We could also include a reference to the documentation (though I am always extremely happy when the scaladocs are all you need, e.g. scalatest documentation is my benchmark).
```diff
@@ -75,6 +75,13 @@ private[consumer] final class Runloop private (
   private[internal] def removeSubscription(subscription: Subscription): UIO[Unit] =
     commandQueue.offer(RunloopCommand.RemoveSubscription(subscription)).unit

+  private[internal] def stopSubscribedTopicPartitions(subscription: Subscription): UIO[Unit] =
```
This method is stopping (actually ending) streams, not subscriptions. WDYT of:

```diff
-  private[internal] def stopSubscribedTopicPartitions(subscription: Subscription): UIO[Unit] =
+  private[internal] def endStreams(subscription: Subscription): UIO[Unit] =
```

And similarly rename `RunloopCommand.StopSubscribedTopicPartitions` to `RunloopCommand.EndStreamsBySubscription`.
I understand now that when graceful shutdown starts we're ending the subscribed streams. That should work nicely. Let's work out what will happen next to the runloop. The runloop would still be happily fetching records for that stream. When those are offered to the stream, …

We can do slightly better though. We're fetching and storing all these records in the queue for nothing, even potentially causing an OOM for systems that are tuned for the case where processing happens almost immediately. My proposal is to: …

If you want, I can extend this PR with that proposal (or create a separate PR).
@erikvanoosten If you have some time to implement those two things, by all means.
@svroonland Done in commit 1218204. Now I am wondering, how can we test this?
Change looks good. Totally forgot to implement this part.
Force-pushed from 958b0a5 to 8d19e16.
Depends on zio/zio#8804.
By using `ZIO.async`, we no longer need a reference to the ZIO runtime, nor do we need the `exec` trickery anymore.
Also: fix typo and make metric descriptions consistent.
When many hundreds of partitions need to be consumed, an excessive amount of heap can be used for pre-fetching. The `ManyPartitionsQueueSizeBasedFetchStrategy` works similarly to the default `QueueSizeBasedFetchStrategy` but limits total memory usage.
Refactoring of the producer so that it handles errors per record.
Zio-kafka applications always pre-fetch data so that user streams can process the data asynchronously. This is not compatible with auto commit. When auto commit is enabled, the consumer will automatically commit batches _before_ they are processed by the user streams. An unaware user might accidentally enable auto commit and lose data during rebalances. Solves #1289.
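Given that, commits have to happen explicitly after processing. A sketch of the usual explicit-commit pattern with zio-kafka (the topic name and the `processRecord` effect are placeholders):

```scala
// Sketch: commit offsets explicitly after processing, instead of relying on
// the Kafka client's auto commit (which would commit before processing happens).
Consumer
  .plainStream(Subscription.topics("my-topic"), Serde.string, Serde.string)
  .mapZIO(record => processRecord(record).as(record.offset))
  .aggregateAsync(Consumer.offsetBatches) // batch offsets for efficient commits
  .mapZIO(_.commit)
  .runDrain
```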
## About this PR

📦 Updates [org.scalameta:scalafmt-core](https://github.com/scalameta/scalafmt) from `3.8.2` to `3.8.3` — 📜 [GitHub Release Notes](https://github.com/scalameta/scalafmt/releases/tag/v3.8.3)

This is an automated Scala Steward update; it will be kept free of conflicts, and closing the PR skips this version. Scala Steward can be configured per repository with a `.scala-steward.conf` file.

Co-authored-by: zio-scala-steward[bot] <145262613+zio-scala-steward[bot]@users.noreply.github.com>
Implements functionality for gracefully stopping a stream for a single subscription: stop fetching records for the assigned topic-partitions but keep being subscribed so that offsets can still be committed. Intended to replace `stopConsumption`, which did not support multiple-subscription use cases. Implements some of #941.

We should deprecate `stopConsumption` before releasing.