|
Administrator
|
Hi Adrian,
Why did you remove these comments ? > - // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation > - // only rollback the transaction if we started one... > - // only commit the transaction if we started one... but make sure we try > - // we don't really want to do anything different, so just log and move on It seems they add some information Jacques From: <[hidden email]> > Author: adrianc > Date: Tue Aug 7 22:10:27 2012 > New Revision: 1370566 > > URL: http://svn.apache.org/viewvc?rev=1370566&view=rev > Log: > Fixed the Job Scheduler so jobs are not lost during heavy server load. > > Modified: > ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java > ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java > ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java > ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java > ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java > > Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java > URL: > http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff > ============================================================================== > --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java (original) > +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java Tue Aug 7 22:10:27 2012 > @@ -144,4 +144,12 @@ public class GenericServiceJob extends A > public boolean isValid() { > return currentState == State.CREATED; > } > + > + @Override > + public void deQueue() throws InvalidJobException { > + if (currentState != State.QUEUED) { > + throw new InvalidJobException("Illegal state change"); > + } > + throw new InvalidJobException("Unable to queue job [" + getJobId() + "]"); > + } > } > > Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java > URL: > http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java?rev=1370566&r1=1370565&r2=1370566&view=diff > ============================================================================== > --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java (original) > +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java Tue Aug 7 22:10:27 2012 > @@ -60,7 +60,13 @@ public interface Job { > boolean isValid(); > > /** > - * Transitions the job to the queued state. > + * Transitions this job to the pre-queued (created) state. The job manager > + * will call this method when there was a problem adding this job to the queue. > + */ > + void deQueue() throws InvalidJobException; > + > + /** > + * Transitions this job to the queued state. > */ > void queue() throws InvalidJobException; > } > > Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java > URL: > http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370566&r1=1370565&r2=1370566&view=diff > ============================================================================== > --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original) > +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Tue Aug 7 22:10:27 2012 > @@ -63,7 +63,6 @@ public final class JobManager { > > public static final String module = JobManager.class.getName(); > public static final String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0"); > - public static final Map<String, Object> updateFields = UtilMisc.<String, Object>toMap("runByInstanceId", instanceId, > "statusId", "SERVICE_QUEUED"); > private static final ConcurrentHashMap<String, JobManager> registeredManagers = new ConcurrentHashMap<String, JobManager>(); > private static boolean isShutDown = false; > > @@ -150,7 +149,7 @@ public final class JobManager { > assertIsRunning(); > DispatchContext dctx = getDispatcher().getDispatchContext(); > if (dctx == null) { > - Debug.logError("Unable to locate DispatchContext object; not running job!", module); > + Debug.logWarning("Unable to locate DispatchContext object; not running job!", module); > return null; > } > List<Job> poll = FastList.newInstance(); > @@ -176,13 +175,13 @@ public final class JobManager { > try { > beganTransaction = TransactionUtil.begin(); > if (!beganTransaction) { > - Debug.logError("Unable to poll JobSandbox for jobs; transaction was not started by this process", module); > + Debug.logWarning("Unable to poll JobSandbox for jobs; transaction was not started by this process", module); > return null; > } > jobsIterator = delegator.find("JobSandbox", mainCondition, null, null, UtilMisc.toList("runTime"), null); > GenericValue jobValue = jobsIterator.next(); > while (jobValue != null) { > - jobValue.putAll(updateFields); > + jobValue.set("runByInstanceId", instanceId); // Claim ownership of this value. > jobValue.store(); > poll.add(new PersistedServiceJob(dctx, jobValue, null)); > if (poll.size() == limit) { > @@ -191,14 +190,12 @@ public final class JobManager { > jobValue = jobsIterator.next(); > } > } catch (Throwable t) { > - // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation > String errMsg = "Error in polling JobSandbox: [" + t.toString() + "]. Rolling back transaction."; > - Debug.logError(t, errMsg, module); > + Debug.logWarning(t, errMsg, module); > try { > - // only rollback the transaction if we started one... > TransactionUtil.rollback(beganTransaction, errMsg, t); > } catch (GenericEntityException e2) { > - Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module); > + Debug.logWarning(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module); > } > } finally { > if (jobsIterator != null) { > @@ -209,12 +206,10 @@ public final class JobManager { > } > } > try { > - // only commit the transaction if we started one... but make sure we try > TransactionUtil.commit(beganTransaction); > } catch (GenericTransactionException e) { > String errMsg = "Transaction error trying to commit when polling and updating the JobSandbox: " + e.toString(); > - // we don't really want to do anything different, so just log and move on > - Debug.logError(e, errMsg, module); > + Debug.logWarning(e, errMsg, module); > } > } > return poll; > @@ -222,7 +217,9 @@ public final class JobManager { > > private void reloadCrashedJobs() { > List<GenericValue> crashed = null; > - List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, > "SERVICE_QUEUED"), EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING")); > + List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, > "SERVICE_PENDING"), > + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"), > + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING")); > EntityCondition statusCondition = EntityCondition.makeCondition(statusExprList, EntityOperator.OR); > List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null)); > List<String> pools = ServiceConfigUtil.getRunPools(); > @@ -236,13 +233,13 @@ public final class JobManager { > try { > crashed = delegator.findList("JobSandbox", mainCondition, null, UtilMisc.toList("startDateTime"), null, false); > } catch (GenericEntityException e) { > - Debug.logError(e, "Unable to load crashed jobs", module); > + Debug.logWarning(e, "Unable to load crashed jobs", module); > } > if (UtilValidate.isNotEmpty(crashed)) { > - try { > - int rescheduled = 0; > - for (GenericValue job : crashed) { > - Timestamp now = UtilDateTime.nowTimestamp(); > + int rescheduled = 0; > + Timestamp now = UtilDateTime.nowTimestamp(); > + for (GenericValue job : crashed) { > + try { > Debug.logInfo("Scheduling Job : " + job, module); > String pJobId = job.getString("parentJobId"); > if (pJobId == null) { > @@ -261,12 +258,12 @@ public final class JobManager { > job.set("cancelDateTime", now); > delegator.store(job); > rescheduled++; > + } catch (GenericEntityException e) { > + Debug.logWarning(e, module); > } > - if (Debug.infoOn()) > - Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module); > - } catch (GenericEntityException e) { > - Debug.logError(e, module); > } > + if (Debug.infoOn()) > + Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module); > } else { > if (Debug.infoOn()) > Debug.logInfo("No crashed jobs to re-schedule", module); > > Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java > URL: > http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370566&r1=1370565&r2=1370566&view=diff > ============================================================================== > --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original) > +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Tue Aug 7 22:10:27 2012 > @@ -203,7 +203,11 @@ public final class JobPoller implements > */ > public void queueNow(Job job) throws InvalidJobException { > job.queue(); > - this.executor.execute(new JobInvoker(job)); > + try { > + this.executor.execute(new JobInvoker(job)); > + } catch (Exception e) { > + job.deQueue(); > + } > } > > public void run() { > > Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java > URL: > http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff > ============================================================================== > --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java (original) > +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java Tue Aug 7 22:10:27 2012 > @@ -20,7 +20,6 @@ package org.ofbiz.service.job; > > import java.io.IOException; > import java.sql.Timestamp; > -import com.ibm.icu.util.Calendar; > import java.util.Date; > import java.util.Map; > > @@ -28,14 +27,12 @@ import javax.xml.parsers.ParserConfigura > > import javolution.util.FastMap; > > +import org.apache.commons.lang.StringUtils; > import org.ofbiz.base.util.Debug; > import org.ofbiz.base.util.UtilDateTime; > import org.ofbiz.base.util.UtilGenerics; > import org.ofbiz.base.util.UtilProperties; > import org.ofbiz.base.util.UtilValidate; > -import org.ofbiz.service.calendar.RecurrenceInfoException; > -import org.ofbiz.service.calendar.TemporalExpression; > -import org.ofbiz.service.calendar.TemporalExpressionWorker; > import org.ofbiz.entity.Delegator; > import org.ofbiz.entity.GenericEntityException; > import org.ofbiz.entity.GenericValue; > @@ -47,10 +44,13 @@ import org.ofbiz.service.DispatchContext > import org.ofbiz.service.GenericRequester; > import org.ofbiz.service.ServiceUtil; > import org.ofbiz.service.calendar.RecurrenceInfo; > +import org.ofbiz.service.calendar.RecurrenceInfoException; > +import org.ofbiz.service.calendar.TemporalExpression; > +import org.ofbiz.service.calendar.TemporalExpressionWorker; > import org.ofbiz.service.config.ServiceConfigUtil; > import org.xml.sax.SAXException; > > -import org.apache.commons.lang.StringUtils; > +import com.ibm.icu.util.Calendar; > > /** > * A {@link Job} that is backed by the entity engine. Job data is stored > @@ -100,14 +100,15 @@ public class PersistedServiceJob extends > // job not available > throw new InvalidJobException("Job [" + getJobId() + "] is not available"); > } else { > - // set the start time to now > - jobValue.set("startDateTime", UtilDateTime.nowTimestamp()); > - jobValue.set("statusId", "SERVICE_RUNNING"); > + jobValue.set("statusId", "SERVICE_QUEUED"); > try { > jobValue.store(); > } catch (GenericEntityException e) { > throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + > "]; not running!", e); > } > + if (Debug.verboseOn()) { > + Debug.logVerbose("Placing job [" + getJobId() + "] in queue", module); > + } > } > } > > @@ -129,6 +130,16 @@ public class PersistedServiceJob extends > // This condition isn't possible, but we will leave it here. > throw new InvalidJobException("Job has been accepted by a different instance!"); > } > + jobValue.set("startDateTime", UtilDateTime.nowTimestamp()); > + jobValue.set("statusId", "SERVICE_RUNNING"); > + try { > + jobValue.store(); > + } catch (GenericEntityException e) { > + throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + "]; > not running!", e); > + } > + if (Debug.verboseOn()) { > + Debug.logVerbose("Job [" + getJobId() + "] running", module); > + } > // configure any additional recurrences > long maxRecurrenceCount = -1; > long currentRecurrenceCount = 0; > @@ -331,4 +342,24 @@ public class PersistedServiceJob extends > } > return null; > } > + > + @Override > + public void deQueue() throws InvalidJobException { > + if (currentState != State.QUEUED) { > + throw new InvalidJobException("Illegal state change"); > + } > + currentState = State.CREATED; > + try { > + jobValue.refresh(); > + jobValue.set("startDateTime", null); > + jobValue.set("runByInstanceId", null); > + jobValue.set("statusId", "SERVICE_PENDING"); > + jobValue.store(); > + } catch (GenericEntityException e) { > + throw new InvalidJobException("Unable to dequeue job [" + getJobId() + "]", e); > + } > + if (Debug.verboseOn()) { > + Debug.logVerbose("Job [" + getJobId() + "] not queued, rescheduling", module); > + } > + } > } > > |
|
Administrator
|
Else forgot to say that I like it
Jacques From: "Jacques Le Roux" <[hidden email]> > Hi Adrian, > > Why did you remove these comments ? >> - // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation >> - // only rollback the transaction if we started one... >> - // only commit the transaction if we started one... but make sure we try >> - // we don't really want to do anything different, so just log and move on > > It seems they add some information > > Jacques > > From: <[hidden email]> >> Author: adrianc >> Date: Tue Aug 7 22:10:27 2012 >> New Revision: 1370566 >> >> URL: http://svn.apache.org/viewvc?rev=1370566&view=rev >> Log: >> Fixed the Job Scheduler so jobs are not lost during heavy server load. >> >> Modified: >> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java >> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java >> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java >> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java >> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java >> >> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java >> URL: >> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff >> ============================================================================== >> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java (original) >> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java Tue Aug 7 22:10:27 2012 >> @@ -144,4 +144,12 @@ public class GenericServiceJob extends A >> public boolean isValid() { >> return currentState == State.CREATED; >> } >> + >> + @Override >> + public void deQueue() throws InvalidJobException { >> + if (currentState != State.QUEUED) { >> + throw new InvalidJobException("Illegal state change"); >> + } >> + throw new InvalidJobException("Unable to queue job [" + getJobId() + "]"); >> + } >> } >> >> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java >> URL: >> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java?rev=1370566&r1=1370565&r2=1370566&view=diff >> ============================================================================== >> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java (original) >> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java Tue Aug 7 22:10:27 2012 >> @@ -60,7 +60,13 @@ public interface Job { >> boolean isValid(); >> >> /** >> - * Transitions the job to the queued state. >> + * Transitions this job to the pre-queued (created) state. The job manager >> + * will call this method when there was a problem adding this job to the queue. >> + */ >> + void deQueue() throws InvalidJobException; >> + >> + /** >> + * Transitions this job to the queued state. >> */ >> void queue() throws InvalidJobException; >> } >> >> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java >> URL: >> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370566&r1=1370565&r2=1370566&view=diff >> ============================================================================== >> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original) >> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Tue Aug 7 22:10:27 2012 >> @@ -63,7 +63,6 @@ public final class JobManager { >> >> public static final String module = JobManager.class.getName(); >> public static final String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0"); >> - public static final Map<String, Object> updateFields = UtilMisc.<String, Object>toMap("runByInstanceId", instanceId, >> "statusId", "SERVICE_QUEUED"); >> private static final ConcurrentHashMap<String, JobManager> registeredManagers = new ConcurrentHashMap<String, JobManager>(); >> private static boolean isShutDown = false; >> >> @@ -150,7 +149,7 @@ public final class JobManager { >> assertIsRunning(); >> DispatchContext dctx = getDispatcher().getDispatchContext(); >> if (dctx == null) { >> - Debug.logError("Unable to locate DispatchContext object; not running job!", module); >> + Debug.logWarning("Unable to locate DispatchContext object; not running job!", module); >> return null; >> } >> List<Job> poll = FastList.newInstance(); >> @@ -176,13 +175,13 @@ public final class JobManager { >> try { >> beganTransaction = TransactionUtil.begin(); >> if (!beganTransaction) { >> - Debug.logError("Unable to poll JobSandbox for jobs; transaction was not started by this process", module); >> + Debug.logWarning("Unable to poll JobSandbox for jobs; transaction was not started by this process", module); >> return null; >> } >> jobsIterator = delegator.find("JobSandbox", mainCondition, null, null, UtilMisc.toList("runTime"), null); >> GenericValue jobValue = jobsIterator.next(); >> while (jobValue != null) { >> - jobValue.putAll(updateFields); >> + jobValue.set("runByInstanceId", instanceId); // Claim ownership of this value. >> jobValue.store(); >> poll.add(new PersistedServiceJob(dctx, jobValue, null)); >> if (poll.size() == limit) { >> @@ -191,14 +190,12 @@ public final class JobManager { >> jobValue = jobsIterator.next(); >> } >> } catch (Throwable t) { >> - // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation >> String errMsg = "Error in polling JobSandbox: [" + t.toString() + "]. Rolling back transaction."; >> - Debug.logError(t, errMsg, module); >> + Debug.logWarning(t, errMsg, module); >> try { >> - // only rollback the transaction if we started one... >> TransactionUtil.rollback(beganTransaction, errMsg, t); >> } catch (GenericEntityException e2) { >> - Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module); >> + Debug.logWarning(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module); >> } >> } finally { >> if (jobsIterator != null) { >> @@ -209,12 +206,10 @@ public final class JobManager { >> } >> } >> try { >> - // only commit the transaction if we started one... but make sure we try >> TransactionUtil.commit(beganTransaction); >> } catch (GenericTransactionException e) { >> String errMsg = "Transaction error trying to commit when polling and updating the JobSandbox: " + e.toString(); >> - // we don't really want to do anything different, so just log and move on >> - Debug.logError(e, errMsg, module); >> + Debug.logWarning(e, errMsg, module); >> } >> } >> return poll; >> @@ -222,7 +217,9 @@ public final class JobManager { >> >> private void reloadCrashedJobs() { >> List<GenericValue> crashed = null; >> - List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, >> "SERVICE_QUEUED"), EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING")); >> + List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, >> "SERVICE_PENDING"), >> + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"), >> + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING")); >> EntityCondition statusCondition = EntityCondition.makeCondition(statusExprList, EntityOperator.OR); >> List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null)); >> List<String> pools = ServiceConfigUtil.getRunPools(); >> @@ -236,13 +233,13 @@ public final class JobManager { >> try { >> crashed = delegator.findList("JobSandbox", mainCondition, null, UtilMisc.toList("startDateTime"), null, false); >> } catch (GenericEntityException e) { >> - Debug.logError(e, "Unable to load crashed jobs", module); >> + Debug.logWarning(e, "Unable to load crashed jobs", module); >> } >> if (UtilValidate.isNotEmpty(crashed)) { >> - try { >> - int rescheduled = 0; >> - for (GenericValue job : crashed) { >> - Timestamp now = UtilDateTime.nowTimestamp(); >> + int rescheduled = 0; >> + Timestamp now = UtilDateTime.nowTimestamp(); >> + for (GenericValue job : crashed) { >> + try { >> Debug.logInfo("Scheduling Job : " + job, module); >> String pJobId = job.getString("parentJobId"); >> if (pJobId == null) { >> @@ -261,12 +258,12 @@ public final class JobManager { >> job.set("cancelDateTime", now); >> delegator.store(job); >> rescheduled++; >> + } catch (GenericEntityException e) { >> + Debug.logWarning(e, module); >> } >> - if (Debug.infoOn()) >> - Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module); >> - } catch (GenericEntityException e) { >> - Debug.logError(e, module); >> } >> + if (Debug.infoOn()) >> + Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module); >> } else { >> if (Debug.infoOn()) >> Debug.logInfo("No crashed jobs to re-schedule", module); >> >> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java >> URL: >> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370566&r1=1370565&r2=1370566&view=diff >> ============================================================================== >> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original) >> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Tue Aug 7 22:10:27 2012 >> @@ -203,7 +203,11 @@ public final class JobPoller implements >> */ >> public void queueNow(Job job) throws InvalidJobException { >> job.queue(); >> - this.executor.execute(new JobInvoker(job)); >> + try { >> + this.executor.execute(new JobInvoker(job)); >> + } catch (Exception e) { >> + job.deQueue(); >> + } >> } >> >> public void run() { >> >> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java >> URL: >> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff >> ============================================================================== >> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java (original) >> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java Tue Aug 7 22:10:27 2012 >> @@ -20,7 +20,6 @@ package org.ofbiz.service.job; >> >> import java.io.IOException; >> import java.sql.Timestamp; >> -import com.ibm.icu.util.Calendar; >> import java.util.Date; >> import java.util.Map; >> >> @@ -28,14 +27,12 @@ import javax.xml.parsers.ParserConfigura >> >> import javolution.util.FastMap; >> >> +import org.apache.commons.lang.StringUtils; >> import org.ofbiz.base.util.Debug; >> import org.ofbiz.base.util.UtilDateTime; >> import org.ofbiz.base.util.UtilGenerics; >> import org.ofbiz.base.util.UtilProperties; >> import org.ofbiz.base.util.UtilValidate; >> -import org.ofbiz.service.calendar.RecurrenceInfoException; >> -import org.ofbiz.service.calendar.TemporalExpression; >> -import org.ofbiz.service.calendar.TemporalExpressionWorker; >> import org.ofbiz.entity.Delegator; >> import org.ofbiz.entity.GenericEntityException; >> import org.ofbiz.entity.GenericValue; >> @@ -47,10 +44,13 @@ import org.ofbiz.service.DispatchContext >> import org.ofbiz.service.GenericRequester; >> import org.ofbiz.service.ServiceUtil; >> import org.ofbiz.service.calendar.RecurrenceInfo; >> +import org.ofbiz.service.calendar.RecurrenceInfoException; >> +import org.ofbiz.service.calendar.TemporalExpression; >> +import org.ofbiz.service.calendar.TemporalExpressionWorker; >> import org.ofbiz.service.config.ServiceConfigUtil; >> import org.xml.sax.SAXException; >> >> -import org.apache.commons.lang.StringUtils; >> +import com.ibm.icu.util.Calendar; >> >> /** >> * A {@link Job} that is backed by the entity engine. Job data is stored >> @@ -100,14 +100,15 @@ public class PersistedServiceJob extends >> // job not available >> throw new InvalidJobException("Job [" + getJobId() + "] is not available"); >> } else { >> - // set the start time to now >> - jobValue.set("startDateTime", UtilDateTime.nowTimestamp()); >> - jobValue.set("statusId", "SERVICE_RUNNING"); >> + jobValue.set("statusId", "SERVICE_QUEUED"); >> try { >> jobValue.store(); >> } catch (GenericEntityException e) { >> throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + >> "]; not running!", e); >> } >> + if (Debug.verboseOn()) { >> + Debug.logVerbose("Placing job [" + getJobId() + "] in queue", module); >> + } >> } >> } >> >> @@ -129,6 +130,16 @@ public class PersistedServiceJob extends >> // This condition isn't possible, but we will leave it here. >> throw new InvalidJobException("Job has been accepted by a different instance!"); >> } >> + jobValue.set("startDateTime", UtilDateTime.nowTimestamp()); >> + jobValue.set("statusId", "SERVICE_RUNNING"); >> + try { >> + jobValue.store(); >> + } catch (GenericEntityException e) { >> + throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + "]; >> not running!", e); >> + } >> + if (Debug.verboseOn()) { >> + Debug.logVerbose("Job [" + getJobId() + "] running", module); >> + } >> // configure any additional recurrences >> long maxRecurrenceCount = -1; >> long currentRecurrenceCount = 0; >> @@ -331,4 +342,24 @@ public class PersistedServiceJob extends >> } >> return null; >> } >> + >> + @Override >> + public void deQueue() throws InvalidJobException { >> + if (currentState != State.QUEUED) { >> + throw new InvalidJobException("Illegal state change"); >> + } >> + currentState = State.CREATED; >> + try { >> + jobValue.refresh(); >> + jobValue.set("startDateTime", null); >> + jobValue.set("runByInstanceId", null); >> + jobValue.set("statusId", "SERVICE_PENDING"); >> + jobValue.store(); >> + } catch (GenericEntityException e) { >> + throw new InvalidJobException("Unable to dequeue job [" + getJobId() + "]", e); >> + } >> + if (Debug.verboseOn()) { >> + Debug.logVerbose("Job [" + getJobId() + "] not queued, rescheduling", module); >> + } >> + } >> } >> >> |
|
You are welcome to put them back in. They seemed redundant (and silly)
to me. Example: // Set var to 1 int var = 1; // Call someMethod, put result in var2 int var2 = someMethod(); // Calculate var1 * var2 int result = var1 * var2 The comments are not adding anything of value - they are only stating the obvious. -Adrian On 8/8/2012 10:07 PM, Jacques Le Roux wrote: > Else forgot to say that I like it > > Jacques > > From: "Jacques Le Roux" <[hidden email]> >> Hi Adrian, >> >> Why did you remove these comments ? >>> - // catch Throwable so nothing slips through the >>> cracks... this is a fairly sensitive operation >>> - // only rollback the transaction if we started one... >>> - // only commit the transaction if we started one... >>> but make sure we try >>> - // we don't really want to do anything different, >>> so just log and move on >> >> It seems they add some information >> >> Jacques >> >> From: <[hidden email]> >>> Author: adrianc >>> Date: Tue Aug 7 22:10:27 2012 >>> New Revision: 1370566 >>> >>> URL: http://svn.apache.org/viewvc?rev=1370566&view=rev >>> Log: >>> Fixed the Job Scheduler so jobs are not lost during heavy server load. >>> >>> Modified: >>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java >>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java >>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java >>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java >>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java >>> >>> Modified: >>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java >>> URL: >>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff >>> ============================================================================== >>> >>> --- >>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java >>> (original) >>> +++ >>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java >>> Tue Aug 7 22:10:27 2012 >>> @@ -144,4 +144,12 @@ public class GenericServiceJob extends A >>> public boolean isValid() { >>> return currentState == State.CREATED; >>> } >>> + >>> + @Override >>> + public void deQueue() throws InvalidJobException { >>> + if (currentState != State.QUEUED) { >>> + throw new InvalidJobException("Illegal state change"); >>> + } >>> + throw new InvalidJobException("Unable to queue job [" + >>> getJobId() + "]"); >>> + } >>> } >>> >>> Modified: >>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java >>> URL: >>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java?rev=1370566&r1=1370565&r2=1370566&view=diff >>> ============================================================================== >>> >>> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java >>> (original) >>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java >>> Tue Aug 7 22:10:27 2012 >>> @@ -60,7 +60,13 @@ public interface Job { >>> boolean isValid(); >>> >>> /** >>> - * Transitions the job to the queued state. >>> + * Transitions this job to the pre-queued (created) state. The >>> job manager >>> + * will call this method when there was a problem adding this >>> job to the queue. >>> + */ >>> + void deQueue() throws InvalidJobException; >>> + >>> + /** >>> + * Transitions this job to the queued state. >>> */ >>> void queue() throws InvalidJobException; >>> } >>> >>> Modified: >>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java >>> URL: >>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370566&r1=1370565&r2=1370566&view=diff >>> ============================================================================== >>> >>> --- >>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java >>> (original) >>> +++ >>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java >>> Tue Aug 7 22:10:27 2012 >>> @@ -63,7 +63,6 @@ public final class JobManager { >>> >>> public static final String module = JobManager.class.getName(); >>> public static final String instanceId = >>> UtilProperties.getPropertyValue("general.properties", >>> "unique.instanceId", "ofbiz0"); >>> - public static final Map<String, Object> updateFields = >>> UtilMisc.<String, Object>toMap("runByInstanceId", instanceId, >>> "statusId", "SERVICE_QUEUED"); >>> private static final ConcurrentHashMap<String, JobManager> >>> registeredManagers = new ConcurrentHashMap<String, JobManager>(); >>> private static boolean isShutDown = false; >>> >>> @@ -150,7 +149,7 @@ public final class JobManager { >>> assertIsRunning(); >>> DispatchContext dctx = getDispatcher().getDispatchContext(); >>> if (dctx == null) { >>> - Debug.logError("Unable to locate DispatchContext >>> object; not running job!", module); >>> + Debug.logWarning("Unable to locate DispatchContext >>> object; not running job!", module); >>> return null; >>> } >>> List<Job> poll = FastList.newInstance(); >>> @@ -176,13 +175,13 @@ public final class JobManager { >>> try { >>> beganTransaction = TransactionUtil.begin(); >>> if (!beganTransaction) { >>> - Debug.logError("Unable to poll JobSandbox for jobs; >>> transaction was not started by this process", module); >>> + Debug.logWarning("Unable to poll JobSandbox for >>> jobs; transaction was not started by this process", module); >>> return null; >>> } >>> jobsIterator = delegator.find("JobSandbox", >>> mainCondition, null, null, UtilMisc.toList("runTime"), null); >>> GenericValue jobValue = jobsIterator.next(); >>> while (jobValue != null) { >>> - jobValue.putAll(updateFields); >>> + jobValue.set("runByInstanceId", instanceId); // >>> Claim ownership of this value. >>> jobValue.store(); >>> poll.add(new PersistedServiceJob(dctx, jobValue, >>> null)); >>> if (poll.size() == limit) { >>> @@ -191,14 +190,12 @@ public final class JobManager { >>> jobValue = jobsIterator.next(); >>> } >>> } catch (Throwable t) { >>> - // catch Throwable so nothing slips through the >>> cracks... this is a fairly sensitive operation >>> String errMsg = "Error in polling JobSandbox: [" + >>> t.toString() + "]. Rolling back transaction."; >>> - Debug.logError(t, errMsg, module); >>> + Debug.logWarning(t, errMsg, module); >>> try { >>> - // only rollback the transaction if we started one... >>> TransactionUtil.rollback(beganTransaction, errMsg, t); >>> } catch (GenericEntityException e2) { >>> - Debug.logError(e2, "[Delegator] Could not rollback >>> transaction: " + e2.toString(), module); >>> + Debug.logWarning(e2, "[Delegator] Could not >>> rollback transaction: " + e2.toString(), module); >>> } >>> } finally { >>> if (jobsIterator != null) { >>> @@ -209,12 +206,10 @@ public final class JobManager { >>> } >>> } >>> try { >>> - // only commit the transaction if we started one... >>> but make sure we try >>> TransactionUtil.commit(beganTransaction); >>> } catch (GenericTransactionException e) { >>> String errMsg = "Transaction error trying to commit >>> when polling and updating the JobSandbox: " + e.toString(); >>> - // we don't really want to do anything different, >>> so just log and move on >>> - Debug.logError(e, errMsg, module); >>> + Debug.logWarning(e, errMsg, module); >>> } >>> } >>> return poll; >>> @@ -222,7 +217,9 @@ public final class JobManager { >>> >>> private void reloadCrashedJobs() { >>> List<GenericValue> crashed = null; >>> - List<EntityExpr> statusExprList = >>> UtilMisc.toList(EntityCondition.makeCondition("statusId", >>> EntityOperator.EQUALS, "SERVICE_QUEUED"), >>> EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, >>> "SERVICE_RUNNING")); >>> + List<EntityExpr> statusExprList = >>> UtilMisc.toList(EntityCondition.makeCondition("statusId", >>> EntityOperator.EQUALS, "SERVICE_PENDING"), >>> + EntityCondition.makeCondition("statusId", >>> EntityOperator.EQUALS, "SERVICE_QUEUED"), >>> + EntityCondition.makeCondition("statusId", >>> EntityOperator.EQUALS, "SERVICE_RUNNING")); >>> EntityCondition statusCondition = >>> EntityCondition.makeCondition(statusExprList, EntityOperator.OR); >>> List<EntityExpr> poolsExpr = >>> UtilMisc.toList(EntityCondition.makeCondition("poolId", >>> EntityOperator.EQUALS, null)); >>> List<String> pools = ServiceConfigUtil.getRunPools(); >>> @@ -236,13 +233,13 @@ public final class JobManager { >>> try { >>> crashed = delegator.findList("JobSandbox", >>> mainCondition, null, UtilMisc.toList("startDateTime"), null, false); >>> } catch (GenericEntityException e) { >>> - Debug.logError(e, "Unable to load crashed jobs", module); >>> + Debug.logWarning(e, "Unable to load crashed jobs", >>> module); >>> } >>> if (UtilValidate.isNotEmpty(crashed)) { >>> - try { >>> - int rescheduled = 0; >>> - for (GenericValue job : crashed) { >>> - Timestamp now = UtilDateTime.nowTimestamp(); >>> + int rescheduled = 0; >>> + Timestamp now = UtilDateTime.nowTimestamp(); >>> + for (GenericValue job : crashed) { >>> + try { >>> Debug.logInfo("Scheduling Job : " + job, module); >>> String pJobId = job.getString("parentJobId"); >>> if (pJobId == null) { >>> @@ -261,12 +258,12 @@ public final class JobManager { >>> job.set("cancelDateTime", now); >>> delegator.store(job); >>> rescheduled++; >>> + } catch (GenericEntityException e) { >>> + Debug.logWarning(e, module); >>> } >>> - if (Debug.infoOn()) >>> - Debug.logInfo("-- " + rescheduled + " jobs >>> re-scheduled", module); >>> - } catch (GenericEntityException e) { >>> - Debug.logError(e, module); >>> } >>> + if (Debug.infoOn()) >>> + Debug.logInfo("-- " + rescheduled + " jobs >>> re-scheduled", module); >>> } else { >>> if (Debug.infoOn()) >>> Debug.logInfo("No crashed jobs to re-schedule", >>> module); >>> >>> Modified: >>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java >>> URL: >>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370566&r1=1370565&r2=1370566&view=diff >>> ============================================================================== >>> >>> --- >>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java >>> (original) >>> +++ >>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java >>> Tue Aug 7 22:10:27 2012 >>> @@ -203,7 +203,11 @@ public final class JobPoller implements >>> */ >>> public void queueNow(Job job) throws InvalidJobException { >>> job.queue(); >>> - this.executor.execute(new JobInvoker(job)); >>> + try { >>> + this.executor.execute(new JobInvoker(job)); >>> + } catch (Exception e) { >>> + job.deQueue(); >>> + } >>> } >>> >>> public void run() { >>> >>> Modified: >>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java >>> URL: >>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff >>> ============================================================================== >>> >>> --- >>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java >>> (original) >>> +++ >>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java >>> Tue Aug 7 22:10:27 2012 >>> @@ -20,7 +20,6 @@ package org.ofbiz.service.job; >>> >>> import java.io.IOException; >>> import java.sql.Timestamp; >>> -import com.ibm.icu.util.Calendar; >>> import java.util.Date; >>> import java.util.Map; >>> >>> @@ -28,14 +27,12 @@ import javax.xml.parsers.ParserConfigura >>> >>> import javolution.util.FastMap; >>> >>> +import org.apache.commons.lang.StringUtils; >>> import org.ofbiz.base.util.Debug; >>> import org.ofbiz.base.util.UtilDateTime; >>> import org.ofbiz.base.util.UtilGenerics; >>> import org.ofbiz.base.util.UtilProperties; >>> import org.ofbiz.base.util.UtilValidate; >>> -import org.ofbiz.service.calendar.RecurrenceInfoException; >>> -import org.ofbiz.service.calendar.TemporalExpression; >>> -import org.ofbiz.service.calendar.TemporalExpressionWorker; >>> import org.ofbiz.entity.Delegator; >>> import org.ofbiz.entity.GenericEntityException; >>> import org.ofbiz.entity.GenericValue; >>> @@ -47,10 +44,13 @@ import org.ofbiz.service.DispatchContext >>> import org.ofbiz.service.GenericRequester; >>> import org.ofbiz.service.ServiceUtil; >>> import org.ofbiz.service.calendar.RecurrenceInfo; >>> +import org.ofbiz.service.calendar.RecurrenceInfoException; >>> +import org.ofbiz.service.calendar.TemporalExpression; >>> +import org.ofbiz.service.calendar.TemporalExpressionWorker; >>> import org.ofbiz.service.config.ServiceConfigUtil; >>> import org.xml.sax.SAXException; >>> >>> -import org.apache.commons.lang.StringUtils; >>> +import com.ibm.icu.util.Calendar; >>> >>> /** >>> * A {@link Job} that is backed by the entity engine. Job data is >>> stored >>> @@ -100,14 +100,15 @@ public class PersistedServiceJob extends >>> // job not available >>> throw new InvalidJobException("Job [" + getJobId() + "] >>> is not available"); >>> } else { >>> - // set the start time to now >>> - jobValue.set("startDateTime", >>> UtilDateTime.nowTimestamp()); >>> - jobValue.set("statusId", "SERVICE_RUNNING"); >>> + jobValue.set("statusId", "SERVICE_QUEUED"); >>> try { >>> jobValue.store(); >>> } catch (GenericEntityException e) { >>> throw new InvalidJobException("Unable to set the >>> startDateTime and statusId on the current job [" + getJobId() + "]; >>> not running!", e); >>> } >>> + if (Debug.verboseOn()) { >>> + Debug.logVerbose("Placing job [" + getJobId() + "] >>> in queue", module); >>> + } >>> } >>> } >>> >>> @@ -129,6 +130,16 @@ public class PersistedServiceJob extends >>> // This condition isn't possible, but we will leave it >>> here. >>> throw new InvalidJobException("Job has been accepted by >>> a different instance!"); >>> } >>> + jobValue.set("startDateTime", UtilDateTime.nowTimestamp()); >>> + jobValue.set("statusId", "SERVICE_RUNNING"); >>> + try { >>> + jobValue.store(); >>> + } catch (GenericEntityException e) { >>> + throw new InvalidJobException("Unable to set the >>> startDateTime and statusId on the current job [" + getJobId() + "]; >>> not running!", e); >>> + } >>> + if (Debug.verboseOn()) { >>> + Debug.logVerbose("Job [" + getJobId() + "] running", >>> module); >>> + } >>> // configure any additional recurrences >>> long maxRecurrenceCount = -1; >>> long currentRecurrenceCount = 0; >>> @@ -331,4 +342,24 @@ public class PersistedServiceJob extends >>> } >>> return null; >>> } >>> + >>> + @Override >>> + public void deQueue() throws InvalidJobException { >>> + if (currentState != State.QUEUED) { >>> + throw new InvalidJobException("Illegal state change"); >>> + } >>> + currentState = State.CREATED; >>> + try { >>> + jobValue.refresh(); >>> + jobValue.set("startDateTime", null); >>> + jobValue.set("runByInstanceId", null); >>> + jobValue.set("statusId", "SERVICE_PENDING"); >>> + jobValue.store(); >>> + } catch (GenericEntityException e) { >>> + throw new InvalidJobException("Unable to dequeue job [" >>> + getJobId() + "]", e); >>> + } >>> + if (Debug.verboseOn()) { >>> + Debug.logVerbose("Job [" + getJobId() + "] not queued, >>> rescheduling", module); >>> + } >>> + } >>> } >>> >>> |
|
Administrator
|
I just looked at the diff, I will see in the context, maybe you are right...
Jacques From: "Adrian Crum" <[hidden email]> > You are welcome to put them back in. They seemed redundant (and silly) to me. Example: > > // Set var to 1 > int var = 1; > // Call someMethod, put result in var2 > int var2 = someMethod(); > // Calculate var1 * var2 > int result = var1 * var2 > > The comments are not adding anything of value - they are only stating the obvious. > > -Adrian > > On 8/8/2012 10:07 PM, Jacques Le Roux wrote: >> Else forgot to say that I like it >> >> Jacques >> >> From: "Jacques Le Roux" <[hidden email]> >>> Hi Adrian, >>> >>> Why did you remove these comments ? >>>> - // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation >>>> - // only rollback the transaction if we started one... >>>> - // only commit the transaction if we started one... but make sure we try >>>> - // we don't really want to do anything different, so just log and move on >>> >>> It seems they add some information >>> >>> Jacques >>> >>> From: <[hidden email]> >>>> Author: adrianc >>>> Date: Tue Aug 7 22:10:27 2012 >>>> New Revision: 1370566 >>>> >>>> URL: http://svn.apache.org/viewvc?rev=1370566&view=rev >>>> Log: >>>> Fixed the Job Scheduler so jobs are not lost during heavy server load. >>>> >>>> Modified: >>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java >>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java >>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java >>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java >>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java >>>> >>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java >>>> URL: >>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff >>>> ============================================================================== >>>> --- >>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java (original) >>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java Tue Aug 7 22:10:27 2012 >>>> @@ -144,4 +144,12 @@ public class GenericServiceJob extends A >>>> public boolean isValid() { >>>> return currentState == State.CREATED; >>>> } >>>> + >>>> + @Override >>>> + public void deQueue() throws InvalidJobException { >>>> + if (currentState != State.QUEUED) { >>>> + throw new InvalidJobException("Illegal state change"); >>>> + } >>>> + throw new InvalidJobException("Unable to queue job [" + getJobId() + "]"); >>>> + } >>>> } >>>> >>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java >>>> URL: >>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java?rev=1370566&r1=1370565&r2=1370566&view=diff >>>> ============================================================================== >>>> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java (original) >>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java Tue Aug 7 22:10:27 2012 >>>> @@ -60,7 +60,13 @@ public interface Job { >>>> boolean isValid(); >>>> >>>> /** >>>> - * Transitions the job to the queued state. >>>> + * Transitions this job to the pre-queued (created) state. The job manager >>>> + * will call this method when there was a problem adding this job to the queue. >>>> + */ >>>> + void deQueue() throws InvalidJobException; >>>> + >>>> + /** >>>> + * Transitions this job to the queued state. >>>> */ >>>> void queue() throws InvalidJobException; >>>> } >>>> >>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java >>>> URL: >>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370566&r1=1370565&r2=1370566&view=diff >>>> ============================================================================== >>>> --- >>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original) >>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Tue Aug 7 22:10:27 2012 >>>> @@ -63,7 +63,6 @@ public final class JobManager { >>>> >>>> public static final String module = JobManager.class.getName(); >>>> public static final String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", >>>> "ofbiz0"); >>>> - public static final Map<String, Object> updateFields = UtilMisc.<String, Object>toMap("runByInstanceId", instanceId, >>>> "statusId", "SERVICE_QUEUED"); >>>> private static final ConcurrentHashMap<String, JobManager> registeredManagers = new ConcurrentHashMap<String, >>>> JobManager>(); >>>> private static boolean isShutDown = false; >>>> >>>> @@ -150,7 +149,7 @@ public final class JobManager { >>>> assertIsRunning(); >>>> DispatchContext dctx = getDispatcher().getDispatchContext(); >>>> if (dctx == null) { >>>> - Debug.logError("Unable to locate DispatchContext object; not running job!", module); >>>> + Debug.logWarning("Unable to locate DispatchContext object; not running job!", module); >>>> return null; >>>> } >>>> List<Job> poll = FastList.newInstance(); >>>> @@ -176,13 +175,13 @@ public final class JobManager { >>>> try { >>>> beganTransaction = TransactionUtil.begin(); >>>> if (!beganTransaction) { >>>> - Debug.logError("Unable to poll JobSandbox for jobs; transaction was not started by this process", module); >>>> + Debug.logWarning("Unable to poll JobSandbox for jobs; transaction was not started by this process", module); >>>> return null; >>>> } >>>> jobsIterator = delegator.find("JobSandbox", mainCondition, null, null, UtilMisc.toList("runTime"), null); >>>> GenericValue jobValue = jobsIterator.next(); >>>> while (jobValue != null) { >>>> - jobValue.putAll(updateFields); >>>> + jobValue.set("runByInstanceId", instanceId); // Claim ownership of this value. >>>> jobValue.store(); >>>> poll.add(new PersistedServiceJob(dctx, jobValue, null)); >>>> if (poll.size() == limit) { >>>> @@ -191,14 +190,12 @@ public final class JobManager { >>>> jobValue = jobsIterator.next(); >>>> } >>>> } catch (Throwable t) { >>>> - // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation >>>> String errMsg = "Error in polling JobSandbox: [" + t.toString() + "]. Rolling back transaction."; >>>> - Debug.logError(t, errMsg, module); >>>> + Debug.logWarning(t, errMsg, module); >>>> try { >>>> - // only rollback the transaction if we started one... >>>> TransactionUtil.rollback(beganTransaction, errMsg, t); >>>> } catch (GenericEntityException e2) { >>>> - Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module); >>>> + Debug.logWarning(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module); >>>> } >>>> } finally { >>>> if (jobsIterator != null) { >>>> @@ -209,12 +206,10 @@ public final class JobManager { >>>> } >>>> } >>>> try { >>>> - // only commit the transaction if we started one... but make sure we try >>>> TransactionUtil.commit(beganTransaction); >>>> } catch (GenericTransactionException e) { >>>> String errMsg = "Transaction error trying to commit when polling and updating the JobSandbox: " + e.toString(); >>>> - // we don't really want to do anything different, so just log and move on >>>> - Debug.logError(e, errMsg, module); >>>> + Debug.logWarning(e, errMsg, module); >>>> } >>>> } >>>> return poll; >>>> @@ -222,7 +217,9 @@ public final class JobManager { >>>> >>>> private void reloadCrashedJobs() { >>>> List<GenericValue> crashed = null; >>>> - List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, >>>> "SERVICE_QUEUED"), EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING")); >>>> + List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, >>>> "SERVICE_PENDING"), >>>> + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"), >>>> + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING")); >>>> EntityCondition statusCondition = EntityCondition.makeCondition(statusExprList, EntityOperator.OR); >>>> List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null)); >>>> List<String> pools = ServiceConfigUtil.getRunPools(); >>>> @@ -236,13 +233,13 @@ public final class JobManager { >>>> try { >>>> crashed = delegator.findList("JobSandbox", mainCondition, null, UtilMisc.toList("startDateTime"), null, false); >>>> } catch (GenericEntityException e) { >>>> - Debug.logError(e, "Unable to load crashed jobs", module); >>>> + Debug.logWarning(e, "Unable to load crashed jobs", module); >>>> } >>>> if (UtilValidate.isNotEmpty(crashed)) { >>>> - try { >>>> - int rescheduled = 0; >>>> - for (GenericValue job : crashed) { >>>> - Timestamp now = UtilDateTime.nowTimestamp(); >>>> + int rescheduled = 0; >>>> + Timestamp now = UtilDateTime.nowTimestamp(); >>>> + for (GenericValue job : crashed) { >>>> + try { >>>> Debug.logInfo("Scheduling Job : " + job, module); >>>> String pJobId = job.getString("parentJobId"); >>>> if (pJobId == null) { >>>> @@ -261,12 +258,12 @@ public final class JobManager { >>>> job.set("cancelDateTime", now); >>>> delegator.store(job); >>>> rescheduled++; >>>> + } catch (GenericEntityException e) { >>>> + Debug.logWarning(e, module); >>>> } >>>> - if (Debug.infoOn()) >>>> - Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module); >>>> - } catch (GenericEntityException e) { >>>> - Debug.logError(e, module); >>>> } >>>> + if (Debug.infoOn()) >>>> + Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module); >>>> } else { >>>> if (Debug.infoOn()) >>>> Debug.logInfo("No crashed jobs to re-schedule", module); >>>> >>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java >>>> URL: >>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370566&r1=1370565&r2=1370566&view=diff >>>> ============================================================================== >>>> --- >>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original) >>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Tue Aug 7 22:10:27 2012 >>>> @@ -203,7 +203,11 @@ public final class JobPoller implements >>>> */ >>>> public void queueNow(Job job) throws InvalidJobException { >>>> job.queue(); >>>> - this.executor.execute(new JobInvoker(job)); >>>> + try { >>>> + this.executor.execute(new JobInvoker(job)); >>>> + } catch (Exception e) { >>>> + job.deQueue(); >>>> + } >>>> } >>>> >>>> public void run() { >>>> >>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java >>>> URL: >>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff >>>> ============================================================================== >>>> --- >>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java (original) >>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java Tue Aug 7 22:10:27 2012 >>>> @@ -20,7 +20,6 @@ package org.ofbiz.service.job; >>>> >>>> import java.io.IOException; >>>> import java.sql.Timestamp; >>>> -import com.ibm.icu.util.Calendar; >>>> import java.util.Date; >>>> import java.util.Map; >>>> >>>> @@ -28,14 +27,12 @@ import javax.xml.parsers.ParserConfigura >>>> >>>> import javolution.util.FastMap; >>>> >>>> +import org.apache.commons.lang.StringUtils; >>>> import org.ofbiz.base.util.Debug; >>>> import org.ofbiz.base.util.UtilDateTime; >>>> import org.ofbiz.base.util.UtilGenerics; >>>> import org.ofbiz.base.util.UtilProperties; >>>> import org.ofbiz.base.util.UtilValidate; >>>> -import org.ofbiz.service.calendar.RecurrenceInfoException; >>>> -import org.ofbiz.service.calendar.TemporalExpression; >>>> -import org.ofbiz.service.calendar.TemporalExpressionWorker; >>>> import org.ofbiz.entity.Delegator; >>>> import org.ofbiz.entity.GenericEntityException; >>>> import org.ofbiz.entity.GenericValue; >>>> @@ -47,10 +44,13 @@ import org.ofbiz.service.DispatchContext >>>> import org.ofbiz.service.GenericRequester; >>>> import org.ofbiz.service.ServiceUtil; >>>> import org.ofbiz.service.calendar.RecurrenceInfo; >>>> +import org.ofbiz.service.calendar.RecurrenceInfoException; >>>> +import org.ofbiz.service.calendar.TemporalExpression; >>>> +import org.ofbiz.service.calendar.TemporalExpressionWorker; >>>> import org.ofbiz.service.config.ServiceConfigUtil; >>>> import org.xml.sax.SAXException; >>>> >>>> -import org.apache.commons.lang.StringUtils; >>>> +import com.ibm.icu.util.Calendar; >>>> >>>> /** >>>> * A {@link Job} that is backed by the entity engine. Job data is stored >>>> @@ -100,14 +100,15 @@ public class PersistedServiceJob extends >>>> // job not available >>>> throw new InvalidJobException("Job [" + getJobId() + "] is not available"); >>>> } else { >>>> - // set the start time to now >>>> - jobValue.set("startDateTime", UtilDateTime.nowTimestamp()); >>>> - jobValue.set("statusId", "SERVICE_RUNNING"); >>>> + jobValue.set("statusId", "SERVICE_QUEUED"); >>>> try { >>>> jobValue.store(); >>>> } catch (GenericEntityException e) { >>>> throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() >>>> + "]; not running!", e); >>>> } >>>> + if (Debug.verboseOn()) { >>>> + Debug.logVerbose("Placing job [" + getJobId() + "] in queue", module); >>>> + } >>>> } >>>> } >>>> >>>> @@ -129,6 +130,16 @@ public class PersistedServiceJob extends >>>> // This condition isn't possible, but we will leave it here. >>>> throw new InvalidJobException("Job has been accepted by a different instance!"); >>>> } >>>> + jobValue.set("startDateTime", UtilDateTime.nowTimestamp()); >>>> + jobValue.set("statusId", "SERVICE_RUNNING"); >>>> + try { >>>> + jobValue.store(); >>>> + } catch (GenericEntityException e) { >>>> + throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + >>>> "]; not running!", e); >>>> + } >>>> + if (Debug.verboseOn()) { >>>> + Debug.logVerbose("Job [" + getJobId() + "] running", module); >>>> + } >>>> // configure any additional recurrences >>>> long maxRecurrenceCount = -1; >>>> long currentRecurrenceCount = 0; >>>> @@ -331,4 +342,24 @@ public class PersistedServiceJob extends >>>> } >>>> return null; >>>> } >>>> + >>>> + @Override >>>> + public void deQueue() throws InvalidJobException { >>>> + if (currentState != State.QUEUED) { >>>> + throw new InvalidJobException("Illegal state change"); >>>> + } >>>> + currentState = State.CREATED; >>>> + try { >>>> + jobValue.refresh(); >>>> + jobValue.set("startDateTime", null); >>>> + jobValue.set("runByInstanceId", null); >>>> + jobValue.set("statusId", "SERVICE_PENDING"); >>>> + jobValue.store(); >>>> + } catch (GenericEntityException e) { >>>> + throw new InvalidJobException("Unable to dequeue job [" + getJobId() + "]", e); >>>> + } >>>> + if (Debug.verboseOn()) { >>>> + Debug.logVerbose("Job [" + getJobId() + "] not queued, rescheduling", module); >>>> + } >>>> + } >>>> } >>>> >>>> > |
|
Administrator
|
OK clear, I had to look at the source not only the diff.
I must say I try to cope with the changes there, but will need more time to review completly Jacques From: "Jacques Le Roux" <[hidden email]> >I just looked at the diff, I will see in the context, maybe you are right... > > Jacques > > From: "Adrian Crum" <[hidden email]> >> You are welcome to put them back in. They seemed redundant (and silly) to me. Example: >> >> // Set var to 1 >> int var = 1; >> // Call someMethod, put result in var2 >> int var2 = someMethod(); >> // Calculate var1 * var2 >> int result = var1 * var2 >> >> The comments are not adding anything of value - they are only stating the obvious. >> >> -Adrian >> >> On 8/8/2012 10:07 PM, Jacques Le Roux wrote: >>> Else forgot to say that I like it >>> >>> Jacques >>> >>> From: "Jacques Le Roux" <[hidden email]> >>>> Hi Adrian, >>>> >>>> Why did you remove these comments ? >>>>> - // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation >>>>> - // only rollback the transaction if we started one... >>>>> - // only commit the transaction if we started one... but make sure we try >>>>> - // we don't really want to do anything different, so just log and move on >>>> >>>> It seems they add some information >>>> >>>> Jacques >>>> >>>> From: <[hidden email]> >>>>> Author: adrianc >>>>> Date: Tue Aug 7 22:10:27 2012 >>>>> New Revision: 1370566 >>>>> >>>>> URL: http://svn.apache.org/viewvc?rev=1370566&view=rev >>>>> Log: >>>>> Fixed the Job Scheduler so jobs are not lost during heavy server load. >>>>> >>>>> Modified: >>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java >>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java >>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java >>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java >>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java >>>>> >>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java >>>>> URL: >>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff >>>>> ============================================================================== >>>>> --- >>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java (original) >>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java Tue Aug 7 22:10:27 2012 >>>>> @@ -144,4 +144,12 @@ public class GenericServiceJob extends A >>>>> public boolean isValid() { >>>>> return currentState == State.CREATED; >>>>> } >>>>> + >>>>> + @Override >>>>> + public void deQueue() throws InvalidJobException { >>>>> + if (currentState != State.QUEUED) { >>>>> + throw new InvalidJobException("Illegal state change"); >>>>> + } >>>>> + throw new InvalidJobException("Unable to queue job [" + getJobId() + "]"); >>>>> + } >>>>> } >>>>> >>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java >>>>> URL: >>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java?rev=1370566&r1=1370565&r2=1370566&view=diff >>>>> ============================================================================== >>>>> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java (original) >>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java Tue Aug 7 22:10:27 2012 >>>>> @@ -60,7 +60,13 @@ public interface Job { >>>>> boolean isValid(); >>>>> >>>>> /** >>>>> - * Transitions the job to the queued state. >>>>> + * Transitions this job to the pre-queued (created) state. The job manager >>>>> + * will call this method when there was a problem adding this job to the queue. >>>>> + */ >>>>> + void deQueue() throws InvalidJobException; >>>>> + >>>>> + /** >>>>> + * Transitions this job to the queued state. >>>>> */ >>>>> void queue() throws InvalidJobException; >>>>> } >>>>> >>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java >>>>> URL: >>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370566&r1=1370565&r2=1370566&view=diff >>>>> ============================================================================== >>>>> --- >>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original) >>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Tue Aug 7 22:10:27 2012 >>>>> @@ -63,7 +63,6 @@ public final class JobManager { >>>>> >>>>> public static final String module = JobManager.class.getName(); >>>>> public static final String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", >>>>> "ofbiz0"); >>>>> - public static final Map<String, Object> updateFields = UtilMisc.<String, Object>toMap("runByInstanceId", instanceId, >>>>> "statusId", "SERVICE_QUEUED"); >>>>> private static final ConcurrentHashMap<String, JobManager> registeredManagers = new ConcurrentHashMap<String, >>>>> JobManager>(); >>>>> private static boolean isShutDown = false; >>>>> >>>>> @@ -150,7 +149,7 @@ public final class JobManager { >>>>> assertIsRunning(); >>>>> DispatchContext dctx = getDispatcher().getDispatchContext(); >>>>> if (dctx == null) { >>>>> - Debug.logError("Unable to locate DispatchContext object; not running job!", module); >>>>> + Debug.logWarning("Unable to locate DispatchContext object; not running job!", module); >>>>> return null; >>>>> } >>>>> List<Job> poll = FastList.newInstance(); >>>>> @@ -176,13 +175,13 @@ public final class JobManager { >>>>> try { >>>>> beganTransaction = TransactionUtil.begin(); >>>>> if (!beganTransaction) { >>>>> - Debug.logError("Unable to poll JobSandbox for jobs; transaction was not started by this process", module); >>>>> + Debug.logWarning("Unable to poll JobSandbox for jobs; transaction was not started by this process", module); >>>>> return null; >>>>> } >>>>> jobsIterator = delegator.find("JobSandbox", mainCondition, null, null, UtilMisc.toList("runTime"), null); >>>>> GenericValue jobValue = jobsIterator.next(); >>>>> while (jobValue != null) { >>>>> - jobValue.putAll(updateFields); >>>>> + jobValue.set("runByInstanceId", instanceId); // Claim ownership of this value. >>>>> jobValue.store(); >>>>> poll.add(new PersistedServiceJob(dctx, jobValue, null)); >>>>> if (poll.size() == limit) { >>>>> @@ -191,14 +190,12 @@ public final class JobManager { >>>>> jobValue = jobsIterator.next(); >>>>> } >>>>> } catch (Throwable t) { >>>>> - // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation >>>>> String errMsg = "Error in polling JobSandbox: [" + t.toString() + "]. Rolling back transaction."; >>>>> - Debug.logError(t, errMsg, module); >>>>> + Debug.logWarning(t, errMsg, module); >>>>> try { >>>>> - // only rollback the transaction if we started one... >>>>> TransactionUtil.rollback(beganTransaction, errMsg, t); >>>>> } catch (GenericEntityException e2) { >>>>> - Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module); >>>>> + Debug.logWarning(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module); >>>>> } >>>>> } finally { >>>>> if (jobsIterator != null) { >>>>> @@ -209,12 +206,10 @@ public final class JobManager { >>>>> } >>>>> } >>>>> try { >>>>> - // only commit the transaction if we started one... but make sure we try >>>>> TransactionUtil.commit(beganTransaction); >>>>> } catch (GenericTransactionException e) { >>>>> String errMsg = "Transaction error trying to commit when polling and updating the JobSandbox: " + >>>>> e.toString(); >>>>> - // we don't really want to do anything different, so just log and move on >>>>> - Debug.logError(e, errMsg, module); >>>>> + Debug.logWarning(e, errMsg, module); >>>>> } >>>>> } >>>>> return poll; >>>>> @@ -222,7 +217,9 @@ public final class JobManager { >>>>> >>>>> private void reloadCrashedJobs() { >>>>> List<GenericValue> crashed = null; >>>>> - List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, >>>>> "SERVICE_QUEUED"), EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING")); >>>>> + List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, >>>>> "SERVICE_PENDING"), >>>>> + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"), >>>>> + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING")); >>>>> EntityCondition statusCondition = EntityCondition.makeCondition(statusExprList, EntityOperator.OR); >>>>> List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null)); >>>>> List<String> pools = ServiceConfigUtil.getRunPools(); >>>>> @@ -236,13 +233,13 @@ public final class JobManager { >>>>> try { >>>>> crashed = delegator.findList("JobSandbox", mainCondition, null, UtilMisc.toList("startDateTime"), null, false); >>>>> } catch (GenericEntityException e) { >>>>> - Debug.logError(e, "Unable to load crashed jobs", module); >>>>> + Debug.logWarning(e, "Unable to load crashed jobs", module); >>>>> } >>>>> if (UtilValidate.isNotEmpty(crashed)) { >>>>> - try { >>>>> - int rescheduled = 0; >>>>> - for (GenericValue job : crashed) { >>>>> - Timestamp now = UtilDateTime.nowTimestamp(); >>>>> + int rescheduled = 0; >>>>> + Timestamp now = UtilDateTime.nowTimestamp(); >>>>> + for (GenericValue job : crashed) { >>>>> + try { >>>>> Debug.logInfo("Scheduling Job : " + job, module); >>>>> String pJobId = job.getString("parentJobId"); >>>>> if (pJobId == null) { >>>>> @@ -261,12 +258,12 @@ public final class JobManager { >>>>> job.set("cancelDateTime", now); >>>>> delegator.store(job); >>>>> rescheduled++; >>>>> + } catch (GenericEntityException e) { >>>>> + Debug.logWarning(e, module); >>>>> } >>>>> - if (Debug.infoOn()) >>>>> - Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module); >>>>> - } catch (GenericEntityException e) { >>>>> - Debug.logError(e, module); >>>>> } >>>>> + if (Debug.infoOn()) >>>>> + Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module); >>>>> } else { >>>>> if (Debug.infoOn()) >>>>> Debug.logInfo("No crashed jobs to re-schedule", module); >>>>> >>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java >>>>> URL: >>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370566&r1=1370565&r2=1370566&view=diff >>>>> ============================================================================== >>>>> --- >>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original) >>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Tue Aug 7 22:10:27 2012 >>>>> @@ -203,7 +203,11 @@ public final class JobPoller implements >>>>> */ >>>>> public void queueNow(Job job) throws InvalidJobException { >>>>> job.queue(); >>>>> - this.executor.execute(new JobInvoker(job)); >>>>> + try { >>>>> + this.executor.execute(new JobInvoker(job)); >>>>> + } catch (Exception e) { >>>>> + job.deQueue(); >>>>> + } >>>>> } >>>>> >>>>> public void run() { >>>>> >>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java >>>>> URL: >>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff >>>>> ============================================================================== >>>>> --- >>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java (original) >>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java Tue Aug 7 22:10:27 2012 >>>>> @@ -20,7 +20,6 @@ package org.ofbiz.service.job; >>>>> >>>>> import java.io.IOException; >>>>> import java.sql.Timestamp; >>>>> -import com.ibm.icu.util.Calendar; >>>>> import java.util.Date; >>>>> import java.util.Map; >>>>> >>>>> @@ -28,14 +27,12 @@ import javax.xml.parsers.ParserConfigura >>>>> >>>>> import javolution.util.FastMap; >>>>> >>>>> +import org.apache.commons.lang.StringUtils; >>>>> import org.ofbiz.base.util.Debug; >>>>> import org.ofbiz.base.util.UtilDateTime; >>>>> import org.ofbiz.base.util.UtilGenerics; >>>>> import org.ofbiz.base.util.UtilProperties; >>>>> import org.ofbiz.base.util.UtilValidate; >>>>> -import org.ofbiz.service.calendar.RecurrenceInfoException; >>>>> -import org.ofbiz.service.calendar.TemporalExpression; >>>>> -import org.ofbiz.service.calendar.TemporalExpressionWorker; >>>>> import org.ofbiz.entity.Delegator; >>>>> import org.ofbiz.entity.GenericEntityException; >>>>> import org.ofbiz.entity.GenericValue; >>>>> @@ -47,10 +44,13 @@ import org.ofbiz.service.DispatchContext >>>>> import org.ofbiz.service.GenericRequester; >>>>> import org.ofbiz.service.ServiceUtil; >>>>> import org.ofbiz.service.calendar.RecurrenceInfo; >>>>> +import org.ofbiz.service.calendar.RecurrenceInfoException; >>>>> +import org.ofbiz.service.calendar.TemporalExpression; >>>>> +import org.ofbiz.service.calendar.TemporalExpressionWorker; >>>>> import org.ofbiz.service.config.ServiceConfigUtil; >>>>> import org.xml.sax.SAXException; >>>>> >>>>> -import org.apache.commons.lang.StringUtils; >>>>> +import com.ibm.icu.util.Calendar; >>>>> >>>>> /** >>>>> * A {@link Job} that is backed by the entity engine. Job data is stored >>>>> @@ -100,14 +100,15 @@ public class PersistedServiceJob extends >>>>> // job not available >>>>> throw new InvalidJobException("Job [" + getJobId() + "] is not available"); >>>>> } else { >>>>> - // set the start time to now >>>>> - jobValue.set("startDateTime", UtilDateTime.nowTimestamp()); >>>>> - jobValue.set("statusId", "SERVICE_RUNNING"); >>>>> + jobValue.set("statusId", "SERVICE_QUEUED"); >>>>> try { >>>>> jobValue.store(); >>>>> } catch (GenericEntityException e) { >>>>> throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() >>>>> + "]; not running!", e); >>>>> } >>>>> + if (Debug.verboseOn()) { >>>>> + Debug.logVerbose("Placing job [" + getJobId() + "] in queue", module); >>>>> + } >>>>> } >>>>> } >>>>> >>>>> @@ -129,6 +130,16 @@ public class PersistedServiceJob extends >>>>> // This condition isn't possible, but we will leave it here. >>>>> throw new InvalidJobException("Job has been accepted by a different instance!"); >>>>> } >>>>> + jobValue.set("startDateTime", UtilDateTime.nowTimestamp()); >>>>> + jobValue.set("statusId", "SERVICE_RUNNING"); >>>>> + try { >>>>> + jobValue.store(); >>>>> + } catch (GenericEntityException e) { >>>>> + throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + >>>>> "]; not running!", e); >>>>> + } >>>>> + if (Debug.verboseOn()) { >>>>> + Debug.logVerbose("Job [" + getJobId() + "] running", module); >>>>> + } >>>>> // configure any additional recurrences >>>>> long maxRecurrenceCount = -1; >>>>> long currentRecurrenceCount = 0; >>>>> @@ -331,4 +342,24 @@ public class PersistedServiceJob extends >>>>> } >>>>> return null; >>>>> } >>>>> + >>>>> + @Override >>>>> + public void deQueue() throws InvalidJobException { >>>>> + if (currentState != State.QUEUED) { >>>>> + throw new InvalidJobException("Illegal state change"); >>>>> + } >>>>> + currentState = State.CREATED; >>>>> + try { >>>>> + jobValue.refresh(); >>>>> + jobValue.set("startDateTime", null); >>>>> + jobValue.set("runByInstanceId", null); >>>>> + jobValue.set("statusId", "SERVICE_PENDING"); >>>>> + jobValue.store(); >>>>> + } catch (GenericEntityException e) { >>>>> + throw new InvalidJobException("Unable to dequeue job [" + getJobId() + "]", e); >>>>> + } >>>>> + if (Debug.verboseOn()) { >>>>> + Debug.logVerbose("Job [" + getJobId() + "] not queued, rescheduling", module); >>>>> + } >>>>> + } >>>>> } >>>>> >>>>> >> |
|
Usually, when I do an overhaul like this I reformat the code first, in a
separate commit. But I didn't start out intending to overhaul the Job Scheduler, I was just trying to fix some bugs I encountered, then things progressed... I would appreciate another review. Scott's comments were helpful, and I'm sure yours will be also. -Adrian On 8/11/2012 9:49 AM, Jacques Le Roux wrote: > OK clear, I had to look at the source not only the diff. > I must say I try to cope with the changes there, but will need more > time to review completly > > Jacques > > From: "Jacques Le Roux" <[hidden email]> >> I just looked at the diff, I will see in the context, maybe you are >> right... >> >> Jacques >> >> From: "Adrian Crum" <[hidden email]> >>> You are welcome to put them back in. They seemed redundant (and >>> silly) to me. Example: >>> >>> // Set var to 1 >>> int var = 1; >>> // Call someMethod, put result in var2 >>> int var2 = someMethod(); >>> // Calculate var1 * var2 >>> int result = var1 * var2 >>> >>> The comments are not adding anything of value - they are only >>> stating the obvious. >>> >>> -Adrian >>> >>> On 8/8/2012 10:07 PM, Jacques Le Roux wrote: >>>> Else forgot to say that I like it >>>> >>>> Jacques >>>> >>>> From: "Jacques Le Roux" <[hidden email]> >>>>> Hi Adrian, >>>>> >>>>> Why did you remove these comments ? >>>>>> - // catch Throwable so nothing slips through the >>>>>> cracks... this is a fairly sensitive operation >>>>>> - // only rollback the transaction if we started >>>>>> one... >>>>>> - // only commit the transaction if we started >>>>>> one... but make sure we try >>>>>> - // we don't really want to do anything >>>>>> different, so just log and move on >>>>> >>>>> It seems they add some information >>>>> >>>>> Jacques >>>>> >>>>> From: <[hidden email]> >>>>>> Author: adrianc >>>>>> Date: Tue Aug 7 22:10:27 2012 >>>>>> New Revision: 1370566 >>>>>> >>>>>> URL: http://svn.apache.org/viewvc?rev=1370566&view=rev >>>>>> Log: >>>>>> Fixed the Job Scheduler so jobs are not lost during heavy server >>>>>> load. >>>>>> >>>>>> Modified: >>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java >>>>>> >>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java >>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java >>>>>> >>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java >>>>>> >>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java >>>>>> >>>>>> >>>>>> Modified: >>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java >>>>>> URL: >>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff >>>>>> ============================================================================== >>>>>> >>>>>> --- >>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java >>>>>> (original) >>>>>> +++ >>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java >>>>>> Tue Aug 7 22:10:27 2012 >>>>>> @@ -144,4 +144,12 @@ public class GenericServiceJob extends A >>>>>> public boolean isValid() { >>>>>> return currentState == State.CREATED; >>>>>> } >>>>>> + >>>>>> + @Override >>>>>> + public void deQueue() throws InvalidJobException { >>>>>> + if (currentState != State.QUEUED) { >>>>>> + throw new InvalidJobException("Illegal state change"); >>>>>> + } >>>>>> + throw new InvalidJobException("Unable to queue job [" + >>>>>> getJobId() + "]"); >>>>>> + } >>>>>> } >>>>>> >>>>>> Modified: >>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java >>>>>> URL: >>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java?rev=1370566&r1=1370565&r2=1370566&view=diff >>>>>> ============================================================================== >>>>>> >>>>>> --- >>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java >>>>>> (original) >>>>>> +++ >>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java >>>>>> Tue Aug 7 22:10:27 2012 >>>>>> @@ -60,7 +60,13 @@ public interface Job { >>>>>> boolean isValid(); >>>>>> >>>>>> /** >>>>>> - * Transitions the job to the queued state. >>>>>> + * Transitions this job to the pre-queued (created) state. >>>>>> The job manager >>>>>> + * will call this method when there was a problem adding >>>>>> this job to the queue. >>>>>> + */ >>>>>> + void deQueue() throws InvalidJobException; >>>>>> + >>>>>> + /** >>>>>> + * Transitions this job to the queued state. >>>>>> */ >>>>>> void queue() throws InvalidJobException; >>>>>> } >>>>>> >>>>>> Modified: >>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java >>>>>> >>>>>> URL: >>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370566&r1=1370565&r2=1370566&view=diff >>>>>> ============================================================================== >>>>>> >>>>>> --- >>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java >>>>>> (original) >>>>>> +++ >>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java >>>>>> Tue Aug 7 22:10:27 2012 >>>>>> @@ -63,7 +63,6 @@ public final class JobManager { >>>>>> >>>>>> public static final String module = JobManager.class.getName(); >>>>>> public static final String instanceId = >>>>>> UtilProperties.getPropertyValue("general.properties", >>>>>> "unique.instanceId", "ofbiz0"); >>>>>> - public static final Map<String, Object> updateFields = >>>>>> UtilMisc.<String, Object>toMap("runByInstanceId", instanceId, >>>>>> "statusId", "SERVICE_QUEUED"); >>>>>> private static final ConcurrentHashMap<String, JobManager> >>>>>> registeredManagers = new ConcurrentHashMap<String, JobManager>(); >>>>>> private static boolean isShutDown = false; >>>>>> >>>>>> @@ -150,7 +149,7 @@ public final class JobManager { >>>>>> assertIsRunning(); >>>>>> DispatchContext dctx = getDispatcher().getDispatchContext(); >>>>>> if (dctx == null) { >>>>>> - Debug.logError("Unable to locate DispatchContext >>>>>> object; not running job!", module); >>>>>> + Debug.logWarning("Unable to locate DispatchContext >>>>>> object; not running job!", module); >>>>>> return null; >>>>>> } >>>>>> List<Job> poll = FastList.newInstance(); >>>>>> @@ -176,13 +175,13 @@ public final class JobManager { >>>>>> try { >>>>>> beganTransaction = TransactionUtil.begin(); >>>>>> if (!beganTransaction) { >>>>>> - Debug.logError("Unable to poll JobSandbox for >>>>>> jobs; transaction was not started by this process", module); >>>>>> + Debug.logWarning("Unable to poll JobSandbox for >>>>>> jobs; transaction was not started by this process", module); >>>>>> return null; >>>>>> } >>>>>> jobsIterator = delegator.find("JobSandbox", >>>>>> mainCondition, null, null, UtilMisc.toList("runTime"), null); >>>>>> GenericValue jobValue = jobsIterator.next(); >>>>>> while (jobValue != null) { >>>>>> - jobValue.putAll(updateFields); >>>>>> + jobValue.set("runByInstanceId", instanceId); // >>>>>> Claim ownership of this value. >>>>>> jobValue.store(); >>>>>> poll.add(new PersistedServiceJob(dctx, jobValue, >>>>>> null)); >>>>>> if (poll.size() == limit) { >>>>>> @@ -191,14 +190,12 @@ public final class JobManager { >>>>>> jobValue = jobsIterator.next(); >>>>>> } >>>>>> } catch (Throwable t) { >>>>>> - // catch Throwable so nothing slips through the >>>>>> cracks... this is a fairly sensitive operation >>>>>> String errMsg = "Error in polling JobSandbox: [" + >>>>>> t.toString() + "]. Rolling back transaction."; >>>>>> - Debug.logError(t, errMsg, module); >>>>>> + Debug.logWarning(t, errMsg, module); >>>>>> try { >>>>>> - // only rollback the transaction if we started >>>>>> one... >>>>>> TransactionUtil.rollback(beganTransaction, errMsg, t); >>>>>> } catch (GenericEntityException e2) { >>>>>> - Debug.logError(e2, "[Delegator] Could not >>>>>> rollback transaction: " + e2.toString(), module); >>>>>> + Debug.logWarning(e2, "[Delegator] Could not >>>>>> rollback transaction: " + e2.toString(), module); >>>>>> } >>>>>> } finally { >>>>>> if (jobsIterator != null) { >>>>>> @@ -209,12 +206,10 @@ public final class JobManager { >>>>>> } >>>>>> } >>>>>> try { >>>>>> - // only commit the transaction if we started >>>>>> one... but make sure we try >>>>>> TransactionUtil.commit(beganTransaction); >>>>>> } catch (GenericTransactionException e) { >>>>>> String errMsg = "Transaction error trying to >>>>>> commit when polling and updating the JobSandbox: " + e.toString(); >>>>>> - // we don't really want to do anything >>>>>> different, so just log and move on >>>>>> - Debug.logError(e, errMsg, module); >>>>>> + Debug.logWarning(e, errMsg, module); >>>>>> } >>>>>> } >>>>>> return poll; >>>>>> @@ -222,7 +217,9 @@ public final class JobManager { >>>>>> >>>>>> private void reloadCrashedJobs() { >>>>>> List<GenericValue> crashed = null; >>>>>> - List<EntityExpr> statusExprList = >>>>>> UtilMisc.toList(EntityCondition.makeCondition("statusId", >>>>>> EntityOperator.EQUALS, "SERVICE_QUEUED"), >>>>>> EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, >>>>>> "SERVICE_RUNNING")); >>>>>> + List<EntityExpr> statusExprList = >>>>>> UtilMisc.toList(EntityCondition.makeCondition("statusId", >>>>>> EntityOperator.EQUALS, "SERVICE_PENDING"), >>>>>> + EntityCondition.makeCondition("statusId", >>>>>> EntityOperator.EQUALS, "SERVICE_QUEUED"), >>>>>> + EntityCondition.makeCondition("statusId", >>>>>> EntityOperator.EQUALS, "SERVICE_RUNNING")); >>>>>> EntityCondition statusCondition = >>>>>> EntityCondition.makeCondition(statusExprList, EntityOperator.OR); >>>>>> List<EntityExpr> poolsExpr = >>>>>> UtilMisc.toList(EntityCondition.makeCondition("poolId", >>>>>> EntityOperator.EQUALS, null)); >>>>>> List<String> pools = ServiceConfigUtil.getRunPools(); >>>>>> @@ -236,13 +233,13 @@ public final class JobManager { >>>>>> try { >>>>>> crashed = delegator.findList("JobSandbox", >>>>>> mainCondition, null, UtilMisc.toList("startDateTime"), null, false); >>>>>> } catch (GenericEntityException e) { >>>>>> - Debug.logError(e, "Unable to load crashed jobs", >>>>>> module); >>>>>> + Debug.logWarning(e, "Unable to load crashed jobs", >>>>>> module); >>>>>> } >>>>>> if (UtilValidate.isNotEmpty(crashed)) { >>>>>> - try { >>>>>> - int rescheduled = 0; >>>>>> - for (GenericValue job : crashed) { >>>>>> - Timestamp now = UtilDateTime.nowTimestamp(); >>>>>> + int rescheduled = 0; >>>>>> + Timestamp now = UtilDateTime.nowTimestamp(); >>>>>> + for (GenericValue job : crashed) { >>>>>> + try { >>>>>> Debug.logInfo("Scheduling Job : " + job, >>>>>> module); >>>>>> String pJobId = job.getString("parentJobId"); >>>>>> if (pJobId == null) { >>>>>> @@ -261,12 +258,12 @@ public final class JobManager { >>>>>> job.set("cancelDateTime", now); >>>>>> delegator.store(job); >>>>>> rescheduled++; >>>>>> + } catch (GenericEntityException e) { >>>>>> + Debug.logWarning(e, module); >>>>>> } >>>>>> - if (Debug.infoOn()) >>>>>> - Debug.logInfo("-- " + rescheduled + " jobs >>>>>> re-scheduled", module); >>>>>> - } catch (GenericEntityException e) { >>>>>> - Debug.logError(e, module); >>>>>> } >>>>>> + if (Debug.infoOn()) >>>>>> + Debug.logInfo("-- " + rescheduled + " jobs >>>>>> re-scheduled", module); >>>>>> } else { >>>>>> if (Debug.infoOn()) >>>>>> Debug.logInfo("No crashed jobs to re-schedule", >>>>>> module); >>>>>> >>>>>> Modified: >>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java >>>>>> >>>>>> URL: >>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370566&r1=1370565&r2=1370566&view=diff >>>>>> ============================================================================== >>>>>> >>>>>> --- >>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java >>>>>> (original) >>>>>> +++ >>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java >>>>>> Tue Aug 7 22:10:27 2012 >>>>>> @@ -203,7 +203,11 @@ public final class JobPoller implements >>>>>> */ >>>>>> public void queueNow(Job job) throws InvalidJobException { >>>>>> job.queue(); >>>>>> - this.executor.execute(new JobInvoker(job)); >>>>>> + try { >>>>>> + this.executor.execute(new JobInvoker(job)); >>>>>> + } catch (Exception e) { >>>>>> + job.deQueue(); >>>>>> + } >>>>>> } >>>>>> >>>>>> public void run() { >>>>>> >>>>>> Modified: >>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java >>>>>> URL: >>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff >>>>>> ============================================================================== >>>>>> >>>>>> --- >>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java >>>>>> (original) >>>>>> +++ >>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java >>>>>> Tue Aug 7 22:10:27 2012 >>>>>> @@ -20,7 +20,6 @@ package org.ofbiz.service.job; >>>>>> >>>>>> import java.io.IOException; >>>>>> import java.sql.Timestamp; >>>>>> -import com.ibm.icu.util.Calendar; >>>>>> import java.util.Date; >>>>>> import java.util.Map; >>>>>> >>>>>> @@ -28,14 +27,12 @@ import javax.xml.parsers.ParserConfigura >>>>>> >>>>>> import javolution.util.FastMap; >>>>>> >>>>>> +import org.apache.commons.lang.StringUtils; >>>>>> import org.ofbiz.base.util.Debug; >>>>>> import org.ofbiz.base.util.UtilDateTime; >>>>>> import org.ofbiz.base.util.UtilGenerics; >>>>>> import org.ofbiz.base.util.UtilProperties; >>>>>> import org.ofbiz.base.util.UtilValidate; >>>>>> -import org.ofbiz.service.calendar.RecurrenceInfoException; >>>>>> -import org.ofbiz.service.calendar.TemporalExpression; >>>>>> -import org.ofbiz.service.calendar.TemporalExpressionWorker; >>>>>> import org.ofbiz.entity.Delegator; >>>>>> import org.ofbiz.entity.GenericEntityException; >>>>>> import org.ofbiz.entity.GenericValue; >>>>>> @@ -47,10 +44,13 @@ import org.ofbiz.service.DispatchContext >>>>>> import org.ofbiz.service.GenericRequester; >>>>>> import org.ofbiz.service.ServiceUtil; >>>>>> import org.ofbiz.service.calendar.RecurrenceInfo; >>>>>> +import org.ofbiz.service.calendar.RecurrenceInfoException; >>>>>> +import org.ofbiz.service.calendar.TemporalExpression; >>>>>> +import org.ofbiz.service.calendar.TemporalExpressionWorker; >>>>>> import org.ofbiz.service.config.ServiceConfigUtil; >>>>>> import org.xml.sax.SAXException; >>>>>> >>>>>> -import org.apache.commons.lang.StringUtils; >>>>>> +import com.ibm.icu.util.Calendar; >>>>>> >>>>>> /** >>>>>> * A {@link Job} that is backed by the entity engine. Job data is >>>>>> stored >>>>>> @@ -100,14 +100,15 @@ public class PersistedServiceJob extends >>>>>> // job not available >>>>>> throw new InvalidJobException("Job [" + getJobId() + >>>>>> "] is not available"); >>>>>> } else { >>>>>> - // set the start time to now >>>>>> - jobValue.set("startDateTime", >>>>>> UtilDateTime.nowTimestamp()); >>>>>> - jobValue.set("statusId", "SERVICE_RUNNING"); >>>>>> + jobValue.set("statusId", "SERVICE_QUEUED"); >>>>>> try { >>>>>> jobValue.store(); >>>>>> } catch (GenericEntityException e) { >>>>>> throw new InvalidJobException("Unable to set the >>>>>> startDateTime and statusId on the current job [" + getJobId() + >>>>>> "]; not running!", e); >>>>>> } >>>>>> + if (Debug.verboseOn()) { >>>>>> + Debug.logVerbose("Placing job [" + getJobId() + >>>>>> "] in queue", module); >>>>>> + } >>>>>> } >>>>>> } >>>>>> >>>>>> @@ -129,6 +130,16 @@ public class PersistedServiceJob extends >>>>>> // This condition isn't possible, but we will leave >>>>>> it here. >>>>>> throw new InvalidJobException("Job has been accepted >>>>>> by a different instance!"); >>>>>> } >>>>>> + jobValue.set("startDateTime", UtilDateTime.nowTimestamp()); >>>>>> + jobValue.set("statusId", "SERVICE_RUNNING"); >>>>>> + try { >>>>>> + jobValue.store(); >>>>>> + } catch (GenericEntityException e) { >>>>>> + throw new InvalidJobException("Unable to set the >>>>>> startDateTime and statusId on the current job [" + getJobId() + >>>>>> "]; not running!", e); >>>>>> + } >>>>>> + if (Debug.verboseOn()) { >>>>>> + Debug.logVerbose("Job [" + getJobId() + "] running", >>>>>> module); >>>>>> + } >>>>>> // configure any additional recurrences >>>>>> long maxRecurrenceCount = -1; >>>>>> long currentRecurrenceCount = 0; >>>>>> @@ -331,4 +342,24 @@ public class PersistedServiceJob extends >>>>>> } >>>>>> return null; >>>>>> } >>>>>> + >>>>>> + @Override >>>>>> + public void deQueue() throws InvalidJobException { >>>>>> + if (currentState != State.QUEUED) { >>>>>> + throw new InvalidJobException("Illegal state change"); >>>>>> + } >>>>>> + currentState = State.CREATED; >>>>>> + try { >>>>>> + jobValue.refresh(); >>>>>> + jobValue.set("startDateTime", null); >>>>>> + jobValue.set("runByInstanceId", null); >>>>>> + jobValue.set("statusId", "SERVICE_PENDING"); >>>>>> + jobValue.store(); >>>>>> + } catch (GenericEntityException e) { >>>>>> + throw new InvalidJobException("Unable to dequeue job >>>>>> [" + getJobId() + "]", e); >>>>>> + } >>>>>> + if (Debug.verboseOn()) { >>>>>> + Debug.logVerbose("Job [" + getJobId() + "] not >>>>>> queued, rescheduling", module); >>>>>> + } >>>>>> + } >>>>>> } >>>>>> >>>>>> >>> |
| Free forum by Nabble | Edit this page |
