Kotlin – Asynchronous streams

About

Kotlin – Asynchronous streams

Two weeks ago, Xebia organized an online Meetup about Kotlin. In the meetup we showed how Kotlin’s coroutines take away much of the headache when writing asynchronous code from the application developer. In this series of blog posts we show what a positive effect Kotlin’s coroutines have for the library developer. Specifically, we look at the difference between asynchronous streaming libraries implemented in Java and Kotlin.

By: Matthisk Heimensen | Software Engineering Consultant at Xebia.

Asynchronous streams and callbacks in Java

Asynchronous streaming solutions, such as Rx or Reactor, rely on callbacks to perform flow control (or backpressure). Without flow control, systems could overflow and eventually crash. What is the difference between flow control in synchronous applications and their asynchronous counterparts?

Understanding backpressure

A stream consists of a chain of publishers and subscribers. A publisher produces new values and the subscriber consumes them. In between the two sits a buffer that can hold values until the consumer is ready to consume them. The following image illustrates a chain of a single publisher and subscriber:

If the publisher and subscriber operate concurrently, the publisher could overflow the buffer. This happens when the publisher is emitting items faster than the subscriber can consume them. Throttling the publisher to avoid overflowing the subscriber is what we call backpressure.

If the publisher and subscriber are synchronous; we could block the producer when the buffer is about to overflow. The Java standard library ships with a Queue implementation for this called a blocking queue. This queue will block the thread performing a put operation on a full queue and unblock the producer once an item is taken from the queue.

Note on blocking

What does it mean for the put operation to block? It means that the thread on which this operation runs will be parked, freeing up the CPU on which it was running. The operating system will schedule one or more threads to run in the meantime. When the subscriber takes an element from the queue, the producer’s thread will be unparked and execution of the producer will continue.

By parking the producer’s thread, the blocking queue makes it impossible for the producer to outpace the subscriber. Since the producer will not be granted CPU time until more space becomes available in the queue.

Synchronous streams can block the threads on which producers and subscribers execute, as such they can rely on a BlockingQueue to guarantee backpressure. But what if we wanted to implement a stream that operates asynchronously, such streams are not allowed to block the threads upon which the producer and subscriber execute. Because they are non-blocking, they cannot rely on a BlockingQueue to prevent the buffer from overflowing.

Because asynchronous code cannot rely on a BlockingQueue, flow control needs to be simulated through an asynchronous concurrency primitive. In the Java runtime the only primitive available to write asynchronous code is the callback; how can we use callbacks to throttle a producer?

Note on asynchronous primitives and promises

Futures or Promises also rely on callbacks, you create them by supplying a callback that will be invoked once the operation completes. Futures help with composing multiple asynchronous tasks but underneath there is no other concurrency primitive.

Simulating flow control

Let’s take a look at part of the reactive streams’ specification, to understand how it uses callbacks to simulate flow control. In a reactive stream the publisher uses a callback on the subscriber to send elements:

public interface Subscriber<T> {

    public void onNext(T t);

}

Simplified variant of the reactive stream Subscriber interface.

Since the subscriber could be operating on concurrently from the producer, the single callback cannot guarantee flow control. So, a reactive stream introduces a second callback, not from publisher to subscriber but from subscriber to publisher. This callback is part of the subscription a subscriber has to a producer. On this subscription backpressure is simulated by allowing the subscriber to communicate the number of items it can handle:

public interface Subscription {

    public void request(long n);

}

Simplified variant of the reactive stream Subscription interface.

This second callback allows a reactive stream to simulate flow control without blocking underlying threads. Where a blocking implementation guarantees that a publisher could not overflow a subscriber, because the operating system can park the producer’s thread, the non-blocking variant has no such guarantee. A publisher does not have to adhere to the limits supplied by the subscriber through the request callback.

Implications for operators

An operator (e.g. map) is both a subscriber, because it receives items from the upstream, and a publisher because it produces items for the downstream. Operators are used to alter the stream by reacting to items passing through it. Because an operator is a publisher it needs to account for backpressure from the downstream inside its implementation.

Accounting for backpressure in the implementation of an operator means that the implementer of an operator is responsible for adhering to the flow control contract. It is not allowed to send more items to the downstream than are requested. And the operator needs to ensure it requests enough items from the upstream; as to not stall the stream.

To demonstrate how a stream could stall we can look at the filter operator from the Rx docs. Because a filter operator does not emit all elements it receives, it needs to request additional items from the upstream for each ignored element:

public void onNext(Integer item) {

    if (item % 2 != 0) {

       downstream.onNext(item);

    } else {

       // If we forget to request an additional item, our stream will stall

       upstream.request(1);

    }

}

Simplified implementation of a filter operator.

The synchronization required to account for backpressure in an asynchronous stream becomes more apparent when implementing a buffer operator.  Here a drain method simulates flow control together with the concern of cancellation, error handling, and completion. Where a BlockingQueue of a synchronous stream would provide this pattern without additional synchronization. The asynchronous variant requires complex orchestration.

Conclusion

Streams require flow control to prevent a publisher from overflowing a subscriber. Where synchronous streams can rely on primitives such as a BlockingQueue to provide this flow control, asynchronous streams need to rely on callbacks to simulatebackpressure. This creates a complex contract between a publisher and subscriber, making implementation and usage of asynchronous streams harder and more error prone than their synchronous counterparts. This is not to say asynchronous streams are bad. But it is to demonstrate that while useful, asynchronous streams are build on top of a complex abstraction, namely that of simulating flow control with callbacks. What if we could build asynchronous streams on top of an asynchronous blocking queue, this would combine the best of both worlds.

 

This is part 1 in a multi part series on asynchronous streaming

  1. Asynchronous streams and callbacks
  2. Asynchronous streams and co-routines
  3. Asynchronous streams and Kotlin

 

Would you like to learn more about Java & Kotlin? Join our Webinar Week on 29 May from 9.30 – 10.30 AM. Our Kotlin trainer Urs Peters will give a presentation about Java & Kotlin. Save your spot for free and get to know more about Kotlin from a Java perspective with an in-depth comparison between the two languages.

 

Share
December 2024
January 2025
No event found!

Related Topics