Neutron

Neutron is a purely functional Apache Pulsar client for Scala, build on top of fs2 and the Java client for Pulsar.

It is published for Scala 2.13/3.1. You can include it in your project by adding the following dependencies.

sbt
libraryDependencies ++= Seq(
  "dev.profunktor" %% "neutron-core" % "0.7.0",
  "dev.profunktor" %% "neutron-circe" % "0.7.0",
  "dev.profunktor" %% "neutron-function" % "0.7.0"
)
Maven
<dependencies>
  <dependency>
    <groupId>dev.profunktor</groupId>
    <artifactId>neutron-core_2.13</artifactId>
    <version>0.7.0</version>
  </dependency>
  <dependency>
    <groupId>dev.profunktor</groupId>
    <artifactId>neutron-circe_2.13</artifactId>
    <version>0.7.0</version>
  </dependency>
  <dependency>
    <groupId>dev.profunktor</groupId>
    <artifactId>neutron-function_2.13</artifactId>
    <version>0.7.0</version>
  </dependency>
</dependencies>
Gradle
dependencies {
  implementation "dev.profunktor:neutron-core_2.13:0.7.0"
  implementation "dev.profunktor:neutron-circe_2.13:0.7.0"
  implementation "dev.profunktor:neutron-function_2.13:0.7.0"
}

Quick start

Here’s a quick consumer / producer example using neutron. Note: both are fully asynchronous.

import scala.concurrent.duration._

import dev.profunktor.pulsar._
import dev.profunktor.pulsar.schema.PulsarSchema

import cats.effect._
import fs2.Stream

object Demo extends IOApp.Simple {

  val config = Config.Builder.default

  val topic  =
    Topic.Builder
      .withName("my-topic")
      .withConfig(config)
      .withType(Topic.Type.NonPersistent)
      .build

  val subs =
    Subscription.Builder
      .withName("my-sub")
      .withType(Subscription.Type.Exclusive)
      .build

  val schema = PulsarSchema.utf8

  val resources: Resource[IO, (Consumer[IO, String], Producer[IO, String])] =
    for {
      pulsar   <- Pulsar.make[IO](config.url)
      consumer <- Consumer.make[IO, String](pulsar, topic, subs, schema)
      producer <- Producer.make[IO, String](pulsar, topic, schema)
    } yield consumer -> producer

  val run: IO[Unit] =
    Stream
      .resource(resources)
      .flatMap {
        case (consumer, producer) =>
          val consume =
            consumer
              .autoSubscribe
              .evalMap(IO.println)

          val produce =
            Stream
              .emit("test data")
              .covary[IO]
              .metered(3.seconds)
              .evalMap(producer.send_)

          consume.concurrently(produce)
      }
      .compile
      .drain

}