Reader
A reader allows you to “manually position” the offset within a topic and reading all messages from a specified message onward. For instance, you can start from a specific MessageId
.
Neutron models it via a tagless algebra, as usual.
import scala.concurrent.duration.FiniteDuration
import fs2.Stream
object Reader {
sealed trait MessageAvailable
object MessageAvailable {
case object Yes extends MessageAvailable
case object No extends MessageAvailable
}
}
trait Reader[F[_], E] {
def read: Stream[F, E]
def read1: F[Option[E]]
def readUntil(timeout: FiniteDuration): F[Option[E]]
def messageAvailable: F[Reader.MessageAvailable]
}
There’s also a MessageReader
algebra, useful whenever you need more than the payload of the message, such as the MessageId
and MessageKey
.
Creating a Reader
It provides a few constructors as both Consumer
and Producer
do for schema and message decoders. E.g.
import dev.profunktor.pulsar._
import dev.profunktor.pulsar.Reader.Settings
import org.apache.pulsar.client.api.Schema
import cats.effect._
def make[F[_]: Sync, E](
client: Pulsar.T,
topic: Topic.Single,
schema: Schema[E],
settings: Settings[F, E]
): Resource[F, Reader[F, E]] = ???
If you’re interested in a MessageReader
instead, you can use messageReader
instead of make
. Check out all the available smart constructors either in the API or source code.
Once we have a Pulsar client and a topic, we can proceed with the creation of a reader. If you missed that part, check out the connection and topic docs.
import dev.profunktor.pulsar._
import dev.profunktor.pulsar.schema.PulsarSchema
import cats.effect._
val schema = PulsarSchema.utf8
def creation(
pulsar: Pulsar.T,
topic: Topic.Single
): Resource[IO, Reader[IO, String]] =
Reader.make[IO, String](pulsar, topic, schema)
Reading messages
We can use any of the available read
methods. E.g.
def simple(
reader: Reader[IO, String]
): IO[Unit] =
reader
.read
.evalMap(IO.println)
.compile
.drain
Or we can first ask whether there are available messages or not via messageAvailable
.
Reader settings
The reader constructor can also be customized with a few extra options. E.g.
import org.apache.pulsar.client.api.MessageId
val msgId: MessageId = null
val settings =
Reader.Settings[IO, String]()
.withStartMessageId(msgId)
.withReadCompacted
def custom(
pulsar: Pulsar.T,
topic: Topic.Single
): Resource[IO, Reader[IO, String]] =
Reader.make(pulsar, topic, schema, settings)
It is the responsibility of the application to know the specific MessageId
, which internally represents a Ledger ID, Entry ID, and Partition ID.