liam-drinan-vcO5nYFjBlE-unsplash.jpg

ZStream companion object API

 
0
このエントリーをはてなブックマークに追加
Kazuki Moriyama
Kazuki Moriyama (森山 和樹)

zioの非同期streamコンポーネントであるZStreamのコンパニオンオブジェクトのAPI集。

但し書き

以下のコード例はすべて最終的にZStreamをrunしたZIOに変換されたものだが、最後のexitCodeとZIO自体のrun部分は省略してあるので適宜保管して読んでほしい。

absolve

EitherのstreamからRightなものまでを拾う。
またLeftはstreamをrunしたZIOのエラー型に統合される。

ZStream
  .absolve(ZStream(Right(1), Right(2), Right(3)))
  .foreach(i => putStrLn(i.toString))
// 1
// 2
// 3

ZStream
  .absolve(ZStream(Right(1), Left("error"), Right(3)))
  .foreach(i => putStrLn(i.toString))
  .catchSome { case s: String => putStrLn(s) }
// 1
// error

access

環境の値にアクセスして、その値を持つZStreamを作成できる。

ZStream
  .access[Has[String]](_.get)
  .foreach(i => putStrLn(i))
  .provideSomeLayer[ZEnv](ZLayer.succeed("str"))
// str

accessM

accessの環境値をeffectを伴いながら取得できる。

ZStream
  .accessM[Has[String]](s => ZIO.succeed(s.get))
  .foreach(i => putStrLn(i))
  .provideSomeLayer[ZEnv](ZLayer.succeed("str"))
// str

accessStream

環境値を取得しつつ、それがZStreamの場合に対応できる。

ZStream
  .accessStream[Has[String]](s => ZStream(s.get, s.get, s.get))
  .foreach(i => putStrLn(i))
  .provideSomeLayer[ZEnv](ZLayer.succeed("str"))
// str
// str
// str

apply(ZManaged)

ZManagedからZStreamを作成できる。
ZStreamはZManagedがNoneのエラー値を持つZIOを排出してきた時点でstopする。
ZStream自体がエラーにならないことに注意。

ZStream(ZManaged.succeed(ZIO.succeed(Chunk(1)))).foreach(i =>
  putStrLn(i.toString)
)
// 1が無限にでる

ZStream(ZManaged.succeed(ZIO.fail(None).as(Chunk(1))))
  .foreach(i => putStrLn(i.toString))
// 何も出ない

apply(*A)

任意の数の値をとって、それを排出するZStreamを作成する。

ZStream(1, 2, 3).foreach(i => putStrLn(i.toString))
// 1
// 2
// 3

bracket

値を取得しつつ、そのクリーンアップ処理を差し込む事ができる。

ZStream
  .bracket(ZIO(1))(a => ZIO.succeed(println(s"clean up: ${a.toString}")))
  .foreach(i => putStrLn(i.toString))
// 1
// clean up: 1

bracketExit

bracketでExitを利用できる。

ZStream
  .bracketExit(ZIO(1))((a, e) => URIO(println(s"finish: $a $e")))
  .foreach(i => putStrLn(i.toString))
// 1
// finish: 1 Success(())

crossN

stream同士のデカルト積をとる。
引数のstreamの数の違いでいくつかのvariantが存在する。

ZStream
  .crossN(ZStream(1, 2), ZStream(3, 4))((a, b) => a + b)
  .foreach(i => putStrLn(i.toString))
// 4
// 5
// 5
// 6

concatAll

streamのChunk(ZIOが持つArrayみたいなデータ型)を結合する。

ZStream
  .concatAll(Chunk(ZStream(1, 2), ZStream(3, 4)))
  .foreach(i => putStrLn(i.toString))
// 1
// 2
// 3
// 4

die

アプリケーション外のエラーとして失敗するようなstreamを生成する。

ZStream
  .die(new Exception("error")) // 死ぬstreamを
  .runDrain // とりあえず走らせて
  .sandbox // アプリケーション外のエラーをアプリケーションエラーとして取り出して
  .catchSome { case e => ZIO(e) } // catchして
  .tap(e => putStrLn(e.toString)) // print
// Traced(Die(java.lang.Exception: error), ... )

dieMessage

dieのメッセージのStringだけから死ぬようにできるバージョン。

ZStream
  .dieMessage("error")
  .runDrain
  .sandbox
  .catchSome(e => ZIO(e))
  .tap(i => putStrLn(i.toString))
// Traced(Die(java.lang.RuntimeException: error), ... )

done

Exit値からstreamを作成する。

ZStream.done(Exit.succeed(1)).foreach(i => putStrLn(i.toString))
// 1

empty

空っぽのstreamを作成する。

ZStream.empty.foreach(i => putStrLn(i.toString))
// 何も出ない

environment

環境値に依存するstreamを作成する。
環境値はrunしたあとにZIOに変換されたあとでも、前でもどちらのタイミングでもprovideできる。

ZStream
  .environment[Has[String]]
  .foreach(r => putStrLn(r.get[String]))
  .provideCustomLayer(ZLayer.succeed("str"))
// str

fail

アプリケーションエラーとして死ぬstreamを作成する。

ZStream
  .fail(new Exception("error"))
  .runDrain
  .catchSome { case e => ZIO(e) }
  .tap(i => putStrLn(i.toString))
// java.lang.Exception: error

finalizer

終了後の処理を持つstreamを作成する。
このメソッドで作成されたstreamはUnitを唯一要素として持つ。

ZStream.finalizer(UIO(println("finish"))).foreach(i => putStrLn(i.toString))
// ()
// finish

fromChunk

Chunkからstreamを作成する。

ZStream.fromChunk(Chunk(1, 2)).foreach(i => putStrLn(i.toString))
// 1
// 2

fromHub/fromChunkHub

Hubというzioが持つpub/sub実装からstreamを作成する。
中身がChunk版もある。

for {
  hub   <- ZHub.unbounded[Int]
  stream = ZStream.fromHub(hub)
  fiber <- stream.foreach(i => putStrLn(i.toString)).fork
  _     <- clock.sleep(1.seconds) // 適当にsleepしないとstreamのセットアップが終了する前にpublishしてしまう
  _     <- hub.publish(1)
  _     <- fiber.join
} yield ()
// 1

for {
  hub   <- ZHub.unbounded[Chunk[Int]]
  stream = ZStream.fromChunkHub(hub)
  fiber <- stream.foreach(i => putStrLn(i.toString)).fork
  _     <- clock.sleep(1.seconds)
  _     <- hub.publish(Chunk(1, 2, 3))
  _     <- fiber.join
} yield ()
// 1
// 2
// 3

fromHubManaged/fromChunkHubManaged

fromHubとかのようにHubからstreamを作成するのだが、streamを作成したあとの処理を追加することによってstreamの作成を待ってpublishなどの処理を行うことができる。
中身がChunk版もある。

val fromHubManaged = for {
  p      <- Promise.make[Nothing, Unit]
  hub    <- ZHub.unbounded[Int]
  managed = ZStream.fromHubManaged(hub).tapM(_ => p.succeed(()))
  stream  = ZStream.unwrapManaged(managed)
  fiber  <- stream.foreach(i => putStrLn(i.toString)).fork
  _      <- p.await
  _      <- hub.publish(1)
  _      <- fiber.join
} yield ()
// 1

for {
  p      <- Promise.make[Nothing, Unit]
  hub    <- ZHub.unbounded[Chunk[Int]]
  managed = ZStream.fromChunkHubManaged(hub).tapM(_ => p.succeed(()))
  stream  = ZStream.unwrapManaged(managed)
  fiber  <- stream.foreach(i => putStrLn(i.toString)).fork
  _      <- p.await
  _      <- hub.publish(Chunk(1, 2, 3))
  _      <- fiber.join
} yield ()
// 1
// 2
// 3

fromHubWithShutdown/fromChunkHubWithShutdown

Hubからstreamを作成しつつ、streamが止まったあとにhubをshutdownする。
中身がChunk版もある。

for {
  hub   <- ZHub.unbounded[Int]
  stream = ZStream.fromHubWithShutdown(hub)
  fiber <- stream.take(1).runCollect.tap(i => putStrLn(i.toString)).fork
  _     <- clock.sleep(1.seconds)
  _     <- hub.publish(1)
  _     <- fiber.join
  s     <- hub.isShutdown
  _     <- putStrLn(s.toString)
} yield ()
// Chunk(1)
// true

for {
  hub   <- ZHub.unbounded[Chunk[Int]]
  stream = ZStream.fromChunkHubWithShutdown(hub)
  _     <- stream.runHead.tap(i => putStrLn(i.toString)).fork
  _     <- clock.sleep(1.seconds)
  _     <- hub.publish(Chunk(1, 2, 3))
  _     <- clock.sleep(1.seconds)
  s     <- hub.isShutdown
  _     <- putStrLn(s.toString)
} yield ()
// Some(1)
// true

fromHubManagedWithShutdown/fromChunkHubManagedWithShutdown

Managedから作ることでstreamの作成後を保証し、streamが閉じたあとにhubも閉じる。
モリモリ。
中身がChunk版もある。

for {
  p      <- Promise.make[Nothing, Unit]
  hub    <- ZHub.unbounded[Int]
  managed = ZStream.fromHubManagedWithShutdown(hub).tapM(_ => p.succeed(()))
  stream  = ZStream.unwrapManaged(managed)
  fiber  <- stream.take(1).runCollect.tap(i => putStrLn(i.toString)).fork
  _      <- p.await
  _      <- hub.publish(1)
  _      <- fiber.join
  s      <- hub.isShutdown
  _      <- putStrLn(s.toString)
} yield ()
// Chunk(1)
// true

for {
  p      <- Promise.make[Nothing, Unit]
  hub    <- ZHub.unbounded[Chunk[Int]]
  managed =
    ZStream.fromChunkHubManagedWithShutdown(hub).tapM(_ => p.succeed(()))
  stream  = ZStream.unwrapManaged(managed)
  _      <- stream.runHead.tap(i => putStrLn(i.toString)).fork
  _      <- p.await
  _      <- hub.publish(Chunk(1, 2, 3))
  s      <- hub.isShutdown
  _      <- putStrLn(s.toString)
} yield ()
// Some(1)
// true

fromQueue/fromChunkQueue

QueueというZIOの非同期queue実装からstreamを作成する。
中身がChunk版もある。

val fromQueue = for {
  q     <- Queue.unbounded[Int]
  stream = ZStream.fromQueue(q)
  fiber <- stream.take(2).runCollect.tap(i => putStrLn(i.toString)).fork
  _     <- q.offer(1)
  _     <- q.offer(2)
  _     <- fiber.join
} yield ()
// Chunk(1,2)

for {
  q     <- Queue.unbounded[Chunk[Int]]
  stream = ZStream.fromChunkQueue(q)
  fiber <- stream.foreach(i => putStrLn(i.toString)).fork
  _     <- q.offer(Chunk(1, 2, 3))
  _     <- fiber.join
} yield ()
// 1
// 2
// 3

fromQueueWithShutdown/fromChunkQueueWithShutdown

Queueからstreamを作成するのだが、streamが閉じたあとにqueueも閉じる。

for {
  q     <- Queue.unbounded[Int]
  stream = ZStream.fromQueueWithShutdown(q)
  fiber <- stream.take(2).runCollect.tap(i => putStrLn(i.toString)).fork
  _     <- q.offer(1)
  _     <- q.offer(2)
  _     <- fiber.join
  s     <- q.isShutdown
  _     <- putStrLn(s.toString)
} yield ()
// Chunk(1,2)
// true

for {
  q     <- Queue.unbounded[Chunk[Int]]
  stream = ZStream.fromChunkQueueWithShutdown(q)
  fiber <- stream.take(3).runCollect.tap(i => putStrLn(i.toString)).fork
  _     <- q.offer(Chunk(1, 2, 3))
  _     <- fiber.join
  s     <- q.isShutdown
  _     <- putStrLn(s.toString)
} yield ()
// Chunk(1,2,3)
// true

fromChunks

Chunkたちからstreamを作成する。

ZStream
  .fromChunks(Chunk(1, 2), Chunk(3, 4))
  .foreach(i => putStrLn(i.toString))
// 1
// 2
// 3
// 4

fromEffect

ZIOからstreamを作成。

ZStream.fromEffect(ZIO(1)).foreach(i => putStrLn(i.toString))
// 1

fromEffectOption

Noneで失敗するZIOからstreamを作成する。
Noneの場合にもstreamは失敗せずに、何も値が入っていないだけ。

ZStream
  .fromEffectOption(ZIO.fromOption(Some(1)))
  .foreach(i => putStrLn(i.toString))
// 1

ZStream
  .fromEffectOption(ZIO.fromOption(None))
  .foreach(i => putStrLn(i.toString))
// Nothing to show

fromIterable/fromIterableM

Iterableからstreamを作成する。
MはZIO[_, _, Iterable]から作る。

ZStream.fromIterable(Seq(1, 2, 3)).foreach(i => putStrLn(i.toString))
// 1
// 2
// 3

ZStream.fromIterableM(ZIO(Seq(1, 2, 3))).foreach(i => putStrLn(i.toString))
// 1
// 2
// 3

fromIterator/fromIteratorEffect

Iteratorから作成。
EffectはZIO[_, _, Iterator]から。
JavaのIteratorから作成する版もある。

ZStream.fromIterator(Seq(1, 2, 3).iterator).foreach(i => putStrLn(i.toString))
// 1
// 2
// 3

ZStream
  .fromIteratorEffect(ZIO(Seq(1, 2, 3).iterator))
  .foreach(i => putStrLn(i.toString))
// 1
// 2
// 3

fromIteratorManaged

ZManaged入のIteratorからstreamを作成する。
Javaのiteratorから作成する版もある。

ZStream
  .fromIteratorManaged(ZManaged.succeed(Seq(1, 2, 3).iterator))
  .foreach(i => putStrLn(i.toString))
// 1
// 2
// 3

fromIteratorTotal

Iteratorからstreamを作成するのだが、突っ込むiteratorは例外を射出してはいけない。
JavaのIteratorから作成する版もある。

ZStream
  .fromIteratorTotal(Seq(1, 2, 3).iterator)
  .foreach(i => putStrLn(i.toString))
// 1
// 2
// 3

fromSchedule

Scheduleからstreamを作成する。

ZStream
  .fromSchedule(Schedule.recurs(4))
  .runCollect
  .tap(i => putStrLn(i.toString))
// Chunk(0,1,2,3)

fromTQueue

zioのstmモジュールのTQueueからstreamを作成する。

for {
  tq    <- STM.atomically(TQueue.unbounded[Int])
  stream = ZStream.fromTQueue(tq)
  fiber <- stream.take(2).runCollect.tap(i => putStrLn(i.toString)).fork
  _     <- STM.atomically(tq.offer(1))
  _     <- STM.atomically(tq.offer(2))
  _     <- fiber.join
} yield ()
// Chunk(1,2)

halt

Causeからstreamを作成する。

ZStream
  .halt(Cause.fail(new Exception("error")))
  .runDrain
  .foldCause(println, _ => ())
// Fail(java.lang.Exception: error)

iterate

reduceの様に初期値に与えた関数を繰り返し適用した結果の列をstreamとして作成する。

ZStream.iterate(1)(_ + 1).take(3).foreach(i => putStrLn(i.toString))
// Chunk(1,2,3)

managed

ZManagedからstreamを作成する。

ZStream.managed(ZManaged.succeed(1)).foreach(i => putStrLn(i.toString))
// 1

mergeAll

与えたstreamを並列にマージする。
並列数は引数で制御できる。

ZStream
  .mergeAll(3)(ZStream(1, 2), ZStream(3, 4), ZStream(5, 6))
  .foreach(i => putStrLn(i.toString))
// 実行するたびに順序が違う
// 3
// 4
// 5
// 6
// 1
// 2

ZStream
  .mergeAll(2)(ZStream(1, 2), ZStream(3, 4), ZStream(5, 6))
  .foreach(i => putStrLn(i.toString))
// 1~4は実行のたびに順序が違うが、5,6はかならず最後
// 3
// 4
// 1
// 2
// 5
// 6

mergeAllUnbounded

並列数が限界突破したmergeAll。

ZStream
  .mergeAllUnbounded()(ZStream(1, 2), ZStream(3, 4), ZStream(5, 6))
  .foreach(i => putStrLn(i.toString))
// 実行するたびに順序が違う
// 3
// 4
// 5
// 6
// 1
// 2

never

while {}のようなstreamを作成する。

ZStream.never.foreach(i => putStrLn(i.toString))
// 何も出ないし、終了しない

paginate/paginateM

stateを引き回しつつ、Some値のときは次の値も計算するがNoneのときはその回の処理でstreamを終了する。

ZStream
  .paginate(1)(i => (i.toString, Option.when(i < 3)(i + 1)))
  .foreach(i => putStrLn(i))
// 1
// 2
// 3

ZStream
  .paginateM(1)(i => ZIO((i.toString, Option.when(i < 3)(i + 1))))
  .foreach(i => putStrLn(i))
// 1
// 2
// 3

paginateChunk/paginateChunkM

paginate系のChunk版。

ZStream
  .paginateChunk(1)(i =>
    (Chunk(i.toString, i.toString), Option.when(i < 3)(i + 1))
  )
  .foreach(i => putStrLn(i))
// 1
// 1
// 2
// 2
// 3
// 3

ZStream
  .paginateChunkM(1)(i =>
    ZIO((Chunk(i.toString, i.toString), Option.when(i < 3)(i + 1)))
  )
  .foreach(i => putStrLn(i))
// 1
// 1
// 2
// 2
// 3
// 3

range

minとmaxを指定してその間の値を生成する。
minは含まれるがmaxは含まれない。

ZStream.range(1, 4).foreach(i => putStrLn(i.toString))
// 1
// 2
// 3

repeat/repeatEffect

同じ値を繰り返すstreamを作成する。

ZStream.repeat(1).take(3).foreach(i => putStrLn(i.toString))
// 1
// 1
// 1
val repeatEffect =
  ZStream.repeatEffect(ZIO(1)).take(3).foreach(i => putStrLn(i.toString))
// 1
// 1
// 1

repeatEffectOption

Noneで失敗するZIOをとってstreamを作成する。
NoneでZIOが失敗する場合でもstream自体は失敗せずに終了する。

for {
  ref   <- Ref.make(1)
  stream = ZStream.repeatEffectOption(
             ref.get.flatMap(i =>
               if (i < 4) ref.update(_ + 1).as(i)
               else ZIO.fail(None)
             )
           )
  _     <- stream.foreach(i => putStrLn(i.toString))
} yield ()
// 1
// 2
// 3

repeatEffectChunk

ZIOのChunkからstreamを作成する。

ZStream
  .repeatEffectChunk(ZIO(Chunk(1, 2)))
  .take(3)
  .foreach(i => putStrLn(i.toString))
// 1
// 2
// 1

repeatWith/repeatEffectWith

Scheduleで何度値の取得を行うか制御する。

ZStream.repeatWith(1, Schedule.recurs(2)).foreach(i => putStrLn(i.toString))
// 1
// 1
// 1

ZStream
  .repeatEffectWith(ZIO(1), Schedule.recurs(2))
  .foreach(i => putStrLn(i.toString))
// 1
// 1
// 1

service

Hasに囲まれた形で依存を宣言する。

ZStream
  .service[String]
  .foreach(i => putStrLn(i))
  .provideCustomLayer(ZLayer.succeed("str"))
// str

services

Has付きで依存を複数宣言できる。
いくつ依存を定義できるかで複数のvariantが存在する。

ZStream
  .services[String, Int]
  .foreach(a => putStrLn(a.toString()))
  .provideCustomLayer(ZLayer.succeed("str") ++ ZLayer.succeed(1))
// (str,1)

serviceWith

依存からZIO付きで値を取り出せる。

trait A {
  def a: UIO[String]
}

ZStream
  .serviceWith[A](_.a)
  .foreach(i => putStrLn(i))
  .provideCustomLayer(ZLayer.succeed(new A {
    override def a: UIO[String] = UIO("str")
  }))
// str

succeed

単一要素を持つstreamを作成。

ZStream.succeed(1).foreach(i => putStrLn(i.toString))
// 1

tick

Durationを渡すとその間隔ごとにUnitを吐き出す。

ZStream.tick(1.seconds).foreach(i => putStrLn(i.toString))
// 1秒おきに()

unit

単一のUnit値をもつstream。

ZStream.unit.foreach(i => putStrLn(i.toString))
// ()

unfold/unfoldM

paginateの様にSome値を返せば継続、Noneならば終了。
paginateと違うのはNoneのときにその回のイテレーションで生まれた値も捨てる。

ZStream
  .unfold(1)(i => Option.when(i < 4)((i.toString, i + 1)))
  .foreach(i => putStrLn(i))
// 1
// 2
// 3

ZStream
  .unfoldM(1)(i => ZIO(Option.when(i < 4)((i.toString, i + 1))))
  .foreach(i => putStrLn(i))
// 1
// 2
// 3

unfoldChunk/unfoldChunkM

unfold系の中身がChunk版。

ZStream
  .unfoldChunk(1)(i =>
    Option.when(i < 4)((Chunk(i.toString, i.toString), i + 1))
  )
  .foreach(i => putStrLn(i))
// 1
// 1
// 2
// 2
// 3
// 3

ZStream
  .unfoldChunkM(1)(i =>
    ZIO(Option.when(i < 4)((Chunk(i.toString, i.toString), i + 1)))
  )
  .foreach(i => putStrLn(i))
// 1
// 1
// 2
// 2
// 3
// 3

unwrap/unwrapManaged

ZIOやZManagedに包まれたstreamを取り出す。

ZStream.unwrap(ZIO(ZStream(1, 2, 3))).foreach(i => putStrLn(i.toString))
// 1
// 2
// 3

ZStream
  .unwrapManaged(ZManaged.succeed(ZStream(1, 2, 3)))
  .foreach(i => putStrLn(i.toString))
// 1
// 2
// 3

when/whenM

Boolの値によってstream引数のstream、もしくはからのstreamを返す。

ZStream.when(true)(ZStream(1, 2, 3)).foreach(i => putStrLn(i.toString))
// 1
// 2
// 3

ZStream.when(false)(ZStream(1, 2, 3)).foreach(i => putStrLn(i.toString))
// 何も出ない

ZStream.whenM(ZIO(true))(ZStream(1, 2, 3)).foreach(i => putStrLn(i.toString))
// 1
// 2
// 3

ZStream.whenM(ZIO(false))(ZStream(1, 2, 3)).foreach(i => putStrLn(i.toString))
// 何も出ない

whenCase/whenCaseM

PartialFunctionでZStreamが返すと全体がそのstreamになる。
PartialFunctionが何も返さないときはempty。

ZStream
  .whenCase(1) { case 1 => ZStream(1, 2, 3) }
  .foreach(i => putStrLn(i.toString))
// 1
// 2
// 3

ZStream
  .whenCase(2) { case 1 => ZStream(1, 2, 3) }
  .foreach(i => putStrLn(i.toString))
// 何も出ない

ZStream
  .whenCaseM(ZIO(1)) { case 1 => ZStream(1, 2, 3) }
  .foreach(i => putStrLn(i.toString))
// 1
// 2
// 3

ZStream
  .whenCaseM(ZIO(2)) { case 1 => ZStream(1, 2, 3) }
  .foreach(i => putStrLn(i.toString))
// 何も出ない

zipN

複数のstreamをzipする。
いくつzipするかでvariantが存在する。

ZStream
  .zipN(ZStream(1, 2), ZStream(3, 4))(_ + _)
  .foreach(i => putStrLn(i.toString))
// 4
// 6
info-outline

お知らせ

K.DEVは株式会社KDOTにより運営されています。記事の内容や会社でのITに関わる一般的なご相談に専門の社員がお答えしております。ぜひお気軽にご連絡ください。