The interview process on 8/8/07 consisted of the following:
Queue operations (these are not thread safe)
Initial problem code skeleton.
Producer.java
import java.util.Queue;

public class Producer implements Runnable {
    // This will be assigned in the constructor
    private Queue queue = null;

    public void run() {
        // Binds to socket, reads messages in
        // packages message calls doSomething()
        // doSomething(Object msg);
    }

    public void doSomething(Object msg) {
        //How do you put something on the queue to the consumer
    }
}
Consumer.java
import java.util.Queue;

public class Consumer implements Runnable {
    // This will be assigned in the constructor
    private Queue queue = null;

    public void process(Object msg) {
        //process message non-trivially (i.e. it takes a while).
    }

    public void run() {
        while(true) {
            doStuff();
        }
    }

    public void doStuff() {
        //Implement processing of Queue in a thread-safe manner
    }
}
Initially I implemented the code using coarse-grained synchronized blocks, and I believe that's what they were looking for as a first pass.
public class Producer implements Runnable {
    //...
    public void doSomething(Object msg) {
        synchronized(queue) {
            queue.offer(msg);
        }
    }
}

public class Consumer implements Runnable {
    //...
    public void run() {
        while(true) {
            doStuff();
        }
    }

    public void doStuff() {
        synchronized(queue) {
            if(!queue.isEmpty()) {
                Object msg = queue.dequeue();
                process(msg);
            }
        }
    }
}
I improved the Consumer to make an explicit wait() call so that it's not busy-waiting in its run() method. This meets the constraint that we only receive bursts of messages for minutes of high activity, followed by hours of inactivity.
To minimize the time the consumer holds the lock on the queue, we'd want to rip a message off the queue and then process it outside the synchronized block.
We also want to do this because the problem description told us that processing takes a non-trivial amount of time, so you need to let the consumer thread work while the producer is still receiving connections. Otherwise you would remove all concurrency and essentially have one large critical section, as if the Producer directly called the Consumer's process() method.
public class Consumer implements Runnable {
    //...
    public void run() {
        while(true) {
            doStuff();
        }
    }

    public void doStuff() {
        Object msg = null;
        synchronized(queue) {
            if(!queue.isEmpty()) {
                msg = queue.dequeue();
            }
        }
        //Process outside of the synchronized block,
        //and only if we actually pulled a message off the queue
        if(msg != null) {
            process(msg);
        }
    }
}
The producer now makes a call to notifyAll(), which wakes any threads currently blocked in wait() on the same monitor; a woken thread resumes once the producer releases the lock and the thread reacquires it. This allows the Consumer to implement a blocking wait instead of a busy wait.
Producer
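public void doSomething(Object msg) {
    synchronized(queue) {
        queue.enqueue(msg);
        //Wake up anyone waiting for a message. notifyAll() must be called on
        //the object whose lock we hold (the queue); a bare notifyAll() would
        //target this Producer and throw IllegalMonitorStateException.
        queue.notifyAll();
    }
}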
Consumer
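public void doStuff() {
    Object msg = null;
    synchronized(queue) {
        while (queue.isEmpty()) {
            try {
                //Wait on the queue's monitor, the same object we synchronized on
                queue.wait();
            } catch (InterruptedException e) {
                //Interrupted while waiting: skip this pass, run() will call us again
                return;
            }
        }
        msg = queue.dequeue();
    }
    //Process outside of the synchronized block
    process(msg);
}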
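To see the wait()/notifyAll() handshake run end to end, here is a minimal self-contained sketch. It is not the interview code: the names WaitNotifyDemo, put, take and runDemo are made up for illustration, and a plain java.util.LinkedList (with add/remove) stands in for the interview's Queue. The key point it tries to show is that wait() and notifyAll() are both called on the same object the synchronized block locks.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

class WaitNotifyDemo {
    // Shared, non-thread-safe queue; its own monitor guards every access.
    private final Queue<Object> queue = new LinkedList<>();

    // Producer side: add a message and wake any waiting consumer.
    void put(Object msg) {
        synchronized (queue) {
            queue.add(msg);
            queue.notifyAll(); // called on the object whose lock we hold
        }
    }

    // Consumer side: block until a message is available, then remove it.
    Object take() throws InterruptedException {
        synchronized (queue) {
            while (queue.isEmpty()) {
                queue.wait(); // releases the lock while waiting
            }
            return queue.remove();
        }
    }

    // Hypothetical driver: one producer (the caller) and one consumer thread.
    // Returns the messages in the order the consumer received them.
    static List<Object> runDemo(int count) {
        WaitNotifyDemo demo = new WaitNotifyDemo();
        List<Object> received = new ArrayList<>();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(demo.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        for (int i = 0; i < count; i++) {
            demo.put("msg-" + i);
        }
        try {
            consumer.join(); // after join(), the consumer's writes are visible here
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(3)); // prints [msg-0, msg-1, msg-2]
    }
}
```

The while loop around wait() matters: a woken thread has to re-check the condition, because another consumer may have taken the message first or the wake-up may be spurious.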
We talked about using a queue whose operations are thread safe. The main issue is that you still need synchronization to guarantee consistent state across multiple calls: for example, isEmpty() followed by dequeue() is two separate calls, and another thread can change the queue in between them. Essentially you need to make such sequences atomic in your code.
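A rough sketch of that point, using java.util.concurrent.ConcurrentLinkedQueue as an example of a queue whose individual operations are thread safe (the interview did not name a specific class, so that choice is an assumption, as are the names CheckThenActDemo, takeRacy, takeAtomic and drainWithConsumers). The racy version checks and then acts in two separate calls; the atomic version wraps both in one lock that every consumer shares.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

class CheckThenActDemo {
    // Racy with several consumers: another thread may empty the queue between
    // isEmpty() and remove(), and remove() then throws NoSuchElementException.
    static Integer takeRacy(Queue<Integer> queue) {
        if (!queue.isEmpty()) {
            return queue.remove();
        }
        return null;
    }

    // Atomic with respect to other callers of takeAtomic(): the check and the
    // removal happen under one lock, so no consumer can slip in between them.
    static Integer takeAtomic(Queue<Integer> queue) {
        synchronized (queue) {
            if (!queue.isEmpty()) {
                return queue.remove();
            }
            return null;
        }
    }

    // Hypothetical driver: several consumers drain a pre-filled queue using
    // takeAtomic() and we count how many items were taken in total.
    static int drainWithConsumers(int items, int consumers) {
        Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < items; i++) {
            queue.add(i);
        }
        AtomicInteger taken = new AtomicInteger();
        Thread[] threads = new Thread[consumers];
        for (int t = 0; t < consumers; t++) {
            threads[t] = new Thread(() -> {
                while (takeAtomic(queue) != null) {
                    taken.incrementAndGet();
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return taken.get();
    }

    public static void main(String[] args) {
        System.out.println(drainWithConsumers(1000, 4)); // prints 1000
    }
}
```

For this particular case a single call to poll(), which returns null when the queue is empty, would also avoid the race; the explicit lock is shown because it generalizes to sequences that no single queue method covers.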
We discussed improving this further by encapsulating the synchronization behavior in the Queue class itself, in effect creating a BlockingQueue that lets clients enqueue and dequeue, with dequeue blocking while the queue is empty.
BlockingQueue
public class BlockingQueue implements Queue {
    private java.util.Queue queue = new java.util.LinkedList();

    /**
     * Make a blocking Dequeue call so that we'll only return when the queue has
     * something on it, otherwise we'll wait until something is put on it.
     *
     * This will return null if the thread wait() call is interrupted.
     */
    public synchronized Object dequeue() {
        Object msg = null;
        while (queue.isEmpty()) {
            try {
                wait();
            } catch (InterruptedException e) {
                // Error return the client a null item
                return msg;
            }
        }
        msg = queue.remove();
        return msg;
    }

    /**
     * Enqueue will add an object to this queue, and will notify any waiting
     * threads that there is an object available.
     */
    public synchronized void enqueue(Object o) {
        queue.add(o);
        // Wake up anyone waiting for something to be put on the queue.
        notifyAll();
    }
}
Producer
public class Producer implements Runnable {
    // This will be assigned in the constructor
    private Queue queue = null;

    public void run() {
        // Binds to socket, reads messages in
        // packages message calls doSomething()
        // doSomething(Object msg);
    }

    public void doSomething(Object msg) {
        queue.enqueue(msg);
    }
}
Consumer
public class Consumer implements Runnable {
    // This will be assigned in the constructor
    private Queue queue = null;

    public void process(Object msg) {
        try {
            //process message non-trivially (i.e. it takes a while).
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public void run() {
        while(true) {
            doStuff();
        }
    }

    public void doStuff() {
        Object msg = queue.dequeue();
        process(msg);
    }
}
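For comparison, the JDK has shipped a ready-made interface with the same name, java.util.concurrent.BlockingQueue, since Java 5. The sketch below is not what was written in the interview; it swaps the hand-rolled class for LinkedBlockingQueue to show the same producer/consumer shape using put() and take(). The names JdkBlockingQueueDemo and runDemo are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class JdkBlockingQueueDemo {
    // Hypothetical driver: the caller acts as producer, one thread as consumer.
    // Returns the messages in the order the consumer took them.
    static List<String> runDemo(int count) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> processed = new ArrayList<>();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    // take() blocks while the queue is empty
                    processed.add(queue.take());
                }
            } catch (InterruptedException e) {
                // Unlike returning null, take() reports interruption explicitly
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            for (int i = 0; i < count; i++) {
                queue.put("msg-" + i); // never blocks here: the queue is unbounded
            }
            consumer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(3)); // prints [msg-0, msg-1, msg-2]
    }
}
```

One design difference worth noting: take() throws InterruptedException rather than returning null, so the consumer cannot mistake an interruption for a message.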