FS2 Kafka

libraryDependencies += "com.avast" %% "sst-fs2-kafka" % "0.18.4"

This subproject initializes FS2 Kafka consumer or producer:

import cats.effect.Resource
import cats.syntax.flatMap.*
import com.avast.sst.fs2kafka.*
import fs2.kafka.{AutoOffsetReset, ProducerRecord, ProducerRecords}
import zio.*
import zio.interop.catz.*
import zio.interop.catz.implicits.*

implicit val runtime: Runtime[ZEnv] = zio.Runtime.default // this is just needed in example

for {
  consumer <- Fs2KafkaModule.makeConsumer[Task, String, String](
    ConsumerConfig(List("localhost:9092"), groupId = "test", autoOffsetReset = AutoOffsetReset.Earliest), None, None
  )
  _ <- Resource.eval(consumer.subscribeTo("test"))
  consumerStream <- Resource.eval(consumer.stream)
} yield consumerStream

The configuration of Kafka client is very large therefore you can either use the provided configuration case class, or you can use the underlying ConsumerSettings/ProducerSettings builders directly.

The configuration case classes contain an “escape hatch” into the full world of Kafka client configuration options via untyped properties. This is there to be flexible in case it is needed. Documentation of all the configuration properties is available here:

Beware that there is an optional dependency on jackson-databind for the default implementation of SASL/OAUTHBEARER in kafka-clients. You need to provide it explicitly: https://kafka.apache.org/documentation/#security_sasl_oauthbearer_clientconfig