produce

fun <E> CoroutineScope.produce(context: CoroutineContext = EmptyCoroutineContext, capacity: Int = Channel.RENDEZVOUS, block: suspend ProducerScope<E>.() -> Unit): ReceiveChannel<E>

Launches a new coroutine to produce a stream of values by sending them to a channel and returns a reference to the coroutine as a ReceiveChannel. This resulting object can be used to receive elements produced by this coroutine.

The receiver of block is a ProducerScope, which implements both SendChannel and CoroutineScope. This allows invoking send directly from the block to send elements to the channel while treating block as a coroutine.
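As a minimal sketch of what this combined receiver allows, the block below uses both sides of ProducerScope: launch (from CoroutineScope) to start child coroutines and send (from SendChannel) to emit values. The names mergedNumbers and collectMerged are hypothetical and exist only for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: two child coroutines feed the same channel.
fun CoroutineScope.mergedNumbers(): ReceiveChannel<Int> = produce<Int> {
    // `this` is a ProducerScope<Int>:
    // `launch` is available because it is a CoroutineScope,
    // `send` is available because it is a SendChannel.
    launch { for (i in 1..3) send(i) }
    launch { for (i in 101..103) send(i) }
    // The channel is closed once this coroutine and its children complete.
}

// Collects everything the producer sends.
// The result is sorted because the interleaving of the two children is not fixed.
fun collectMerged(): List<Int> = runBlocking {
    mergedNumbers().toList().sorted()
}

fun main() {
    println(collectMerged()) // [1, 2, 3, 101, 102, 103]
}
```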

The kind of the resulting channel depends on the specified capacity parameter. See the Channel interface documentation for details. By default, an unbuffered channel is created. If an invalid capacity value is specified, an IllegalArgumentException is thrown.
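The following sketch illustrates the capacity parameter. It runs the same producer body with several capacities and then passes a capacity that is assumed to be invalid (-10 is an arbitrary negative value that is not one of the Channel constants). The helper names collectWithCapacity and invalidCapacityFailure are hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: runs the same producer body with the given capacity
// and collects everything it sends.
fun collectWithCapacity(capacity: Int): List<Int> = runBlocking {
    produce<Int>(capacity = capacity) {
        repeat(3) { send(it) }
    }.toList()
}

// Hypothetical helper: returns the exception thrown for a capacity
// that is assumed to be invalid, or null if nothing was thrown.
fun invalidCapacityFailure(): Throwable? = runBlocking {
    runCatching { produce<Int>(capacity = -10) { } }.exceptionOrNull()
}

fun main() {
    // The capacity changes how far the producer can run ahead of the consumer,
    // not which elements are eventually received.
    println(collectWithCapacity(Channel.RENDEZVOUS)) // [0, 1, 2]
    println(collectWithCapacity(Channel.UNLIMITED))  // [0, 1, 2]
    println(collectWithCapacity(2))                  // [0, 1, 2]
    println(invalidCapacityFailure() is IllegalArgumentException)
}
```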

Behavior specifics

Behavior on termination

The channel is closed when the coroutine completes.

val values = listOf(1, 2, 3, 4)
val channel = produce<Int> {
    for (value in values) {
        send(value)
    }
}
check(channel.toList() == values)

The running coroutine is cancelled when the channel is cancelled.

val channel = produce<Int> {
    send(1)
    send(2)
    try {
        send(3) // will throw CancellationException
    } catch (e: CancellationException) {
        println("The channel was cancelled!")
        throw e // always rethrow CancellationException
    }
}
check(channel.receive() == 1)
check(channel.receive() == 2)
channel.cancel()

If this coroutine finishes with an exception, it will attempt to close the channel with that exception as the cause, so after receiving all the existing elements, all further attempts to receive from it will throw the exception with which the coroutine finished. In addition, the exception will cancel the parent coroutine through structured concurrency.

val produceJob = Job()
val scope = CoroutineScope(produceJob)
// create and populate a channel with a buffer
val channel = scope.produce<Int>(capacity = Channel.UNLIMITED) {
    repeat(5) { send(it) }
    throw TestException()
}
produceJob.join() // wait for the parent of the `produce` to get cancelled
check(produceJob.isCancelled)
// prints 0, 1, 2, 3, 4, then throws `TestException`
for (value in channel) { println(value) }

If the channel is already closed and the exception cannot be propagated through structured concurrency (for example, because the parent has a SupervisorJob), the last-resort error-handling logic described in the CoroutineExceptionHandler documentation is invoked:

withContext(CoroutineExceptionHandler { ctx, e ->
    // Will be invoked with `Failed to cancel`
    println("Failure in the produce coroutine: $e")
}) {
    supervisorScope {
        // Because the parent job is a supervisor,
        // the exception will not be propagated to the parent.
        val channel = produce(capacity = Channel.UNLIMITED) {
            send(1)
            try {
                awaitCancellation()
            } catch (e: CancellationException) {
                throw IllegalStateException("Failed to cancel", e)
            }
        }
        channel.receive()
        // Cancelling a channel also closes it,
        // so now, the exception with which `produce` fails
        // cannot be propagated.
        channel.cancel()
    }
}

When the coroutine is cancelled via structured concurrency and not the cancel function, the channel does not automatically close until the coroutine completes, so it is possible that some elements will be sent even after the coroutine is cancelled:

val parentScope = CoroutineScope(Dispatchers.Default)
val channel = parentScope.produce<Int>(capacity = Channel.UNLIMITED) {
    repeat(5) {
        send(it)
    }
    parentScope.cancel()
    // suspending after this point would fail, but sending succeeds
    send(-1)
}
for (c in channel) {
    println(c) // 0, 1, 2, 3, 4, -1
} // throws a `CancellationException` after reaching -1

Note that cancelling produce via structured concurrency closes the channel with a cause.

The behavior around coroutine cancellation and error handling is experimental and may change in a future release.

See newCoroutineContext for a description of debugging facilities available for newly created coroutines.

Undelivered elements

Some values that produce creates may be lost:

val channel = produce(Dispatchers.Default, capacity = 5) {
    repeat(100) {
        send(it)
        println("Sent $it")
    }
}
channel.cancel() // no elements can be received after this!

There is no way to recover these lost elements. If this is unsuitable, please create a Channel manually and pass the onUndeliveredElement callback to the constructor: Channel(onUndeliveredElement = ...).
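A minimal sketch of that alternative follows, assuming a manually created Channel with capacity 5 and a plain launch in place of produce. The helper name collectUndelivered is hypothetical, and the exact set of reported elements may depend on timing and library version, so the sketch only prints what was reported.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: returns the elements reported as undelivered.
fun collectUndelivered(): List<Int> = runBlocking {
    // Everything here runs on the single runBlocking thread, so a plain list is enough.
    // Use a thread-safe collection if senders run on other threads.
    val undelivered = mutableListOf<Int>()
    val channel = Channel<Int>(
        capacity = 5,
        onUndeliveredElement = { element -> undelivered += element }
    )
    val producer = launch {
        repeat(100) { channel.send(it) } // suspends once the buffer is full
    }
    yield()          // let the producer run until it suspends on a full buffer
    channel.cancel() // buffered elements are handed to the callback instead of being lost
    producer.join()  // the suspended `send` fails with a CancellationException
    undelivered
}

fun main() {
    println("Undelivered: ${collectUndelivered()}")
}
```

Unlike with produce, the sending coroutine here is not tied to the channel, so closing or cancelling the channel when the sender finishes is the caller's responsibility.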

Structured concurrency

Coroutine context

produce creates a child coroutine of this CoroutineScope.

See the corresponding subsection in the launch documentation for details on how the coroutine context is created. In essence, the elements of context are combined with the elements of the CoroutineScope.coroutineContext, typically overriding them. It is incorrect to pass a Job element there, as this breaks structured concurrency.
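The way context elements are combined can be sketched with CoroutineName as the example element. One producer inherits the name from the scope, and the other overrides it through the context parameter. The helper name observedNames and the names "parent" and "producer" are illustrative assumptions.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: reports the coroutine names seen by two producers.
fun observedNames(): List<String?> = runBlocking(CoroutineName("parent")) {
    // No name is passed, so the element is inherited from the scope.
    val inherited = produce<String?> {
        send(coroutineContext[CoroutineName]?.name)
    }
    // A name passed via `context` overrides the inherited one.
    val overridden = produce<String?>(CoroutineName("producer")) {
        send(coroutineContext[CoroutineName]?.name)
    }
    listOf(inherited.receive(), overridden.receive())
}

fun main() {
    println(observedNames()) // [parent, producer]
}
```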

Interactions between coroutines

The details of structured concurrency are described in the CoroutineScope interface documentation. Here is a restatement of some main points as they relate to produce:

  • The lifecycle of the parent CoroutineScope cannot end until this coroutine (as well as all its children) completes.

  • If the parent CoroutineScope is cancelled, this coroutine is cancelled as well.

  • If this coroutine fails with a non-CancellationException exception and the parent CoroutineScope has a non-supervisor Job in its context, the parent Job is cancelled with this exception.

  • If this coroutine fails with a non-CancellationException exception, the parent CoroutineScope has a supervisor Job or no job at all (as is the case with GlobalScope or malformed scopes), and the channel is already closed, the exception cannot be propagated and is handled as the CoroutineExceptionHandler documentation describes.

  • The lifecycle of the CoroutineScope passed as the receiver to the block will not end until the block completes (or gets cancelled before ever having a chance to run).

  • If the block throws a CancellationException, the coroutine is considered cancelled, cancelling all its children in turn, but the parent does not get notified.
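The first two points above can be sketched as follows: cancelling the parent scope cancels the producer, and the parent Job completes only after the producer does. The helper name closedAfterParentCancel is hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: returns true if the channel is closed
// once the cancelled parent has completed.
fun closedAfterParentCancel(): Boolean = runBlocking {
    val parentJob = Job()
    val scope = CoroutineScope(parentJob)
    val channel = scope.produce<Int> {
        var next = 0
        while (true) send(next++) // rendezvous channel: suspends until received
    }
    check(channel.receive() == 0)
    scope.cancel()   // cancels the producer, which is a child of the scope
    parentJob.join() // the parent completes only after the producer completes
    channel.receiveCatching().isClosed
}

fun main() {
    println(closedAfterParentCancel()) // true
}
```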

Usage example

/* Generate random integers until we find the square root of 9801.
   To calculate whether the given number is that square root,
   use several coroutines that separately process these integers.
   Alternatively, we may randomly give up during value generation.
   `produce` is used to generate the integers and put them into a
   channel, from which the square-computing coroutines take them. */
val parentScope = CoroutineScope(SupervisorJob())
val channel = parentScope.produce<Int>(
    Dispatchers.IO,
    capacity = 16 // buffer of size 16
) {
    // this code will run on Dispatchers.IO
    while (true) {
        val request = run {
            // simulate waiting for the next request
            delay(5.milliseconds)
            val randomInt = Random.nextInt(-1, 100)
            if (randomInt == -1) {
                // external termination request received
                println("Producer: no longer accepting requests")
                return@produce
            }
            println("Producer: sending a request ($randomInt)")
            randomInt
        }
        send(request)
    }
}
// Launch consumers
repeat(4) {
    launch(Dispatchers.Default) {
        for (request in channel) {
            // simulate processing a request
            delay(25.milliseconds)
            println("Consumer $it: received a request ($request)")
            if (request * request == 9801) {
                println("Consumer $it found the square root of 9801!")
                /* the work is done, the producer may finish.
                   the internal termination request will cancel
                   the producer on the next suspension point. */
                channel.cancel()
            }
        }
    }
}

Note: This is an experimental API. The behavior of producers that work as children in a parent scope with respect to cancellation and error handling may change in the future.


fun <E> CoroutineScope.produce(context: Job, capacity: Int = Channel.RENDEZVOUS, block: suspend ProducerScope<E>.() -> Unit): ReceiveChannel<E>

Deprecated

Passing a Job to coroutine builders breaks structured concurrency, leading to hard-to-diagnose errors. This pattern should be avoided. This overload will be deprecated with an error in the future.

Deprecated version of produce that accepts a Job.

See the documentation for the non-deprecated produce function to learn about the functionality of this function. This piece of documentation explains why this overload is deprecated.

It is incorrect to pass a Job as context to produce, async, or launch, because this violates structured concurrency. The passed Job becomes the sole parent of the newly created coroutine, which completely severs the tie between the new coroutine and the CoroutineScope in which it is launched.

Benefits of structured concurrency

Structured concurrency ensures that

  • Cancellation of the parent job cancels the children as well, which helps avoid unnecessary computations when they are no longer needed.

  • Cancellation of children can also be necessary for reliability: if the CoroutineScope's lifecycle is bound to some component that may not be used after it's destroyed, performing computations after the parent CoroutineScope is cancelled may lead to crashes.

  • For concurrent decomposition of work (when the CoroutineScope contains a non-supervisor job), failure of the newly created coroutine also causes the sibling coroutines to fail, improving the responsiveness of the program: unnecessary computations will not proceed when it's obvious that they are not needed.

  • The CoroutineScope can only complete when all its children complete. If the CoroutineScope is lexically scoped (for example, created by coroutineScope, supervisorScope, or withContext), this means that the lexical scope will only be exited (and the calling function will finish) once all child coroutines complete.
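The last point can be sketched with a lexically scoped coroutineScope: the code after the scope runs only once both the producer and its consumer have completed. The helper name sumInsideScope is hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: sums the values sent by a producer inside a lexical scope.
fun sumInsideScope(): Int = runBlocking {
    var sum = 0
    coroutineScope {
        val numbers = produce<Int> { for (i in 1..4) send(i) }
        // The consumer is a sibling of the producer in the same scope.
        launch { for (n in numbers) sum += n }
    }
    // `coroutineScope` has returned, so both children have completed.
    sum
}

fun main() {
    println(sumInsideScope()) // 10
}
```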

Possible alternatives

In some scenarios, one or more of the properties guaranteed by structured concurrency are actually undesirable. However, breaking structured concurrency altogether and losing the other properties can often be avoided.

Ignoring cancellation

Sometimes, it is undesirable for the child coroutine to react to the cancellation of the parent: for example, some computations have to be performed unconditionally.

Seeing produce(NonCancellable) in code is a reliable sign that this was the intention. Alternatively, you may see produce(Job()). Both patterns break structured concurrency and prevent cancellation from being propagated.

Here's an alternative approach that preserves structured concurrency:

scope.produce(start = CoroutineStart.ATOMIC) {
    withContext(NonCancellable) {
        // this line will be reached even if the parent is cancelled
    }
}

This way, the child coroutine is guaranteed to complete, but the scope is still aware of the child. This allows the parent scope to await the completion of the child and to react to its failure.

Not cancelling other coroutines on failure

Often, the failure of one child does not require the work of the other coroutines to be cancelled.

produce(SupervisorJob()) is a telling sign that this was the reason for breaking structured concurrency in code, though produce(Job()) has the exact same effect. By breaking structured concurrency, produce(SupervisorJob()) { error("failure") } will prevent failure from affecting the parent coroutine and the siblings.

If all coroutines in a scope should fail independently, this suggests that the scope is a supervisor:

supervisorScope {
    val producers = List(10) {
        produce<Int> {
            delay(10.milliseconds * it)
            throw IllegalStateException("$it is tired of all this")
        }
    }
    producers.forEach {
        println(runCatching { it.receive() })
    }
}

Every coroutine here will run to completion and will fail with its own error.

For non-lexically-scoped CoroutineScope instances, use SupervisorJob instead of Job when constructing the CoroutineScope.
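A minimal sketch of such a scope follows, assuming a CoroutineScope built with SupervisorJob and two producers, one of which fails. The helper name independentFailures and the choice of ArithmeticException are illustrative assumptions.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: returns the failure seen through the failing channel,
// the values of the healthy producer, and whether the scope stayed active.
fun independentFailures(): Triple<Throwable?, List<Int>, Boolean> = runBlocking {
    val scope = CoroutineScope(SupervisorJob())
    val failing = scope.produce<Int> { throw ArithmeticException("boom") }
    val healthy = scope.produce<Int> {
        send(1)
        send(2)
    }
    // The failure is only observable through the failing producer's channel.
    val failure = runCatching { failing.receive() }.exceptionOrNull()
    // The sibling producer and the scope itself are unaffected.
    val values = healthy.toList()
    val stillActive = scope.isActive
    scope.cancel() // release the scope once it is no longer needed
    Triple(failure, values, stillActive)
}

fun main() {
    val (failure, values, stillActive) = independentFailures()
    println(failure is ArithmeticException) // true
    println(values)                         // [1, 2]
    println(stillActive)                    // true
}
```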

If only some coroutines need to individually have their failures invisible to others in a non-lexically-scoped CoroutineScope, the correct approach from the point of view of structured concurrency is this:

val supervisorJob = SupervisorJob(scope.coroutineContext.job)
val childSupervisorScope = CoroutineScope(
    scope.coroutineContext + supervisorJob
)
childSupervisorScope.produce {
    // failures in this coroutine will not affect other children
}
// `cancel` or `complete` the `supervisorJob` when it's no longer needed
supervisorJob.complete()
supervisorJob.join()

For a lexically scoped CoroutineScope, it may be possible to use a supervisorScope at the end of the outer scope, depending on the code structure:

coroutineScope {
    launch {
        // failures in this coroutine will affect everyone
    }
    supervisorScope {
        val channel = produce {
            // failures in this coroutine
            // are only available through `channel`
        }
    }
    // this line will only be reached when the `produce` coroutine completes
}
// this line will be reached when both `launch` and `produce` complete

All of these approaches preserve the ability of a parent to cancel the children and to wait for their completion.

Avoiding both cancelling and being cancelled

Sometimes, coroutines to be spawned are just completely unrelated to the CoroutineScope used as the receiver, and no structured concurrency mechanisms are needed.

In that case, GlobalScope is the semantically clearer way of expressing opting out of structured concurrency:

GlobalScope.produce {
    // this computation is explicitly outside structured concurrency
}

The reason GlobalScope is marked as delicate is exactly that the coroutines created in it do not benefit from structured concurrency.