P2p_workers.Make
The functor generates a valid Worker, using P2p_request for requests. However, it is recommended to use the activated_worker kind, as it decouples creation of the state from activation of the loop.
module Name : Tezos_base.Worker_intf.NAME
module P2p_request : P2P_REQUEST
module Types : Tezos_base.Worker_intf.TYPES
include Tezos_workers.Worker.T
with module Name = Name
and type ('response, 'error) Request.t = ('response, 'error) P2p_request.t
and type Request.view = P2p_request.view
and module Types = Types
module Name = Name
module Request :
Tezos_base.Worker_intf.REQUEST
with type ('response, 'error) t = ('response, 'error) P2p_request.t
with type view = P2p_request.view
module Types = Types
Internal buffer kinds used as parameters to t.
type 'a message_error =
| Closed of Tezos_base.TzPervasives.error list option
| Request_error of 'a
| Any of exn
An error returned when waiting for a message pushed to the worker. Closed errs is returned if the worker is terminated or has crashed. If the worker is terminated, errs is an empty list. Request_error err is returned if the request failed with an error. Any exn is returned if the request failed with an exception.
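As an illustration of how a caller might branch on these three cases, here is a minimal standalone sketch. It redeclares the type locally, with string list option standing in for Tezos_base.TzPervasives.error list option, so that it compiles without the Tezos libraries. The describe helper is hypothetical, and treating Closed None like an empty error list is an assumption of this sketch.

```ocaml
(* Local stand-in for the documented type: [string list option] replaces
   [Tezos_base.TzPervasives.error list option] so the sketch compiles alone. *)
type 'a message_error =
  | Closed of string list option
  | Request_error of 'a
  | Any of exn

(* Hypothetical helper: turn a message error into a log line.
   Assumption: [Closed None] is handled like an empty error list. *)
let describe (pp_err : 'a -> string) (e : 'a message_error) : string =
  match e with
  | Closed None | Closed (Some []) -> "worker closed"
  | Closed (Some errs) -> "worker crashed: " ^ String.concat "; " errs
  | Request_error err -> "request failed: " ^ pp_err err
  | Any exn -> "request raised: " ^ Printexc.to_string exn
```

Matching on all three constructors keeps the compiler's exhaustiveness check active if a new case is ever added to the type.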
type _ buffer_kind =
| Queue : infinite queue buffer_kind
| Bounded : {
} -> bounded queue buffer_kind
| Dropbox : {
merge : dropbox t -> any_request -> any_request option -> any_request option;
} -> dropbox buffer_kind
| Callback : (unit -> any_request Lwt.t) -> callback buffer_kind
Supported kinds of internal buffers.
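The Dropbox kind takes a merge function that decides what remains in the box when a new request arrives. The toy model below is only a sketch of that idea: requests are plain strings, the worker argument of the documented signature is dropped, and the push function and its handling of a None result are assumptions of this example, not the library's implementation.

```ocaml
(* Toy dropbox: holds at most one pending request (a string here). *)
type dropbox = {mutable pending : string option}

(* Example merge policy (an assumption of this sketch): the incoming
   request always replaces whatever was pending. *)
let keep_latest (incoming : string) (_pending : string option) : string option =
  Some incoming

(* Hypothetical push: store whatever [merge] returns. In this toy model,
   a [None] result leaves the box untouched. *)
let push merge box incoming =
  match merge incoming box.pending with
  | Some merged -> box.pending <- Some merged
  | None -> ()
```

With keep_latest, pushing "a" and then "b" leaves only "b" pending, which is the usual reason to pick a dropbox over a queue: only the most recent request matters.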
val create_table : 'kind buffer_kind -> 'kind table
Create a table of workers.
module type HANDLERS = sig ... end
The callback handlers specific to each worker instance.
val launch :
'kind table ->
?timeout:Tezos_base.Time.System.Span.t ->
Name.t ->
Types.parameters ->
(module HANDLERS
with type launch_error = 'launch_error
and type self = 'kind t) ->
('kind t, 'launch_error) Stdlib.result Lwt.t
Creates a new worker instance. If the queue_size parameter is not passed, the queue is unlimited.
val wait_for_completion : _ t -> unit Lwt.t
Waits for completion, but doesn't trigger the shutdown.
module type BOX = sig ... end
The following interfaces are common elements of several modules below. They are used to minimize repetition.
module type QUEUE = sig ... end
module Dropbox : sig ... end
module Queue : sig ... end
val canceler : _ t -> Lwt_canceler.t
Exports the canceler to allow cancellation of other tasks when this worker is shut down or when it dies.
val trigger_shutdown : _ t -> unit
Triggers a worker termination.
val state : _ t -> Types.state
Access the internal state, once initialized.
val state_opt : _ t -> Types.state option
Access the internal state if available.
val with_state :
_ t ->
(Types.state -> (unit, 'request_error) Stdlib.result Lwt.t) ->
(unit, 'request_error) Stdlib.result Lwt.t
with_state w f calls f on the current state of worker w if it was initialized and is neither closed nor crashed; otherwise it returns immediately.
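The behavior described for with_state can be modeled in a few lines. This is a simplified, synchronous sketch: the worker record is a hypothetical stand-in, Lwt is left out, and returning Ok () when no state is available is an assumption based on the result type in the signature.

```ocaml
(* Toy worker: [state] is [None] before initialization or after a
   close or crash. *)
type 'state worker = {mutable state : 'state option}

(* Run [f] on the state if there is one; otherwise return at once.
   Assumption: the "immediate" return value is [Ok ()]. *)
let with_state (w : 'state worker) (f : 'state -> (unit, 'e) result) :
    (unit, 'e) result =
  match w.state with
  | Some s -> f s
  | None -> Ok ()
```

In this model the callback is never invoked on a worker without state, so f does not need to handle that case itself.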
val pending_requests :
_ queue t ->
(Tezos_base.Time.System.t * Request.view) list
Introspects the message queue and gives the time at which each request was pushed.
val status : _ t -> Tezos_base.Worker_types.worker_status
Get the running status of a worker.
val current_request :
_ t ->
(Tezos_base.Time.System.t * Tezos_base.Time.System.t * Request.view) option
Get the request being treated by a worker. Gives the time the request was pushed, and the time its treatment started.
val information : _ t -> Tezos_base.Worker_types.worker_information
val activated_callback :
?callback:(unit -> any_request Lwt.t) ->
unit ->
callback buffer_kind * unit Lwt.u
val create :
?timeout:Tezos_base.Time.System.Span.t ->
?callback:(unit -> any_request Lwt.t) ->
Name.t ->
Types.parameters ->
(module HANDLERS with type launch_error = 'a and type self = callback t) ->
(activated_worker, 'a) Stdlib.result Lwt.t
create ?timeout ?callback name parameters handlers creates a worker whose table is a callback that returns Loop. The worker loop isn't launched yet, and needs to be started with activate.
If callback is not given, the default callback simply returns P2p_request.default_callback_value.
val activate : activated_worker -> unit
activate worker starts the worker loop.
val shutdown : activated_worker -> unit Lwt.t
shutdown worker triggers a worker termination and waits for its completion.
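The split between create, activate and shutdown can be pictured with a toy model. Everything below is a hypothetical stand-in that shares only the function names with the documented interface: there is no Lwt, no handlers module and no real loop, just a record that tracks whether the loop has been started.

```ocaml
(* Toy model of an activated worker: the record exists after [create],
   but the loop is only marked as running once [activate] is called. *)
type worker = {
  name : string;
  mutable running : bool;
  mutable log : string list; (* most recent event first *)
}

(* Build the worker without starting its loop. *)
let create name = {name; running = false; log = ["created"]}

(* Start the loop; calling it twice has no further effect in this model. *)
let activate w =
  if not w.running then begin
    w.running <- true;
    w.log <- "loop started" :: w.log
  end

(* Stop the loop and record the termination. *)
let shutdown w =
  w.running <- false;
  w.log <- "terminated" :: w.log
```

Separating creation from activation lets the caller finish wiring up other components that refer to the worker before any request is processed.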