Head_daemon.Daemon
type t = {
daemon : unit Tezos_base.TzPervasives.tzresult Lwt.t;
head_stream_stopper : Tezos_rpc.Context.stopper;
applied_block_stream_stopper : Tezos_rpc.Context.stopper;
}
fair_lwt_stream_get push s1 s2
aims to get the value available from s1
and s2
and push
them to a stream, so that, all the values pushed to that stream are interleaved to preserve some fairness.
val make_stream_daemon :
on_head:
((Tezos_base.TzPervasives.Block_hash.t * Tezos_base.Block_header.t) ->
(unit, Tezos_base.TzPervasives.Error_monad.tztrace) Stdlib.result Lwt.t) ->
on_applied_block:
((Tezos_base.TzPervasives.Block_hash.t * Tezos_base.Block_header.t) ->
(unit, Tezos_base.TzPervasives.Error_monad.tztrace) Stdlib.result Lwt.t) ->
head_stream:
((Tezos_base.TzPervasives.Block_hash.t * Tezos_base.Block_header.t)
Lwt_stream.t
* Tezos_rpc.Context.stopper)
Tezos_base.TzPervasives.tzresult
Lwt.t ->
applied_block_stream:
((Tezos_base.TzPervasives.Chain_id.t
* Tezos_base.TzPervasives.Block_hash.t
* Tezos_base.Block_header.t
* Tezos_base.Operation.t Tezos_base.TzPervasives.trace
Tezos_base.TzPervasives.trace)
Lwt_stream.t
* Tezos_rpc.Context.stopper)
Tezos_base.TzPervasives.tzresult
Lwt.t ->
(t, Tezos_base.TzPervasives.tztrace) Stdlib.result Lwt.t
make_stream_daemon ~on_head ~on_applied_block ~head_stream
~applied_block_stream
calls on_head
or on_applied_block
depending on the value received from the stream composed of head_stream
and applied_block_stream
. The composed stream is interleaved for fairness. It returns a couple (p, stopper)
where p
is a promise resolving when the stream closes and stopper
is a function closing the stream.
val shutdown : t -> unit Lwt.t