This is the second part in a three-part discussion about concurrency in Java. The first article introduced the common terminology and how many of the basic locking mechanisms work. This post focuses on the java.util.concurrent package. The final post will discuss some of the more complicated issues that arise in Java when using concurrency and synchronization.
Before talking about the java.util.concurrent package, I want to go over how a couple of Java keywords affect concurrency in Java 1.5+. In an effort to maintain good performance, when synchronization is not explicitly dictated, the compiler, the JVM and the processor caches take serious liberties with reordering memory operations. This can provide significant performance gains, but it can also cause your programs to behave in unexpected ways. The only rule on reordering is that the currently executing thread can’t tell the difference; this is called within-thread as-if-serial semantics. Other threads, however, can observe the reordering. Fortunately, the volatile and final keywords add stronger rules.
The volatile keyword guarantees that reads and writes to volatile variables are totally ordered across all threads, meaning that neither the compiler nor the cache can reorder them with each other. Furthermore, ordinary memory operations cannot be freely reordered around volatile reads and writes. This means that when thread A writes to a volatile variable X, and thread B later reads that value from X, any variables that were visible to thread A at the time X was written will also be visible to B. As a result, X can be used to signal state to other threads. There are similar guarantees when a thread is started, when one thread joins another, and when threads enter and exit synchronized blocks on the same lock.
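A minimal sketch of using a volatile variable as a signal, assuming a hypothetical VolatileFlagExample class: the writer sets an ordinary field and then a volatile flag, and the reader, once it sees the flag, is guaranteed to see the ordinary write as well.

```java
public class VolatileFlagExample {
    // Ordinary field: on its own, no visibility guarantee across threads
    private static int data = 0;
    // Volatile flag: a write to it publishes every write made before it
    private static volatile boolean ready = false;

    static int readPublishedValue() {
        final int[] seen = new int[1];
        Thread reader = new Thread(() -> {
            // Spin until the volatile flag is set
            while (!ready) {
                Thread.yield();
            }
            // The volatile read above saw the writer's volatile write,
            // so the earlier ordinary write to data is visible here too
            seen[0] = data;
        });
        Thread writer = new Thread(() -> {
            data = 42;     // 1. ordinary write
            ready = true;  // 2. volatile write, ordered after step 1
        });
        reader.start();
        writer.start();
        try {
            writer.join();
            reader.join(); // join() also makes seen[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(readPublishedValue()); // prints 42
    }
}
```

Without volatile on ready, the reader could spin forever or observe data as 0.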
The final keyword means just that, final, except when it doesn’t… Before Java 1.5, without synchronization another thread could potentially see the default value (null or 0) for a final field and then later see the correct value. Java 1.5+ guarantees that when an object’s constructor completes, all of its final fields are frozen, and any other thread that obtains a reference to the object will see the correct values for those fields, provided the object’s this reference does not escape during construction. Like volatile, the operations that initialize final fields will not be reordered with operations that follow the freeze. Beyond this, any object reachable through a final field is also guaranteed to be visible to other threads in at least the state it had when the constructor finished. The impact of this is that immutable objects are inherently thread-safe.
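As an illustration, here is a sketch of an immutable class, using a hypothetical ImmutableRoute type. All state lives in final fields, the list reached through a final field is copied and never modified afterwards, and this is not leaked from the constructor, so instances can be shared between threads without synchronization.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public final class ImmutableRoute {
    // final fields are frozen when the constructor finishes
    private final String name;
    private final List<String> stops;

    public ImmutableRoute(String name, List<String> stops) {
        this.name = name;
        // Defensive copy, then wrap: the list reachable through the final
        // field is never modified after construction
        this.stops = Collections.unmodifiableList(new ArrayList<>(stops));
        // 'this' is not handed to any other thread from inside the constructor
    }

    public String name() { return name; }

    public List<String> stops() { return stops; }

    // "Changing" the route returns a new object instead of mutating this one
    public ImmutableRoute withStop(String stop) {
        List<String> copy = new ArrayList<>(stops);
        copy.add(stop);
        return new ImmutableRoute(name, copy);
    }

    public static void main(String[] args) {
        ImmutableRoute r = new ImmutableRoute("A", Arrays.asList("x", "y"));
        System.out.println(r.withStop("z").stops() + " " + r.stops()); // prints [x, y, z] [x, y]
    }
}
```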
java.util.concurrent
The concurrency library was added in Java 1.5. The library uses hardware-level constructs such as compare-and-swap (CAS) to create thread-safety mechanisms that largely avoid locking at the object level. This generally provides a significant performance increase over using the synchronized keyword to lock critical paths. (Under extremely heavy contention the improvement can turn into a performance hit, because many of these algorithms retry in a loop rather than sleep and therefore burn CPU cycles while waiting.) It is usually advisable to use the canned implementations from this package rather than implementing your own. The package also introduces the first data structures in the standard library designed from the ground up for concurrent access. The full documentation is available in the package’s Javadoc. I am going to go over a couple of the most useful and a few of the interesting classes in the package and give some usage examples.
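To make the CAS idea concrete, here is a sketch of the classic compare-and-swap retry loop, written by hand on top of AtomicInteger.compareAndSet (AtomicInteger.incrementAndGet already does this for you; CasCounter is a hypothetical name). The loop also shows why these algorithms spin rather than sleep under contention.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CasCounter {
    private final AtomicInteger value = new AtomicInteger();

    // Read, compute, try to swap; if another thread got there first, retry.
    // No lock is ever held.
    public int increment() {
        while (true) {
            int current = value.get();
            int next = current + 1;
            // Succeeds only if value still equals current
            if (value.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    public int get() { return value.get(); }

    public static void main(String[] args) throws InterruptedException {
        CasCounter counter = new CasCounter();
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 10_000; i++) counter.increment();
            });
            threads[t].start();
        }
        for (Thread t : threads) t.join();
        System.out.println(counter.get()); // prints 40000
    }
}
```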
Semaphore
This class is very straightforward. It works primarily through the acquire() and release() methods: internally it keeps a count of available permits, acquire() blocks until a permit is available and then takes it, and release() hands one back. The primary use case is restricting access to a limited resource. In the example below we use a semaphore to limit the number of database connections that can be open at once.
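import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.Semaphore;

public class DbConnectionFactoryExample{
    public static Connection getConnection(){
        try{
            return LimitedConnectionDecorator.getConnection();
        }catch(InterruptedException e){
            //Restore the interrupt flag instead of silently swallowing it
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
class LimitedConnectionDecorator implements Connection{
    static final int NUM_CONNECTIONS = 10;
    private static final String DB_URL = "jdbc:mysql://localhost/exampledb";
    //One permit per connection we are willing to hand out
    private static final Semaphore connectionAccess = new Semaphore(NUM_CONNECTIONS);
    private final Connection dbConnection;
    private boolean permitReleased = false;

    private LimitedConnectionDecorator(Connection dbConnection){
        this.dbConnection = dbConnection;
    }
    public static Connection getConnection() throws InterruptedException{
        //Blocks until one of the NUM_CONNECTIONS permits is free
        connectionAccess.acquire();
        try{
            return new LimitedConnectionDecorator(DriverManager.getConnection(DB_URL));
        }catch(SQLException e){
            //No connection was created, so give the permit back
            connectionAccess.release();
            return null;
        }
    }
    @Override
    public synchronized void close() throws SQLException{
        if(permitReleased){
            return; //close() was already called; never release a permit twice
        }
        permitReleased = true;
        try{
            dbConnection.close();
        }finally{
            //Release even if close() throws, otherwise the pool
            //permanently loses a permit
            connectionAccess.release();
        }
    }
    //... rest of implementation omitted (delegate every other Connection method to dbConnection)
}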
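Because the connection example depends on a real database, here is a self-contained sketch of the same acquire/release pattern with a simulated resource. SemaphoreLimitDemo and maxConcurrentUsers are hypothetical names. The method records the largest number of threads ever inside the guarded section, which should never exceed the number of permits.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreLimitDemo {
    // Starts `workers` threads that each use a resource guarded by a semaphore
    // with `permits` permits, and returns the most threads seen inside at once.
    static int maxConcurrentUsers(int permits, int workers) {
        Semaphore gate = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger maxInside = new AtomicInteger();
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    gate.acquire();       // blocks while every permit is taken
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;               // no permit acquired, nothing to release
                }
                try {
                    maxInside.accumulateAndGet(inside.incrementAndGet(), Math::max);
                    Thread.sleep(5);      // pretend to use the resource
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    gate.release();       // always return the permit
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return maxInside.get();
    }

    public static void main(String[] args) {
        System.out.println(maxConcurrentUsers(3, 10)); // never more than 3
    }
}
```

Releasing in a finally block that runs only after a successful acquire() keeps the permit count correct even when the work fails.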
Exchanger
The Exchanger is an interesting convenience class that facilitates the coordinated exchange of a data structure between two threads. When a thread finishes with its object, it calls exchange(object) and waits until the other thread also calls exchange(). Then the two objects are swapped, and each thread continues with the object the other one handed over. The obvious use case is having one thread fill a buffer while another thread empties a second one. In the example below, I do just that.
import java.util.concurrent.Exchanger;

public class ExchangerExample{
    //MagicDataStructure and MagicStuff stand in for any buffer type and its elements
    static final Exchanger<MagicDataStructure> exchanger = new Exchanger<MagicDataStructure>();
    static final MagicDataStructure mds1 = new MagicDataStructure();
    static final MagicDataStructure mds2 = new MagicDataStructure();

    public static void main(String[] args){
        new Thread(new Filler()).start();
        new Thread(new Emptier()).start();
    }
    static class Filler implements Runnable{
        public void run(){
            MagicDataStructure myMds = mds1;
            try{
                while(true){
                    if(myMds.isFull()){
                        //Hand over the full structure and receive the emptied one
                        myMds = exchanger.exchange(myMds);
                    }
                    myMds.fill(new MagicStuff());
                }
            }catch(InterruptedException e){
                Thread.currentThread().interrupt();
            }
        }
    }
    static class Emptier implements Runnable{
        public void run(){
            //Start with the other structure; sharing mds1 would defeat the exchange
            MagicDataStructure myMds = mds2;
            try{
                while(true){
                    if(myMds.isEmpty()){
                        //Hand over the empty structure and receive the full one
                        myMds = exchanger.exchange(myMds);
                    }
                    myMds.empty();
                }
            }catch(InterruptedException e){
                Thread.currentThread().interrupt();
            }
        }
    }
}
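Since MagicDataStructure is a placeholder, here is a runnable sketch of the same fill/empty handoff using plain ArrayList buffers. BufferSwapDemo and runSwaps are hypothetical names. Both threads perform the same number of exchanges, so neither is left waiting at the end, and the emptier sums everything it drains.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

public class BufferSwapDemo {
    // The filler fills buffers of size `capacity` with 1, 2, 3, ...;
    // the emptier drains them and returns the sum of every item it saw.
    static long runSwaps(int capacity, int rounds) {
        Exchanger<List<Integer>> exchanger = new Exchanger<>();
        long[] total = new long[1];

        Thread filler = new Thread(() -> {
            List<Integer> buf = new ArrayList<>();
            int next = 1;
            try {
                for (int r = 0; r < rounds; r++) {
                    while (buf.size() < capacity) buf.add(next++);
                    buf = exchanger.exchange(buf); // give full, get empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread emptier = new Thread(() -> {
            List<Integer> buf = new ArrayList<>();
            try {
                for (int r = 0; r < rounds; r++) {
                    buf = exchanger.exchange(buf); // give empty, get full
                    for (int x : buf) total[0] += x;
                    buf.clear();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        filler.start();
        emptier.start();
        try {
            filler.join();
            emptier.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return total[0];
    }

    public static void main(String[] args) {
        System.out.println(runSwaps(3, 4)); // prints 78 (1 + 2 + ... + 12)
    }
}
```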
FutureTask
The FutureTask class is interesting because it pairs an asynchronous computation with a way to retrieve its result. When you spawn a thread you are basically just starting an asynchronous computation, but a plain Runnable has no way to hand back a result. FutureTask wraps the computation and lets it return one. You create a FutureTask by passing in either a Callable, or a Runnable together with the result that should be returned. FutureTask itself implements Runnable, so you start it by handing it to a Thread (or an executor), and then call get() some time later. If the task has completed, get() returns the result immediately; otherwise it blocks until the result has been computed. Below is a simple example implementation.
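import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskExample{
    public static void main(String[] args) throws InterruptedException, ExecutionException{
        //Do you get it? It's a computater...
        FutureTask<Object> computater = new FutureTask<Object>(new OverclockedPotato());
        //Run the task on its own thread; calling computater.run() directly
        //would execute it synchronously on this thread
        new Thread(computater).start();
        //Do other awesome computing
        //....
        //Blocks until call() has finished, then returns its result
        Object frenchFries = computater.get();
    }
}
class OverclockedPotato implements Callable<Object>{
    public Object call(){
        Object potato = new Object();
        //Do stuff to it
        return potato;
    }
}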
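The other constructor mentioned above takes a Runnable plus the value that get() should return. A minimal sketch, with RunnableFutureTaskDemo as a hypothetical name:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class RunnableFutureTaskDemo {
    static String runAndWait() {
        StringBuilder log = new StringBuilder();
        // A Runnable produces no value, so the second argument is what get() returns
        FutureTask<String> task = new FutureTask<>(() -> log.append("worked"), "done");
        new Thread(task).start(); // run asynchronously on another thread
        try {
            // get() blocks until run() completes; the task's writes (to log)
            // are visible to this thread once get() returns
            return task.get() + " " + log;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndWait()); // prints "done worked"
    }
}
```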
CopyOnWriteArrayList
The previous examples were all nice concurrency constructs, but now let’s talk about some of the interesting data structures provided by the concurrent package. The CopyOnWriteArrayList is a thread-safe List designed for situations where traversals occur significantly more often than mutations. In this case it is beneficial to be able to iterate over the List without locking. This is a common need when a List is used to hold listeners, such as in AWT or Swing applications. CopyOnWriteArrayList accomplishes lock-free iteration by copying its backing array whenever a mutation occurs. Each iterator therefore walks a snapshot of the list as it was when the iterator was created, unaffected by later changes, and never throws a ConcurrentModificationException. In other words, the CopyOnWriteArrayList holds a mutable reference to an immutable array, rather than the other way around. I’m not going to include a full code sample here, because its use is identical to that of a normal List; it is the use case and the data structure implementation that are interesting.
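Usage really is ordinary List usage; the one behavior worth seeing is the snapshot iteration. In this sketch (SnapshotIterationDemo is a hypothetical name), the loop adds to the list while iterating over it. With an ArrayList this would throw ConcurrentModificationException, while here the loop simply keeps walking the original snapshot.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class SnapshotIterationDemo {
    static String iterateWhileAdding() {
        List<String> listeners = new CopyOnWriteArrayList<>(Arrays.asList("a", "b"));
        List<String> seen = new ArrayList<>();
        for (String s : listeners) {
            // Each add() copies the backing array; this loop keeps walking
            // the snapshot taken when its iterator was created
            listeners.add(s + "'");
            seen.add(s);
        }
        return seen + " " + listeners;
    }

    public static void main(String[] args) {
        System.out.println(iterateWhileAdding()); // prints [a, b] [a, b, a', b']
    }
}
```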
ConcurrentHashMap
The ConcurrentHashMap is an extremely interesting class that provides complete thread safety with minimal locking. It is optimized for the most common operation, retrieving an element that already exists in the map, which succeeds without locking the majority of the time. In a highly parallel system with many threads interacting with the map, its performance is far better than that of a synchronizedMap or a Hashtable. Several neat tricks are used to accomplish this. The description below follows the original Java 1.5 implementation; later releases (notably Java 8) reworked the internals, but the ideas are still instructive.
First, rather than having a single map-wide lock, the ConcurrentHashMap is split into segments, each of which guards a range of buckets with its own lock. The number of segments is controlled by the concurrencyLevel constructor argument (16 by default). In theory, this means that up to that many threads can write to the map at once. In practice the number will be lower, since some writers will hash to the same segment, but it is still significantly higher than one.
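The lock-striping idea can be sketched in a few lines. This is a simplified illustration, not ConcurrentHashMap’s code: StripedMap is a hypothetical class that keeps one plain HashMap per segment and uses each segment as its own lock. Unlike ConcurrentHashMap, it also locks on reads; it only shows how writers to different segments avoid contending with each other.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StripedMap<K, V> {
    // One plain HashMap per segment; each segment's monitor acts as its lock.
    // The final field guarantees other threads see the fully built list.
    private final List<Map<K, V>> segments = new ArrayList<>();

    public StripedMap(int concurrencyLevel) {
        for (int i = 0; i < concurrencyLevel; i++) {
            segments.add(new HashMap<>());
        }
    }

    // Choose a segment from the key's hash, so different keys usually
    // land in different segments and do not contend for the same lock
    private Map<K, V> segmentFor(Object key) {
        int h = key.hashCode();
        h ^= (h >>> 16); // spread high bits into the low bits
        return segments.get(Math.floorMod(h, segments.size()));
    }

    public V put(K key, V value) {
        Map<K, V> seg = segmentFor(key);
        synchronized (seg) { // locks one segment, not the whole map
            return seg.put(key, value);
        }
    }

    public V get(Object key) {
        Map<K, V> seg = segmentFor(key);
        synchronized (seg) {
            return seg.get(key);
        }
    }

    public int size() {
        int total = 0;
        for (Map<K, V> seg : segments) {
            synchronized (seg) {
                total += seg.size();
            }
        }
        return total;
    }
}
```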
The second trick takes advantage of the JMM rules for final fields. Every field of a hash-chain entry is final except the value field, which is volatile. Because the next pointers are final, new entries can only be added at the front of the hash chain, and removals are done by rebuilding the front of the chain (described below) rather than by unlinking in place. A thread that has read a head reference cannot know whether it is still the current head, but it is guaranteed that the part of the chain reachable from that reference will never change its structure. Since the entry value is volatile, the thread is also guaranteed to see any update to that field immediately, which avoids a stale view of memory.
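A small sketch of such an entry, assuming a simplified hypothetical HashEntry inside an ImmutableChainDemo class (not the JDK’s actual class). Because next is final, adding means creating a new head, and a reader still holding the old head sees exactly the chain it started with.

```java
public class ImmutableChainDemo {
    // Everything final except the value
    static final class HashEntry {
        final Object key;
        final int hash;
        final HashEntry next;   // chain structure can never change...
        volatile Object value;  // ...but values may be updated in place

        HashEntry(Object key, int hash, HashEntry next, Object value) {
            this.key = key;
            this.hash = hash;
            this.next = next;
            this.value = value;
        }
    }

    // Because next is final, the only way to add is to create a new head
    static HashEntry addAtHead(HashEntry head, Object key, Object value) {
        return new HashEntry(key, key.hashCode(), head, value);
    }

    static int length(HashEntry e) {
        int n = 0;
        for (; e != null; e = e.next) n++;
        return n;
    }

    public static void main(String[] args) {
        HashEntry head = addAtHead(null, "a", 1);
        HashEntry readerView = head;    // a reader holding an old head
        head = addAtHead(head, "b", 2); // a writer installs a new head
        System.out.println(length(readerView) + " " + length(head)); // prints 1 2
    }
}
```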
The get() operation begins by reading the head reference of the desired bucket without locking. It then traverses the chain looking for the key. If the key is not found, or is found with a null value, the segment lock is acquired, the head reference is read again and, if necessary, the chain is traversed again. Below is a simplified partial implementation.
public Object get(Object key){
    int hash = hash(key);
    //First pass without locking.
    HashEntry[] tab = table;
    int i = hash & (tab.length-1);
    HashEntry head = tab[i];
    HashEntry e;
    for(e=head; e!=null; e = e.next){
        if(e.hash == hash && equal(key, e.key)){
            Object value = e.value;
            if(value != null){
                return value;
            }else{
                //null value indicates the element has been removed
                //(or is mid-update), so recheck under the lock
                break;
            }
        }
    }
    //Second pass with a lock
    Segment seg = segments[hash & SEGMENT_MASK];
    synchronized(seg){
        tab = table;
        i = hash & (tab.length-1);
        HashEntry newHead = tab[i];
        //Only re-traverse if the key was found with a null value (e != null)
        //or the chain changed since the first pass; otherwise the
        //unlocked miss was accurate
        if(e != null || head != newHead){
            for(e=newHead; e!=null; e=e.next){
                if(e.hash == hash && equal(key, e.key)){
                    return e.value;
                }
            }
        }
        return null;
    }
}
The remove() operation can’t simply unlink the element from the chain: the next fields are final, and other threads may be traversing the chain without a lock. Instead, removal begins by finding the correct entry and setting its value to null. Since the field is volatile, any thread that checks this value will immediately see the null and know that it needs to re-traverse the chain with the lock held. Then the entries from the head of the chain up to the removed entry are cloned, the clones are joined to the remainder of the chain after the removed entry, and the resulting chain is installed as the bucket’s new head.
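The removal step can be sketched as follows, using a hypothetical, simplified HashEntry in a ChainRemovalDemo class. remove() returns the new head and leaves installing it in the bucket (under the segment lock) to the caller, which is an assumption of this sketch. The nodes after the removed one are shared unchanged, the nodes before it are cloned, and the old chain stays intact for any lock-free reader still walking it. In this sketch the cloned prefix ends up in reverse order, which is harmless because order within a bucket does not matter.

```java
public class ChainRemovalDemo {
    static final class HashEntry {
        final Object key;
        final HashEntry next;
        volatile Object value;

        HashEntry(Object key, HashEntry next, Object value) {
            this.key = key;
            this.next = next;
            this.value = value;
        }
    }

    // Returns the head of a chain without `key` (or the old head if absent)
    static HashEntry remove(HashEntry head, Object key) {
        HashEntry target = head;
        while (target != null && !target.key.equals(key)) {
            target = target.next;
        }
        if (target == null) {
            return head;
        }
        target.value = null;             // volatile write: readers now see "removed"
        HashEntry newHead = target.next; // the tail after the target is reused as-is
        for (HashEntry p = head; p != target; p = p.next) {
            newHead = new HashEntry(p.key, newHead, p.value); // clone the prefix
        }
        return newHead;
    }

    static String keys(HashEntry e) {
        StringBuilder sb = new StringBuilder();
        for (; e != null; e = e.next) {
            if (sb.length() > 0) sb.append(' ');
            sb.append(e.key);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        HashEntry chain = new HashEntry("a", new HashEntry("b", new HashEntry("c", null, 3), 2), 1);
        System.out.println(keys(remove(chain, "b"))); // prints "a c"
    }
}
```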
I highly recommend reading through the complete class implementation. It is a well thought out piece of code that minimizes the impact of locks and is carefully designed around the JMM rules.
I hope this article built on the basic knowledge introduced in Part 1. You should now be familiar with the kinds of tools provided by the java.util.concurrent package. It contains a variety of helper classes, both for building thread-safe data structures and for controlling synchronization. When doing multi-threaded programming, this should be one of the first places you look for tools. You are hopefully also more confident in your understanding of the effects that the final and volatile keywords have on program execution. In Part 3 of this series I am going to go into some examples of problems that can arise from seemingly harmless code, including incorrect use of the volatile keyword, this references that escape during construction, how to use ThreadLocal to render a stateful class thread-safe, and using fork-join to split a workload into multiple tasks.
I am by no means an expert, so I welcome any comments, corrections or requests for additional information in this series. I think this is an important and poorly understood topic, and I would like to provide a strong introduction here.