
Concurrency and Multi-Threaded Programming in Java – Part 2

This is the second part of a three-part discussion about concurrency in Java. The first article introduces the common terminology and explains how many of the basic locking mechanisms work. This post focuses on the java.util.concurrent package. The final post will discuss some of the more complicated issues that arise in Java when using concurrency and synchronization.


Before talking about the java.util.concurrent package, I want to go over how a couple of Java keywords affect concurrency in Java 1.5+. To maintain good performance when synchronization is not explicitly required, the compiler, the JVM and the hardware caches take serious liberties in reordering memory operations. This can provide significant performance gains, but it can also cause your programs to behave in unexpected ways. The only rule on reordering is that the currently executing thread can't tell the difference; this is called within-thread as-if-serial semantics. Fortunately, the volatile and final keywords add further rules.

The volatile keyword guarantees that reads and writes of volatile variables occur in a single total order that all threads agree on, so neither the compiler nor the cache can reorder them with respect to each other. Furthermore, ordinary reads and writes cannot be reordered around volatile accesses in a way that would break the following guarantee: when thread A writes to a volatile variable X and thread B later reads the value A wrote, every write A performed before writing X is visible to B. This means X can be used to signal state to other threads. There are similar guarantees when a thread is started, when one thread joins another and when a thread enters and exits a synchronized block.
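To make the guarantee concrete, here is a minimal sketch of publishing an ordinary field through a volatile flag. The class and method names are hypothetical, and it assumes Java 8+ for the lambda syntax. The reader may only observe data after it sees ready == true, and the happens-before rule above means it will then see 42, never the default 0.

```java
public class VolatileFlagExample {

    private int data;               // ordinary, non-volatile field
    private volatile boolean ready; // volatile flag that publishes data

    void writer() {
        data = 42;    // (1) ordinary write
        ready = true; // (2) volatile write: publishes every write made before it
    }

    int reader() {
        // (3) volatile read: spin until the writer's flag becomes visible
        while (!ready) {
            Thread.yield();
        }
        // (4) (1) happens-before (2) and (2) happens-before (3),
        // so this read is guaranteed to return 42
        return data;
    }

    public static int run() throws InterruptedException {
        VolatileFlagExample example = new VolatileFlagExample();
        int[] seen = new int[1];
        Thread reader = new Thread(() -> seen[0] = example.reader());
        Thread writer = new Thread(example::writer);
        reader.start();
        writer.start();
        reader.join();
        writer.join();
        return seen[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints 42
    }
}
```

If ready were not volatile, the JIT would be free to hoist the read out of the loop, and the reader could spin forever or see a stale data value.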

The final keyword means just that, final, except when it doesn't. Before Java 1.5, without synchronization another thread could potentially see the default value (null or 0) of a final field and later see the correct value. Java 1.5+ guarantees that when an object's constructor completes, all of its final fields are frozen, and any other thread that obtains a reference to the object after that point will see the correct values of those fields (provided the this reference did not escape during construction). As with volatile, the writes that initialize final fields will not be reordered with operations that follow the freeze. Beyond this, anything reachable through a final field, such as the contents of an array it references, is guaranteed to be visible at least as it was when the constructor finished. The consequence is that properly constructed immutable objects are inherently thread-safe.
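As an illustration, below is a sketch of an immutable class that relies on these final-field guarantees. The class ImmutableReading and its fields are hypothetical. The array is copied inside the constructor, so its contents are written before the freeze and are covered by the "reachable through a final field" rule.

```java
import java.util.Arrays;

public final class ImmutableReading {

    private final String sensor;
    private final long timestamp;
    private final double[] samples; // reachable through a final field

    public ImmutableReading(String sensor, long timestamp, double[] samples) {
        this.sensor = sensor;
        this.timestamp = timestamp;
        // Defensive copy: the contents are written before the constructor
        // finishes, so they are frozen along with the final field
        this.samples = samples.clone();
        // `this` is never handed to another thread from inside the constructor
    }

    public String sensor() { return sensor; }

    public long timestamp() { return timestamp; }

    // Return a copy so callers can never mutate our internal array
    public double[] samples() { return samples.clone(); }

    // "Modification" produces a new object instead of changing this one
    public ImmutableReading withTimestamp(long newTimestamp) {
        return new ImmutableReading(sensor, newTimestamp, samples);
    }

    @Override
    public String toString() {
        return sensor + "@" + timestamp + Arrays.toString(samples);
    }
}
```

Because no field can change after construction, instances can be shared between threads without any locking.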

java.util.concurrent

The concurrency library was added in Java 1.5. Many of its classes use hardware-level atomic instructions such as compare-and-swap (CAS) to provide thread safety largely without locking at the object level. This generally performs significantly better than guarding critical paths with the synchronized keyword. (Under extremely heavy contention the advantage can turn into a cost: a CAS-based algorithm retries in a loop rather than sleeping, so threads that keep losing the race burn CPU cycles.) It is usually advisable to use the ready-made implementations from this package rather than writing your own. The package also introduces the standard library's first collections designed specifically for concurrent access, rather than simply wrapping every method in a lock as Vector, Hashtable and the Collections.synchronized wrappers do. The full documentation is in the java.util.concurrent Javadoc. I am going to go over a couple of the most useful and a few of the interesting classes in the package and give some usage examples.
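The CAS idea can be sketched with java.util.concurrent.atomic.AtomicInteger, whose compareAndSet(expected, update) only writes if the value still equals expected. The retry loop below is the pattern the package builds on; the class name CasCounterDemo is hypothetical and Java 8+ is assumed for lambdas. (AtomicInteger.incrementAndGet() already does this for you; the loop is spelled out for illustration.)

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CasCounterDemo {

    private final AtomicInteger value = new AtomicInteger();

    // Read, compute, then publish only if no other thread changed the value
    // in the meantime; otherwise retry. No lock is ever taken.
    public int increment() {
        while (true) {
            int current = value.get();
            int next = current + 1;
            if (value.compareAndSet(current, next)) {
                return next;
            }
            // Another thread won the race: loop and try again
            // (this spinning is what costs CPU under heavy contention)
        }
    }

    public int get() {
        return value.get();
    }

    public static int run(int threads, int incrementsPerThread) throws InterruptedException {
        CasCounterDemo counter = new CasCounterDemo();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.increment();
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(4, 10_000)); // always 40000
    }
}
```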

Semaphore

This class is very straightforward. It works primarily through the acquire() and release() methods and conceptually keeps an internal count of available permits: acquire() takes a permit, blocking until one is available, and release() returns one. The primary use case is restricting access to a limited resource. In the example below we use a semaphore to limit the number of open database connections.


import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.Semaphore;

public class DbConnectionFactoryExample {

    public static final int NUM_CONNECTIONS = 10;
    static final String DB_URL = "jdbc:mysql://localhost/exampledb";

    public static Connection getConnection(){
        try{
            return LimitedConnectionDecorator.getConnection();
        }catch(InterruptedException e){
            //Restore the interrupt flag so callers can still see it
            Thread.currentThread().interrupt();
            return null;
        }
    }
}

class LimitedConnectionDecorator implements Connection{

    //One permit per allowed open connection, shared by all instances
    private static final Semaphore connectionAccess =
            new Semaphore(DbConnectionFactoryExample.NUM_CONNECTIONS);
    private final Connection dbConnection;

    private LimitedConnectionDecorator(Connection dbConnection){
        this.dbConnection = dbConnection;
    }

    public static Connection getConnection() throws InterruptedException{
        //Blocks until one of the NUM_CONNECTIONS permits is free
        connectionAccess.acquire();
        try{
            return new LimitedConnectionDecorator(
                    DriverManager.getConnection(DbConnectionFactoryExample.DB_URL));
        }catch(SQLException e){
            //Opening failed, so hand the permit back immediately
            connectionAccess.release();
            return null;
        }
    }

    @Override
    public void close() throws SQLException{
        if(!dbConnection.isClosed()){
            try{
                dbConnection.close();
                connectionAccess.release();
            }catch (SQLException e){
                //Roll back logic
                //May release the permit anyway depending on the db
            }
        }
    }
    //... remaining Connection methods delegate to dbConnection (omitted)
}
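The connection example needs a database to run, so here is a self-contained sketch of the same idea using java.util.concurrent.Semaphore to cap how many threads use a resource at once. The class name and the 20 ms "work" are hypothetical stand-ins for holding a connection; Java 8+ is assumed.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreLimitDemo {

    // Runs `threads` workers that each hold a permit briefly and returns the
    // largest number of workers observed inside the guarded section at once.
    public static int maxConcurrent(int permits, int threads) throws InterruptedException {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                try {
                    semaphore.acquire(); // blocks while all permits are taken
                    try {
                        maxSeen.accumulateAndGet(active.incrementAndGet(), Math::max);
                        Thread.sleep(20); // pretend to use the limited resource
                    } finally {
                        active.decrementAndGet();
                        semaphore.release(); // always give the permit back
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return maxSeen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(maxConcurrent(3, 10)); // never more than 3
    }
}
```

Releasing in a finally block mirrors the connection example's rule: every successful acquire() must be matched by exactly one release(), even on failure.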

Exchanger

The Exchanger is an interesting convenience class that coordinates the exchange of objects between two threads. When the first thread finishes with its object, it calls exchange(object) and waits until the other thread calls exchange() as well; then the two objects are swapped. The obvious use case is having one thread fill a data structure while another empties one. In the example below, I do just that.


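import java.util.concurrent.Exchanger;

public class ExchangerExample{

    //MagicDataStructure and MagicStuff are placeholder types with
    //isFull(), isEmpty(), fill(...) and empty() methods
    private final Exchanger<MagicDataStructure> exchanger = new Exchanger<MagicDataStructure>();

    private final MagicDataStructure mds1 = new MagicDataStructure();
    private final MagicDataStructure mds2 = new MagicDataStructure();

    public static void main(String[] args){
        //Filler and Emptier are inner classes, so they need an enclosing instance
        ExchangerExample example = new ExchangerExample();
        new Thread(example.new Filler()).start();
        new Thread(example.new Emptier()).start();
    }

    class Filler implements Runnable {
        public void run(){
            MagicDataStructure myMds = mds1;
            try{
                while(true){
                    if(myMds.isFull()){
                        //Hand over the full structure and receive an empty one
                        myMds = exchanger.exchange(myMds);
                    }
                    myMds.fill(new MagicStuff());
                }
            }catch(InterruptedException e){e.printStackTrace();}
        }
    }

    class Emptier implements Runnable {
        public void run(){
            //The emptier starts with the other structure
            MagicDataStructure myMds = mds2;
            try{
                while(true){
                    if(myMds.isEmpty()){
                        //Hand over the empty structure and receive a full one
                        myMds = exchanger.exchange(myMds);
                    }
                    myMds.empty();
                }
            }catch(InterruptedException e){e.printStackTrace();}
        }
    }

}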
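Since MagicDataStructure is a placeholder, here is a runnable sketch of the same fill/empty swap using two ArrayList buffers. The class name, buffer size and round count are hypothetical; Java 8+ is assumed. The filler writes 1 through 20 into buffers of five, the emptier sums each full buffer it receives, and each round is one exchange() call on both sides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

public class BufferSwapDemo {

    static final int BUFFER_SIZE = 5;
    static final int ROUNDS = 4;

    public static long run() throws InterruptedException {
        Exchanger<List<Integer>> exchanger = new Exchanger<>();
        long[] total = new long[1];

        Thread filler = new Thread(() -> {
            List<Integer> buffer = new ArrayList<>();
            int next = 1;
            try {
                for (int round = 0; round < ROUNDS; round++) {
                    while (buffer.size() < BUFFER_SIZE) {
                        buffer.add(next++);
                    }
                    // Hand over the full buffer, get the emptier's empty one back
                    buffer = exchanger.exchange(buffer);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread emptier = new Thread(() -> {
            List<Integer> buffer = new ArrayList<>();
            try {
                for (int round = 0; round < ROUNDS; round++) {
                    // Hand over the empty buffer, get a full one back
                    buffer = exchanger.exchange(buffer);
                    for (int value : buffer) {
                        total[0] += value;
                    }
                    buffer.clear(); // cleared before it is handed back next round
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        filler.start();
        emptier.start();
        filler.join();
        emptier.join(); // join makes total[0] safely visible to this thread
        return total[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // 210, i.e. 1 + 2 + ... + 20
    }
}
```

Only two buffers ever exist, and the exchange point guarantees that a buffer is never being filled and emptied at the same time.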

FutureTask

The FutureTask class is interesting because it pairs an asynchronous computation with a handle for its result. When you spawn a thread you are basically just creating an asynchronous computation; FutureTask wraps that computation up so it can return a result. You create a FutureTask by passing in either a Callable, whose return value becomes the result, or a Runnable together with the result to return once it completes. A FutureTask is itself a Runnable, so you start it by handing it to a Thread (or an executor), and then call get() some time later, in the future. If the task has completed, get() returns the result; otherwise it blocks until the result has been computed. Below is a simple example implementation.


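import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskExample{

    public static void main(String[] args) throws InterruptedException, ExecutionException{
        //Do you get it? It's a computater...
        FutureTask<Object> computater = new FutureTask<Object>(new OverclockedPotato());
        //Run the task on its own thread; calling computater.run() directly
        //would execute it synchronously on this thread
        new Thread(computater).start();
        //Do other awesome computing
        //....
        //Blocks until the result is ready
        Object frenchFries = computater.get();
    }

}

class OverclockedPotato implements Callable<Object>{

    public Object call(){
        Object potato = new Object();
        //Do stuff to it
        return potato;
    }
}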
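Both FutureTask constructors mentioned above can be sketched side by side. The class and method names here are hypothetical and Java 8+ is assumed for lambdas. The first task is a Callable whose return value comes back from get(); the second is a Runnable paired with a fixed result.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {

    // Callable form: the value returned by the lambda is what get() returns
    public static int sumTo(int n) throws InterruptedException, ExecutionException {
        FutureTask<Integer> sumTask = new FutureTask<Integer>(() -> {
            int sum = 0;
            for (int i = 1; i <= n; i++) {
                sum += i;
            }
            return sum;
        });
        new Thread(sumTask).start(); // FutureTask is a Runnable, so a Thread can run it
        return sumTask.get();        // blocks until call() has returned
    }

    // Runnable form: get() returns the fixed result passed to the constructor
    public static String runWithFixedResult() throws InterruptedException, ExecutionException {
        StringBuilder log = new StringBuilder();
        FutureTask<String> logTask = new FutureTask<String>(() -> { log.append("ran"); }, "finished");
        new Thread(logTask).start();
        String result = logTask.get();
        // The task's actions happen-before get() returns, so the append is visible here
        return result + ":" + log;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumTo(100));           // 5050
        System.out.println(runWithFixedResult()); // finished:ran
    }
}
```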

CopyOnWriteArrayList

The previous examples were all nice concurrency constructs, but now let's talk about some of the interesting data structures provided by the concurrent package. CopyOnWriteArrayList is a completely thread-safe List designed for situations where traversals occur far more often than mutations. In that case it is beneficial to be able to iterate over the List without locking. This is a common need when a List holds listeners, as in AWT or Swing applications. CopyOnWriteArrayList achieves lock-free iteration by copying its underlying array whenever a mutation occurs, so an iterator always sees the list exactly as it was when the iterator was created. In other words, CopyOnWriteArrayList holds a mutable reference to an immutable array, rather than the other way around. Day-to-day use is identical to that of a normal list; it is the use case and the data structure implementation that are interesting.
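The one place the snapshot behaviour becomes visible is mutating the list while iterating over it. The sketch below (hypothetical class name) adds an element for every element it visits: with an ArrayList this would throw ConcurrentModificationException, but the CopyOnWriteArrayList iterator simply keeps walking the old array.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class SnapshotIterationDemo {

    // Returns the elements the loop actually visited
    public static List<String> iterateWhileAdding(CopyOnWriteArrayList<String> listeners) {
        List<String> seen = new ArrayList<>();
        for (String l : listeners) {
            // Each add copies the array; the running iterator still sees
            // the snapshot taken when the loop started
            listeners.add(l + "!");
            seen.add(l);
        }
        return seen;
    }

    public static void main(String[] args) {
        CopyOnWriteArrayList<String> listeners = new CopyOnWriteArrayList<>();
        listeners.add("a");
        listeners.add("b");
        System.out.println(iterateWhileAdding(listeners)); // [a, b]
        System.out.println(listeners);                     // [a, b, a!, b!]
    }
}
```

Note that every add() copies the whole array, which is exactly why the class only pays off when reads vastly outnumber writes.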

ConcurrentHashMap

ConcurrentHashMap is an extremely interesting class that provides complete thread safety with minimal locking. It is optimized for the most common operation, retrieving an element that already exists in the map, which succeeds without locking most of the time. In a highly parallel system with many threads interacting with the map, it performs far better than a Collections.synchronizedMap or a Hashtable. Several neat tricks are used to accomplish this.

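First, rather than having a single object-wide lock, ConcurrentHashMap splits its buckets into segments, each guarded by its own lock. The number of segments (16 by default) can be set through the concurrencyLevel constructor argument, which means that, in theory, that many threads can write to the Map at once. In practice the number will be lower, but it is still significantly higher than one. (This describes the original Java 5 implementation that the rest of this section walks through; Java 8 rewrote the class to lock individual bins and use CAS, and concurrencyLevel became only a sizing hint.)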
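From the caller's side, none of the segment machinery is visible. The sketch below (hypothetical class name, Java 8+ assumed for merge() and lambdas) passes concurrencyLevel as the third constructor argument and lets several threads update shared counters; ConcurrentHashMap performs each merge() call atomically, so no update is lost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentCountDemo {

    public static ConcurrentHashMap<String, Integer> count(int threads, int perThread)
            throws InterruptedException {
        // initialCapacity 16, loadFactor 0.75, concurrencyLevel = expected writer threads
        ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>(16, 0.75f, threads);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    String key = (i % 2 == 0) ? "even" : "odd";
                    // Atomic read-modify-write of a single entry
                    counts.merge(key, 1, Integer::sum);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return counts;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(count(4, 1000)); // even=2000 and odd=2000
    }
}
```

Doing the same with a plain get() followed by put() would not be atomic and could lose increments, even though each individual call is thread-safe.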

The second trick takes advantage of the JMM rules on immutability. All of the fields of a hash chain entry are final except the value field, which is volatile. Because each entry's next pointer is final, new elements can only be added at the front of a hash chain, and removing one means rebuilding part of the chain rather than relinking it. A reader can't tell whether the head it saw is still the current head, but if it holds a reference into a hash chain, it is guaranteed that the rest of that chain will not change structure underneath it. Since the entry value is volatile, a reader is also guaranteed to see the most recent write to it, avoiding a stale view of memory.

The get() operation begins by reading the head of the desired bucket without locking. It then traverses the chain looking for the key. If the key is not found, or is found with a null value, the segment lock is acquired, the head is read again and, if anything may have changed, the chain is traversed again. Below is a simplified partial implementation.

public Object get(Object key){
    int hash = hash(key);
    //First pass without locking.
    HashEntry[] tab = table;
    int i = hash & (tab.length-1);
    HashEntry head = tab[i];
    HashEntry e;
    for(e=head; e!=null; e = e.next){
        if(e.hash == hash && equal(key, e.key)){
            Object value = e.value;
            if(value != null){//null value indicates the element has been removed
                return value;
            }else{
                break;
            }
        }
    }
    //Second pass with a lock
    Segment seg = segments[hash & SEGMENT_MASK];
    synchronized(seg){
        tab = table;
        i = hash & (tab.length-1);
        HashEntry newHead = tab[i];
        if(e != null || head != newHead){
            for(e=newHead; e!=null; e=e.next){
                if(e.hash == hash && equal(key, e.key)){
                    return e.value;
                }
            }
        }
        return null;
    }
}

The remove() operation can't simply unlink the element from the chain: the next fields are final, and other threads may be traversing the chain without a lock. Instead, removal begins by finding the correct entry and setting its value to null. Since the field is volatile, any thread that reads it will immediately see the null and know that it needs to re-traverse the chain with the lock held. Then the entries from the head of the chain up to the removed entry are cloned, and the clones are joined to the remainder of the chain.
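The removal steps can be sketched on a bare chain of entries. This is an illustration of the technique as described here, not the JDK source; the Entry class and helper names are hypothetical. In the real map this runs while holding the segment lock, and the bucket slot is then pointed at the returned head.

```java
public class ImmutableChainRemoveDemo {

    // Shaped like the hash chain entries described above: next is final, value is volatile
    static final class Entry {
        final String key;
        volatile String value;
        final Entry next;

        Entry(String key, String value, Entry next) {
            this.key = key;
            this.value = value;
            this.next = next;
        }
    }

    // Returns the head of a chain equal to `head` minus `key`.
    // The original chain is never relinked, so a reader still walking it is unaffected.
    static Entry remove(Entry head, String key) {
        Entry target = head;
        while (target != null && !target.key.equals(key)) {
            target = target.next;
        }
        if (target == null) {
            return head; // key not present
        }
        // Step 1: readers that reach this entry see null and retry under the lock
        target.value = null;
        // Step 2: clone every entry in front of the target and join the clones onto
        // the untouched tail (the clones come out in reverse order, which does not
        // matter inside a hash bucket)
        Entry newHead = target.next;
        for (Entry p = head; p != target; p = p.next) {
            newHead = new Entry(p.key, p.value, newHead);
        }
        return newHead;
    }

    // Comma-separated keys, for inspecting a chain
    static String keys(Entry head) {
        StringBuilder sb = new StringBuilder();
        for (Entry e = head; e != null; e = e.next) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(e.key);
        }
        return sb.toString();
    }
}
```

A thread that grabbed the old head before the removal keeps seeing a complete, consistent chain (a, b, c), and the null value on b tells it to re-check under the lock.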

I highly recommend reading through the complete class implementation. It is a really well thought out piece of code that minimizes the impact of locks and is cognizant of how the JMM rules work.

I hope this article built on the basic knowledge introduced in Part 1. You should now be familiar with the kinds of tools provided by the java.util.concurrent package. It contains a variety of helper classes, both for building thread-safe data structures and for controlling synchronization. When doing multi-threaded programming, this should be one of the first places you look for tools. Hopefully you are also now more confident in your understanding of how the final and volatile keywords affect program execution. In Part 3 of this series I am going to go into some examples of problems that can arise from seemingly harmless code, including incorrect use of the volatile keyword, this references that escape during construction, how to use ThreadLocal to make a stateful class thread-safe and using fork-join to split a workload into multiple tasks.

I am by no means an expert so I welcome any comments, corrections or requests for additional information in this series. I think this is an important and poorly understood topic and I would like to provide a strong introduction here.

Concurrency and Multi-threaded Programming in Java – Part 1

I am giving a talk at the office this week about some of the idiosyncrasies of concurrency in Java, so I thought it would be nice to expand that discussion into a couple of blog posts. This first post goes over the basic constructs for synchronization and concurrency and how to implement them in Java. In the second post I will talk about the java.util.concurrent library provided in Java 5+ and about the most common errors made when using concurrency, and in the final post I will discuss advanced concurrency issues specifically related to the Java programming language.

What is Concurrency?

Concurrency in programming is the idea of breaking up computation into pieces that can be executed simultaneously. How this actually happens depends on the underlying hardware, but for our purposes we will assume that execution occurs in parallel. Each of these pieces of computation runs in a thread. Concurrency becomes especially interesting and complex when threads interact with each other, generally through shared access to the same data, direct action on each other's fields or a shared need for limited resources. When this is the case, it becomes necessary to synchronize the actions of individual threads.

Terminology

There is a lot of terminology involved with concurrency so I am going to go ahead and define the terms I am going to use and what I mean by them. Many of these terms have multiple meanings but unless otherwise stated I am referring to the following meanings.

Lock: A lock is what a thread gains when it enters a block of code (or gains access to a resource) that is synchronized. In general, when one thread has a lock, all other threads are blocked from accessing that resource, object or block of code.

Critical path/section: A critical path is a piece of code that is synchronized.

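Condition Variable: A condition variable is associated with a lock and lets a thread wait, releasing the lock while it does so, until another thread signals that some condition may have changed.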

Mutex: A mutex (mutual exclusion) is a construct for creating a lock and is generally used interchangeably with lock.

Semaphore: A semaphore is a mutex that supports the use of a counter to allow multiple threads access to the same block or resource. (A semaphore with a counter of 1 is equivalent to a mutex)

Barrier: A barrier is a holding zone for multiple threads. It acts as a gate that only releases threads when a certain number have reached that point.

Monitor: Monitor is a term with inconsistent meanings, so I will avoid using it. If you see it elsewhere, it generally refers to a lock or to an implementation of a locking mechanism.

Concurrency in Java

Java is especially interesting because, unlike many programming languages, it has support for multi-threaded execution and synchronization built directly into the language. Every object in Java has an implicit lock, which is acquired with the synchronized keyword, and a single condition variable associated with that lock, which is used through the wait(), notify() and notifyAll() methods. Prior to Java 5 this was the primary way synchronization was achieved. Concurrency is a very abstract topic, so let's dive into an example that uses multiple threads.

This first example demonstrates how multiple threads work independently. Here we create three thread objects that each print a given character 100 times.

public class MultiThreadExample {

    public static void main(String args[]) {

        //Create 3 Thread objects
        Thread aThrd = new PrintThread("a");
        Thread bThrd = new PrintThread("b");
        Thread cThrd = new PrintThread("c");

        //Start the threads
        aThrd.start();
        bThrd.start();
        cThrd.start();

        //You might expect this code to simply print 100 a's followed by 100 b's
        //and 100 c's, but instead the values are interleaved and likely
        //to be different each time you run this program.
    }

    static class PrintThread extends Thread {

        String val;

        public PrintThread(String val){ this.val = val;}

        //Our run method here simply prints the val 100 times
        @Override
        public void run() {
            for(int i=0; i<100; i++){
                System.out.print(val);
            }
        }
    }
}

Example 1 output:

andrew@andrew-Inspiron-1520:~/dev$ java MultiThreadExample
bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb

As you can see from running the last example, our three threads execute in parallel and, as a result, print to the console in no specific order. The threads share access to standard out. We could modify the code to guarantee that one thread finishes its use of standard out before another begins, but that is boring, so let's instead look at a slightly more complicated example.

In this example we are going to build a thread-safe queue implemented with a linked list. Before we do so, let's look at what could go wrong if it weren't thread-safe. Below is the implementation of our queue. Take a second to figure out where a problem might arise with this linked list if we didn't include the synchronized keyword.

public class QueueExample {

    class Node {
        public Object data;
        public Node next;

        public Node(Object data){this.data = data;}
    }

    private Node head, tail;

    public synchronized void enqueue(Object data){
        if (null == head){
            head = new Node(data);
            tail = head;
        }else{
            tail.next = new Node(data);
            tail = tail.next;
        }
    }

    public synchronized Object dequeue(){
        Object result = null;
        if(null != head){
            result = head.data;
            head = head.next;
            if(null == head){
                tail = null; //queue is now empty, so drop the stale tail reference
            }
        }
        return result;
    }
}

Figure it out? That's right: the problem occurs when two threads add an element at the same time. Since adding an element is not an atomic operation, there is room for an inconsistent state to develop. Consider the following execution, given our queue q and two threads t1 and t2. Let's also assume q already contains elements and the tail is called x.

Thread 1 execution | Thread 2 execution
t1 calls q.enqueue(y) | t2 calls q.enqueue(z)
tail.next (x.next) is set to a new Node with data = y | ....
.... | tail.next (still x.next) is set to a new Node with data = z
tail is set to tail.next, which is now z | ....
.... | tail is set to tail.next, which is z.next = null

So what does this leave us with once both threads have finished executing? A broken queue. The former end of the list, x, now points to z as its next node; y is completely lost with nothing referencing it at all; and our queue's tail is null, so the next call to enqueue() will throw a NullPointerException.

Why does the magic word 'synchronized' fix the problem? synchronized is the Java keyword that requires a thread to hold the object's lock (its mutex) before it can execute the method. Since only one thread can hold the lock on an object at a time, our problem is fixed: an element can only be added or removed when no other thread is acting on the queue. So in our execution example above, t2 would simply wait for t1 to exit the method and release the lock before it began executing its enqueue() call. To use our terminology, the enqueue method is a critical path because a lock is required to execute it.
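The same kind of lost update happens with something much smaller than a queue: count++ is really a read, an add and a write. The sketch below (hypothetical class name, Java 8+ assumed) makes the increment a critical path with synchronized, so the final count is always exact. Remove the keyword and the total will usually come up short.

```java
public class SynchronizedCounterDemo {

    private int count;

    // Read, add and write happen as one unit while holding this object's lock
    public synchronized void increment() {
        count++;
    }

    // Synchronized reads also guarantee we see the latest value
    public synchronized int get() {
        return count;
    }

    public static int run(int threads, int incrementsPerThread) throws InterruptedException {
        SynchronizedCounterDemo counter = new SynchronizedCounterDemo();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.increment();
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(4, 100_000)); // always 400000
    }
}
```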

What if we have a much more complex method and only a small part of it is a critical path? Naturally we don't want to synchronize the entire method, because threads would waste cycles waiting and the level of concurrency would drop. Furthermore, what if we don't need a lock on the entire object whose method we are calling, but only for that small critical path? In that case we lock only what is absolutely required, using a separate object. An example might be a method that modifies a user's account given some identifying information about the user. We don't need to hold a lock while working out which account to modify, only while making changes to it.

public void modifyAccount(Object userId){
    //...
    //perform complex steps to get userAccount object
    //...
    synchronized(userAccount){
        //execute critical path
        //make changes to account
    }
    //...
    //finish computation
}

Note that we are locking on the userAccount object and not on the object whose method we are executing. Often the object used to lock a critical path is otherwise insignificant and exists purely to provide a mutex through its implicit lock and condition variable.
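A common form of this is a private object that exists only to be locked. The sketch below (hypothetical class, with a made-up 1% fee) computes the fee outside the lock and holds the lock only for the read-modify-write of the balance. Keeping the lock object private also means no outside code can accidentally synchronize on it.

```java
public class AccountLockDemo {

    private final Object balanceLock = new Object(); // used purely as a mutex
    private long balance;

    public void deposit(long amount) {
        // Work that touches no shared state happens outside the lock
        long fee = computeFee(amount);
        synchronized (balanceLock) {
            // Critical path: only the update of the shared balance
            balance += amount - fee;
        }
    }

    private static long computeFee(long amount) {
        return amount / 100; // hypothetical 1% fee
    }

    public long balance() {
        synchronized (balanceLock) {
            return balance;
        }
    }

    public static long run(int threads, int depositsPerThread) throws InterruptedException {
        AccountLockDemo account = new AccountLockDemo();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < depositsPerThread; i++) {
                    account.deposit(100); // 99 after the fee
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return account.balance();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(4, 1000)); // 396000
    }
}
```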

So now we know how to lock critical code blocks, but what if we have a resource with only a finite number of units available? A simple mutex won't be sufficient because it only lets one thread use the resource at a time, so we can't make full use of it. This is where we use a semaphore and take advantage of its counter. Below is a simple example implementation of a Semaphore class.

public class Semaphore {

    private int permitCount;

    public Semaphore(int permitCount){
        this.permitCount = permitCount;
    }

    /**
     * Get one resource lock, blocking until one is available.
     * InterruptedException is propagated so that a thread interrupted
     * while waiting does not wrongly believe it holds a permit.
     */
    public synchronized void acquire() throws InterruptedException {
        while(permitCount == 0)
            wait();
        permitCount--;
    }

    /**
     * Release one resource lock and wake a waiting thread
     */
    public synchronized void release(){
        permitCount++;
        notify();
    }
}

We have now created a basic semaphore. It is essentially a mutex that allows multiple threads to hold a lock on the resource at once. Good examples of when this is useful are a finite number of available database connections or a pool of threads handling messages as they arrive. One notable difference from the Java 5 library version is that our implementation makes no guarantee that threads receive permits in the order they requested them, whereas java.util.concurrent.Semaphore can be constructed in fair mode to provide first-in, first-out ordering. Like the library version, our implementation lets a thread release a permit whether or not it acquired one to begin with.
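Both points can be checked against the library class in a few lines. This single-threaded sketch (hypothetical class name) creates a fair java.util.concurrent.Semaphore with two permits, uses the non-blocking tryAcquire() to show the count running out, and then releases three times, which the library allows even though only two permits were ever acquired.

```java
import java.util.concurrent.Semaphore;

public class LibrarySemaphoreDemo {

    public static int demo() {
        // Fair mode: blocked acquirers are granted permits in FIFO order
        Semaphore semaphore = new Semaphore(2, true);

        boolean first = semaphore.tryAcquire();  // true, 1 permit left
        boolean second = semaphore.tryAcquire(); // true, 0 permits left
        boolean third = semaphore.tryAcquire();  // false, returns immediately
        if (!first || !second || third) {
            throw new IllegalStateException("unexpected permit state");
        }

        semaphore.release();
        semaphore.release();
        // No ownership check: an extra release simply adds a permit,
        // so the count grows past the initial 2
        semaphore.release();
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 3
    }
}
```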

The last basic concurrency construct I am going to discuss is the barrier. A barrier is very simple and is often overlooked because its functionality is easily achieved in a variety of ways. It is exactly what it sounds like: an object that holds up a set of threads until a certain number have arrived, then releases them all. It can be thought of as the reverse of a semaphore. Below is a simple implementation:

public class Barrier {

    private int numMembers, generation = 0, waitingCount = 0;

    public Barrier(int numMembers){this.numMembers = numMembers;}

    public synchronized void await() throws InterruptedException {
        int currentGeneration = generation;
        waitingCount++;
        if(waitingCount == numMembers){
            waitingCount = 0;
            generation++;
            notifyAll();
        }else{
            while(generation == currentGeneration){
                wait();
            }
        }
    }
}

This construct forces the first numMembers - 1 threads to wait and then releases them all once the last one arrives. The barrier then resets and is available for use again. The barrier is a very straightforward construct that can be used for a variety of things.
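The library counterpart is java.util.concurrent.CyclicBarrier, which is also reusable across generations and can run an optional barrier action each time it trips. In the sketch below (hypothetical class name, Java 8+ assumed), every worker bumps a shared counter and then waits; the barrier action, run by the last thread to arrive before the others are released, records how many arrivals it saw. Each snapshot is therefore exactly parties times the round number.

```java
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class CyclicBarrierDemo {

    public static List<Integer> run(int parties, int rounds) throws InterruptedException {
        AtomicInteger arrivals = new AtomicInteger();
        List<Integer> snapshots = new CopyOnWriteArrayList<>();
        // Barrier action: runs once per generation, before any waiter is released
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> snapshots.add(arrivals.get()));

        Thread[] workers = new Thread[parties];
        for (int p = 0; p < parties; p++) {
            workers[p] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        arrivals.incrementAndGet();
                        barrier.await(); // wait for the other parties, then reset
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // Another party was interrupted or timed out; give up
                }
            });
            workers[p].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return snapshots;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3, 3)); // [3, 6, 9]
    }
}
```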

To wrap up, we have now been introduced to the basic terminology related to synchronization and concurrency and looked at how to implement the basic constructs in Java. You should now feel comfortable using threads to carry out simple computations in parallel. In the next post, I will discuss the most common errors that result from incorrect synchronization and introduce the java.util.concurrent library, which provides complete implementations of all the constructs we created above as well as some more advanced tools. Finally, in a third post I will discuss advanced concurrency issues specifically related to the Java programming language.
