Monday, June 9, 2014

SynchronousQueue Example in Java - Producer Consumer Solution

SynchronousQueue is a special kind of BlockingQueue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. When you call the put() method on a SynchronousQueue, it blocks until another thread is there to take that element out of the queue. Similarly, if a thread tries to remove an element and no element is currently present, that thread is blocked until another thread puts an element into the queue.

You can compare SynchronousQueue to athletes (threads) running with the Olympic torch: one athlete runs with the torch (the object that needs to be passed) and hands it to another athlete waiting at the other end. If you pay attention to the name, you will also understand that it is called SynchronousQueue for a reason: it passes data synchronously to the other thread, waiting for the other party to take the data instead of just putting the data and returning (which would be an asynchronous operation). If you are familiar with CSP and Ada, then you know that synchronous queues are similar to rendezvous channels. They are well suited for hand-off designs, in which an object running in one thread must sync up with an object running in another thread in order to hand it some information, event, or task.

In earlier multi-threading tutorials we learned how to solve the producer consumer problem using wait and notify, and using BlockingQueue; in this tutorial we will learn how to implement the producer consumer design pattern using a synchronous queue.

This class also supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed. However, a queue constructed with the fairness property set to true grants threads access in FIFO order.
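To make the fairness policy a little more concrete, here is a minimal sketch. The class name FairHandOffSketch and the method takeInArrivalOrder() are made up for this illustration and are not part of the JDK. It assumes that a thread blocked inside put() reports the WAITING state, which is how the sketch makes sure the three producers arrive in a known order before any consumer shows up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, written only for this illustration.
class FairHandOffSketch {

    // Parks three producers one after another on a fair queue, then takes
    // three times and returns the elements in the order they were received.
    static List<String> takeInArrivalOrder() throws InterruptedException {
        // Passing true selects the fair (FIFO) policy for waiting threads.
        SynchronousQueue<String> queue = new SynchronousQueue<>(true);

        for (String event : new String[] {"ONE", "TWO", "THREE"}) {
            Thread producer = new Thread(() -> {
                try {
                    queue.put(event); // blocks until a consumer takes the element
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "PRODUCER-" + event);
            producer.setDaemon(true);
            producer.start();
            // Assumption: a thread blocked in put() shows up as WAITING.
            // Waiting for that state gives the producers a well defined arrival order.
            while (producer.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
        }

        List<String> received = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            received.add(queue.take()); // each take() releases one waiting producer
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(takeInArrivalOrder());
    }
}
```

With the fair constructor the elements should come back in arrival order (ONE, TWO, THREE). With the default constructor no such order is promised, so the same experiment may return them in a different order.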


Producer Consumer using SynchronousQueue in Java

As I have said before, nothing is better than the producer consumer problem for understanding inter-thread communication in any programming language. In the producer consumer problem, one thread acts as the producer, which produces events or tasks, and another thread acts as the consumer. A shared buffer is used to transfer data from the producer to the consumer. The difficulty in solving the producer consumer problem comes from edge cases, e.g. the producer must wait if the buffer is full and the consumer must wait if the buffer is empty. The BlockingQueue solution was quite easy, because a blocking queue provides not only a buffer to store data but also flow control: it blocks the thread calling put() (the PRODUCER) if the buffer is full, and blocks the thread calling take() (the CONSUMER) if the buffer is empty. In this tutorial, we will solve the same problem using SynchronousQueue, a special kind of concurrent collection which has zero capacity.
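The difference between a buffering queue and a zero-capacity queue can be seen with the non-blocking offer() method. The following is only a small sketch; the class name ZeroCapacitySketch and the helper tryPublish() are invented for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, written only for this illustration.
class ZeroCapacitySketch {

    // Tries a non-blocking insert and reports whether the queue accepted the element.
    static boolean tryPublish(BlockingQueue<String> queue, String event) {
        return queue.offer(event);
    }

    public static void main(String[] args) {
        BlockingQueue<String> buffered = new ArrayBlockingQueue<>(1);
        BlockingQueue<String> handOff = new SynchronousQueue<>();

        // The bounded queue has one free slot, so the element is stored in the buffer.
        System.out.println("ArrayBlockingQueue accepted: " + tryPublish(buffered, "FOUR"));
        // No consumer is waiting in take(), and there is no buffer to park the element in.
        System.out.println("SynchronousQueue accepted: " + tryPublish(handOff, "FOUR"));
    }
}
```

The first call returns true because the element fits into the buffer; the second returns false because no consumer is waiting and a SynchronousQueue has nowhere to store the element.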

In the following example, we have two threads named PRODUCER and CONSUMER (you should always name your threads; it is one of the best practices for writing concurrent applications). The first thread publishes a cricket score and the second thread consumes it. Cricket scores are nothing but String objects here. If you run the program as it is, you won't notice anything different. In order to understand how SynchronousQueue works, and how it solves the producer consumer problem, you either need to debug this program in Eclipse or start only the producer thread by commenting out consumer.start();. If the consumer thread is not running, the producer will block at the queue.put(event); call, and you won't see [PRODUCER] published event : FOUR. This happens because of the special behaviour of SynchronousQueue, which guarantees that the thread inserting data will block until there is a thread to remove that data, and vice versa. You can test the other part of the code by commenting out producer.start(); and starting only the consumer thread.

import java.util.concurrent.SynchronousQueue;

/**
 * Java Program to solve Producer Consumer problem using SynchronousQueue. A
 * call to put() will block until there is a corresponding thread to take() that
 * element.
 *
 * @author Javin Paul
 */
public class SynchronousQueueDemo{

    public static void main(String[] args) {

        final SynchronousQueue<String> queue = new SynchronousQueue<String>();

        Thread producer = new Thread("PRODUCER") {
            public void run() {
                String event = "FOUR";
                try {
                    queue.put(event); // blocks until a consumer calls take()
                    System.out.printf("[%s] published event : %s %n", Thread
                            .currentThread().getName(), event);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };

        producer.start(); // starting producer thread

        Thread consumer = new Thread("CONSUMER") {
            public void run() {
                try {
                    String event = queue.take(); // blocks until a producer calls put()
                    System.out.printf("[%s] consumed event : %s %n", Thread
                            .currentThread().getName(), event);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };

        consumer.start(); // starting consumer thread

    }

}

Output:
[CONSUMER] consumed event : FOUR 
[PRODUCER] published event : FOUR 

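If you have looked at the output carefully, you will have noticed that the order of events appears reversed: it seems the [CONSUMER] thread consumed the data even before the [PRODUCER] thread produced it. Nothing was actually consumed early. Each message is printed only after put() or take() has returned, and both calls return at the moment of the hand-off, so which thread prints first depends on thread scheduling and can differ from run to run. The fairness policy is a separate matter: by default SynchronousQueue doesn't guarantee any order among waiting threads, but if fairness is set to true, waiting producers and consumers are served in FIFO order. You can enable this policy by passing true to the overloaded constructor of SynchronousQueue, i.e. new SynchronousQueue(boolean fair). It does not change the order in which the two threads print their messages.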
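The experiment of running the producer without a consumer can also be written as a small self-checking sketch. The class name BlockedProducerSketch and the method producerWaitsForConsumer() are invented for this example, and the 200 ms wait is an arbitrary value chosen only to give the producer time to reach put().

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, written only for this illustration.
class BlockedProducerSketch {

    // Starts a producer with no consumer around and reports whether the producer
    // stayed blocked in put() until the calling thread finally called take().
    static boolean producerWaitsForConsumer() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        Thread producer = new Thread(() -> {
            try {
                queue.put("FOUR"); // cannot return while nobody is taking
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "PRODUCER");
        producer.setDaemon(true);
        producer.start();

        // Wait a little (arbitrary 200 ms); with no consumer the producer cannot finish.
        producer.join(200);
        boolean blockedBeforeTake = producer.isAlive();

        String event = queue.take(); // this thread plays the consumer and completes the hand-off
        producer.join(1000);         // the producer can now return from put() and terminate

        return blockedBeforeTake && !producer.isAlive() && "FOUR".equals(event);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("producer waited for consumer: " + producerWaitsForConsumer());
    }
}
```

The producer thread is still alive after the first join() because put() has not returned, and it terminates only once take() has received the element.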



Things to remember about SynchronousQueue in Java

Here are some of the important properties of this special blocking queue in Java. It's very useful to transfer data from one thread to another thread synchronously. It doesn't have any capacity and blocks until there is a thread on the other end.

1) A call to put() on SynchronousQueue blocks until another thread is ready to take the element that the calling thread is trying to put.

2) SynchronousQueue has zero capacity.

3) SynchronousQueue is used to implement the direct hand-off queuing strategy in thread pools, where a submitted task is handed off to a waiting thread if there is one; otherwise a new thread is created if allowed, and if not, the task is rejected.

4) This queue does not permit null elements; adding a null element results in a NullPointerException.

5) For purposes of other Collection methods (for example contains), a SynchronousQueue acts as an empty collection.

6) You cannot peek at a synchronous queue because an element is only present when you try to remove it; similarly, you cannot insert an element (using any method) unless another thread is trying to remove it.

7) You cannot iterate over SynchronousQueue as there is nothing to iterate.

8) A SynchronousQueue constructed with fairness policy set to true grants threads access in FIFO order.
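Several of the points above (zero capacity, no null elements, the empty-collection view, no peeking and no iteration) can be checked with a few method calls. This is only a sketch; the class name EmptyViewSketch and its two helper methods are made up for this example.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, written only for this illustration.
class EmptyViewSketch {

    // Returns true if the queue presents itself as an empty, zero-capacity collection.
    static boolean looksEmpty(SynchronousQueue<String> queue) {
        return queue.isEmpty()
                && queue.size() == 0
                && queue.remainingCapacity() == 0
                && queue.peek() == null            // nothing to peek at
                && !queue.contains("FOUR")         // acts as an empty collection
                && !queue.iterator().hasNext();    // nothing to iterate over
    }

    // Returns true if inserting null is refused with a NullPointerException.
    static boolean rejectsNull(SynchronousQueue<String> queue) {
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        System.out.println("looks empty: " + looksEmpty(queue));
        System.out.println("rejects null: " + rejectsNull(queue));
    }
}
```

Both checks return true for a freshly created queue, which matches the description of SynchronousQueue as a hand-off point rather than a container.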

That's all about SynchronousQueue in Java. We have seen some special properties of this special concurrent collection, and learned how to solve the classical producer consumer problem using SynchronousQueue in Java. By the way, calling it a queue is a bit confusing because it doesn't have any capacity to hold your elements. A call to put() will not complete until there is a thread calling take(). It is better thought of as a rendezvous point where threads meet to share objects. In other words, it's a utility to synchronously share data between two threads in Java, and probably a safer alternative to the wait and notify methods.
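As a closing illustration of the direct hand-off strategy mentioned in point 3, here is a rough sketch of a ThreadPoolExecutor backed by a SynchronousQueue. The class name DirectHandOffPoolSketch, the method countRejected(), and the pool sizes (0 core threads, at most 2 threads) are assumptions chosen for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, written only for this illustration.
class DirectHandOffPoolSketch {

    // Submits three long-running tasks to a pool capped at two threads
    // and returns how many submissions were rejected.
    static int countRejected() throws InterruptedException {
        // The SynchronousQueue cannot hold tasks, so every task needs a thread right away.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 2, 1, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < 3; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy until released
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // No idle thread to hand off to, and no room for a new thread.
                    rejected++;
                }
            }
        } finally {
            release.countDown(); // let the busy workers finish
            pool.shutdown();
        }
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return rejected;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("rejected tasks: " + countRejected());
    }
}
```

The first two tasks each get a new thread because no idle thread is waiting on the queue; the third finds both threads busy and the pool at its maximum size, so the default rejection policy throws RejectedExecutionException and the method returns 1.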

7 comments :

kewldude said...

Excellent. You explain things the best understandable way. Code is too neat.
Thanks

Gaurav said...

First of all very nice article.
I have a query on this. In case of one producer and multiple consumer scenario, how can i ensure that only one of the consumer consumes the message and not all of them.
Thanks

Javin Paul said...

@Gaurav by removing that element from Queue, but in case of multiple consumer having separate queue for each consumer is better design because it will reduce contention and prevent performance penalty due to locking and synchronization.

Gaurav said...

Thanks Javin for reply. I am using the Synchronous Queue and by testing I realised only one consumer is able to consume the message. So that's what I was looking for.
Earlier I thought of having separate queue for each consumer, however that will defeat the purpose I want to achieve. Producer in my case is producing about million message hour and I want to process those message in parallel. For this purpose, I need multiple worker/consumers and at the same time I want to make sure no two consumer receive the same messages.
I hope I am making sense here.

Javin Paul said...

Given your producer is very fast, there is a risk of OutOfMemoryError when using an unbounded blocking queue if the consumer happens to be slow. I would suggest a two-tier approach where the producer inserts into a BlockingQueue and one consumer takes messages and distributes them into different queues; then you can use a thread pool executor to assign each thread its own bounded queue. The intermediate thread will be very fast because it will just be taking an element and putting it into the right queue, so that BlockingQueue will not grow, and if your consumers are fast enough then the individual queues will also be empty by then. I hope this helps.

Gaurav said...

Thanks Javin. That's really a good suggestion. I could try the two tier approach. I think i need to develop some sort of fairness logic for the first consumer who distributes the messages to different queues such that second tier queues have fair distribution of messages.
Also Can you throw some light on how to do deal with the clean up operation in case of producer has died. I guess I need to kill the single consumer thread in the first tier as it is sharing the blocking queue reference with the producer.

Javin Paul said...

Hello Gaurav, you can develop that logic based upon type of message, or content of message if no two message is related to other, otherwise you need to ensure that some messages are processed by one particular thread and should be put in one queue. For example in Electronic Trading System, we usually process all messages for one symbol in one queue to avoid processing Cancel/Modification request before New Order. Regarding what happens if producer die, If you use BlockingQueue consumer will wait until your Producer comes up again, in fact that's the main reason why you use BlockingQueue.
