Threads are lightweight processes that share the same memory space. A scheduler can swap between threads. Java uses preemptive multithreading, where the OS is responsible for swapping threads.
Java 5 introduced a significant new paradigm for handling threads, compared with the primitive concurrency constructs of earlier versions. It is recommended that you use the new java.util.concurrent.* package. The following shows the thread commands for Java before version 5 and from version 5 onwards.
* Prefer the Java 6 concurrent collections to the much slower pre-Java 5 synchronized ones.
Here's an example of how to use the new API:
A thread, created by ExecutorService, calls the doWait() method, while the main method calls doNotify(). Here's the output.
It is more secure to use
References
Java 5 introduced a significant new paradigm for handling threads, compared with the primitive concurrency constructs of earlier versions. It is recommended that you use the new java.util.concurrent.* package. The table below shows the thread commands for Java before version 5 and from version 5 onwards.
Java 1-4 | Java 5-7 |
Thread Creation | |
public class MyThread extends Thread { public void run() { // your concurrent task here } } MyThread thread = new MyThread(); thread.start(); or public class MyTask implements Runnable { public void run() { // your concurrent task here } } Thread thread = new Thread(new MyTask()); thread.start(); | public class MyTask implements Runnable { public void run(){ // your concurrent task here } } // Creates new threads as needed and destroys old threads ExecutorService exec = Executors.newCachedThreadPool(); // Keeps a permanent total number of threads ExecutorService exec = Executors.newFixedThreadPool(5); // Like newFixedThreadPool but only for one thread ExecutorService exec = Executors.newSingleThreadExecutor(); // To be scheduled, to replace java.util.Timer ScheduledExecutorService exec = Executors.newScheduledThreadPool(5); // Like before but for one thread only ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); exec.execute(new MyTask()); exec.shutdown(); int processors = Runtime.getRuntime().availableProcessors(); ExecutorService executor = Executors.newFixedThreadPool(processors); Future<Integer> futureResult = executor.submit( new Callable<Integer>() { public Integer call() { // long running computation that // returns an integer } }); Integer result = futureResult.get(); // block until result is ready executor.shutdown(); |
Critical sections | |
public synchronized void method() { // same as synchronising on "this" // which is dangerous as another // thread could get hold of "this" // and block your code. // When the lock is acquired for // the synchronized block, other // synchronized methods and // critical sections in the object // cannot be called. } // use this instead // In Java, every object contains a “monitor” // that can be used to provide // mutual exclusion access to critical sections of code. private final Object monitor = new Object(); public void method() { synchronized(monitor) { // your critical section here } } | import java.util.concurrent.locks.*; private final Lock lock = new ReentrantLock(); public void method() { // cannot be interrupted lock.lock(); // can be interrupted // lock.lockInterruptibly(); try{ // your critical section here } finally { lock.unlock(); } } // can try lock public void method() { boolean locked = lock.tryLock(); if (locked) { try{ // your critical section here } finally { lock.unlock(); } } else { // do something else } } import java.util.concurrent.locks.*; private final ReadWriteLock lock = new ReentrantReadWriteLock(); public void doWrite() { lock.writeLock().lock(); try { // your critical section here } finally { lock.writeLock().unlock(); } } public void doRead() { lock.readLock().lock(); try { // your critical section here } finally { lock.readLock().unlock(); } } |
Thread interactions | |
private final Object lock = new Object(); // must lock on the same object private volatile boolean notified = false; public void doWait() { synchronized(lock) { while(!notified) { // avoid spurious wakeups try { // releases the object lock and waits // until it gets notified // otherwise the notifying thread could // never acquire the lock! lock.wait(); } catch(InterruptedException e) { } } //clear signal for next waiting thread and continue running notified = false; } } public void doNotify() { synchronized(lock) { notified = true; // sends a message to a thread waiting on the object lock lock.notify(); // sends a message to all the threads waiting on the object lock // lock.notifyAll(); } } | import java.util.concurrent.locks.*; private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); private volatile boolean notified = false; public void doWait() { lock.lock(); try{ while(!notified) { condition.await(); // like wait(), releases // the lock and suspends current thread } notified = false; } finally { lock.unlock(); } } public void doNotify() { lock.lock(); try{ notified = true; condition.signal(); // like notify() // condition.signalAll(); // like notifyAll() } finally { lock.unlock(); } } final CountDownLatch latch = new CountDownLatch(3); ExecutorService exec = Executors.newSingleThreadExecutor(); exec.execute(new Runnable() { public void run() { try { // puts it into WAITING state latch.await(); // until latch == 0 } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } } }); for (int i=0; i<3; i++) { Thread.sleep(1000); latch.countDown(); // --latch; } final Semaphore semaphore = new Semaphore(3); ExecutorService exec = Executors.newSingleThreadExecutor(); exec.execute(new Runnable() { public void run() { try { semaphore.acquire(); try { Thread.sleep(1000); } finally { semaphore.release(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } } }); Also check: 
CyclicBarrier, Semaphore, Exchanger |
Atomicity/Visibility | |
/* The volatile modifier can be used to mark a field and indicate that changes to that field must be seen by all subsequent reads by other threads, regardless of synchronization.*/ public class MyClass implements Runnable { private volatile boolean finished = false; public void finish() { finished = true; } public void run() { while(!finished) { // .. do processing } } } | import java.util.concurrent.atomic.*; public class Counter { private AtomicInteger value = new AtomicInteger(); public int next() { return value.incrementAndGet(); } } java.util.concurrent.atomic package:
|
Periodic Tasks | |
try { Thread.sleep(1000); // or TimeUnit.SECONDS.sleep(1); } catch(InterruptedException ex) { Thread.currentThread().interrupt(); // set interrupted status to true break; } | TimerTask task = new TimerTask() { public void run() { // your concurrent task here } }; Timer timer = new Timer(); timer.schedule(task, 1000, 1000); // (TimerTask, long delay, long period)
// Better use this instead of java.util.Timer
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
// or this
ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
new javax.swing.Timer(1000, new ActionListener() { public void actionPerformed( ActionEvent evt) { //...Perform a task... } }).start(); |
Concurrent Collections* | |
Lists (Collections.synchronizedList) | CopyOnWriteArrayList |
Sets (Collections.synchronizedSet, Collections.synchronizedSortedSet) | CopyOnWriteArraySet, ConcurrentSkipListSet |
Maps (Collections.synchronizedMap, Collections.synchronizedSortedMap) | ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListMap |
Queues | Queue, BlockingQueue, ConcurrentLinkedQueue, ArrayBlockingQueue, LinkedBlockingQueue, PriorityQueue, PriorityBlockingQueue, DelayQueue, SynchronousQueue, Deque, BlockingDeque, ArrayDeque, LinkedBlockingDeque |
Here's an example of how to use the new API:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Demonstrates thread interaction with {@link Lock} and {@link Condition}:
 * one thread blocks in {@link #doWait()} until another thread calls
 * {@link #doNotify()}.
 *
 * <p>The {@code notified} flag is only read and written while {@code lock}
 * is held, so a notification that arrives before the waiter starts waiting
 * is not lost.
 */
public class ThreadInteractions implements Runnable {

    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private volatile boolean notified = false;

    /**
     * Blocks the calling thread until {@link #doNotify()} has been called,
     * then clears the notification so that the next waiter blocks again.
     *
     * <p>If the thread is interrupted while waiting, its interrupt status is
     * restored and the method returns without printing "Resuming ...".
     */
    public void doWait() {
        System.out.println("Awaiting ...");
        lock.lock();
        try {
            // Loop to guard against spurious wake-ups. The lock must be held
            // for every await() call, so it is released only once, in the
            // outer finally, and not inside the loop.
            while (!notified) {
                // like wait(), releases the lock
                // and suspends current thread
                condition.await();
            }
            // Clear the signal while still holding the lock.
            notified = false;
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and stop waiting.
            Thread.currentThread().interrupt();
            return;
        } finally {
            lock.unlock();
        }
        System.out.println("Resuming ...");
    }

    /**
     * Waits one second (so the demo output is readable), then marks the
     * notification as delivered and wakes up one waiting thread.
     *
     * <p>The delay happens before the lock is acquired so that the waiting
     * thread is not kept out of the critical section while this thread
     * sleeps. If the delay is interrupted, the interrupt status is restored
     * and the notification is still sent so that the waiter is not stranded.
     */
    public void doNotify() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        lock.lock();
        try {
            System.out.println("Notifying ...");
            notified = true;
            condition.signal(); // like notify()
            // condition.signalAll(); // like notifyAll()
        } finally {
            lock.unlock();
        }
    }

    /** Runs {@link #doWait()} on the executing thread. */
    @Override
    public void run() {
        this.doWait();
    }

    /**
     * Starts a worker thread that waits, then notifies it from the main thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ThreadInteractions obj = new ThreadInteractions();
        // a thread executes doWait()
        ExecutorService exec = Executors.newSingleThreadExecutor();
        exec.execute(obj);
        // no further tasks are submitted; the worker ends once doWait() returns
        exec.shutdown();
        // the main thread executes doNotify()
        obj.doNotify();
    }
}
A thread, created by ExecutorService, calls the doWait() method, while the main method calls doNotify(). Here's the output.
Awaiting ... Notifying ... Resuming ...
It is safer to use
condition.signalAll()
instead of condition.signal()
, because signalAll() wakes up every waiting thread. For that reason, the notified
flag is set back to false by the first thread that resumes, so that any other waiting thread (there is none in this example) goes back to waiting until the next call to doNotify() sets the flag to true again.
References
- Jenkov, J., Introduction to Java Concurrency / Multithreading, http://tutorials.jenkov.com/java-concurrency/index.html
- Eckel, B. (2007), Thinking in Java, 4th Edition, Prentice Hall.
- Kabutz, H. (1999-2011), Newsletters