Neutron
Neutron is a purely functional Apache Pulsar client for Scala, build on top of fs2 and the Java client for Pulsar.
It is published for Scala 2.13/3.3. You can include it in your project by adding the following dependencies.
- sbt
libraryDependencies ++= Seq( "dev.profunktor" %% "neutron-core" % "0.8.0", "dev.profunktor" %% "neutron-circe" % "0.8.0", "dev.profunktor" %% "neutron-function" % "0.8.0" )
- Maven
<dependencies> <dependency> <groupId>dev.profunktor</groupId> <artifactId>neutron-core_2.13</artifactId> <version>0.8.0</version> </dependency> <dependency> <groupId>dev.profunktor</groupId> <artifactId>neutron-circe_2.13</artifactId> <version>0.8.0</version> </dependency> <dependency> <groupId>dev.profunktor</groupId> <artifactId>neutron-function_2.13</artifactId> <version>0.8.0</version> </dependency> </dependencies>
- Gradle
dependencies { implementation "dev.profunktor:neutron-core_2.13:0.8.0" implementation "dev.profunktor:neutron-circe_2.13:0.8.0" implementation "dev.profunktor:neutron-function_2.13:0.8.0" }
Quick start
Here’s a quick consumer / producer example using neutron. Note: both are fully asynchronous.
import scala.concurrent.duration._
import dev.profunktor.pulsar._
import dev.profunktor.pulsar.schema.PulsarSchema
import cats.effect._
import fs2.Stream
object Demo extends IOApp.Simple {
val config = Config.Builder.default
val topic =
Topic.Builder
.withName("my-topic")
.withConfig(config)
.withType(Topic.Type.NonPersistent)
.build
val subs =
Subscription.Builder
.withName("my-sub")
.withType(Subscription.Type.Exclusive)
.build
val schema = PulsarSchema.utf8
val resources: Resource[IO, (Consumer[IO, String], Producer[IO, String])] =
for {
pulsar <- Pulsar.make[IO](config.url)
consumer <- Consumer.make[IO, String](pulsar, topic, subs, schema)
producer <- Producer.make[IO, String](pulsar, topic, schema)
} yield consumer -> producer
val run: IO[Unit] =
Stream
.resource(resources)
.flatMap {
case (consumer, producer) =>
val consume =
consumer
.autoSubscribe
.evalMap(IO.println)
val produce =
Stream
.emit("test data")
.covary[IO]
.metered(3.seconds)
.evalMap(producer.send_)
consume.concurrently(produce)
}
.compile
.drain
}
0.8.0