RenameTo and Poor Man’s File Based Cache

Jun 17, 2011 by

I was assigned to investigate a performance problem in one of my project’s servers, and I found an interesting concurrency issue that caused a noticeable drop in throughput during performance tests. The server’s main business logic is to generate chart images for a Servlet front-end module. The oversimplified example below shows the pattern of the problem.

class ChartServer{

    public String generateChart(ChartParameters params) throws IOException{

        String cacheFile = computeCacheFileName(params);
        if( isInterDayRequest(params) && isCacheFileExist(cacheFile) ){
            return new File(cacheDir, cacheFile).getAbsolutePath();
        }

        BufferedImage img = generateImageChart();
        File output = new File(cacheDir, cacheFile);
        synchronized( cacheLock ){
            ImageIO.write(img, "PNG", output );
        }

        return output.getAbsolutePath();
    }
}

All request parameters are used to compute the cache key. Each cached image is saved to a file that uses this key as its name. For any subsequent request with exactly the same set of parameters, the server can simply return the existing cache file.

To prevent worker threads from writing to the same file concurrently, every write to the cache directory synchronizes on the cacheLock object. This creates a big bottleneck because all threads contend for the same lock, even when they are writing different files. The question is: to get rid of the global write lock, how can a worker thread lock only the file it is going to write?

I had some ideas for approaching the problem, but I ended up not choosing them.

- Apply a hash function to the cache file name to choose a worker thread, so that all writes to the same file are done sequentially by one thread.

Problem: How can I find a hash function that distributes tasks evenly across all worker threads?

- Create a lock object for each cache file and store it in a HashMap. Each thread uses the file name as the key to get the corresponding lock from the HashMap (in a thread-safe manner) and then blocks on that lock before writing. Now two threads writing to different files block on different locks. All threads still contend for the same HashMap object, but let’s hope that retrieving an object from the map is very fast.

Problem: Will every lock be stored in the HashMap for the whole life of the process? If the system employs 16 worker threads, at most 16 files are being written at any time, yet each thread must find the lock for its target file in a HashMap that contains the lock objects for every file in the cache.
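
One possible middle ground between the two ideas above is lock striping: a fixed array of locks, with the cache file name hashed to pick one of them. The number of locks never grows, and writes to the same file always use the same lock. The sketch below is only an illustration; the class name StripedFileLocks and the stripe count are assumptions and are not part of the original server code.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper: a fixed pool of locks shared by all cache files.
final class StripedFileLocks {
    private final ReentrantLock[] stripes;

    StripedFileLocks(int stripeCount) {
        stripes = new ReentrantLock[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = new ReentrantLock();
        }
    }

    // The same file name always maps to the same lock object.
    ReentrantLock lockFor(String cacheFileName) {
        // Clear the sign bit so the index is never negative.
        int index = (cacheFileName.hashCode() & 0x7fffffff) % stripes.length;
        return stripes[index];
    }
}
```

A caller would wrap the ImageIO.write() call in lock()/unlock() on the lock returned by lockFor(cacheFile). Two different files can still land on the same stripe and block each other, so this reduces contention rather than eliminating it.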

It would be great if there were a way to fix this without too many changes to the existing execution flow.

FileChannel locking can’t be applied in this case

Since JDK 1.4, Java has provided file locking through the lock() and tryLock() methods of the FileChannel class. Unfortunately, those locks are held on behalf of the entire Java virtual machine, so they are meant for coordination between processes. They can’t be used to coordinate access between threads within the same JVM. The code below results in a java.nio.channels.OverlappingFileLockException when the second FileChannel tries to acquire the lock already held by the first FileChannel.

public static void concurrentWrite() throws IOException {
        FileChannel f1 = new FileOutputStream(new File("data.txt")).getChannel();
        FileChannel f2 = new FileOutputStream(new File("data.txt")).getChannel();

        FileLock lock1 = f1.tryLock();
        System.out.println("lock1 = " + (lock1 == null ? null : " acquired"));

        FileLock lock2 = f2.tryLock();
        System.out.println("lock2 = " + (lock2 == null ? null : " acquired"));
    }

In contrast, if I start the ConcurrentWriter thread below in 2 different processes, the file locking mechanism will work.

class ConcurrentWriter extends Thread {
    private final File output;

    public ConcurrentWriter(File output) {
        this.output = output;
    }

    @Override
    public void run() {
        try {
            writeInLoop();

        } catch (InterruptedException ex) {
            //Let this thread stop;
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }//run

    public void writeInLoop()
    throws InterruptedException, FileNotFoundException, IOException{

        while (true) {
            FileChannel fs = new FileOutputStream(output).getChannel();
            FileLock lock = tryLock(fs);
            try{
                writeData(fs);
            }finally{
                lock.release();
                fs.close();
            }

            Thread.sleep(100);
        }
    }

    public FileLock tryLock(FileChannel fs)
    throws InterruptedException, IOException {

        FileLock lock = null;
        while ((lock = fs.tryLock()) == null) {
            System.out.println(getName() + ": the file is locked, sleep then try again later");
            Thread.sleep(200);
        }
        System.out.println(getName() + ": got the file lock");
        return lock;
    }

    public void writeData(FileChannel fs)
    throws IOException, InterruptedException {
        //write data
    }
}

The output of a writer thread is shown below.

W1: got the file lock
W1: got the file lock
W1: the file is locked, sleep then try again later
W1: the file is locked, sleep then try again later
W1: the file is locked, sleep then try again later
W1: the file is locked, sleep then try again later
W1: the file is locked, sleep then try again later
W1: the file is locked, sleep then try again later
W1: the file is locked, sleep then try again later
W1: the file is locked, sleep then try again later
W1: got the file lock
W1: the file is locked, sleep then try again later
W1: the file is locked, sleep then try again later
W1: got the file lock
W1: got the file lock

Save by renaming

I came across this approach while I was searching for information about file locking. Each worker thread writes to a temporary file with a unique name, for example the real target file name with the worker thread’s ID appended. Each thread is then writing exclusively to its own temporary file. After the temporary file has been written successfully, it is renamed to the real target file. The only highly contended step is the rename operation. The approach works only if the rename operation is atomic. Although I found various developers claiming that this operation is atomic on most file systems, I haven’t found a solid reference to support the claim yet. I tested it in my performance environment on a 16-core machine and didn’t see any corrupt data or I/O exceptions. All responses contained the correct content size and the content of the target cache file was updated, so I am quite confident that this technique works in practice.

With this technique, I can get rid of the global write lock, which was the contended synchronization point, and improve throughput dramatically. The technique may be implemented as shown below.

public class FileUtil {
    private FileUtil(){}

    public static boolean saveByRename(BufferedImage img, String format, File destination)
    throws FileNotFoundException, IOException{
        String tmpFileName = destination.getName() + getUniqueSuffixPerThread();
        File tmpFile = new File( destination.getParent(),tmpFileName);

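        FileOutputStream out = new FileOutputStream(tmpFile);
        try{
            //Write through the stream opened above so that the data is
            //flushed and the stream is closed before the rename.
            ImageIO.write(img, format, out);

        }finally{
            out.close();
        }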

        return tmpFile.renameTo(destination);
    }

    private static String getUniqueSuffixPerThread(){
        return Thread.currentThread().getId() + ".tmp";
    }
}

Note: On Windows, a file can’t be renamed to replace an existing file. No exception is thrown; the renameTo() method just returns false to indicate the failed attempt.
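
On Java 7 or later, the same save-by-rename idea can be sketched with java.nio.file.Files.move() and StandardCopyOption.ATOMIC_MOVE, which asks for an atomic rename explicitly and throws AtomicMoveNotSupportedException when the file system cannot provide one. This matters because the Javadoc of File.renameTo() warns that the rename is platform-dependent and might not be atomic. The class name AtomicSave and the byte[] payload are assumptions made to keep the example self-contained. According to the Files.move() documentation, whether an atomic move replaces an existing target is implementation-specific, so this is a sketch to test on the target platform rather than a guaranteed fix.

```java
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper illustrating save-by-rename with NIO.2 (Java 7+).
final class AtomicSave {
    private AtomicSave() {}

    static void saveAtomically(byte[] content, Path destination) throws IOException {
        Path dir = destination.toAbsolutePath().getParent();
        // The temporary file gets a unique name in the same directory,
        // so the later move stays on one file system.
        Path tmp = Files.createTempFile(dir, destination.getFileName().toString(), ".tmp");
        try {
            Files.write(tmp, content);
            try {
                Files.move(tmp, destination, StandardCopyOption.ATOMIC_MOVE);
            } catch (AtomicMoveNotSupportedException e) {
                // Assumption: falling back to a non-atomic replace is acceptable here.
                Files.move(tmp, destination, StandardCopyOption.REPLACE_EXISTING);
            }
        } finally {
            // No-op after a successful move; cleans up if writing or moving failed.
            Files.deleteIfExists(tmp);
        }
    }
}
```

Using createTempFile() instead of a thread-id suffix also keeps temporary names unique across processes, not just across threads.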

What I personally like most about this approach is that it doesn’t require much change. I don’t need to change the execution flow, and it doesn’t involve complicated logic that would require a lot of test cases.

Deadlock in Real World

May 4, 2010 by

Last year, I was assigned to handle some defects that occurred only when the system was under high load. Those defects were apparently the result of concurrency problems in both a third-party API and my own module. The problems are interesting because they feature a couple of well-known issues that have already been described in many programming articles. What I like most about them is how they evolved from a simple exception into a serious deadlock just by adding a few lines of code. I will write about the problems here, hoping it will be useful to other developers.

Again, as with all of my programming posts, I can’t show you the real production code, so I will show an example that simulates the problems. The usage scenarios and the nature of the problems are the same.

Let’s say I have an API for sending and receiving messages over a network. This API is asynchronous by design. Users can send a message to subscribe to a type of data and then receive updates to that data from the server until they decide to unsubscribe.

public class AsyncTransport {

    private final Vector subscription = new Vector();
    private final MockUpdateMsgGenerator idGen = new MockUpdateMsgGenerator();
    private volatile DispatchingThread disp;
    private volatile boolean stop;

    public void connect(String host, String port) {
        //Mock implementation: doesn't actually connect to anything.
        disp = new DispatchingThread();
        disp.start();
    }

    public Subscription subScribe(Message request, AsyncMsgListener listener) {
        String subID = idGen.genIDforMessage(request);
        Subscription sub = new Subscription(subID, listener);

        subscription.add(sub);
        sendMessage(subID, request);

        return sub;
    }

    private void sendMessage(String subID, Message request) {
        //Mock implementation, just add the subID to MockUpdateMsgGenerator
        //so it can generate mock update for this subscription.
        idGen.addActiveID(subID);
    }

    public Update readNextUpdateFromNetwork() throws InterruptedException{
        //Read the next mock update
        return idGen.genMockUpdateFromSubIDList();
    }

    public void unsubScribe(Subscription sub) {
        subscription.remove(sub);

        //Tell MockUpdateMsgGenerator not to generate mock update for this id.
        idGen.removeActiveID( sub.getSubscriptionID() );
    }

    public void stop() {
        stop = true;
        this.disp.interrupt();
    }

    class DispatchingThread extends Thread {

        public DispatchingThread() {
            super("AsyncTransport Dispatching Thread");
        }

        public void run() {
            try {
                while (!stop) {
                    Update update = idGen.genMockUpdateFromSubIDList();
                    for (int i = 0; i < subscription.size(); i++) {
                        ((Subscription) subscription.get(i)).notifyIfSubscribeFor(update);
                    }
                }
            } catch (InterruptedException ex) {
                //Thread stop
            }
        }
    }//DispatchingThread
}

class Update {

    private final String ID;
    private final Message msg;

    public Update(String ID, Message msg) {
        this.ID = ID;
        this.msg = msg;
    }

    public String getSubscribtionID() {
        return ID;
    }

    public Message getMessage() {
        return msg;
    }
} 

public class Subscription{
    public final String ID;
    public final AsyncMsgListener listener;

    public Subscription(String ID, AsyncMsgListener listener) {
        this.ID = ID;
        this.listener = listener;
    }

    public String getSubscriptionID(){
        return this.ID;
    }

    public void notifyIfSubscribeFor(Update update){
        if( update.getSubscribtionID().equals(ID) ){
            listener.onMsg( update.getMessage() );
        }
    }
}

public interface AsyncMsgListener {
    public void onMsg(Message msg);
}

public class Message {
    private final String data;

    public Message(String data){
        this.data = data;
    }

    public String getData(){
        return this.data;
    }
}

I have filtered out unnecessary complexity by making AsyncTransport a standalone class. It doesn’t really connect to anything. The MockUpdateMsgGenerator class generates mock updates for us.

Once the transport connects to the server, the DispatchingThread is started and keeps listening on the socket connection for updates from the server. When an update arrives, the thread iterates over the subscription list and asks each subscription to fire a notification if the update’s ID matches the subscription’s ID. The data structure used to store all subscriptions is a Vector. Since all methods of Vector are synchronized (a well-known fact stated in the API documentation), the designer of this API might have thought that it was safe to access the subscription list concurrently, so users can subscribe and unsubscribe while the dispatching thread is iterating over the list.

                while (!stop) {
                    Update update = idGen.genMockUpdateFromSubIDList();
                    for (int i = 0; i < subscription.size(); i++) {
                        ((Subscription) subscription.get(i)).notifyIfSubscribeFor(update);
                    }
                }
 

Here is the code to test the API.

public class ASyncTester {
    public static void main(String[] args) throws InterruptedException {
        AsyncTransport transport = new AsyncTransport();
        transport.connect("localhost", "9999");

        int threadCount = 20;
        int taskSize = 100;
        ExecutorService exec = Executors.newFixedThreadPool(threadCount);
        for (int i = 0; i < taskSize; i++) {
            exec.submit(new SubscribtionCallable(transport));
        }

        exec.shutdown();
        exec.awaitTermination(1, TimeUnit.DAYS);

        transport.stop();
        System.out.println("Done");
    }
}

class SubscribtionCallable implements Callable<Void> {

    private final AsyncTransport transport;

    public SubscribtionCallable(AsyncTransport transport) {
        this.transport = transport;
    }

    public Void call() throws Exception {
        Subscription sub = transport.subScribe(new Message("ASYNC request"), new AsyncMsgListener() {
            public void onMsg(Message msg) {
                System.out.println(msg.getData());
            }
        });

        //wait to get some updates
        Thread.sleep(Math.abs(new Random().nextLong() % 100));
        transport.unsubScribe(sub);
        return null;
    }
}

We have 20 threads sharing 100 Callable tasks. All the tasks perform the same series of actions: subscribe for data, wait for some updates and then unsubscribe.

Check Then Act

The most annoying thing about concurrency problems is that they are hard to reproduce. A given problem may occur only when the threads execute in a certain order, and we can’t just go inside the JVM and arrange for that order to happen. So a configuration that works on my machine may not produce the same result on yours. By configuration I mean the conditions that affect the concurrent execution of the program, such as the number of running threads and tasks. You may want to try running the program with various configurations: one may run just fine while another results in an exception.

On my machine, the configuration shown in the code snippet above makes the ASyncTester class run just fine. But when I increase the thread count to 50 to run 500 tasks, I get an exception.

Exception in thread "AsyncTransport Dispatching Thread" java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 49
at java.util.Vector.get(Vector.java:694)
at deadlock.transport.AsyncTransport$DispatchingThread.run(AsyncTransport.java:62)

This is the first problem I am going to show you in this post. Although it’s not possible to corrupt the internal state of a Vector by concurrently calling its get(), set() or size() methods, you still need to think carefully about what actually makes up your mutually exclusive section: the area of code that needs to execute without interference from any other thread.

In our example, the dispatching loop needs both subscription.size() and subscription.get() to run in the same mutually exclusive section. This is known as the check-then-act scenario. The dispatching thread checks whether the current index is less than the vector size; if so, the element at that index is retrieved. The steps that lead to the exception are shown below.

  1. The current index is 49 and the current size of the vector is 50.
  2. The index is compared to the size of the vector. Since 49 is less than 50, the body of the for loop is entered.
  3. Another thread calls unsubScribe(), which removes an element from the vector. Now the vector’s size is 49, and the valid indexes for retrieving elements are 0 – 48.
  4. The body of the for loop executes, and the expression that tries to get the element at index 49 results in java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 49
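
The four steps above can be replayed deterministically in a single thread, with the removal that "another thread" would perform placed by hand between the check and the act. This is only an illustrative sketch; the class name CheckThenActDemo is an assumption and is not part of the example API.

```java
import java.util.Vector;

// Hypothetical single-threaded replay of the check-then-act interleaving.
final class CheckThenActDemo {

    // Returns true when the size check passed but the later get() still failed.
    static boolean checkPassedButGetFailed() {
        Vector<String> subscription = new Vector<String>();
        for (int n = 0; n < 50; n++) {
            subscription.add("sub-" + n);
        }

        int i = 49;                                     // step 1: index 49, size 50
        boolean checkPassed = i < subscription.size();  // step 2: the check succeeds
        subscription.remove(0);                         // step 3: what another thread would do
        try {
            subscription.get(i);                        // step 4: the act uses a stale check
            return false;
        } catch (ArrayIndexOutOfBoundsException expected) {
            return checkPassed;
        }
    }

    public static void main(String[] args) {
        System.out.println("check passed but get failed: " + checkPassedButGetFailed());
    }
}
```

Each individual call on the Vector is thread-safe here; the failure comes only from the gap between the two calls.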

The check-then-act problem above can be addressed in many ways. The easiest may be to use a synchronized block to create a critical section.

                    synchronized(subscription){
                        for(int i=0; i<subscription.size(); i++){
                            ( (Subscription)subscription.get(i) ).notifyIfSubscribeFor( update );
                        }
                    }

I am using the vector object itself as the lock of the synchronized block, so no method call on the vector instance can run on another thread while the code in our critical section is running. The state of the vector therefore cannot be changed by other threads during the execution of the synchronized block. That means, in our dispatching loop, if subscription.size() returns 50, it’s guaranteed that there will be an element at index 49 by the time subscription.get(i) is executed, since no other thread can remove an element from the vector while the dispatching thread is in the critical section.
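
For comparison, a different way to avoid the check-then-act race is to iterate over a private snapshot of the list and hold the vector lock only while copying. This is not the fix used in this post; it is a hedged sketch of a common alternative, and the names SnapshotDispatcher and Listener are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;

// Hypothetical dispatcher that notifies listeners from a snapshot.
final class SnapshotDispatcher {

    interface Listener {
        void onUpdate(String update);
    }

    private final Vector<Listener> listeners = new Vector<Listener>();

    void add(Listener listener) { listeners.add(listener); }

    void remove(Listener listener) { listeners.remove(listener); }

    int size() { return listeners.size(); }

    void dispatch(String update) {
        List<Listener> snapshot;
        // Hold the vector lock only long enough to copy the current contents.
        synchronized (listeners) {
            snapshot = new ArrayList<Listener>(listeners);
        }
        // Callbacks run without the vector lock, so a listener may
        // add or remove subscriptions while it is being notified.
        for (Listener listener : snapshot) {
            listener.onUpdate(update);
        }
    }
}
```

The trade-off is that a listener removed after the snapshot was taken can still receive one more update, so this only fits callers that tolerate a late notification.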

Unfortunately, the simple synchronized-block fix leads to another problem. I need to introduce a new usage scenario for AsyncTransport so I can demonstrate the consequences of that fix.

Synchronous Requesting Model

Let’s say some messages in my program are not suited to asynchronous calls. They are just simple request/reply calls. I will create an API that acts as an adapter to make simple synchronous calls over the asynchronous API.

public class SyncTransport {

    private final AsyncTransport transport;

    public SyncTransport(AsyncTransport transport) {
        this.transport = transport;
    }

    public Message request(Message request, long timeout) throws InterruptedException {
        SyncRequestor syncAdapter = new SyncRequestor(transport);
        return syncAdapter.makeRequest(request, timeout);
    }
}

class SyncRequestor implements AsyncMsgListener {

    private final AsyncTransport transport;
    private volatile Message msg;

    public SyncRequestor(AsyncTransport transport) {
        this.transport = transport;
    }

    public void onMsg(Message msg) {
        this.msg = msg;

        synchronized (this) {
            notify();
        }
    }

    public Message makeRequest(Message request, long timeout) throws InterruptedException {
        Subscription sub = transport.subScribe(request, this);
        synchronized (this) {
            wait(timeout);
            transport.unsubScribe( sub );
        }

        return this.msg;
    }
}

The request() method delegates to the SyncRequestor class, which subscribes for data and waits for the first update. The execution flow blocks here. Since this is a synchronous call, SyncRequestor knows that there will be only one update from the server: the synchronous reply. Once that first update arrives, the requestor stores it and calls notify() to tell the waiting thread that the reply is ready. The requesting thread (the thread that called request()) then unsubscribes and returns the stored reply message.

At this point, I have introduced both the synchronous and the asynchronous requesting models in my example program. Let’s look at the code of MockUpdateMsgGenerator. The class can generate updates for both requesting models. The String data in a Message identifies the requesting model of the message.

public class MockUpdateMsgGenerator{
    public static final String SYNC_MSG_PREFIX = "SYNC";
    public static final String ASYNC_MSG_PREFIX = "ASYNC";

    private final List activeSubId = new LinkedList();
    private final List activeSyncSubId = new LinkedList();

    public String genIDforMessage(Message msg){
        String prefix =  msg.getData().startsWith(SYNC_MSG_PREFIX)? SYNC_MSG_PREFIX : ASYNC_MSG_PREFIX;
        return prefix  + ":" + randomInt();
    }

    public synchronized void addActiveID(String id){
        if(id.startsWith( SYNC_MSG_PREFIX ) ){
            activeSyncSubId.add(id);

        }else{
            activeSubId.add(id);
        }

        notify();
    }

    public synchronized void removeActiveID(String id){
        activeSyncSubId.remove(id);
        activeSubId.remove(id);
    }

    public synchronized Update genMockUpdateFromSubIDList() throws InterruptedException{
        while( activeSubId.size() == 0 && activeSyncSubId.size() == 0){
            wait();
        }

        String id;
        if( activeSyncSubId.size() != 0 ){
            //Synchronous requests should get a response quickly, so send the
            //update for this requesting model first.
            id =  (String)activeSyncSubId.remove(0);

        }else{
            //Randomly generates update for subscriber.
            int randomIndex =  randomInt() % activeSubId.size();
            id = (String)activeSubId.get(randomIndex);
        }

        return new Update(id , new Message(id + " : DummyData" ) );
    }

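    public int randomInt(){
        //nextInt(bound) is never negative, unlike Math.abs(nextInt()),
        //which returns a negative value for Integer.MIN_VALUE.
        return new Random().nextInt(Integer.MAX_VALUE);
    }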
}

Now we are ready to run our new example.

public class SyncTester {
    public static void main(String[] args) throws InterruptedException {
        AsyncTransport transport = new AsyncTransport();
        transport.connect("localhost", "9999");

        final SyncTransport syncTransport = new SyncTransport(transport);

        ExecutorService exec = Executors.newFixedThreadPool(20);
        for (int i = 0; i < 200; i++) {
            exec.submit(new Callable<Void>() {

                public Void call() throws Exception {
                    Message resp = syncTransport.request(new Message("SYNC msg"), 1000);
                    if(resp != null){
                        System.out.println(resp.getData());
                    }else{
                        System.out.println("Timeout");
                    }

                    return null;
                }
            });
        }

        exec.shutdown();
        exec.awaitTermination(1, TimeUnit.DAYS);

        transport.stop();
        System.out.println("Done");
    }
}

Wait Leak

When I call request() with a timeout of 1000 milliseconds, I get all the reply messages and the program terminates properly. This seems to indicate that our MockUpdateMsgGenerator is working quite efficiently. But a strange thing happens when I specify 0 as the timeout to wait forever until the reply arrives. The program prints some reply messages and then freezes, and it doesn’t look like it will proceed anytime soon. I need to know what exactly is going on in the program. Which parts of the program are being executed? Which threads are blocked, and what are they blocked on? The jstack command is the tool I need.

Please note that my real production code runs on Linux. I am just simulating the problem with the example program on my Windows laptop.

C:\Documents and Settings\ThinkPad>jps
3600 Jps
688 SyncTester
2668

C:\Documents and Settings\ThinkPad>jstack 688
2010-05-03 15:49:15
Full thread dump Java HotSpot(TM) Client VM (11.3-b02 mixed mode, sharing):

"pool-1-thread-5" prio=6 tid=0x02b04c00 nid=0xa48 in Object.wait() [0x02fbf000..0x02fbfb14]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x229f0020> (a deadlock.transport.sync.SyncRequestor)
at deadlock.transport.sync.SyncRequestor.makeRequest(SyncTransport.java:53)
- locked <0x229f0020> (a deadlock.transport.sync.SyncRequestor)

at deadlock.transport.sync.SyncTransport.request(SyncTransport.java:22)
at deadlock.Main2$1.call(Main2.java:23)
at deadlock.Main2$1.call(Main2.java:20)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:619)

"pool-1-thread-4" prio=6 tid=0x02b03800 nid=0xeec in Object.wait() [0x02f6f000..0x02f6fc14]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x229e2d70> (a deadlock.transport.sync.SyncRequestor)
at deadlock.transport.sync.SyncRequestor.makeRequest(SyncTransport.java:53)
- locked <0x229e2d70> (a deadlock.transport.sync.SyncRequestor)

at deadlock.transport.sync.SyncTransport.request(SyncTransport.java:22)
at deadlock.Main2$1.call(Main2.java:23)
at deadlock.Main2$1.call(Main2.java:20)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:619)

Actually, all 5 threads in our thread pool are blocked, but I show just 2 of them here since all threads are blocked on the same execution path. We can see in the stack trace that thread “pool-1-thread-4” is waiting on an instance of SyncRequestor. The program is frozen because all worker threads are blocked waiting to be notified that their reply messages are ready. The question is why the threads are still waiting. It’s unlikely that some reply messages did not make it to the onMsg() callback method, since the same test with a 1000-millisecond timeout works properly.

Actually, the notifications were fired, but the requesting threads didn’t catch them in time. There are two threads working concurrently: the update dispatching thread and the requesting thread. It is possible that, once the requesting thread finishes executing transport.subScribe(), the subscription is processed so quickly that the update dispatching thread sets the reply message on the SyncRequestor and calls notify() before the requesting thread gets a chance to call wait().

This problem is called a wait leak (also commonly known as a missed signal or lost notification): the notification is made before the target audience starts waiting for it. The code snippet below is our program with a slight modification to show that some notifications have already been fired before the requesting thread calls wait().

public Message makeRequest(Message request, long timeout) throws InterruptedException {
        Subscription sub = transport.subScribe(request, this);

        synchronized (this) {
            if(msg != null){
                System.out.println("Reply already arrived");
            }

            wait(timeout);
            transport.unsubScribe(sub);
        }

        return this.msg;
    }

You may try running the program with the modification above to see that if “Reply already arrived” is printed 3 times, then jstack reports 3 threads still waiting for notification.
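
The underlying rule, that notify() is not remembered when nobody is waiting yet, can also be shown in isolation. The sketch below calls notify() first and wait(timeout) afterwards on the same monitor; the class name LostNotificationDemo is an assumption made for the example. Apart from a rare spurious wakeup, the wait should run until its timeout because the earlier notification is gone.

```java
// Hypothetical single-threaded illustration of a notification that arrives too early.
final class LostNotificationDemo {

    // Returns how long wait() blocked, in milliseconds.
    static long waitAfterNotify(long timeoutMillis) throws InterruptedException {
        Object monitor = new Object();

        synchronized (monitor) {
            // Nobody is waiting on the monitor yet, so this call has no effect.
            monitor.notify();
        }

        long start = System.nanoTime();
        synchronized (monitor) {
            // The earlier notify() is not queued; this wait ends only on timeout
            // (or on a spurious wakeup, which the JVM is allowed to produce).
            monitor.wait(timeoutMillis);
        }
        return (System.nanoTime() - start) / 1000000L;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("waited for about " + waitAfterNotify(200) + " ms");
    }
}
```

With a timeout of 0 instead of 200, the same sequence would block indefinitely, which matches the frozen threads in the jstack output.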

The wait-leak problem is one of the reasons it’s a best practice to call wait() inside a condition-checking loop. Another reason is the spurious wakeup: a thread can return from wait() without having been notified. Below is the SyncRequestor that checks whether the reply has already arrived.

public Message makeRequest(Message request, long timeout) throws InterruptedException {
        Subscription sub = transport.subScribe(request, this);

        synchronized (this) {
            if(timeout > 0){
                waitWithTimeOut(timeout);
            }else{
                waitForever();
            }

            transport.unsubScribe(sub);
        }

        return this.msg;
    }

    private void waitWithTimeOut(long timeout) throws InterruptedException {
        long timeoutLimit = System.currentTimeMillis() + timeout;
        while (msg == null) {
            long remainingTimeout = timeoutLimit - System.currentTimeMillis();
            if (remainingTimeout > 0) {
                wait(remainingTimeout);
            } else {
                //timeout
                break;
            }
        }
    }

    private void waitForever() throws InterruptedException{
        while (msg == null) {
            wait();
        }
    }
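
As a hedged alternative to the hand-written wait loop, java.util.concurrent.CountDownLatch keeps its state, so a countDown() that happens before await() is not lost. The sketch below is not the code used in this post; the class name LatchReply and its String payload are assumptions that stand in for SyncRequestor and Message.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical reply holder built on a latch instead of wait()/notify().
final class LatchReply {
    private final CountDownLatch arrived = new CountDownLatch(1);
    private volatile String reply;

    // Called by the dispatching thread; plays the role of onMsg().
    void onReply(String data) {
        reply = data;
        arrived.countDown();
    }

    // Called by the requesting thread; a timeout of 0 or less waits forever.
    // Returns null if the timeout elapsed before a reply arrived.
    String await(long timeoutMillis) throws InterruptedException {
        if (timeoutMillis > 0) {
            arrived.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } else {
            arrived.await();
        }
        return reply;
    }
}
```

With this shape the requesting thread holds no monitor of its own while it waits or after it wakes up, and the latch handles both the early-reply case and spurious wakeups internally.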

Deadlock

Let’s get back to our original issue: the consequence of our check-then-act fix. Now that we have fixed both the check-then-act and the wait-leak problems, should SyncTransport run without any problem?

Well, with a small thread pool size like 10 and a realistic timeout like 1 or 2 seconds, you may run it day and night without any problem. Concurrency problems can be very hard to detect because the problematic code can run just fine for years if the conditions are right. Then, just when our system comes under unusual circumstances, the problems show up at the worst possible time.

I’ve found that, on my laptop, the program freezes if I run it with a thread pool size of 50 and a 500-millisecond timeout. To find out what went wrong, jstack is our best friend again. I don’t need to spend much time figuring out what happened: jstack shows the problem at the bottom of its report.

Found one Java-level deadlock:
=============================
"pool-1-thread-50":
waiting to lock monitor 0x02b1a404 (object 0x22a80330, a java.util.Vector),
which is held by "AsyncTransport Dispatching Thread"
"AsyncTransport Dispatching Thread":
waiting to lock monitor 0x02a83954 (object 0x22a81670, a deadlock.transport.sync.SyncRequestor),
which is held by "pool-1-thread-17"
"pool-1-thread-17":
waiting to lock monitor 0x02b1a404 (object 0x22a80330, a java.util.Vector),
which is held by "AsyncTransport Dispatching Thread"

Java stack information for the threads listed above:
===================================================
"pool-1-thread-50":
at java.util.Vector.add(Vector.java:727)
- waiting to lock <0x22a80330> (a java.util.Vector)
at deadlock.transport.AsyncTransport.subScribe(AsyncTransport.java:22)
at deadlock.transport.sync.SyncRequestor.makeRequest(SyncTransport.java:44)
at deadlock.transport.sync.SyncTransport.request(SyncTransport.java:22)
at deadlock.Main2$1.call(Main2.java:23)
at deadlock.Main2$1.call(Main2.java:20)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:619)
"AsyncTransport Dispatching Thread":
at deadlock.transport.sync.SyncRequestor.onMsg(SyncTransport.java:39)
- waiting to lock <0x22a81670> (a deadlock.transport.sync.SyncRequestor)
at deadlock.transport.Subscription.notifyIfSubscribeFor(Subscription.java:23)
at deadlock.transport.AsyncTransport$DispatchingThread.run(AsyncTransport.java:67)
- locked <0x22a80330> (a java.util.Vector)
"pool-1-thread-17":
at java.util.Vector.removeElement(Vector.java:593)
- waiting to lock <0x22a80330> (a java.util.Vector)
at java.util.Vector.remove(Vector.java:745)
at deadlock.transport.AsyncTransport.unsubScribe(AsyncTransport.java:40)
at deadlock.transport.sync.SyncRequestor.makeRequest(SyncTransport.java:58)
- locked <0x22a81670> (a deadlock.transport.sync.SyncRequestor)
at deadlock.transport.sync.SyncTransport.request(SyncTransport.java:22)
at deadlock.Main2$1.call(Main2.java:23)
at deadlock.Main2$1.call(Main2.java:20)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:619)

Found 1 deadlock.

If you are good at reading stack traces, the report is straightforward. The execution order that causes the deadlock is shown below.

  1. The requesting thread pool-1-thread-17 calls SyncRequestor.makeRequest() and grabs the lock of a SyncRequestor instance to wait for the reply notification.

    at deadlock.transport.sync.SyncRequestor.makeRequest(SyncTransport.java:58)
    - locked <0x22a81670> (a deadlock.transport.sync.SyncRequestor)

            synchronized (this) {
                if(timeout > 0){
                    waitWithTimeOut(timeout);
                }else{
                    waitForever();
                }
    
                transport.unsubScribe(sub);
            }
    
  2. The timeout is too short. The requesting thread wakes up inside the synchronized block, so it holds the lock of the SyncRequestor instance again, and proceeds to unsubscribe.
    at deadlock.transport.AsyncTransport.unsubScribe(AsyncTransport.java:40)

  3. The unsubscribing process involves removing the subscription from the subscription vector. Jstack shows that the requesting thread is blocked waiting to grab the lock of the vector.

    at java.util.Vector.removeElement(Vector.java:593)
    - waiting to lock <0x22a80330> (a java.util.Vector)
    at java.util.Vector.remove(Vector.java:745)

    It turns out that another thread has already grabbed the lock of the vector. The requesting thread has to wait for that thread to release the lock before it can proceed.

  4. At this moment, the “AsyncTransport Dispatching Thread”, which runs concurrently with the requesting thread, is executing the update dispatching loop. The dispatching thread grabs the lock of the subscription vector to iterate over all subscriptions.


    at deadlock.transport.AsyncTransport$DispatchingThread.run(AsyncTransport.java:67)
    - locked <0x22a80330> (a java.util.Vector)

          synchronized(subscription){
                for(int i=0; i<subscription.size(); i++){
                     ( (Subscription)subscription.get(i) ).notifyIfSubscribeFor( update );
                }
           }
    

    It’s the update dispatching thread that holds the lock of the subscription vector, which is why the requesting thread cannot proceed.

  5. By a very bad coincidence, the update dispatching thread is calling the onMsg() method on the to-be-unsubscribed subscription. The thread sets the reply message, then tries to grab the lock of the target SyncRequestor instance so that it can call notify().


    at deadlock.transport.sync.SyncRequestor.onMsg(SyncTransport.java:39)
    - waiting to lock <0x22a81670> (a deadlock.transport.sync.SyncRequestor)
    at deadlock.transport.Subscription.notifyIfSubscribeFor(Subscription.java:23)

           public void onMsg(Message msg) {
               this.msg = msg;
    
               synchronized (this) {
                   notify();
                }
           }
           
  6. The update dispatching thread can’t grab the lock of the SyncRequestor instance because pool-1-thread-17, which took it in step 1, hasn’t released it yet. Now you can see that this is a deadlock: the requesting thread is holding the lock of the SyncRequestor instance and waiting for the lock of the subscription vector, while the update dispatching thread is holding the lock of the vector and waiting for the lock of the SyncRequestor instance.

    Jstack has summed it up quite nicely.

    "AsyncTransport Dispatching Thread":
    waiting to lock monitor 0x02a83954 (object 0x22a81670, a deadlock.transport.sync.SyncRequestor),
    which is held by "pool-1-thread-17"

    "pool-1-thread-17":
    waiting to lock monitor 0x02b1a404 (object 0x22a80330, a java.util.Vector),
    which is held by "AsyncTransport Dispatching Thread"
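The cycle described in the steps above can be reproduced in miniature. The following is only a sketch, not the code of the program discussed here: the class name DeadlockDemo and the two lock objects are made up for illustration, with requestorLock standing in for the SyncRequestor instance and subscriptionsLock for the subscription vector. Two threads take the locks in opposite order, and the standard ThreadMXBean from java.lang.management reports the same kind of cycle that jstack prints.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

class DeadlockDemo {
    // Hypothetical stand-ins for the two monitors named in the jstack report.
    static final Object requestorLock = new Object();
    static final Object subscriptionsLock = new Object();

    /** Starts two threads that lock in opposite order and returns the ids of the deadlocked threads. */
    static long[] provokeAndDetect() throws InterruptedException {
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);

        startDaemon("requester", () -> {
            synchronized (requestorLock) {            // like makeRequest()
                meet(bothHoldFirstLock);
                synchronized (subscriptionsLock) { }  // like unsubScribe()
            }
        });
        startDaemon("dispatcher", () -> {
            synchronized (subscriptionsLock) {        // like the dispatching loop
                meet(bothHoldFirstLock);
                synchronized (requestorLock) { }      // like onMsg()
            }
        });

        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        // Poll for a while: the threads need a moment to actually block on each other.
        for (int attempt = 0; attempt < 100; attempt++) {
            long[] ids = threads.findMonitorDeadlockedThreads(); // null when no deadlock is found
            if (ids != null) {
                return ids;
            }
            Thread.sleep(50);
        }
        return new long[0];
    }

    static void startDaemon(String name, Runnable body) {
        Thread t = new Thread(body, name);
        t.setDaemon(true); // lets the JVM exit even though the threads stay stuck
        t.start();
    }

    /** Makes sure each thread already holds its first lock before asking for the second. */
    static void meet(CountDownLatch latch) {
        latch.countDown();
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        for (ThreadInfo info : threads.getThreadInfo(provokeAndDetect())) {
            System.out.println(info.getThreadName() + " waits for " + info.getLockName()
                    + " held by " + info.getLockOwnerName());
        }
    }
}
```

The threads are daemon threads only so that the demo can exit. In a real server the stuck threads stay around, which is why the program appears to freeze.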

Again, there are many solutions to this problem. I chose to fix it by simply moving the unsubscribing call out of the synchronized block. The call doesn’t need to be executed in the synchronized block at all. It’s always a good idea to check whether your synchronized block is too big and to move out statements that don’t really need to be in the critical section.

public Message makeRequest(Message request, long timeout) throws InterruptedException {
        Subscription sub = transport.subScribe(request, this);

        synchronized (this) {
            if(timeout > 0){
                waitWithTimeOut(timeout);
            }else{
                waitForever();
            }
        }

        transport.unsubScribe(sub);
        return this.msg;
}

Now, the requesting thread releases the lock of the SyncRequestor instance before it unsubscribes. This breaks the circular lock acquisition and prevents the deadlock in the program.
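To see the fix in isolation, here is a small self-contained sketch of the same locking pattern. It is not the real transport code: Transport, Requestor and runDemo are hypothetical stand-ins, messages are plain strings, and the dispatcher just sends the same update in a loop. The point is only the lock order: the requesting thread never holds its own lock while it touches the subscription vector.

```java
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class UnsubscribeOutsideLock {

    /** Hypothetical stand-in for AsyncTransport: notifies while holding the vector lock. */
    static class Transport {
        private final Vector<Requestor> subscriptions = new Vector<>();

        void subscribe(Requestor r)   { subscriptions.add(r); }
        void unsubscribe(Requestor r) { subscriptions.remove(r); }

        void dispatch(String update) {
            synchronized (subscriptions) {               // lock 1: the vector
                for (int i = 0; i < subscriptions.size(); i++) {
                    subscriptions.get(i).onMsg(update);  // lock 2: the requestor
                }
            }
        }
    }

    /** Hypothetical stand-in for SyncRequestor. */
    static class Requestor {
        private final Transport transport;
        private String reply;                            // guarded by "this"

        Requestor(Transport transport) { this.transport = transport; }

        synchronized void onMsg(String msg) {
            reply = msg;
            notifyAll();
        }

        /** Returns the reply, or null if none arrived before the wait ended. */
        String makeRequest(long timeoutMillis) throws InterruptedException {
            transport.subscribe(this);
            String result;
            synchronized (this) {
                if (reply == null) {
                    // One timed wait, as in the original; waking early is treated like a timeout.
                    wait(timeoutMillis);
                }
                result = reply;
            }
            // The requestor lock is already released when the vector lock is needed.
            transport.unsubscribe(this);
            return result;
        }
    }

    /** Runs short-timeout requests against a busy dispatcher; true if all of them finish. */
    static boolean runDemo(int threads, int requestsPerThread) throws InterruptedException {
        Transport transport = new Transport();
        Thread dispatcher = new Thread(() -> {
            try {
                while (true) {
                    transport.dispatch("update");
                    Thread.sleep(1);
                }
            } catch (InterruptedException stop) {
                // demo is over
            }
        }, "dispatcher");
        dispatcher.setDaemon(true);
        dispatcher.start();

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < requestsPerThread; i++) {
                    new Requestor(transport).makeRequest(1);
                }
                return null;                             // makes this lambda a Callable
            });
        }
        pool.shutdown();
        boolean finished = pool.awaitTermination(30, TimeUnit.SECONDS);
        dispatcher.interrupt();
        return finished;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("all requests finished: " + runDemo(8, 200));
    }
}
```

If transport.unsubscribe(this) were moved back inside the synchronized block, the requester would hold lock 2 while asking for lock 1, which is exactly the inverse of the dispatcher’s order.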


Spurious Wakeup

Sep 14, 2008 by

I finished reading Effective Java a long time ago. It is such a great book. The more pages I went through, the more I realized how little I knew about Java programming. The distance between “coder” and “developer” is really far.

I read most of the items listed in the book. One of the items I skipped was Item 50: Never invoke wait outside a loop. The code below shows the concept of this item.

synchronized (obj) {
    while (<condition does not hold>)
        obj.wait();

    ... // Perform action appropriate to condition
}

Looking at the name of the practice, I thought I knew all the reasons behind it, so I just skipped it. Today I found an interesting post asking “What is spurious wakeup?”. I read it and found that “threads can wake up on wait() for no reason at all”! There are quite a few good references for this fact and one of them is, guess what, Item 50 of Effective Java. I would have known it a long time ago if I had just read it. One of the reasons stated in the item is:

The waiting thread could wake up in the absence of a notify. This is known as a spurious wakeup. Although The Java Language Specification [JLS] does not mention this possibility, many JVM implementations use threading facilities in which spurious wakeups are known to occur, albeit rarely [Posix, 11.4.3.6.1]

I used to think it was sometimes OK to call wait() without a condition-checking loop if the object to wait on represented just one condition, the object was shared only between the waiting and notifying threads, and the code’s execution order guaranteed that wait leaks would not occur. Now, given that a JVM implementation can produce spurious wakeups, the condition-checking loop is A MUST.
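Here is a minimal sketch of that “one condition, one waiter, one notifier” case with the loop in place. ReplySlot and its method names are made up for illustration. The condition is kept in a field guarded by the same lock, and the waiter re-checks it after every wakeup, so a wakeup without a notify simply sends it back to wait().

```java
class ReplySlot {
    private String reply; // the condition "a reply has arrived", guarded by "this"

    /** Called by the notifying thread. */
    synchronized void deliver(String value) {
        reply = value;
        notifyAll();
    }

    /** Called by the waiting thread; returns only once a reply is really there. */
    synchronized String await() throws InterruptedException {
        while (reply == null) { // re-checked after every wakeup, spurious or not
            wait();
        }
        return reply;
    }

    public static void main(String[] args) throws InterruptedException {
        ReplySlot slot = new ReplySlot();
        new Thread(() -> slot.deliver("pong")).start();
        System.out.println(slot.await());
    }
}
```

The loop also covers the case where deliver() runs before await(): the condition is already true, so the waiter never calls wait() and cannot miss the notification.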

Apparently, spurious wakeup is an issue (I doubt that it is a well-known one) that intermediate to expert developers know can happen, but it was only clarified in the third edition of the JLS, which was revised as part of JDK 5 development. The javadoc of the wait method in JDK 5 was also updated:

A thread can also wake up without being notified, interrupted, or timing out, a so-called spurious wakeup. While this will rarely occur in practice, applications must guard against it by testing for the condition that should have caused the thread to be awakened, and continuing to wait if the condition is not satisfied. In other words, waits should always occur in loops
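When the wait has a timeout, “continuing to wait” needs one extra step: the remaining time has to be recomputed after each wakeup, otherwise every spurious wakeup would restart the full timeout. The sketch below shows one way to do that with a deadline. TimedFlag and its method names are hypothetical, chosen only for this example.

```java
import java.util.concurrent.TimeUnit;

class TimedFlag {
    private boolean raised; // the condition, guarded by "this"

    synchronized void raise() {
        raised = true;
        notifyAll();
    }

    /** Waits until the flag is raised or the timeout elapses; returns whether it was raised. */
    synchronized boolean await(long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!raised) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false; // the deadline really has passed
            }
            // wait(0) means "wait forever", so always ask for at least one millisecond.
            wait(Math.max(1, TimeUnit.NANOSECONDS.toMillis(remainingNanos)));
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        TimedFlag flag = new TimedFlag();
        System.out.println("raised in time: " + flag.await(100)); // nobody raises it
        flag.raise();
        System.out.println("raised in time: " + flag.await(100));
    }
}
```

Returning a boolean lets the caller tell a timeout apart from a real notification, which a bare wait(timeout) cannot do.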

You may wonder (like me) why the JLS designers decided to allow this kind of thing to happen. It’s not that I don’t want to use a condition-checking loop. The check is a best practice that developers should always follow, whether or not they know anything about spurious wakeups. But I just didn’t see the benefit of allowing a wakeup for no reason. It turns out that it is about performance, as stated in Multithread Programming with Java:

Due to some arcania in the hardware design of modern SMP machines, it proves to be highly convenient to define them like this. The hardware runs a little faster, and the programmer needs to reevaluate the condition anyway
