Author: doogie
Date: Sun Apr 4 19:14:04 2010 New Revision: 930739 URL: http://svn.apache.org/viewvc?rev=930739&view=rev Log: Thread-pool baesd dependency resolution framework. This implementation is also non-blocking. Added: ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/DependencyPool.java ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/DependencyPoolTests.java Modified: ofbiz/trunk/framework/base/build.xml ofbiz/trunk/framework/base/testdef/basetests.xml Modified: ofbiz/trunk/framework/base/build.xml URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/base/build.xml?rev=930739&r1=930738&r2=930739&view=diff ============================================================================== --- ofbiz/trunk/framework/base/build.xml (original) +++ ofbiz/trunk/framework/base/build.xml Sun Apr 4 19:14:04 2010 @@ -55,6 +55,7 @@ under the License. <file name="org/ofbiz/base/util/test/UtilIOTests.java"/> <file name="org/ofbiz/base/test/BaseUnitTests.java"/> <file name="org/ofbiz/base/util/collections/test/GenericMapTest.java"/> + <file name="org/ofbiz/base/concurrent/test/DependencyPoolTests.java"/> <file name="org/ofbiz/base/concurrent/test/SyncTTLObjectTest.java"/> <file name="org/ofbiz/base/concurrent/test/AsyncTTLObjectTest.java"/> <file name="org/ofbiz/base/concurrent/test/TTLCachedObjectTest.java"/> Added: ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/DependencyPool.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/DependencyPool.java?rev=930739&view=auto ============================================================================== --- ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/DependencyPool.java (added) +++ ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/DependencyPool.java Sun Apr 4 19:14:04 2010 @@ -0,0 +1,139 @@ +package org.ofbiz.base.concurrent; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.ofbiz.base.lang.LockedBy; +import org.ofbiz.base.lang.SourceMonitored; + +@SourceMonitored +public class DependencyPool<K, I extends DependencyPool.Item<I, K, V>, V> { + private final Executor executor; + private final ConcurrentMap<K, I> allItems = new ConcurrentHashMap<K, I>(); + private final ConcurrentMap<K, Future<V>> results = new ConcurrentHashMap<K, Future<V>>(); + private final ReentrantLock submitLock = new ReentrantLock(); + private final Condition submitCondition = submitLock.newCondition(); + @LockedBy("submitLock") + private final Set<I> outstanding = new HashSet<I>(); + @LockedBy("submitLock") + private final List<I> pending = new LinkedList<I>(); + + public DependencyPool(Executor executor) { + this.executor = executor; + } + + public void add(I item) { + if (allItems.putIfAbsent(item.getKey(), item) == null) { + submitLock.lock(); + try { + pending.add(item); + } finally { + submitLock.unlock(); + } + } + } + + public void addAll(Collection<I> items) { + for (I item: items) { + add(item); + } + } + + public void start() { + submitLock.lock(); + try { + submitWork(); + } finally { + submitLock.unlock(); + } + } + + public V getResult(I item) throws InterruptedException, ExecutionException { + Future<V> future = results.get(item.getKey()); + if (future == null) { + return null; + } else { + return future.get(); + } + } + + public int getResultCount() { + return results.size(); + } + + public boolean await() throws InterruptedException { + submitLock.lock(); + try { + submitWork(); + while (!outstanding.isEmpty()) { + submitCondition.await(); + } + return pending.isEmpty(); + } finally { + submitLock.unlock(); + } + } + + @LockedBy("submitLock") + private int submitWork() { + Iterator<I> pendingIt = pending.iterator(); + int submittedCount = 0; +OUTER: + while (pendingIt.hasNext()) { + I item = pendingIt.next(); + for (K dep: item.getDependencies()) { + if (!results.containsKey(dep)) { + continue OUTER; + } + } + submittedCount++; + pendingIt.remove(); + outstanding.add(item); + executor.execute(new ItemTask(item)); + } + return submittedCount; + } + + private class ItemTask extends FutureTask<V> { + private final I item; + + protected ItemTask(I item) { + super(item); + this.item = item; + } + + protected void done() { + super.done(); + results.put(item.getKey(), this); + submitLock.lock(); + try { + outstanding.remove(item); + if (submitWork() == 0 && outstanding.isEmpty()) { + submitCondition.signal(); + } + } finally { + submitLock.unlock(); + } + } + } + + public interface Item<I extends Item<I, K, V>, K, V> extends Callable<V> { + K getKey(); + Collection<K> getDependencies(); + } +} Added: ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/DependencyPoolTests.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/DependencyPoolTests.java?rev=930739&view=auto ============================================================================== --- ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/DependencyPoolTests.java (added) +++ ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/DependencyPoolTests.java Sun Apr 4 19:14:04 2010 @@ -0,0 +1,108 @@ +package org.ofbiz.base.concurrent.test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.ScheduledExecutorService; + +import org.ofbiz.base.concurrent.DependencyPool; +import org.ofbiz.base.concurrent.ExecutionPool; +import org.ofbiz.base.lang.SourceMonitored; +import org.ofbiz.base.test.GenericTestCaseBase; +import org.ofbiz.base.util.UtilMisc; + +@SourceMonitored +public class DependencyPoolTests extends GenericTestCaseBase { + public DependencyPoolTests(String name) { + super(name); + } + + public void testDependencyPool() throws Exception { + // always use more threads than cpus, so that the single-cpu case can be tested + ScheduledExecutorService executor = ExecutionPool.getNewOptimalExecutor(getName()); + DependencyPool pool = new DependencyPool(executor); + int itemSize = 100, depMax = 5, subMax = 3; + List<TestItem> items = new ArrayList<TestItem>(itemSize); + List<TestItem> previousItems = new ArrayList<TestItem>(itemSize); + for (int i = 0; i < itemSize; i++) { + int depSize = (int) (Math.random() * Math.min(depMax, itemSize - i - 1)); + List<Integer> deps = new ArrayList<Integer>(depSize); + for (int j = i + 1, k = 0; j < itemSize && k < depSize; j++) { + if (Math.random() * (itemSize - j) / (depSize - k + 1) < 1) { + deps.add(j); + k++; + } + } + int subSize = (int) (Math.random() * Math.min(subMax, i)); + List<TestItem> subItems = new ArrayList<TestItem>(subSize); +OUTER: + for (int j = 0; j < previousItems.size() && subItems.size() < subSize;) { + if (Math.random() * j < 1) { + TestItem previousItem = previousItems.get(j); + for (int k = 0; k < deps.size(); k++) { + if (previousItem.getDependencies().contains(deps.get(k))) { + j++; + continue OUTER; + } + } + subItems.add(previousItem); + previousItems.remove(j); + } else { + j++; + } + } + TestItem item = new TestItem(pool, Integer.valueOf(i), Integer.toString(i), deps, subItems); + items.add(item); + previousItems.add(item); + } + pool.addAll(items); + pool.start(); + pool.await(); + assertEquals("result count", itemSize, pool.getResultCount()); + for (int i = 0; i < itemSize; i++) { + TestItem item = items.get(i); + assertEquals("item(" + i + ") result", Integer.toString(i), pool.getResult(item)); + } + executor.shutdown(); + } + + private static class TestItem implements DependencyPool.Item<TestItem, Integer, String> { + private final DependencyPool pool; + private final Integer key; + private final String result; + private final Collection<Integer> dependencies; + private final Collection<TestItem> subItems; + + protected TestItem(DependencyPool pool, Integer key, String result, Collection<Integer> dependencies, Collection<TestItem> subItems) { + this.pool = pool; + this.key = key; + this.result = result; + this.dependencies = dependencies; + this.subItems = subItems; + } + + public Integer getKey() { + return key; + } + + public Collection<Integer> getDependencies() { + return dependencies; + } + + public Collection<TestItem> getSubItems() { + return subItems; + } + + public String call() throws Exception { + int sleepTime = (int) (Math.random() * 100); + Thread.sleep(sleepTime); + if (!subItems.isEmpty()) { + pool.addAll(subItems); + } + return result; + } + } +} Modified: ofbiz/trunk/framework/base/testdef/basetests.xml URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/base/testdef/basetests.xml?rev=930739&r1=930738&r2=930739&view=diff ============================================================================== --- ofbiz/trunk/framework/base/testdef/basetests.xml (original) +++ ofbiz/trunk/framework/base/testdef/basetests.xml Sun Apr 4 19:14:04 2010 @@ -31,6 +31,7 @@ <junit-test-suite class-name="org.ofbiz.base.util.cache.test.UtilCacheTests"/> <junit-test-suite class-name="org.ofbiz.base.conversion.test.DateTimeTests.java"/> <junit-test-suite class-name="org.ofbiz.base.conversion.test.MiscTests.java"/> + <junit-test-suite class-name="org.ofbiz.base.concurrent.test.DepedencyPoolTests"/> <junit-test-suite class-name="org.ofbiz.base.util.test.UtilIOTests"/> <junit-test-suite class-name="org.ofbiz.base.test.BaseUnitTests"/> </test-case> |
Free forum by Nabble | Edit this page |