Java SE 5 Concurrency

Concurrency Utilities: JSR-166

  • Enables development of simple yet powerful multithreaded applications 
    • Much as the Collections framework does for data structure handling 
  • Aims to beat C performance in high-end server applications 
  • Provides a richer set of concurrency building blocks 
    • wait(), notify() and synchronized are too primitive 
  • Enhances scalability, performance, readability and thread safety of Java applications

Why Use Concurrency Utilities?

  • Reduced programming effort
  • Increased performance 
  • Increased reliability 
    • Helps avoid threading hazards such as deadlock, starvation, race conditions, and excessive context switching 
  • Improved maintainability 
  • Increased productivity

Concurrency Utilities

  • Task Scheduling Framework 
  • Callables and Futures 
  • Semaphores 
  • Concurrent Collections 
  • Atomic Variables 
  • Locks 
  • Nanosecond-granularity timing
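
The last item in the list refers to System.nanoTime() and the TimeUnit enum, both added in Java SE 5. A minimal sketch of how they might be used together follows; the class name TimingDemo and the helper timeSleep are illustrative, not part of the course material.

```java
import java.util.concurrent.TimeUnit;

public class TimingDemo {
    // Returns how long a short sleep took, in nanoseconds.
    static long timeSleep(long millis) throws InterruptedException {
        long start = System.nanoTime();        // elapsed-time source, unrelated to wall-clock time
        TimeUnit.MILLISECONDS.sleep(millis);   // TimeUnit also provides a unit-aware sleep()
        return System.nanoTime() - start;
    }

    public static void main(String[] args) throws InterruptedException {
        long elapsedNanos = timeSleep(20);
        // TimeUnit converts between granularities without hand-written factors.
        System.out.println("Elapsed: "
            + TimeUnit.NANOSECONDS.toMillis(elapsedNanos) + " ms");
    }
}
```

Only differences between two nanoTime() readings are meaningful; a single reading is not a timestamp.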

Task Scheduling Framework

  • Executor/ExecutorService/Executors framework supports 
    • Standardizing task submission 
    • Scheduling 
    • Execution 
  • Executor is an interface 
  • ExecutorService interface extends Executor 
  • Executors is a factory class for creating various kinds of ExecutorService implementations

Executor Interface

  • Executor interface provides a way of de-coupling task submission from the execution 
    • Task submission is standardized 
    • Task execution: mechanics of how each task will be run, including details of thread use, scheduling – captured in the implementation class 
  • Example:
Executor executor = getSomeKindofExecutor();
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
  • Many Executor implementations impose some sort of limitation on how and when tasks are scheduled
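
To illustrate the de-coupling described above, here is a small sketch with two trivial Executor implementations. The submitting code stays the same whichever one is passed in. The names ExecutorDecouplingDemo, DirectExecutor, ThreadPerTaskExecutor and submitTwoTasks are made up for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;

public class ExecutorDecouplingDemo {
    // Runs each task synchronously in the caller's thread.
    static class DirectExecutor implements Executor {
        public void execute(Runnable command) {
            command.run();
        }
    }

    // Starts a new thread for every task.
    static class ThreadPerTaskExecutor implements Executor {
        public void execute(Runnable command) {
            new Thread(command).start();
        }
    }

    // The submitting code depends only on the Executor interface.
    static void submitTwoTasks(Executor executor, final List<String> log) {
        for (int i = 1; i <= 2; i++) {
            final int id = i;
            executor.execute(new Runnable() {
                public void run() {
                    log.add("task " + id);
                }
            });
        }
    }

    public static void main(String[] args) {
        // A synchronized list keeps the demo safe if the executor is swapped.
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        submitTwoTasks(new DirectExecutor(), log);
        System.out.println(log); // prints "[task 1, task 2]"
    }
}
```

Passing a ThreadPerTaskExecutor instead changes how the tasks run (asynchronously, in no guaranteed order) without touching submitTwoTasks.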

Executor and ExecutorService

ExecutorService adds lifecycle management

public interface Executor {
 void execute(Runnable command);
}
public interface ExecutorService extends Executor {
 void shutdown();
 List<Runnable> shutdownNow();
 boolean isShutdown();
 boolean isTerminated();
 boolean awaitTermination(long timeout,
 TimeUnit unit);

 // other convenience methods for submitting tasks
}
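
The lifecycle methods are typically used together at shutdown time. The following is one possible sketch, assuming a small fixed pool and trivial tasks; LifecycleDemo and runAndShutDown are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LifecycleDemo {
    // Runs taskCount trivial tasks, shuts the pool down, and returns how many completed.
    static int runAndShutDown(int taskCount) throws InterruptedException {
        final AtomicInteger completed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    completed.incrementAndGet();
                }
            });
        }
        pool.shutdown();  // stop accepting new tasks; already submitted tasks still run
        if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
            pool.shutdownNow();  // give up: interrupt workers and discard queued tasks
        }
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Completed tasks: " + runAndShutDown(5));
    }
}
```

Without the shutdown() call, the pool's worker threads would keep the JVM alive after main returns.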

ExecutorService adds task submission methods

public interface Executor {
 void execute(Runnable command);
}
public interface ExecutorService extends Executor {
 void shutdown();
 List<Runnable> shutdownNow();
 boolean isShutdown();
 boolean isTerminated();
 boolean awaitTermination(long timeout,
 TimeUnit unit);

 // other convenience methods for submitting tasks
 <T> Future<T> submit(Callable<T> task);
 Future<?> submit(Runnable task);
 <T> Future<T> submit(Runnable task, T result);
}

execute(..) vs submit(..)

// To submit a task, you can use either the
// execute(..) method of the Executor interface
// or the submit(..) method of the ExecutorService interface
// Example of using the execute(..) method of the
// Executor interface
Executor executor = getExecutor();
executor.execute(new MyTask());
// Example of using the submit(..) method of the
// ExecutorService interface
ExecutorService executorService = getExecutorService();
Future<?> future = executorService.submit(new MyTask());
// You can then use the Future object to find out
// more about the task
boolean done = future.isDone();
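
As a rough sketch of the difference, the example below calls execute(..) once and each of the three submit(..) overloads once, then reads what each Future yields. SubmitDemo and describe are illustrative names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitDemo {
    static String describe() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable noOp = new Runnable() {
                public void run() { }
            };
            pool.execute(noOp);                               // fire and forget: no handle returned
            Future<?> plain = pool.submit(noOp);              // get() yields null on completion
            Future<String> fixed = pool.submit(noOp, "done"); // get() yields the supplied result
            Future<Integer> computed = pool.submit(new Callable<Integer>() {
                public Integer call() {
                    return 6 * 7;                             // get() yields the computed value
                }
            });
            return plain.get() + " / " + fixed.get() + " / " + computed.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(describe()); // prints "null / done / 42"
    }
}
```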

Executors: Factory for creating various types of ExecutorService

public class Executors {
 static ExecutorService
 newFixedThreadPool(int n);
 static ExecutorService
 newCachedThreadPool();
 static ExecutorService
 newSingleThreadExecutor();
 static ScheduledExecutorService
 newScheduledThreadPool(int n);
 static ScheduledExecutorService
 newSingleThreadScheduledExecutor();
 // additional versions specifying ThreadFactory
 // additional utility methods
}

newFixedThreadPool(int n)

public class Executors {
 // newFixedThreadPool() Creates a thread pool
 // that reuses a fixed number of threads operating
 // off a shared unbounded queue.
 static ExecutorService
 newFixedThreadPool(int n);
 ..
}
----------------------------------------------
// Usage example – Create ExecutorService object
ExecutorService executorService =
 Executors.newFixedThreadPool(NUMBER_THREADS);
// Submit a task
Future<?> future = executorService.submit(new MyTask(i));
// Check if the task is done
boolean done = future.isDone();
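
To make the "reuses a fixed number of threads" behaviour visible, the following sketch submits more tasks than there are threads and records which worker ran each one. FixedPoolDemo and workerNames are hypothetical names chosen for the example.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FixedPoolDemo {
    // Submits taskCount tasks and returns the names of the threads that ran them.
    static Set<String> workerNames(int poolSize, int taskCount) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<String>> futures = new ArrayList<Future<String>>();
            for (int i = 0; i < taskCount; i++) {
                futures.add(pool.submit(new Callable<String>() {
                    public String call() {
                        return Thread.currentThread().getName();
                    }
                }));
            }
            Set<String> names = new HashSet<String>();
            for (Future<String> f : futures) {
                names.add(f.get()); // blocks until that task has finished
            }
            return names;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Set<String> names = workerNames(3, 20);
        System.out.println(names.size() + " distinct worker thread(s) ran 20 tasks");
    }
}
```

The number of distinct names never exceeds the pool size; tasks that cannot run immediately wait in the pool's queue.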

pre-J2SE 5.0 Code

Web Server—poor resource management

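import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

class WebServer {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable r = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            // Don't do this! It creates one new thread per request, without limit.
            new Thread(r).start();
        }
    }

    static void handleRequest(Socket connection) {
        /* Application-specific request handling (not shown) */
    }
}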

Executors Example

Web Server—better resource management

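import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

class WebServer {
    // static, so that it can be used from main(); at most 7 requests run at once
    static final Executor pool =
        Executors.newFixedThreadPool(7);

    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable r = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            pool.execute(r);
        }
    }

    static void handleRequest(Socket connection) {
        /* Application-specific request handling (not shown) */
    }
}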

Lab: Exercise 1 1108_javase5_concurrency.zip

newCachedThreadPool()

public class Executors {
 ..
 // newCachedThreadPool() creates a thread pool that creates
 // new threads as needed, but will reuse previously constructed
 // threads when they are available.
 static ExecutorService
 newCachedThreadPool();
 ..
}
----------------------------------------------------------
// Usage example – Create ExecutorService object
ExecutorService executorService =
 Executors.newCachedThreadPool();
// Submit a task
Future<?> future = executorService.submit(new MyTask(i));
// Check if the task is done
boolean done = future.isDone();

newScheduledThreadPool()

public class Executors {
 ..
 // newScheduledThreadPool(POOL_SIZE) creates a thread pool
 // that can schedule commands to run after a given delay, or
 // to execute periodically
 static ScheduledExecutorService
 newScheduledThreadPool(int corePoolSize);
 ..
}
----------------------------------------------------------
// Usage example – Create ScheduledExecutorService object
ScheduledExecutorService scheduledExecutorService =
 Executors.newScheduledThreadPool(POOL_SIZE);
// Schedule a task
ScheduledFuture<?> timeHandle1 =
 scheduledExecutorService.scheduleAtFixedRate(
 new TimePrinterTask1(System.out), // Task to execute
 1, // Initial delay
 3, // the period between successive executions
 TimeUnit.SECONDS); // the time unit
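
A periodic task keeps running until its handle is cancelled or the scheduler is shut down. The sketch below, with a short period chosen so that it finishes quickly, waits for a few executions and then cancels; ScheduledDemo and runPeriodically are illustrative names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduledDemo {
    // Runs a periodic task until it has fired `runs` times, then cancels it.
    // Returns true if all runs happened in time and the handle reports cancellation.
    static boolean runPeriodically(int runs, long periodMillis) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        final CountDownLatch remaining = new CountDownLatch(runs);
        try {
            ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(
                new Runnable() {
                    public void run() {
                        remaining.countDown();
                    }
                },
                0,                       // initial delay
                periodMillis,            // period between successive executions
                TimeUnit.MILLISECONDS);  // time unit for both values
            boolean finished = remaining.await(5, TimeUnit.SECONDS);
            handle.cancel(false);        // stop further executions
            return finished && handle.isCancelled();
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Ran three times and cancelled: " + runPeriodically(3, 50));
    }
}
```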

Lab: Exercise 2 1108_javase5_concurrency.zip

Callables and Futures

Callables and Futures: Problem (pre-J2SE 5.0)

  • Before J2SE 5.0, a newly started thread (callee thread) had no way to return a result to the thread that started it (calling thread) without the use of a shared variable and appropriate synchronization 
    • This is complex and makes code harder to understand and maintain
  • Solution: the callee task implements the Callable interface 
    • Implement call() method rather than run() 
  • Calling thread (caller) submits the Callable object to an ExecutorService and then moves on 
    • Through submit(), not execute() 
    • submit() returns a Future object 
  • Calling thread (caller) then retrieves the result using the get() method of the Future object 
    • If result is ready, it is returned 
    • If result is not ready, the calling thread blocks until it is

Build CallableExample (This is Callee)

class CallableExample
 implements Callable<String> {
 public String call() {
 /* Do some work and create a result */
 String result = "The work is ended";
 return result;
 }
}

Future Example (Caller)

ExecutorService es =
 Executors.newSingleThreadExecutor();
Future<String> f =
 es.submit(new CallableExample());
/* Do some work in parallel */
/* Then later on, check the result */
try {
 String callableResult = f.get();
} catch (InterruptedException ie) {
 /* Handle */
} catch (ExecutionException ee) {
 /* Handle */
} 
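
The two catch blocks above are where failures surface. As a sketch of what handling might look like, the example below adds a timeout to get() and unwraps the exception thrown inside call(). FutureErrorDemo and outcome are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureErrorDemo {
    // Runs the task and reports its result, its failure, or a timeout.
    static String outcome(Callable<String> task) throws InterruptedException {
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            Future<String> f = es.submit(task);
            return f.get(2, TimeUnit.SECONDS);  // wait at most two seconds
        } catch (ExecutionException ee) {
            // The exception thrown by call() is available as the cause.
            return "failed: " + ee.getCause().getMessage();
        } catch (TimeoutException te) {
            return "timed out";
        } finally {
            es.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(outcome(new Callable<String>() {
            public String call() {
                return "The work is ended";
            }
        })); // prints "The work is ended"
        System.out.println(outcome(new Callable<String>() {
            public String call() {
                throw new IllegalStateException("boom");
            }
        })); // prints "failed: boom"
    }
}
```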

Lab: Exercise 3: Callable and Future 1108_javase5_concurrency.zip

Semaphores

  • Typically used to restrict access to fixed size pool of resources 
  • New Semaphore object is created with same count as number of resources 
  • Thread trying to access resource calls acquire() 
    • Returns immediately if semaphore count > 0 
    • Blocks if count is zero until release() is called by different thread 
    • acquire() and release() are thread safe atomic operations
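
The permit counting described above can be traced in a single thread. This is a minimal sketch using tryAcquire(), the non-blocking counterpart of acquire(), so that the demo never blocks; SemaphoreDemo and trace are illustrative names.

```java
import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    static String trace() throws InterruptedException {
        Semaphore permits = new Semaphore(2);   // two resources available
        permits.acquire();                      // count 2 -> 1, returns immediately
        permits.acquire();                      // count 1 -> 0, returns immediately
        StringBuilder out = new StringBuilder();
        out.append(permits.availablePermits()).append(' '); // 0
        out.append(permits.tryAcquire()).append(' ');       // false: acquire() would block here
        permits.release();                                  // count 0 -> 1
        out.append(permits.tryAcquire());                   // true
        return out.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(trace()); // prints "0 false true"
    }
}
```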

Semaphore Example

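import java.util.concurrent.Semaphore;

class ResourcePool {
    private final Semaphore semaphore;
    private final Resource[] resources;
    private final boolean[] used;

    public ResourcePool(int poolSize) {
        semaphore = new Semaphore(poolSize);
        /* Initialise resource pool */
        resources = new Resource[poolSize];
        used = new boolean[poolSize];
    }

    public Resource getResource() throws InterruptedException {
        semaphore.acquire(); // blocks until a permit is available
        /* Acquire resource */
        return nextFreeResource();
    }

    public void returnResource(Resource r) {
        /* Return resource to pool */
        markAsFree(r);
        semaphore.release();
    }
    // nextFreeResource() and markAsFree() are hypothetical synchronized
    // helpers (not shown) that search and update the used[] array
}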

Lab: Exercise 4: Semaphore 1108_javase5_concurrency.zip

Concurrent Collections

BlockingQueue Interface

  • Provides thread safe way for multiple threads to manipulate collection 
  • ArrayBlockingQueue is simplest concrete implementation 
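  • Full set of methods 
    • put() [blocks while the queue is full] 
    • offer() [non-blocking, or blocks for a fixed time] 
    • peek() [non-blocking; does not remove the element] 
    • take() [blocks while the queue is empty] 
    • poll() [non-blocking, or blocks for a fixed time]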
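
The behaviour of these methods can be traced on a queue of capacity one. The sketch below uses only the calls listed above; QueueMethodsDemo and trace are illustrative names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueMethodsDemo {
    static String trace() throws InterruptedException {
        BlockingQueue<String> q = new ArrayBlockingQueue<String>(1);
        StringBuilder out = new StringBuilder();
        q.put("a");                                     // space available: returns at once
        out.append(q.offer("b")).append(' ');           // queue full: false, no blocking
        out.append(q.peek()).append(' ');               // "a", element stays in the queue
        out.append(q.take()).append(' ');               // "a", element removed
        out.append(q.poll()).append(' ');               // queue empty: null, no blocking
        out.append(q.poll(10, TimeUnit.MILLISECONDS));  // null after waiting 10 ms
        return out.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(trace()); // prints "false a a null null"
    }
}
```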

Blocking Queue Example (1)

class Logger implements Runnable {
    private final BlockingQueue<String> msgQueue;
    public Logger(BlockingQueue<String> mq) {
        msgQueue = mq;
    }
    public void run() {
        try {
            while (true) {
                // Blocks until a message is available
                String message = msgQueue.take();
                /* Log message */
            }
        } catch (InterruptedException ie) {
            /* Handle */
        }
    }
}

Blocking Queue Example (2)

private BlockingQueue<String> messageQueue =
    new ArrayBlockingQueue<String>(10);
Logger logger = new Logger(messageQueue);
public void run() {
    try {
        while (true) {
            /* Do some processing */
            String someMessage = "some message"; // placeholder for the processing result
            /* Blocks if no space available */
            messageQueue.put(someMessage);
        }
    } catch (InterruptedException ie) {
        Thread.currentThread().interrupt(); // restore the interrupt status
    }
}
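
The two halves above can be combined into one runnable sketch. The loops in the slides never end, so this version assumes an end-of-stream marker (a so-called poison pill) to stop the consumer; that marker, along with the names ProducerConsumerDemo, STOP and run, is an addition for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerDemo {
    private static final String STOP = "__STOP__"; // hypothetical end-of-stream marker

    // Sends the messages through a small bounded queue and returns what the consumer saw.
    static List<String> run(List<String> messages) throws InterruptedException {
        final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
        final List<String> logged = new ArrayList<String>();
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    while (true) {
                        String message = queue.take();  // blocks while the queue is empty
                        if (STOP.equals(message)) {
                            return;
                        }
                        logged.add(message);
                    }
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();
        for (String m : messages) {
            queue.put(m);                               // blocks while the queue is full
        }
        queue.put(STOP);
        consumer.join();  // after join(), the consumer's writes to logged are visible
        return logged;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(Arrays.asList("a", "b", "c"))); // prints "[a, b, c]"
    }
}
```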

Lab: Exercise 5: BlockingQueue 1108_javase5_concurrency.zip

Concurrency: Atomic Variables

Atomics

  • java.util.concurrent.atomic 
    • Small toolkit of classes that support lock-free, thread-safe programming on single variables
AtomicInteger balance = new AtomicInteger(0);
public int deposit(int amount) {
 return balance.addAndGet(amount);
}
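
To show why this works without locks, the sketch below runs deposits from several threads and adds a conditional withdrawal built on compareAndSet(). The withdraw operation and the names AtomicDemo and concurrentDeposits are illustrative additions.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicDemo {
    // Each thread deposits 1 unit depositsPerThread times; no update is lost.
    static int concurrentDeposits(int threads, final int depositsPerThread)
            throws InterruptedException {
        final AtomicInteger balance = new AtomicInteger(0);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(new Runnable() {
                public void run() {
                    for (int i = 0; i < depositsPerThread; i++) {
                        balance.addAndGet(1);
                    }
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return balance.get();
    }

    // Withdraws only if funds suffice, using a compare-and-set retry loop.
    static boolean withdraw(AtomicInteger balance, int amount) {
        while (true) {
            int current = balance.get();
            if (current < amount) {
                return false;
            }
            if (balance.compareAndSet(current, current - amount)) {
                return true;
            }
            // Another thread changed the balance in between: retry.
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(concurrentDeposits(4, 1000)); // prints "4000"
        AtomicInteger balance = new AtomicInteger(100);
        System.out.println(withdraw(balance, 30));       // prints "true"
        System.out.println(withdraw(balance, 100));      // prints "false"
    }
}
```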

Lab: Exercise 6: Atomic Variable 1108_javase5_concurrency.zip

Concurrency: Locks

Lock Interface & ReentrantLock

  • Lock interface 
    • More extensive locking operations than synchronized block 
    • No automatic unlocking – use try/finally to unlock 
    • Non-blocking access using tryLock() 
  • ReentrantLock class 
    • Concrete implementation of Lock 
    • Holding thread can call lock() multiple times and not block 
    • Useful for recursive code
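
The points above (try/finally unlocking, tryLock() and reentrancy) can be seen in one small sketch. LockDemo and its method names are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

public class LockDemo {
    private final ReentrantLock lock = new ReentrantLock();

    // Recursive method: the holding thread re-acquires the lock on every call.
    // Returns the hold count observed at the deepest level.
    int descend(int levels) {
        lock.lock();
        try {
            if (levels == 0) {
                return lock.getHoldCount();
            }
            return descend(levels - 1);
        } finally {
            lock.unlock(); // no automatic unlocking: always release in finally
        }
    }

    // Tries the lock from a second thread without blocking.
    boolean otherThreadCanLock() throws InterruptedException {
        final boolean[] acquired = new boolean[1];
        Thread other = new Thread(new Runnable() {
            public void run() {
                if (lock.tryLock()) {
                    try {
                        acquired[0] = true;
                    } finally {
                        lock.unlock();
                    }
                }
            }
        });
        other.start();
        other.join();
        return acquired[0];
    }

    // Same probe, but while the calling thread holds the lock.
    boolean otherThreadCanLockWhileHeld() throws InterruptedException {
        lock.lock();
        try {
            return otherThreadCanLock();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LockDemo demo = new LockDemo();
        System.out.println(demo.descend(2));                    // prints "3"
        System.out.println(demo.otherThreadCanLockWhileHeld()); // prints "false"
        System.out.println(demo.otherThreadCanLock());          // prints "true"
    }
}
```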

ReadWriteLock Interface & ReentrantReadWriteLock

  • ReadWriteLock 
    • Has two locks controlling read and write access 
    • Multiple threads can acquire the read lock if no threads have a write lock 
    • If a thread has a read lock, others can acquire read lock but nobody can acquire write lock 
    • If a thread has the write lock, no other thread can acquire the read or write lock 
    • Methods to access locks
rwl.readLock().lock();
rwl.writeLock().lock(); 
  • ReentrantReadWriteLock 
    • Concrete implementation of ReadWriteLock
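
The sharing rules listed above can be checked with tryLock(), which reports immediately whether a lock is obtainable. In this sketch one thread holds the read lock while a second thread probes both locks; ReadWriteLockDemo, probeWhileReading and tryOnce are illustrative names.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockDemo {
    // While the calling thread holds the read lock, a second thread tries to take
    // the read lock (result[0]) and then the write lock (result[1]).
    static boolean[] probeWhileReading() throws InterruptedException {
        final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        final boolean[] result = new boolean[2];
        rwl.readLock().lock();
        try {
            Thread other = new Thread(new Runnable() {
                public void run() {
                    result[0] = tryOnce(rwl.readLock());
                    result[1] = tryOnce(rwl.writeLock());
                }
            });
            other.start();
            other.join();
        } finally {
            rwl.readLock().unlock();
        }
        return result;
    }

    // Acquires and immediately releases the lock if it is available right now.
    static boolean tryOnce(Lock lock) {
        if (!lock.tryLock()) {
            return false;
        }
        lock.unlock();
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = probeWhileReading();
        System.out.println("second reader admitted: " + r[0]); // true
        System.out.println("writer admitted: " + r[1]);        // false
    }
}
```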

ReadWrite Lock Example

class ReadWriteMap {
    final Map<String, Data> m = new TreeMap<String, Data>();
    final ReentrantReadWriteLock rwl =
        new ReentrantReadWriteLock();
    final Lock r = rwl.readLock();
    final Lock w = rwl.writeLock();
    // Readers share the read lock
    public Data get(String key) {
        r.lock();
        try { return m.get(key); }
        finally { r.unlock(); }
    }
    // Writers take the exclusive write lock
    public Data put(String key, Data value) {
        w.lock();
        try { return m.put(key, value); }
        finally { w.unlock(); }
    }
    public void clear() {
        w.lock();
        try { m.clear(); }
        finally { w.unlock(); }
    }
}

Lab: Exercise 7: Lock 1108_javase5_concurrency.zip
