Consumer
A consumer is a process that attaches to a topic via a subscription, in order to receive messages.
Neutron models it via a tagless algebra.
import dev.profunktor.pulsar._
import fs2.Stream
import org.apache.pulsar.client.api.{ Message => JMessage, MessageId }
object Consumer {
  case class Message[A](
      id: MessageId,
      key: MessageKey,
      properties: Map[String, String],
      raw: JMessage[Any],
      payload: A
  )
}
trait Consumer[F[_], E] {
  def ack(id: MessageId): F[Unit]
  def nack(id: MessageId): F[Unit]
  def subscribe: Stream[F, Consumer.Message[E]]
  def subscribe(id: MessageId): Stream[F, Consumer.Message[E]]
  def autoSubscribe: Stream[F, E]
  def unsubscribe: F[Unit]
}
We will expand on its methods in the next few sections.
Subscriptions
Pulsar supports multiple subscription modes, which can be created in Neutron via smart builders.
import dev.profunktor.pulsar._
val subs =
  Subscription.Builder
    .withName("my-sub")
    .withType(Subscription.Type.Shared)
    .build
There are four types of subscriptions: Exclusive, Shared, KeyShared, and Failover.
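As a minimal sketch, the same builder calls can produce subscriptions of the other types. The subscription names below are hypothetical and chosen only for illustration; only the builder methods already shown are used.

```scala
import dev.profunktor.pulsar._

// Exclusive: Pulsar allows a single consumer on the subscription.
// The name "audit-exclusive" is a placeholder.
val exclusiveSub =
  Subscription.Builder
    .withName("audit-exclusive")
    .withType(Subscription.Type.Exclusive)
    .build

// Failover: one active consumer, with the others on standby.
// The name "orders-failover" is a placeholder.
val failoverSub =
  Subscription.Builder
    .withName("orders-failover")
    .withType(Subscription.Type.Failover)
    .build
```

Which type fits depends on whether messages should be spread across consumers (Shared, KeyShared) or handled by one consumer at a time (Exclusive, Failover).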
Creating a Consumer
There are a few smart constructors we can use to create a consumer. If we want Pulsar schema support, we can use the following one (another variant takes an extra argument for the consumer settings).
import org.apache.pulsar.client.api.Schema
def make[F[_]: FutureLift: Sync, E](
    client: Pulsar.T,
    topic: Topic,
    sub: Subscription,
    schema: Schema[E]
): Resource[F, Consumer[F, E]] = ???
If we do not want Pulsar schema support, then we need to provide a message decoder, and ideally a decoding error handler, which are both functions.
def make[F[_]: FutureLift: Sync, E](
    client: Pulsar.T,
    topic: Topic,
    sub: Subscription,
    messageDecoder: Array[Byte] => F[E],
    decodingErrorHandler: Throwable => F[OnFailure] // defaults to Raise
): Resource[F, Consumer[F, E]] = ???
For example, a decoder for UTF-8 encoded strings could look as follows:
import cats.effect.IO
import java.nio.charset.StandardCharsets.UTF_8
val utf8Decoder: Array[Byte] => IO[String] =
  bs => IO(new String(bs, UTF_8))
Or we could use a JSON decoder powered by Circe:
import io.circe.Decoder
def jsonDecoder[A: Decoder]: Array[Byte] => IO[A] =
  bs => IO.fromEither(io.circe.parser.decode[A](new String(bs, UTF_8)))
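To see how such a decoder behaves on valid and invalid input, here is a small sketch that does not need a Pulsar connection. The `Event` type and its field names are hypothetical, introduced only for this example; the decoder function has the same shape as the JSON decoder above.

```scala
import cats.effect.IO
import io.circe.Decoder
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical payload type, used only for this example.
final case class Event(id: Long, name: String)

// Explicit Circe decoder built from the two JSON field names.
implicit val eventDecoder: Decoder[Event] =
  Decoder.forProduct2("id", "name")((id: Long, name: String) => Event(id, name))

// Same shape as the JSON decoder shown in the text.
def jsonDecoder[A: Decoder]: Array[Byte] => IO[A] =
  bs => IO.fromEither(io.circe.parser.decode[A](new String(bs, UTF_8)))

val validBytes: Array[Byte]   = """{"id":1,"name":"created"}""".getBytes(UTF_8)
val invalidBytes: Array[Byte] = "not json".getBytes(UTF_8)

// Succeeds with the decoded value when run.
val decoded: IO[Event] = jsonDecoder[Event].apply(validBytes)

// The decoding error is raised inside IO; `attempt` surfaces it as a Left.
val failed: IO[Either[Throwable, Event]] =
  jsonDecoder[Event].apply(invalidBytes).attempt
```

Because the failure is raised in `IO` as a `Throwable`, it is the kind of error a decoding error handler of type `Throwable => F[OnFailure]` receives.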
If we do not specify the decoding error handler, then the default is to re-raise the error when a message cannot be decoded. That’s usually a sane default, but every case is different and you might want to ack or nack the message, which can be done as follows:
val handler: Throwable => IO[Consumer.OnFailure] =
  e => IO.println(s"[error] - ${e.getMessage}").as(Consumer.OnFailure.Nack)
There are many smart constructors to ensure you create a consumer with a valid state. Check out all of them in the API or source code.
Once we have a subscription, we can create a consumer, assuming we also have a pulsar connection and a topic.
If you missed that part, check out the connection and topic docs.
import dev.profunktor.pulsar.schema.PulsarSchema
import cats.effect._
val schema = PulsarSchema.utf8
def creation(
    pulsar: Pulsar.T,
    topic: Topic
): Resource[IO, Consumer[IO, String]] =
  Consumer.make[IO, String](pulsar, topic, subs, schema)
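Since the smart constructors return a `Resource`, the consumer is only available inside `use`. The sketch below shows one way to run a consumer obtained this way; `runConsumer` and `consumerResource` are hypothetical names, and the resource could be, for instance, the result of `creation(pulsar, topic)`. What exactly the finalizer releases is up to Neutron; the sketch relies only on standard cats-effect `Resource` semantics.

```scala
import cats.effect.{ IO, Resource }
import dev.profunktor.pulsar._

// Hypothetical helper: acquires the consumer, prints every payload,
// and lets the Resource finalizer run when the stream completes,
// fails, or is canceled.
def runConsumer(
    consumerResource: Resource[IO, Consumer[IO, String]]
): IO[Unit] =
  consumerResource.use { consumer =>
    consumer.autoSubscribe
      .evalMap(payload => IO.println(payload))
      .compile
      .drain
  }
```

Keeping the whole stream inside `use` avoids touching the consumer after it has been released.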
Auto-subscription
This is the easiest way to get started with a consumer. Once a message is received, it is automatically acknowledged (ack) for us. This is done via the autoSubscribe method, as shown below.
def auto(
    consumer: Consumer[IO, String]
): IO[Unit] =
  consumer
    .autoSubscribe
    .evalMap(IO.println)
    .compile
    .drain
In this case, autoSubscribe returns Stream[IO, String], meaning we directly get the body of the message.
Manual ack
In most serious applications, this should be the preferred way to consume messages, since it avoids losing a message when the application fails after receiving it but before processing it.
For this purpose, we can use the subscribe method, as shown in the example below.
def process(payload: String): IO[Unit] =
  IO.println(s"Payload: $payload")
def manual(
    consumer: Consumer[IO, String]
): IO[Unit] =
  consumer
    .subscribe
    .evalMap { case Consumer.Message(id, _, _, _, payload) =>
      process(payload) // pretend `process` might raise an error
        .flatMap(_ => consumer.ack(id))
        .handleErrorWith(e => IO.println(e) *> consumer.nack(id))
    }
    .compile
    .drain
This allows us to decide whether to ack or nack each message; a nacked message will be re-delivered by Pulsar.
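The ack-or-nack decision can also be factored into a reusable function. The following is a sketch under the assumption that only processing failures should lead to a nack; `ackOrNack` and `manualWithHelper` are hypothetical names, not part of Neutron.

```scala
import cats.effect.IO
import dev.profunktor.pulsar._

// Hypothetical helper: runs `process` on the payload, then acks on
// success or logs and nacks on failure. Because `attempt` wraps only
// `process`, a failing `ack` is not followed by a `nack`.
def ackOrNack[A](consumer: Consumer[IO, A])(process: A => IO[Unit])(
    msg: Consumer.Message[A]
): IO[Unit] =
  process(msg.payload).attempt.flatMap {
    case Right(_) => consumer.ack(msg.id)
    case Left(e)  => IO.println(s"[error] - ${e.getMessage}") *> consumer.nack(msg.id)
  }

def manualWithHelper(consumer: Consumer[IO, String]): IO[Unit] =
  consumer.subscribe
    .evalMap(msg => ackOrNack(consumer)((p: String) => IO.println(s"Payload: $p"))(msg))
    .compile
    .drain
```

Whether an error raised by `ack` itself should trigger a `nack` is a design choice; this sketch lets it propagate and terminate the stream instead.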
Manual subscription
As shown in the section above, we can use the subscribe method to manually handle acknowledgements. Additionally, we have another variant of subscribe that takes a MessageId as an argument.
def subscribe(id: MessageId): Stream[F, Consumer.Message[E]]
This type of subscription will override the SubscriptionInitialPosition set in the settings and point this consumer to a specific message id, which is internally done via seekAsync. This could be useful when we know exactly how far we want to rewind or where exactly we would like to start consuming.
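As an illustration, the sketch below rewinds a consumer to the beginning of the topic using `MessageId.earliest`, a constant from the Pulsar Java client. The function name `replayFromStart` is hypothetical; in practice the id would more likely be one stored from a previously received message.

```scala
import cats.effect.IO
import dev.profunktor.pulsar._
import org.apache.pulsar.client.api.MessageId

// Hypothetical example: start consuming from the earliest available
// message, printing and acknowledging each one.
def replayFromStart(consumer: Consumer[IO, String]): IO[Unit] =
  consumer
    .subscribe(MessageId.earliest)
    .evalMap { msg =>
      IO.println(s"Replayed: ${msg.payload}") *> consumer.ack(msg.id)
    }
    .compile
    .drain
```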
Unsubscribe
We can unsubscribe from a topic via the unsubscribe method, which implies deleting the subscription. We can do this whenever we are sure the process is over and we no longer need the subscription.
def finish(
    consumer: Consumer[IO, String]
): IO[Unit] =
  consumer
    .autoSubscribe
    .evalMap(IO.println)
    .onFinalize(consumer.unsubscribe)
    .compile
    .drain
This can also be performed automatically by enabling the autoUnsubscribe option.
Consumer settings
When creating a consumer, we can choose to customize the default options. For example:
import java.nio.charset.StandardCharsets.UTF_8
import org.apache.pulsar.client.api.{
  DeadLetterPolicy,
  SubscriptionInitialPosition
}
val deadLetterPolicy =
  DeadLetterPolicy
    .builder()
    .deadLetterTopic("foo")
    .maxRedeliverCount(100)
    .retryLetterTopic("bar")
    .build()
val utf8Decoder: Array[Byte] => IO[String] =
  bs => IO(new String(bs, UTF_8))
val handler: Throwable => IO[Consumer.OnFailure] =
  e => IO.println(s"[error] - ${e.getMessage}").as(Consumer.OnFailure.Nack)
val settings =
  Consumer.Settings[IO, String]()
    .withInitialPosition(SubscriptionInitialPosition.Earliest)
    .withLogger(e => url => IO.println(s"Message: $e, URL: $url"))
    .withAutoUnsubscribe
    .withReadCompacted
    .withDeadLetterPolicy(deadLetterPolicy)
    .withUnsafeConf(_.autoUpdatePartitions(false))
def custom(
    pulsar: Pulsar.T,
    topic: Topic
): Resource[IO, Consumer[IO, String]] =
  Consumer.make[IO, String](
    pulsar,
    topic,
    subs,
    utf8Decoder,
    handler,
    settings
  )