The Lean Language Reference

21.12. Asynchronous Programming

The Async monad provides tools and abstractions for constructing asynchronous programs that can safely multiplex different sources of data. Typical use cases include network servers and other interactive applications that perform IO and must react to a variety of events, such as incoming data, timeouts, and disconnections. Generally speaking, sequential programs that interact with the operating system can use IO alone. Parallel programs should use Tasks. Async is most appropriate when a program may spend a significant amount of time waiting on external events or I/O.

The most important feature of Async is event selection. Given a set of potential inputs, and a computation to be carried out in response to each, event selection ensures that computations are triggered as events occur. Each computation is triggered exactly once, as its associated event occurs, and data can never be lost. These properties are very difficult to ensure without appropriate library support.

Behind the scenes, asynchronous tasks are represented using tasks and promises. An asynchronous computation runs on the current thread until it must wait for a result that is not yet available, such as a timer or incoming network data. The missing data is represented by a Promise. At that point, the asynchronous computation suspends. Rather than blocking the thread, it yields control and arranges to resume once the awaited promise is resolved. A single thread can therefore make progress on many waiting computations at once. The standard library resolves these promises in response to operating system events—timers, sockets, signals, and name resolution—using the libuv event loop as its I/O backend. The asynchronous model itself depends only on tasks and promises, however: any source that resolves a promise, such as a channel, can be used to reinvoke an asynchronous computation just as well.

21.12.1. Asynchronous Computations

There are three monads for writing asynchronous programs, each corresponding to one of the variants of IO:

  • Async describes asynchronous computations that may throw an IO.Error, and corresponds to IO.

  • EAsync describes asynchronous computations that may throw a specified type of error, and corresponds to EIO.

  • BaseAsync describes asynchronous computations that cannot throw an error, and corresponds to BaseIO.

def
Std.Async.Async (α : Type) : Type

An asynchronous computation that may produce an error of type IO.Error.

def
Std.Async.EAsync (ε α : Type) : Type

An asynchronous computation that may produce an error of type ε.

def
Std.Async.BaseAsync (α : Type) : Type

An asynchronous computation that never fails.

Infinite loops in EAsync and Async use a special instance of ForIn that ensures that they don't consume stack frames. They can therefore be used in long-running asynchronous applications such as servers without the stack overflowing.

Each of these monads has a corresponding type of asynchronous tasks that it can coordinate. These tasks can be thought of as handles to an in-flight computation. Calling async on a monadic action creates a task that runs in the current thread until it suspends, and calling await on a task results in a monadic action that waits for the task to complete.

def
Std.Async.ETask (ε α : Type) : Type

A Task that may resolve to either a value of type α or an error value of type ε.

def
Std.Async.AsyncTask (α : Type) : Type

A Task that may resolve to a value or an error value of type IO.Error. Alias for ETask IO.Error.

inductive type
Std.Async.MaybeTask (α : Type) : Type

A MaybeTask α represents a computation that either:

  • Is immediately available as an α value, or

  • Is an asynchronous computation that will eventually produce an α value.

Constructors

Std.Async.MaybeTask.pure {α : Type} : α → MaybeTask α
Std.Async.MaybeTask.ofTask {α : Type} : Task α → MaybeTask α

Crucially, calling await on a task never blocks an OS-level thread. Threads are only blocked at the boundary between the IO and the Async monads. Under the hood, asynchronous tasks are invoked when needed by the libuv event loop.

Asynchronous tasks use the same system of priorities as other Lean tasks, and are run by the same scheduler.

21.12.1.1. Running Asynchronous Computations

Asynchronous computations can be run from IO by either waiting or blocking. When a thread waits on an asynchronous computation, the asynchronous computation is run on the thread that is waiting. When a thread blocks on an asynchronous computation or task, the computation is run on a worker thread in an ordinary task with the specified priority, and the calling thread calls Task.get to block on the result. Because Async is defined as an alias for EAsync, generalized field notation can be used to call EAsync.wait on a term with type Async.

def
Std.Async.EAsync.wait {ε α : Type} (self : EAsync ε α) : EIO ε α

Waits for the result of the EAsync computation, blocking if necessary.

def
Std.Async.BaseAsync.wait {α : Type} (self : BaseAsync α) : BaseIO α

Waits for the result of the BaseAsync computation, blocking if necessary.

def
Std.Async.Async.block {α : Type} (x : Async α) (prio : Task.Priority := Task.Priority.default) : IO α

Block until the Async finishes and returns its value. Propagates any error encountered during execution.

def
Std.Async.EAsync.block {ε α : Type} (x : EAsync ε α) (prio : Task.Priority := Task.Priority.default) : EIO ε α

Block until the EAsync finishes and returns its value. Propagates any error encountered during execution.

def
Std.Async.ETask.block {ε α : Type} (x : ETask ε α) : EIO ε α

Block until the ETask in x finishes and returns its value. Propagates any error encountered during execution.

def
Std.Async.AsyncTask.block {α : Type} (x : AsyncTask α) : IO α

Block until the AsyncTask in x finishes.
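The difference between waiting and blocking can be sketched as follows. This is a minimal illustration, not part of the library: the name answer is hypothetical, and the import and the sleep helper are assumed to be available as in the other examples in this section.

```lean
import Std.Async
open Std.Async

-- Hypothetical computation used only for illustration.
def answer : Async Nat := do
  sleep 10
  return 42

def main : IO Unit := do
  -- `wait` runs the computation on the calling thread.
  -- Generalized field notation resolves this to `EAsync.wait`.
  let a ← answer.wait
  -- `block` runs the computation in a task with the default priority
  -- and blocks the calling thread until the result is available.
  let b ← answer.block
  IO.println s!"{a} {b}"
```

Both calls return the same value; they differ only in which thread carries out the work.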

Asynchronous computations can also be run as ordinary Tasks in IO.

def
Std.Async.Async.toIO {α : Type} (x : Async α) : IO (AsyncTask α)

Converts an Async to an AsyncTask.

def
Std.Async.EAsync.toEIO {ε α : Type} (x : EAsync ε α) : EIO ε (ETask ε α)

Converts an EAsync to an EIO ETask.

def

Converts a BaseAsync to a BaseIO Task.

def
Std.Async.EAsync.asTask {ε α : Type} (x : EAsync ε α) (prio : Task.Priority := Task.Priority.default) : EIO ε (ETask ε α)

Lifts an EAsync computation into an ETask that can be awaited and joined.

def

Lifts a BaseAsync computation into a Task that can be awaited and joined.

Compared to IO.asTask, EAsync.asTask schedules an asynchronous task. While tasks from IO.asTask are synchronous, occupying their worker thread until completed, tasks from EAsync.asTask release their worker threads at suspension points and are reinvoked as needed by the libuv event loop.
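As a rough sketch of running a computation as a task, the following program obtains an AsyncTask handle with Async.toIO and later blocks on it with AsyncTask.block. The name answer is hypothetical, and the import and the sleep helper are assumed to be available as in the other examples in this section.

```lean
import Std.Async
open Std.Async

-- Hypothetical computation used only for illustration.
def answer : Async Nat := do
  sleep 10
  return 42

def main : IO Unit := do
  -- Obtain a handle to the in-flight computation.
  let task ← answer.toIO
  IO.println "task created"
  -- Block the calling thread until the task has a result.
  let n ← task.block
  IO.println s!"result: {n}"
```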

Running an Asynchronous Computation

Async.block runs an asynchronous computation and returns its result in IO. The following program prints a message, waits ten milliseconds, and then prints another:

    module
    import Std.Async
    open Std.Async

    def greet : Async Unit := do
      IO.println "before sleeping"
      sleep 10
      IO.println "after sleeping"

    public def main : IO Unit := greet.block

It prints both messages, with a brief pause between them:

stdout
    before sleeping
    after sleeping

21.12.1.2. Managing Tasks

The typical interface to asynchronous tasks is via the MonadAsync and MonadAwait instances for a monad. Their respective methods MonadAsync.async and MonadAwait.await are exported from Std.Async. Typically, the main thread of execution will create some number of asynchronous tasks, then await their results when needed to make progress. The async and await functions are not built in to the Lean compiler, and they don't trigger a whole-program transformation. They just create or consume tasks that are associated with underlying promises in the correct manner for the framework.

type class
Std.Async.MonadAwait (t m : Type → Type) : Type 1

Typeclass for monads that can "await" a computation of type t α in a monad m until the result is available.

Instance Constructor

Std.Async.MonadAwait.mk

Methods

await : {α : Type} → t α → m α

Awaits the result of t α and returns it inside the m monad.

type class
Std.Async.MonadAsync (t m : Type → Type) : Type 1

Represents monads m that can launch asynchronous computations of type t.

Instance Constructor

Std.Async.MonadAsync.mk

Methods

async : {α : Type} → m α → optParam Task.Priority Task.Priority.default → m (t α)

Starts an asynchronous computation in another monad.

To launch an asynchronous task whose value will never be needed, use background.

def
Std.Async.background {m t : Type → Type} {α : Type} [Monad m] [MonadAsync t m] (action : m α) (prio : Task.Priority := Task.Priority.default) : m Unit

This function transforms the operation inside the monad m into a task and lets it run in the background.

In addition to instances for the Async monads and tasks, the library includes instances that allow reader and state monad transformers to be used with async and await.

Spawning and Awaiting Tasks

async starts a computation as a task that runs concurrently, and await waits for a task's result. Here, a color and a flavor are fetched concurrently, and the two results are combined into a pair:

    def fetchColor : Async String := do
      sleep 20
      return "green"

    def fetchFlavor : Async String := do
      sleep 20
      return "sweet"

    def fetchBoth : Async (String × String) := do
      let color ← async fetchColor
      let flavor ← async fetchFlavor
      return (← await color, ← await flavor)

    #eval fetchBoth.block
    ("green", "sweet")

Background Tasks

background starts a computation whose result is never awaited. Here, a logger runs in the background and prints each message sent to a channel:

    module
    import Std.Async
    import Std.Sync.Channel
    open Std.Async
    open Std (Channel)

    def logger (ch : Channel String) : Async Unit := do
      while true do
        IO.println (← await (← ch.recv))

    public def main : IO Unit := do
      let ch ← Channel.new (α := String)
      Async.block do
        background (logger ch)
        discard <| ch.send "hello from the background"
        sleep 20

The background logger prints the message it receives before the program exits:

stdout
    hello from the background

21.12.1.3. Transforming and Inspecting Tasks

The eventual result of an asynchronous task can be transformed without first awaiting it. AsyncTask.map applies a function to a task's result, while AsyncTask.bindIO and AsyncTask.mapTaskIO sequence further IO work onto it. In each case, an error in the original task propagates to the transformed task.

def
Std.Async.AsyncTask.map {α β : Type} (f : α → β) (x : AsyncTask α) (prio : Task.Priority := Task.Priority.default) (sync : Bool := false) : AsyncTask β

Create a new AsyncTask that will run after x has finished. If x:

  • errors, return an AsyncTask that resolves to the error.

  • succeeds, return an AsyncTask that resolves to f x.

def
Std.Async.AsyncTask.bindIO {α β : Type} (x : AsyncTask α) (f : α → IO (AsyncTask β)) (prio : Task.Priority := Task.Priority.default) (sync : Bool := false) : BaseIO (AsyncTask β)

Similar to bind, however f has access to the IO monad. If f throws an error, the returned AsyncTask resolves to that error.

def
Std.Async.AsyncTask.mapTaskIO {α β : Type} (f : α → IO β) (x : AsyncTask α) (prio : Task.Priority := Task.Priority.default) (sync : Bool := false) : BaseIO (AsyncTask β)

Similar to map, however f has access to the IO monad. If f throws an error, the returned AsyncTask resolves to that error.
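The following sketch shows how a task's eventual result might be transformed without awaiting it first, using AsyncTask.map for a pure function and AsyncTask.mapTaskIO for a function that performs IO. The name answer is hypothetical, and the import and the sleep helper are assumed to be available as in the other examples in this section.

```lean
import Std.Async
open Std.Async

-- Hypothetical computation used only for illustration.
def answer : Async Nat := do
  sleep 10
  return 21

def main : IO Unit := do
  let task ← answer.toIO
  -- Pure transformation of the eventual result; nothing is awaited here.
  let doubled := task.map (· * 2)
  -- A transformation that performs IO once the result is available.
  let logged ← doubled.mapTaskIO fun n => do
    IO.println s!"result ready: {n}"
    return n + 1
  -- Only this final step blocks the calling thread.
  IO.println (← logged.block)
```

If answer threw an error instead of returning, each transformed task would resolve to that error and the final block would rethrow it.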

A task's progress can be inspected without blocking by retrieving its IO.TaskState.

def
Std.Async.ETask.getState {ε α : Type} (x : ETask ε α) : BaseIO IO.TaskState

Obtain the IO.TaskState of x.

A MaybeTask is either an immediately-available value or a task that will produce one. It can be converted to an ordinary Task, its value can be read by blocking, and it can be mapped over. A Task of a MaybeTask can also be collapsed into a single Task.

def
Std.Async.MaybeTask.toTask {α : Type} : MaybeTask α → Task α

Constructs a Task from a MaybeTask.

def
Std.Async.MaybeTask.get {α : Type} : MaybeTask α → α

Gets the value of the MaybeTask by blocking.

def
Std.Async.MaybeTask.map {α β : Type} (f : α → β) (prio : Task.Priority := Task.Priority.default) (sync : Bool := false) : MaybeTask α → MaybeTask β

Maps a function over a MaybeTask.

def
Std.Async.MaybeTask.joinTask {α : Type} (t : Task (MaybeTask α)) : Task α

Joins a Task of a MaybeTask into a single Task.
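A small sketch of the two MaybeTask constructors and the blocking accessors described above. The names immediate and deferred are hypothetical and exist only for this illustration.

```lean
import Std.Async
open Std.Async

-- An immediately available value.
def immediate : MaybeTask Nat := .pure 1

-- A value produced by an ordinary task.
def deferred : MaybeTask Nat := .ofTask (Task.spawn fun _ => 2)

-- `get` reads the value, blocking only if a task is involved.
#eval immediate.get + deferred.get   -- 3

-- `toTask` yields an ordinary `Task` in either case.
#eval deferred.toTask.get            -- 2
```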

21.12.1.4. Conversions

An existing Task, IO.Promise, or Except value can be converted into an Async computation. These conversions make it possible to call code that produces a Task or IO.Promise, such as a wrapper around a callback-based API or a hand-written asynchronous primitive, directly from within an Async program. The corresponding conversions from Task and Except are also available for EAsync and BaseAsync; the conversions from IO.Promise are specific to Async because a dropped promise is reported as an IO.Error.

An IO.Promise can be dropped before it is ever resolved, for example if the code that was expected to resolve it is canceled or abandoned and the last reference to the promise goes away. After that, the promise can never be resolved. Because reference counts are deterministic, the runtime detects this the moment it happens, rather than at some unpredictable later time. Async.ofPromise and Async.ofPurePromise detect a dropped promise and produce an Async error rather than panicking; the message can be supplied via their error parameter, and defaults to "the promise linked to the Async was dropped".

def
Std.Async.Async.ofTask {α : Type} (task : Task α) : Async α

Converts Task to Async.

def
Std.Async.EAsync.ofTask {ε α : Type} (x : ETask ε α) : EAsync ε α

Creates a new EAsync out of a RTask.

def
Std.Async.EAsync.ofETask {ε α : Type} (x : ETask ε α) : EAsync ε α

Creates a new EAsync out of an ETask.

def
Std.Async.BaseAsync.ofTask {α : Type} (x : Task α) : BaseAsync α

Creates a new BaseAsync out of a Task.

def
Std.Async.Async.ofIOTask {α : Type} (task : IO (Task α)) : Async α

Converts IO (Task α) into Async.

def
Std.Async.Async.ofAsyncTask {α : Type} (task : AsyncTask α) : Async α

Converts AsyncTask into Async.

def
Std.Async.Async.ofPromise {α : Type} (task : IO (IO.Promise (Except IO.Error α))) (error : String := "the promise linked to the Async was dropped") : Async α

Converts Promise into Async.

def
Std.Async.Async.ofPurePromise {α : Type} (task : IO (IO.Promise α)) (error : String := "the promise linked to the Async was dropped") : Async α

Converts IO (IO.Promise α) to Async.

ETask.ofPromise! converts a promise to a task directly, panicking if the promise is dropped rather than producing an error.

def
Std.Async.ETask.ofPromise! {ε α : Type} (x : IO.Promise (Except ε α)) : ETask ε α

Create an ETask that resolves to the value of the promise x. If the promise gets dropped then it panics.

def
Std.Async.Async.ofExcept {α : Type} (except : Except IO.Error α) : Async α

Converts Except to Async.

def
Std.Async.EAsync.ofExcept {ε α : Type} (except : Except ε α) : EAsync ε α

Converts Except to EAsync.

def
Std.Async.BaseAsync.ofExcept {α : Type} (except : Except Empty α) : BaseAsync α

Converts Except to BaseAsync.
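The conversions in this section can be sketched together as follows. The names fromTask, fromExcept, and fromPromise are hypothetical; the promise is created and resolved by hand only to keep the example self-contained, whereas real code would typically receive it from a callback-based wrapper.

```lean
import Std.Async
open Std.Async

-- An ordinary task, lifted into `Async`.
def fromTask : Async Nat :=
  Async.ofTask (Task.spawn fun _ => 6 * 7)

-- An `Except` value; the error case becomes a thrown `IO.Error`.
def fromExcept : Async Nat :=
  Async.ofExcept (.error (IO.userError "boom"))

-- A promise that is resolved before it is handed to `Async`.
def fromPromise : Async Nat :=
  Async.ofPurePromise do
    let p : IO.Promise Nat ← IO.Promise.new
    p.resolve 7
    return p

def main : IO Unit := do
  IO.println (← fromTask.block)
  IO.println (← fromPromise.block)
  try
    discard fromExcept.block
  catch e =>
    IO.println s!"caught: {e}"
```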

21.12.2. Concurrent Composition

Concurrent composition runs several asynchronous computations at the same time and combines their results. These operators are defined in terms of async and await, but they provide a higher-level, more structured approach to concurrent asynchronous programming. Each operator launches tasks on the shared scheduler at the priority given by the optional prio parameter, and then awaits them. There are two families of concurrent operators: those that wait for every subcomputation and return all results, and those that return the result of the first subcomputation that finishes.

Async.concurrently runs two computations and returns their results as a pair, while Async.concurrentlyAll runs an array of computations and returns their results in the same order. Both wait for every subcomputation to finish, awaiting them positionally rather than chronologically, so an exception is reported in the position of the failing subcomputation rather than in the order in which failures occur (see errors and concurrency).

Async.race runs two computations and returns the result of whichever finishes first, while Async.raceAll does the same for an array of computations. The result of the call to Async.race or Async.raceAll is that of the first subcomputation to chronologically finish, whether it is a thrown exception or a returned value. A computation that fails quickly takes precedence over one that succeeds slowly.

None of these operators cancel the computations whose results are not used. In Async.race and Async.raceAll, the computations that do not finish first continue running to completion, and their results are discarded. In Async.concurrently and Async.concurrentlyAll, a failure in one subcomputation does not stop the others. The corresponding operators on ContextAsync, such as ContextAsync.race, do cancel the computations that are no longer needed.

To start a computation concurrently without awaiting its result, use background.

Each operator is available for BaseAsync, EAsync, and Async.
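The two families can be contrasted with a short sketch. The names slow and fast are hypothetical, and the delays are arbitrary values chosen so that one computation reliably finishes well before the other.

```lean
import Std.Async
open Std.Async

-- Hypothetical computations used only for illustration.
def slow : Async String := do
  sleep 50
  return "slow"

def fast : Async String := do
  sleep 10
  return "fast"

def main : IO Unit := do
  -- Waits for both; results are paired positionally, not chronologically.
  let both ← (Async.concurrently slow fast).block
  IO.println s!"{both}"
  -- Returns whichever finishes first; the other keeps running
  -- in the background and its result is discarded.
  let first ← (Async.race slow fast).block
  IO.println first
```

The pair always has the result of slow in its first component, because concurrently reports results by position. With these delays, race is expected to return the result of fast.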

def
Std.Async.Async.concurrently {α β : Type} (x : Async α) (y : Async β) (prio : Task.Priority := Task.Priority.default) : Async (α × β)

Runs two computations concurrently and returns both results as a pair.

def
Std.Async.EAsync.concurrently {ε α β : Type} (x : EAsync ε α) (y : EAsync ε β) (prio : Task.Priority := Task.Priority.default) : EAsync ε (α × β)

Runs two computations concurrently and returns both results as a pair.

def

Runs two computations concurrently and returns both results as a pair.

def

Runs all computations in an Array concurrently and returns all results as an array.

def

Runs all computations in an Array concurrently and returns all results as an array.

def

Runs all computations in an Array concurrently and returns all results as an array.

def
Std.Async.Async.race {α : Type} [Inhabited α] (x y : Async α) (prio : Task.Priority := Task.Priority.default) : Async α

Runs two computations concurrently and returns the result of the one that finishes first. The other result is lost and the other task is not cancelled, so the task will continue the execution until the end.

def
Std.Async.EAsync.race {α ε : Type} [Inhabited α] (x y : EAsync ε α) (prio : Task.Priority := Task.Priority.default) : EAsync ε α

Runs two computations concurrently and returns the result of the one that finishes first. The other result is lost and the other task is not cancelled, so the task will continue the execution until the end.

def

Runs two computations concurrently and returns the result of the one that finishes first. The other result is lost and the other task is not cancelled, so the task will continue the execution until the end.

def
Std.Async.Async.raceAll.{u_1} {c : Type u_1} {α : Type} [ForM Async c (Async α)] (xs : c) (prio : Task.Priority := Task.Priority.default) : Async α

Runs all computations concurrently and returns the result of the first one to finish. All other results are lost, and the tasks are not cancelled, so they'll continue their execution until the end.

def
Std.Async.EAsync.raceAll.{u_1} {α ε : Type} {c : Type u_1} [Inhabited α] [ForM (EAsync ε) c (EAsync ε α)] (xs : c) (prio : Task.Priority := Task.Priority.default) : EAsync ε α

Runs all computations concurrently and returns the result of the first one to finish. All other results are lost, and the tasks are not cancelled, so they'll continue their execution until the end.

def
Std.Async.BaseAsync.raceAll.{u_1} {α : Type} {c : Type u_1} [Inhabited α] [ForM BaseAsync c (BaseAsync α)] (xs : c) (prio : Task.Priority := Task.Priority.default) : BaseAsync α

Runs all computations concurrently and returns the result of the first one to finish. All other results are lost, and the tasks are not cancelled, so they'll continue their execution until the end.

21.12.3. Event Selection

Event selection involves both selectors, which are the source of events, and selectables, which pair selectors with code to be executed when the selector's event occurs. When a selector's event occurs, the selector has resolved. A selectable's code is not executed immediately when its selector resolves; instead, it is run when invoked by event selection. When a selectable whose selector has resolved is chosen for execution, it is selected.

A Selector α provides a value of type α when its event occurs, while a Selectable α contains an Async action to run when its selector has resolved. The type of the selector in a Selectable is a field of the constructor Selectable.case, rather than a parameter to the type; this means that selectables that are waiting on different types of event data can be used together.

structure
Std.Async.Selector (α : Type) : Type

An event source that can be multiplexed using Selectable.one. See the documentation of Selectable.one for how the protocol of communicating with a Selector works.

Constructor

Std.Async.Selector.mk

Fields

tryFn : Async (Option α)

Attempts to retrieve a piece of data from the event source in a non-blocking fashion, returning some if data is available and none otherwise.

registerFn : Waiter α → Async Unit

Registers a Waiter with the event source. Once data is available, the event source should attempt to call Waiter.race and resolve the Waiter's promise if it wins. It is crucial that data is never actually consumed from the event source unless Waiter.race wins in order to prevent data loss.

unregisterFn : Async Unit

A cleanup function that is called once any Selector has won the Selectable.one race.

structure
Std.Async.Selectable (α : Type) : Type 1

An event source together with a continuation to call on data obtained from that event source, usually used in conjunction with Selectable.one.

Constructor

Std.Async.Selectable.case

Fields

β : Type
selector : Selector self.β

The event source.

cont : self.β → Async α

The continuation that is called on results from the event source.

Event selection is invoked using three operators:

def
Std.Async.Selectable.one {α : Type} (selectables : Array (Selectable α)) : Async α

Performs fair and data-loss free multiplexing on the Selectables in selectables.

The protocol for this is as follows:

  1. The selectables are shuffled randomly.

  2. Run Selector.tryFn for each element in selectables. If any succeed, the corresponding Selectable.cont is executed and its result is returned immediately.

  3. If none succeed, a Waiter is registered with each Selector using Selector.registerFn. Once one of them resolves the Waiter, all Selector.unregisterFn functions are called, and the Selectable.cont of the winning Selector is executed and returned.

def
Std.Async.Selectable.tryOne {α : Type} (selectables : Array (Selectable α)) : Async (Option α)

Performs fair and data-loss free non-blocking multiplexing on the Selectables in selectables.

This function only tries the non-blocking tryFn for each Selectable without registering waiters or blocking. It returns some result if any Selectable is immediately available, or none if all would block.

The protocol for this is as follows:

  1. The selectables are shuffled randomly for fairness.

  2. Run Selector.tryFn for each element in selectables. If any succeed, the corresponding Selectable.cont is executed and its result is returned as some result.

  3. If none succeed, none is returned immediately without blocking.

def
Std.Async.Selectable.combine {α : Type} (selectables : Array (Selectable α)) : IO (Selector α)

Creates a Selector that performs fair and data-loss free multiplexing on multiple Selectables. This allows the multiplexing operation to be composed with other selectors.
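As a sketch of how Selectable.combine might be composed with other selectors, the following function first merges two channels into a single selector and then selects between that selector and a timeout. The name recvEither is hypothetical, and it is assumed that CloseableChannel is provided by Std.Sync.Channel in the Std namespace, in the same way as Channel in the examples in this section.

```lean
import Std.Async
import Std.Sync.Channel
open Std.Async
open Std (CloseableChannel)

-- Hypothetical helper: receive from either channel, or give up after 100 ms.
def recvEither (ch1 ch2 : CloseableChannel Nat) : Async (Option Nat) := do
  -- Merge both channels into one selector.
  let either : Selector (Option Nat) ← Selectable.combine #[
    .case ch1.recvSelector fun n? => return n?,
    .case ch2.recvSelector fun n? => return n?
  ]
  -- The combined selector takes part in a further selection like any other.
  Selectable.one #[
    .case either fun n? => return n?,
    .case (← Selector.sleep 100) fun () => return none
  ]
```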

Polling Without Blocking

Selectable.tryOne checks whether any selector has already resolved and returns the corresponding value immediately, or none if none has, rather than blocking. Defining pick with := rather than ← makes pick the Async computation itself rather than its result, so the same poll can be run more than once.

    #eval show IO (Option String × Option String × Option String) from do
      let colors ← Channel.new (α := String)
      let flavors ← Channel.new (α := String)
      let pick := Selectable.tryOne #[
        .case colors.recvSelector fun color => return color,
        .case flavors.recvSelector fun flavor => return flavor
      ]
      let whenEmpty ← pick.block
      discard <| colors.send "gray"
      let afterColor ← pick.block
      discard <| flavors.send "salty"
      let afterFlavor ← pick.block
      return (whenEmpty, afterColor, afterFlavor)
    (none, some "gray", some "salty")

Selection and Timeouts

A CloseableChannel provides a selector via CloseableChannel.recvSelector that resolves when the channel receives a value. Selector.sleep is a selector that resolves after the specified number of milliseconds have passed. The function recv combines these, waiting for up to 100 milliseconds to receive a value, after which it terminates without one:

    def recv (ch : CloseableChannel Nat) : Async (Option Nat) := do
      Selectable.one #[
        .case ch.recvSelector fun n? => return n?,
        .case (← Selector.sleep 100) fun () => return none
      ]

If the channel contains a value, then the recvSelector wins:

    #eval show IO _ from do
      let ch ← CloseableChannel.new (α := Nat)
      discard <| ch.send 42
      (recv ch).block
    some 42

If not, the timer wins:

    #eval show IO _ from do
      let ch ← CloseableChannel.new (α := Nat)
      -- nothing sent: the timeout wins
      (recv ch).block
    none

Selection

A CloseableChannel provides a selector via CloseableChannel.recvSelector that resolves when the channel receives a value. The function recv2 selects the first value returned on either channel:

    def recv2 (ch1 ch2 : CloseableChannel Nat) : Async (Option Nat) := do
      Selectable.one #[
        .case ch1.recvSelector fun n? => return n?,
        .case ch2.recvSelector fun n? => return n?
      ]

If only one channel contains a value, then it is returned:

    #eval show IO _ from do
      let ch1 ← CloseableChannel.new (α := Nat)
      let ch2 ← CloseableChannel.new (α := Nat)
      discard <| ch1.send 1
      (recv2 ch1 ch2).block
    some 1

    #eval show IO _ from do
      let ch1 ← CloseableChannel.new (α := Nat)
      let ch2 ← CloseableChannel.new (α := Nat)
      discard <| ch2.send 2
      (recv2 ch1 ch2).block
    some 2

If neither channel contains a value, then recv2 blocks until one does; the first one to have a value wins:

    #eval show IO _ from do
      let ch1 ← CloseableChannel.new (α := Nat)
      let ch2 ← CloseableChannel.new (α := Nat)
      discard <| IO.asTask (prio := .dedicated) do
        IO.sleep 100
        ch1.send 1
      discard <| IO.asTask (prio := .dedicated) do
        IO.sleep 50
        ch2.send 2
      (recv2 ch1 ch2).block
    some 2

Selectable.one throws an exception when passed an empty array of selectables, because it's impossible to get a value from nothing. Selectable.tryOne always returns none when passed an empty array.

Event selection is fair. This means that all selectables with currently-resolved selectors have an equal chance of winning and having their associated code invoked. This is important because a bias in event selection can lead to one of the selectables never being called, which can in turn cause data to accumulate without bound in the source it would have handled. Behind the scenes, fairness is ensured by randomizing the order of selectables each time.

Furthermore, event selection never results in data being lost in the losing selectables. The implementation ensures that data is never removed from a selector without being passed to the selectable's code, and that resolving a selector calls the associated selectable's code at most once. Data loss and double delivery are ruled out via a protocol that distinguishes checking whether a selector is resolved from actually consuming its data along with an atomic means of selecting one of the resolved selectors.

21.12.3.1. Selection Protocol🔗

This section is primarily intended for authors of new selectors.

Event selection begins by randomizing the order of the selectables. It consults each selector's non-blocking poll Selector.tryFn until one of them returns some. This is the winning selectable; its code is invoked and no further work is needed. On this fast path, only one selector is ever consumed, so there is no risk of data loss or double delivery.

If no selector was resolved in the first iteration (that is, each tryFn returned none), then it is necessary to wait until one of the selectors is resolved. Waiting consists of first registering a waiter with each selector; the first selector that has data wins the race via the waiters. The winning selector consumes its event, invokes code to clean up the other waiting selectors, computes the selectable's value, and resolves an overall promise that Selectable.one is blocked on.

More specifically, this is done by creating an atomic flag (indicating that a winner has been selected) and a promise for the result of Selectable.one. A registration loop processes each selectable in the array:

  1. The system checks whether the flag is now set, indicating that a prior selector has won the race. If so, the loop terminates.

  2. A waiter is registered with the selector using Selector.registerFn. This registration process may not consume data; it merely registers interest in data should it become available. The waiter includes a reference to the atomic flag along with a promise that can be resolved with the selector's data. The selector must call race on the waiter when the event has occurred, but it may only consume data if it wins the race.

  3. A task is created that observes the waiter's promise. Its completion callback is invoked with none if the promise was dropped (e.g. due to cancellation or unregistering); in this case, it should do nothing. If it is invoked with some around the result, indicating that the selector has won the race, then it must run an Async computation that:

     a. propagates any error indicated by the data source's result,

     b. blocks until the entire registration loop is complete,

     c. unregisters the waiter from every selectable in the array using its Selector.unregisterFn, and

     d. runs the winning selectable's code, resolving the result promise.

When the registration loop is complete, an internal promise is resolved that unblocks the winning waiter's callback. This block ensures that all registration occurs before all cleanup.

Finally, Selectable.one awaits the overall result promise, which will be resolved as soon as there is a winning callback.

21.12.3.1.1. Waiters🔗

A waiter is a means of atomically selecting a single offered value. Internally, it contains an atomic flag that indicates that a winner has been selected. When a client has a value, it calls Waiter.race with two callbacks: one is used when the offered value was not accepted (it did not win the race), the other is used when it is accepted. The callback that wins the race should resolve the waiter's promise, which is provided to the winning callback. This two-phase protocol ensures that there is no data loss, because selectors only consume events once they've already won the race.
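The two-phase idea can be modeled without the library. The sketch below is a toy model, not the implementation of Std.Async.Waiter: ToyWaiter, its fields, and the list-backed data source are all hypothetical names chosen for illustration. It shows that an offer consumes an event only inside the winning callback, so a losing offer leaves its data untouched.

```lean
/-- Toy stand-in for a waiter: a "finished" flag plus a slot for the winning value. -/
structure ToyWaiter where
  finished : IO.Ref Bool
  result : IO.Ref (Option Nat)

def ToyWaiter.new : IO ToyWaiter := do
  let finished ← IO.mkRef false
  let result ← IO.mkRef (none : Option Nat)
  return { finished, result }

/-- Claim the flag in one step; only the first caller runs `win`, later callers run `lose`. -/
def ToyWaiter.race {α : Type} (w : ToyWaiter) (lose : IO α)
    (win : IO.Ref (Option Nat) → IO α) : IO α := do
  let won ← w.finished.modifyGet fun taken => (!taken, true)
  if won then win w.result else lose

#eval show IO _ from do
  let w ← ToyWaiter.new
  -- Pending events of a toy data source.
  let source ← IO.mkRef [10, 20]
  -- An offer consumes an event only inside `win`, that is, after winning.
  let offer : IO Unit :=
    w.race (pure ()) fun slot => do
      match ← source.get with
      | x :: rest =>
        source.set rest
        slot.set (some x)
      | [] => pure ()
  offer
  offer
  return (← w.result.get, ← source.get)
```

The first offer wins and moves 10 into the result slot; the second offer loses the race and leaves 20 in the source, so the example evaluates to (some 10, [20]).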

🔗structure
Std.Async.Waiter (α : Type) : Type

The core data structure for racing on winning a Selectable.one if multiple event sources are ready at the same time. A Task can try to finish the waiter by calling Waiter.race.

Fields

promise : IO.Promise (Except IO.Error α)
🔗def
Std.Async.Waiter.race {m : Type → Type} {α β : Type} [Monad m] [MonadLiftT (ST IO.RealWorld) m] (w : Waiter α) (lose : m β) (win : IO.Promise (Except IO.Error α) → m β) : m β

Try to atomically finish the Waiter. If the race for finishing it is won, win is executed with the internal IO.Promise of the Waiter. This promise must under all circumstances be resolved by win. If the race is lost some cleanup work can be done in lose.

🔗def
Std.Async.Waiter.withPromise {α β : Type} (w : Waiter α) (p : IO.Promise (Except IO.Error β)) : Waiter β

Swap out the IO.Promise within the Waiter. Note that the part which determines whether the Waiter is finished is not swapped out.

🔗def
Std.Async.Waiter.checkFinished {m : Type → Type} {α : Type} [Monad m] [MonadLiftT (ST IO.RealWorld) m] (w : Waiter α) : m Bool

Atomically checks whether the Waiter has already finished. Note that right after this function call ends this might have already changed.

Natural Number Ticker

A natTicker is a selector that makes a Nat available every 100 milliseconds, incrementing each time. Its state is determined by two values:

  1. a counter, which is an IO.Ref that contains the next Nat to emit

  2. the time at which the process was started

The Selector.tryFn checks whether at least 100ms have elapsed for each emitted Nat. If so, the value is incremented and returned immediately:

    def tickerTryFn (counter : IO.Ref Nat) (startMs : Nat) := do
      let nowMs ← IO.monoMsNow
      let n ← counter.get
      if nowMs ≥ startMs + n * 100 then
        counter.set (n + 1)
        return (some n)
      else
        return none

If no value was immediately available, a waiter is registered. After sleeping until the next Nat is ready, the waiter's race is invoked; if the race is won, then the counter is incremented:

    def tickerRegisterFn (counter : IO.Ref Nat) (startMs : Nat)
        (waiter : Waiter Nat) : Async Unit := do
      let n ← counter.get
      let delay := startMs + n * 100 - (← IO.monoMsNow)
      let sleep ← Sleep.mk <| .ofNat delay
      sleep.wait
      waiter.race (pure ()) fun promise => do
        counter.set (n + 1)
        promise.resolve (.ok n)

These components can be combined into a selector:

    def natTicker : IO (Selector Nat) := do
      let current ← IO.mkRef 0
      let startMs ← IO.monoMsNow
      return {
        tryFn := tickerTryFn current startMs
        registerFn := tickerRegisterFn current startMs
        unregisterFn := pure ()
      }

This selector is not thread-safe. Multiple uses in a single Selectable.one are safe, because they do not lose data (the set is only invoked when the race has been definitively won). However, concurrent invocations of Selectable.one on the same natTicker can lead to data races. Fixing this requires careful locking.

21.12.4. Standard Selectors🔗

The standard library includes a number of selectors for events such as timers, receiving values through channels, and network sockets. These selectors allow Async programs to reliably process inputs from many different sources.

When a selector is built on some data source, it is very important not to use the same data source directly. For example, recvSelector and recv should not be used on the same channel. This can lead to violations of the selector protocol when the selector relies on exclusive control over the real-world state of the data source.

🔗def

Create a Selector that resolves once s has finished. s only starts when it runs inside of a Selectable.

🔗def
Std.Channel.recvSelector {α : Type} [Inhabited α] (ch : Std.Channel α) : Selector α

Creates a Selector that resolves once ch has data available and provides that data.

🔗def

Creates a Selector that resolves once ch has data available and provides that data. In particular if ch is closed while waiting on this Selector and no data is available already this will resolve to none.

🔗def
Std.Broadcast.Receiver.recvSelector {α : Type} [Inhabited α] (ch : Std.Broadcast.Receiver α) : Selector (Option α)

Creates a Selector that resolves once the broadcast channel ch has data available and provides that data.

🔗def
Std.Notify.selector (notify : Std.Notify) : Selector Unit

Creates a selector that waits for notifications.

🔗def
Std.CancellationContext.doneSelector (x : Std.CancellationContext) : Selector Unit

Creates a selector that waits for cancellation.

🔗def
Std.Async.Selector.cancelled : ContextAsync (Selector Unit)

Returns a selector that completes when the current context is cancelled. This is useful for selecting on cancellation alongside other asynchronous operations.

🔗def

Creates a Selector that resolves once s has a connection available. Calling this function does not start the connection wait, so it must not be called in parallel with accept.

🔗def

Creates a Selector that resolves once s has data available, up to at most size bytes, and provides that data. Calling this function does not start the data wait, so it must not be called in parallel with recv?.

🔗def
Std.Async.UDP.Socket.recvSelector (s : UDP.Socket) (size : UInt64) : Selector (ByteArray × Option Std.Net.SocketAddress)

Creates a Selector that resolves once s has data available, up to at most size bytes, and provides that data. If the socket has not been previously bound with bind, it is automatically bound to 0.0.0.0 (all interfaces) with a random port. Calling this function does not start the data wait; the wait starts only when the selector is used with Selectable.one or combine. It must not be called in parallel with recv.

🔗def
Std.Async.Signal.Waiter.selector (s : Signal.Waiter) : Selector Unit

Create a Selector that resolves once s has received the signal. Note that calling this function does not start the signal waiter.

🔗def
Std.StreamMap.selector {α β : Type} (stream : Std.StreamMap α β) : Async (Selector (α × β))

Gets a combined selector that returns the stream name and value.

21.12.5. Errors🔗

Error handling in Async mirrors error handling in IO:

The details of error handling in Async are consequences of this arrangement. When an asynchronous task (spawned via async) throws an exception, this is not observable in the parent. The error surfaces when the task's result is requested via await. If the task is never awaited, the error vanishes. In other words, errors in tasks created via background or ContextAsync.disown are not propagated at all.

21.12.5.1. Errors and Concurrency🔗

The concurrency operators Async.concurrently and Async.concurrentlyAll await the results of their sub-tasks positionally rather than chronologically. This means that errors that result from these tasks are reported in source-code order, rather than the chronological order in which the errors occurred.

Concurrency and Error Propagation

failFast waits 5 milliseconds before throwing an exception, while failSlow waits 250 milliseconds:

    def failFast : Async Nat := do
      sleep 5
      throw <| .userError "Fast failure"

    def failSlow : Async Nat := do
      sleep 250
      throw <| .userError "Slow failure"

When run via Async.concurrently, the program fails with the error from failSlow. Even though it was chronologically produced after the failure from failFast, the result of failSlow was awaited first.

    #eval Async.block do
      let val ← Async.concurrently (prio := .dedicated) failSlow failFast
      pure ()

    Slow failure

Async.race and Async.raceAll return the result of the first completed task, whether it is a success or a failure. This means that a quickly-produced error takes precedence over a slowly-produced success.
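The following sketch races a slow success against a fast failure. It assumes that Async.race accepts two Async computations of the same result type and that the resulting error can be caught with try/catch; slowSuccess, fastFailure, and raced are hypothetical names used only here.

```lean
import Std.Async
open Std.Async

-- Succeeds, but only after 250 milliseconds.
def slowSuccess : Async Nat := do
  sleep 250
  return 1

-- Fails after 5 milliseconds.
def fastFailure : Async Nat := do
  sleep 5
  throw <| .userError "Fast failure"

-- Race the two computations and describe the outcome as a string.
def raced : Async String := do
  try
    let n ← Async.race slowSuccess fastFailure
    return s!"succeeded with {n}"
  catch e =>
    return s!"failed with: {e}"

#eval raced.block
```

Given the behavior described above, the failure is expected to be reported, because it is the first result to become available.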

21.12.5.2. Errors in Event Selection🔗

During selection, errors might occur at any stage of the protocol. Errors thrown by a selector during the initial tryFn loop terminate the selection immediately. An error thrown from a registerFn or unregisterFn, by contrast, can leave selectors that were already registered without a matching call to unregisterFn. A selector that wins the race may resolve the promise with either Except.ok or Except.error; in the latter case, the result of the call to Selectable.one is itself an error.

21.12.6. Timers🔗

🔗def
Std.Async.sleep (duration : Std.Time.Millisecond.Offset) : Async Unit

Return an Async computation that completes after duration.

🔗structure

Sleep can be used to sleep for some duration once. The underlying timer has millisecond resolution.

Fields

native : Std.Internal.UV.Timer
🔗def
Std.Async.Sleep.mk (duration : Std.Time.Millisecond.Offset) : Async Sleep

Set up a Sleep that waits for duration milliseconds. This function only initializes but does not yet start the timer.

🔗def

If:

  • s is not yet running, start it and return an Async computation that will complete once the previously configured duration has elapsed.

  • s is already running or is no longer running, return the same Async computation as the first call to wait.

🔗def

If:

  • s is still running, the timer restarts counting from now and completes after duration milliseconds.

  • s is not yet running or is no longer running, this is a no-op.

🔗def

If:

  • s is still running, this stops s without completing any remaining Async computations that were created through wait. Note that if another Async computation is binding on any of these, it will hang forever without further intervention.

  • s is not yet running or is no longer running, this is a no-op.

🔗structure

Interval can be used to repeatedly wait for some duration like a clock. The underlying timer has millisecond resolution.

Fields

native : Std.Internal.UV.Timer
🔗def
Std.Async.Interval.mk (duration : Std.Time.Millisecond.Offset) : autoParam (0 < duration) Interval.mk._auto_1 → IO Interval

Set up an Interval that waits for duration milliseconds. This function only initializes but does not yet start the timer.

🔗def

If:

  • i is not yet running, start it and return an Async computation that completes right away, as the 0th multiple of duration has elapsed.

  • i is already running and:

    • the tick from the last call of i has not yet finished, return the same Async computation as the last call.

    • the tick from the last call of i has finished, return a new Async computation that waits for the closest next tick from the time of calling this function.

  • i is no longer running, this is a no-op.

🔗def

If:

  • Interval.tick was called on i before, the timer restarts counting from now and the next tick happens after duration.

  • i is not yet running or is no longer running, this is a no-op.

🔗def

If:

  • i is still running, this stops i without completing any remaining Async computations that were created through tick. Note that if another Async computation is binding on any of these, it will hang forever without further intervention.

  • i is not yet running or is no longer running, this is a no-op.

🔗def
Std.Async.Selector.sleep (duration : Std.Time.Millisecond.Offset) : Async (Selector Unit)

Return a Selector that completes after duration.

Two details of these timers deserve attention: Sleep.stop and Interval.stop leave any pending waits hanging forever, and the timer behind Selector.sleep starts only once the selector is used inside a Selectable.
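The following sketch uses a one-shot Sleep followed by a periodic Interval. It assumes that numeric literals can be used as millisecond durations (as in the other examples in this section), that Interval.tick returns an Async computation that can be run directly, and that Interval.stop can be called from Async; timersDemo is a hypothetical name.

```lean
import Std.Async
open Std.Async

def timersDemo : Async Unit := do
  -- One-shot: `Sleep.mk` only configures the timer; `wait` starts it.
  let nap ← Sleep.mk 100
  nap.wait
  IO.println "slept for about 100ms"
  -- Periodic: the first `tick` completes right away, later ones roughly every 50ms.
  let clock ← Interval.mk 50
  for n in [0:3] do
    clock.tick
    IO.println s!"tick {n}"
  -- Stop the interval only after the last tick has completed, so no wait is left pending.
  clock.stop

#eval timersDemo.block
```

Stopping the interval after the final tick has completed avoids the hazard described above, in which a pending wait would never finish.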

Selectors and Timers

This program runs a loop. At each iteration, it waits up to two seconds for a line of input. If the input is provided, then it echoes it and loops again. If the iteration times out, then the program exits. Checking for the timeout is done by using Selectable.one to race the timer against a channel that delivers the lines of input. This channel can be selected against, and it is fed by a dedicated thread that reads stdin.

    module

    import Std.Async
    import Std.Sync.Channel

    open Std.Async
    open Std (CloseableChannel)

    -- Blocking reader on a dedicated thread: forward each line, close on EOF.
    partial def reader (stdin : IO.FS.Stream) (ch : CloseableChannel String) : IO Unit := do
      let line ← stdin.getLine
      if line.isEmpty then
        discard <| (ch.close).toBaseIO
      else
        discard <| ch.send line
        reader stdin ch

    -- Echo each line; stop on EOF (channel closed) or 2s of silence.
    partial def echo (ch : CloseableChannel String) : Async Unit := do
      let more ← Selectable.one #[
        .case ch.recvSelector fun
          | some line => do IO.print (s!"got: {line}"); return true
          | none => do IO.println "done"; return false,
        .case (← Selector.sleep 2000) fun _ => do
          IO.println "done"
          return false
      ]
      if more then echo ch

    public def main : IO Unit := do
      let ch ← CloseableChannel.new (α := String)
      discard <| IO.asTask (prio := .dedicated) (reader (← IO.getStdin) ch)
      (echo ch).block

When run with this input:

stdin

    One line
    Another

it produces this output:

stdout

    got: One line
    got: Another
    done

21.12.7. Asynchronous I/O🔗

🔗type class
Std.Async.IO.AsyncRead (α β : Type) : Type

Interface for asynchronous reading operations.

Instance Constructor

Std.Async.IO.AsyncRead.mk

Methods

read : α → Async β
🔗type class
Std.Async.IO.AsyncWrite (α β : Type) : Type

Interface for asynchronous writing operations.

Instance Constructor

Std.Async.IO.AsyncWrite.mk

Methods

write : α → β → Async Unit
writeAll : α → Array β → Async Unit
flush : α → Async Unit
🔗type class
Std.Async.IO.AsyncStream (α : Type) (β : outParam Type) : Type

Interface for asynchronous streaming with selector-based iteration.

Instance Constructor

Std.Async.IO.AsyncStream.mk

Methods

next : α → Selector β
stop : α → IO Unit

21.12.7.1. Network🔗

The standard library provides asynchronous TCP and UDP sockets along with DNS name resolution. Operations that wait for the network, such as accepting a connection, receiving data, or resolving a name, are Async actions. TCP and UDP sockets additionally provide selectors, namely TCP.Socket.Server.acceptSelector, TCP.Socket.Client.recvSelector, and UDP.Socket.recvSelector, so that network events can be multiplexed with other I/O using event selection. Socket addresses are represented by the types Std.Net.SocketAddress and Std.Net.IPAddr. As with other selectors, a socket's selector and its corresponding blocking operation each expect exclusive control of the socket. They must not be used at the same time on the same socket.

21.12.7.1.1. TCP🔗

TCP is connection-oriented: a client establishes a connection to a server, after which the two exchange a reliable, ordered stream of bytes. The protocol includes measures for ensuring that the data that is sent actually arrives, including re-transmission of missing parts; these features rely on having an established connection with its associated state. A TCP server socket accepts incoming connections, while a TCP client socket connects to a server and exchanges data. A server is set up by creating it, binding it to an address, listening, and then accepting connections. A client is created, connected to an address, and then used to send and receive data.
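The server-side sequence described above (create, bind, listen, accept) can be sketched as follows. Several signatures are not shown in this section, so the sketch makes assumptions that should be checked against the library: that TCP.Socket.Server.mk creates a server in IO, that listen takes a backlog, that accept yields a TCP.Socket.Client, that recv? takes a maximum size and returns an optional ByteArray, and that send takes a single ByteArray. The loopback address, the port 8080, and the names echoLoop and serveOnce are arbitrary illustrative choices.

```lean
import Std.Async
open Std.Async

-- Echo back whatever arrives on `client` until EOF.
-- Assumed shapes: `recv?` returns `none` at EOF; `send` takes one `ByteArray`.
partial def echoLoop (client : TCP.Socket.Client) : Async Unit := do
  match ← client.recv? 4096 with
  | some bytes =>
    client.send bytes
    echoLoop client
  | none => return ()

-- Create, bind, listen, accept: serve a single connection, then return.
def serveOnce : Async Unit := do
  -- Assumed address construction: IPv4 loopback on an arbitrary port.
  let addr : Std.Net.SocketAddress := .v4 ⟨.ofParts 127 0 0 1, 8080⟩
  let server ← TCP.Socket.Server.mk
  server.bind addr
  server.listen 128
  let client ← server.accept
  echoLoop client
```

A program could run this with serveOnce.block. A real server would typically loop on accept and handle each client in its own asynchronous task.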

🔗structure

Represents a TCP server socket, managing incoming client connections.

Fields

native : Std.Internal.UV.TCP.Socket
🔗def
Std.Async.TCP.Socket.Server.bind (s : TCP.Socket.Server) (addr : Std.Net.SocketAddress) : IO Unit

Binds the server socket to the specified address. Address reuse is enabled to allow rebinding the same address.

🔗def

Listens for incoming connections with the given backlog.

🔗def

Gets the local address of the server socket.

🔗def

Enables the Nagle algorithm for all client sockets accepted by this server socket.

🔗def
Std.Async.TCP.Socket.Server.keepAlive (s : TCP.Socket.Server) (enable : Bool) (delay : Std.Time.Second.Offset) : autoParam (delay.val ≥ 1) TCP.Socket.Server.keepAlive._auto_1 → IO Unit

Enables TCP keep-alive for all client sockets accepted by this server socket.

🔗structure

Represents a TCP client socket, used to connect to a server.

Fields

native : Std.Internal.UV.TCP.Socket
🔗def
Std.Async.TCP.Socket.Client.bind (s : TCP.Socket.Client) (addr : Std.Net.SocketAddress) : IO Unit

Binds the client socket to the specified address. Address reuse is enabled to allow rebinding the same address.

🔗def
Std.Async.TCP.Socket.Client.connect (s : TCP.Socket.Client) (addr : Std.Net.SocketAddress) : Async Unit

Connects the client socket to the given address.

🔗def

Sends multiple data buffers through the client socket.

🔗def

Receives data from the client socket. If data is received, it’s wrapped in .some. If EOF is reached, the result is .none, indicating no more data is available. Receiving data in parallel on the same socket is not supported. Instead, we recommend binding multiple sockets to the same address. Furthermore calling this function in parallel with recvSelector is not supported.

🔗def

Gets the remote address of the client socket.

🔗def

Gets the local address of the client socket.

🔗def
Std.Async.TCP.Socket.Client.keepAlive (s : TCP.Socket.Client) (enable : Bool) (delay : Std.Time.Second.Offset) : autoParam (delay.val ≥ 0) TCP.Socket.Client.keepAlive._auto_1 → IO Unit

Enables TCP keep-alive with a specified delay for the client socket.

21.12.7.1.2. UDP🔗

Unlike TCP, UDP is connectionless: rather than first establishing a connection, a single socket sends and receives independent messages, called datagrams, to and from any address. There is no provision for ensuring that the datagrams actually arrive; with UDP, this is an application-level concern. A datagram can also be delivered to many recipients at once using broadcast or multicast.
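As a sketch of connectionless messaging, the example below sends one datagram over the loopback interface and receives it on a second socket, using the send and recv signatures documented in this section. It assumes that UDP.Socket.mk creates a socket in IO and that a socket address can be built as shown; the port 9000 and the name udpLoopback are arbitrary illustrative choices.

```lean
import Std.Async
open Std.Async

def udpLoopback : Async String := do
  -- Assumed address construction: IPv4 loopback on an arbitrary port.
  let addr : Std.Net.SocketAddress := .v4 ⟨.ofParts 127 0 0 1, 9000⟩
  let receiver ← UDP.Socket.mk
  receiver.bind addr
  let sender ← UDP.Socket.mk
  -- No connection is established: the destination is given per datagram.
  sender.send "hello".toUTF8 (some addr)
  -- Receive at most 1024 bytes; the sender's address is ignored here.
  let (bytes, _) ← receiver.recv 1024
  return String.fromUTF8! bytes
```

Since UDP does not guarantee delivery, a robust program would race the recv against a timeout instead of waiting indefinitely.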

🔗def

Creates a new UDP socket.

🔗def
Std.Async.UDP.Socket.bind (s : UDP.Socket) (addr : Std.Net.SocketAddress) : IO Unit

Binds the UDP socket to the given address. Address reuse is enabled to allow rebinding the same address.

🔗def
Std.Async.UDP.Socket.connect (s : UDP.Socket) (addr : Std.Net.SocketAddress) : IO Unit

Associates the UDP socket with the given address and port, so every message sent by this socket is automatically sent to that destination.

🔗def
Std.Async.UDP.Socket.send (s : UDP.Socket) (data : ByteArray) (addr : Option Std.Net.SocketAddress := none) : Async Unit

Sends data through a UDP socket. The addr parameter specifies the destination address. If addr is none, the data is sent to the default peer address set by connect.

🔗def
Std.Async.UDP.Socket.sendAll (s : UDP.Socket) (data : Array ByteArray) (addr : Option Std.Net.SocketAddress := none) : Async Unit

Sends multiple data buffers through a UDP socket. The addr parameter specifies the destination address. If addr is none, the data is sent to the default peer address set by connect.

🔗def
Std.Async.UDP.Socket.recv (s : UDP.Socket) (size : UInt64) : Async (ByteArray × Option Std.Net.SocketAddress)

Receives data from a UDP socket. size is the maximum number of bytes to receive. The promise resolves when some data is available or an error occurs. If the socket has not been previously bound with bind, it is automatically bound to 0.0.0.0 (all interfaces) with a random port. Furthermore, calling this function in parallel with recvSelector is not supported.

🔗def
Std.Async.UDP.Socket.getSockName (s : UDP.Socket) : IO Std.Net.SocketAddress

Gets the local address of the UDP socket.

🔗def
Std.Async.UDP.Socket.getPeerName (s : UDP.Socket) : IO Std.Net.SocketAddress

Gets the remote address of the UDP socket. On unconnected handles, it throws the .invalidArgument error.

🔗def
Std.Async.UDP.Socket.setBroadcast (s : UDP.Socket) (enable : Bool) : IO Unit

Enables or disables broadcasting for the UDP socket.

🔗def
Std.Async.UDP.Socket.setTTL (s : UDP.Socket) (ttl : UInt32) : IO Unit

Sets the TTL for outgoing packets.

🔗def
Std.Async.UDP.Socket.setMulticastLoop (s : UDP.Socket) (enable : Bool) : IO Unit

Enables or disables multicast loopback for the UDP socket.

🔗def

Sets the time-to-live (TTL) for multicast packets.

🔗def
Std.Async.UDP.Socket.setMulticastInterface (s : UDP.Socket) (interfaceAddr : Std.Net.IPAddr) : IO Unit

Sets the multicast interface for sending packets.

🔗def
Std.Async.UDP.Socket.setMembership (s : UDP.Socket) (multicastAddr : Std.Net.IPAddr) (interfaceAddr : Option Std.Net.IPAddr) (membership : UDP.Membership) : IO Unit

Sets the membership for joining or leaving a multicast group.

21.12.7.1.3. DNS🔗

DNS resolution converts between names and socket addresses. DNS.getAddrInfo performs forward resolution from a host and service to a list of addresses, while DNS.getNameInfo performs reverse resolution from an address to a host and service.
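A minimal sketch of forward resolution, using the DNS.getAddrInfo signature documented in this section. The host "localhost", the service "80", and the name countAddrs are arbitrary illustrative choices, and the result depends on the resolver configuration of the machine on which it runs.

```lean
import Std.Async
open Std.Async

-- Resolve a host and service, then report how many addresses came back.
def countAddrs : Async Nat := do
  let addrs ← DNS.getAddrInfo "localhost" "80"
  return addrs.size

#eval countAddrs.block
```

Resolution failures surface as errors in Async, so a caller that must tolerate unknown hosts would wrap the call in error handling.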

🔗def
Std.Async.DNS.getAddrInfo (host service : String) (addrFamily : Option Std.Net.AddressFamily := none) : Async (Array Std.Net.IPAddr)

Asynchronously resolves a hostname and service to an array of socket addresses.

🔗def
Std.Async.DNS.getNameInfo (host : Std.Net.SocketAddress) : Async DNS.NameInfo

Performs a reverse DNS lookup on a SocketAddress.

🔗structure

Represents a resolved hostname and service name from a socket address.

Constructor

Std.Async.DNS.NameInfo.mk

Fields

host : String

The resolved hostname (e.g., "example.com").

service : String

The service name (e.g., "http" for port 80).

21.12.7.2. Signals🔗

Unix-style signals are asynchronous notifications that can be received from the operating system at any time. For example, when a user presses Ctrl-C, the SIGINT signal is sent to the process. A Signal.Waiter is a Lean representation of an underlying signal handler. The signals that can be handled are enumerated in the type Signal:

🔗inductive type

Unix style signals for Unix and Windows. SIGKILL and SIGSTOP are missing because they cannot be caught. SIGBUS, SIGFPE, SIGILL, and SIGSEGV are missing because they cannot be caught safely by libuv. SIGPIPE is not present because the runtime ignores the signal.

Constructors

Std.Async.Signal.sighup : Signal

Hangup detected on controlling terminal or death of controlling process.

On Windows:

  • SIGHUP is generated when the user closes the console window. The program is given ~10 seconds to perform cleanup before Windows unconditionally terminates it.

Std.Async.Signal.sigint : Signal

Interrupt program.

Notes:

  • Normally delivered when the user presses CTRL+C. Not generated when terminal raw mode is enabled.

Std.Async.Signal.sigtrap : Signal

Trace/breakpoint trap.

Std.Async.Signal.sigabrt : Signal

Abort signal.

Notes:

  • SIGABRT is not catchable if generated by certain runtime functions, such as abort or assertion failure.

  • On Windows, watchers can be created for SIGABRT, but they never receive the signal.

Std.Async.Signal.sigusr1 : Signal

User-defined signal 1.

Std.Async.Signal.sigusr2 : Signal

User-defined signal 2.

Std.Async.Signal.sigalrm : Signal

Real-time timer expired.

Std.Async.Signal.sigterm : Signal

Termination signal.

Notes:

  • On Windows, watchers can be created for SIGTERM, but they never receive the signal.

Std.Async.Signal.sigchld : Signal

Child status has changed.

Std.Async.Signal.sigcont : Signal

Continue after stop.

Std.Async.Signal.sigtstp : Signal

Stop typed at terminal.

Std.Async.Signal.sigttin : Signal

Background read attempted from control terminal.

Std.Async.Signal.sigttou : Signal

Background write attempted to control terminal.

Std.Async.Signal.sigurg : Signal

Urgent condition on socket.

Std.Async.Signal.sigxcpu : Signal

CPU time limit exceeded.

Std.Async.Signal.sigxfsz : Signal

File size limit exceeded.

Std.Async.Signal.sigvtalrm : Signal

Virtual alarm clock.

Std.Async.Signal.sigprof : Signal

Profiling timer expired.

Std.Async.Signal.sigwinch : Signal

Window size change.

Notes:

  • SIGWINCH is raised whenever the runtime detects the console has been resized.

  • Under console emulators, or on 32-bit apps on 64-bit systems, SIGWINCH is emulated.

  • In these cases, signals may not be delivered timely.

Std.Async.Signal.sigio : Signal

I/O now possible.

Std.Async.Signal.sigsys : Signal

Bad system call.

Depending on the platform, some signals cannot be caught. On Unix-like operating systems, SIGKILL and SIGSTOP can't be caught. SIGBUS, SIGFPE, SIGILL, and SIGSEGV can't be handled because Lean uses libuv to install signal handlers, and libuv cannot safely catch these signals. Finally, the Lean run-time system ignores SIGPIPE. On Windows, waiters can be created for SIGTERM and SIGABRT, but they never fire. SIGHUP fires when the console is closed, with approximately ten seconds provided for cleanup. SIGINT is not delivered in terminal raw mode, and SIGWINCH is emulated and may not be delivered in a timely manner.

To install a signal handler, use Signal.Waiter.mk to create a waiter for the signal. The waiter can be used via Signal.Waiter.wait, which allows it to be waited for using await, but most use cases are better served by Signal.Waiter.selector together with event selection, which makes it possible to handle arriving signals by canceling ongoing work and cleaning up. This pattern, and the Signal.Waiter API, mirror those of timers; unlike timers, the arrival of a signal is unpredictable.

🔗structure

Signal.Waiter can be used to handle a specific signal once.

Fields

native : Std.Internal.UV.Signal
🔗def

Set up a Signal.Waiter that waits for the specified signum. This function only initializes but does not yet start listening for the signal.

🔗def

If:

  • s is not yet running, start listening and return an AsyncTask that will resolve once the previously configured signal is received.

  • s is already running or is no longer running, return the same AsyncTask as the first call to wait.

The resolved AsyncTask contains the signal number that was received.

🔗def

If:

  • s is still running, this stops s without resolving any remaining AsyncTasks that were created through wait. Note that if another AsyncTask is bound to any of these, it will hang forever without further intervention.

  • s is not yet running or is no longer running, this is a no-op.

🔗def
Std.Async.Signal.Waiter.selector (s : Signal.Waiter) : Selector Unit

Create a Selector that resolves once s has received the signal. Note that calling this function does not start the signal waiter.

Selectors and Signals

This program runs a loop. At each iteration, it waits for a line of input or Ctrl-C, which sends SIGINT. If the input is provided, then it echoes it and loops again. If it receives SIGINT, then iteration stops and the program terminates. Checking for the signal is done by using Selectable.one to race the signal handler against a channel that delivers the lines of input. This channel can be selected against, and it is fed by a dedicated thread that reads stdin.

module
import Std.Async
import Std.Sync.Channel
open Std.Async
open Std (CloseableChannel)

-- Blocking reader on a dedicated thread: forward each line, close on EOF.
partial def reader (stdin : IO.FS.Stream) (ch : CloseableChannel String) : IO Unit := do
  let line ← stdin.getLine
  if line.isEmpty then
    discard <| (ch.close).toBaseIO
  else
    discard <| ch.send line
    reader stdin ch

-- Echo each line; stop on EOF (channel closed) or SIGINT (Ctrl-C).
partial def echo (sigint : Signal.Waiter) (ch : CloseableChannel String) : Async Unit := do
  let more ← Selectable.one #[
    .case ch.recvSelector fun
      | some line => do IO.print (s!"got: {line}"); return true
      | none => do IO.println "done"; return false,
    .case sigint.selector fun _ => do
      IO.println "interrupted"
      return false
  ]
  if more then echo sigint ch

public def main : IO Unit := do
  let ch ← CloseableChannel.new (α := String)
  let sigint ← Signal.Waiter.mk .sigint (repeating := true)
  discard <| IO.asTask (prio := .dedicated) (reader (← IO.getStdin) ch)
  (echo sigint ch).block

21.12.8. Cancellation🔗

Typical asynchronous applications need to handle cancellation, where work needs to be abandoned. For example, if a user presses Ctrl-C or a timeout occurs, then a download may be abandoned and temporary files cleaned up without terminating the entire application. The ContextAsync monad provides tools for managing hierarchical trees of tasks, where canceling a task also cancels its children.

Cancellation is cooperative: tasks must explicitly check whether they've been canceled and terminate themselves. In other words, cancellation is an event that tasks may opt into observing, rather than a mechanism to forcibly terminate other tasks.

There are two primary ways to cancel a tree of ContextAsync computations:

  • ContextAsync.run executes a cancellable tree of tasks as an ordinary Async task. When the root task is completed, the entire tree is canceled.

  • ContextAsync.cancel cancels the current task and all of its children.

For cancellation to work as expected, concurrent tasks should be started with the helpers that are specifically designed for ContextAsync. When this is not possible, use ContextAsync.runIn to associate the current cancellation context with the new computation.

🔗def
Std.Async.ContextAsync (α : Type) : Type

An asynchronous computation with cooperative cancellation support via a CancellationContext. ContextAsync α is equivalent to ReaderT CancellationContext Async α, providing a CancellationContext value to async computations.

🔗def

Cancels the current context with the given reason, cascading to all child contexts. Cancellation is cooperative: operations must explicitly check isCancelled or use awaitCancellation to respond.

🔗def

Runs a ContextAsync computation with a new context that cancels after the execution of the computation. See also ContextAsync.runIn for running with an existing context.

🔗def

Runs a ContextAsync computation with a given context. See also ContextAsync.run for running with a new context that automatically cancels after execution.

🔗def

Launches a ContextAsync computation in the background, discarding its result.

The computation runs independently in the background in its own child context. The parent computation does not wait for background tasks to complete, so when the parent finishes its execution, its background tasks are cancelled. See also disown for launching tasks that continue independently even after parent cancellation.
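As a rough illustration of this lifetime rule, the sketch below starts a background computation that does nothing until it is cancelled. It assumes that the helper described here is exposed as ContextAsync.background and that it accepts the computation as its only required argument; the name parent is hypothetical.

```lean
import Std.Async
open Std.Async

-- Hypothetical example: the parent returns right away, while the child
-- waits for cancellation and then cleans up.
def parent : ContextAsync Unit := do
  -- Assumed name and shape of the helper described above.
  ContextAsync.background do
    -- Suspend until this child context is cancelled.
    ContextAsync.awaitCancellation
    IO.println "child: cancelled, cleaning up"
  IO.println "parent: finished"

def main : IO Unit :=
  Async.block <| ContextAsync.run parent
```

Because the parent does not wait for the child, the process may exit before the child's message is printed. A program that depends on the cleanup step would need to wait for it explicitly.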

🔗def

Launches a ContextAsync computation in the background, discarding its result. It's similar to background, but the child context is not automatically cancelled when the action completes. This allows the disowned computation to continue running independently, even if the parent context is cancelled. The child context will remain alive as long as the computation needs it. See also background for launching tasks that are cancelled when the parent finishes.

🔗def

Runs two computations concurrently and returns both results. Each computation runs in its own child context; if either fails or is cancelled, both are cancelled immediately and the exception is propagated.

🔗def

Runs two computations concurrently and returns the result of the first to complete. Each computation runs in its own child context; when either completes, the other is cancelled immediately.
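The following sketch shows how such a race might be written. The name ContextAsync.race and its two-argument shape are assumptions, since the signature is not shown here, and firstToFinish is a hypothetical name. One branch finishes immediately, while the other only returns once its child context is cancelled.

```lean
import Std.Async
open Std.Async

-- Hypothetical example; `ContextAsync.race` is an assumed name.
def firstToFinish : ContextAsync String :=
  ContextAsync.race
    (do
      -- This branch only returns once its child context is cancelled.
      ContextAsync.awaitCancellation
      return "slow branch: cancelled")
    -- This branch completes without waiting for anything.
    (pure "fast branch: finished")

def main : IO Unit := do
  let result ← Async.block <| ContextAsync.run firstToFinish
  IO.println result
```

The losing branch is not forcibly terminated: it stops only because it observes the cancellation of its own child context, which is the cooperative behavior described throughout this section.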

🔗def

Runs all computations concurrently and returns the first result. Each computation runs in its own child context; when the first completes successfully, all others are cancelled immediately.

21.12.8.1. Reacting to Cancellation🔗

Asynchronous computations can react to cancellation via explicit polling with ContextAsync.isCancelled. They can also block until the current context is canceled using ContextAsync.awaitCancellation; this is useful when there is no more work to be done until cancellation, but still allows for cleanup. Finally, cancellation can be awaited together with other events using event selection with Selector.cancelled or ContextAsync.doneSelector (they are synonymous).

🔗def

Checks if the current context is cancelled. Returns true if the context (or any ancestor) has been cancelled. Long-running operations should periodically check this and exit gracefully when cancelled.

Observing Cancellation

ContextAsync.isCancelled reports whether the current context has been canceled. Here, the context is canceled explicitly with ContextAsync.cancel:

#eval Async.block <| ContextAsync.run do
  let before ← ContextAsync.isCancelled
  ContextAsync.cancel .cancel
  let after ← ContextAsync.isCancelled
  return (before, after)
(false, true)
Cooperating with Cancellation

Because cancellation is cooperative, a long-running computation must check ContextAsync.isCancelled itself and stop once it has been canceled. This worker records numbers until its context is canceled. The cancellation here comes from the worker itself after three iterations, but in practice it would come from a timeout or a parent task; the worker's reaction is the same:

def worker : ContextAsync (Array Nat) := do
  let log ← IO.mkRef (#[] : Array Nat)
  for i in [0:100] do
    if ← ContextAsync.isCancelled then break
    log.modify (·.push i)
    if i == 2 then ContextAsync.cancel .cancel
  log.get
#eval Async.block <| ContextAsync.run worker
#[0, 1, 2]
🔗def
Std.Async.Selector.cancelled : ContextAsync (Selector Unit)

Returns a selector that completes when the current context is cancelled. This is useful for selecting on cancellation alongside other asynchronous operations.

Interrupting a Wait

Cancellation can be awaited alongside other events using event selection. Here, a computation waits for either a value on a channel or cancellation, whichever comes first. Because the context is canceled before the selection runs, the cancellation branch wins and the result is none:

def waitOrCancel (ch : CloseableChannel Nat) : ContextAsync (Option Nat) := do
  Selectable.one #[
    .case ch.recvSelector (fun n? => return n?),
    .case (← Selector.cancelled) (fun _ => return none)
  ]
#eval Async.block <| ContextAsync.run do
  let ch ← CloseableChannel.new (α := Nat)
  ContextAsync.cancel .cancel
  waitOrCancel ch
none
🔗def

Returns a selector that completes when the current context is cancelled.

🔗def

Gets the cancellation reason if the context is cancelled. Returns some reason if cancelled, none otherwise, allowing you to distinguish between different cancellation types.
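A computation might use the reason to choose between cleanup strategies, as in the sketch below. The name ContextAsync.getCancellationReason is an assumption, since the signature is not shown here, and describeCancellation is a hypothetical name; the constructors .custom and .deadline are those of CancellationReason.

```lean
import Std.Async
open Std.Async

-- Hypothetical example; `ContextAsync.getCancellationReason` is an assumed name.
def describeCancellation : ContextAsync String := do
  -- Cancel the current context with a custom reason.
  ContextAsync.cancel (.custom "user requested stop")
  -- Inspect why the context was cancelled, if it was.
  match ← ContextAsync.getCancellationReason with
  | some (.custom msg) => return s!"custom: {msg}"
  | some .deadline => return "deadline"
  | some _ => return "another reason"
  | none => return "not cancelled"

def main : IO Unit := do
  let description ← Async.block <| ContextAsync.run describeCancellation
  IO.println description
```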

21.12.8.2. Cancellation Contexts🔗

ContextAsync is a reader on top of Async that provides access to a cancellation context. This context contains an ID along with a mutex-guarded mutable state that encodes a tree of IDs, each with a cancellation token, and a source of unique ID values. When child tasks are created, they are assigned new IDs and associated with the current task. When tasks are canceled, the tree in the state is used to cancel their children.

🔗structure

A cancellation context that allows multiple consumers to wait until cancellation is requested. Forms a tree structure where cancelling a parent cancels all children.

Constructor

Std.CancellationContext.mk

Fields

state : Std.Mutex Std.CancellationContext.State
token : Std.CancellationToken
id : UInt64
🔗structure

Constructor

Std.CancellationContext.State.mk

Fields

tokens : Std.TreeMap UInt64 (Std.CancellationToken × Array UInt64) compare

Map of token IDs to tokens and their children.

id : UInt64

Next available ID

🔗def

Forks a child context from a parent. If the parent is already cancelled, returns the parent context. Otherwise, creates a new child that will be cancelled when the parent is cancelled.

🔗def

Cancels this context and all child contexts with the given reason.

🔗def

Waits for cancellation. Returns a task that completes when the context is cancelled.

🔗def
Std.CancellationContext.doneSelector (x : Std.CancellationContext) : Selector Unit

Creates a selector that waits for cancellation.

🔗inductive type

Reasons for cancellation.

Constructors

Std.CancellationReason.deadline : Std.CancellationReason

Cancelled due to a deadline or timeout

Std.CancellationReason.custom (msg : String) :
  Std.CancellationReason

Custom cancellation reason

21.12.8.3. Cancellation Tokens🔗

A cancellation token is a mutex-guarded piece of shared mutable state that tracks whether the token has been canceled along with a set of consumers that have requested notification when cancellation occurs. Behind the scenes, ContextAsync.isCancelled checks the current context to get the token for the current task's ID, then checks whether the cancellation reason is some or none.

🔗structure

A cancellation token is a synchronization primitive that allows multiple consumers to wait until cancellation is requested.

Constructor

Std.CancellationToken.mk

Fields

state : Std.Mutex Std.CancellationToken.State
🔗structure

The central state structure for a CancellationToken.

Constructor

Std.CancellationToken.State.mk

Fields

reason : Option Std.CancellationReason

The cancellation reason if cancelled, none otherwise.

consumers : Std.Queue Std.CancellationToken.Consumer

Consumers that are blocked waiting for cancellation.