Java Concurrency - Reference Guide
Table of Contents generated with DocToc
- Information and Guidelines
- Running and Scheduling Tasks
- Thread-Safe Data Structures
- Locks
- Thread-Local
- References
Information and Guidelines
- Multiple threads run concurrently, by using separate processors or different time slices on the same processor
- Prefer using parallel algorithms and threadsafe data structures over programming with locks
- Visibility: if multiple threads update the same variable, their updates may not be visible across other threads by
default; synchronisation (or volatile declaration) may be necessary
- Value of a final variable is visible after initialisation
- Initial value of static variable is available after static initialisation
- Changes to volatile variables are visible
- Changes that happen before releasing a lock are visible to anyone acquiring the same lock
(Ref: Java Concurrency in Practice, Brain Goetz)
- Race conditions: can occur whenever state shared across threads is mutated
- Strategies for safe concurrency
- Confinement
- Immutability
- Locking / synchronisation (however, Locking is error-prone, and it can be expensive since it reduces opportunities for concurrent execution)
- For
parallelStream
to work well,- The stream operations should not block (because it uses ForkJoinPool.commonPool by default and you donโt want to exhaust this common pool)
- The data should be in memory
- There needs to be enough data; There is a substantial overhead for parallel streams that is only repaid for large data sets
Running and Scheduling Tasks
- Tasks can be defined either using the
Runnable
orCallable
interface - Best to use
ExecutorService
to handle creation and scheduling of threads Executors.newCachedThreadPool()
is optimised for use-cases with many tasks that are short-lived or spend most of their time waiting; there is no bound on the umber of concurrent threads- A fixed size thread pool created using
Executors.newFixedThreadPool(nthreads)
is best suited for computationally intensive tasks so-as-to not exceed number of available processors (Runtime.getRuntime().availableProcessors()
) or to limit the resource consumption of a service - Running tasks defined using
Runnable
and scheduled using anExecutorService
Runnable hellos = () -> {
for (int i = 0; i < 1000; i++) {
System.out.println("Hello " + i);
}
};
Runnable goodbyes = () -> {
for (int i = 0; i < 1000; i++) {
System.out.println("Goodbye " + i);
}
};
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(hellos);
executor.execute(goodbyes);
- Running tasks defined using
Callable
and obtaining result wrapped in aFuture
Callable<Integer> callable1 = () -> {
int a;
// ... some processing
return a;
};
Callable<Integer> callable2 = () -> {
int a;
// ... some processing
return a;
};
ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> result1 = executor.submit(callable1);
Future<Integer> result2 = executor.submit(callable2);
// blocking calls
result1.get()
result2.get()
// Alternatively, a list of callables can be submitted to an executor
List<Callable<Integer>> callables = List.of(callable1, callable2);
List<Future<Integer>> results = executor.invokeAll(callables); // blocking call, waits for *all* callables to complete before returning
- Running a task asynchronously and obtaining a
CompletableFuture
// Using a Supplier that returns a result
Supplier<Integer> supplier = () -> {
int a;
// ... some processing
return sum;
};
ExecutorService executorService = Executors.newFixedThreadPool(2);
CompletableFuture<Integer> result = CompletableFuture.supplyAsync(supplier, executorService);
// Alternatively, if no executorService is supplied, ForkJoinPool.commonPool() is used
CompletableFuture<Integer> result = CompletableFuture.supplyAsync(supplier);
// Using a Runnable that returns void
CompletableFuture.runAsync(() -> {
// do something
});
Callable<T>
vsSupplier<T>
- A callable can throw a checked exception, while supplier canโt
CompletableFuture.supplyAsync
takes aSupplier
as an argumentExecutorService.submit
takes aCallable
as an argument
- Composing CompletableFutures
public static CompletableFuture<String> readPage(URI url) {
// do something with the url...
}
public static CompletableFuture<URI> getURLInput(String prompt) {
// do something with the prompt...
}
public static void main(String[] args) {
getURLInput("example")
.thenCompose(uri -> readPage(uri))
.thenAccept(System.out::println);
}
- Actions on CompletableFutures (Ref: Core Java SE 9 for the Impatient)
Thread-Safe Data Structures
- ConcurrentHashMap
// Increment if present else initialise
// Not thread-safe
ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();
Long oldValue = map.get("foo");
Long newValue = oldValue == null ? 1 : oldValue + 1;
map.put("foo", newValue);
// thread-safe
map.compute("foo", (k, v) -> v == null ? 1 : v+1);
System.out.println(map.values().toString()); // prints [1]
// thread same
map = new ConcurrentHashMap<>();
map.merge("foo", 1L, (oldVal, newVal) -> oldVal + newVal);
System.out.println(map.values().toString()); // prints [1]
map.merge("foo", 1L, (oldVal, newVal) -> oldVal + newVal);
System.out.println(map.values().toString()); // prints [2]
- BlockingQueue
- CopyOnWriteArrayList
- CopyOnWriteArraySet
Locks
- Explicit locks
Lock countLock = new ReentrantLock(); // Shared among multiple threads
int count; // Shared among multiple threads
//...
countLock.lock();
try {
count++; // Critical section
} finally {
countLock.unlock(); // Make sure the lock is unlocked
}
- Intrinsic locks: use the
synchronized
keyword
public class Counter {
private int value;
public synchronized int increment() {
value++;
return value;
}
}
Thread-Local
- Define a thread-local with initial value
ThreadLocal<T> myThreadLocal = ThreadLocal.withInitial(Supplier<T>);
References
- Core Java SE 9 for the Impatient, Cay S. Hortsmann, Chpater 10 - Concurrent Programming
- Java Concurrency in Practice, Brain Goetz