Alive checks #1287

Open
wants to merge 3 commits into base: master
@@ -0,0 +1,129 @@
package zio.kafka.consumer

//import io.github.embeddedkafka.EmbeddedKafka
//import org.apache.kafka.clients.consumer.ConsumerConfig
//import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
//import org.apache.kafka.clients.consumer.CooperativeStickyAssignor
//import org.apache.kafka.clients.consumer.MockConsumer
//import org.apache.kafka.clients.consumer.OffsetResetStrategy
//import org.apache.kafka.common.config.ConfigException
import zio.kafka.serde.Serde
//import org.apache.kafka.clients.consumer.RangeAssignor
//import org.apache.kafka.clients.producer.ProducerRecord
//import org.apache.kafka.common.TopicPartition
//import org.apache.kafka.common.config.ConfigException
import zio._
import zio.kafka.ZIOSpecDefaultSlf4j
//import zio.kafka.consumer.Consumer.AutoOffsetStrategy
//import zio.kafka.consumer.Consumer.CommitTimeout
//import zio.kafka.consumer.Consumer.OffsetRetrieval
//import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
//import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization.ConsumerFinalized
//import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization.RunloopFinalized
//import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization.SubscriptionFinalized
//import zio.kafka.consumer.diagnostics.DiagnosticEvent
//import zio.kafka.consumer.diagnostics.Diagnostics
//import zio.kafka.producer.Producer
//import zio.kafka.producer.TransactionalProducer
//import zio.kafka.serde.Serde
import zio.kafka.testkit.KafkaTestUtils._
import zio.kafka.testkit.Kafka
import zio.kafka.testkit.KafkaRandom
//import zio.stream.ZSink
//import zio.stream.ZStream
//import zio.test.Assertion._
import zio.test.TestAspect._
import zio.test._

//import scala.reflect.ClassTag

//noinspection SimplifyAssertInspection
object ConsumerAliveSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
  override val kafkaPrefix: String = "consumespec2"

  override def spec /*: Spec[TestEnvironment with Scope, Throwable]*/ =
    suite("Consumer isAlive")(
      test("isAlive should be true when everything is fine") {
        for {
          client <- randomClient
          group <- randomGroup
          c <- consumer(client, Some(group)).build

          isAlive <- c.get.isAlive
        } yield assertTrue(isAlive)
      },
      test("isAlive should turn false when consumer gets shutdown") {
        for {
          client <- randomClient
          group <- randomGroup
          topic <- randomTopic
          cs <- consumerSettings(client, Some(group))

          // mock = new MockConsumer[Array[Byte], Array[Byte]](OffsetResetStrategy.NONE)

          scope <- Scope.make
          _ <- scope.addFinalizer(ZIO.debug("scope getting closed"))
          c <- Consumer.make(cs).provide(ZLayer.succeed(scope))

          _ <- Consumer
                 .plainStream(Subscription.Topics(Set(topic)), Serde.string, Serde.string)
                 .take(1)
                 .runCollect
                 .provide(ZLayer.succeed(c))
                 .fork

          // // wait for runloop to start
          // _ <- c.isAlive.repeatWhileZIO(_ => c.isAlive.map(!_)).timeout(10.seconds)

          isAliveBefore <- c.isAlive

          _ <- scope.close(Exit.unit).fork

          _ <- c.isAlive.repeatWhileZIO(_ => c.isAlive)
          _ <- ZIO.logDebug("after sleeping")
          _ <- c.isAlive.repeatWhileZIO(_ => c.isAlive).timeout(15.seconds)
          _ <- ZIO.logDebug("after repeatWhile")
          isAliveAfter <- c.isAlive
          _ <- ZIO.logDebug(s"should be down now: ${isAliveAfter}")
        } yield assertTrue(isAliveBefore) && assertTrue(!isAliveAfter)

      }
      // test("isAlive should turn false when the runloop crashes") {
      //   for {
      //     client <- randomClient
      //     group <- randomGroup
      //     topic <- randomTopic
      //     cs <- consumerSettings(client, Some(group))
      //     mock = new MockConsumer[Array[Byte], Array[Byte]](OffsetResetStrategy.NONE)
      //     c <- Consumer.fromJavaConsumer(
      //            mock,
      //            cs
      //          )
      //
      //     _ <- Consumer
      //            .plainStream(Subscription.Topics(Set(topic)), Serde.string, Serde.string)
      //            .take(5)
      //            .runCollect
      //            .sandbox
      //            .provide(ZLayer.succeed(c))
      //            .fork
      //            .sandbox
      //
      //     isAliveBefore <- c.isAlive
      //     _ = mock.setPollException(new ConfigException("dummy"))
      //     _ <- ZIO.debug("after setting poll exception")
      //     _ <- ZIO.sleep(2.seconds)
      //     _ <- ZIO.debug("after sleeping")
      //     _ <- c.isAlive.repeatWhileZIO(_ => c.isAlive)
      //     _ <- ZIO.debug("after repeatWhile")
      //     isAliveAfter <- c.isAlive
      //     _ <- ZIO.debug(s"should be down now: ${isAliveAfter}")
      //   } yield assertTrue(isAliveBefore) && assertTrue(!isAliveAfter)
      //
      // }
    )
      .provideSomeShared[Scope](
        Kafka.embedded
      ) @@ withLiveClock @@ timeout(2.minutes)

}
8 changes: 8 additions & 0 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -158,6 +158,11 @@ trait Consumer {
* Expose internal consumer metrics
*/
def metrics: Task[Map[MetricName, Metric]]

/**
* Liveness check for this consumer. Becomes `false` once the runloop fails, is shut down, or is finalized.
*/
def isAlive: UIO[Boolean]
}
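
For illustration only (not part of this diff): a minimal sketch of how application code might poll the new isAlive method, for example to back a liveness probe. The probe object, the 10-second schedule, and reportUnhealthy are assumptions made up for the example, not zio-kafka API.

import zio._
import zio.kafka.consumer.Consumer

object ConsumerLivenessProbe {

  // Hypothetical reaction to an unhealthy consumer; replace with real reporting.
  private def reportUnhealthy: UIO[Unit] =
    ZIO.logWarning("consumer is no longer alive")

  // Check the consumer every 10 seconds and warn once it reports dead.
  def run(consumer: Consumer): UIO[Unit] =
    consumer.isAlive
      .flatMap(alive => reportUnhealthy.unless(alive))
      .repeat(Schedule.fixed(10.seconds))
      .unit
}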

object Consumer {
@@ -432,6 +437,9 @@ private[consumer] final class ConsumerLive private[consumer] (
) extends Consumer {
import Consumer._

override def isAlive: UIO[Boolean] =
ZIO.debug("isAlive in ConsumerLive") *> runloopAccess.isAlive

override def assignment: Task[Set[TopicPartition]] =
consumer.withConsumer(_.assignment().asScala.toSet)

@@ -33,7 +33,8 @@ private[consumer] final class Runloop private (
diagnostics: Diagnostics,
maxRebalanceDuration: Duration,
currentStateRef: Ref[State],
committedOffsetsRef: Ref[CommitOffsets]
committedOffsetsRef: Ref[CommitOffsets],
alive: Ref[Boolean]
) {
private val commitTimeout = settings.commitTimeout
private val commitTimeoutNanos = settings.commitTimeout.toNanos
@@ -46,12 +47,16 @@
private def newPartitionStream(tp: TopicPartition): UIO[PartitionStreamControl] =
PartitionStreamControl.newPartitionStream(tp, commandQueue, diagnostics, maxPollInterval)

def isAlive: UIO[Boolean] = ZIO.debug("isAlive in Runloop") *> alive.get

def stopConsumption: UIO[Unit] =
ZIO.logDebug("stopConsumption called") *>
alive.set(false) *>
commandQueue.offer(RunloopCommand.StopAllStreams).unit

private[consumer] def shutdown: UIO[Unit] =
ZIO.logDebug(s"Shutting down runloop initiated") *>
alive.set(false) *>
commandQueue
.offerAll(
Chunk(
@@ -746,8 +751,12 @@ private[consumer] final class Runloop private (
_ <- currentStateRef.set(updatedStateAfterPoll)
} yield updatedStateAfterPoll
}
.tapErrorCause(cause => ZIO.logErrorCause("Error in Runloop", cause))
.tapErrorCause(cause => alive.set(false) *> ZIO.logErrorCause("Error in Runloop, alive => false", cause))
.onError(cause => partitionsHub.offer(Take.failCause(cause)))
.tapDefect(cause => alive.set(false) *> ZIO.logErrorCause(s"Defect in Runloop, alive => false", cause)) <* (ZIO
.logTrace(
"commandQueue finished, alive => false"
) *> alive.set(false))
}
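
The change above follows a general pattern: the runloop owns an alive flag that error and defect handlers, as well as finalizers, flip to false, so a cheap isAlive read can observe the loop's health from outside. A standalone, hedged sketch of that pattern with illustrative names (not zio-kafka code):

import zio._

object AliveFlagPattern {

  final case class Loop(alive: Ref[Boolean]) {
    val isAlive: UIO[Boolean] = alive.get
  }

  // Start `body` as a forever-running fiber and expose a liveness flag for it.
  def start(body: Task[Unit]): ZIO[Scope, Nothing, Loop] =
    for {
      alive <- Ref.make(true)
      // The flag drops when the enclosing scope closes (normal shutdown) ...
      _ <- ZIO.addFinalizer(alive.set(false))
      // ... and when the loop dies for any other reason (error or defect).
      _ <- body.forever
             .tapErrorCause(_ => alive.set(false))
             .forkScoped
    } yield Loop(alive)
}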

private def observeRunloopMetrics(runloopMetricsSchedule: Schedule[Any, Unit, Long]): ZIO[Any, Nothing, Unit] = {
@@ -865,6 +874,8 @@ object Runloop {
committedOffsetsRef <- Ref.make(CommitOffsets.empty)
sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer)
executor <- ZIO.executor
alive <- Ref.make[Boolean](true)
_ <- ZIO.addFinalizer(ZIO.debug("finalizing runloop") *> alive.set(false))
runloop = new Runloop(
settings = settings,
topLevelExecutor = executor,
@@ -878,7 +889,8 @@
diagnostics = diagnostics,
maxRebalanceDuration = maxRebalanceDuration,
currentStateRef = currentStateRef,
committedOffsetsRef = committedOffsetsRef
committedOffsetsRef = committedOffsetsRef,
alive = alive
)
_ <- ZIO.logDebug("Starting Runloop")

@@ -42,6 +42,12 @@ private[consumer] final class RunloopAccess private (
case RunloopState.Finalized => ZIO.unit
}

def isAlive: UIO[Boolean] = runloopStateRef.get.flatMap {
case RunloopState.NotStarted => /* ZIO.debug("isAlive in RunLoopAccess 1") *> */ ZIO.succeed(true)
case RunloopState.Started(runloop) => /* ZIO.debug("isAlive in RunLoopAccess 2") *> */ runloop.isAlive
case RunloopState.Finalized => ZIO.debug("isAlive in RunLoopAccess 3") *> ZIO.succeed(false)
}

/**
* No need to call `Runloop::stopConsumption` if the Runloop has not been started or has been stopped.
*/
58 changes: 43 additions & 15 deletions zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
@@ -1,14 +1,21 @@
package zio.kafka.producer

import org.apache.kafka.clients.producer.{ KafkaProducer, Producer => JProducer, ProducerRecord, RecordMetadata }
import scala.jdk.CollectionConverters._

import org.apache.kafka.clients.producer.{ Producer => JProducer }
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo }
import org.apache.kafka.common.Metric
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.PartitionInfo

import zio._
import zio.kafka.serde.Serializer
import zio.kafka.utils.SslHelper
import zio.stream.{ ZPipeline, ZStream }

import scala.jdk.CollectionConverters._
import zio.stream.ZPipeline
import zio.stream.ZStream

trait Producer {

@@ -182,6 +189,11 @@ trait Producer {
* Expose internal producer metrics
*/
def metrics: Task[Map[MetricName, Metric]]

/**
* Liveness check for this producer. Becomes `false` once the producer is closed or its send loop stops or dies.
*/
def isAlive: UIO[Boolean]
}
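
For illustration only (not part of this diff): a hedged sketch combining the consumer and producer liveness checks into a single health value that, say, an HTTP health endpoint could serve. HealthStatus and its cases are hypothetical names introduced for the example.

import zio._
import zio.kafka.consumer.Consumer
import zio.kafka.producer.Producer

object KafkaHealth {

  sealed trait HealthStatus
  case object Healthy   extends HealthStatus
  case object Unhealthy extends HealthStatus

  // Healthy only if both the consumer and the producer still report alive.
  val status: ZIO[Consumer with Producer, Nothing, HealthStatus] =
    for {
      consumerAlive <- ZIO.serviceWithZIO[Consumer](_.isAlive)
      producerAlive <- ZIO.serviceWithZIO[Producer](_.isAlive)
    } yield if (consumerAlive && producerAlive) Healthy else Unhealthy
}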

object Producer {
@@ -218,12 +230,17 @@
settings: ProducerSettings
): ZIO[Scope, Throwable, Producer] =
for {

alive <- Ref.make(true)
sendQueue <-
Queue.bounded[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])](
settings.sendBufferSize
)
producer = new ProducerLive(javaProducer, sendQueue)
producer <- ZIO.acquireRelease(
ZIO.attempt(new ProducerLive(javaProducer, sendQueue, alive))
)(_ => alive.set(false))
_ <- producer.sendFromQueue.forkScoped

} yield producer

/**
@@ -357,11 +374,17 @@

}

private[producer] final class ProducerLive(
final private[producer] class ProducerLive(
private[producer] val p: JProducer[Array[Byte], Array[Byte]],
sendQueue: Queue[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])]
sendQueue: Queue[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])],
alive: Ref[Boolean]
) extends Producer {

/**
* Liveness check for this producer
*/
override def isAlive: UIO[Boolean] = alive.get

override def produce(record: ProducerRecord[Array[Byte], Array[Byte]]): Task[RecordMetadata] =
produceAsync(record).flatten

@@ -470,13 +493,18 @@ private[producer] final class ProducerLive(
// Calls to 'send' may block when updating metadata or when communication with the broker is (temporarily) lost,
// therefore this stream is run on the blocking thread pool.
ZIO.blocking {
ZStream
.fromQueueWithShutdown(sendQueue)
.mapZIO { case (serializedRecords, done) =>
sendChunk(runtime, serializedRecords)
.flatMap(done.succeed(_))
}
.runDrain
{
ZStream
.fromQueueWithShutdown(sendQueue)
.mapZIO { case (serializedRecords, done) =>
sendChunk(runtime, serializedRecords)
.flatMap(done.succeed(_))
}
.runDrain
.tapDefect(c =>
ZIO.logWarningCause("defect in producer sendQueue, producer stopped", c) *> alive.set(false)
)
} <* (ZIO.logDebug("producer sendQueue stopped") *> alive.set(false))
}
}

@@ -1,7 +1,7 @@
package zio.kafka.producer

import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.producer.{ KafkaProducer, RecordMetadata }
import org.apache.kafka.clients.producer.{ KafkaProducer, Producer => JProducer, RecordMetadata }
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InvalidGroupIdException
import org.apache.kafka.common.serialization.ByteArraySerializer
@@ -83,6 +83,23 @@ object TransactionalProducer {
} yield producer
}

def fromJavaProducer(
rawProducer: JProducer[Array[Byte], Array[Byte]],
settings: TransactionalProducerSettings
): ZIO[Scope, Throwable, TransactionalProducer] =
for {
_ <- ZIO.attemptBlocking(rawProducer.initTransactions())
semaphore <- Semaphore.make(1)
alive <- Ref.make(true)
sendQueue <-
Queue.bounded[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])](
settings.producerSettings.sendBufferSize
)
live <- ZIO.acquireRelease(ZIO.attempt(new ProducerLive(rawProducer, sendQueue, alive)))(_ => alive.set(false))
_ <- live.sendFromQueue.forkScoped

} yield new LiveTransactionalProducer(live, semaphore)

def make(settings: TransactionalProducerSettings): ZIO[Scope, Throwable, TransactionalProducer] =
for {
rawProducer <- ZIO.acquireRelease(
@@ -94,13 +111,6 @@
)
)
)(p => ZIO.attemptBlocking(p.close(settings.producerSettings.closeTimeout)).orDie)
_ <- ZIO.attemptBlocking(rawProducer.initTransactions())
semaphore <- Semaphore.make(1)
sendQueue <-
Queue.bounded[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])](
settings.producerSettings.sendBufferSize
)
live = new ProducerLive(rawProducer, sendQueue)
_ <- live.sendFromQueue.forkScoped
} yield new LiveTransactionalProducer(live, semaphore)
liveTransactionalProducer <- fromJavaProducer(rawProducer, settings)
} yield liveTransactionalProducer
}
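
For illustration only (not part of this diff): a hedged sketch of what the new fromJavaProducer constructor enables in tests, wiring a Kafka MockProducer instead of a real broker connection. The settings value is taken as a parameter here to avoid assuming a particular TransactionalProducerSettings constructor.

import org.apache.kafka.clients.producer.MockProducer
import org.apache.kafka.common.serialization.ByteArraySerializer
import zio._
import zio.kafka.producer.{ TransactionalProducer, TransactionalProducerSettings }

object TransactionalProducerMockExample {

  // MockProducer with autoComplete = true, so each send completes immediately.
  val mock: MockProducer[Array[Byte], Array[Byte]] =
    new MockProducer[Array[Byte], Array[Byte]](true, new ByteArraySerializer, new ByteArraySerializer)

  // Build the transactional producer on top of the mock; closing the scope flips its alive flag to false.
  def producer(settings: TransactionalProducerSettings): ZIO[Scope, Throwable, TransactionalProducer] =
    TransactionalProducer.fromJavaProducer(mock, settings)
}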