// ThreadUtil.java
package ij.util;
import java.util.concurrent.*;
public class ThreadUtil {

    /** Starts all given threads and waits on each of them until all are done.
     * From Stephan Preibisch's Multithreading.java class. See:
     * http://repo.or.cz/w/trakem2.git?a=blob;f=mpi/fruitfly/general/MultiThreading.java;hb=HEAD
     * <p>
     * Every thread is set to {@code Thread.NORM_PRIORITY} before it is started.
     * If the calling thread is interrupted while joining, its 'interrupted'
     * status is restored and a RuntimeException wrapping the
     * InterruptedException is thrown; threads that are still running at that
     * point are neither stopped nor waited for.
     * @param threads the threads to start and to wait for
     * @throws RuntimeException if the calling thread is interrupted while waiting
     */
    public static void startAndJoin(Thread[] threads) {
        for (Thread thread : threads) {
            thread.setPriority(Thread.NORM_PRIORITY);
            thread.start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException ie) {
            // join() clears the interrupt flag when it throws; set it again so
            // that a caller catching the RuntimeException still sees the interrupt
            Thread.currentThread().interrupt();
            throw new RuntimeException(ie);
        }
    }

    /** Creates an empty array (all elements null) that can hold the given
     * number of threads.
     * @param nb length of the array; 0 means 'as many as there are processors',
     *           see {@link #getNbCpus()}
     * @return a new Thread array of the requested length
     */
    public static Thread[] createThreadArray(int nb) {
        int size = (nb == 0) ? getNbCpus() : nb;
        return new Thread[size];
    }

    /** Creates an empty Thread array with one element per available processor.
     * @return a new Thread array of length {@link #getNbCpus()}
     */
    public static Thread[] createThreadArray() {
        return createThreadArray(0);
    }

    /** Returns the number of processors available to the Java virtual machine,
     * as reported by {@code Runtime.availableProcessors()}.
     */
    public static int getNbCpus() {
        return Runtime.getRuntime().availableProcessors();
    }

    /*--------------------------------------------------------------------------*/
    /* The following is for parallelization using a ThreadPool, which avoids the
     * overhead of creating threads, and is therefore faster if each thread has
     * only a short task to perform */

    /** The threadPoolExecutor holds at least as many threads for parallel execution as the number of
     * processors; additional threads are added as required. These additional threads will be
     * terminated if idle for 120 seconds. */
    public static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(), //minimum number of threads
            Integer.MAX_VALUE,                          //maximum number of threads
            120,                                        //unused threads are terminated after this time
            TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>()            //requests will be processed immediately (not a real queue)
    );

    /** Starts all callables for parallel execution (using a ThreadPoolExecutor)
     * and waits for them via {@link #joinAll(Future[])}.
     * If the current thread is interrupted, each of the callables gets
     * cancelled and interrupted. The 'interrupted' status of the current
     * thread is preserved, as required for preview in an ImageJ
     * ExtendedPlugInFilter.
     * Note that ImageJ requires that all callables can run concurrently,
     * and none of them must stay in the queue while others run.
     * (This is required by the RankFilters, where the threads are not independent)
     * @param callables Array of tasks. If no return value is needed,
     *          best use {@code Callable<Void>} (then the {@code Void call()} method
     *          should return null). If the array size is 1, the {@code call()} method
     *          is executed in the current thread; an exception thrown by it is
     *          passed to {@code ij.IJ.handleException} and the result is null.
     * @return  Array of the {@code java.util.concurrent.Future}s,
     *          corresponding to the callables. If the call methods of the callables
     *          return results, the get() methods of these Futures may be used to get the results.
     */
    @SuppressWarnings("rawtypes") // raw Future[]/Callable[] are part of the public signature
    public static Future[] startAndJoin(Callable[] callables) {
        if (callables.length == 1) { //special case: call in current thread and create a Future
            Object callResult = null;
            try {
                callResult = callables[0].call();
            } catch (Exception e) {
                ij.IJ.handleException(e);
            }
            return new Future[] { completedFuture(callResult) };
        }
        Future[] futures = start(callables);
        joinAll(futures);
        return futures;
    }

    /** Wraps an already computed value in a Future that is done, not cancelled
     * and cannot be cancelled; both get() methods return the value immediately. */
    @SuppressWarnings("rawtypes")
    private static Future completedFuture(final Object result) {
        return new Future() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {return false;}
            @Override
            public Object get() {return result;}
            @Override
            public Object get(long timeout, TimeUnit unit) {return result;}
            @Override
            public boolean isCancelled() {return false;}
            @Override
            public boolean isDone() {return true;}
        };
    }

    /** Starts all callables for parallel execution (using a ThreadPoolExecutor)
     * without waiting for the results.
     * @param callables Array of tasks; these might be {@code Callable<Void>}
     *          if no return value is needed (then the {@code call} methods should
     *          return null).
     * @return  Array of the {@code java.util.concurrent.Future}s,
     *          corresponding to the callables. The futures may be used to wait for
     *          completion of the callables or cancel them.
     *          If the call methods of the callables return results, these Futures
     *          may be used to get the results.
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw Callable is submitted as-is
    public static Future[] start(Callable[] callables) {
        Future[] futures = new Future[callables.length];
        for (int i=0; i<callables.length; i++) {
            futures[i] = threadPoolExecutor.submit(callables[i]);
        }
        return futures;
    }

    /** Calls {@code get()} on each of the {@code Future}s given, in array
     * order, i.e., waits for completion of the corresponding {@code Callable}s.
     * If the current thread is interrupted while waiting, the future currently
     * waited for and all later ones are cancelled with
     * {@code cancel(true)}, and the loop continues with the current future.
     * The 'interrupted' status of the current thread is preserved,
     * as required for preview in an ImageJ ExtendedPlugInFilter; in that case
     * the thread pool is also purged of cancelled tasks.
     * Cancelled futures are silently accepted; any other exception from
     * {@code get()} is written to the ImageJ log and not rethrown.
     * <p>
     * NOTE(review): this was documented as waiting until all callables have
     * finished after an interrupt. With the standard FutureTask, get() on a
     * cancelled future throws CancellationException immediately, so the
     * task body may still be running when this method returns - confirm.
     * @param futures the futures to wait for
     */
    @SuppressWarnings("rawtypes")
    public static void joinAll(Future[] futures) {
        boolean interrupted = false;
        for (int i=0; i<futures.length; i++) {
            Future f = futures[i];
            try {
                f.get();
            } catch (InterruptedException e) {
                interrupted = true;
                // earlier futures have been handled already; cancel this one and the rest
                for (int j=i; j<futures.length; j++) {
                    futures[j].cancel(true);
                }
                i--; //we still have to wait for completion of this one
            } catch (CancellationException e) {
                //cancellation is allowed, e.g. during preview
            } catch (Exception eOther) {
                ij.IJ.log("Error in thread called by "+Thread.currentThread().getName()+":\n"+eOther);
            }
        }
        if (interrupted) {
            // get() cleared the flag when it threw; hand the interrupt on to the caller
            Thread.currentThread().interrupt();
            threadPoolExecutor.purge();
        }
    }
}