Πέμπτη 27 Οκτωβρίου 2011

Concurrency in Java 6

Threads are lightweight processes that share the same memory space. A scheduler can swap between threads. Java uses preemptive multithreading, where the OS is responsible for swapping threads.
Java 5 has introduced a significant new paradigm to handle threads, compared to the primitive concurrent commands of pre version 5. It is recommended that you use the new java.util.concurrent.* package. In the following I show the thread commands for Java pre-version 5 and post-version 5.

Java 1-4 Java 5-7
Thread Creation
public class
MyThread extends Thread {
   public void run() {
     // your concurrent task here
   }
}

MyThread thread = new MyThread();
thread.start();

or

public class MyTask implements Runnable {
   public void run() {
     // your concurrent task here
   }
}

Thread thread = new Thread(new MyTask());

thread.start();

public class MyTask implements Runnable {
   public void run(){
     // your concurrent task here
   }
}
// Creates new threads as needed and destroys old threads
ExecutorService exec = Executors.newCachedThreadPool();
// Keeps a permanent total number of threads
ExecutorService exec = Executors.newFixedThreadPool(5);
// Like newFixedThreadPool but only for one thread
ExecutorService exec = Executors.newSingleThreadExecutor();
// To be scheduled, to replace java.util.Timer
ExecutorService exec = Executors.newScheduledThreadPool();
// Like before but for one thread only
ExecutorService exec = Executors.newSingleThreadScheduledExecutor();

exec.execute(new MyTask());
exec.shutdown();


int processors =
Runtime.getRuntime().availableProcessors();

ExecutorService executor = Executors.newFixedThreadPool(processors);

Future<Integer> futureResult = executor.submit(
   new Callable<Integer>() {
     public Integer call() {
       // long running computation that
       // returns an integer
     }
   });
Integer result = futureResult.get();
// block until result is ready
exec.shutdown();
Critical sections
public synchronized void method() {
  // same as synchronising on "this"
  // which is dangerous as another 
  // thread could get hold of "this"
  // and block your code.
  // When the lock is acquired for
  // the synchronized block, other
  // synchronized methods and
  // critical sections in the object
  // cannot be called.
}

// use this instead
// In Java, every object contains a “monitor”
// that can be used to provide
// mutual exlusion access to critical sections of code.

private final Object monitor = new Object();

public void method() {
  synchronized(monitor) {
    // your critical section here
  }
}

import java.util.concurrent.locks.*;

private final Lock lock = new ReentrantLock();

public void method() {     
  // cannot be interrupted
  lock.lock();
  // can be interrupted
  // lock.lockInterruptibly();
  try{
    // your critical section here
  } finally {
    lock.unlock();
  }
}

// can try lock
public void method() {
  boolean locked = lock.tryLock();
  if (locked) {
    try{
      // your critical section here
    } finally {
      lock.unlock();
    }
  } else {
    // do something else
  }
}


import java.util.concurrent.locks.*;

private final ReadWriteLock lock = new ReentrantReadWriteLock();

public void doWrite() {
   lock.writeLock().lock();
   try {
     // your critical section here
   } finally {
     lock.writeLock().unlock();
   }
}

public void doRead() {
   lock.readLock().lock();
   try {
      // your critical section here
   } finally {
      lock.readLock().unlock();
   }
}
Thread interactions


private final Object lock = new Object();  // must lock on the
same object

private volatile boolean notified = false;

public void doWait() {
  synchronized(lock) {
   while(!notified) {  // avoid spurious wakeups
    try {
      // releases the object lock and waits
      // until it gets notified
      // otherwise the notifying thread could
      // never acquire the lock!
      lock.wait();
    } catch(InterruptedException e) {
    }
   }

   //clear signal for next waiting thread and continue running
   notified = false;
  }
}

public void doNotify() {
  synchronized(lock) {
    notified = true;
    // sends a message to a thread waiting on the object lock
    lock.notify();
    // sends a message to all the threads waiting on the object lock
    // lock.notifyAll();
  }
}
import java.util.concurrent.locks.*;

private final Lock lock = new ReentrantLock();

private final Condition condition = lock.newCondition();

private volatile boolean notified = false;

public void doWait() {     
  lock.lock();
  try{
   while(!notified) {
    condition.await(); // like wait(), releases              
// the lock and suspends current thread
   }
  } finally {
    lock.unlock();
  }
  notified = false;
}

public void doNotify() {     
  lock.lock();
  try{
    notified = true;
    condition.signal();       // like notify()
    // condition.signalAll(); // like notifyAll()
  } finally {
    lock.unlock();
  }
}

final CountDownLatch latch = new CountDownLatch(3);
ExecutorService exec = Executors.newSingleThreadExecutor();
exec.execute(new Runnable() {
  public void run() {
    try { // puts it into WAITING state
      latch.await();  // until latch == 0
    } catch (InterruptedException e) {
      Thread.currentThread.interrupt();
      return;
    }
  }
});
for (int i=0; i<3; i++) {
  Thread.sleep(1000);
  latch.countDown(); // --latch;
}

final Semaphore semaphore = new Semaphore(3);
ExecutorService exec = Executors.newSingleThreadExecutor();
exec.execute(new Runnable() {
  public void run() {
    try {
      semaphore.acquire();
      try { 
        Thread.sleep(1000);
      } finally {
        semaphore.release();
      }
    } catch (InterruptedException e) {
      Thread.currentThread.interrupt();
      return;
    }
  }
});
Also check: CyclicBarrier, Semaphore, Exchanger

Atomicity/Visibility
/* The volatile modifier can be used to mark a field and indicate that changes to that field must be seen by all subsequent reads by other threads, regardless of synchronization.*/
public class MyClass implements Runnable {

private volatile boolean finished = false;

   public void finish() {
     finished = true;
   }

   public void run() {
     while(!finished) {
        // .. do processing
     }
   }
}
import java.util.concurrent.atomic.*;

public class Counter {
  private AtomicInteger value = new AtomicInteger();

  public int next() {
   return value.incrementAndGet();
 }
}

java.util.concurrent.atomic package:
  • AtomicBoolean 
  • AtomicLong 
  • AtomicInteger
  • AtomicReference 

Periodic Tasks
try {
  Thread.sleep(1000);
// or TimeUnit.SECONDS.sleep(1);
} catch(InterruptedException ex) {
   Thread.currentThread().interrupt();   // set interrupted status to true
  break;
}

TimerTask task = new TimerTask() {
  public void run() {
   // your concurrent task here
  }

};

Timer timer = new Timer();
timer.schedule(task, 1000, 1000);   // (TimerTask, long delay, long period)


// Better use this instead of java.util.Timer

ExecutorService exec = Executors.newScheduledThreadPool();
// or this
ExecutorService exec = Executors.newSingleThreadScheduledExecutor();

new javax.swing.Timer(1000, new ActionListener() {
      public void actionPerformed( ActionEvent evt) {
          //...Perform a task...
      }
}).start();

Concurrent Collections*
Lists (Collections.synchronizedList) CopyOnWriteArrayList
Sets (Collections.synchronizedSet, Collections.synchronizedSortedSet) CopyOnWriteArraySet, ConcurrentSkipListSet
Maps (Collections.synchronizedMap, Collections.synchronizedSortedMap) ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListMap
Queues

Queue, BlockingQueue, ConcurrentLinkedQueue, ArrayBlockingQueue, LinkedBlockingQueue, PriorityQueue, PriorityBlockingQueue, DelayQueue, SynchronousQueue, Deque, BlockingDeque, ArrayDeque, LinkedBlockingDeque
* Prefer use of the Java 6 concurrent collections instead of the much slower pre Java 5 synchronized ones.

Here's an example of how to use the new API:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ThreadInteractions implements Runnable {

    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private volatile boolean notified = false;

    public void doWait() {
        System.out.println("Awaiting ...");
        lock.lock();
        while (!notified) {
            try {
                // like wait(), releases the lock
                // and suspends current thread
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        notified = false;
        System.out.println("Resuming ...");
    }

    public void doNotify() {
        lock.lock();
        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println("Notifying ...");
            notified = true;
            condition.signal(); // like notify()
            // condition.signalAll(); // like notifyAll()
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void run() {
        this.doWait();
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        ThreadInteractions obj = new ThreadInteractions();
        // a thread executes doWait()
        ExecutorService exec = Executors.newSingleThreadExecutor();
        exec.execute(obj);
        exec.shutdown();
        // the main thread executes doNotify()
        obj.doNotify();
    }
    
}


A thread, created by ExecutorService, calls the doWait() method, while the main method calls doNotify(). Here's the output.

Awaiting ...
Notifying ...
Resuming ...

It is more secure to use condition.signalAll() instead of condition.signal(). For that reason, the notified flag is set back to false in order for the next thread (not in this example however) to set it back to true and the next thread to be notified.

References
  1. Jenkov, J., Introduction to Java Concurrency / Multithreading, http://tutorials.jenkov.com/java-concurrency/index.html
  2. Eckel, B. (2007), Thinking in Java, 4th Edition, Prentice Hall.
  3. Kabutz, H. (1999-2011), Newsletters