1. What is MultithreadedTC?

MultithreadedTC is a framework for testing concurrent applications. It features a metronome that is used to provide fine control over the sequence of activities in multiple threads.

Many failures in concurrent applications are not deterministic. They do not occur every time the application is run. Different interleavings of application threads yield different behaviors.

Concurrent application designers often want to run many (unrelated or loosely related) threads of activity to maximize throughput. Sometimes it is useful for test designers to mimic this style and run multiple threads, generating as many interleavings as possible. Many test frameworks support this paradigm (e.g. IBM's ConTest, GroboUtils' MultiThreadedTestRunner, and JUnit's ActiveTestSuite).

MultithreadedTC is different. It supports test cases that exercise a specific interleaving of threads. This is motivated by the principle that concurrent applications should be built using small concurrent abstractions such as bounded buffers, semaphores and latches. Separating the concurrency logic from the rest of the application logic in this way makes it easier to understand and test concurrent applications. Since these abstractions are small, it should be possible to deterministically test every (significant) interleaving in separate tests.

But how can one guarantee a specific interleaving of different threads in the presence of blocking and timing issues? Consider the following example of some operations on a bounded blocking buffer (e.g. ArrayBlockingQueue) with capacity 1:

    Thread 1        Thread 2
    put 42
    put 17
                    take 42
                    take 17

We want a test to confirm that put 17 causes Thread 1 to block until take 42 occurs in Thread 2. The test must guarantee that take 42 is not executed until after put 17. How could a designer guarantee this interleaving of the two threads?

One approach is to use Thread.sleep() in Thread 2 to delay its first statement long enough to "guarantee" that Thread 1 has blocked. But this approach makes the test timing-dependent: timing can be thrown off by, say, an ill-timed garbage collection. This also does not work well when stepping through the code in a debugger.
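As a rough illustration of the sleep-based approach, here is a minimal sketch that uses only the JDK. The class name SleepBasedOrdering and the delay value are assumptions made for this example; they do not come from MultithreadedTC.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class; not part of MultithreadedTC.
final class SleepBasedOrdering {

    /** Returns the two values taken by the consumer, in order. */
    static int[] run(long delayMillis) {
        ArrayBlockingQueue<Integer> buf = new ArrayBlockingQueue<>(1);

        Thread producer = new Thread(() -> {
            try {
                buf.put(42);
                buf.put(17); // expected to block until the consumer takes 42
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            // Hope that the producer has blocked by now; nothing enforces it.
            Thread.sleep(delayMillis);
            int first = buf.take();
            int second = buf.take();
            producer.join();
            return new int[] { first, second };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        int[] taken = run(100);
        System.out.println(taken[0] + ", " + taken[1]); // prints "42, 17"
    }
}
```

The values come out in FIFO order whether or not the producer had actually blocked when the sleep ended, so a test written this way cannot confirm the blocking behavior on its own.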

Another common approach for coordinating activities in two threads is to use a CountDownLatch. A CountDownLatch will not work in this example as illustrated by the following pseudocode:

Initialization:
    ArrayBlockingQueue buf = new ArrayBlockingQueue(1);
    CountDownLatch c = new CountDownLatch(1);

Thread 1:
    buf.put(42);
    buf.put(17);
    c.countDown();

Thread 2:
    c.await();
    assertEquals(42, buf.take());
    assertEquals(17, buf.take());

Of course the problem is that the statement c.countDown() cannot be executed until after Thread 1 unblocks... which will not occur until Thread 2 take()s. In other words, this test is deadlocked!
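The deadlock can be observed safely by replacing the unbounded c.await() with a bounded wait. The following is a minimal JDK-only sketch; the class name LatchDeadlockDemo and the timeout value are assumptions made for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of MultithreadedTC.
final class LatchDeadlockDemo {

    /** Returns true if the latch was released within the timeout. */
    static boolean latchReleased(long timeoutMillis) {
        ArrayBlockingQueue<Integer> buf = new ArrayBlockingQueue<>(1);
        CountDownLatch c = new CountDownLatch(1);

        Thread t1 = new Thread(() -> {
            try {
                buf.put(42);
                buf.put(17);   // blocks: the buffer is full and nobody takes
                c.countDown(); // never reached while Thread 1 is blocked
            } catch (InterruptedException e) {
                // Interrupted by the cleanup below; just exit.
            }
        });
        t1.start();

        try {
            // Thread 2's role: c.await() would hang forever, so wait with a timeout.
            boolean released = c.await(timeoutMillis, TimeUnit.MILLISECONDS);
            t1.interrupt(); // unblock Thread 1 so the demo can finish
            t1.join();
            return released;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("latch released: " + latchReleased(200)); // prints "latch released: false"
    }
}
```

Because Thread 1 can only leave put(17) through the interrupt, countDown() is never executed and the bounded wait always times out.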

MultithreadedTC provides an elegant solution to this problem, illustrated in the following example:

class MTCBoundedBufferTest extends MultithreadedTestCase {
    ArrayBlockingQueue<Integer> buf;

    @Override public void initialize() {
        buf = new ArrayBlockingQueue<Integer>(1);
    }

    public void thread1() throws InterruptedException {
        buf.put(42);
        buf.put(17);
        assertTick(1);
    }

    public void thread2() throws InterruptedException {
        waitForTick(1);
        assertEquals(Integer.valueOf(42), buf.take());
        assertEquals(Integer.valueOf(17), buf.take());
    }

    @Override public void finish() {
        assertTrue(buf.isEmpty());
    }
}

This example is illustrated by the following diagram:

MultithreadedTC has an internal metronome (or clock). But don't try to use it to set the tempo for your jazz band. The clock only advances to the next tick when all threads are blocked.

The clock starts at tick 0. In this example, the statement waitForTick(1) makes Thread 2 block until the clock reaches tick 1 before resuming. Thread 1 is allowed to run freely in tick 0, until it blocks on the call to put(17). At this point, all threads are blocked, and the clock can advance to the next tick.

In tick 1, the first take() in Thread 2 is executed and returns 42, and this frees up Thread 1. The final statement in Thread 1 asserts that the clock is in tick 1, in effect asserting that the thread blocked on the call to put(17).

This approach does not deadlock like the CountDownLatch, and is more reliable than Thread.sleep(). Some other high level observations are:

  • The test is encapsulated in a class that extends MultithreadedTestCase. Each of the threads is represented by a method whose name starts with "thread", returns void, and has no arguments. The initialize() method is invoked first; then the thread methods are invoked simultaneously in different threads; finally the finish() method is invoked when all threads have completed.
  • This test can be run using the following JUnit test:
        public void testMTCBoundedBuffer() throws Throwable {
            TestFramework.runOnce( new MTCBoundedBufferTest() );
        }
    This creates an instance of the test class and passes it to the TestFramework. The TestFramework creates the necessary threads, manages the metronome, and runs the test.
  • All the components of the test are represented using classes and methods, constructs that are recognizable to Java programmers.
  • The framework handles exceptions thrown by any of the threads, and propagates them up to JUnit. This solves a problem with anonymous Threads, whose exceptions are not detected by JUnit without some extra scaffolding provided by the test designer. (See Examples 2 to 4).
  • The clock is not necessarily incremented by units of one. When all threads are blocked it advances to the next requested tick specified by a waitForTick() method. If none of the threads are waiting for a tick, the test is declared to be in deadlock (unless one of the threads is in state TIMED_WAITING).

2. So how does this work?

The class TestFramework provides most of the scaffolding required to run MultithreadedTC tests. It uses reflection to identify all relevant methods in the test class and invokes them simultaneously in different threads. It regulates these threads using a separate clock thread.
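The reflection step can be sketched with the JDK alone. The code below is an illustration under stated assumptions, not TestFramework's actual implementation: ThreadMethodRunner, Sample, and runThreadMethods are hypothetical names. It selects public methods whose names start with "thread", that return void and take no arguments, runs each one in its own thread, and rethrows the first failure.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch; TestFramework's real implementation may differ.
final class ThreadMethodRunner {

    /** Sample test object: two thread methods and one method that must be ignored. */
    public static final class Sample {
        public final Set<String> ran = new ConcurrentSkipListSet<>();
        public void thread1() { ran.add("thread1"); }
        public void thread2() { ran.add("thread2"); }
        public void helper()  { ran.add("helper"); }
    }

    /** Runs every public void no-arg method named thread*; returns their sorted names. */
    static List<String> runThreadMethods(Object test) {
        List<String> names = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        AtomicReference<Throwable> failure = new AtomicReference<>();

        for (Method m : test.getClass().getMethods()) {
            boolean isThreadMethod = m.getName().startsWith("thread")
                    && m.getReturnType() == void.class
                    && m.getParameterCount() == 0;
            if (!isThreadMethod) continue;
            names.add(m.getName());
            threads.add(new Thread(() -> {
                try {
                    m.invoke(test);
                } catch (InvocationTargetException e) {
                    failure.compareAndSet(null, e.getCause()); // thrown by the test method
                } catch (IllegalAccessException e) {
                    failure.compareAndSet(null, e);
                }
            }, m.getName()));
        }

        threads.forEach(Thread::start);
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining", e);
            }
        }
        if (failure.get() != null) {
            throw new AssertionError("a thread method failed", failure.get());
        }
        Collections.sort(names);
        return names;
    }
}
```

Calling ThreadMethodRunner.runThreadMethods(new ThreadMethodRunner.Sample()) runs thread1() and thread2() in separate threads and leaves helper() untouched.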

The clock thread checks periodically to see if all threads are blocked. If all threads are blocked and at least one is waiting for a tick, the clock thread advances the clock to the next desired tick. The clock thread also detects deadlock (when all threads are blocked, none are waiting for a tick, and none are in state TIMED_WAITING), and can stop a test that runs too long (that is, when a thread stays in state RUNNABLE for too long).
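To make the polling idea concrete, here is a much-simplified sketch built only on Thread.getState() and a monitor. It is not the framework's code: MiniMetronome and all of its members are hypothetical names, deadlock detection is omitted, and a simple overall time limit stands in for the long-running-test check. It replays the bounded buffer scenario from section 1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; these names are not MultithreadedTC's own.
final class MiniMetronome {
    private final Object lock = new Object();
    private final List<Integer> pending = new ArrayList<>(); // ticks being waited for
    private int tick = 0;

    int currentTick() {
        synchronized (lock) { return tick; }
    }

    /** Blocks the caller until the clock reaches tick n. */
    void waitForTick(int n) throws InterruptedException {
        synchronized (lock) {
            if (tick >= n) return;
            pending.add(n);
            try {
                while (tick < n) lock.wait();
            } finally {
                pending.remove(Integer.valueOf(n)); // remove by value, not by index
            }
        }
    }

    /** Polls thread states every periodMillis until all threads have terminated. */
    void runClock(List<Thread> threads, long periodMillis, long limitMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + limitMillis * 1_000_000L;
        while (true) {
            boolean anyAlive = false;
            boolean allBlocked = true;
            for (Thread t : threads) {
                Thread.State s = t.getState();
                if (s == Thread.State.TERMINATED) continue;
                anyAlive = true;
                if (s == Thread.State.NEW || s == Thread.State.RUNNABLE) allBlocked = false;
            }
            if (!anyAlive) return;
            if (allBlocked) advanceToNextRequestedTick();
            if (System.nanoTime() > deadline) {
                threads.forEach(Thread::interrupt);
                throw new IllegalStateException("test ran too long");
            }
            Thread.sleep(periodMillis);
        }
    }

    private void advanceToNextRequestedTick() {
        synchronized (lock) {
            int next = Integer.MAX_VALUE;
            for (int p : pending) {
                if (p > tick) next = Math.min(next, p);
            }
            if (next != Integer.MAX_VALUE) { // at least one thread waits for a later tick
                tick = next;
                lock.notifyAll();
            }
        }
    }

    /** Bounded-buffer scenario: returns the tick Thread 1 sees after put(17). */
    static int demo() {
        MiniMetronome clock = new MiniMetronome();
        ArrayBlockingQueue<Integer> buf = new ArrayBlockingQueue<>(1);
        AtomicInteger tickAfterPut = new AtomicInteger(-1);
        Thread t1 = new Thread(() -> {
            try {
                buf.put(42);
                buf.put(17); // blocks until Thread 2 takes 42
                tickAfterPut.set(clock.currentTick());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread t2 = new Thread(() -> {
            try {
                clock.waitForTick(1);
                buf.take();
                buf.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t1.start();
        t2.start();
        try {
            clock.runClock(Arrays.asList(t1, t2), 10, 5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("clock interrupted", e);
        }
        return tickAfterPut.get();
    }
}
```

MiniMetronome.demo() returns 1: Thread 1 can only get past put(17) after Thread 2 has been released by the advance to tick 1. Treating every state other than NEW and RUNNABLE as "blocked" is a simplification chosen for this sketch.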

The test threads are placed into a new thread group, and any threads created by these test cases will be placed in this thread group by default. All threads in the thread group will be considered by the clock thread when deciding whether to advance the clock, declare a deadlock, or stop a long-running test.
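The "by default" behavior relies on standard Java semantics: a thread created without an explicit group joins the group of the thread that creates it. A small JDK-only sketch follows; ThreadGroupDemo and the group name are assumptions made for this example.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; not part of MultithreadedTC.
final class ThreadGroupDemo {

    /** Returns true if a thread created by a test thread lands in the test's group. */
    static boolean childInheritsGroup() {
        ThreadGroup testGroup = new ThreadGroup("mtc-test-threads"); // hypothetical name
        AtomicReference<ThreadGroup> childGroup = new AtomicReference<>();

        Thread testThread = new Thread(testGroup, () -> {
            // No group is given here, so the child joins the creating thread's group.
            Thread child = new Thread(() -> { });
            childGroup.set(child.getThreadGroup());
        }, "thread1");

        testThread.start();
        try {
            testThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        }
        return childGroup.get() == testGroup;
    }

    public static void main(String[] args) {
        System.out.println(childInheritsGroup()); // prints "true"
    }
}
```

This is why a clock thread that inspects the thread group also sees helper threads that a test spawns on its own.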

3. Cool! How do I use this?

MultithreadedTC tests are created by extending one of two classes:

  1. class MultithreadedTestCase extends junit.framework.Assert and provides the base functionality required by all tests. (NOTE: MultithreadedTestCase does NOT extend junit.framework.TestCase). A test using this class consists of:
    • an optional initialize() method,
    • one or more "thread" methods which are invoked in different threads,
    • an optional finish() method that is run after all threads have completed.
    In addition to the MultithreadedTestCase subclass, an additional JUnit test method is used to call one of the "run" methods in TestFramework. The run methods receive an instance of a MultithreadedTestCase and test it in different ways. The primary run methods are:
    • runOnce(MultithreadedTestCase) -- run a test sequence once,
    • runManyTimes(MultithreadedTestCase, int) -- run a test sequence as many times as specified by the int parameter, or until one of the test runs fails.
       
  2. class MultithreadedTest extends MultithreadedTestCase and implements junit.framework.Test. So it can be added to a junit.framework.TestSuite and run directly.

    This class includes a runTest() method that calls: TestFramework.runOnce(this). To change the way a test is run, override the runTest() method.

    (MultithreadedTC provides a convenience method, TestFramework.buildTestSuite(...), that looks for inner classes that implement junit.framework.Test and builds a TestSuite out of these classes.)

4. Some Examples of Test Cases

Example 1: Compare And Set

Here is a simple example to demonstrate the basic layout of a MultithreadedTC test. In this example, one thread stays in a while loop until its condition is met due to an action in another thread. Both threads are started simultaneously.

Notice that in the MultithreadedTC version, the AtomicInteger field is initialized in the initialize() method, not in a constructor. This is so that if the test is run many times (with TestFramework.runManyTimes(...)), a fresh instance of AtomicInteger is used each time.

This simple example demonstrates that MultithreadedTC takes care of much of the scaffolding work needed to set up and start a thread, and eliminates the need to join the thread before the test ends.

MultithreadedTC Version
class MTCCompareAndSet extends MultithreadedTest {
    AtomicInteger ai;

    @Override public void initialize() {
        ai = new AtomicInteger(1);
    }

    public void thread1() {
        while(!ai.compareAndSet(2, 3)) Thread.yield();
    }

    public void thread2() {
        assertTrue(ai.compareAndSet(1, 2));
    }

    @Override public void finish() {
        assertEquals(3, ai.get());
    }
}

Plain Version
public void testCompareAndSet() throws InterruptedException {
    final AtomicInteger ai = new AtomicInteger(1);
    Thread t = new Thread(new Runnable() {
        public void run() {
            while(!ai.compareAndSet(2, 3))
                Thread.yield();
        }
    });

    t.start();
    assertTrue(ai.compareAndSet(1, 2));
    t.join(2500);
    assertFalse(t.isAlive());
    assertEquals(3, ai.get());
}

Example 2: Interrupted Acquire

This example shows some of the strengths of MultithreadedTC. An acquire() on a Semaphore in one thread is expected to block, and throw an InterruptedException when interrupted by another thread.

The MultithreadedTC version can assert that the InterruptedException is not thrown until tick 1, which is when Thread 1 is interrupted. (So an implementation of acquire that unconditionally throws the exception may pass the plain version but will not pass the MultithreadedTC version.) waitForTick() is used instead of Thread.sleep() to eliminate timing dependencies.

Notice also that an unchecked AssertionError (caused by the fail(...) statement in Thread 1) will cause the test to kill all threads and fail immediately. In the plain version, this failure kills the auxiliary thread but not the test. Some extra work is needed to set a flag indicating thread failure, and assert the flag in JUnit's tearDown() method.

MultithreadedTC Version
class MTCInterruptedAcquire extends MultithreadedTestCase {
    Semaphore s;

    @Override public void initialize() {
        s = new Semaphore(0);
    }

    public void thread1() {
        try {
            s.acquire();
            fail("should throw exception");
        } catch(InterruptedException success){ assertTick(1); }
    }

    public void thread2() {
        waitForTick(1);
        getThread(1).interrupt();
    }
}

public void testMTCInterruptedAcquire() throws Throwable {
    TestFramework.runOnce( new MTCInterruptedAcquire() );
}

Plain Version
volatile boolean threadFailed;

protected void setUp() throws Exception {
    threadFailed = false;
}

public void threadShouldThrow() {
    threadFailed = true;
    fail("should throw exception");
}

protected void tearDown() throws Exception {
    assertFalse(threadFailed);
}

public void testInterruptedAcquire() {
    final Semaphore s = new Semaphore(0);
    Thread t = new Thread(new Runnable() {
        public void run() {
            try {
                s.acquire();
                threadShouldThrow();
            } catch(InterruptedException success){}
        }
    });
    t.start();
    try {
        Thread.sleep(50);
        t.interrupt();
        t.join();
    } catch(InterruptedException e){
        fail("Unexpected exception");
    }
}

MultithreadedTC provides some convenience methods for getting access to a thread. The call to getThread(n) returns a reference to the thread running the method threadn() where n is an integer. The more general method, getThreadByName("threadMethod") returns a reference to the thread running the method threadMethod(). So getThread(1) is equivalent to getThreadByName("thread1").

Example 3: Thread Ordering

Most of the time, waitForTick() is used to wait until another thread blocks, as in Example 2. Occasionally though, it is useful for coordinating two or more threads that do not contain any blocking. This example shows that, although the same ordering can be achieved using CountDownLatches, the MultithreadedTC version using waitForTick() is easier to write and understand.

MultithreadedTC Version
class MTCThreadOrdering extends MultithreadedTestCase {
    AtomicInteger ai;

    @Override public void initialize() {
        ai = new AtomicInteger(0);
    }

    public void thread1() {
        assertTrue(ai.compareAndSet(0, 1)); // S1
        waitForTick(3);
        assertEquals(3, ai.get());          // S4
    }

    public void thread2() {
        waitForTick(1);
        assertTrue(ai.compareAndSet(1, 2)); // S2
        waitForTick(3);
        assertEquals(3, ai.get());          // S4
    }

    public void thread3() {
        waitForTick(2);
        assertTrue(ai.compareAndSet(2, 3)); // S3
    }
}

public void testMTCThreadOrdering() throws Throwable {
    TestFramework.runOnce( new MTCThreadOrdering() );
}

CountDownLatch Version
volatile boolean threadFailed;

protected void setUp() throws Exception {
    threadFailed = false;
}

protected void tearDown() throws Exception {
    assertFalse(threadFailed);
}

public void unexpectedException() {
    threadFailed = true;
    fail("Unexpected exception");
}

public void testLatchBasedThreadOrdering() throws InterruptedException {
    final CountDownLatch c1 = new CountDownLatch(1);
    final CountDownLatch c2 = new CountDownLatch(1);
    final CountDownLatch c3 = new CountDownLatch(1);
    final AtomicInteger ai = new AtomicInteger(0);

    Thread t1 = new Thread(new Runnable() {
        public void run() {
            try {
                assertTrue(ai.compareAndSet(0, 1)); // S1
                c1.countDown();
                c3.await();
                assertEquals(3, ai.get());          // S4
            } catch (Exception e) {
                unexpectedException();
            }
        }
    });

    Thread t2 = new Thread(new Runnable() {
        public void run() {
            try {
                c1.await();
                assertTrue(ai.compareAndSet(1, 2)); // S2
                c2.countDown();
                c3.await();
                assertEquals(3, ai.get());          // S4
            } catch (Exception e) {
                unexpectedException();
            }
        }
    });

    t1.start();
    t2.start();

    c2.await();
    assertTrue(ai.compareAndSet(2, 3)); // S3
    c3.countDown();

    t1.join();
    t2.join();
}

Example 4: Allowing Timeouts

Some concurrent abstractions rely on timing. An example is the offer() method on the ArrayBlockingQueue, which tries to add an element to the buffer and blocks for a specified time limit if the buffer is full. If the offer() times out, the method returns false. This example shows how waitForTick() can interact with these timing elements.
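The timeout behavior of offer() on its own can be checked with a few lines of JDK-only code. The class name TimedOfferDemo is an assumption made for this sketch.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of MultithreadedTC.
final class TimedOfferDemo {

    /** Returns the result of a timed offer() on a queue that is already full. */
    static boolean offerToFullQueue(long timeoutMillis) {
        ArrayBlockingQueue<Object> q = new ArrayBlockingQueue<>(2);
        q.add(new Object());
        q.add(new Object()); // the queue is now full

        try {
            // Waits up to timeoutMillis for space, then gives up and returns false.
            return q.offer(new Object(), timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while offering", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(offerToFullQueue(25)); // prints "false"
    }
}
```

While the offer is pending, the calling thread is in state TIMED_WAITING, which is the state the clock thread has to account for.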

If the goal of the test is to cause a timeout in one thread, we may want to prevent the clock from advancing in another thread. MultithreadedTC provides the methods freezeClock() and unfreezeClock() to disable and re-enable the clock thread, respectively. (Even while the clock is frozen, the clock thread can still kill a test that is running for too long.)

In this test, the first offer() is allowed to time out and the second offer() is interrupted. To guarantee that the interrupt does not occur during the first offer(), the test uses freezeClock() to freeze the clock for the duration of that call. Thread 2 is awakened when Thread 1 blocks on the second offer(). (Note that the timeout in the offer() statement should be much longer than the clock period (see Javadoc) to ensure that the clock thread has a chance to wake up.) Tests like this, which inherently include timing, do not play well in a debugger.

MultithreadedTC Version
class MTCTimedOffer extends MultithreadedTestCase {
    ArrayBlockingQueue<Object> q;

    @Override public void initialize() {
        q = new ArrayBlockingQueue<Object>(2);
    }

    public void thread1() {
        try {
            q.put(new Object());
            q.put(new Object());

            freezeClock();
            assertFalse(q.offer(new Object(),
                25, TimeUnit.MILLISECONDS));
            unfreezeClock();

            q.offer(new Object(),
                2500, TimeUnit.MILLISECONDS);
            fail("should throw exception");
        } catch (InterruptedException success){
            assertTick(1);
        }
    }

    public void thread2() {
        waitForTick(1);
        getThread(1).interrupt();
    }
}

public void testMTCTimedOffer() throws Throwable {
    TestFramework.runOnce( new MTCTimedOffer() );
}

Plain Version
volatile boolean threadFailed;

protected void setUp() throws Exception {
    threadFailed = false;
}

public void threadShouldThrow() {
    threadFailed = true;
    fail("should throw exception");
}

public void threadAssertFalse(boolean b) {
    if (b) {
        threadFailed = true;
        assertFalse(b);
    }
}

protected void tearDown() throws Exception {
    assertFalse(threadFailed);
}

public void testTimedOffer() {
    final ArrayBlockingQueue<Object> q =
        new ArrayBlockingQueue<Object>(2);
    Thread t = new Thread(new Runnable() {
        public void run() {
            try {
                q.put(new Object());
                q.put(new Object());
                threadAssertFalse(q.offer(new Object(),
                    25, TimeUnit.MILLISECONDS));
                q.offer(new Object(),
                    2500, TimeUnit.MILLISECONDS);
                threadShouldThrow();
            } catch (InterruptedException success){}
        }
    });

    try {
        t.start();
        Thread.sleep(50);
        t.interrupt();
        t.join();
    } catch (Exception e) {
        fail("Unexpected exception");
    }
}

 
