Author: lektran
Date: Sun Apr 7 07:39:47 2019 New Revision: 1857071 URL: http://svn.apache.org/viewvc?rev=1857071&view=rev Log: Implemented: Allow Jobs to specify a priority and be queued accordingly by the JobPoller so that important jobs can be prioritized over normal jobs, and low priority jobs can be left until last. [OFBIZ-10865] Changes are as follows: * Add a "priority" field to JobSandbox entity (numeric/Long) * Add JobPriority constants class containing fields LOW (0), NORMAL (50) and HIGH (100) * Add getPriority method to the Job interface and implement methods for AbstractJob (returns NORMAL), PersistedServiceJob (returns JobSandbox.priority) and PurgeJob (returns LOW) * Change the JobPoller executor's queue to use PriorityBlockingQueue (unbounded) instead of LinkedBlockingQueue (bounded) * Implement custom Comparator for the priority queue to sort by priority descending and then runTime ascending * Change the poll size per poll to be (queueSize() - queue.size) instead of queue.remainingCapacity() due to the new queue being unbounded * I've also opted to limit the database poll query to the poll size using maxRows() because it seemed dangerous to me to use an unconstrained query on this table * Ensured recurring jobs receive the default (NORMAL) priority when being rescheduled so that they're sorted correctly on the next time they show up in the database poll * Ensured jobs generated at runtime are given a default priority of NORMAL Added: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPriority.java Modified: ofbiz/ofbiz-framework/trunk/applications/product/minilang/product/product/ProductServices.xml ofbiz/ofbiz-framework/trunk/framework/service/entitydef/entitymodel.xml ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/engine/GenericAsyncEngine.java ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/AbstractJob.java ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/Job.java ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java Modified: ofbiz/ofbiz-framework/trunk/applications/product/minilang/product/product/ProductServices.xml URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/applications/product/minilang/product/product/ProductServices.xml?rev=1857071&r1=1857070&r2=1857071&view=diff ============================================================================== --- ofbiz/ofbiz-framework/trunk/applications/product/minilang/product/product/ProductServices.xml (original) +++ ofbiz/ofbiz-framework/trunk/applications/product/minilang/product/product/ProductServices.xml Sun Apr 7 07:39:47 2019 @@ -889,6 +889,7 @@ under the License. <create-value value-field="runtimeData"/> <!-- Create Job For ProductGroupOrder --> + <!-- FIXME: Jobs should not be manually created --> <make-value entity-name="JobSandbox" value-field="jobSandbox"/> <sequenced-id sequence-name="JobSandbox" field="jobSandbox.jobId"/> <set field="jobId" from-field="jobSandbox.jobId"/> @@ -900,6 +901,7 @@ under the License. <set field="jobSandbox.runAsUser" value="system"/> <set field="jobSandbox.runtimeDataId" from-field="runtimeDataId"/> <set field="jobSandbox.maxRecurrenceCount" value="1" type="Long"/> + <set field="jobSandbox.priority" value="50" type="Long"/> <create-value value-field="jobSandbox"/> <set field="productGroupOrder.jobId" from-field="jobId"/> Modified: ofbiz/ofbiz-framework/trunk/framework/service/entitydef/entitymodel.xml URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/entitydef/entitymodel.xml?rev=1857071&r1=1857070&r2=1857071&view=diff ============================================================================== --- ofbiz/ofbiz-framework/trunk/framework/service/entitydef/entitymodel.xml (original) +++ ofbiz/ofbiz-framework/trunk/framework/service/entitydef/entitymodel.xml Sun Apr 7 07:39:47 2019 @@ -44,6 +44,7 @@ under the License. <field name="jobId" type="id"></field> <field name="jobName" type="name"></field> <field name="runTime" type="date-time"></field> + <field name="priority" type="numeric"></field> <field name="poolId" type="name"></field> <field name="statusId" type="id"></field> <field name="parentJobId" type="id"></field> Modified: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/engine/GenericAsyncEngine.java URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/engine/GenericAsyncEngine.java?rev=1857071&r1=1857070&r2=1857071&view=diff ============================================================================== --- ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/engine/GenericAsyncEngine.java (original) +++ ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/engine/GenericAsyncEngine.java Sun Apr 7 07:39:47 2019 @@ -41,6 +41,7 @@ import org.apache.ofbiz.service.job.Gene import org.apache.ofbiz.service.job.Job; import org.apache.ofbiz.service.job.JobManager; import org.apache.ofbiz.service.job.JobManagerException; +import org.apache.ofbiz.service.job.JobPriority; /** * Generic Asynchronous Engine @@ -112,6 +113,7 @@ public abstract class GenericAsyncEngine jFields.put("loaderName", localName); jFields.put("maxRetry", (long) modelService.maxRetry); jFields.put("runtimeDataId", dataId); + jFields.put("priority", JobPriority.NORMAL); if (UtilValidate.isNotEmpty(authUserLoginId)) { jFields.put("authUserLoginId", authUserLoginId); } Modified: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/AbstractJob.java URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/AbstractJob.java?rev=1857071&r1=1857070&r2=1857071&view=diff ============================================================================== --- ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/AbstractJob.java (original) +++ ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/AbstractJob.java Sun Apr 7 07:39:47 2019 @@ -115,4 +115,12 @@ public abstract class AbstractJob implem public Date getStartTime() { return (Date) startTime.clone(); } + + /* + * Returns JobPriority.NORMAL, the default setting + */ + @Override + public long getPriority() { + return JobPriority.NORMAL; + } } Modified: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/Job.java URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/Job.java?rev=1857071&r1=1857070&r2=1857071&view=diff ============================================================================== --- ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/Job.java (original) +++ ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/Job.java Sun Apr 7 07:39:47 2019 @@ -72,5 +72,10 @@ public interface Job extends Runnable { * Returns the time this job is scheduled to start. */ Date getStartTime(); + + /** + * Returns the priority of this job, higher the number the higher the priority + */ + long getPriority(); } Modified: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java?rev=1857071&r1=1857070&r2=1857071&view=diff ============================================================================== --- ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java (original) +++ ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java Sun Apr 7 07:39:47 2019 @@ -213,7 +213,10 @@ public final class JobManager { Debug.logWarning("Unable to poll JobSandbox for jobs; unable to begin transaction.", module); return poll; } - try (EntityListIterator jobsIterator = EntityQuery.use(delegator).from("JobSandbox").where(mainCondition).orderBy("runTime").queryIterator()) { + try (EntityListIterator jobsIterator = EntityQuery.use(delegator) + .from("JobSandbox").where(mainCondition) + .orderBy("priority DESC NULLS LAST", "runTime") + .maxRows(limit).queryIterator()) { GenericValue jobValue = jobsIterator.next(); while (jobValue != null) { // Claim ownership of this value. Using storeByCondition to avoid a race condition. @@ -546,7 +549,8 @@ public final class JobManager { jobName = Long.toString((new Date().getTime())); } Map<String, Object> jFields = UtilMisc.<String, Object> toMap("jobName", jobName, "runTime", new java.sql.Timestamp(startTime), - "serviceName", serviceName, "statusId", "SERVICE_PENDING", "recurrenceInfoId", infoId, "runtimeDataId", dataId); + "serviceName", serviceName, "statusId", "SERVICE_PENDING", "recurrenceInfoId", infoId, "runtimeDataId", dataId, + "priority", JobPriority.NORMAL); // set the pool ID if (UtilValidate.isNotEmpty(poolName)) { jFields.put("poolId", poolName); Modified: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java?rev=1857071&r1=1857070&r2=1857071&view=diff ============================================================================== --- ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java (original) +++ ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java Sun Apr 7 07:39:47 2019 @@ -20,13 +20,14 @@ package org.apache.ofbiz.service.job; import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -63,15 +64,48 @@ public final class JobPoller implements private static ThreadPoolExecutor createThreadPoolExecutor() { try { ThreadPool threadPool = ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool(); - return new ThreadPoolExecutor(threadPool.getMinThreads(), threadPool.getMaxThreads(), threadPool.getTtl(), - TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(threadPool.getJobs()), new JobInvokerThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); + return new ThreadPoolExecutor( + threadPool.getMinThreads(), + threadPool.getMaxThreads(), + threadPool.getTtl(), + TimeUnit.MILLISECONDS, + new PriorityBlockingQueue<>(threadPool.getJobs(), createPriorityComparator()), + new JobInvokerThreadFactory(), + new ThreadPoolExecutor.AbortPolicy()); } catch (GenericConfigException e) { Debug.logError(e, "Exception thrown while getting <thread-pool> model, using default <thread-pool> values: ", module); - return new ThreadPoolExecutor(ThreadPool.MIN_THREADS, ThreadPool.MAX_THREADS, ThreadPool.THREAD_TTL, - TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(ThreadPool.QUEUE_SIZE), new JobInvokerThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); + return new ThreadPoolExecutor( + ThreadPool.MIN_THREADS, + ThreadPool.MAX_THREADS, + ThreadPool.THREAD_TTL, + TimeUnit.MILLISECONDS, + new PriorityBlockingQueue<>(ThreadPool.QUEUE_SIZE, createPriorityComparator()), + new JobInvokerThreadFactory(), + new ThreadPoolExecutor.AbortPolicy()); } } + private static Comparator<Runnable> createPriorityComparator() { + return new Comparator<Runnable>() { + + /** + * Sorts jobs by priority then by start time + */ + @Override + public int compare(Runnable o1, Runnable o2) { + Job j1 = (Job) o1; + Job j2 = (Job) o2; + // Descending priority (higher number returns -1) + int priorityCompare = Long.compare(j2.getPriority(), j1.getPriority()); + if (priorityCompare != 0) { + return priorityCompare; + } + // Ascending start time (earlier time returns -1) + return Long.compare(j1.getStartTime().getTime(), j2.getStartTime().getTime()); + } + }; + } + private static int pollWaitTime() { try { ThreadPool threadPool = ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool(); @@ -82,6 +116,16 @@ public final class JobPoller implements } } + static int queueSize() { + try { + ThreadPool threadPool = ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool(); + return threadPool.getJobs(); + } catch (GenericConfigException e) { + Debug.logError(e, "Exception thrown while getting <thread-pool> model, using default <thread-pool> values: ", module); + return ThreadPool.QUEUE_SIZE; + } + } + /** * Register a {@link JobManager} with the job poller. * @@ -170,6 +214,7 @@ public final class JobPoller implements try { executor.execute(job); } catch (Exception e) { + Debug.logError(e, module); job.deQueue(); } } @@ -197,6 +242,7 @@ public final class JobPoller implements private static class JobInvokerThreadFactory implements ThreadFactory { + @Override public Thread newThread(Runnable runnable) { return new Thread(runnable, "OFBiz-JobQueue-" + created.getAndIncrement()); } @@ -214,7 +260,7 @@ public final class JobPoller implements Thread.sleep(1000); } while (!executor.isShutdown()) { - int remainingCapacity = executor.getQueue().remainingCapacity(); + int remainingCapacity = queueSize() - executor.getQueue().size(); if (remainingCapacity > 0) { // Build "list of lists" Collection<JobManager> jmCollection = jobManagers.values(); Added: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPriority.java URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPriority.java?rev=1857071&view=auto ============================================================================== --- ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPriority.java (added) +++ ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPriority.java Sun Apr 7 07:39:47 2019 @@ -0,0 +1,7 @@ +package org.apache.ofbiz.service.job; + +public final class JobPriority { + public static final long LOW = 0; + public static final long NORMAL = 50; + public static final long HIGH = 100; +} Modified: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java?rev=1857071&r1=1857070&r2=1857071&view=diff ============================================================================== --- ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java (original) +++ ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java Sun Apr 7 07:39:47 2019 @@ -211,6 +211,10 @@ public class PersistedServiceJob extends newJob.set("currentRetryCount", 0L); } nextRecurrence = next; + // Set priority if missing + if (newJob.getLong("priority") == null) { + newJob.set("priority", JobPriority.NORMAL); + } delegator.createSetNextSeqId(newJob); if (Debug.verboseOn()) { Debug.logVerbose("Created next job entry: " + newJob, module); @@ -379,4 +383,17 @@ public class PersistedServiceJob extends public Date getStartTime() { return new Date(startTime); } + + /* + * Returns the priority stored in the JobSandbox.priority field, if no value is present + * then it defaults to AbstractJob.getPriority() + */ + @Override + public long getPriority() { + Long priority = jobValue.getLong("priority"); + if (priority == null) { + return super.getPriority(); + } + return priority; + } } Modified: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java?rev=1857071&r1=1857070&r2=1857071&view=diff ============================================================================== --- ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java (original) +++ ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java Sun Apr 7 07:39:47 2019 @@ -82,4 +82,12 @@ public class PurgeJob extends AbstractJo throw new InvalidJobException("Illegal state change"); } } + + /* + * Returns JobPriority.LOW + */ + @Override + public long getPriority() { + return JobPriority.LOW; + } } |
Free forum by Nabble | Edit this page |