Java SE 5 Concurrency
Concurrency Utilities: JSR-166
- Enables development of simple yet powerful multithreaded applications
- Does for concurrency what the Collections framework did for data structures
- Design goal: beat C performance in high-end server applications
- Provide richer set of concurrency building blocks
- wait(), notify() and synchronized are too primitive
- Enhance scalability, performance, readability and thread safety of Java applications
Why Use Concurrency Utilities?
- Reduced programming effort
- Increased performance
- Increased reliability
- Fewer threading hazards such as deadlock, starvation, race conditions, and excessive context switching
- Improved maintainability
- Increased productivity
Concurrency Utilities
- Task Scheduling Framework
- Callables and Futures
- Semaphores
- Concurrent Collections
- Atomic Variables
- Locks
- Nanosecond-granularity timing
Task Scheduling Framework
- Executor/ExecutorService/Executors framework supports
- Standardizing task submission
- Scheduling
- Execution
- Executor is an interface
- ExecutorService interface extends Executor
- Executors is a factory class for creating various kinds of ExecutorService implementations
Executor Interface
- Executor interface provides a way of de-coupling task submission from the execution
- Task submission is standardized
- Task execution: mechanics of how each task will be run, including details of thread use, scheduling – captured in the implementation class
- Example:
Executor executor = getSomeKindofExecutor();
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
- Many Executor implementations impose some sort of limitation on how and when tasks are scheduled
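The decoupling described above can be sketched with two toy Executor implementations. The class names ExecutorSketch, DirectExecutor, ThreadPerTaskExecutor and the helper submitGreeting are illustrative assumptions, not part of the course material. The submitting code stays the same whichever executor it is handed.

```java
import java.util.concurrent.Executor;

class ExecutorSketch {
    // Runs each task synchronously in the caller's thread.
    static class DirectExecutor implements Executor {
        public void execute(Runnable command) {
            command.run();
        }
    }

    // Starts a new thread for every submitted task.
    static class ThreadPerTaskExecutor implements Executor {
        public void execute(Runnable command) {
            new Thread(command).start();
        }
    }

    // Hypothetical helper: the submission code does not know or care
    // how the executor will actually run the task.
    static void submitGreeting(Executor executor, final StringBuffer out) {
        executor.execute(new Runnable() {
            public void run() {
                out.append("hello");
            }
        });
    }
}
```

With DirectExecutor the task has finished by the time execute() returns. With ThreadPerTaskExecutor it may still be running, which is exactly the kind of detail the interface leaves to the implementation.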
Executor and ExecutorService
ExecutorService adds lifecycle management
public interface Executor {
void execute(Runnable command);
}
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout,
TimeUnit unit);
// other convenience methods for submitting tasks
}
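A minimal sketch of how the lifecycle methods are typically combined, assuming a small fixed pool and a 5-second wait (both values are arbitrary choices for illustration). LifecycleSketch and runAndShutDown are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LifecycleSketch {
    // Runs taskCount trivial tasks, then shuts the pool down and waits for it.
    static int runAndShutDown(int taskCount) throws InterruptedException {
        final AtomicInteger completed = new AtomicInteger(0);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    completed.incrementAndGet();
                }
            });
        }
        // Stop accepting new tasks; tasks already queued still run.
        pool.shutdown();
        // Wait for the queued tasks to finish, up to the timeout.
        if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
            // Gave up waiting: interrupt running tasks and drop queued ones.
            pool.shutdownNow();
        }
        return completed.get();
    }
}
```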
ExecutorService adds task submission methods
// Executor and the lifecycle methods are unchanged from the previous listing
public interface ExecutorService extends Executor {
// convenience methods for submitting tasks
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);
<T> Future<T> submit(Runnable task, T result);
}
execute(..) vs submit(..)
// To submit a task, you can use either the
// execute(..) method of the Executor interface
// or the submit(..) method of the ExecutorService interface
// Example of using the execute(..) method of the
// Executor interface (MyTask implements Runnable)
Executor executor = getExecutor();
executor.execute(new MyTask());
// Example of using the submit(..) method of the
// ExecutorService interface
ExecutorService executorService = getExecutorService();
Future<?> future = executorService.submit(new MyTask());
// You can then use the Future object to find out more
// about the task
boolean doneStatus = future.isDone();
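A self-contained sketch of the submit(..) path, assuming a trivial MyTask that only sets a flag (the real MyTask from the lab is not shown in these slides). SubmitSketch and submitAndWait are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitSketch {
    // Hypothetical task used only for this illustration.
    static class MyTask implements Runnable {
        volatile boolean ran = false;
        public void run() {
            ran = true;
        }
    }

    // Returns true if the task ran and the Future reports completion.
    static boolean submitAndWait() throws Exception {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            MyTask task = new MyTask();
            Future<?> future = executorService.submit(task);
            // Blocks until run() has finished; yields null for a plain Runnable.
            future.get();
            return future.isDone() && task.ran;
        } finally {
            executorService.shutdown();
        }
    }
}
```

Note that isDone() returns a boolean and never blocks; get() is the call that waits.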
Executors: Factory for creating various types of ExecutorService
public class Executors {
    static ExecutorService
        newFixedThreadPool(int n);
    static ExecutorService
        newCachedThreadPool();
    static ExecutorService
        newSingleThreadExecutor();
    static ScheduledExecutorService
        newScheduledThreadPool(int n);
    static ScheduledExecutorService
        newSingleThreadScheduledExecutor();
    // additional versions specifying ThreadFactory
    // additional utility methods
}
newFixedThreadPool(int n)
public class Executors {
// newFixedThreadPool() Creates a thread pool
// that reuses a fixed number of threads operating
// off a shared unbounded queue.
static ExecutorService
newFixedThreadPool(int n);
..
}
----------------------------------------------
// Usage example – Create ExecutorService object
ExecutorService executorService =
Executors.newFixedThreadPool(NUMBER_THREADS);
// Submit a task
Future<?> future = executorService.submit(new MyTask(i));
// Check if the task is done
boolean doneStatus = future.isDone();
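To see the "fixed number of threads" behaviour in action, the following sketch records the highest number of tasks that were ever running at once. FixedPoolSketch and maxConcurrent are hypothetical names, and the 10 ms sleep is an arbitrary stand-in for real work.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedPoolSketch {
    // Submits taskCount tasks and returns the peak number running concurrently.
    static int maxConcurrent(int poolSize, int taskCount) throws InterruptedException {
        final AtomicInteger running = new AtomicInteger(0);
        final AtomicInteger peak = new AtomicInteger(0);
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    int now = running.incrementAndGet();
                    // Raise the recorded peak if this is a new maximum.
                    int seen = peak.get();
                    while (now > seen && !peak.compareAndSet(seen, now)) {
                        seen = peak.get();
                    }
                    try {
                        Thread.sleep(10); // simulated work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    running.decrementAndGet();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return peak.get();
    }
}
```

However many tasks are queued, the peak never exceeds the pool size; the remaining tasks wait in the shared queue.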
pre-J2SE 5.0 Code
Web Server—poor resource management
class WebServer {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable r = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            // Don't do this! Every connection gets its own new thread.
            new Thread(r).start();
        }
    }
    static void handleRequest(Socket connection) { /* Process the request */ }
}
Executors Example
Web Server—better resource management
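class WebServer {
    // static so that main() can use it; at most 7 requests run at once
    static final Executor pool =
        Executors.newFixedThreadPool(7);
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable r = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            pool.execute(r);
        }
    }
    static void handleRequest(Socket connection) { /* Process the request */ }
}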
Lab: Exercise 1 1108_javase5_concurrency.zip
newCachedThreadPool()
public class Executors {
..
// newCachedThreadPool() creates a thread pool that creates
// new threads as needed, but will reuse previously constructed
// threads when they are available.
static ExecutorService
newCachedThreadPool();
..
}
----------------------------------------------------------
// Usage example – Create ExecutorService object
ExecutorService executorService =
Executors.newCachedThreadPool();
// Submit a task
Future<?> future = executorService.submit(new MyTask(i));
// Check if the task is done
boolean doneStatus = future.isDone();
newScheduledThreadPool()
public class Executors {
..
// newScheduledThreadPool(POOL_SIZE) creates a thread pool
// that can schedule commands to run after a given delay, or
// to execute periodically
static ScheduledExecutorService
newScheduledThreadPool(int corePoolSize);
..
}
----------------------------------------------------------
// Usage example – Create ScheduledExecutorService object
ScheduledExecutorService scheduledExecutorService =
    Executors.newScheduledThreadPool(POOL_SIZE);
// Schedule a task
ScheduledFuture<?> timeHandle1 =
    scheduledExecutorService.scheduleAtFixedRate(
        new TimePrinterTask1(System.out), // Task to execute
        1, // Initial delay
        3, // Period between successive executions
        TimeUnit.SECONDS); // Time unit for the delay and the period
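A runnable variant of the scheduling example, assuming a task that simply counts down a latch (TimePrinterTask1 belongs to the lab and is not shown here). ScheduleSketch and runTicks are hypothetical names; the 20 ms period is chosen only to keep the run short.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduleSketch {
    // Runs a periodic task until it has fired `ticks` times, then cancels it.
    static boolean runTicks(int ticks) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(ticks);
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(
                new Runnable() {
                    public void run() {
                        latch.countDown();
                    }
                },
                0,                      // initial delay
                20,                     // period between successive executions
                TimeUnit.MILLISECONDS); // time unit for both values
            boolean fired = latch.await(5, TimeUnit.SECONDS);
            // Stop further executions without interrupting a running one.
            handle.cancel(false);
            return fired && handle.isCancelled();
        } finally {
            scheduler.shutdown();
        }
    }
}
```

A periodic task never completes on its own, so the returned ScheduledFuture is mainly useful as a handle for cancelling it.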
Lab: Exercise 2 1108_javase5_concurrency.zip
Callables and Futures
Callables and Futures: Problem (pre-J2SE 5.0)
- If a new thread (callee thread) was started in an application, there was no way to return a result from that thread to the thread that started it (calling thread) without the use of a shared variable and appropriate synchronization
- This is complex and makes code harder to understand and maintain
- Task run by callee thread implements Callable interface
- Implement call() method rather than run()
- Calling thread (Caller) submits Callable object to ExecutorService and then moves on
- Through submit() not execute()
- The submit() returns a Future object
- Calling thread (Caller) then retrieves the result using get() method of Future object
- If result is ready, it is returned
- If result is not ready, calling thread blocks until it is
Build CallableExample (This is Callee)
class CallableExample
        implements Callable<String> {
    public String call() {
        /* Do some work and create a result */
        String result = "The work is ended";
        return result;
    }
}
Future Example (Caller)
ExecutorService es =
Executors.newSingleThreadExecutor();
Future<String> f =
es.submit(new CallableExample());
/* Do some work in parallel */
/* Then later on, check the result */
try {
String callableResult = f.get();
} catch (InterruptedException ie) {
/* Handle */
} catch (ExecutionException ee) {
/* Handle */
}
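As a slightly larger sketch of the same caller/callee pattern, the following splits a sum into several Callable tasks and combines the results from their Futures. SumSketch, RangeSum and parallelSum are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SumSketch {
    // Callee: sums the integers from `from` to `to` inclusive.
    static class RangeSum implements Callable<Long> {
        private final int from;
        private final int to;
        RangeSum(int from, int to) {
            this.from = from;
            this.to = to;
        }
        public Long call() {
            long sum = 0;
            for (int i = from; i <= to; i++) {
                sum += i;
            }
            return sum;
        }
    }

    // Caller: splits 1..n into `parts` ranges, submits each, combines results.
    static long parallelSum(int n, int parts)
            throws InterruptedException, ExecutionException {
        ExecutorService es = Executors.newFixedThreadPool(parts);
        try {
            List<Future<Long>> futures = new ArrayList<Future<Long>>();
            int chunk = n / parts;
            for (int p = 0; p < parts; p++) {
                int from = p * chunk + 1;
                int to = (p == parts - 1) ? n : (p + 1) * chunk;
                futures.add(es.submit(new RangeSum(from, to)));
            }
            long total = 0;
            for (Future<Long> f : futures) {
                total += f.get(); // blocks until that part is ready
            }
            return total;
        } finally {
            es.shutdown();
        }
    }
}
```

No shared variable or explicit synchronization is needed; each result travels back through its own Future.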
Lab: Exercise 3: Callable and Future 1108_javase5_concurrency.zip
Semaphores
- Typically used to restrict access to fixed size pool of resources
- New Semaphore object is created with same count as number of resources
- Thread trying to access resource calls acquire()
- Returns immediately if semaphore count > 0
- Blocks if count is zero until another thread calls release()
- acquire() and release() are thread safe atomic operations
Semaphore Example
class ResourcePool { // class name assumed; the slide shows only the members
    private Semaphore semaphore;
    private Resource[] resources;
    private boolean[] used;
    public ResourcePool(int poolSize) {
        semaphore = new Semaphore(poolSize);
        /* Initialise resource pool */
    }
    public Resource getResource() throws InterruptedException {
        semaphore.acquire(); // blocks until a permit is available
        /* Acquire resource from the pool and return it */
    }
    public void returnResource(Resource r) {
        /* Return resource to pool */
        semaphore.release();
    }
}
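One possible way to fill in the elided parts of the pool, as a sketch only. It assumes plain Object resources, a synchronized scan over the used[] flags, and an extra non-blocking tryGetResource() method; PoolSketch and its helper names are hypothetical.

```java
import java.util.concurrent.Semaphore;

class PoolSketch {
    private final Semaphore semaphore;
    private final Object[] resources;
    private final boolean[] used;

    PoolSketch(int poolSize) {
        semaphore = new Semaphore(poolSize);
        resources = new Object[poolSize];
        used = new boolean[poolSize];
        for (int i = 0; i < poolSize; i++) {
            resources[i] = "resource-" + i; // placeholder resources
        }
    }

    // Blocks until a permit (and therefore a free slot) is available.
    Object getResource() throws InterruptedException {
        semaphore.acquire();
        return takeFree();
    }

    // Non-blocking variant: returns null when the pool is exhausted.
    Object tryGetResource() {
        return semaphore.tryAcquire() ? takeFree() : null;
    }

    void returnResource(Object r) {
        // Release a permit only if r really was checked out.
        if (markFree(r)) {
            semaphore.release();
        }
    }

    // The semaphore limits how many callers get here; the lock protects used[].
    private synchronized Object takeFree() {
        for (int i = 0; i < used.length; i++) {
            if (!used[i]) {
                used[i] = true;
                return resources[i];
            }
        }
        throw new IllegalStateException("permit held but no free slot");
    }

    private synchronized boolean markFree(Object r) {
        for (int i = 0; i < used.length; i++) {
            if (resources[i] == r && used[i]) {
                used[i] = false;
                return true;
            }
        }
        return false;
    }
}
```

The semaphore only counts permits. Tracking which slot is free still needs its own synchronization, which is why takeFree() and markFree() are synchronized.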
Lab: Exercise 4: Semaphore 1108_javase5_concurrency.zip
Concurrent Collections
BlockingQueue Interface
- Provides thread safe way for multiple threads to manipulate collection
- ArrayBlockingQueue is simplest concrete implementation
- Full set of methods
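- put() [blocks while the queue is full]
- offer() [non-blocking; a timed variant also exists]
- peek() [non-blocking; does not remove the element]
- take() [blocks while the queue is empty]
- poll() [non-blocking and fixed time blocking]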
Blocking Queue Example (1)
class Logger implements Runnable {
    private BlockingQueue<String> msgQueue;
    public Logger(BlockingQueue<String> mq) {
        msgQueue = mq;
    }
    public void run() {
        try {
            while (true) {
                /* Blocks if the queue is empty */
                String message = msgQueue.take();
                /* Log message */
            }
        } catch (InterruptedException ie) {
            /* Handle */
        }
    }
}
Blocking Queue Example (2)
private BlockingQueue<String> messageQueue =
    new ArrayBlockingQueue<String>(10);
Logger logger = new Logger(messageQueue);
public void run() {
    try {
        while (true) {
            /* Do some processing that produces the message */
            String someMessage = "some message"; // placeholder for the processing result
            /* Blocks if no space available */
            messageQueue.put(someMessage);
        }
    } catch (InterruptedException ie) {
        /* Handle */
    }
}
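The two fragments above can be combined into one runnable producer/consumer sketch. It assumes a sentinel string to tell the consumer to stop, which is not part of the original slides; QueueSketch, transfer and STOP are hypothetical names, and the queue capacity of 2 is chosen so that put() actually blocks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class QueueSketch {
    // Sentinel telling the consumer to stop (an assumption of this sketch).
    private static final String STOP = "__STOP__";

    // Sends `count` messages through a small queue and returns what was logged.
    static List<String> transfer(final int count) throws InterruptedException {
        final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
        final List<String> logged = new ArrayList<String>();

        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    while (true) {
                        String message = queue.take(); // blocks while empty
                        if (STOP.equals(message)) {
                            break;
                        }
                        logged.add(message);
                    }
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();

        for (int i = 0; i < count; i++) {
            queue.put("message " + i); // blocks while full
        }
        queue.put(STOP);
        // After join() the consumer's writes to `logged` are visible here.
        consumer.join();
        return logged;
    }
}
```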
Lab: Exercise 5: BlockingQueue 1108_javase5_concurrency.zip
Concurrency: Atomic Variables
Atomics
- java.util.concurrent.atomic
- Small toolkit of classes that support lock-free, thread-safe programming on single variables
AtomicInteger balance = new AtomicInteger(0);
public int deposit(int amount) {
    return balance.addAndGet(amount);
}
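A sketch that extends the deposit example with a conditional withdrawal built on compareAndSet, plus a small multi-threaded check. AccountSketch, withdraw, getBalance and concurrentDeposits are hypothetical names added for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AccountSketch {
    private final AtomicInteger balance = new AtomicInteger(0);

    int deposit(int amount) {
        return balance.addAndGet(amount);
    }

    // Withdraws only if funds suffice, using a compare-and-set retry loop.
    boolean withdraw(int amount) {
        while (true) {
            int current = balance.get();
            if (current < amount) {
                return false;
            }
            // Succeeds only if no other thread changed the balance meanwhile.
            if (balance.compareAndSet(current, current - amount)) {
                return true;
            }
        }
    }

    int getBalance() {
        return balance.get();
    }

    // Several threads deposit 1 repeatedly; no lock is needed.
    static int concurrentDeposits(int threads, final int depositsPerThread)
            throws InterruptedException {
        final AccountSketch account = new AccountSketch();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(new Runnable() {
                public void run() {
                    for (int i = 0; i < depositsPerThread; i++) {
                        account.deposit(1);
                    }
                }
            });
            workers[t].start();
        }
        for (int t = 0; t < threads; t++) {
            workers[t].join();
        }
        return account.getBalance();
    }
}
```

With a plain int field and balance++ the concurrent deposits could lose updates; addAndGet makes each update atomic.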
Lab: Exercise 6: Atomic Variable 1108_javase5_concurrency.zip
Concurrency: Locks
Lock Interface & ReentrantLock
- Lock interface
- More extensive locking operations than synchronized block
- No automatic unlocking – use try/finally to unlock
- Non-blocking access using tryLock()
- ReentrantLock class
- Concrete implementation of Lock
- Holding thread can call lock() multiple times and not block
- Useful for recursive code
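The points above can be sketched with a small counter guarded by a ReentrantLock. CounterSketch and its method names are hypothetical; the sketch shows the try/finally unlock idiom, reentrant acquisition, and tryLock().

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class CounterSketch {
    private final Lock lock = new ReentrantLock();
    private int count = 0;

    void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock(); // always unlock, even if the body throws
        }
    }

    // Reentrancy: this thread already holds the lock when increment() locks again.
    void incrementTwice() {
        lock.lock();
        try {
            increment();
            increment();
        } finally {
            lock.unlock();
        }
    }

    // Non-blocking attempt: gives up immediately if the lock is held elsewhere.
    boolean tryIncrement() {
        if (!lock.tryLock()) {
            return false;
        }
        try {
            count++;
            return true;
        } finally {
            lock.unlock();
        }
    }

    int get() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}
```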
ReadWriteLock Interface & ReentrantReadWriteLock
- ReadWriteLock
- Has two locks controlling read and write access
- Multiple threads can acquire the read lock if no threads have a write lock
- If a thread has a read lock, others can acquire read lock but nobody can acquire write lock
- If a thread has a write lock, nobody can have read/write lock
- Methods to access locks
rwl.readLock().lock();
rwl.writeLock().lock();
- ReentrantReadWriteLock class
- Concrete implementation of ReadWriteLock
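The read/write rules listed above can be probed with tryLock(), as in this sketch. RwlSketch and probe are hypothetical names. While the main thread holds the read lock, a second thread tries to take first the read lock and then the write lock.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class RwlSketch {
    // Returns { second reader acquired?, writer acquired while a read lock is held? }
    static boolean[] probe() throws InterruptedException {
        final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        final boolean[] result = new boolean[2];
        rwl.readLock().lock();
        try {
            Thread other = new Thread(new Runnable() {
                public void run() {
                    // Another reader may share the read lock.
                    result[0] = rwl.readLock().tryLock();
                    if (result[0]) {
                        rwl.readLock().unlock();
                    }
                    // A writer must wait while any read lock is held.
                    result[1] = rwl.writeLock().tryLock();
                    if (result[1]) {
                        rwl.writeLock().unlock();
                    }
                }
            });
            other.start();
            other.join();
        } finally {
            rwl.readLock().unlock();
        }
        return result;
    }
}
```

The second reader gets in, while the writer's tryLock() fails because the main thread still holds its read lock.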
ReadWrite Lock Example
class ReadWriteMap {
final Map<String, Data> m = new TreeMap<String, Data>();
final ReentrantReadWriteLock rwl =
new ReentrantReadWriteLock();
final Lock r = rwl.readLock();
final Lock w = rwl.writeLock();
public Data get(String key) {
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
public Data put(String key, Data value) {
w.lock();
try { return m.put(key, value); }
finally { w.unlock(); }
}
public void clear() {
w.lock();
try { m.clear(); }
finally { w.unlock(); }
}
}
Lab: Exercise 7: Lock 1108_javase5_concurrency.zip