Producer

A producer is a process that attaches to a topic and publishes messages to a Pulsar broker.

Neutron models it via a tagless algebra.

import org.apache.pulsar.client.api.{ MessageId, ProducerStats }

trait Producer[F[_], E] {
  def send(msg: E): F[MessageId]
  def send(msg: E, properties: Map[String, String]): F[MessageId]
  def stats: F[ProducerStats]
  def lastSequenceId: F[Long]
}

As well as the send_ equivalent that discards the MessageId and returns F[Unit]. We will expand on its methods in the next few sections.

Creating a Producer

It defines a few constructs, similarly as Consumer does. If we need Pulsar schema support, this is the constructor (also another one that takes in an extra argument for the producer settings):

def make[F[_]: FutureLift: Parallel: Sync, E](
    client: Pulsar.T,
    topic: Topic.Single,
    schema: Schema[E]
): Resource[F, Producer[F, E]] = ???

If we do not need Pulsar schema support, we need to provide a message encoder.

def make[F[_]: FutureLift: Parallel: Sync, E](
    client: Pulsar.T,
    topic: Topic.Single,
    messageEncoder: E => Array[Byte]
): Resource[F, Producer[F, E]] = ???

Check out all the available smart constructors either in the API or in the source code.

Once we have a connection and a topic, we can proceed with the creation of producer. If you missed that part, check out the connection and topic docs.

import dev.profunktor.pulsar._
import dev.profunktor.pulsar.schema.PulsarSchema

import cats.effect._

val schema = PulsarSchema.utf8

def creation(
    pulsar: Pulsar.T,
    topic: Topic.Single
): Resource[IO, Producer[IO, String]] =
  Producer.make[IO, String](pulsar, topic, schema)

Publishing a message

We can publish a message via the send method, which returns a MessageId we could potentially use to store in the application’s state. If you don’t need it, prefer to use send_ instead.

def simple(
    producer: Producer[IO, String]
): IO[Unit] =
  producer.send_("some-message")

Deduplication

Pulsar supports deduplication at the broker level.

In a nutshell, the deduplication mechanism is based on sequence ids, which can be set on every message on the underlying Java client.

To make things smoother, Neutron internally manages the creation of new sequence ids via the following interface.

trait SeqIdMaker[F[_], A] {
  def make(lastSeqId: Long, currentMsg: A): F[Long]
}

Users are responsible for keeping track of their messages (usually by an EventId or so), and return lastSeqId + 1 when the message is unique, or simply lastSeqId when it’s a duplicate.

You may use the instance constructor as follows:

val seqIdMaker = SeqIdMaker.instance[IO, String] { (lastSeqId, msg) =>
  // use `msg` to compare it to previous messages and determine SeqId
  IO.pure(lastSeqId + 1)
}

To enable deduplication, we can use the following setting.

Producer.Settings[IO, String]().withDeduplication(seqIdMaker)

The DeduplicationSuite showcases this feature (also see the run.sh script, where deduplication is enabled at the topic level).

Producer settings

The producer constructor can also be customized with a few extra options. E.g.

import java.nio.charset.StandardCharsets.UTF_8

import scala.concurrent.duration._

val batching =
  Producer.Batching.Enabled(maxDelay = 5.seconds, maxMessages = 500)

val encoder: String => Array[Byte] =
 _.getBytes(UTF_8)

val settings =
  Producer.Settings[IO, String]()
   .withBatching(batching)
   .withMessageKey(s => MessageKey.Of(s.hashCode.toString))
   .withShardKey(s => ShardKey.Of(s.hashCode.toString.getBytes))
   .withLogger(e => url => IO.println(s"Message: $e, URL: $url"))
   .withUnsafeConf(_.autoUpdatePartitions(false))

def custom(
    pulsar: Pulsar.T,
    topic: Topic.Single
): Resource[IO, Producer[IO, String]] =
  Producer.make(pulsar, topic, encoder, settings)

The withShardKey option is quite useful when we want to publish messages based on certain property of your messages, e.g. an EventId. This applies when you use it together with KeyShared subscriptions, on the consumer side. Internally, this sets the orderingKey of every message.

On the other hand, the withMessageKey option sets the “partitioning key” of every message, used for compacted topics. Also bear in mind that if you use a KeyShared subscription but don’t set the orderingKey (via withShardKey), the default MessageKey will be used.