MultithreadedTC

William Pugh, Nat Ayewah

Table of Contents

  1. What is MultithreadedTC?
  2. How does it work?
  3. How do I use it?
  4. Examples

MultithreadedTC Javadoc Documentation

  1. class TestFramework
  2. class MultithreadedTestCase
  3. class MultithreadedTest

 

1. What is MultithreadedTC?

MultithreadedTC is a framework for testing concurrent applications. It features a metronome that is used to provide fine control over the sequence of activities in multiple threads.

Many failures in concurrent applications are not deterministic. They do not occur every time. Different interleavings of application threads yield different behaviors.

Concurrent application designers often want to run many (unrelated or loosely related) threads of activity to maximize throughput. Sometimes it is useful for test designers to mimic this style and run multiple threads, generating as many interleavings as possible. Many test frameworks support this paradigm (e.g. IBM's ConTest, GroboUtils MultiThreadedTestRunner, and JUnit's ActiveTestSuite).

MultithreadedTC is different. It supports test cases that exercise a specific interleaving of threads. This is motivated by the principle that concurrent applications should be built using small concurrent abstractions such as bounded buffers, semaphores and latches. Separating the concurrency logic from the rest of the application logic in this way makes it easier to understand and test concurrent applications. Since these abstractions are small, it should be possible to deterministically test every (significant) interleaving in separate tests.

But how can one guarantee a specific interleaving of different threads in the presence of blocking and timing issues? Consider the following example:

		[Initialize]

		ArrayBlockingQueue buf = new ArrayBlockingQueue(1);

		[Thread 1]		[Thread 2]

		buf.put(42);
		buf.put(17);
					assertEquals(42, buf.take());
					assertEquals(17, buf.take());

In this test, the second put() should cause Thread 1 to block. It should remain blocked until the first take() occurs in Thread 2. In other words, the test must guarantee that the first take() statement does not occur until after the second put() statement. How could a designer guarantee this interleaving of the two threads?

One approach is to use Thread.sleep() in Thread 2 to delay its first statement long enough to "guarantee" that Thread 1 has blocked. But this approach makes the test timing-dependent -- timing can be thrown off by, say, an ill-timed garbage collector. This also does not work well when stepping through the code in a debugger.
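For comparison, the sleep-based approach might look like the following sketch in plain Java, without any test framework. The class name SleepBasedOrdering, the run(...) helper and the delay value are illustrative assumptions, not part of MultithreadedTC.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical illustration of the Thread.sleep() approach; not part of MultithreadedTC.
class SleepBasedOrdering {

    // Starts a producer, sleeps, then takes twice. Returns the values taken, in order.
    static int[] run(long delayMillis) {
        final ArrayBlockingQueue<Integer> buf = new ArrayBlockingQueue<Integer>(1);

        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    buf.put(42);
                    buf.put(17); // expected to block until the consumer takes
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();

        try {
            // Hope that the producer has blocked by now. Nothing enforces this.
            Thread.sleep(delayMillis);
            int first = buf.take();
            int second = buf.take();
            producer.join();
            return new int[] { first, second };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        int[] taken = run(200);
        System.out.println(taken[0] + ", " + taken[1]);
    }
}
```

The values come out as 42 and 17 whether or not the producer had actually blocked before the first take(), so the sleep gives no evidence that the blocking behavior was exercised.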

Another common approach for coordinating activities in two threads is to use a CountDownLatch. A CountDownLatch will not work in this example as illustrated by the following diagram:

		[Initialize]

		ArrayBlockingQueue buf = new ArrayBlockingQueue(1);
		CountDownLatch c = new CountDownLatch(1);

		[Thread 1]		[Thread 2]

		buf.put(42);		c.await();
		buf.put(17);
		c.countDown();
					assertEquals(42, buf.take());
					assertEquals(17, buf.take());

Of course the problem is that the statement c.countDown() cannot be executed until after Thread 1 unblocks... which will not occur until Thread 2 take()s. In other words, this test is deadlocked!
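The deadlock can be observed safely with a small plain-Java sketch. It assumes a timed await(...) in place of c.await() so that the demonstration terminates, and it interrupts Thread 1 afterwards to clean up. The class and method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demonstration of why the latch-based ordering cannot work here.
class LatchDeadlockDemo {

    // Returns true if the latch was released within the timeout.
    static boolean latchReleased(long timeoutMillis) {
        final ArrayBlockingQueue<Integer> buf = new ArrayBlockingQueue<Integer>(1);
        final CountDownLatch c = new CountDownLatch(1);

        Thread t1 = new Thread(new Runnable() {
            public void run() {
                try {
                    buf.put(42);
                    buf.put(17);   // blocks: the queue is full and nobody takes
                    c.countDown(); // not reached while the put is blocked
                } catch (InterruptedException e) {
                    // interrupted by the cleanup below; exit quietly
                }
            }
        });
        t1.start();

        try {
            // Thread 2's role. The timed await stands in for c.await().
            boolean released = c.await(timeoutMillis, TimeUnit.MILLISECONDS);
            t1.interrupt(); // clean up the blocked producer
            t1.join();
            return released;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("latch released: " + latchReleased(500));
    }
}
```

With an untimed c.await(), Thread 2 would wait forever at the point where this sketch gives up and reports that the latch was not released.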

MultithreadedTC provides an elegant solution to this problem, illustrated in the following example:

class MTCBoundedBufferTest extends MultithreadedTestCase {
    ArrayBlockingQueue<Integer> buf;

    @Override public void initialize() {
        buf = new ArrayBlockingQueue<Integer>(1);
    }

    public void thread1() throws InterruptedException {
        buf.put(42);
        buf.put(17);
        assertTick(1);
    }

    public void thread2() throws InterruptedException {
        waitForTick(1);
        assertEquals(Integer.valueOf(42), buf.take());
        assertEquals(Integer.valueOf(17), buf.take());
    }

    @Override public void finish() {
        assertTrue(buf.isEmpty());
    }
}

MultithreadedTC has an internal metronome (or clock). But don't try to use it to set the tempo for your jazz band. The clock only advances to the next tick when all threads are blocked.

The clock starts at tick 0. In this example, the first statement in Thread 2, waitForTick(1), makes it block until the clock reaches tick 1 before resuming. Thread 1 is allowed to run freely in tick 0, until it blocks on the second put. At this point, all threads are blocked, and the clock can advance to the next tick.

In tick 1, the first take() in Thread 2 is executed, and this frees up Thread 1. The final statement in Thread 1 asserts that the clock is in tick 1, in effect asserting that the thread blocked on the second put().

This approach does not deadlock like the CountDownLatch, and is more reliable than Thread.sleep(). Some other high level observations are:

2. So how does this work?

The class TestFramework provides most of the scaffolding required to run MultithreadedTC tests. It uses reflection to identify all relevant methods in the test class and invokes them simultaneously in different threads. It regulates these threads using a separate clock thread.
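The reflection step can be pictured with the sketch below. It is not TestFramework's code. The selection rule (public, no arguments, void return, name starting with "thread") is an assumption based on the thread1(), thread2() naming used in the examples, and SampleTest is a hypothetical class.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of discovering "thread" methods by reflection.
class ThreadMethodFinder {

    // Stand-in test class used only for this illustration.
    static class SampleTest {
        public void initialize() {}
        public void thread1() {}
        public void thread2() {}
        public void finish() {}
        public int threadCount() { return 2; } // not void, so it is skipped
    }

    // Returns the sorted names of public, no-argument, void methods named "thread...".
    static List<String> findThreadMethods(Class<?> testClass) {
        List<String> names = new ArrayList<String>();
        for (Method m : testClass.getMethods()) {
            if (m.getName().startsWith("thread")
                    && m.getParameterTypes().length == 0
                    && m.getReturnType() == void.class) {
                names.add(m.getName());
            }
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) {
        System.out.println(findThreadMethods(SampleTest.class)); // prints [thread1, thread2]
    }
}
```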

The clock thread checks periodically to see if all threads are blocked. If all threads are blocked and at least one is waiting for a tick, the clock thread advances the clock to the next desired tick. The clock thread also detects deadlock (when all threads are blocked, none are waiting for a tick, and none are in state TIMED_WAITING), and can stop a test that is going on too long (when a thread is in state RUNNABLE for too long).
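The polling idea can be sketched in plain Java using Thread.getState(). This is a simplified illustration under stated assumptions, not the framework's implementation: the names ClockSketch, isBlocked and allBlocked are hypothetical, the calling thread plays the role of the clock thread, and the sketch does not track which threads are waiting for a tick or detect deadlock.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical sketch of the clock thread's polling idea; not TestFramework's code.
class ClockSketch {

    // A thread counts as blocked here if it is waiting, parked, or contending for a lock.
    static boolean isBlocked(Thread t) {
        Thread.State s = t.getState();
        return s == Thread.State.BLOCKED
                || s == Thread.State.WAITING
                || s == Thread.State.TIMED_WAITING;
    }

    static boolean allBlocked(Thread[] threads) {
        for (Thread t : threads) {
            if (!isBlocked(t)) {
                return false;
            }
        }
        return true;
    }

    // Runs one worker that blocks on its second put and returns the tick reached.
    static int runDemo() {
        final ArrayBlockingQueue<Integer> buf = new ArrayBlockingQueue<Integer>(1);
        Thread worker = new Thread(new Runnable() {
            public void run() {
                try {
                    buf.put(42);
                    buf.put(17); // blocks until the take() below
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        Thread[] testThreads = { worker };
        worker.start();

        int tick = 0;
        try {
            // Poll periodically, as the clock thread is described to do.
            while (!allBlocked(testThreads)) {
                Thread.sleep(10);
            }
            tick++;      // every test thread is blocked: advance the clock
            buf.take();  // stands in for the work released by the new tick
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling", e);
        }
        return tick;
    }

    public static void main(String[] args) {
        System.out.println("tick = " + runDemo());
    }
}
```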

The test threads are placed into a new thread group, and any threads created by these test cases will be placed in this thread group by default. All threads in the thread group will be considered by the clock thread when deciding whether to advance the clock, declare a deadlock, or stop a long-running test.
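The default group membership relies on standard Java behavior: a thread created without an explicit ThreadGroup joins the group of the thread that creates it (when no security manager overrides this). A minimal sketch, with hypothetical class and group names:

```java
// Hypothetical illustration of thread group inheritance; not part of MultithreadedTC.
class ThreadGroupDemo {

    // Returns true if a helper thread created inside the group lands in that group.
    static boolean helperInheritsGroup() {
        final ThreadGroup group = new ThreadGroup("mtc-demo");
        final ThreadGroup[] helperGroup = new ThreadGroup[1];

        Thread testThread = new Thread(group, new Runnable() {
            public void run() {
                // No group is given, so the helper uses the creating thread's group.
                Thread helper = new Thread(new Runnable() {
                    public void run() {}
                });
                helperGroup[0] = helper.getThreadGroup();
            }
        }, "thread1");

        testThread.start();
        try {
            testThread.join(); // also makes the write to helperGroup visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        }
        return helperGroup[0] == group;
    }

    public static void main(String[] args) {
        System.out.println("helper in test group: " + helperInheritsGroup());
    }
}
```

This is why helper threads started by a test method are visible to the clock thread without any registration step.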

3. Cool! How do I use this?

MultithreadedTC tests are created by extending one of two classes:

  1. class MultithreadedTestCase extends junit.framework.Assert and provides the base functionality required by all tests. (NOTE: MultithreadedTestCase does NOT extend junit.framework.TestCase). A test using this class consists of:
    • an optional initialize() method,
    • one or more "thread" methods which are invoked in different threads,
    • an optional finish() method that is run after all threads have completed.
    In addition to the MultithreadedTestCase subclass, an additional JUnit test method is used to call one of the "run" methods in TestFramework. The run methods receive an instance of a MultithreadedTestCase and test it in different ways. The primary run methods are:
    • runOnce(MultithreadedTestCase) -- run a test sequence once.
    • runManyTimes(MultithreadedTestCase, int) -- run a test sequence as many times as specified by the int parameter, or until one of the test runs fails.
       
  2. class MultithreadedTest extends MultithreadedTestCase and implements junit.framework.Test, so it can be added to a junit.framework.TestSuite and run directly. This class includes a runTest() method that calls TestFramework.runOnce(this). To change the way a test is run, override the runTest() method. (MultithreadedTC provides a convenience method, TestFramework.buildTestSuite(...), that looks for inner classes that implement junit.framework.Test and builds a TestSuite out of these classes.)

4. Some Examples of Test Cases

Example 1: Compare And Set

Here is a simple example to demonstrate the basic layout of a MultithreadedTC test. In this example, one thread stays in a while loop until its condition is met due to an action in another thread. Both threads are started simultaneously.

Notice that in the MultithreadedTC version, the AtomicInteger field is initialized in the initialize() method, not in a constructor. This is so that if the test is run many times (with TestFramework.runManyTimes(...)), a fresh instance of AtomicInteger is used each time.

This simple example demonstrates that MultithreadedTC takes care of much of the scaffolding work needed to set up and start a thread, and eliminates the need to join the thread before the test ends.

// Plain Version

public void testCompareAndSet() throws InterruptedException {
    final AtomicInteger ai = new AtomicInteger(1);
    Thread t = new Thread(new Runnable() {
        public void run() {
            while(!ai.compareAndSet(2, 3))
                Thread.yield();
        }
    });

    t.start();
    assertTrue(ai.compareAndSet(1, 2));
    t.join(2500);
    assertFalse(t.isAlive());
    assertEquals(ai.get(), 3);
}

// MultithreadedTC Version

class MTCCompareAndSet extends MultithreadedTest {
    AtomicInteger ai;

    @Override public void initialize() {
        ai = new AtomicInteger(1);
    }

    public void thread1() {
        while(!ai.compareAndSet(2, 3))
            Thread.yield();
    }

    public void thread2() {
        assertTrue(ai.compareAndSet(1, 2));
    }

    @Override public void finish() {
        assertEquals(ai.get(), 3);
    }
}

Example 2: Interrupted Acquire

This example shows some of the strengths of MultithreadedTC. An acquire() on a Semaphore in one thread is expected to block, and throw an InterruptedException when interrupted by another thread.

The MultithreadedTC version can assert that the InterruptedException is not thrown until tick 1, which is when Thread 1 is interrupted. (So an implementation of acquire that unconditionally throws the exception may pass the plain version but will not pass the MultithreadedTC version.)

Notice also that an unchecked AssertionError (caused by the fail(...) statement in Thread 1) will cause the test to kill all threads and fail immediately. In the plain version, this failure kills the auxiliary thread but not the test. Some extra work is needed to set a flag indicating thread failure, and assert the flag in JUnit's tearDown() method.

// Plain Version

volatile boolean threadFailed;

protected void setUp() throws Exception {
    threadFailed = false;
}

public void threadShouldThrow() {
    threadFailed = true;
    fail("should throw exception");
}

protected void tearDown() throws Exception {
    assertFalse(threadFailed);
}

public void testInterruptedAcquire() {
    final Semaphore s = new Semaphore(0);
    Thread t = new Thread(new Runnable() {
        public void run() {
            try {
                s.acquire();
                threadShouldThrow();
            } catch(InterruptedException success){}
        }
    });
    t.start();
    try {
        Thread.sleep(50);
        t.interrupt();
        t.join();
    } catch(InterruptedException e){
        fail("Unexpected exception");
    }
}

// MultithreadedTC Version

class MTCInterruptedAcquire extends MultithreadedTestCase {
    Semaphore s;

    @Override public void initialize() {
        s = new Semaphore(0);
    }

    public void thread1() {
        try {
            s.acquire();
            fail("should throw exception");
        } catch(InterruptedException success){ assertTick(1); }
    }

    public void thread2() {
        waitForTick(1);
        getThread(1).interrupt();
    }
}

public void testMTCInterruptedAcquire() throws Throwable {
    TestFramework.runOnce( new MTCInterruptedAcquire() );
}

MultithreadedTC provides some convenience methods for getting access to a thread. The call to getThread(n) returns a reference to the thread running the method threadn() where n is an integer. The more general method, getThreadByName(threadMethod) returns a reference to the thread running the method threadMethod(). So getThread(1) is equivalent to getThreadByName("thread1").

Example 3

Most of the time, waitForTick() is used to wait until another thread blocks, as in Example 2. Occasionally, though, it is useful for coordinating two or more threads that do not contain any blocking. This example shows that even though the same can be accomplished using CountDownLatches, the MultithreadedTC version using waitForTick() is easier to write and understand.

// CountDownLatch version

volatile boolean threadFailed;

protected void setUp() throws Exception {
    threadFailed = false;
}

protected void tearDown() throws Exception {
    assertFalse(threadFailed);
}

public void unexpectedException() {
    threadFailed = true;
    fail("Unexpected exception");
}

public void testLatchBasedThreadOrdering() throws InterruptedException {
    final CountDownLatch c1 = new CountDownLatch(1);
    final CountDownLatch c2 = new CountDownLatch(1);
    final CountDownLatch c3 = new CountDownLatch(1);
    final AtomicInteger ai = new AtomicInteger(0);

    Thread t1 = new Thread(new Runnable() {
        public void run() {
            try {
                assertTrue(ai.compareAndSet(0, 1)); // S1
                c1.countDown();
                c3.await();
                assertEquals(ai.get(), 3);          // S4
            } catch (Exception e) {
                unexpectedException();
            }
        }
    });

    Thread t2 = new Thread(new Runnable() {
        public void run() {
            try {
                c1.await();
                assertTrue(ai.compareAndSet(1, 2)); // S2
                c2.countDown();
                c3.await();
                assertEquals(ai.get(), 3);          // S4
            } catch (Exception e) {
                unexpectedException();
            }
        }
    });

    t1.start();
    t2.start();

    c2.await();
    assertTrue(ai.compareAndSet(2, 3)); // S3
    c3.countDown();

    t1.join();
    t2.join();
}

// MultithreadedTC Version

class MTCThreadOrdering extends MultithreadedTestCase {
    AtomicInteger ai;

    @Override public void initialize() {
        ai = new AtomicInteger(0);
    }

    public void thread1() {
        assertTrue(ai.compareAndSet(0, 1)); // S1
        waitForTick(3);
        assertEquals(ai.get(), 3);          // S4
    }

    public void thread2() {
        waitForTick(1);
        assertTrue(ai.compareAndSet(1, 2)); // S2
        waitForTick(3);
        assertEquals(ai.get(), 3);          // S4
    }

    public void thread3() {
        waitForTick(2);
        assertTrue(ai.compareAndSet(2, 3)); // S3
    }
}

public void testMTCThreadOrdering() throws Throwable {
    TestFramework.runOnce( new MTCThreadOrdering() );
}

Example 4

// Plain Version

volatile boolean threadFailed;

protected void setUp() throws Exception {
    threadFailed = false;
}

public void threadShouldThrow() {
    threadFailed = true;
    fail("should throw exception");
}

public void threadAssertFalse(boolean b) {
    if (b) {
        threadFailed = true;
        assertFalse(b);
    }
}

protected void tearDown() throws Exception {
    assertFalse(threadFailed);
}

public void testTimedOffer() {
    final ArrayBlockingQueue<Object> q =
        new ArrayBlockingQueue<Object>(2);
    Thread t = new Thread(new Runnable() {
        public void run() {
            try {
                q.put(new Object());
                q.put(new Object());
                threadAssertFalse(q.offer(new Object(),
                    25, TimeUnit.MILLISECONDS));
                q.offer(new Object(),
                    2500, TimeUnit.MILLISECONDS);
                threadShouldThrow();
            } catch (InterruptedException success){}
        }
    });

    try {
        t.start();
        Thread.sleep(50);
        t.interrupt();
        t.join();
    } catch (Exception e) {
        fail("Unexpected exception");
    }
}

// MultithreadedTC Version

/**
 * In this test, the first offer is allowed to timeout,
 * the second offer is interrupted. Use `freezeClock`
 * to prevent the clock from advancing during the first
 * offer.
 */
class MTCTimedOffer extends MultithreadedTestCase {
    ArrayBlockingQueue<Object> q;

    @Override public void initialize() {
        q = new ArrayBlockingQueue<Object>(2);
    }

    public void thread1() {
        try {
            q.put(new Object());
            q.put(new Object());

            freezeClock();
            assertFalse(q.offer(new Object(),
                25, TimeUnit.MILLISECONDS));
            unfreezeClock();

            q.offer(new Object(),
                2500, TimeUnit.MILLISECONDS);
            fail("should throw exception");
        } catch (InterruptedException success){
            assertTick(1);
        }
    }

    public void thread2() {
        waitForTick(1);
        getThread(1).interrupt();
    }
}

public void testMTCTimedOffer() throws Throwable {
    TestFramework.runOnce( new MTCTimedOffer() );
}