One of the most misunderstood parts of the Java language is the trio of methods defined in the class “Object”: wait(), notify(), and notifyAll(). These methods are central to thread coordination through guarded blocks on a shared object. Hopefully, this post will help you understand them a little better.
wait(), notify() and notifyAll() can only be called within a synchronized context. That is, a thread must hold the monitor (the intrinsic lock) of the shared object before calling any of these methods on it; otherwise the call throws an IllegalMonitorStateException.
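To make the rule concrete, here is a minimal sketch of what happens when notify() is called with and without holding the object's monitor. The class and method names (MonitorOwnershipDemo, notifyWithoutLock, notifyWithLock) are made up for illustration; only notify() and IllegalMonitorStateException come from the JDK.

```java
// Hypothetical demo class; not part of the example developed in this post.
class MonitorOwnershipDemo {

    // Calls notify() WITHOUT holding the monitor.
    // Returns true if the JVM rejects the call.
    static boolean notifyWithoutLock(Object shared) {
        try {
            shared.notify(); // not inside synchronized (shared)
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // Calls notify() while holding the monitor.
    // Returns true if the call completes normally.
    static boolean notifyWithLock(Object shared) {
        synchronized (shared) {
            shared.notify(); // legal: the current thread owns the monitor
            return true;
        }
    }

    public static void main(String[] args) {
        Object shared = new Object();
        System.out.println("rejected without lock: " + notifyWithoutLock(shared));
        System.out.println("accepted with lock: " + notifyWithLock(shared));
    }
}
```

The same applies to wait() and notifyAll(). Declaring a method synchronized, as the classes in this post do, is simply a shorthand for holding the monitor of this for the duration of the method.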
The example below models a PUBLISHER->SUBSCRIBER scenario. The Subscriber thread waits for messages published by the Publisher thread, and terminates when it encounters a special “DONE !” message.
The shared object on which the locking is done is defined as the class Message.
package com.threads.sync;

import java.util.logging.Logger;

public class Message {
    public static final Logger logger = Logger.getLogger(Message.class.getName());

    protected String message;
    //when isEmpty = false -> message is ready to be consumed
    //starts out true: nothing has been published yet
    protected boolean isEmpty = true;

    public synchronized String getMessage(){
        //loop instead of if: wait() can return spuriously, so re-check the flag
        while(isEmpty){
            try{
                logger.info("Consumer Waiting...");
                wait();
            }catch(InterruptedException e) {
                logger.info(e.getMessage());
                //restore the interrupt flag and give up; the caller sees null
                Thread.currentThread().interrupt();
                return null;
            }
        }
        //notification received, isEmpty = false -> toggle status
        isEmpty = true;
        //notify all that status has changed
        notifyAll();
        return message;
    }

    public synchronized void publish(String message){
        //wait until the previous message has been consumed
        while(!isEmpty){
            try{
                logger.info("Producer Waiting...");
                wait();
            }catch(InterruptedException e){
                logger.info(e.getMessage());
                //restore the interrupt flag and give up without publishing
                Thread.currentThread().interrupt();
                return;
            }
        }
        //isEmpty -> true, update message
        this.message = message;
        //toggle status
        isEmpty = false;
        notifyAll();
    }
}
The Message class defines a flag called isEmpty, which is false when there is a message to be read by the consumer (in this case the Subscriber thread). A thread calling getMessage() waits while the flag is true. Once the Publisher stores a message and calls notifyAll(), the Subscriber wakes up, reads the message, sets isEmpty back to true, and in turn notifies the Publisher that the slot is free for the next message.
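The handoff described above can also be tried out in a single file. The following is a minimal sketch, not the Message class itself: the names HandoffDemo, Slot, put, take and run are hypothetical, the methods propagate InterruptedException instead of logging it, and the waits sit inside while loops so that the flag is re-checked after every wake-up (wait() is allowed to return spuriously).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; a sketch of the flag-based handoff.
class HandoffDemo {

    // One-slot mailbox guarded by its own monitor.
    static class Slot {
        private String value;
        private boolean empty = true; // nothing to read until the first put()

        synchronized void put(String v) throws InterruptedException {
            while (!empty) {   // re-check the condition after every wake-up
                wait();
            }
            value = v;
            empty = false;
            notifyAll();       // wake a reader blocked in take()
        }

        synchronized String take() throws InterruptedException {
            while (empty) {
                wait();
            }
            empty = true;
            notifyAll();       // wake a writer blocked in put()
            return value;
        }
    }

    // Publishes the messages on a second thread and returns
    // what the calling thread received, in order.
    static List<String> run(String... messages) {
        Slot slot = new Slot();
        List<String> received = new ArrayList<>();
        Thread publisher = new Thread(() -> {
            try {
                for (String m : messages) {
                    slot.put(m);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        publisher.start();
        try {
            for (int i = 0; i < messages.length; i++) {
                received.add(slot.take());
            }
            publisher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run("Lorem", "Ipsum", "DONE !")); // prints [Lorem, Ipsum, DONE !]
    }
}
```

Because the slot holds exactly one value, put() cannot overwrite a message that has not been taken yet and take() cannot return the same message twice, so the messages arrive in the order they were published regardless of thread scheduling.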
The classes Publisher.java and Subscriber.java are defined below:
package com.threads.sync;

import java.util.logging.Logger;

public class Publisher implements Runnable {
    public static final Logger logger = Logger.getLogger(Publisher.class.getName());

    protected String [] messages;
    protected Message msg;

    public Publisher(Message msg){
        messages = new String [] {
            "Lorem", "Ipsum", "Testing Thread Synchronization", "DONE !"
        };
        //pass in the shared object
        this.msg = msg;
    }

    public Publisher(String [] messages, Message msg){
        this.messages = messages;
        this.msg = msg;
    }

    @Override
    public void run() {
        if(messages == null)
            return;
        for(String message : messages) {
            msg.publish(message);
            //simulating thread randomness using a sleep of 1 to 4 seconds
            int factor = (int)(Math.random() * 4 + 1);
            long millis = factor * 1000;
            try {
                logger.info("Publisher sleeping for " + millis + "ms.");
                Thread.sleep(millis);
            }catch(InterruptedException e){
                logger.info(e.getMessage());
            }
        }
    }
}
package com.threads.sync;

import java.util.logging.Logger;

public class Subscriber implements Runnable {
    protected Message msg;
    public static final Logger logger = Logger.getLogger(Subscriber.class.getName());

    public Subscriber(Message msg){
        this.msg = msg;
    }

    @Override
    public void run() {
        while(true){
            String message = msg.getMessage();
            //getMessage() can return null (for example, when no message was available) -> handle 'null' cases
            if(message != null)
                logger.info("Message recd : " + message);
            if("DONE !".equalsIgnoreCase(message))
                break;
            //simulating thread randomness using a sleep of 1 to 4 seconds
            int factor = (int)(Math.random() * 4 + 1);
            long millis = factor * 1000;
            try {
                logger.info("Subscriber sleeping for " + millis + "ms.");
                Thread.sleep(millis);
            }catch(InterruptedException e){
                logger.info(e.getMessage());
            }
        }
    }
}
The driver program, ThreadSyncTests.java, is defined as follows:
package com.threads.sync;

public class ThreadSyncTests {
    public static void main(String[] args) {
        //->Initialize Shared Object
        Message msg = new Message();
        //->Initialize Publisher
        new Thread(new Publisher(msg)).start();
        //->Initialize Subscriber
        new Thread(new Subscriber(msg)).start();
    }
}
Once you get past the small details, thread synchronization using wait(), notify() and notifyAll() on a shared lock is quite simple to understand.