From f8ea4e4e5946d741b84e7ae8e4b96e89ddfd89f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Falko=20Br=C3=A4utigam?= Date: Mon, 13 Mar 2017 20:21:32 +0100 Subject: [PATCH] #36: SMP support and framework (http://github.com/moovida/jgrasstools/issues/36) NEW: BlockingExecutorService - ommit polling after task rejection - allow any (unbound) ExecutorService to be used as delegate --- .../geomorphology/aspect/OmsAspectMP.java | 80 +++++++-- .../libs/modules/BlockingExecutorService.java | 163 ++++++++++++++++++ .../gears/libs/modules/ExecutionPlanner.java | 15 +- .../libs/modules/FixedChunkSizePlanner.java | 12 +- 4 files changed, 239 insertions(+), 31 deletions(-) create mode 100644 jgrassgears/src/main/java/org/jgrasstools/gears/libs/modules/BlockingExecutorService.java diff --git a/hortonmachine/src/main/java/org/jgrasstools/hortonmachine/modules/geomorphology/aspect/OmsAspectMP.java b/hortonmachine/src/main/java/org/jgrasstools/hortonmachine/modules/geomorphology/aspect/OmsAspectMP.java index 9bb474692..98ef8463f 100644 --- a/hortonmachine/src/main/java/org/jgrasstools/hortonmachine/modules/geomorphology/aspect/OmsAspectMP.java +++ b/hortonmachine/src/main/java/org/jgrasstools/hortonmachine/modules/geomorphology/aspect/OmsAspectMP.java @@ -44,6 +44,7 @@ import java.awt.image.WritableRaster; +import javax.media.jai.iterator.RandomIter; import javax.media.jai.iterator.RandomIterFactory; import javax.media.jai.iterator.WritableRandomIter; @@ -52,19 +53,16 @@ import org.jgrasstools.gears.libs.modules.FixedChunkSizePlanner; import org.jgrasstools.gears.libs.modules.GridNode; import org.jgrasstools.gears.libs.modules.GridNodeMultiProcessing; -import org.jgrasstools.gears.libs.monitor.DummyProgressMonitor; +import org.jgrasstools.gears.libs.monitor.PrintStreamProgressMonitor; import org.jgrasstools.gears.utils.coverage.CoverageUtilities; import org.jgrasstools.gears.utils.math.NumericsUtilities; import org.jgrasstools.hortonmachine.i18n.HortonMessageHandler; -import oms3.ComponentAccess; import oms3.annotations.Author; import oms3.annotations.Description; import oms3.annotations.Documentation; import oms3.annotations.Execute; -import oms3.annotations.Finalize; import oms3.annotations.In; -import oms3.annotations.Initialize; import oms3.annotations.Keywords; import oms3.annotations.Label; import oms3.annotations.License; @@ -126,6 +124,47 @@ public void process() throws Exception { } + public void processSerial() throws Exception { + double radtodeg = doRadiants ? 1.0 : NumericsUtilities.RADTODEG; + + int rows = regionMap( inElev ).getRows(); + int cols = regionMap( inElev ).getCols(); + double xRes = regionMap( inElev ).getXres(); + double yRes = regionMap( inElev ).getYres(); + WritableRaster aspectWR = CoverageUtilities.createDoubleWritableRaster( cols, rows, null, null, null ); + WritableRandomIter aspectIter = RandomIterFactory.createWritable( aspectWR, null ); + + pm.beginTask( msg.message( "aspect.calculating" ), rows*cols ); + RandomIter elevationIter = CoverageUtilities.getRandomIterator( inElev ); + + // Cycling into the valid region. + for (int r = 1; r < rows - 1; r++) { + for (int c = 1; c < cols - 1; c++) { + GridNode node = new GridNode( elevationIter, cols, rows, xRes, yRes, c, r ); + double aspect = calculate( node, radtodeg ); + aspectIter.setSample( node.col, node.row, 0, aspect ); + pm.worked( 1 ); + } + } + } + + + public void processPlanner() throws Exception { + double radtodeg = doRadiants ? 1.0 : NumericsUtilities.RADTODEG; + + int rows = regionMap( inElev ).getRows(); + int cols = regionMap( inElev ).getCols(); + WritableRaster aspectWR = CoverageUtilities.createDoubleWritableRaster( cols, rows, null, null, null ); + WritableRandomIter aspectIter = RandomIterFactory.createWritable( aspectWR, null ); + + pm.beginTask( msg.message( "aspect.calculating" ), rows*cols ); + processGridNodes( inElev, gridNode -> { + double aspect = calculate( gridNode, radtodeg ); + aspectIter.setSample( gridNode.col, gridNode.row, 0, aspect ); + }); + } + + /** * Calculates the aspect in a given {@link GridNode}. * @@ -223,22 +262,31 @@ public double calculate( GridNode node, double radtodeg ) { public static void main( String[] args ) throws Exception { OmsAspectMP aspect = new OmsAspectMP(); - aspect.pm = new DummyProgressMonitor(); - //aspect.pm = new PrintStreamProgressMonitor(); + //aspect.pm = new DummyProgressMonitor(); + aspect.pm = new PrintStreamProgressMonitor(); - ExecutionPlanner.defaultPlannerFactory = () -> new FixedChunkSizePlanner(); - //ExecutionPlanner.defaultPlannerFactory = () -> new InThreadExecutionPlanner(); - - long start = System.currentTimeMillis(); - aspect.inElev = aspect.getRaster( "/home/falko/Data/ncrast/elevation_3857.tif" ); + // aspect.inElev = aspect.getRaster( "/home/falko/Data/ncrast/elevation.tif" ); + aspect.inElev = aspect.getRaster( "/home/falko/Data/ncrast/DTM_calvello/dtm_all.asc" ); System.out.println( "inElev: " + aspect.inElev ); + long start = System.currentTimeMillis(); + +// // serial +// start = System.currentTimeMillis(); +// aspect.processSerial(); +// System.out.println( "Serial: " + (System.currentTimeMillis()-start) + "ms" ); + + // FixedChunkSizePlanner + ExecutionPlanner.defaultPlannerFactory = () -> new FixedChunkSizePlanner(); + start = System.currentTimeMillis(); + aspect.processPlanner(); + System.out.println( "FixedChunkSizePlanner: " + (System.currentTimeMillis()-start) + "ms" ); - ComponentAccess.callAnnotated( aspect, Initialize.class, true ); - ComponentAccess.callAnnotated( aspect, Execute.class, false ); - ComponentAccess.callAnnotated( aspect, Finalize.class, true ); +// // InThreadExecutionPlanner +// ExecutionPlanner.defaultPlannerFactory = () -> new InThreadExecutionPlanner(); +// start = System.currentTimeMillis(); +// aspect.processPlanner(); +// System.out.println( "" + (System.currentTimeMillis()-start) + "ms" ); - System.out.println( "" + (System.currentTimeMillis()-start) + "ms" ); - System.out.println( "outAspect: " + aspect.outAspect ); System.exit( 0 ); } diff --git a/jgrassgears/src/main/java/org/jgrasstools/gears/libs/modules/BlockingExecutorService.java b/jgrassgears/src/main/java/org/jgrasstools/gears/libs/modules/BlockingExecutorService.java new file mode 100644 index 000000000..e4304925e --- /dev/null +++ b/jgrassgears/src/main/java/org/jgrasstools/gears/libs/modules/BlockingExecutorService.java @@ -0,0 +1,163 @@ +/* + * polymap.org + * Copyright (C) 2017, the @authors. All rights reserved. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 3.0 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + */ +package org.jgrasstools.gears.libs.modules; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Submits up to a maximum count tasks to a delegate {@link ExecutorService}. Blocks + * submitting thread if it thries to submit more than the maximum number of tasks. + * + * @author Falko Bräutigam + */ +public class BlockingExecutorService + implements ExecutorService { + + private ExecutorService delegate; + + private Semaphore taskCount; + + /** + * + * + * @param delegate + * @param maxTaskCount The maximum number of tasks to submit to the delegate. + */ + public BlockingExecutorService( ExecutorService delegate, int maxTaskCount ) { + this.delegate = delegate; + this.taskCount = new Semaphore( maxTaskCount ); + } + + protected void beforeSubmit() { + try { + taskCount.acquire(); + } + catch (InterruptedException e) { + throw new RuntimeException( e ); + } + } + + @Override + public void execute( Runnable command ) { + beforeSubmit(); + delegate.execute( () -> { + try { + command.run(); + } + finally { + taskCount.release(); + } + }); + } + + @Override + public Future submit( Callable task ) { + beforeSubmit(); + return delegate.submit( () -> { + try { + return task.call(); + } + finally { + taskCount.release(); + } + }); + } + + @Override + public Future submit( Runnable task, T result ) { + beforeSubmit(); + return delegate.submit( () -> { + try { + task.run(); + } + finally { + taskCount.release(); + } + }, result ); + } + + @Override + public Future submit( Runnable task ) { + beforeSubmit(); + return delegate.submit( () -> { + try { + task.run(); + } + finally { + taskCount.release(); + } + }); + } + + @Override + public void shutdown() { + delegate.shutdown(); + } + + @Override + public List shutdownNow() { + return delegate.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return delegate.isShutdown(); + } + + @Override + public boolean isTerminated() { + return delegate.isTerminated(); + } + + @Override + public boolean awaitTermination( long timeout, TimeUnit unit ) throws InterruptedException { + return delegate.awaitTermination( timeout, unit ); + } + + @Override + public List> invokeAll( Collection> tasks ) throws InterruptedException { + // XXX Auto-generated method stub + throw new RuntimeException( "not yet implemented." ); + } + + @Override + public List> invokeAll( Collection> tasks, long timeout, TimeUnit unit ) + throws InterruptedException { + // XXX Auto-generated method stub + throw new RuntimeException( "not yet implemented." ); + } + + @Override + public T invokeAny( Collection> tasks ) throws InterruptedException, ExecutionException { + // XXX Auto-generated method stub + throw new RuntimeException( "not yet implemented." ); + } + + @Override + public T invokeAny( Collection> tasks, long timeout, TimeUnit unit ) + throws InterruptedException, ExecutionException, TimeoutException { + // XXX Auto-generated method stub + throw new RuntimeException( "not yet implemented." ); + } + +} diff --git a/jgrassgears/src/main/java/org/jgrasstools/gears/libs/modules/ExecutionPlanner.java b/jgrassgears/src/main/java/org/jgrasstools/gears/libs/modules/ExecutionPlanner.java index 0a2b6dd25..bf5639eb9 100644 --- a/jgrassgears/src/main/java/org/jgrasstools/gears/libs/modules/ExecutionPlanner.java +++ b/jgrassgears/src/main/java/org/jgrasstools/gears/libs/modules/ExecutionPlanner.java @@ -19,7 +19,7 @@ package org.jgrasstools.gears.libs.modules; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -48,15 +48,18 @@ public Thread newThread( Runnable r ) { }; int procNum = Runtime.getRuntime().availableProcessors(); - defaultExecutor = new ThreadPoolExecutor( procNum, procNum, 60L, TimeUnit.SECONDS, - // keep a small number of chunks queued so that Thread.sleep() after - // refused submit has something to do - new LinkedBlockingDeque( procNum ), + ThreadPoolExecutor threadPool = new ThreadPoolExecutor( procNum, procNum, 60L, TimeUnit.SECONDS, + // with BlockingExecutorService on top we can have unbound queue + new LinkedTransferQueue(), + // new LinkedBlockingDeque( procNum ), // new SynchronousQueue(), // new ArrayBlockingQueue( procNum ) ); threadFactory ); + + defaultExecutor = new BlockingExecutorService( threadPool, procNum ); } + /** * The default {@link ExecutorService} to be used by all planners. *

@@ -64,7 +67,7 @@ public Thread newThread( Runnable r ) { * unbound queue or an unlimited number of threads. In other words, the executor * has to refuse submits when system resources are running out. */ - public static ExecutorService defaultExecutor; + public static final BlockingExecutorService defaultExecutor; /** * Set this to change the default planner for all modules. diff --git a/jgrassgears/src/main/java/org/jgrasstools/gears/libs/modules/FixedChunkSizePlanner.java b/jgrassgears/src/main/java/org/jgrasstools/gears/libs/modules/FixedChunkSizePlanner.java index 99065dccd..d359c19b6 100644 --- a/jgrassgears/src/main/java/org/jgrasstools/gears/libs/modules/FixedChunkSizePlanner.java +++ b/jgrassgears/src/main/java/org/jgrasstools/gears/libs/modules/FixedChunkSizePlanner.java @@ -18,7 +18,6 @@ import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; /** * @@ -31,7 +30,7 @@ public class FixedChunkSizePlanner /** * The absolut upper limit of the chunk size. */ - public static final int MAX_CHUNK_SIZE = 100000; + public static final int MAX_CHUNK_SIZE = 10000; private int targetChunkSize = -1; @@ -83,13 +82,8 @@ protected void submitChunk( List chunk ) { // submit boolean success = false; for (int waitMillis=10; !success; waitMillis=Math.min( 100, waitMillis*2 ) ) { - try { - success = submitted.add( defaultExecutor.submit( work ) ); - } - catch (RejectedExecutionException e) { - // System.out.println( "waiting " + waitMillis + "ms ..." ); - try { Thread.sleep( waitMillis ); } catch (InterruptedException e1) {} - } + // System.out.println( Thread.currentThread().getName() + ": " + taskCount.availablePermits() ); + success = submitted.add( defaultExecutor.submit( work ) ); } }