|
Author: adrianc
Date: Tue Aug 7 22:10:27 2012 New Revision: 1370566 URL: http://svn.apache.org/viewvc?rev=1370566&view=rev Log: Fixed the Job Scheduler so jobs are not lost during heavy server load. Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java Tue Aug 7 22:10:27 2012 @@ -144,4 +144,12 @@ public class GenericServiceJob extends A public boolean isValid() { return currentState == State.CREATED; } + + @Override + public void deQueue() throws InvalidJobException { + if (currentState != State.QUEUED) { + throw new InvalidJobException("Illegal state change"); + } + throw new InvalidJobException("Unable to queue job [" + getJobId() + "]"); + } } Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java?rev=1370566&r1=1370565&r2=1370566&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java Tue Aug 7 22:10:27 2012 @@ -60,7 +60,13 @@ public interface Job { boolean isValid(); /** - * Transitions the job to the queued state. + * Transitions this job to the pre-queued (created) state. The job manager + * will call this method when there was a problem adding this job to the queue. + */ + void deQueue() throws InvalidJobException; + + /** + * Transitions this job to the queued state. */ void queue() throws InvalidJobException; } Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370566&r1=1370565&r2=1370566&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Tue Aug 7 22:10:27 2012 @@ -63,7 +63,6 @@ public final class JobManager { public static final String module = JobManager.class.getName(); public static final String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0"); - public static final Map<String, Object> updateFields = UtilMisc.<String, Object>toMap("runByInstanceId", instanceId, "statusId", "SERVICE_QUEUED"); private static final ConcurrentHashMap<String, JobManager> registeredManagers = new ConcurrentHashMap<String, JobManager>(); private static boolean isShutDown = false; @@ -150,7 +149,7 @@ public final class JobManager { assertIsRunning(); DispatchContext dctx = getDispatcher().getDispatchContext(); if (dctx == null) { - Debug.logError("Unable to locate DispatchContext object; not running job!", module); + Debug.logWarning("Unable to locate DispatchContext object; not running job!", module); return null; } List<Job> poll = FastList.newInstance(); @@ -176,13 +175,13 @@ public final class JobManager { try { beganTransaction = TransactionUtil.begin(); if (!beganTransaction) { - Debug.logError("Unable to poll JobSandbox for jobs; transaction was not started by this process", module); + Debug.logWarning("Unable to poll JobSandbox for jobs; transaction was not started by this process", module); return null; } jobsIterator = delegator.find("JobSandbox", mainCondition, null, null, UtilMisc.toList("runTime"), null); GenericValue jobValue = jobsIterator.next(); while (jobValue != null) { - jobValue.putAll(updateFields); + jobValue.set("runByInstanceId", instanceId); // Claim ownership of this value. jobValue.store(); poll.add(new PersistedServiceJob(dctx, jobValue, null)); if (poll.size() == limit) { @@ -191,14 +190,12 @@ public final class JobManager { jobValue = jobsIterator.next(); } } catch (Throwable t) { - // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation String errMsg = "Error in polling JobSandbox: [" + t.toString() + "]. Rolling back transaction."; - Debug.logError(t, errMsg, module); + Debug.logWarning(t, errMsg, module); try { - // only rollback the transaction if we started one... TransactionUtil.rollback(beganTransaction, errMsg, t); } catch (GenericEntityException e2) { - Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module); + Debug.logWarning(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module); } } finally { if (jobsIterator != null) { @@ -209,12 +206,10 @@ public final class JobManager { } } try { - // only commit the transaction if we started one... but make sure we try TransactionUtil.commit(beganTransaction); } catch (GenericTransactionException e) { String errMsg = "Transaction error trying to commit when polling and updating the JobSandbox: " + e.toString(); - // we don't really want to do anything different, so just log and move on - Debug.logError(e, errMsg, module); + Debug.logWarning(e, errMsg, module); } } return poll; @@ -222,7 +217,9 @@ public final class JobManager { private void reloadCrashedJobs() { List<GenericValue> crashed = null; - List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"), EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING")); + List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_PENDING"), + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"), + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING")); EntityCondition statusCondition = EntityCondition.makeCondition(statusExprList, EntityOperator.OR); List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null)); List<String> pools = ServiceConfigUtil.getRunPools(); @@ -236,13 +233,13 @@ public final class JobManager { try { crashed = delegator.findList("JobSandbox", mainCondition, null, UtilMisc.toList("startDateTime"), null, false); } catch (GenericEntityException e) { - Debug.logError(e, "Unable to load crashed jobs", module); + Debug.logWarning(e, "Unable to load crashed jobs", module); } if (UtilValidate.isNotEmpty(crashed)) { - try { - int rescheduled = 0; - for (GenericValue job : crashed) { - Timestamp now = UtilDateTime.nowTimestamp(); + int rescheduled = 0; + Timestamp now = UtilDateTime.nowTimestamp(); + for (GenericValue job : crashed) { + try { Debug.logInfo("Scheduling Job : " + job, module); String pJobId = job.getString("parentJobId"); if (pJobId == null) { @@ -261,12 +258,12 @@ public final class JobManager { job.set("cancelDateTime", now); delegator.store(job); rescheduled++; + } catch (GenericEntityException e) { + Debug.logWarning(e, module); } - if (Debug.infoOn()) - Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module); - } catch (GenericEntityException e) { - Debug.logError(e, module); } + if (Debug.infoOn()) + Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module); } else { if (Debug.infoOn()) Debug.logInfo("No crashed jobs to re-schedule", module); Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370566&r1=1370565&r2=1370566&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Tue Aug 7 22:10:27 2012 @@ -203,7 +203,11 @@ public final class JobPoller implements */ public void queueNow(Job job) throws InvalidJobException { job.queue(); - this.executor.execute(new JobInvoker(job)); + try { + this.executor.execute(new JobInvoker(job)); + } catch (Exception e) { + job.deQueue(); + } } public void run() { Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java Tue Aug 7 22:10:27 2012 @@ -20,7 +20,6 @@ package org.ofbiz.service.job; import java.io.IOException; import java.sql.Timestamp; -import com.ibm.icu.util.Calendar; import java.util.Date; import java.util.Map; @@ -28,14 +27,12 @@ import javax.xml.parsers.ParserConfigura import javolution.util.FastMap; +import org.apache.commons.lang.StringUtils; import org.ofbiz.base.util.Debug; import org.ofbiz.base.util.UtilDateTime; import org.ofbiz.base.util.UtilGenerics; import org.ofbiz.base.util.UtilProperties; import org.ofbiz.base.util.UtilValidate; -import org.ofbiz.service.calendar.RecurrenceInfoException; -import org.ofbiz.service.calendar.TemporalExpression; -import org.ofbiz.service.calendar.TemporalExpressionWorker; import org.ofbiz.entity.Delegator; import org.ofbiz.entity.GenericEntityException; import org.ofbiz.entity.GenericValue; @@ -47,10 +44,13 @@ import org.ofbiz.service.DispatchContext import org.ofbiz.service.GenericRequester; import org.ofbiz.service.ServiceUtil; import org.ofbiz.service.calendar.RecurrenceInfo; +import org.ofbiz.service.calendar.RecurrenceInfoException; +import org.ofbiz.service.calendar.TemporalExpression; +import org.ofbiz.service.calendar.TemporalExpressionWorker; import org.ofbiz.service.config.ServiceConfigUtil; import org.xml.sax.SAXException; -import org.apache.commons.lang.StringUtils; +import com.ibm.icu.util.Calendar; /** * A {@link Job} that is backed by the entity engine. Job data is stored @@ -100,14 +100,15 @@ public class PersistedServiceJob extends // job not available throw new InvalidJobException("Job [" + getJobId() + "] is not available"); } else { - // set the start time to now - jobValue.set("startDateTime", UtilDateTime.nowTimestamp()); - jobValue.set("statusId", "SERVICE_RUNNING"); + jobValue.set("statusId", "SERVICE_QUEUED"); try { jobValue.store(); } catch (GenericEntityException e) { throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + "]; not running!", e); } + if (Debug.verboseOn()) { + Debug.logVerbose("Placing job [" + getJobId() + "] in queue", module); + } } } @@ -129,6 +130,16 @@ public class PersistedServiceJob extends // This condition isn't possible, but we will leave it here. throw new InvalidJobException("Job has been accepted by a different instance!"); } + jobValue.set("startDateTime", UtilDateTime.nowTimestamp()); + jobValue.set("statusId", "SERVICE_RUNNING"); + try { + jobValue.store(); + } catch (GenericEntityException e) { + throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + "]; not running!", e); + } + if (Debug.verboseOn()) { + Debug.logVerbose("Job [" + getJobId() + "] running", module); + } // configure any additional recurrences long maxRecurrenceCount = -1; long currentRecurrenceCount = 0; @@ -331,4 +342,24 @@ public class PersistedServiceJob extends } return null; } + + @Override + public void deQueue() throws InvalidJobException { + if (currentState != State.QUEUED) { + throw new InvalidJobException("Illegal state change"); + } + currentState = State.CREATED; + try { + jobValue.refresh(); + jobValue.set("startDateTime", null); + jobValue.set("runByInstanceId", null); + jobValue.set("statusId", "SERVICE_PENDING"); + jobValue.store(); + } catch (GenericEntityException e) { + throw new InvalidJobException("Unable to dequeue job [" + getJobId() + "]", e); + } + if (Debug.verboseOn()) { + Debug.logVerbose("Job [" + getJobId() + "] not queued, rescheduling", module); + } + } } |
| Free forum by Nabble | Edit this page |
