|
Author: adrianc
Date: Tue Aug 7 19:11:06 2012 New Revision: 1370437 URL: http://svn.apache.org/viewvc?rev=1370437&view=rev Log: More work on the Job Scheduler: 1. Gave the job queue a fixed size so the job poller can't create an out-of-memory condition. 2. Changed min/max thread settings to more conservative values. 3. Changed JobSandbox polling code to accept a limit argument to control the number of records retrieved. Also used ELI to limit memory use. 4. Improved JobManager reloadCrashedJobs method to recover queued jobs that were being missed previously. At this stage the Job Scheduler has been fixed to not crash/saturate the server, but it can still lose jobs. That will be fixed in the next commit. Modified: ofbiz/trunk/framework/service/config/serviceengine.xml ofbiz/trunk/framework/service/dtd/service-config.xsd ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Modified: ofbiz/trunk/framework/service/config/serviceengine.xml URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/config/serviceengine.xml?rev=1370437&r1=1370436&r2=1370437&view=diff ============================================================================== --- ofbiz/trunk/framework/service/config/serviceengine.xml (original) +++ ofbiz/trunk/framework/service/config/serviceengine.xml Tue Aug 7 19:11:06 2012 @@ -25,15 +25,16 @@ under the License. <!-- Name of the service to use for authorization --> <authorization service-name="userLogin"/> - <!-- Thread pool configuration (max/min threads, uses to live and time to live) --> + <!-- Job poller configuration. Many of these attributes are set to the job poller defaults, but they are included here for convenience. --> <thread-pool send-to-pool="pool" purge-job-days="4" failed-retry-min="3" - ttl="18000000" - min-threads="5" - max-threads="15" + ttl="120000" + jobs="100" + min-threads="2" + max-threads="5" poll-enabled="true" - poll-db-millis="20000"> + poll-db-millis="30000"> <run-from-pool name="pool"/> </thread-pool> Modified: ofbiz/trunk/framework/service/dtd/service-config.xsd URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/dtd/service-config.xsd?rev=1370437&r1=1370436&r2=1370437&view=diff ============================================================================== --- ofbiz/trunk/framework/service/dtd/service-config.xsd (original) +++ ofbiz/trunk/framework/service/dtd/service-config.xsd Tue Aug 7 19:11:06 2012 @@ -58,38 +58,53 @@ under the License. <xs:element name="thread-pool"> <xs:complexType> <xs:sequence> - <xs:element minOccurs="0" maxOccurs="unbounded" ref="run-from-pool"/> + <xs:element name="run-from-pool" minOccurs="0" maxOccurs="unbounded"> + <xs:complexType> + <xs:attribute type="xs:string" name="name" use="required" /> + </xs:complexType> + </xs:element> </xs:sequence> - <xs:attributeGroup ref="attlist.thread-pool"/> + <xs:attribute type="xs:string" name="send-to-pool" use="required" /> + <xs:attribute type="xs:nonNegativeInteger" name="purge-job-days" default="30" /> + <xs:attribute type="xs:nonNegativeInteger" name="failed-retry-min" default="30" /> + <xs:attribute type="xs:nonNegativeInteger" name="ttl"> + <xs:annotation> + <xs:documentation>Idle queue service thread lifespan in milliseconds. Defaults to "120000" (2 minutes).</xs:documentation> + </xs:annotation> + </xs:attribute> + <xs:attribute type="xs:nonNegativeInteger" name="jobs"> + <xs:annotation> + <xs:documentation>Job queue size. Defaults to "100".</xs:documentation> + </xs:annotation> + </xs:attribute> + <xs:attribute type="xs:nonNegativeInteger" name="min-threads"> + <xs:annotation> + <xs:documentation>Minimum number of queue service threads. Defaults to "1".</xs:documentation> + </xs:annotation> + </xs:attribute> + <xs:attribute type="xs:nonNegativeInteger" name="max-threads"> + <xs:annotation> + <xs:documentation>Maximum number of queue service threads. Defaults to "5".</xs:documentation> + </xs:annotation> + </xs:attribute> + <xs:attribute name="poll-enabled"> + <xs:annotation> + <xs:documentation>Enable database polling. Defaults to "true".</xs:documentation> + </xs:annotation> + <xs:simpleType> + <xs:restriction base="xs:token"> + <xs:enumeration value="true" /> + <xs:enumeration value="false" /> + </xs:restriction> + </xs:simpleType> + </xs:attribute> + <xs:attribute type="xs:nonNegativeInteger" name="poll-db-millis"> + <xs:annotation> + <xs:documentation>Database polling interval in milliseconds. Defaults to "30000" (30 seconds).</xs:documentation> + </xs:annotation> + </xs:attribute> </xs:complexType> </xs:element> - <xs:attributeGroup name="attlist.thread-pool"> - <xs:attribute type="xs:string" name="send-to-pool" use="required"/> - <xs:attribute type="xs:nonNegativeInteger" name="purge-job-days" default="30"/> - <xs:attribute type="xs:nonNegativeInteger" name="failed-retry-min" default="30"/> - <xs:attribute type="xs:nonNegativeInteger" name="ttl" use="required"/> - <xs:attribute type="xs:nonNegativeInteger" name="wait-millis"/> <!-- deprecated --> - <xs:attribute type="xs:nonNegativeInteger" name="jobs"/> <!-- deprecated --> - <xs:attribute type="xs:nonNegativeInteger" name="min-threads" use="required"/> - <xs:attribute type="xs:nonNegativeInteger" name="max-threads" use="required"/> - <xs:attribute name="poll-enabled" default="true"> - <xs:simpleType> - <xs:restriction base="xs:token"> - <xs:enumeration value="true"/> - <xs:enumeration value="false"/> - </xs:restriction> - </xs:simpleType> - </xs:attribute> - <xs:attribute type="xs:nonNegativeInteger" name="poll-db-millis" use="required"/> - </xs:attributeGroup> - <xs:element name="run-from-pool"> - <xs:complexType> - <xs:attributeGroup ref="attlist.run-from-pool"/> - </xs:complexType> - </xs:element> - <xs:attributeGroup name="attlist.run-from-pool"> - <xs:attribute type="xs:string" name="name" use="required"/> - </xs:attributeGroup> <xs:element name="engine"> <xs:complexType> <xs:sequence> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370437&r1=1370436&r2=1370437&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Tue Aug 7 19:11:06 2012 @@ -38,13 +38,13 @@ import org.ofbiz.entity.Delegator; import org.ofbiz.entity.GenericEntityException; import org.ofbiz.entity.GenericValue; import org.ofbiz.entity.condition.EntityCondition; -import org.ofbiz.entity.condition.EntityConditionList; import org.ofbiz.entity.condition.EntityExpr; import org.ofbiz.entity.condition.EntityOperator; import org.ofbiz.entity.serialize.SerializeException; import org.ofbiz.entity.serialize.XmlSerializer; import org.ofbiz.entity.transaction.GenericTransactionException; import org.ofbiz.entity.transaction.TransactionUtil; +import org.ofbiz.entity.util.EntityListIterator; import org.ofbiz.service.DispatchContext; import org.ofbiz.service.LocalDispatcher; import org.ofbiz.service.ServiceContainer; @@ -146,7 +146,7 @@ public final class JobManager { * Returns an empty list if there are no jobs due to run. * This method is called by the {@link JobPoller} polling thread. */ - protected synchronized List<Job> poll() { + protected List<Job> poll(int limit) { assertIsRunning(); DispatchContext dctx = getDispatcher().getDispatchContext(); if (dctx == null) { @@ -154,8 +154,6 @@ public final class JobManager { return null; } List<Job> poll = FastList.newInstance(); - // sort the results by time - List<String> order = UtilMisc.toList("runTime"); // basic query List<EntityExpr> expressions = UtilMisc.toList(EntityCondition.makeCondition("runTime", EntityOperator.LESS_THAN_EQUAL_TO, UtilDateTime.nowTimestamp()), EntityCondition.makeCondition("startDateTime", EntityOperator.EQUALS, null), @@ -173,21 +171,24 @@ public final class JobManager { EntityCondition baseCondition = EntityCondition.makeCondition(expressions); EntityCondition poolCondition = EntityCondition.makeCondition(poolsExpr, EntityOperator.OR); EntityCondition mainCondition = EntityCondition.makeCondition(UtilMisc.toList(baseCondition, poolCondition)); + EntityListIterator jobsIterator = null; boolean beganTransaction = false; try { beganTransaction = TransactionUtil.begin(); if (!beganTransaction) { - Debug.logError("Unable to poll for jobs; transaction was not started by this process", module); + Debug.logError("Unable to poll JobSandbox for jobs; transaction was not started by this process", module); return null; } - // first update the jobs w/ this instance running information - delegator.storeByCondition("JobSandbox", updateFields, mainCondition); - // now query all the 'queued' jobs for this instance - List<GenericValue> jobEnt = delegator.findByAnd("JobSandbox", updateFields, order, false); - if (UtilValidate.isNotEmpty(jobEnt)) { - for (GenericValue v : jobEnt) { - poll.add(new PersistedServiceJob(dctx, v, null)); // TODO fix the requester + jobsIterator = delegator.find("JobSandbox", mainCondition, null, null, UtilMisc.toList("runTime"), null); + GenericValue jobValue = jobsIterator.next(); + while (jobValue != null) { + jobValue.putAll(updateFields); + jobValue.store(); + poll.add(new PersistedServiceJob(dctx, jobValue, null)); + if (poll.size() == limit) { + break; } + jobValue = jobsIterator.next(); } } catch (Throwable t) { // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation @@ -200,6 +201,13 @@ public final class JobManager { Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module); } } finally { + if (jobsIterator != null) { + try { + jobsIterator.close(); + } catch (GenericEntityException e) { + Debug.logWarning(e, module); + } + } try { // only commit the transaction if we started one... but make sure we try TransactionUtil.commit(beganTransaction); @@ -214,11 +222,19 @@ public final class JobManager { private void reloadCrashedJobs() { List<GenericValue> crashed = null; - List<EntityExpr> exprs = UtilMisc.toList(EntityCondition.makeCondition("runByInstanceId", instanceId)); - exprs.add(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING")); - EntityConditionList<EntityExpr> ecl = EntityCondition.makeCondition(exprs); + List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"), EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING")); + EntityCondition statusCondition = EntityCondition.makeCondition(statusExprList, EntityOperator.OR); + List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null)); + List<String> pools = ServiceConfigUtil.getRunPools(); + if (pools != null) { + for (String poolName : pools) { + poolsExpr.add(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, poolName)); + } + } + EntityCondition poolCondition = EntityCondition.makeCondition(poolsExpr, EntityOperator.OR); + EntityCondition mainCondition = EntityCondition.makeCondition(UtilMisc.toList(EntityCondition.makeCondition("runByInstanceId", instanceId), statusCondition, poolCondition)); try { - crashed = delegator.findList("JobSandbox", ecl, null, UtilMisc.toList("startDateTime"), null, false); + crashed = delegator.findList("JobSandbox", mainCondition, null, UtilMisc.toList("startDateTime"), null, false); } catch (GenericEntityException e) { Debug.logError(e, "Unable to load crashed jobs", module); } Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370437&r1=1370436&r2=1370437&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Tue Aug 7 19:11:06 2012 @@ -33,8 +33,6 @@ import java.util.concurrent.atomic.Atomi import org.ofbiz.base.util.Debug; import org.ofbiz.service.config.ServiceConfigUtil; -import org.apache.commons.lang.math.NumberUtils; - /** * Job poller. Queues and runs jobs. */ @@ -42,10 +40,11 @@ public final class JobPoller implements public static final String module = JobPoller.class.getName(); private static final AtomicInteger created = new AtomicInteger(); - public static final int MIN_THREADS = 1; - public static final int MAX_THREADS = 15; - public static final int POLL_WAIT = 20000; - public static final long THREAD_TTL = 18000000; + private static final int MIN_THREADS = 1; // Must be no less than one or the executor will shut down. + private static final int MAX_THREADS = 5; // Values higher than 5 might slow things down. + private static final int POLL_WAIT = 30000; // Database polling interval - 30 seconds. + private static final int QUEUE_SIZE = 100; + private static final long THREAD_TTL = 120000; // Idle thread lifespan - 2 minutes. private final JobManager jm; private final ThreadPoolExecutor executor; @@ -61,7 +60,7 @@ public final class JobPoller implements public JobPoller(JobManager jm) { this.name = jm.getDelegator().getDelegatorName(); this.jm = jm; - this.executor = new ThreadPoolExecutor(minThreads(), maxThreads(), getTTL(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), + this.executor = new ThreadPoolExecutor(minThreads(), maxThreads(), getTTL(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueSize()), new JobInvokerThreadFactory(this.name), new ThreadPoolExecutor.AbortPolicy()); } @@ -116,33 +115,48 @@ public final class JobPoller implements } private long getTTL() { - long ttl = THREAD_TTL; - try { - ttl = NumberUtils.toLong(ServiceConfigUtil.getElementAttr("thread-pool", "ttl")); - } catch (NumberFormatException nfe) { - Debug.logError("Problems reading value from attribute [ttl] of element [thread-pool] in serviceengine.xml file [" + nfe.toString() + "]. Using default (" + THREAD_TTL + ").", module); + String threadTTLAttr = ServiceConfigUtil.getElementAttr("thread-pool", "ttl"); + if (!threadTTLAttr.isEmpty()) { + try { + int threadTTL = Integer.parseInt(threadTTLAttr); + if (threadTTL > 0) { + return threadTTL; + } + } catch (NumberFormatException e) { + Debug.logError("Exception thrown while parsing thread TTL from serviceengine.xml file [" + e + "]. Using default value.", module); + } } - return ttl; + return THREAD_TTL; } private int maxThreads() { - int max = MAX_THREADS; - try { - max = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "max-threads")); - } catch (NumberFormatException nfe) { - Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module); + String maxThreadsAttr = ServiceConfigUtil.getElementAttr("thread-pool", "max-threads"); + if (!maxThreadsAttr.isEmpty()) { + try { + int maxThreads = Integer.parseInt(maxThreadsAttr); + if (maxThreads > 0) { + return maxThreads; + } + } catch (NumberFormatException e) { + Debug.logError("Exception thrown while parsing maximum threads from serviceengine.xml file [" + e + "]. Using default value.", module); + } } - return max; + return MAX_THREADS; } private int minThreads() { - int min = MIN_THREADS; - try { - min = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "min-threads")); - } catch (NumberFormatException nfe) { - Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module); + String minThreadsAttr = ServiceConfigUtil.getElementAttr("thread-pool", "min-threads"); + if (!minThreadsAttr.isEmpty()) { + try { + int minThreads = Integer.parseInt(minThreadsAttr); + if (minThreads > 0) { + return minThreads; + } + } catch (NumberFormatException e) { + Debug.logError("Exception thrown while parsing minimum threads from serviceengine.xml file [" + e + "]. Using default value.", module); + } } - return min; + return MIN_THREADS; } private boolean pollEnabled() { @@ -153,13 +167,33 @@ public final class JobPoller implements } private int pollWaitTime() { - int poll = POLL_WAIT; - try { - poll = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "poll-db-millis")); - } catch (NumberFormatException nfe) { - Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module); + String pollIntervalAttr = ServiceConfigUtil.getElementAttr("thread-pool", "poll-db-millis"); + if (!pollIntervalAttr.isEmpty()) { + try { + int pollInterval = Integer.parseInt(pollIntervalAttr); + if (pollInterval > 0) { + return pollInterval; + } + } catch (NumberFormatException e) { + Debug.logError("Exception thrown while parsing database polling interval from serviceengine.xml file [" + e + "]. Using default value.", module); + } + } + return POLL_WAIT; + } + + private int queueSize() { + String queueSizeAttr = ServiceConfigUtil.getElementAttr("thread-pool", "jobs"); + if (!queueSizeAttr.isEmpty()) { + try { + int queueSize = Integer.parseInt(queueSizeAttr); + if (queueSize > 0) { + return queueSize; + } + } catch (NumberFormatException e) { + Debug.logError("Exception thrown while parsing queue size from serviceengine.xml file [" + e + "]. Using default value.", module); + } } - return poll; + return QUEUE_SIZE; } /** @@ -172,29 +206,28 @@ public final class JobPoller implements this.executor.execute(new JobInvoker(job)); } - public synchronized void run() { + public void run() { try { // wait 30 seconds before the first poll - java.lang.Thread.sleep(30000); - } catch (InterruptedException e) { - } - while (!executor.isShutdown()) { - try { - // grab a list of jobs to run. - List<Job> pollList = jm.poll(); - for (Job job : pollList) { - try { - queueNow(job); - } catch (InvalidJobException e) { - Debug.logError(e, module); + Thread.sleep(30000); + while (!executor.isShutdown()) { + int remainingCapacity = executor.getQueue().remainingCapacity(); + if (remainingCapacity > 0) { + List<Job> pollList = jm.poll(remainingCapacity); + for (Job job : pollList) { + try { + queueNow(job); + } catch (InvalidJobException e) { + Debug.logError(e, module); + } } } - // NOTE: using sleep instead of wait for stricter locking - java.lang.Thread.sleep(pollWaitTime()); - } catch (InterruptedException e) { - Debug.logError(e, module); - stop(); + Thread.sleep(pollWaitTime()); } + } catch (InterruptedException e) { + Debug.logError(e, module); + stop(); + Thread.currentThread().interrupt(); } Debug.logInfo("JobPoller " + this.name + " thread terminated.", module); } @@ -234,7 +267,7 @@ public final class JobPoller implements } public Thread newThread(Runnable runnable) { - return new Thread(runnable, "OFBiz-JobInvoker-" + poolName + "-" + created.getAndIncrement()); + return new Thread(runnable, "OFBiz-JobQueue-" + poolName + "-" + created.getAndIncrement()); } } } |
| Free forum by Nabble | Edit this page |
