====== Google Interview ======
  * Posted an overview on [[http://ddaniels.net/ddanielsblog/2007/08/14/google-interview/|my blog]]. --- //[[ddaniels@ddaniels.net|Doug Daniels]] 2007/08/16 05:59//
  * [[ProducerConsumerQuestion]]
The interview process on 8/8/07 consisted of the following:
  - Intro to process
  - Technical problem
    * Producer Consumer pattern, Server socket listening for connection.
  - Initial problem discussion
  - Initial implementation from skeleton code
  - Discussion of implementation
  - Additional problem twist (thread-safe queue synchronization issues, atomic transactions)
  - Discussion of removing synchronization from client code (add behavior to Queue to make it a BlockingQueue)
  - 5 min Google Engineer insight
===== Technical Question and Requirements =====
  * Producer Consumer pattern, Server socket listening for connection.
  * Required to process requests in order (once one is received we must finish processing it before processing the next request).
  * Messages come in bursts: a flurry for a couple of minutes followed by hours of no activity.
  * The queue doesn't have a max capacity, but it can be empty.
**Queue operations** (these are not thread safe)
  * offer(Object o) - puts an object on the queue
  * Object dequeue() - takes the next object off the queue
  * isEmpty() - checks if the queue is empty
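
To make the three operations concrete, here is a minimal sketch of a queue with that shape. The class name ''SimpleQueue'' and the ''ArrayDeque'' backing store are my own assumptions, not what the interviewer handed out. Note that ''java.util.Queue'' itself has ''offer()'' and ''isEmpty()'' but no ''dequeue()''; its closest counterparts are ''poll()'' and ''remove()''.

```java
import java.util.ArrayDeque;

// Hypothetical stand-in for the interview's queue. Like the original,
// it is NOT thread safe: callers must provide their own locking.
class SimpleQueue {
    private final ArrayDeque<Object> items = new ArrayDeque<>();

    // Puts an object on the tail of the queue.
    void offer(Object o) {
        items.addLast(o);
    }

    // Takes the next object off the head of the queue.
    // Throws NoSuchElementException if the queue is empty.
    Object dequeue() {
        return items.removeFirst();
    }

    // Checks if the queue is empty.
    boolean isEmpty() {
        return items.isEmpty();
    }
}
```

Because ''dequeue()'' throws on an empty queue in this sketch, a caller has to check ''isEmpty()'' first, and that pair of calls is exactly what needs protecting once two threads share the queue.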
===== Producer Consumer Code Skeleton =====
Initial problem code skeleton.
**Producer.java**
  import java.util.Queue;
  public class Producer implements Runnable {
      // This will be assigned in the constructor
      private Queue queue = null;
      public void run() {
          // Binds to socket, reads messages in,
          // packages each message and calls doSomething():
          // doSomething(Object msg);
      }
      public void doSomething(Object msg) {
          // How do you put something on the queue for the consumer?
      }
  }
**Consumer.java**
  import java.util.Queue;
  public class Consumer implements Runnable {
      // This will be assigned in the constructor
      private Queue queue = null;
      public void process(Object msg) {
          // Process message non-trivially (i.e. it takes a while).
      }
      public void run() {
          while(true) {
              doStuff();
          }
      }
      public void doStuff() {
          // Implement processing of Queue in a thread-safe manner
      }
  }
===== Producer Consumer Initial Implementation =====
Initially I implemented the code using coarse-grained synchronized blocks, and I believe that's what they were looking for as a first pass.
  public class Producer implements Runnable {
      //...
      public void doSomething(Object msg) {
          synchronized(queue) {
              queue.offer(msg);
          }
      }
  }
  public class Consumer implements Runnable {
      //...
      public void run() {
          while(true) {
              doStuff();
          }
      }
      public void doStuff() {
          synchronized(queue) {
              if(!queue.isEmpty()) {
                  Object msg = queue.dequeue();
                  process(msg);
              }
          }
      }
  }
===== Improving Consumer to minimize synchronization block =====
I improved the Consumer to make an explicit wait() call so that it's not busy-waiting in its run() method. This is to meet the constraint that we only receive bursts of messages for minutes of high activity, followed by hours of inactivity.
To minimize the time that the consumer holds the lock on the queue, we'd want to rip a message off the queue inside the synchronized block and then process it outside the block.
We also want to do this because the problem description says processing takes a non-trivial amount of time, so the consumer thread needs to be able to work while the producer is still receiving connections. Otherwise all concurrency is lost and you essentially have one large critical section, as if the Producer directly called the Consumer's process() method.
==== Consumer processing outside the lock ====
  public class Consumer implements Runnable {
      //...
      public void run() {
          while(true) {
              doStuff();
          }
      }
      public void doStuff() {
          Object msg = null;
          synchronized(queue) {
              if(!queue.isEmpty()) {
                  msg = queue.dequeue();
              }
          }
          // Process outside of the synchronized block,
          // but only if we actually took a message off the queue
          if (msg != null) {
              process(msg);
          }
      }
  }
==== Minimize Consumer Blocking ====
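The producer now calls notifyAll() on the queue, which wakes any threads blocked in wait() on that same object; each woken thread re-acquires the lock once the producer leaves its synchronized block. This allows the Consumer to implement a blocking wait instead of a busy wait. Both calls have to be made on the object that is locked (queue.wait() and queue.notifyAll()), because calling them on an object whose monitor the thread does not hold throws IllegalMonitorStateException.
**Producer**
  public void doSomething(Object msg) {
      synchronized(queue) {
          queue.offer(msg);
          // Wake up anyone waiting for a message
          queue.notifyAll();
      }
  }
**Consumer**
  // wait() can throw InterruptedException; run() should catch it and stop looping
  public void doStuff() throws InterruptedException {
      Object msg = null;
      synchronized(queue) {
          while (queue.isEmpty()) {
              // Releases the queue lock while waiting
              queue.wait();
          }
          msg = queue.dequeue();
      }
      // Process outside of the synchronized block
      process(msg);
  }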
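
For anyone who wants to run the wait()/notifyAll() handshake end to end, here is a small self-contained sketch. The class name ''WaitNotifyDemo'', its method names, and the use of ''java.util.LinkedList'' as the unsynchronized queue are my own assumptions for illustration; the locking pattern is the one described above.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

class WaitNotifyDemo {
    // Shared, non-thread-safe queue; every access is guarded by its monitor.
    private final LinkedList<Object> queue = new LinkedList<>();

    // Producer side: enqueue under the lock, then wake waiting consumers.
    void produce(Object msg) {
        synchronized (queue) {
            queue.addLast(msg);
            queue.notifyAll();
        }
    }

    // Consumer side: block until a message is available, then return it.
    Object consume() throws InterruptedException {
        synchronized (queue) {
            while (queue.isEmpty()) {
                queue.wait(); // releases the lock while waiting
            }
            return queue.removeFirst();
        }
    }

    // Runs one consumer thread against a producer on the calling thread
    // and returns the messages in the order the consumer received them.
    static List<Object> run(int count) {
        WaitNotifyDemo demo = new WaitNotifyDemo();
        List<Object> seen = new ArrayList<>();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    seen.add(demo.consume());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        for (int i = 0; i < count; i++) {
            demo.produce(i);
        }
        try {
            consumer.join(); // join() makes the consumer's writes to seen visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }
}
```

The ''while'' loop around ''wait()'' matters: a woken thread has to re-check the condition, since another consumer may have taken the message first or the wakeup may be spurious.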
===== Issues using Thread safe Queue =====
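We talked about using a queue whose individual operations are thread safe. The main issue is that you still need synchronization whenever a decision spans multiple calls: the result of isEmpty() can be stale by the time dequeue() runs, because another thread may have changed the queue in between. Essentially you need to turn such call sequences into atomic transaction blocks in your code.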
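
A short sketch of that check-then-act problem, using ''java.util.concurrent.ConcurrentLinkedQueue'' as a stand-in for "a queue whose operations are thread safe" (my choice for illustration, not something named in the interview). The class and method names are hypothetical.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

class CheckThenActDemo {
    // Racy: each call is thread safe on its own, but another consumer
    // can drain the queue between isEmpty() and remove().
    static Object racyTake(ConcurrentLinkedQueue<Object> q) {
        if (!q.isEmpty()) {
            return q.remove(); // may throw NoSuchElementException under contention
        }
        return null;
    }

    // Safe: poll() checks and removes as a single atomic operation,
    // returning null when the queue is empty.
    static Object atomicTake(ConcurrentLinkedQueue<Object> q) {
        return q.poll();
    }

    // Two consumers drain n items with atomicTake; returns how many were taken.
    static int drainWithTwoConsumers(int n) {
        ConcurrentLinkedQueue<Object> q = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < n; i++) {
            q.offer(i);
        }
        AtomicInteger taken = new AtomicInteger();
        Runnable worker = () -> {
            while (atomicTake(q) != null) {
                taken.incrementAndGet();
            }
        };
        Thread a = new Thread(worker);
        Thread b = new Thread(worker);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return taken.get();
    }
}
```

With a single consumer ''racyTake'' behaves fine, which is why this kind of bug tends to surface only after a second consumer is added.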
===== Further Improvements - BlockingQueue =====
We discussed improving this further by encapsulating the synchronization behavior in the Queue class itself, in effect creating a BlockingQueue that lets clients enqueue and dequeue without any locking of their own, with dequeue blocking while the queue is empty.
**BlockingQueue**
  // Queue here is presumably the custom interface used by Producer and Consumer
  // (enqueue/dequeue), not java.util.Queue, hence the fully qualified names below.
  public class BlockingQueue implements Queue {
      private java.util.Queue queue = new java.util.LinkedList();
      /**
       * Make a blocking dequeue call so that we'll only return when the queue has
       * something on it, otherwise we'll wait until something is put on it.
       *
       * This will return null if the thread's wait() call is interrupted.
       */
      public synchronized Object dequeue() {
          Object msg = null;
          while (queue.isEmpty()) {
              try {
                  wait();
              } catch (InterruptedException e) {
                  // Error: return the client a null item
                  return msg;
              }
          }
          msg = queue.remove();
          return msg;
      }
      /**
       * Enqueue will add an object to this queue, and will notify any waiting
       * threads that there is an object available.
       */
      public synchronized void enqueue(Object o) {
          queue.add(o);
          // Wake up anyone waiting for something to be put on the queue.
          notifyAll();
      }
  }
**Producer**
  public class Producer implements Runnable {
      // This will be assigned in the constructor
      private Queue queue = null;
      public void run() {
          // Binds to socket, reads messages in,
          // packages each message and calls doSomething():
          // doSomething(Object msg);
      }
      public void doSomething(Object msg) {
          queue.enqueue(msg);
      }
  }
**Consumer**
  public class Consumer implements Runnable {
      // This will be assigned in the constructor
      private Queue queue = null;
      public void process(Object msg) {
          try {
              // Process message non-trivially (i.e. it takes a while).
              // The sleep stands in for real work.
              Thread.sleep(2000);
          } catch (InterruptedException e) {
              // Interrupted while simulating work
              e.printStackTrace();
          }
      }
      public void run() {
          while(true) {
              doStuff();
          }
      }
      public void doStuff() {
          Object msg = queue.dequeue();
          // dequeue() returns null if the wait was interrupted
          if (msg != null) {
              process(msg);
          }
      }
  }
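
For comparison, the JDK already ships this behavior in ''java.util.concurrent'': ''LinkedBlockingQueue.put()'' enqueues and ''take()'' blocks while the queue is empty. The sketch below is not what was written in the interview; the class name ''StdBlockingQueueDemo'' and its ''run'' helper are hypothetical, and it only shows how the hand-rolled BlockingQueue maps onto the standard one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class StdBlockingQueueDemo {
    // Sends the given messages through an unbounded LinkedBlockingQueue to a
    // single consumer thread and returns them in the order they were taken.
    static List<String> run(List<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> processed = new ArrayList<>();
        int expected = messages.size();

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < expected; i++) {
                    // take() blocks while the queue is empty; no client-side locking
                    processed.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (String m : messages) {
                queue.put(m); // never blocks here because the queue is unbounded
            }
            consumer.join(); // join() makes the consumer's writes visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }
}
```

A single consumer taking from a FIFO queue also preserves the in-order processing requirement from the problem statement.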