|
Hi Adrian,
I have a feeling that using a direct update to queue jobs was done on purpose to avoid duplication when multiple instances are polling for jobs in the same pool. The problem with doing a select then update is the following: 1. Instance A uses SELECT to gather the jobs to queue; the table is locked for updates but reads are still possible. 2. Instance B does the same and retrieves the same rows plus any new additions, while instance A works its way through updating the rows. 3. Instance B attempts to update the same rows but is met with a lock on those which instance A has already updated, with the potential for lock wait timeout issues for instance B (an existing and unavoidable issue). 4. Instance A finishes the updates and commits, the locks are released, and B can begin its updates (if the lock wait didn't time out). A sends the retrieved jobs back to the poller to be queued in memory. 5. Instance B finishes the updates and commits, then sends the retrieved jobs back to the poller to be queued in memory. The result is that both instances queue, and therefore run, the same jobs. I could be wrong about the above, but I'm fairly sure the table wouldn't be locked for reading until A has made its first update (even then I can't recall off the top of my head whether this prevents reads until the update is committed). I believe SELECT FOR UPDATE is the only "select then update" strategy that would lock the table immediately, and OFBiz doesn't support it. Regards Scott On 8/08/2012, at 7:11 AM, [hidden email] wrote: > Author: adrianc > Date: Tue Aug 7 19:11:06 2012 > New Revision: 1370437 > > URL: http://svn.apache.org/viewvc?rev=1370437&view=rev > Log: > More work on the Job Scheduler: > > 1. Gave the job queue a fixed size so the job poller can't create an out-of-memory condition. > 2. Changed min/max thread settings to more conservative values. > 3. Changed JobSandbox polling code to accept a limit argument to control the number of records retrieved. Also used ELI to limit memory use. > 4. 
Improved JobManager reloadCrashedJobs method to recover queued jobs that were being missed previously. > > At this stage the Job Scheduler has been fixed to not crash/saturate the server, but it can still lose jobs. That will be fixed in the next commit. > > Modified: > ofbiz/trunk/framework/service/config/serviceengine.xml > ofbiz/trunk/framework/service/dtd/service-config.xsd > ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java > ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java > > Modified: ofbiz/trunk/framework/service/config/serviceengine.xml > URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/config/serviceengine.xml?rev=1370437&r1=1370436&r2=1370437&view=diff > ============================================================================== > --- ofbiz/trunk/framework/service/config/serviceengine.xml (original) > +++ ofbiz/trunk/framework/service/config/serviceengine.xml Tue Aug 7 19:11:06 2012 > @@ -25,15 +25,16 @@ under the License. > <!-- Name of the service to use for authorization --> > <authorization service-name="userLogin"/> > > - <!-- Thread pool configuration (max/min threads, uses to live and time to live) --> > + <!-- Job poller configuration. Many of these attributes are set to the job poller defaults, but they are included here for convenience. 
--> > <thread-pool send-to-pool="pool" > purge-job-days="4" > failed-retry-min="3" > - ttl="18000000" > - min-threads="5" > - max-threads="15" > + ttl="120000" > + jobs="100" > + min-threads="2" > + max-threads="5" > poll-enabled="true" > - poll-db-millis="20000"> > + poll-db-millis="30000"> > <run-from-pool name="pool"/> > </thread-pool> > > > Modified: ofbiz/trunk/framework/service/dtd/service-config.xsd > URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/dtd/service-config.xsd?rev=1370437&r1=1370436&r2=1370437&view=diff > ============================================================================== > --- ofbiz/trunk/framework/service/dtd/service-config.xsd (original) > +++ ofbiz/trunk/framework/service/dtd/service-config.xsd Tue Aug 7 19:11:06 2012 > @@ -58,38 +58,53 @@ under the License. > <xs:element name="thread-pool"> > <xs:complexType> > <xs:sequence> > - <xs:element minOccurs="0" maxOccurs="unbounded" ref="run-from-pool"/> > + <xs:element name="run-from-pool" minOccurs="0" maxOccurs="unbounded"> > + <xs:complexType> > + <xs:attribute type="xs:string" name="name" use="required" /> > + </xs:complexType> > + </xs:element> > </xs:sequence> > - <xs:attributeGroup ref="attlist.thread-pool"/> > + <xs:attribute type="xs:string" name="send-to-pool" use="required" /> > + <xs:attribute type="xs:nonNegativeInteger" name="purge-job-days" default="30" /> > + <xs:attribute type="xs:nonNegativeInteger" name="failed-retry-min" default="30" /> > + <xs:attribute type="xs:nonNegativeInteger" name="ttl"> > + <xs:annotation> > + <xs:documentation>Idle queue service thread lifespan in milliseconds. Defaults to "120000" (2 minutes).</xs:documentation> > + </xs:annotation> > + </xs:attribute> > + <xs:attribute type="xs:nonNegativeInteger" name="jobs"> > + <xs:annotation> > + <xs:documentation>Job queue size. 
Defaults to "100".</xs:documentation> > + </xs:annotation> > + </xs:attribute> > + <xs:attribute type="xs:nonNegativeInteger" name="min-threads"> > + <xs:annotation> > + <xs:documentation>Minimum number of queue service threads. Defaults to "1".</xs:documentation> > + </xs:annotation> > + </xs:attribute> > + <xs:attribute type="xs:nonNegativeInteger" name="max-threads"> > + <xs:annotation> > + <xs:documentation>Maximum number of queue service threads. Defaults to "5".</xs:documentation> > + </xs:annotation> > + </xs:attribute> > + <xs:attribute name="poll-enabled"> > + <xs:annotation> > + <xs:documentation>Enable database polling. Defaults to "true".</xs:documentation> > + </xs:annotation> > + <xs:simpleType> > + <xs:restriction base="xs:token"> > + <xs:enumeration value="true" /> > + <xs:enumeration value="false" /> > + </xs:restriction> > + </xs:simpleType> > + </xs:attribute> > + <xs:attribute type="xs:nonNegativeInteger" name="poll-db-millis"> > + <xs:annotation> > + <xs:documentation>Database polling interval in milliseconds. 
Defaults to "30000" (30 seconds).</xs:documentation> > + </xs:annotation> > + </xs:attribute> > </xs:complexType> > </xs:element> > - <xs:attributeGroup name="attlist.thread-pool"> > - <xs:attribute type="xs:string" name="send-to-pool" use="required"/> > - <xs:attribute type="xs:nonNegativeInteger" name="purge-job-days" default="30"/> > - <xs:attribute type="xs:nonNegativeInteger" name="failed-retry-min" default="30"/> > - <xs:attribute type="xs:nonNegativeInteger" name="ttl" use="required"/> > - <xs:attribute type="xs:nonNegativeInteger" name="wait-millis"/> <!-- deprecated --> > - <xs:attribute type="xs:nonNegativeInteger" name="jobs"/> <!-- deprecated --> > - <xs:attribute type="xs:nonNegativeInteger" name="min-threads" use="required"/> > - <xs:attribute type="xs:nonNegativeInteger" name="max-threads" use="required"/> > - <xs:attribute name="poll-enabled" default="true"> > - <xs:simpleType> > - <xs:restriction base="xs:token"> > - <xs:enumeration value="true"/> > - <xs:enumeration value="false"/> > - </xs:restriction> > - </xs:simpleType> > - </xs:attribute> > - <xs:attribute type="xs:nonNegativeInteger" name="poll-db-millis" use="required"/> > - </xs:attributeGroup> > - <xs:element name="run-from-pool"> > - <xs:complexType> > - <xs:attributeGroup ref="attlist.run-from-pool"/> > - </xs:complexType> > - </xs:element> > - <xs:attributeGroup name="attlist.run-from-pool"> > - <xs:attribute type="xs:string" name="name" use="required"/> > - </xs:attributeGroup> > <xs:element name="engine"> > <xs:complexType> > <xs:sequence> > > Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java > URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370437&r1=1370436&r2=1370437&view=diff > ============================================================================== > --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original) > +++ 
ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Tue Aug 7 19:11:06 2012 > @@ -38,13 +38,13 @@ import org.ofbiz.entity.Delegator; > import org.ofbiz.entity.GenericEntityException; > import org.ofbiz.entity.GenericValue; > import org.ofbiz.entity.condition.EntityCondition; > -import org.ofbiz.entity.condition.EntityConditionList; > import org.ofbiz.entity.condition.EntityExpr; > import org.ofbiz.entity.condition.EntityOperator; > import org.ofbiz.entity.serialize.SerializeException; > import org.ofbiz.entity.serialize.XmlSerializer; > import org.ofbiz.entity.transaction.GenericTransactionException; > import org.ofbiz.entity.transaction.TransactionUtil; > +import org.ofbiz.entity.util.EntityListIterator; > import org.ofbiz.service.DispatchContext; > import org.ofbiz.service.LocalDispatcher; > import org.ofbiz.service.ServiceContainer; > @@ -146,7 +146,7 @@ public final class JobManager { > * Returns an empty list if there are no jobs due to run. > * This method is called by the {@link JobPoller} polling thread. 
> */ > - protected synchronized List<Job> poll() { > + protected List<Job> poll(int limit) { > assertIsRunning(); > DispatchContext dctx = getDispatcher().getDispatchContext(); > if (dctx == null) { > @@ -154,8 +154,6 @@ public final class JobManager { > return null; > } > List<Job> poll = FastList.newInstance(); > - // sort the results by time > - List<String> order = UtilMisc.toList("runTime"); > // basic query > List<EntityExpr> expressions = UtilMisc.toList(EntityCondition.makeCondition("runTime", EntityOperator.LESS_THAN_EQUAL_TO, UtilDateTime.nowTimestamp()), > EntityCondition.makeCondition("startDateTime", EntityOperator.EQUALS, null), > @@ -173,21 +171,24 @@ public final class JobManager { > EntityCondition baseCondition = EntityCondition.makeCondition(expressions); > EntityCondition poolCondition = EntityCondition.makeCondition(poolsExpr, EntityOperator.OR); > EntityCondition mainCondition = EntityCondition.makeCondition(UtilMisc.toList(baseCondition, poolCondition)); > + EntityListIterator jobsIterator = null; > boolean beganTransaction = false; > try { > beganTransaction = TransactionUtil.begin(); > if (!beganTransaction) { > - Debug.logError("Unable to poll for jobs; transaction was not started by this process", module); > + Debug.logError("Unable to poll JobSandbox for jobs; transaction was not started by this process", module); > return null; > } > - // first update the jobs w/ this instance running information > - delegator.storeByCondition("JobSandbox", updateFields, mainCondition); > - // now query all the 'queued' jobs for this instance > - List<GenericValue> jobEnt = delegator.findByAnd("JobSandbox", updateFields, order, false); > - if (UtilValidate.isNotEmpty(jobEnt)) { > - for (GenericValue v : jobEnt) { > - poll.add(new PersistedServiceJob(dctx, v, null)); // TODO fix the requester > + jobsIterator = delegator.find("JobSandbox", mainCondition, null, null, UtilMisc.toList("runTime"), null); > + GenericValue jobValue = jobsIterator.next(); > + 
while (jobValue != null) { > + jobValue.putAll(updateFields); > + jobValue.store(); > + poll.add(new PersistedServiceJob(dctx, jobValue, null)); > + if (poll.size() == limit) { > + break; > } > + jobValue = jobsIterator.next(); > } > } catch (Throwable t) { > // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation > @@ -200,6 +201,13 @@ public final class JobManager { > Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module); > } > } finally { > + if (jobsIterator != null) { > + try { > + jobsIterator.close(); > + } catch (GenericEntityException e) { > + Debug.logWarning(e, module); > + } > + } > try { > // only commit the transaction if we started one... but make sure we try > TransactionUtil.commit(beganTransaction); > @@ -214,11 +222,19 @@ public final class JobManager { > > private void reloadCrashedJobs() { > List<GenericValue> crashed = null; > - List<EntityExpr> exprs = UtilMisc.toList(EntityCondition.makeCondition("runByInstanceId", instanceId)); > - exprs.add(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING")); > - EntityConditionList<EntityExpr> ecl = EntityCondition.makeCondition(exprs); > + List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"), EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING")); > + EntityCondition statusCondition = EntityCondition.makeCondition(statusExprList, EntityOperator.OR); > + List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null)); > + List<String> pools = ServiceConfigUtil.getRunPools(); > + if (pools != null) { > + for (String poolName : pools) { > + poolsExpr.add(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, poolName)); > + } > + } > + EntityCondition poolCondition = EntityCondition.makeCondition(poolsExpr, EntityOperator.OR); > + 
EntityCondition mainCondition = EntityCondition.makeCondition(UtilMisc.toList(EntityCondition.makeCondition("runByInstanceId", instanceId), statusCondition, poolCondition)); > try { > - crashed = delegator.findList("JobSandbox", ecl, null, UtilMisc.toList("startDateTime"), null, false); > + crashed = delegator.findList("JobSandbox", mainCondition, null, UtilMisc.toList("startDateTime"), null, false); > } catch (GenericEntityException e) { > Debug.logError(e, "Unable to load crashed jobs", module); > } > > Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java > URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370437&r1=1370436&r2=1370437&view=diff > ============================================================================== > --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original) > +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Tue Aug 7 19:11:06 2012 > @@ -33,8 +33,6 @@ import java.util.concurrent.atomic.Atomi > import org.ofbiz.base.util.Debug; > import org.ofbiz.service.config.ServiceConfigUtil; > > -import org.apache.commons.lang.math.NumberUtils; > - > /** > * Job poller. Queues and runs jobs. > */ > @@ -42,10 +40,11 @@ public final class JobPoller implements > > public static final String module = JobPoller.class.getName(); > private static final AtomicInteger created = new AtomicInteger(); > - public static final int MIN_THREADS = 1; > - public static final int MAX_THREADS = 15; > - public static final int POLL_WAIT = 20000; > - public static final long THREAD_TTL = 18000000; > + private static final int MIN_THREADS = 1; // Must be no less than one or the executor will shut down. > + private static final int MAX_THREADS = 5; // Values higher than 5 might slow things down. > + private static final int POLL_WAIT = 30000; // Database polling interval - 30 seconds. 
> + private static final int QUEUE_SIZE = 100; > + private static final long THREAD_TTL = 120000; // Idle thread lifespan - 2 minutes. > > private final JobManager jm; > private final ThreadPoolExecutor executor; > @@ -61,7 +60,7 @@ public final class JobPoller implements > public JobPoller(JobManager jm) { > this.name = jm.getDelegator().getDelegatorName(); > this.jm = jm; > - this.executor = new ThreadPoolExecutor(minThreads(), maxThreads(), getTTL(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), > + this.executor = new ThreadPoolExecutor(minThreads(), maxThreads(), getTTL(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueSize()), > new JobInvokerThreadFactory(this.name), new ThreadPoolExecutor.AbortPolicy()); > } > > @@ -116,33 +115,48 @@ public final class JobPoller implements > } > > private long getTTL() { > - long ttl = THREAD_TTL; > - try { > - ttl = NumberUtils.toLong(ServiceConfigUtil.getElementAttr("thread-pool", "ttl")); > - } catch (NumberFormatException nfe) { > - Debug.logError("Problems reading value from attribute [ttl] of element [thread-pool] in serviceengine.xml file [" + nfe.toString() + "]. Using default (" + THREAD_TTL + ").", module); > + String threadTTLAttr = ServiceConfigUtil.getElementAttr("thread-pool", "ttl"); > + if (!threadTTLAttr.isEmpty()) { > + try { > + int threadTTL = Integer.parseInt(threadTTLAttr); > + if (threadTTL > 0) { > + return threadTTL; > + } > + } catch (NumberFormatException e) { > + Debug.logError("Exception thrown while parsing thread TTL from serviceengine.xml file [" + e + "]. Using default value.", module); > + } > } > - return ttl; > + return THREAD_TTL; > } > > private int maxThreads() { > - int max = MAX_THREADS; > - try { > - max = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "max-threads")); > - } catch (NumberFormatException nfe) { > - Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. 
Using defaults.", module); > + String maxThreadsAttr = ServiceConfigUtil.getElementAttr("thread-pool", "max-threads"); > + if (!maxThreadsAttr.isEmpty()) { > + try { > + int maxThreads = Integer.parseInt(maxThreadsAttr); > + if (maxThreads > 0) { > + return maxThreads; > + } > + } catch (NumberFormatException e) { > + Debug.logError("Exception thrown while parsing maximum threads from serviceengine.xml file [" + e + "]. Using default value.", module); > + } > } > - return max; > + return MAX_THREADS; > } > > private int minThreads() { > - int min = MIN_THREADS; > - try { > - min = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "min-threads")); > - } catch (NumberFormatException nfe) { > - Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module); > + String minThreadsAttr = ServiceConfigUtil.getElementAttr("thread-pool", "min-threads"); > + if (!minThreadsAttr.isEmpty()) { > + try { > + int minThreads = Integer.parseInt(minThreadsAttr); > + if (minThreads > 0) { > + return minThreads; > + } > + } catch (NumberFormatException e) { > + Debug.logError("Exception thrown while parsing minimum threads from serviceengine.xml file [" + e + "]. Using default value.", module); > + } > } > - return min; > + return MIN_THREADS; > } > > private boolean pollEnabled() { > @@ -153,13 +167,33 @@ public final class JobPoller implements > } > > private int pollWaitTime() { > - int poll = POLL_WAIT; > - try { > - poll = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "poll-db-millis")); > - } catch (NumberFormatException nfe) { > - Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. 
Using defaults.", module); > + String pollIntervalAttr = ServiceConfigUtil.getElementAttr("thread-pool", "poll-db-millis"); > + if (!pollIntervalAttr.isEmpty()) { > + try { > + int pollInterval = Integer.parseInt(pollIntervalAttr); > + if (pollInterval > 0) { > + return pollInterval; > + } > + } catch (NumberFormatException e) { > + Debug.logError("Exception thrown while parsing database polling interval from serviceengine.xml file [" + e + "]. Using default value.", module); > + } > + } > + return POLL_WAIT; > + } > + > + private int queueSize() { > + String queueSizeAttr = ServiceConfigUtil.getElementAttr("thread-pool", "jobs"); > + if (!queueSizeAttr.isEmpty()) { > + try { > + int queueSize = Integer.parseInt(queueSizeAttr); > + if (queueSize > 0) { > + return queueSize; > + } > + } catch (NumberFormatException e) { > + Debug.logError("Exception thrown while parsing queue size from serviceengine.xml file [" + e + "]. Using default value.", module); > + } > } > - return poll; > + return QUEUE_SIZE; > } > > /** > @@ -172,29 +206,28 @@ public final class JobPoller implements > this.executor.execute(new JobInvoker(job)); > } > > - public synchronized void run() { > + public void run() { > try { > // wait 30 seconds before the first poll > - java.lang.Thread.sleep(30000); > - } catch (InterruptedException e) { > - } > - while (!executor.isShutdown()) { > - try { > - // grab a list of jobs to run. 
> - List<Job> pollList = jm.poll(); > - for (Job job : pollList) { > - try { > - queueNow(job); > - } catch (InvalidJobException e) { > - Debug.logError(e, module); > + Thread.sleep(30000); > + while (!executor.isShutdown()) { > + int remainingCapacity = executor.getQueue().remainingCapacity(); > + if (remainingCapacity > 0) { > + List<Job> pollList = jm.poll(remainingCapacity); > + for (Job job : pollList) { > + try { > + queueNow(job); > + } catch (InvalidJobException e) { > + Debug.logError(e, module); > + } > } > } > - // NOTE: using sleep instead of wait for stricter locking > - java.lang.Thread.sleep(pollWaitTime()); > - } catch (InterruptedException e) { > - Debug.logError(e, module); > - stop(); > + Thread.sleep(pollWaitTime()); > } > + } catch (InterruptedException e) { > + Debug.logError(e, module); > + stop(); > + Thread.currentThread().interrupt(); > } > Debug.logInfo("JobPoller " + this.name + " thread terminated.", module); > } > @@ -234,7 +267,7 @@ public final class JobPoller implements > } > > public Thread newThread(Runnable runnable) { > - return new Thread(runnable, "OFBiz-JobInvoker-" + poolName + "-" + created.getAndIncrement()); > + return new Thread(runnable, "OFBiz-JobQueue-" + poolName + "-" + created.getAndIncrement()); > } > } > } > > |
|
Thanks for the review, Scott!
I understand why the original code was done that way, but it is causing problems on busy servers. I will update the new code to check for data-race conditions. -Adrian On 8/10/2012 1:24 AM, Scott Gray wrote: > Hi Adrian, > > I have a feeling that using a direct update to queue jobs was done on purpose to avoid duplication when multiple instances are polling for jobs in the same pool. The problem with doing a select then update is the following: > 1. Instance A uses SELECT to gather the jobs to queue, table is locked for updates but reads are still possible > 2. Instance B does the same and retrieves the same rows plus any new additions, while instance A works it's way through updating the rows > 3. Instance B attempts to update the same rows but is met with a lock on those which instance A has already updated, potential for lock wait timeout issues for instance B (an existing and unavoidable issue) > 4. Instance A finishes the updates and commits, locks are released and B can begin it's updates (if the lock wait didn't time out). A sends the retrieved jobs back to the poller to be queued in memory. > 5. Instance B finishes the updates and commits, sends the retrieved jobs back to the poller to be queued in memory. > > I could be wrong about the above but I'm fairly sure the table wouldn't be locked for reading until A has made its first update (even then I can't recall off the top of my head if this prevents reads until the update is committed). I believe SELECT FOR UPDATE is the only "select then update" strategy that would lock the table immediately and OFBiz doesn't support it. > > Regards > Scott > > On 8/08/2012, at 7:11 AM, [hidden email] wrote: > >> Author: adrianc >> Date: Tue Aug 7 19:11:06 2012 >> New Revision: 1370437 >> >> URL: http://svn.apache.org/viewvc?rev=1370437&view=rev >> Log: >> More work on the Job Scheduler: >> >> 1. Gave the job queue a fixed size so the job poller can't create an out-of-memory condition. >> 2. 
Changed min/max thread settings to more conservative values. >> 3. Changed JobSandbox polling code to accept a limit argument to control the number of records retrieved. Also used ELI to limit memory use. >> 4. Improved JobManager reloadCrashedJobs method to recover queued jobs that were being missed previously. >> >> At this stage the Job Scheduler has been fixed to not crash/saturate the server, but it can still lose jobs. That will be fixed in the next commit. >> >> Modified: >> ofbiz/trunk/framework/service/config/serviceengine.xml >> ofbiz/trunk/framework/service/dtd/service-config.xsd >> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java >> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java >> >> Modified: ofbiz/trunk/framework/service/config/serviceengine.xml >> URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/config/serviceengine.xml?rev=1370437&r1=1370436&r2=1370437&view=diff >> ============================================================================== >> --- ofbiz/trunk/framework/service/config/serviceengine.xml (original) >> +++ ofbiz/trunk/framework/service/config/serviceengine.xml Tue Aug 7 19:11:06 2012 >> @@ -25,15 +25,16 @@ under the License. >> <!-- Name of the service to use for authorization --> >> <authorization service-name="userLogin"/> >> >> - <!-- Thread pool configuration (max/min threads, uses to live and time to live) --> >> + <!-- Job poller configuration. Many of these attributes are set to the job poller defaults, but they are included here for convenience. 
--> >> <thread-pool send-to-pool="pool" >> purge-job-days="4" >> failed-retry-min="3" >> - ttl="18000000" >> - min-threads="5" >> - max-threads="15" >> + ttl="120000" >> + jobs="100" >> + min-threads="2" >> + max-threads="5" >> poll-enabled="true" >> - poll-db-millis="20000"> >> + poll-db-millis="30000"> >> <run-from-pool name="pool"/> >> </thread-pool> >> >> >> Modified: ofbiz/trunk/framework/service/dtd/service-config.xsd >> URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/dtd/service-config.xsd?rev=1370437&r1=1370436&r2=1370437&view=diff >> ============================================================================== >> --- ofbiz/trunk/framework/service/dtd/service-config.xsd (original) >> +++ ofbiz/trunk/framework/service/dtd/service-config.xsd Tue Aug 7 19:11:06 2012 >> @@ -58,38 +58,53 @@ under the License. >> <xs:element name="thread-pool"> >> <xs:complexType> >> <xs:sequence> >> - <xs:element minOccurs="0" maxOccurs="unbounded" ref="run-from-pool"/> >> + <xs:element name="run-from-pool" minOccurs="0" maxOccurs="unbounded"> >> + <xs:complexType> >> + <xs:attribute type="xs:string" name="name" use="required" /> >> + </xs:complexType> >> + </xs:element> >> </xs:sequence> >> - <xs:attributeGroup ref="attlist.thread-pool"/> >> + <xs:attribute type="xs:string" name="send-to-pool" use="required" /> >> + <xs:attribute type="xs:nonNegativeInteger" name="purge-job-days" default="30" /> >> + <xs:attribute type="xs:nonNegativeInteger" name="failed-retry-min" default="30" /> >> + <xs:attribute type="xs:nonNegativeInteger" name="ttl"> >> + <xs:annotation> >> + <xs:documentation>Idle queue service thread lifespan in milliseconds. Defaults to "120000" (2 minutes).</xs:documentation> >> + </xs:annotation> >> + </xs:attribute> >> + <xs:attribute type="xs:nonNegativeInteger" name="jobs"> >> + <xs:annotation> >> + <xs:documentation>Job queue size. 
Defaults to "100".</xs:documentation> >> + </xs:annotation> >> + </xs:attribute> >> + <xs:attribute type="xs:nonNegativeInteger" name="min-threads"> >> + <xs:annotation> >> + <xs:documentation>Minimum number of queue service threads. Defaults to "1".</xs:documentation> >> + </xs:annotation> >> + </xs:attribute> >> + <xs:attribute type="xs:nonNegativeInteger" name="max-threads"> >> + <xs:annotation> >> + <xs:documentation>Maximum number of queue service threads. Defaults to "5".</xs:documentation> >> + </xs:annotation> >> + </xs:attribute> >> + <xs:attribute name="poll-enabled"> >> + <xs:annotation> >> + <xs:documentation>Enable database polling. Defaults to "true".</xs:documentation> >> + </xs:annotation> >> + <xs:simpleType> >> + <xs:restriction base="xs:token"> >> + <xs:enumeration value="true" /> >> + <xs:enumeration value="false" /> >> + </xs:restriction> >> + </xs:simpleType> >> + </xs:attribute> >> + <xs:attribute type="xs:nonNegativeInteger" name="poll-db-millis"> >> + <xs:annotation> >> + <xs:documentation>Database polling interval in milliseconds. 
Defaults to "30000" (30 seconds).</xs:documentation> >> + </xs:annotation> >> + </xs:attribute> >> </xs:complexType> >> </xs:element> >> - <xs:attributeGroup name="attlist.thread-pool"> >> - <xs:attribute type="xs:string" name="send-to-pool" use="required"/> >> - <xs:attribute type="xs:nonNegativeInteger" name="purge-job-days" default="30"/> >> - <xs:attribute type="xs:nonNegativeInteger" name="failed-retry-min" default="30"/> >> - <xs:attribute type="xs:nonNegativeInteger" name="ttl" use="required"/> >> - <xs:attribute type="xs:nonNegativeInteger" name="wait-millis"/> <!-- deprecated --> >> - <xs:attribute type="xs:nonNegativeInteger" name="jobs"/> <!-- deprecated --> >> - <xs:attribute type="xs:nonNegativeInteger" name="min-threads" use="required"/> >> - <xs:attribute type="xs:nonNegativeInteger" name="max-threads" use="required"/> >> - <xs:attribute name="poll-enabled" default="true"> >> - <xs:simpleType> >> - <xs:restriction base="xs:token"> >> - <xs:enumeration value="true"/> >> - <xs:enumeration value="false"/> >> - </xs:restriction> >> - </xs:simpleType> >> - </xs:attribute> >> - <xs:attribute type="xs:nonNegativeInteger" name="poll-db-millis" use="required"/> >> - </xs:attributeGroup> >> - <xs:element name="run-from-pool"> >> - <xs:complexType> >> - <xs:attributeGroup ref="attlist.run-from-pool"/> >> - </xs:complexType> >> - </xs:element> >> - <xs:attributeGroup name="attlist.run-from-pool"> >> - <xs:attribute type="xs:string" name="name" use="required"/> >> - </xs:attributeGroup> >> <xs:element name="engine"> >> <xs:complexType> >> <xs:sequence> >> >> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java >> URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370437&r1=1370436&r2=1370437&view=diff >> ============================================================================== >> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original) >> 
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Tue Aug 7 19:11:06 2012 >> @@ -38,13 +38,13 @@ import org.ofbiz.entity.Delegator; >> import org.ofbiz.entity.GenericEntityException; >> import org.ofbiz.entity.GenericValue; >> import org.ofbiz.entity.condition.EntityCondition; >> -import org.ofbiz.entity.condition.EntityConditionList; >> import org.ofbiz.entity.condition.EntityExpr; >> import org.ofbiz.entity.condition.EntityOperator; >> import org.ofbiz.entity.serialize.SerializeException; >> import org.ofbiz.entity.serialize.XmlSerializer; >> import org.ofbiz.entity.transaction.GenericTransactionException; >> import org.ofbiz.entity.transaction.TransactionUtil; >> +import org.ofbiz.entity.util.EntityListIterator; >> import org.ofbiz.service.DispatchContext; >> import org.ofbiz.service.LocalDispatcher; >> import org.ofbiz.service.ServiceContainer; >> @@ -146,7 +146,7 @@ public final class JobManager { >> * Returns an empty list if there are no jobs due to run. >> * This method is called by the {@link JobPoller} polling thread. 
>> */ >> - protected synchronized List<Job> poll() { >> + protected List<Job> poll(int limit) { >> assertIsRunning(); >> DispatchContext dctx = getDispatcher().getDispatchContext(); >> if (dctx == null) { >> @@ -154,8 +154,6 @@ public final class JobManager { >> return null; >> } >> List<Job> poll = FastList.newInstance(); >> - // sort the results by time >> - List<String> order = UtilMisc.toList("runTime"); >> // basic query >> List<EntityExpr> expressions = UtilMisc.toList(EntityCondition.makeCondition("runTime", EntityOperator.LESS_THAN_EQUAL_TO, UtilDateTime.nowTimestamp()), >> EntityCondition.makeCondition("startDateTime", EntityOperator.EQUALS, null), >> @@ -173,21 +171,24 @@ public final class JobManager { >> EntityCondition baseCondition = EntityCondition.makeCondition(expressions); >> EntityCondition poolCondition = EntityCondition.makeCondition(poolsExpr, EntityOperator.OR); >> EntityCondition mainCondition = EntityCondition.makeCondition(UtilMisc.toList(baseCondition, poolCondition)); >> + EntityListIterator jobsIterator = null; >> boolean beganTransaction = false; >> try { >> beganTransaction = TransactionUtil.begin(); >> if (!beganTransaction) { >> - Debug.logError("Unable to poll for jobs; transaction was not started by this process", module); >> + Debug.logError("Unable to poll JobSandbox for jobs; transaction was not started by this process", module); >> return null; >> } >> - // first update the jobs w/ this instance running information >> - delegator.storeByCondition("JobSandbox", updateFields, mainCondition); >> - // now query all the 'queued' jobs for this instance >> - List<GenericValue> jobEnt = delegator.findByAnd("JobSandbox", updateFields, order, false); >> - if (UtilValidate.isNotEmpty(jobEnt)) { >> - for (GenericValue v : jobEnt) { >> - poll.add(new PersistedServiceJob(dctx, v, null)); // TODO fix the requester >> + jobsIterator = delegator.find("JobSandbox", mainCondition, null, null, UtilMisc.toList("runTime"), null); >> + GenericValue 
jobValue = jobsIterator.next(); >> + while (jobValue != null) { >> + jobValue.putAll(updateFields); >> + jobValue.store(); >> + poll.add(new PersistedServiceJob(dctx, jobValue, null)); >> + if (poll.size() == limit) { >> + break; >> } >> + jobValue = jobsIterator.next(); >> } >> } catch (Throwable t) { >> // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation >> @@ -200,6 +201,13 @@ public final class JobManager { >> Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module); >> } >> } finally { >> + if (jobsIterator != null) { >> + try { >> + jobsIterator.close(); >> + } catch (GenericEntityException e) { >> + Debug.logWarning(e, module); >> + } >> + } >> try { >> // only commit the transaction if we started one... but make sure we try >> TransactionUtil.commit(beganTransaction); >> @@ -214,11 +222,19 @@ public final class JobManager { >> >> private void reloadCrashedJobs() { >> List<GenericValue> crashed = null; >> - List<EntityExpr> exprs = UtilMisc.toList(EntityCondition.makeCondition("runByInstanceId", instanceId)); >> - exprs.add(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING")); >> - EntityConditionList<EntityExpr> ecl = EntityCondition.makeCondition(exprs); >> + List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"), EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING")); >> + EntityCondition statusCondition = EntityCondition.makeCondition(statusExprList, EntityOperator.OR); >> + List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null)); >> + List<String> pools = ServiceConfigUtil.getRunPools(); >> + if (pools != null) { >> + for (String poolName : pools) { >> + poolsExpr.add(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, poolName)); >> + } >> + } >> + EntityCondition 
poolCondition = EntityCondition.makeCondition(poolsExpr, EntityOperator.OR); >> + EntityCondition mainCondition = EntityCondition.makeCondition(UtilMisc.toList(EntityCondition.makeCondition("runByInstanceId", instanceId), statusCondition, poolCondition)); >> try { >> - crashed = delegator.findList("JobSandbox", ecl, null, UtilMisc.toList("startDateTime"), null, false); >> + crashed = delegator.findList("JobSandbox", mainCondition, null, UtilMisc.toList("startDateTime"), null, false); >> } catch (GenericEntityException e) { >> Debug.logError(e, "Unable to load crashed jobs", module); >> } >> >> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java >> URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370437&r1=1370436&r2=1370437&view=diff >> ============================================================================== >> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original) >> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Tue Aug 7 19:11:06 2012 >> @@ -33,8 +33,6 @@ import java.util.concurrent.atomic.Atomi >> import org.ofbiz.base.util.Debug; >> import org.ofbiz.service.config.ServiceConfigUtil; >> >> -import org.apache.commons.lang.math.NumberUtils; >> - >> /** >> * Job poller. Queues and runs jobs. >> */ >> @@ -42,10 +40,11 @@ public final class JobPoller implements >> >> public static final String module = JobPoller.class.getName(); >> private static final AtomicInteger created = new AtomicInteger(); >> - public static final int MIN_THREADS = 1; >> - public static final int MAX_THREADS = 15; >> - public static final int POLL_WAIT = 20000; >> - public static final long THREAD_TTL = 18000000; >> + private static final int MIN_THREADS = 1; // Must be no less than one or the executor will shut down. >> + private static final int MAX_THREADS = 5; // Values higher than 5 might slow things down. 
>> + private static final int POLL_WAIT = 30000; // Database polling interval - 30 seconds. >> + private static final int QUEUE_SIZE = 100; >> + private static final long THREAD_TTL = 120000; // Idle thread lifespan - 2 minutes. >> >> private final JobManager jm; >> private final ThreadPoolExecutor executor; >> @@ -61,7 +60,7 @@ public final class JobPoller implements >> public JobPoller(JobManager jm) { >> this.name = jm.getDelegator().getDelegatorName(); >> this.jm = jm; >> - this.executor = new ThreadPoolExecutor(minThreads(), maxThreads(), getTTL(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), >> + this.executor = new ThreadPoolExecutor(minThreads(), maxThreads(), getTTL(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueSize()), >> new JobInvokerThreadFactory(this.name), new ThreadPoolExecutor.AbortPolicy()); >> } >> >> @@ -116,33 +115,48 @@ public final class JobPoller implements >> } >> >> private long getTTL() { >> - long ttl = THREAD_TTL; >> - try { >> - ttl = NumberUtils.toLong(ServiceConfigUtil.getElementAttr("thread-pool", "ttl")); >> - } catch (NumberFormatException nfe) { >> - Debug.logError("Problems reading value from attribute [ttl] of element [thread-pool] in serviceengine.xml file [" + nfe.toString() + "]. Using default (" + THREAD_TTL + ").", module); >> + String threadTTLAttr = ServiceConfigUtil.getElementAttr("thread-pool", "ttl"); >> + if (!threadTTLAttr.isEmpty()) { >> + try { >> + int threadTTL = Integer.parseInt(threadTTLAttr); >> + if (threadTTL > 0) { >> + return threadTTL; >> + } >> + } catch (NumberFormatException e) { >> + Debug.logError("Exception thrown while parsing thread TTL from serviceengine.xml file [" + e + "]. 
Using default value.", module); >> + } >> } >> - return ttl; >> + return THREAD_TTL; >> } >> >> private int maxThreads() { >> - int max = MAX_THREADS; >> - try { >> - max = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "max-threads")); >> - } catch (NumberFormatException nfe) { >> - Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module); >> + String maxThreadsAttr = ServiceConfigUtil.getElementAttr("thread-pool", "max-threads"); >> + if (!maxThreadsAttr.isEmpty()) { >> + try { >> + int maxThreads = Integer.parseInt(maxThreadsAttr); >> + if (maxThreads > 0) { >> + return maxThreads; >> + } >> + } catch (NumberFormatException e) { >> + Debug.logError("Exception thrown while parsing maximum threads from serviceengine.xml file [" + e + "]. Using default value.", module); >> + } >> } >> - return max; >> + return MAX_THREADS; >> } >> >> private int minThreads() { >> - int min = MIN_THREADS; >> - try { >> - min = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "min-threads")); >> - } catch (NumberFormatException nfe) { >> - Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module); >> + String minThreadsAttr = ServiceConfigUtil.getElementAttr("thread-pool", "min-threads"); >> + if (!minThreadsAttr.isEmpty()) { >> + try { >> + int minThreads = Integer.parseInt(minThreadsAttr); >> + if (minThreads > 0) { >> + return minThreads; >> + } >> + } catch (NumberFormatException e) { >> + Debug.logError("Exception thrown while parsing minimum threads from serviceengine.xml file [" + e + "]. 
Using default value.", module); >> + } >> } >> - return min; >> + return MIN_THREADS; >> } >> >> private boolean pollEnabled() { >> @@ -153,13 +167,33 @@ public final class JobPoller implements >> } >> >> private int pollWaitTime() { >> - int poll = POLL_WAIT; >> - try { >> - poll = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "poll-db-millis")); >> - } catch (NumberFormatException nfe) { >> - Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module); >> + String pollIntervalAttr = ServiceConfigUtil.getElementAttr("thread-pool", "poll-db-millis"); >> + if (!pollIntervalAttr.isEmpty()) { >> + try { >> + int pollInterval = Integer.parseInt(pollIntervalAttr); >> + if (pollInterval > 0) { >> + return pollInterval; >> + } >> + } catch (NumberFormatException e) { >> + Debug.logError("Exception thrown while parsing database polling interval from serviceengine.xml file [" + e + "]. Using default value.", module); >> + } >> + } >> + return POLL_WAIT; >> + } >> + >> + private int queueSize() { >> + String queueSizeAttr = ServiceConfigUtil.getElementAttr("thread-pool", "jobs"); >> + if (!queueSizeAttr.isEmpty()) { >> + try { >> + int queueSize = Integer.parseInt(queueSizeAttr); >> + if (queueSize > 0) { >> + return queueSize; >> + } >> + } catch (NumberFormatException e) { >> + Debug.logError("Exception thrown while parsing queue size from serviceengine.xml file [" + e + "]. Using default value.", module); >> + } >> } >> - return poll; >> + return QUEUE_SIZE; >> } >> >> /** >> @@ -172,29 +206,28 @@ public final class JobPoller implements >> this.executor.execute(new JobInvoker(job)); >> } >> >> - public synchronized void run() { >> + public void run() { >> try { >> // wait 30 seconds before the first poll >> - java.lang.Thread.sleep(30000); >> - } catch (InterruptedException e) { >> - } >> - while (!executor.isShutdown()) { >> - try { >> - // grab a list of jobs to run. 
>> - List<Job> pollList = jm.poll(); >> - for (Job job : pollList) { >> - try { >> - queueNow(job); >> - } catch (InvalidJobException e) { >> - Debug.logError(e, module); >> + Thread.sleep(30000); >> + while (!executor.isShutdown()) { >> + int remainingCapacity = executor.getQueue().remainingCapacity(); >> + if (remainingCapacity > 0) { >> + List<Job> pollList = jm.poll(remainingCapacity); >> + for (Job job : pollList) { >> + try { >> + queueNow(job); >> + } catch (InvalidJobException e) { >> + Debug.logError(e, module); >> + } >> } >> } >> - // NOTE: using sleep instead of wait for stricter locking >> - java.lang.Thread.sleep(pollWaitTime()); >> - } catch (InterruptedException e) { >> - Debug.logError(e, module); >> - stop(); >> + Thread.sleep(pollWaitTime()); >> } >> + } catch (InterruptedException e) { >> + Debug.logError(e, module); >> + stop(); >> + Thread.currentThread().interrupt(); >> } >> Debug.logInfo("JobPoller " + this.name + " thread terminated.", module); >> } >> @@ -234,7 +267,7 @@ public final class JobPoller implements >> } >> >> public Thread newThread(Runnable runnable) { >> - return new Thread(runnable, "OFBiz-JobInvoker-" + poolName + "-" + created.getAndIncrement()); >> + return new Thread(runnable, "OFBiz-JobQueue-" + poolName + "-" + created.getAndIncrement()); >> } >> } >> } >> >> |
| Free forum by Nabble | Edit this page |
