Java 9 Flow API example – Processor

In the previous post, we covered the basics of Reactive Streams and the components and behaviour of the Java 9 Flow API. In this tutorial, we look at an example that implements a Publisher and a Subscriber, with a Processor as the bridge between them.

Related Articles:
Java 9 Flow API – Reactive Streams
Java 9 Flow API example – Publisher and Subscriber
Java 9 Flow SubmissionPublisher – A Concrete Publisher

I. Technologies

– Java 9
– Eclipse Oxygen (4.7) with Java 9 support

II. Overview
1. Processor

A Processor is a component that sits between the Publisher and the Subscriber. It acts as:
+ a Subscriber when it sends request signals to the Publisher
+ a Publisher when it pushes items to the Subscriber.

One or more Processors can be chained together to link a Publisher to a Subscriber.
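
To make the two roles concrete, here is a minimal sketch of a Processor built on SubmissionPublisher, which supplies the Publisher half (buffering, demand tracking, an Executor) so only the Subscriber half has to be written. It is not the implementation used later in this tutorial; the class name MapProcessor, the demo() helper and the "item-" prefix are assumptions chosen for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical Processor: a Subscriber of T upstream, a Publisher of R downstream.
class MapProcessor<T, R> extends SubmissionPublisher<R>
        implements Flow.Processor<T, R> {

    private final Function<? super T, ? extends R> mapper;
    private Flow.Subscription subscription;

    MapProcessor(Function<? super T, ? extends R> mapper) {
        this.mapper = mapper;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);            // Subscriber role: ask upstream for one item
    }

    @Override
    public void onNext(T item) {
        submit(mapper.apply(item));         // Publisher role: push the mapped item downstream
        subscription.request(1);            // then ask upstream for the next one
    }

    @Override
    public void onError(Throwable error) {
        closeExceptionally(error);          // forward the failure downstream
    }

    @Override
    public void onComplete() {
        close();                            // forward completion downstream
    }

    // Illustrative wiring: Publisher<Integer> -> Processor<Integer, String> -> consumer.
    static List<String> demo() {
        List<String> received = new CopyOnWriteArrayList<>();
        MapProcessor<Integer, String> processor =
                new MapProcessor<Integer, String>(i -> "item-" + i);
        CompletableFuture<Void> done = processor.consume(received::add);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(processor);
            for (int i = 1; i <= 3; i++) {
                publisher.submit(i);
            }
        } // close() signals onComplete to the processor
        done.orTimeout(5, TimeUnit.SECONDS).join();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Chaining more Processors works the same way: each one subscribes to the stage before it and is subscribed to by the stage after it.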

2. Project

We will create a Publisher that a Processor subscribes to; that Processor in turn publishes data to a Subscriber.

The Publisher defines a Subscription for working with the Processor.
The Processor defines its own Subscription for working with the Subscriber.

– Using Subscriber::onNext() method, Publisher pushes items to Processor, and Processor pushes items to Subscriber.
– Using Subscription::request() method, Processor requests items from Publisher, and Subscriber requests items from Processor.
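
The request()/onNext() exchange can be sketched from the Subscriber's side as follows. This is an illustrative sketch, not this tutorial's Subscriber: the class name DemandSubscriber, the batch size of 2 and the use of SubmissionPublisher as the upstream are all assumptions made to keep the example self-contained.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical Subscriber that asks for `demand` items at a time.
class DemandSubscriber<T> implements Flow.Subscriber<T> {

    private final long demand;
    private final List<T> received = new CopyOnWriteArrayList<>();
    private final CountDownLatch finished = new CountDownLatch(1);
    private Flow.Subscription subscription;
    private long outstanding;               // items requested but not yet delivered

    DemandSubscriber(long demand) {
        this.demand = demand;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        outstanding = demand;
        subscription.request(demand);       // first batch
    }

    @Override
    public void onNext(T item) {
        received.add(item);
        if (--outstanding == 0) {           // batch consumed: request the next one
            outstanding = demand;
            subscription.request(demand);
        }
    }

    @Override
    public void onError(Throwable error) {
        finished.countDown();
    }

    @Override
    public void onComplete() {
        finished.countDown();
    }

    // Waits (up to 5 seconds) for a terminal signal, then returns what arrived.
    List<T> awaitItems() {
        try {
            finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    static List<Integer> demo() {
        DemandSubscriber<Integer> subscriber = new DemandSubscriber<>(2);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 5; i++) {
                publisher.submit(i);
            }
        }
        return subscriber.awaitItems();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The upstream never calls onNext() more times than the total requested, so the batch size is what lets the Subscriber control the pace.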

The Publisher and the Processor each define an Executor for multi-threading, so the request() and onNext() methods work asynchronously.
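
As a rough illustration of that asynchrony, the sketch below shows a Publisher whose Subscription hands every request() to an Executor, so request() returns immediately and onNext() runs on the Executor's thread. The names ExecutorPublisher and RangeSubscription are hypothetical, and the sketch assumes a single-threaded Executor so that signals to the Subscriber stay serialized; it is not the Publisher used in this tutorial.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;

// Hypothetical Publisher that emits 1..count on the given Executor.
class ExecutorPublisher implements Flow.Publisher<Integer> {

    private final int count;
    private final Executor executor;        // assumed single-threaded (see lead-in)

    ExecutorPublisher(int count, Executor executor) {
        this.count = count;
        this.executor = executor;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new RangeSubscription(subscriber));
    }

    private final class RangeSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Integer> subscriber;
        private volatile boolean terminated;
        private int next = 1;               // touched only by Executor tasks

        RangeSubscription(Flow.Subscriber<? super Integer> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            executor.execute(() -> emit(n)); // returns at once; emission happens later
        }

        @Override
        public void cancel() {
            terminated = true;
        }

        private void emit(long n) {
            if (terminated) return;
            if (n <= 0) {
                terminated = true;
                subscriber.onError(new IllegalArgumentException("request must be positive"));
                return;
            }
            for (long i = 0; i < n && next <= count && !terminated; i++) {
                subscriber.onNext(next++);
            }
            if (next > count && !terminated) {
                terminated = true;
                subscriber.onComplete();
            }
        }
    }

    static List<Integer> demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        new ExecutorPublisher(5, executor).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(2); }
            @Override public void onNext(Integer item) {
                received.add(item);
                if (item % 2 == 0) subscription.request(2); // ask again after every second item
            }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdown();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because request() only enqueues work, a Subscriber can safely call it from inside onNext() without causing recursive emission.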

The Processor has a data buffer that stores items when the number of items requested by the Subscriber differs from the number requested by the Processor.
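
The buffering idea can be shown without any threads. The sketch below is a simplified, synchronous illustration and not this tutorial's Processor: the class DemandBuffer and its offer(), request() and buffered() methods are hypothetical names. Items arriving from upstream are queued, and they are released downstream only while there is unfulfilled downstream demand.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical buffer that decouples upstream arrivals from downstream demand.
class DemandBuffer<T> {

    private final Queue<T> buffer = new ArrayDeque<>();
    private final Consumer<T> downstream;
    private long demand;                    // items downstream has asked for but not received

    DemandBuffer(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    // Called when an item arrives from upstream (the Processor's onNext side).
    synchronized void offer(T item) {
        buffer.add(item);
        drain();
    }

    // Called when downstream asks for more (the Processor's Subscription.request side).
    synchronized void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("request must be positive");
        }
        long sum = demand + n;
        demand = (sum < 0) ? Long.MAX_VALUE : sum; // cap on overflow
        drain();
    }

    synchronized int buffered() {
        return buffer.size();
    }

    // Release buffered items while both demand and items remain.
    private void drain() {
        while (demand > 0 && !buffer.isEmpty()) {
            demand--;
            downstream.accept(buffer.poll());
        }
    }

    public static void main(String[] args) {
        DemandBuffer<String> relay = new DemandBuffer<>(System.out::println);
        relay.offer("a");
        relay.offer("b");
        relay.offer("c");                   // nothing printed yet: no downstream demand
        relay.request(2);                   // releases two items, one stays buffered
        System.out.println("still buffered: " + relay.buffered());
    }
}
```

When the Processor requests more from the Publisher than the Subscriber requests from the Processor, the surplus waits in the buffer; in the opposite case the buffer stays empty and the Subscriber's leftover demand is served as soon as new items arrive.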

III. Practice

To understand how Publisher, Subscriber, Subscription and Processor behave and how to implement them, please visit: Java 9 Flow API – Reactive Streams

Publisher<Integer> —— Processor<Integer, String> —— Subscriber<String>

1. Create implementation of Publisher

2. Create implementation of Processor

3. Create implementation of Subscriber

4. Check Result

We use this class to test:

4.1 Subscriber DEMAND == Processor DEMAND

The result:

4.2 Subscriber DEMAND > Processor DEMAND

In this case, we invoke the Subscriber::onComplete() method to notify the Subscriber that the Processor has already processed all its items and pushed them to the Subscriber.
The result:

4.3 Subscriber DEMAND < Processor DEMAND

The result:
