Add Consumer.commit and Consumer.commitOrRetry methods #1022

Closed
wants to merge 2 commits into from

Conversation

guizmaii
Member

@guizmaii guizmaii commented Aug 12, 2023

First step toward removing the Offset trait atrocity and the current OOP orientation of the commit interface proposed by zio-kafka (i.e. record.offset.commit). This is nonsense, which pisses me off TBH.

@@ -31,10 +31,10 @@ private[consumer] final class RunloopAccess private (
 ) {

   private def withRunloopZIO[E](
-    requireRunning: Boolean
+    shouldStartIfNot: Boolean
Member Author

I'm sorry @erikvanoosten but I really disliked the name you used. I prefer the one I was using. IMO, it's more explicit about what this boolean is controlling

Collaborator

@erikvanoosten erikvanoosten Aug 13, 2023

😄 My major gripe with shouldStartIfNot is that it is like an unfinished sentence. If not..., if not what?
That is how I went to shouldStartIfNotRunning and from there to requireRunning.

Member Author

Well, it's "should start if not started" but there's IMO no need to repeat the "start" word with "started"

@guizmaii guizmaii marked this pull request as draft August 12, 2023 15:08
@guizmaii guizmaii changed the title Add Consumer.commit method Add Consumer.commit and Consumer.commitOrRetry methods Aug 12, 2023
@guizmaii guizmaii changed the title Add Consumer.commit and Consumer.commitOrRetry methods Add Consumer.commit and Consumer.commitOrRetry methods Aug 12, 2023
@guizmaii guizmaii marked this pull request as ready for review August 12, 2023 15:14
@erikvanoosten
Collaborator

erikvanoosten commented Aug 13, 2023

If I understand the code right, after this change one needs a CommittableRecord to do a commit. Unfortunately, for high throughput services it is not feasible to keep all CommittableRecords in memory until the offset needs to be committed.

I think we should keep the notion of Offset and Offsets. Then we expect the user to pass an Offsets into the commit method. This is also more in line with the Kafka model (one commits offsets, not a record).

Or did I miss something?
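
To make the memory argument concrete, here is a minimal plain-Scala sketch of tracking only the highest offset per partition instead of holding on to whole records. It deliberately uses no zio-kafka types: `TopicPartition`, `Offsets` and `add` below are hypothetical stand-ins chosen for illustration, not the library's API.

```scala
// Hypothetical stand-ins for illustration; these are not zio-kafka's types.
final case class TopicPartition(topic: String, partition: Int)

// Keeps only the highest seen offset per partition, so memory use is
// bounded by the number of partitions rather than the number of records.
final case class Offsets(highest: Map[TopicPartition, Long]) {
  def add(tp: TopicPartition, offset: Long): Offsets = {
    val current = highest.getOrElse(tp, -1L)
    Offsets(highest.updated(tp, math.max(current, offset)))
  }
}

object Offsets {
  val empty: Offsets = Offsets(Map.empty)
}

object OffsetsExample {
  val tp0: TopicPartition = TopicPartition("events", 0)
  val tp1: TopicPartition = TopicPartition("events", 1)

  // Offsets observed while processing; the records themselves are never retained.
  val seen: List[(TopicPartition, Long)] =
    List(tp0 -> 5L, tp1 -> 2L, tp0 -> 7L, tp0 -> 6L)

  // What would eventually be handed to a commit method.
  val toCommit: Offsets =
    seen.foldLeft(Offsets.empty) { case (acc, (tp, offset)) => acc.add(tp, offset) }
}
```

Under these assumptions, `OffsetsExample.toCommit` holds one entry per partition regardless of how many records were processed, which is the property a high-throughput service needs.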

@svroonland
Collaborator

svroonland commented Aug 13, 2023

I think we should keep the notion of Offset and Offsets.

👍

@guizmaii
Member Author

guizmaii commented Sep 9, 2023

@erikvanoosten

I think we should keep the notion of Offset and Offsets. Then we expect the user to pass an Offsets into the commit method. This is also more in line with the Kafka model (one commits offsets, not a record).

Or did I miss something?

We agree on the need not to cache the records and to only cache the minimum information needed, to avoid exploding memory usage 🙂

This PR is just a first step towards the goal I want to achieve

The 2 functions I add here are only meant to commit one record at a time - if that's what the user needs - eventually replacing the Offset::commit method.

Replacing the OffsetBatch::commit method requires more work that I wanna do later
My idea, for now, will be kind of similar tho.
The user will have something like this to commit some records:

 trait Consumer {

  // I was discussing this idea in Discord some time ago
  // see: https://discord.com/channels/629491597070827530/629497941719121960/1144576343330263080

  /**
   * The `UIO` represents the action of putting the Record(s) in the "commit batch"
   * The `Task` represents the action of committing the batch to Kafka
   */
  def commitAccumBatch(commitSchedule: Schedule)(record: Record): UIO[Task[Unit]]
  def commitAccumBatch(commitSchedule: Schedule)(records: Chunk[Record]): UIO[Task[Unit]]

}

So users will replace the aggregation they currently do on their side by calling one of these functions, which will do the aggregation and the commits on our side.

Doing the aggregation on our side means that we'll have access to the "list of pending commits" inside zio-kafka, which will help us fix the deadlock some people are experiencing (ie. #852)
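
As a rough illustration of that two-step shape, here is a plain-Scala model in which the outer call only registers an offset in the batch and the returned function performs the commit. This is a sketch under stated assumptions, not the proposed implementation: it uses no ZIO types (a `() => Unit` stands in for the inner `Task[Unit]`), it leaves the commit schedule out entirely, and `CommitBatcher`, `accumulate`, `pendingSnapshot` and `commitToBroker` are hypothetical names.

```scala
import scala.collection.mutable

// Plain-Scala model of the "outer effect enqueues, inner effect commits" shape.
// All names are hypothetical; this is not zio-kafka code.
final class CommitBatcher(commitToBroker: Map[Int, Long] => Unit) {
  // partition -> highest pending offset: the "list of pending commits"
  private val pending = mutable.Map.empty[Int, Long]

  // Step 1: put the offset in the batch. Step 2 is the returned function.
  def accumulate(partition: Int, offset: Long): () => Unit = synchronized {
    val highest = math.max(pending.getOrElse(partition, -1L), offset)
    pending.update(partition, highest)
    () => flush()
  }

  // The library side can inspect what is still waiting to be committed.
  def pendingSnapshot: Map[Int, Long] = synchronized(pending.toMap)

  private def flush(): Unit = synchronized {
    if (pending.nonEmpty) {
      commitToBroker(pending.toMap)
      pending.clear()
    }
  }
}

object CommitBatcherExample {
  // Records every batch handed to the (fake) broker.
  val committed: mutable.ListBuffer[Map[Int, Long]] = mutable.ListBuffer.empty

  val batcher = new CommitBatcher(batch => { committed += batch; () })

  val commit1: () => Unit = batcher.accumulate(0, 10L)
  val commit2: () => Unit = batcher.accumulate(0, 11L)
  val beforeCommit: Map[Int, Long] = batcher.pendingSnapshot

  commit2() // commits the whole batch accumulated so far
  commit1() // nothing is pending any more, so this is a no-op
}
```

The point of the sketch is only that the pending offsets live in one place owned by the library, so they remain visible until the commit actually happens.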

Do you see what I wanna do? What do you think about it?

Maybe it's better to implement everything before merging these new functions so that we can bring the complete new interface and deprecate the old one all at once, and so we can ensure the new interface is sound and coherent.

I'll try to make a little POC this afternoon (nothing prod-ready, just to demonstrate the idea)

Edit:
Made a very rough implementation of this POC here: #1040
Seeing this POC, I'm not yet convinced by my own idea 😄
It needs more refinements

Edit 2:
Made a more approachable implementation of the POC here: #1041

Edit 3:
The end goal is for things to look something like this: #1042
Bye-bye CommittableRecord, which will remove at least one allocation per consumed record! 🎉

Edit 4:
In the end I think that my .commitAccumBatch thing wasn't a good idea.
Also, my changes will not manage to put the aggregation inside zio-kafka.
Still, I think that the changes in #1042 (which are still to be polished) are going in the right direction

@guizmaii guizmaii marked this pull request as draft September 9, 2023 12:44
guizmaii added a commit that referenced this pull request Sep 9, 2023
@erikvanoosten
Collaborator

erikvanoosten commented Sep 10, 2023

Do you see what I wanna do? What do you think about it?

Do I understand correctly that you want the users to add offsets to a commit batch, and then, when they think it is time, they can complete the batch, effectively committing it for real?

Yeah, that might work. In fact, I like that idea. If the user indicates that an offset needs to be committed at some point, we know we have to wait for it while closing the consumer (or the partition).

However, I'd prefer we do not mash together committing and this new concept of intending to commit. Can we keep separate API methods for that?
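
One possible reading of that separation, sketched in plain Scala: one method records the intent to commit, another performs the commit, and closing flushes whatever is still marked. The trait and every name in it (`CommitApi`, `markForCommit`, `commitMarked`, `InMemoryCommitApi`) are hypothetical and only illustrate the idea; none of this is zio-kafka's API.

```scala
// Hypothetical API shape: intent and commit are separate methods.
trait CommitApi {
  def markForCommit(partition: Int, offset: Long): Unit // records intent only
  def commitMarked(): Unit                              // performs the actual commit
  def close(): Unit                                     // must not lose marked offsets
}

final class InMemoryCommitApi(commitToBroker: Map[Int, Long] => Unit) extends CommitApi {
  // partition -> highest offset the user intends to commit
  private var marked = Map.empty[Int, Long]

  def markForCommit(partition: Int, offset: Long): Unit = synchronized {
    val highest = math.max(marked.getOrElse(partition, -1L), offset)
    marked = marked.updated(partition, highest)
  }

  def commitMarked(): Unit = synchronized {
    if (marked.nonEmpty) {
      commitToBroker(marked)
      marked = Map.empty
    }
  }

  // Because the intent is known, closing can flush what is still marked.
  def close(): Unit = commitMarked()
}

object CommitApiExample {
  // Batches received by the (fake) broker, in order.
  var committed: List[Map[Int, Long]] = List.empty

  val api = new InMemoryCommitApi(batch => { committed = committed :+ batch })

  api.markForCommit(0, 3L)
  api.markForCommit(1, 8L)
  api.close() // no explicit commitMarked() call, yet nothing is lost
}
```

In this model a user who only ever marks offsets still gets them committed on close, which is the guarantee described in the comment above.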

@svroonland
Collaborator

How shall we move forward with the (currently) 4 open PRs related to the commit interface?

Can we summarize the interface changes we would like to see and break it down into small potential PRs?


3 participants