Asynchronous Programming

    To address these scenarios, Julia provides Tasks (also known by several other names, such as symmetric coroutines, lightweight threads, cooperative multitasking, or one-shot continuations). When a piece of computing work (in practice, executing a particular function) is designated as a Task, it becomes possible to interrupt it by switching to another Task. The original Task can later be resumed, at which point it will pick up right where it left off. At first, this may seem similar to a function call. However, there are two key differences. First, switching tasks does not use any space, so any number of task switches can occur without consuming the call stack. Second, switching among tasks can occur in any order, unlike function calls, where the called function must finish executing before control returns to the calling function.

    You can think of a Task as a handle to a unit of computational work to be performed. It has a create-start-run-finish lifecycle. Tasks are created by calling the Task constructor on a 0-argument function to run, or by using the @task macro:

    @task x is equivalent to Task(()->x).
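    A minimal sketch of that equivalence (the expression 1 + 1 is an arbitrary stand-in for real work): both forms produce a task that does nothing until it is scheduled, and fetch then waits for each task and returns its result.

```julia
# Two equivalent ways to wrap the expression `1 + 1` in a Task.
t1 = Task(() -> 1 + 1)   # explicit 0-argument function
t2 = @task 1 + 1         # macro form, same as Task(() -> 1 + 1)

println(istaskstarted(t1))  # false: creating a task does not run it

schedule(t1); schedule(t2)

# `fetch` waits for each task to finish and returns its result.
r1 = fetch(t1)
r2 = fetch(t2)
println((r1, r2))  # (2, 2)
```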

    For example, t = @task begin; sleep(5); println("done"); end creates a task that will wait for five seconds and then print done. However, it has not started running yet. We can run it whenever we’re ready by calling schedule:

    julia> schedule(t);

    If you try this in the REPL, you will see that schedule returns immediately. That is because it simply adds t to an internal queue of tasks to run. Then, the REPL will print the next prompt and wait for more input. Waiting for keyboard input provides an opportunity for other tasks to run, so at that point t will start. t calls sleep, which sets a timer and stops execution. If other tasks have been scheduled, they could run then. After five seconds, the timer fires and restarts t, and you will see done printed. t is then finished.

    The wait function blocks the calling task until some other task finishes. So, for example, if you type

    julia> schedule(t); wait(t)

    instead of only calling schedule, you will see a five-second pause before the next input prompt appears. That is because the REPL is waiting for t to finish before proceeding.

    It is common to want to create a task and schedule it right away, so the @async macro is provided for that purpose: @async x is equivalent to schedule(@task x).
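    Putting the lifecycle together as a script, assuming a shortened 0.5-second delay in place of the five seconds used above so it runs quickly: @async creates and schedules the task in one step, and wait blocks until it finishes.

```julia
# Create and schedule a task in one step with @async.
# The delay is shortened from five seconds to 0.5 s (an arbitrary choice).
t = @async begin
    sleep(0.5)
    println("done")
end

println(istaskdone(t))  # false: t is queued or sleeping, not finished
wait(t)                 # block the current task until t finishes
println(istaskdone(t))  # true
```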

    In some problems, the various pieces of required work are not naturally related by function calls; there is no obvious “caller” or “callee” among the jobs that need to be done. An example is the producer-consumer problem, where one complex procedure is generating values and another complex procedure is consuming them. The consumer cannot simply call a producer function to get a value, because the producer may have more values to generate and so might not yet be ready to return. With tasks, the producer and consumer can both run as long as they need to, passing values back and forth as necessary.

    Julia provides a Channel mechanism for solving this problem. A Channel is a waitable first-in first-out queue which can have multiple tasks reading from and writing to it.

    Let’s define a producer task, which produces values via the put! call. To consume values, we need to schedule the producer to run in a new task. A special Channel constructor that accepts a 1-argument function can be used to run a task bound to a channel. We can then take! values repeatedly from the channel object:
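    The producer function itself is missing here; a version consistent with the values printed by the for loop that follows (start, 2, 4, 6, 8, stop) might look like the sketch below. The names producer and chnl are illustrative. Because Channel(producer) creates an unbuffered channel by default, each put! waits until a matching take! is made.

```julia
# A producer that writes values into the channel it is given.
function producer(c::Channel)
    put!(c, "start")
    for n in 1:4
        put!(c, 2n)          # 2, 4, 6, 8
    end
    put!(c, "stop")
end

# Channel(producer) creates a channel, runs producer in a new task
# bound to it, and returns the channel.
chnl = Channel(producer)

first_val  = take!(chnl)   # "start"
second_val = take!(chnl)   # 2
```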

    The returned Channel can be used as an iterable object in a for loop, in which case the loop variable takes on all the produced values. The loop is terminated when the channel is closed.

    julia> for x in Channel(producer)
               println(x)
           end
    start
    2
    4
    6
    8
    stop

    Note that we did not have to explicitly close the channel in the producer. This is because the act of binding a Channel to a Task associates the open lifetime of a channel with that of the bound task. The channel object is closed automatically when the task terminates. Multiple channels can be bound to a task, and vice versa.
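    A small sketch of that automatic closing, using an arbitrary task body that puts 1, 2, 3 and assuming Julia 1.3 or later for the Channel{T}(f) form: once the bound task returns, the channel is closed, so iterating (here via collect) stops after the last value and isopen reports false.

```julia
# The task bound by Channel{Int}(f) closes the channel when f returns.
ch = Channel{Int}(c -> foreach(i -> put!(c, i), 1:3))

vals = collect(ch)     # iterates until the channel is closed and drained
println(vals)          # [1, 2, 3]
println(isopen(ch))    # false: closed automatically when the task ended
```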

    While the Task constructor expects a 0-argument function, the Channel method that creates a task-bound channel expects a function that accepts a single argument of type Channel. A common pattern is for the producer to be parameterized, in which case a partial function application is needed to create a 0- or 1-argument anonymous function.

    For Task objects this can be done either directly or by use of a convenience macro:

    function mytask(myarg)
        # ... work that uses myarg ...
    end
    taskHdl = Task(() -> mytask(7))
    # or, equivalently
    taskHdl = @task mytask(7)
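    The same idea applies to task-bound channels, whose constructor wants a 1-argument function. In this sketch, count_to is a hypothetical producer taking an extra parameter n; an anonymous function fixes n and leaves only the channel argument.

```julia
# Hypothetical producer parameterized by `n`.
function count_to(c::Channel, n)
    for i in 1:n
        put!(c, i)
    end
end

# Channel expects a 1-argument function, so close over `n`
# with an anonymous function that takes only the channel.
ch = Channel(c -> count_to(c, 3))
vals = collect(ch)   # reads until the bound task ends and closes ch
```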

    To orchestrate more advanced work distribution patterns, bind and schedule can be used in conjunction with the Task and Channel constructors to explicitly link a set of channels with a set of producer/consumer tasks.
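    One possible way to do this linking by hand (the producer body and the buffer size of 10 are arbitrary choices for illustration): create the channel and the task separately, bind them so the channel closes when the task ends, then schedule the task.

```julia
c = Channel{Int}(10)                          # buffered channel, up to 10 Ints
t = @task foreach(i -> put!(c, i), 1:5)       # producer task, not yet running

bind(c, t)      # close c automatically when t terminates
schedule(t)     # start the producer

vals = collect(c)   # consumer: reads until c is closed and empty
```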

    A channel can be visualized as a pipe, i.e., it has a write end and a read end:

    • Multiple writers in different tasks can write to the same channel concurrently via put! calls.

    • Multiple readers in different tasks can read data concurrently via take! calls.

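    • As an example, given channels c1 and c2 and a function that repeatedly takes an item from c1, processes it, and puts the result onto c2, several instances of that function can run concurrently in separate tasks, all reading from c1 and writing to c2.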

    • Channels are created via the Channel{T}(sz) constructor. The channel will only hold objects of type T. If the type is not specified, the channel can hold objects of any type. sz refers to the maximum number of elements that can be held in the channel at any time. For example, Channel(32) creates a channel that can hold a maximum of 32 objects of any type. A Channel{MyType}(64) can hold up to 64 objects of MyType at any time.

    • If a Channel is empty, readers (on a take! call) will block until data is available.

    • isready tests for the presence of any object in the channel, while wait waits for an object to become available.

    • A Channel is in an open state initially. This means that it can be read from and written to freely via take! and put! calls. close closes a Channel. On a closed Channel, put! will fail. For example:

      julia> c = Channel(2);
      julia> put!(c, 1) # `put!` on an open channel succeeds
      1
      julia> close(c);
      julia> put!(c, 2) # `put!` on a closed channel throws an exception.
      ERROR: InvalidStateException("Channel is closed.",:closed)
      Stacktrace:
      [...]
    • take! and fetch (which retrieves but does not remove the value) on a closed channel successfully return any existing values until it is emptied. Continuing the above example:

      julia> fetch(c) # Any number of `fetch` calls succeed.
      1
      julia> fetch(c)
      1
      julia> take!(c) # The first `take!` removes the value.
      1
      julia> take!(c) # No more data available on a closed channel.
      ERROR: InvalidStateException("Channel is closed.",:closed)
      Stacktrace:
      [...]
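    The channel operations listed above can be combined in a short script. This is a sketch under stated assumptions: the buffer size 32 and the "processing" step (multiplying by 10) are arbitrary, and the tasks are started with @async. Four writer tasks put! into c1 while four reader tasks each take! one item from c1 and put! a result onto c2.

```julia
c1 = Channel{Int}(32)     # holds at most 32 Ints at a time
c2 = Channel{Int}(32)

println(isready(c1))      # false: nothing buffered yet

# Several writers put! into c1 concurrently.
writers = [@async put!(c1, w) for w in 1:4]

# Several readers each take! one item from c1, "process" it,
# and put! the result onto c2.
readers = [@async put!(c2, take!(c1) * 10) for _ in 1:4]

foreach(wait, writers)
foreach(wait, readers)

results = sort([take!(c2) for _ in 1:4])
println(results)          # [10, 20, 30, 40]
```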

    Consider a simple example using channels for inter-task communication. We start 4 tasks to process data from a single jobs channel. Jobs, identified by an id (job_id), are written to the channel. Each task in this simulation reads a job_id, waits for a random amount of time, and writes back a tuple of job_id and the simulated time to the results channel. Finally, all the results are printed out.
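    The code for that simulation is not included in this text. A sketch along those lines follows; the names jobs, results, do_work, and make_jobs are illustrative, the random delays are scaled down by a factor of 10 so it finishes quickly, and make_jobs closes jobs at the end so the worker loops can terminate.

```julia
const jobs = Channel{Int}(32)
const results = Channel{Tuple{Int,Float64}}(32)

# Each worker reads job ids until `jobs` is closed, "works" for a
# random time, and reports (job_id, exec_time).
function do_work()
    for job_id in jobs
        exec_time = rand() * 0.1   # shortened random delay
        sleep(exec_time)           # simulates time spent on real work
        put!(results, (job_id, exec_time))
    end
end

function make_jobs(n)
    for i in 1:n
        put!(jobs, i)
    end
    close(jobs)   # lets the workers' for loops finish
end

n = 12
feeder = @async make_jobs(n)                 # feed the jobs channel
workers = [@async do_work() for _ in 1:4]    # 4 tasks processing jobs

finished = Int[]
for _ in 1:n                                 # print out results
    job_id, exec_time = take!(results)
    println("$job_id finished in $(round(exec_time; digits=2)) seconds")
    push!(finished, job_id)
end
foreach(wait, workers)
```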

    Task operations are built on a low-level primitive called yieldto. yieldto(task, value) suspends the current task, switches to the specified task, and causes that task’s last yieldto call to return the specified value. Notice that yieldto is the only operation required to use task-style control flow; instead of calling and returning, we are always just switching to a different task. This is why this feature is also called “symmetric coroutines”; each task is switched to and from using the same mechanism.
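    A minimal sketch of symmetric switching with yieldto, assuming it runs at top level in a script or the REPL (the name main and the handed-off numbers are illustrative). The task deliberately ends with a switch back to main instead of returning, since main was suspended by yieldto and is not waiting in the scheduler's queue to be restarted; this is the kind of bookkeeping the next paragraph describes.

```julia
main = current_task()

# A task that hands control back to `main` twice.
t = Task() do
    x = yieldto(main, 1)   # switch to main; when resumed, x is the value main sends
    yieldto(main, x + 1)   # switch to main again; t is never resumed after this
end

a = yieldto(t)       # first switch starts t; returns the 1 that t passes back
b = yieldto(t, 10)   # t's pending yieldto returns 10, so t passes back 11
println((a, b))      # (1, 11)
```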

    yieldto is powerful, but most uses of tasks do not invoke it directly. Consider why this might be. If you switch away from the current task, you will probably want to switch back to it at some point, but knowing when to switch back, and knowing which task has the responsibility of switching back, can require considerable coordination. For example, put! and take! are blocking operations, which, when used in the context of channels, maintain state to remember who the consumers are. Not needing to manually keep track of the consuming task is what makes put! easier to use than the low-level yieldto.

    In addition to yieldto, a few other basic functions are needed to use tasks effectively.

    • current_task gets a reference to the currently-running task.
    • istaskdone queries whether a task has exited.
    • istaskstarted queries whether a task has run yet.
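    A quick sketch of these queries on a task whose body (summing 1:10) is arbitrary:

```julia
t = Task(() -> sum(1:10))

println(istaskstarted(t))   # false: created but not yet run
println(istaskdone(t))      # false

schedule(t)
wait(t)

println(istaskstarted(t))   # true
println(istaskdone(t))      # true
println(fetch(t))           # 55
println(current_task() === t)  # false: this code runs in a different task
```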

    Most task switches occur as a result of waiting for events such as I/O requests, and are performed by a scheduler included in Julia Base. The scheduler maintains a queue of runnable tasks, and executes an event loop that restarts tasks based on external events such as message arrival.

    The basic function for waiting for an event is wait. Several objects implement wait; for example, given a Process object, wait will wait for it to exit. wait is often implicit; for example, a wait can happen inside a call to read to wait for data to be available.
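    Two small sketches, with arbitrary delays: an explicit wait on a Timer, which is one of the objects that implements wait, and an implicit wait inside take! on an empty channel.

```julia
# Explicit wait: a Timer is one of the objects that implements `wait`.
timer = Timer(0.2)          # fires once, 0.2 seconds from now
t0 = time()
wait(timer)                 # suspends this task until the timer fires
elapsed = time() - t0

# Implicit wait: take! on an empty channel waits inside the call.
c = Channel{Int}(1)
@async (sleep(0.1); put!(c, 7))
v = take!(c)                # blocks until the @async task has put! a value
```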

    In all of these cases, wait ultimately operates on a Condition object, which is in charge of queueing and restarting tasks. When a task calls wait on a Condition, the task is marked as non-runnable, added to the condition’s queue, and switches to the scheduler. The scheduler will then pick another task to run, or block waiting for external events. If all goes well, eventually an event handler will call notify on the condition, which causes tasks waiting for that condition to become runnable again.
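    A minimal sketch of the wait/notify handshake on a Condition (the names cond and waiter and the value 21 are illustrative). It assumes the default single-threaded scheduling of @async tasks, so that one yield() lets waiter run until it blocks. The ordering matters: notify only wakes tasks that are already waiting, so a notify issued before waiter reached wait(cond) would be lost.

```julia
cond = Condition()

waiter = @async begin
    v = wait(cond)   # waiter becomes non-runnable and joins cond's queue
    v * 2            # runs after notify makes waiter runnable again
end

yield()              # let waiter run until it blocks in wait(cond)
notify(cond, 21)     # wakes waiting tasks; their wait(cond) returns 21
result = fetch(waiter)
println(result)      # 42
```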

    A task created explicitly by calling Task is initially not known to the scheduler. This allows you to manage tasks manually using yieldto if you wish. However, when such a task waits for an event, it still gets restarted automatically when the event happens, as you would expect.