Tuesday, March 31, 2015

Concurrent collections: CopyOnWriteArrayList


Working with collections in a multithreaded application is a challenge. Imagine a list that is accessible to several threads, each of which tries to change the list data while others iterate over it: that is a typical recipe for a ConcurrentModificationException. To avoid this and other problems, the JDK (since version 1.5) provides improved mechanisms for storing ‘Iterable’ data in multithreaded applications.
This article is an overview of one widely used concurrent collection, CopyOnWriteArrayList.

The name of the collection is self-explanatory: every ‘write’ operation (add, remove, set) copies the underlying array and creates a modified version of it. This prevents ConcurrentModificationException because every iterator works on the version of the array that existed when the iterator was created. The official Oracle documentation calls such iterators “snapshot” style iterators; such an iterator “uses a reference to the state of the array at the point that the iterator was created”.

This is the add operation of CopyOnWriteArrayList:


public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

Every time you modify the collection, a new copy of the whole underlying array is created. This is quite an expensive operation, which is why CopyOnWriteArrayList is not recommended for frequently modified data. Another interesting method in the listing is setArray(newElements): it replaces the current array, so every iterator created afterwards sees the updated version. Iterators that are already running in other threads are not affected, because they keep a reference to the old array.
Question: two or more threads modify the CopyOnWriteArrayList simultaneously; what will be the result?
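
The snapshot behaviour and the question above can be tried out with a small sketch. The class and method names (CopyOnWriteDemo, iterateWhileModifying, concurrentAdds) are made up for this illustration. Since add acquires the lock before copying the array, concurrent writes are expected to be serialized, so no element should be lost.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CopyOnWriteDemo {

    // Creates an iterator, then modifies the list, then drains the iterator.
    // Returns the elements the iterator saw.
    static List<String> iterateWhileModifying(List<String> list) {
        Iterator<String> it = list.iterator(); // refers to the current array
        list.add("c");                         // the write swaps in a new array
        List<String> seen = new ArrayList<>();
        while (it.hasNext()) {
            seen.add(it.next());
        }
        return seen;
    }

    // Two threads append to the same list at the same time; returns the final size.
    static int concurrentAdds(int perThread) {
        List<Integer> list = new CopyOnWriteArrayList<>();
        Runnable writer = () -> {
            for (int i = 0; i < perThread; i++) {
                list.add(i);
            }
        };
        Thread t1 = new Thread(writer);
        Thread t2 = new Thread(writer);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return list.size();
    }

    public static void main(String[] args) {
        List<String> cow = new CopyOnWriteArrayList<>(Arrays.asList("a", "b"));
        System.out.println(iterateWhileModifying(cow)); // [a, b]
        System.out.println(cow);                        // [a, b, c]
        System.out.println(concurrentAdds(1000));       // 2000
    }
}
```

Passing a plain ArrayList to iterateWhileModifying instead throws ConcurrentModificationException on the first call to next(), which is exactly the problem the copy-on-write approach avoids.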

Saturday, March 28, 2015

What is ThreadLocal?

Take a look at the following piece of code:

private int var;

public synchronized void changeVar() {
 var++;
}

Using the synchronized method we ensure that var is accessible only to the thread that holds the monitor (lock) of the object. For example, thread-A changes the value of var and leaves the method (releasing the monitor); after that thread-B acquires the monitor and modifies the value of var already changed by thread-A, so var gets the value 2. That is normal and expected behavior, but sometimes we need variables whose scope is a single thread.

What I mean is having var modified independently by thread-A and thread-B: if thread-A calls the changeVar method 10 times, var has the value 10 only for thread-A, while thread-B still sees var equal to 0 if it has not called changeVar. To achieve that we can use a ThreadLocal (http://docs.oracle.com/javase/7/docs/api/java/lang/ThreadLocal.html) instance.
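
A minimal sketch of that idea, assuming Java 8 (ThreadLocal.withInitial). The class name ThreadLocalCounter and the helper runInThread are invented for this example; only changeVar and var come from the snippet above.

```java
class ThreadLocalCounter {

    // Each thread that touches 'var' gets its own Integer, starting at 0.
    private final ThreadLocal<Integer> var = ThreadLocal.withInitial(() -> 0);

    // No synchronization needed: the value is never shared between threads.
    void changeVar() {
        var.set(var.get() + 1);
    }

    int getVar() {
        return var.get();
    }

    // Hypothetical helper: calls changeVar() 'times' times on a new thread
    // and returns the value that thread ends up with.
    static int runInThread(ThreadLocalCounter counter, int times) {
        int[] result = new int[1];
        Thread t = new Thread(() -> {
            for (int i = 0; i < times; i++) {
                counter.changeVar();
            }
            result[0] = counter.getVar();
        });
        t.start();
        try {
            t.join(); // join() makes the write to result[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        ThreadLocalCounter counter = new ThreadLocalCounter();
        System.out.println(runInThread(counter, 10)); // 10 (thread-A)
        System.out.println(runInThread(counter, 0));  // 0  (thread-B)
    }
}
```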

Monday, March 23, 2015

What is Future?



"The future depends on what you do today" M. Gandhi



Future is a Java interface: public interface Future<V>
A Future represents the result of an asynchronous computation.
Methods of the Future interface are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation. A skeleton class that implements Future might look as follows:


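  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.Future;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.TimeoutException;

  // Skeleton only: every method returns a placeholder value.
  public class FutureImpl implements Future<String> {

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      return false;
    }

    @Override
    public boolean isCancelled() {
      return false;
    }

    @Override
    public boolean isDone() {
      return false;
    }

    @Override
    public String get() throws InterruptedException, ExecutionException {
      return null;
    }

    @Override
    public String get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
        TimeoutException {
      return null;
    }
  }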
 

Methods of Future:
cancel() - Attempts to cancel execution of this task. This attempt will fail if the task has already completed, has already been cancelled, or could not be cancelled for some other reason.
isDone() - Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation; in all of these cases, this method will return true.
get() - Waits if necessary for the computation to complete, and then retrieves its result.
get(long timeout, TimeUnit unit) - Waits if necessary for at most the given time for the computation to complete, and then retrieves its result, if available.
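
In practice you rarely implement Future yourself. The sketch below uses FutureTask, one Future implementation shipped with the JDK, to exercise get with a timeout and cancel. The class name FutureDemo, the method waitForResult and the returned strings are made up for this illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureDemo {

    // Starts a computation that takes about 'workMillis' and waits for it
    // for at most 'timeoutMillis'.
    static String waitForResult(long workMillis, long timeoutMillis) {
        FutureTask<String> task = new FutureTask<>(() -> {
            Thread.sleep(workMillis); // simulated long-running computation
            return "done";
        });
        Future<String> future = task;
        new Thread(task).start();     // FutureTask is also a Runnable

        try {
            // Blocks until the result is ready or the timeout expires.
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true);      // interrupt the thread running the task
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // The computation itself threw an exception.
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(waitForResult(50, 5000)); // done
        System.out.println(waitForResult(5000, 50)); // timed out
    }
}
```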

Thread pool and executors

To avoid the overhead of thread creation and to reuse existing threads, Java has supported thread pools since version 1.5. The mechanism is based on the design pattern of the same name. Roughly speaking, a thread pool is a set of already initialized threads that take tasks from a queue, which makes it less expensive than creating a new Thread for every task.
The pool hands queued tasks to its threads, and every thread executes the task it has been given; tasks are defined by objects that implement Callable or Runnable.

Instead of creating threads directly with statements like Thread th = new Thread(); you may use an instance of the Executor interface.


Executor executor = new Executor() {
    public void execute(Runnable command) {
        command.run();
     }
};
executor.execute(someRunnableInstance);


This Executor uses an already existing thread (here, the thread that calls execute) and makes it perform the operations defined in someRunnableInstance.
ExecutorService (http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html) extends the Executor interface with new, very useful methods. If you would like to have a custom class that implements ExecutorService, you must implement about 13 methods (JDK 1.8.0_31).
Let’s take a look at a few of them:

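  •  Future<?> submit(Runnable task), <T> Future<T> submit(Callable<T> task) – submits a task for execution and returns a Future representing it.
  • void shutdown() – initiates an orderly shutdown: previously submitted tasks are still executed, but no new tasks are accepted (the method does not wait for those tasks to complete)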
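
Instead of writing your own ExecutorService, you can usually take one from the Executors factory class. The following is a small sketch, not a recommended configuration: the class name PoolDemo and the method runTasks are invented here. It runs many tasks on a fixed pool and collects the names of the threads that executed them, which shows that the threads are reused.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PoolDemo {

    // Runs 'taskCount' tasks on a pool of 'poolSize' threads and returns
    // the names of the worker threads that actually executed them.
    static Set<String> runTasks(int poolSize, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> workerNames = ConcurrentHashMap.newKeySet();

        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> workerNames.add(Thread.currentThread().getName()));
        }

        pool.shutdown(); // no new tasks accepted; submitted ones still run
        try {
            // shutdown() does not wait, so block until the tasks are finished.
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return workerNames;
    }

    public static void main(String[] args) {
        // 20 tasks, but never more than 3 distinct worker threads.
        System.out.println(runTasks(3, 20));
    }
}
```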

Saturday, March 21, 2015

Deadlock

Chip and Dale must save Zipper and then beat Fat Cat, but they cannot decide who is responsible for saving Zipper and who is responsible for beating their enemy.


public class Deadlock {
  
 public void init() {
  
  RescueRangers chip = new RescueRangers();
  RescueRangers dale = new RescueRangers();
  chip.setMate(dale);
  dale.setMate(chip);
  
  Thread t1 = new Thread(chip, "Chip");
  Thread t2 = new Thread(dale, "Dale");
  t1.start();  
  t2.start();
 }  
 
 class RescueRangers implements Runnable {
  
  private RescueRangers mate;
  
  public void setMate(RescueRangers mate) {
   this.mate = mate;
  }
  
  public synchronized void saveZipper() {
   System.out.println("Saving Zipper " + Thread.currentThread().getName());
   mate.attackFatCat();   
  }
  
  public synchronized void attackFatCat() {
   System.out.println("Attack Fat Cat " + Thread.currentThread().getName());   
  }
  
  public void run() {
   saveZipper();
  }
 } 
}


Once you launch this code you will most likely see that Chip and Dale cannot attack Fat Cat because they are stuck saving Zipper: it is Fat Cat’s trap.

Output:
Saving Zipper Chip
Saving Zipper Dale

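Chip and Dale wait for each other while saving Zipper: each thread holds the monitor of its own object inside saveZipper and needs the monitor of its mate to call attackFatCat. Neither can proceed to attack Fat Cat until the other releases its monitor, so both are stuck.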
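
One common way out of such a trap is to make every thread acquire the monitors in the same global order. The sketch below is only one possible fix and is not part of the original example: the id field, the class names OrderedRescue and Ranger, and the log list are assumptions introduced for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class OrderedRescue {

    static class Ranger implements Runnable {
        private final int id;             // hypothetical: defines the lock order
        private final List<String> log;
        private Ranger mate;

        Ranger(int id, List<String> log) {
            this.id = id;
            this.log = log;
        }

        void setMate(Ranger mate) {
            this.mate = mate;
        }

        synchronized void saveZipper() {
            log.add("Saving Zipper " + Thread.currentThread().getName());
            mate.attackFatCat();
        }

        synchronized void attackFatCat() {
            log.add("Attack Fat Cat " + Thread.currentThread().getName());
        }

        @Override
        public void run() {
            // Both threads take the two monitors in the same order (lower id first),
            // so neither can hold one monitor while waiting for the other.
            Ranger first = id < mate.id ? this : mate;
            Ranger second = (first == this) ? mate : this;
            synchronized (first) {
                synchronized (second) {
                    saveZipper(); // monitors are reentrant, nested locking is fine
                }
            }
        }
    }

    // Runs both rangers and returns what they logged.
    static List<String> rescue() {
        List<String> log = new CopyOnWriteArrayList<>();
        Ranger chip = new Ranger(1, log);
        Ranger dale = new Ranger(2, log);
        chip.setMate(dale);
        dale.setMate(chip);

        Thread t1 = new Thread(chip, "Chip");
        Thread t2 = new Thread(dale, "Dale");
        t1.start();
        t2.start();
        try {
            t1.join(5000);
            t2.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (t1.isAlive() || t2.isAlive()) {
            throw new IllegalStateException("threads are still blocked");
        }
        return log;
    }

    public static void main(String[] args) {
        rescue().forEach(System.out::println);
    }
}
```

With the ordering in place each ranger finishes both actions, at the price of running the two rescues one after the other.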

Synchronization block and atomic operations

When two or more threads modify the state of a single object, we can get unexpected results. For example, we expect to get the value set by Thread-A, but we get some other value because the object was also modified by Thread-B.


class MyObject {
 private String val;

 public String getVal() {
  return val;
 }

 public void setVal(String val) {
  this.val = val;
  for (int iter = 0; iter < 1000; ++iter) {
   String str = "Useless Operation";
   str += iter;
  }
  System.out.println(this.val);  
 }
  
}


Thread-A changes the state of the object using setVal, then Thread-B also changes the object state, and Thread-A outputs the value that has been set by Thread-B. Here is the code snippet that initializes the threads:


Thread thread_a = new Thread(new Runnable() {

 @Override
 public void run() {
  obj.setVal("1");

 }
});

Thread thread_b = new Thread(new Runnable() {

 @Override
 public void run() {
  obj.setVal("9");

 }
});
thread_a.start();
thread_b.start();

The output is likely to be “9”, “9”. If you removed the useless loop from setVal, you would most probably get the expected results, because Thread-A would print the value before Thread-B changes it, although without synchronization this is not guaranteed. A reliable way to prevent this behavior is to use the synchronized keyword.
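
A sketch of the synchronized variant follows. To make the effect checkable, setVal here returns the value it sees after the loop instead of printing it; that change, the class name SynchronizedObject and the helper methods are assumptions of this example, not part of the original MyObject.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class SynchronizedObject {
    private String val;

    // Same body as the unsynchronized setVal, but only one thread at a time
    // can be inside the method, so nobody can change val during the loop.
    public synchronized String setVal(String val) {
        this.val = val;
        for (int iter = 0; iter < 1000; ++iter) {
            String str = "Useless Operation";
            str += iter;
        }
        return this.val; // the value observed after the loop
    }

    // Hypothetical check: two threads keep setting "1" and "9" and verify
    // that each call observes the value it has just written.
    static boolean eachThreadSeesItsOwnValue(int rounds) {
        SynchronizedObject obj = new SynchronizedObject();
        AtomicBoolean ok = new AtomicBoolean(true);
        Thread a = new Thread(() -> check(obj, "1", rounds, ok));
        Thread b = new Thread(() -> check(obj, "9", rounds, ok));
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ok.get();
    }

    private static void check(SynchronizedObject obj, String value, int rounds, AtomicBoolean ok) {
        for (int i = 0; i < rounds; i++) {
            if (!value.equals(obj.setVal(value))) {
                ok.set(false);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(eachThreadSeesItsOwnValue(200)); // true
    }
}
```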

Friday, March 20, 2015

What is Thread?

A thread in Java is:
1) an instance of java.lang.Thread
2) a thread of execution, some kind of “lightweight” process which has its own call stack
To create an instance of Thread it is enough to use the default Thread constructor:
Thread t = new Thread();
This statement is useless on its own, because the thread has nothing to do. Another Thread constructor, which takes an instance of the java.lang.Runnable interface, allows us to give our thread some work.

Thread t = new Thread(new Runnable() {

 @Override
 public void run() {
   System.out.println("Hey from the thread");
 }
});

Note: because java.lang.Thread itself implements java.lang.Runnable, we can pass any thread instance as the constructor parameter of another thread. To run our thread we need to invoke the start method:

t.start();
Then we will see the ‘Hey from the thread’ console message.
Don’t be fooled by Thread’s run() method: it does not launch your thread’s instructions for asynchronous execution but executes them in the same thread where the method has been invoked.
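
The difference between run() and start() is easy to observe by recording which thread executes the Runnable. This is a small sketch; the class name StartVsRun, the method names and the thread name "worker" are chosen just for this example.

```java
class StartVsRun {

    // Calls run() directly and reports the name of the executing thread.
    static String nameViaRun() {
        String[] seen = new String[1];
        Thread t = new Thread(() -> seen[0] = Thread.currentThread().getName(), "worker");
        t.run(); // plain method call: executes in the caller's thread
        return seen[0];
    }

    // Calls start() and reports the name of the executing thread.
    static String nameViaStart() {
        String[] seen = new String[1];
        Thread t = new Thread(() -> seen[0] = Thread.currentThread().getName(), "worker");
        t.start(); // a new thread of execution is launched
        try {
            t.join(); // wait for it, so that seen[0] is set and visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(nameViaRun());   // name of the calling thread, e.g. main
        System.out.println(nameViaStart()); // worker
    }
}
```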
Thread has other frequently used methods such as sleep [static], yield [static] and join. To remember what each of them is for, it helps to look at a figure with the possible thread states.



Thursday, March 19, 2015

Apache Spark checkpoint issue on windows

"To keep track of the log statistics for all of time, state must be maintained between processing RDD's in a DStream.

To maintain state for key-pair values, the data may be too big to fit in memory on one machine - Spark Streaming can maintain the state for you. To do that, call the updateStateByKey function of the Spark Streaming library.

First, in order to use updateStateByKey, checkpointing must be enabled on the streaming context. To do that, just call checkpoint on the streaming context with a directory to write the checkpoint data." (from http://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/total.html)

When you enable checkpointing for your streaming context with ssc.checkpoint(<PATH_TO_DIRECTORY>); you may get the following error messages:

java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
...
Exception in thread "pool-8-thread-1" java.lang.NullPointerException
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1010)
at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)


Also, looking inside the checkpoint directory you can find some files created during stream processing; however, these files are empty. These files are supposed to store state information (they act as the checkpoint); without this data we cannot use updateStateByKey properly.

To solve this issue you need: