svn commit: r930739 - in /ofbiz/trunk/framework/base: build.xml src/org/ofbiz/base/concurrent/DependencyPool.java src/org/ofbiz/base/concurrent/test/DependencyPoolTests.java testdef/basetests.xml

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

svn commit: r930739 - in /ofbiz/trunk/framework/base: build.xml src/org/ofbiz/base/concurrent/DependencyPool.java src/org/ofbiz/base/concurrent/test/DependencyPoolTests.java testdef/basetests.xml

doogie-3
Author: doogie
Date: Sun Apr  4 19:14:04 2010
New Revision: 930739

URL: http://svn.apache.org/viewvc?rev=930739&view=rev
Log:
Thread-pool based dependency resolution framework.  This implementation
is also non-blocking.

Added:
    ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/DependencyPool.java
    ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/DependencyPoolTests.java
Modified:
    ofbiz/trunk/framework/base/build.xml
    ofbiz/trunk/framework/base/testdef/basetests.xml

Modified: ofbiz/trunk/framework/base/build.xml
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/base/build.xml?rev=930739&r1=930738&r2=930739&view=diff
==============================================================================
--- ofbiz/trunk/framework/base/build.xml (original)
+++ ofbiz/trunk/framework/base/build.xml Sun Apr  4 19:14:04 2010
@@ -55,6 +55,7 @@ under the License.
         <file name="org/ofbiz/base/util/test/UtilIOTests.java"/>
         <file name="org/ofbiz/base/test/BaseUnitTests.java"/>
         <file name="org/ofbiz/base/util/collections/test/GenericMapTest.java"/>
+        <file name="org/ofbiz/base/concurrent/test/DependencyPoolTests.java"/>
         <file name="org/ofbiz/base/concurrent/test/SyncTTLObjectTest.java"/>
         <file name="org/ofbiz/base/concurrent/test/AsyncTTLObjectTest.java"/>
         <file name="org/ofbiz/base/concurrent/test/TTLCachedObjectTest.java"/>

Added: ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/DependencyPool.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/DependencyPool.java?rev=930739&view=auto
==============================================================================
--- ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/DependencyPool.java (added)
+++ ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/DependencyPool.java Sun Apr  4 19:14:04 2010
@@ -0,0 +1,139 @@
+package org.ofbiz.base.concurrent;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.ofbiz.base.lang.LockedBy;
+import org.ofbiz.base.lang.SourceMonitored;
+
+@SourceMonitored
+public class DependencyPool<K, I extends DependencyPool.Item<I, K, V>, V> {
+    private final Executor executor;
+    private final ConcurrentMap<K, I> allItems = new ConcurrentHashMap<K, I>();
+    private final ConcurrentMap<K, Future<V>> results = new ConcurrentHashMap<K, Future<V>>();
+    private final ReentrantLock submitLock = new ReentrantLock();
+    private final Condition submitCondition = submitLock.newCondition();
+    @LockedBy("submitLock")
+    private final Set<I> outstanding = new HashSet<I>();
+    @LockedBy("submitLock")
+    private final List<I> pending = new LinkedList<I>();
+
+    public DependencyPool(Executor executor) {
+        this.executor = executor;
+    }
+
+    public void add(I item) {
+        if (allItems.putIfAbsent(item.getKey(), item) == null) {
+            submitLock.lock();
+            try {
+                pending.add(item);
+            } finally {
+                submitLock.unlock();
+            }
+        }
+    }
+
+    public void addAll(Collection<I> items) {
+        for (I item: items) {
+            add(item);
+        }
+    }
+
+    public void start() {
+        submitLock.lock();
+        try {
+            submitWork();
+        } finally {
+            submitLock.unlock();
+        }
+    }
+
+    public V getResult(I item) throws InterruptedException, ExecutionException {
+        Future<V> future = results.get(item.getKey());
+        if (future == null) {
+            return null;
+        } else {
+            return future.get();
+        }
+    }
+
+    public int getResultCount() {
+        return results.size();
+    }
+
+    public boolean await() throws InterruptedException {
+        submitLock.lock();
+        try {
+            submitWork();
+            while (!outstanding.isEmpty()) {
+                submitCondition.await();
+            }
+            return pending.isEmpty();
+        } finally {
+            submitLock.unlock();
+        }
+    }
+
+    @LockedBy("submitLock")
+    private int submitWork() {
+        Iterator<I> pendingIt = pending.iterator();
+        int submittedCount = 0;
+OUTER:
+        while (pendingIt.hasNext()) {
+            I item = pendingIt.next();
+            for (K dep: item.getDependencies()) {
+                if (!results.containsKey(dep)) {
+                    continue OUTER;
+                }
+            }
+            submittedCount++;
+            pendingIt.remove();
+            outstanding.add(item);
+            executor.execute(new ItemTask(item));
+        }
+        return submittedCount;
+    }
+
+    private class ItemTask extends FutureTask<V> {
+        private final I item;
+
+        protected ItemTask(I item) {
+            super(item);
+            this.item = item;
+        }
+
+        protected void done() {
+            super.done();
+            results.put(item.getKey(), this);
+            submitLock.lock();
+            try {
+                outstanding.remove(item);
+                if (submitWork() == 0 && outstanding.isEmpty()) {
+                    submitCondition.signal();
+                }
+            } finally {
+                submitLock.unlock();
+            }
+        }
+    }
+
+    public interface Item<I extends Item<I, K, V>, K, V> extends Callable<V> {
+        K getKey();
+        Collection<K> getDependencies();
+    }
+}

Added: ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/DependencyPoolTests.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/DependencyPoolTests.java?rev=930739&view=auto
==============================================================================
--- ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/DependencyPoolTests.java (added)
+++ ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/DependencyPoolTests.java Sun Apr  4 19:14:04 2010
@@ -0,0 +1,108 @@
+package org.ofbiz.base.concurrent.test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.ofbiz.base.concurrent.DependencyPool;
+import org.ofbiz.base.concurrent.ExecutionPool;
+import org.ofbiz.base.lang.SourceMonitored;
+import org.ofbiz.base.test.GenericTestCaseBase;
+import org.ofbiz.base.util.UtilMisc;
+
+@SourceMonitored
+public class DependencyPoolTests extends GenericTestCaseBase {
+    public DependencyPoolTests(String name) {
+        super(name);
+    }
+
+    public void testDependencyPool() throws Exception {
+        // always use more threads than cpus, so that the single-cpu case can be tested
+        ScheduledExecutorService executor = ExecutionPool.getNewOptimalExecutor(getName());
+        DependencyPool pool = new DependencyPool(executor);
+        int itemSize = 100, depMax = 5, subMax = 3;
+        List<TestItem> items = new ArrayList<TestItem>(itemSize);
+        List<TestItem> previousItems = new ArrayList<TestItem>(itemSize);
+        for (int i = 0; i < itemSize; i++) {
+            int depSize = (int) (Math.random() * Math.min(depMax, itemSize - i - 1));
+            List<Integer> deps = new ArrayList<Integer>(depSize);
+            for (int j = i + 1, k = 0; j < itemSize && k < depSize; j++) {
+                if (Math.random() * (itemSize - j) / (depSize - k + 1) < 1) {
+                    deps.add(j);
+                    k++;
+                }
+            }
+            int subSize = (int) (Math.random() * Math.min(subMax, i));
+            List<TestItem> subItems = new ArrayList<TestItem>(subSize);
+OUTER:
+            for (int j = 0; j < previousItems.size() && subItems.size() < subSize;) {
+                if (Math.random() * j < 1) {
+                    TestItem previousItem = previousItems.get(j);
+                    for (int k = 0; k < deps.size(); k++) {
+                        if (previousItem.getDependencies().contains(deps.get(k))) {
+                            j++;
+                            continue OUTER;
+                        }
+                    }
+                    subItems.add(previousItem);
+                    previousItems.remove(j);
+                } else {
+                    j++;
+                }
+            }
+            TestItem item = new TestItem(pool, Integer.valueOf(i), Integer.toString(i), deps, subItems);
+            items.add(item);
+            previousItems.add(item);
+        }
+        pool.addAll(items);
+        pool.start();
+        pool.await();
+        assertEquals("result count", itemSize, pool.getResultCount());
+        for (int i = 0; i < itemSize; i++) {
+            TestItem item = items.get(i);
+            assertEquals("item(" + i + ") result", Integer.toString(i), pool.getResult(item));
+        }
+        executor.shutdown();
+    }
+
+    private static class TestItem implements DependencyPool.Item<TestItem, Integer, String> {
+        private final DependencyPool pool;
+        private final Integer key;
+        private final String result;
+        private final Collection<Integer> dependencies;
+        private final Collection<TestItem> subItems;
+
+        protected TestItem(DependencyPool pool, Integer key, String result, Collection<Integer> dependencies, Collection<TestItem> subItems) {
+            this.pool = pool;
+            this.key = key;
+            this.result = result;
+            this.dependencies = dependencies;
+            this.subItems = subItems;
+        }
+
+        public Integer getKey() {
+            return key;
+        }
+
+        public Collection<Integer> getDependencies() {
+            return dependencies;
+        }
+
+        public Collection<TestItem> getSubItems() {
+            return subItems;
+        }
+
+        public String call() throws Exception {
+            int sleepTime = (int) (Math.random() * 100);
+            Thread.sleep(sleepTime);
+            if (!subItems.isEmpty()) {
+                pool.addAll(subItems);
+            }
+            return result;
+        }
+    }
+}

Modified: ofbiz/trunk/framework/base/testdef/basetests.xml
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/base/testdef/basetests.xml?rev=930739&r1=930738&r2=930739&view=diff
==============================================================================
--- ofbiz/trunk/framework/base/testdef/basetests.xml (original)
+++ ofbiz/trunk/framework/base/testdef/basetests.xml Sun Apr  4 19:14:04 2010
@@ -31,6 +31,7 @@
         <junit-test-suite class-name="org.ofbiz.base.util.cache.test.UtilCacheTests"/>
         <junit-test-suite class-name="org.ofbiz.base.conversion.test.DateTimeTests.java"/>
         <junit-test-suite class-name="org.ofbiz.base.conversion.test.MiscTests.java"/>
+        <junit-test-suite class-name="org.ofbiz.base.concurrent.test.DepedencyPoolTests"/>
         <junit-test-suite class-name="org.ofbiz.base.util.test.UtilIOTests"/>
         <junit-test-suite class-name="org.ofbiz.base.test.BaseUnitTests"/>
     </test-case>