Combining ZIO's powerful effect system with ZStream allows for expressive and efficient streaming computations, but the step between ZIO and ZStream can be confusing for the beginner. This tutorial will guide you in using ZSink, ZStream.fromZIO, and ZStream.runHead in a Scala application. We'll develop a simple step-by-step application to demonstrate these concepts.
Prerequisites
Basic understanding of Scala and functional programming
Familiarity with ZIO 2.x library
Setting Up Your Environment
Ensure Scala (2.13.x or 3.x) and sbt are installed. Add ZIO 2 and ZIO Streams to your build.sbt:
"dev.zio" %% "zio" % "2.0.21",
"dev.zio" %% "zio-streams" % "2.0.21"
Introduction
A couple of weeks ago, I wrote a post about ZIO:s monadic nature. As ZIO is a monad we can use map and flatMap to chain effects, resulting in a new monad. ZStreams are also monads in the same way.
We often use for-comprehensions to combine the maps and flatMaps in a more readable way.
Example:
val effectOne: IO[IOException, Unit] = Console.printLine("One")
val effectTwo: IO[IOException, Unit] = Console.printLine("Two")
val combination: ZIO[Any, IOException, Unit] = for {
one <- effectOne
two <- effectTwo
} yield ()
As you can see, the combination of the two effects is also an effect, stated as a ZIO-monad. If we desugar the for comprehension we get:
val combination: ZIO[Any, IOException, Unit] = effectOne
.flatMap(one =>
effectTwo
)
We work with ZIOs all the way.
What is confusing to the beginner in using ZStream is that ZStreams are not ZIOs. So we cannot combine ZIOs and ZStreams in a for-comprehension; they are not the same monad.
To combine them we need to use different techniques. ZSink, ZStream.fromZIO, and ZStream.runHead are some easy ways to do this.
Example Application
We'll create an application that demonstrates chaining of streams and effects using a for-comprehension. This application will:
Create several ZIO effects.
Wrap these effects in streams.
Chain these streams in a for-comprehension.
Finally, use runHead to get the result of the chained computation.
Step 1: Creating Simple ZIO Effects
First, define some simple ZIO effects:
import zio._
val effectOne: ZIO[Any, Nothing, Int] = ZIO.succeed(42)
val effectTwo: ZIO[Any, Nothing, String] = ZIO.succeed("Meaning of life")
Step 2: Wrapping ZIO Effects in Streams
Wrap these ZIO effects in ZStream:
import zio.stream._
ZSink
Now, let us use a ZSink instead to consume a stream.
Step 1: Creating a Simple ZStream
Let's start by creating a ZStream.
val simpleStream: ZStream[Any, Nothing, Int] = ZStream.fromIterable(1 to 10)
This stream simply gives values from 1 to 10.
Step 2: Processing Streams with ZSink
ZSink is used to consume streams. Create a sink that sums all integers in a stream.
val sumSink: ZSink[Any, Nothing, Int, Nothing, Int] = ZSink.foldLeft(0)(_ + _)
Step 3: Running the Stream with a Sink
To run our stream with the sink and compute the sum of all elements emitted by the stream, you can use the run method.
val runStream: ZIO[Any, Nothing, Int] = simpleStream >>> sumSink
Putting It All Together
Combine all the pieces into a ZIO App to execute our example.
import zio._
import zio.stream._
object ZioStreamApp extends zio.ZIOAppDefault {
val simpleStream: ZStream[Any, Nothing, Int] = ZStream.fromIterable(1 to 10)
val sumSink: ZSink[Any, Nothing, Int, Nothing, Int] = ZSink.foldLeft(0)(_ + _)
val runStream: ZIO[Any, Nothing, Int] = simpleStream >>> sumSink
def run: ZIO[Any, Nothing, ExitCode] = {
val program = for {
sum <- runStream
_ <- Console.printLine(s"Sum is: $sum")
} yield ()
program.exitCode
}
}
Conclusion
This tutorial demonstrated a functional way to chain multiple streams and ZIO effects using a for-comprehension and how to consume the resulting stream to get a specific result with runHead. This approach showcases the power of ZIO 2 and ZStreams for composing complex asynchronous and concurrent operations in a type-safe and expressive manner.
In the second example, you've learned how to create a ZStream from an iterable using ZStream.fromIterable, consume the stream with ZSink, and use the sink to get a ZIO that can be executed and used to print the calculation result.
Comments
Post a Comment