Skip to content

Concurrency with Java

Furkan Kürşat Danışmaz edited this page Jan 1, 2019 · 2 revisions

Outline

  • Executor
  • Executor Service
  • Scheduled Executor Service
  • Future
  • Runnable vs Callable
  • Lock vs Synchronized
  • Volatile
  • wait() vs sleep()
  • Semaphore
  • Blocking Queue
  • Spring Async

Executor

Executor is designed to be the common interface of the classes that are able to execute provided tasks. The provided task can be executed in a seperate thread or the in the same thread based on the implementation.

public class MyExecutor implements Executor {
    @Override
    public void execute(Runnable r) {
        r.run();
    }
}

MyExecutor e = new MyExecutor();
e.execute(() -> {
    ...
});

or

Executor e = runnable -> runnable.run();
e.execute(() -> {
    ...
});

Using this interface, we can decouple the task execution flow from the actual execution mechanism.

Executor Service

ExecutorService is more capable for async operations. It has an in-memory queue where we can submit tasks (Runnable or Callable) and it can schedule them based on the thread availability.

ExecutorService e = Executors.newFixedThreadPool(10);
e.submit(() -> { 
...
});

We can also create a single threaded ExecutorService instance like below:

ExecutorService e = Executors.newSingleThreadExecutor();

Once we create an ExecutorService instance, we can assign tasks to it with its submit method. It also has 2 different termination methods shutdown() and shutdownNow(). The former waits till all submitted tasks are finished whereas the latter terminates immediately all pending and executing tasks.

There is also another termination method called awaitTermination which blocks until all tasks are completed after a shutdown event is triggered.

try {
    executorService.awaitTermination(10, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
    ...
}

Scheduled Executor Service

ScheduledExecutorService is similar to the ExecutorService, but it can perform tasks periodically.

ScheduledExecutorService s = Executors.newSingleThreadedScheduledExecutor();
Future<String> future = s.schedule(() -> { ... }, 1, TimeUnit.SECONDS);
s.shutdown();

The schedule(runnable or callable, delay, time-unit) takes three parameters. The API also has scheduleAtFixedRate and scheduleWithFixDelay methods.

scheduleAtFixedRate(runnable or callable, initial-delay, period, time-unit)
scheduleWithFixDelay(runnable or callable, initial-delay, delay, time-unit)

The former creates and executes a periodic action after the initial delay. The latter is very similar but it does not executes tasks with a fixed period. Instead, it puts a time interval (the delay parameter) between the one task's termination and the other's commencement.

For both of them, notice that if any of the task being executed encounters an exception, the subsequent executions are supressed.

Cached Thread Pool vs Fixed Thread Pool

With fixed thread pool you can create a pool of threads where the threads are reused. At most n number of threads are active at any point. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available.

ExecutorService e = Executors.newFixedThreadPool(10);

Cached thread pool on the other hand, creates new threads as needed, but it reuses previously constructed threads when they are available. Using cached thread pool typically improves the performance of programs if they execute many short-lived tasks.

Be aware of that the threads that are not used for 60 seconds are terminated and removed from the cache.

ExecutorService e = Executors.newCachedThreadPool();

Future

Future represents a result of an async operation. It has methods for checking if the operation is completed or not (isDone() and isCancelled()) and getting the result (get()).

It also has cancel(interruptIfRunning: boolean) method which cancels the operation and releases the executing thread.

ExecutorService e = Executors.newCachedThreadPool();
Future<String> f = s.submit(() -> { return "a value"; });

We can also specify a timeout for a given operation.

try {
    ...
    future.get(10, TimeUnit.SECONDS);
}
catch (TimeoutException e) {
    ...
}

Functional Interface

A functional interface is an interface having only one abstract method. The compiler treats any interface meeting this criteria as a functional interface no matter the @FunctionalInterface annotation exists or not.

Instances of functional interfaces can be created with lambda.

Runnable r = () -> { ... };

Runnable vs Callable

They are very similar and they are both designed for classes instances of which are potentially executed in a separate thread.

A runnable does not return a result nor throw a checked exception whereas a callable does.

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

@FunctionalInterface
public interface Callable<T> {
    T call() throws Exception;
}

CountDownLatch

CountDownLatch is a class instances of which can be used for synchronization of a multi-thread application.

Below is an example of a scenario where you want to create a number of threads but do not want them to start until they are all created. Once they are all created, you want to let all the threads to start. Finally, you want to send notification when all threads complete their tasks.

In this scenario, we need 3 CountDownLatch instances:

  • One for monitoring how many threads are created and entered their run method.
  • One for giving the start sign to all threads
  • One for monitoring how many threads are completed
public static class Worker implements Runnable {
    private int id;
    private CountDownLatch readyThreadCounter;
    private CountDownLatch launcherBlocker;
    private CountDownLatch completedThreadCounter;

    public Worker(int id, CountDownLatch readyThreadCounter,
                  CountDownLatch launcherBlocker, CountDownLatch completedThreadCounter) {
        this.id = id;
        this.readyThreadCounter = readyThreadCounter;
        this.launcherBlocker = launcherBlocker;
        this.completedThreadCounter = completedThreadCounter;
    }

    @Override
    public void run() {
        // Thread is ready. Countdown
        System.out.println(String.format("Worker %d is ready", this.id));
        this.readyThreadCounter.countDown();

        // Wait for other threads to be ready
        try {
            this.launcherBlocker.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // do work
        System.out.println(String.format("Running job %d", this.id));
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Thread has completed its job, countdown
        this.completedThreadCounter.countDown();
    }
}

public class App {
    private static CountDownLatch readyThreadCounter = new CountDownLatch(15);
    private static CountDownLatch launcherBlocker = new CountDownLatch(1);
    private static CountDownLatch completedThreadCounter = new CountDownLatch(15);

    public static int id = 1;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService s = Executors.newCachedThreadPool();
        System.out.println("Genearting threads...");
        List<? extends Runnable> workerList = Stream
                .generate(() -> new Worker(id++, readyThreadCounter, launcherBlocker, completedThreadCounter))
                .limit(15)
                .collect(Collectors.toList());

        workerList.forEach(worker -> s.submit(worker));
        System.out.println("Threads are generated, waiting for them to become ready...");
        readyThreadCounter.await(); // Wait until all threads get in their run method
        System.out.println("All threads are ready");

        // all threads are waiting for the begin sign, so let's start the threads
        Thread.sleep(5000);
        System.out.println("Firing the begin sign...");
        launcherBlocker.countDown();

        completedThreadCounter.await();
        // all threads have completed their work
        System.out.println("All the workers completed their jobs");

        s.shutdown();
    }
}

Lock vs Synchronized

A synchronized block is fully contained in a method where as we can use the lock() and unlock() methods of the Lock in separate methods. This is the main difference.

The synchornized keyword can be used in different places:

  • Instance methods
  • Static methods
  • Code blocks

Volatile

In java, each thread has a separate memory space called Working Memory. Threads read values of variables from the main memory to their working memory and updates the main memory after performing an operation on it.

Using the volatile keyword, we mark a variable so that all the threads always fo to the main memory both for reading it and updating its value.

So, when to use volatile:

  • Of course, it is meaningless in a single-threaded applicaiton
  • When we want to make sure each thread always gets the up-to-date value of a variable.

We can have the same thing using the synchronized keyword but it would probably be slower.

wait() vs sleep()

  • wait() is used for thread synchronization whereas sleep() is not.
  • wait() can only be called from a synchronized block whereas sleep() can be called from any context.
  • sleep() pauses the current thread and does not release any locks.
  • Threads the are paused with wait() method can be waken up via notify() or notifyAll() methods whereas a thread paused with the sleep() method is automatically waken up when the given period of time passes.

Semaphore

Semaphore is also used for thread synchronization. Unlike synchronized block, it allows a specific number of threads to enter to the critical section.

A semaphore contains a set of permits, which all the threads need to acquire before entering to the critial section. If no permit is available, then the threads are blocked and not allowed to enter to the critical section.

Semaphore s = new Semaphore(10);
...
if (s.tryAcquire()) {
    s.acquire();
    ...
    s.release();
}

Notice that you can specify timeout for acquiring permit like below:

s.tryAcquire(5L, TimeUnit.SECONDS);

BlockingQueue

The difference of BlockingQueue instances compared to other queue implementations is that they block the calling threads if it is not possible to do the required operation. Therefore, they are mostly used in multi-threaded applications.

There are 2 types of blocking queues:

  • Unbounded queue - can grow to the Integer.MAX_VALUE
  • Bounded queue - can grow to the specified limit

Methods:

  • add(element) - returns true or throws IllegalStateException
  • put(element) - inserts the specified element into the queue, or waits for a free slot if there are none
  • offer(element) - returns true if the insertion is successful, false otherwise
  • offer(element, timeout, time-unit) - tries to insert the given element to the queue for a given period of time. Returns false if it fails
  • take() - waits for a head element of a queue to become available and removes it. If the queue is empty, the calling thread is blocked until an element is available in the queue.
  • poll(timeout, time-unit) - retrieves and removes the head of the queue, waiting up to the specified time if necessary. Returns null after a timeout.

The ThreadLocal API

The ThreadLocal allows us to store data that can only be accessible from the associated thread.

Spring Async

In spring framework, annotating a method of a bean with @Async will make it executed in a separate thread.

Conditions:

  • Spring async should be enabled with @EnableAsync
  • @Async can be applied to public methods only
  • self-invocation does not work (when you call a method annotated with @Async from the same instance)
  • The return types of the @Async annotated methods should be Future<T> or void
@Async
public void asyncMethod() {
    ...
}

@Async
public Future<String> asyncMethod2() {
    ...
    return "a value";
}

By default, Spring uses SimpleAsyncTaskExecutor to run these @Async annotated methods. But, you can change that and make it use your own custom executor:

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
    
    @Override
    public Executor getAsyncExecutor() {
        return new ThreadPoolTaskExecutor();
    }
}

When a method's return type is Future and if an exception is occurred during the execution of the related task, then the get() method will throw that exception. But, if the return type is void then the exception will not be propagated to the calling thread.

Task Executor Types

  • SyncTaskExecutor : Each invocation takes place in the calling thread
  • SimpleAsyncTaskExecutor : The default behavior. Does not reuse any threads. Starts up a new thread for each invocation
  • ConcurrentTaskExecutor : The adaptor implementation for java.util.concurrent.Executor
  • ThreadPoolTaskExecutor : The most commonly used implementation that has specific number of threads in its pool.
@Bean
TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor t = new ThreadPoolTaskExecutor();
    t.setCorePoolSize(10);
    t.setMaxPoolSize(100);
    t.setQueueCapacity(500);
    ...
    return t;
}