Java 9 FLow SubmissionPublisher – A Concrete Publisher

JDK 9 provides a concrete Publisher named SubmissionPublisher that acts as a compliant Reactive Streams Publisher relying on drop handling and/or blocking for flow control. In this tutorial, we’re gonna take a look at SubmissionPublisher and an example that generates items for Subscribers.

Related Articles:
Java 9 Flow API – Reactive Streams
Java 9 Flow API example – Publisher and Subscriber
Java 9 Flow API example – Processor

I. Technologies

– Java 9
– Eclipse with Java 9 Support for Oxygen (4.7)

II. Overview
1. SubmissionPublisher

SubmissionPublisher is an implementation of Java 9 Flow.Publisher that asynchronously issues items to its subscribers until closing.

Depending on usage, we can indicate the Executor for SubmissionPublisher in its constructor methods:
– If we wanna submitting items in separate threads, and can estimate number of subscribers, consider using Executors.newFixedThreadPool(int) and constructor method:

– Otherwise, just call the default constructor (no input parameter) that will use ForkJoinPool.commonPool().

If a Subscriber has only one action that requests and processes all items, we can consider using consume(Consumer) method (which returns a CompletableFuture object) like this:

There are 2 publication methods:
submit​(): asynchronously publishes the given item to each subscriber, but blocks until resources are available.
offer(): publishes the given item asynchronously, to each current subscriber if possible, but the item may be dropped by one or more subscribers if resource limits are exceeded.

Some more useful methods:

  • close​(): issues onComplete signals to all subscribers, and disallows subsequent attempts to publish.
  • closeExceptionally​(Throwable error): issues onError signals to all subscribers with the given error, and disallows subsequent attempts to publish.
  • estimateMaximumLag​(): returns an estimate of the maximum number of items produced but not yet consumed among all subscribers.
  • estimateMinimumDemand​(): returns an estimate of the minimum number of items requested (via request) but not yet produced, among all subscribers.
  • getNumberOfSubscribers​().
  • hasSubscribers​().
  • isSubscribed​(Subscriber).
  • getSubscribers​(): returns list of current subscribers.
2. Project

submissionpublisher
We will create a Publisher (extends SubmissionPublisher) that is subscribed by two Subscribers:
– We don’t need to define any implementation of Subscription interface. Why?
SubmissionPublisher contains a linked list of BufferedSubscriptions, everytime we invoke subscribe() method to a Subscriber, there will be a new BufferedSubscription item in list which is related to that Subscriber automatically.
– Using submit(T item) method, Publisher periodically publishes the items generated from a Supplier to Subscribers (Publisher submit() method will invoke Subscription onNext() method).
– After receiving all items successfully, Subscriber can request new data or cancel Subscription (random).
– When generated items reach to MAX_ITEM_TO_PUBLISH, we will stop Publisher by using close() method (that will send onComplete signal to Subscribers).

III. Practice
1. Create subclass of SubmissionPublisher

2. Create implementation of Subscriber

3. Create Test Class

4. Check Result

Case 1:
– Subscriber A requests 3 items, then cancel subscribe.
– Subscriber B requests 6 items, then cancel subscribe.

Case 2:
– Subscriber A requests 3 items, then cancel subscribe.
– Subscriber B requests 6 items, then request more 6 items.
– Because Publisher submits only 11 items, so Subscriber B only receives 11 items (while requesting total 12), then receives onComplete signal from Publisher (via Publisher close() method) when it still subscribes.

Case 3: We change MAX_ITEM_TO_PUBLISH to 5:
– Subscriber A requests 3 items, then request more 3 items.
– Subscriber B requests 6 items, then request more 6 items.
– Because Publisher submits only 5 items, so Subscriber A and B only receives 5 items (while request total 6 items for each), then receive onComplete signal from Publisher (via Publisher close() method) when they still subscribe.

Special case: Using consume(Consumer) method:

The result:

Look at estimateMinimumDemand which is the returned value of estimateMinimumDemand() method.
When using consume() method, we don’t specify any Subscriber for Publisher, so it will initiate estimateMinimumDemand value by Long.MAX_VALUE, and subtract 1 every consumption.


Related Posts



Got Something To Say:

Your email address will not be published. Required fields are marked *

*