svn commit: r1370566 - in /ofbiz/trunk/framework/service/src/org/ofbiz/service/job: GenericServiceJob.java Job.java JobManager.java JobPoller.java PersistedServiceJob.java

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

svn commit: r1370566 - in /ofbiz/trunk/framework/service/src/org/ofbiz/service/job: GenericServiceJob.java Job.java JobManager.java JobPoller.java PersistedServiceJob.java

adrianc
Author: adrianc
Date: Tue Aug  7 22:10:27 2012
New Revision: 1370566

URL: http://svn.apache.org/viewvc?rev=1370566&view=rev
Log:
Fixed the Job Scheduler so jobs are not lost during heavy server load.

Modified:
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java Tue Aug  7 22:10:27 2012
@@ -144,4 +144,12 @@ public class GenericServiceJob extends A
     public boolean isValid() {
         return currentState == State.CREATED;
     }
+
+    @Override
+    public void deQueue() throws InvalidJobException {
+        if (currentState != State.QUEUED) {
+            throw new InvalidJobException("Illegal state change");
+        }
+        throw new InvalidJobException("Unable to queue job [" + getJobId() + "]");
+    }
 }

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java?rev=1370566&r1=1370565&r2=1370566&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java Tue Aug  7 22:10:27 2012
@@ -60,7 +60,13 @@ public interface Job {
     boolean isValid();
 
     /**
-     * Transitions the job to the queued state.
+     * Transitions this job to the pre-queued (created) state. The job manager
+     * will call this method when there was a problem adding this job to the queue.
+     */
+    void deQueue() throws InvalidJobException;
+
+    /**
+     * Transitions this job to the queued state.
      */
     void queue() throws InvalidJobException;
 }

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370566&r1=1370565&r2=1370566&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Tue Aug  7 22:10:27 2012
@@ -63,7 +63,6 @@ public final class JobManager {
 
     public static final String module = JobManager.class.getName();
     public static final String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0");
-    public static final Map<String, Object> updateFields = UtilMisc.<String, Object>toMap("runByInstanceId", instanceId, "statusId", "SERVICE_QUEUED");
     private static final ConcurrentHashMap<String, JobManager> registeredManagers = new ConcurrentHashMap<String, JobManager>();
     private static boolean isShutDown = false;
 
@@ -150,7 +149,7 @@ public final class JobManager {
         assertIsRunning();
         DispatchContext dctx = getDispatcher().getDispatchContext();
         if (dctx == null) {
-            Debug.logError("Unable to locate DispatchContext object; not running job!", module);
+            Debug.logWarning("Unable to locate DispatchContext object; not running job!", module);
             return null;
         }
         List<Job> poll = FastList.newInstance();
@@ -176,13 +175,13 @@ public final class JobManager {
         try {
             beganTransaction = TransactionUtil.begin();
             if (!beganTransaction) {
-                Debug.logError("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
+                Debug.logWarning("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
                 return null;
             }
             jobsIterator = delegator.find("JobSandbox", mainCondition, null, null, UtilMisc.toList("runTime"), null);
             GenericValue jobValue = jobsIterator.next();
             while (jobValue != null) {
-                jobValue.putAll(updateFields);
+                jobValue.set("runByInstanceId", instanceId);  // Claim ownership of this value.
                 jobValue.store();
                 poll.add(new PersistedServiceJob(dctx, jobValue, null));
                 if (poll.size() == limit) {
@@ -191,14 +190,12 @@ public final class JobManager {
                 jobValue = jobsIterator.next();
             }
         } catch (Throwable t) {
-            // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
             String errMsg = "Error in polling JobSandbox: [" + t.toString() + "]. Rolling back transaction.";
-            Debug.logError(t, errMsg, module);
+            Debug.logWarning(t, errMsg, module);
             try {
-                // only rollback the transaction if we started one...
                 TransactionUtil.rollback(beganTransaction, errMsg, t);
             } catch (GenericEntityException e2) {
-                Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
+                Debug.logWarning(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
             }
         } finally {
             if (jobsIterator != null) {
@@ -209,12 +206,10 @@ public final class JobManager {
                 }
             }
             try {
-                // only commit the transaction if we started one... but make sure we try
                 TransactionUtil.commit(beganTransaction);
             } catch (GenericTransactionException e) {
                 String errMsg = "Transaction error trying to commit when polling and updating the JobSandbox: " + e.toString();
-                // we don't really want to do anything different, so just log and move on
-                Debug.logError(e, errMsg, module);
+                Debug.logWarning(e, errMsg, module);
             }
         }
         return poll;
@@ -222,7 +217,9 @@ public final class JobManager {
 
     private void reloadCrashedJobs() {
         List<GenericValue> crashed = null;
-        List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"), EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
+        List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_PENDING"),
+                EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"),
+                EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
         EntityCondition statusCondition = EntityCondition.makeCondition(statusExprList, EntityOperator.OR);
         List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null));
         List<String> pools = ServiceConfigUtil.getRunPools();
@@ -236,13 +233,13 @@ public final class JobManager {
         try {
             crashed = delegator.findList("JobSandbox", mainCondition, null, UtilMisc.toList("startDateTime"), null, false);
         } catch (GenericEntityException e) {
-            Debug.logError(e, "Unable to load crashed jobs", module);
+            Debug.logWarning(e, "Unable to load crashed jobs", module);
         }
         if (UtilValidate.isNotEmpty(crashed)) {
-            try {
-                int rescheduled = 0;
-                for (GenericValue job : crashed) {
-                    Timestamp now = UtilDateTime.nowTimestamp();
+            int rescheduled = 0;
+            Timestamp now = UtilDateTime.nowTimestamp();
+            for (GenericValue job : crashed) {
+                try {
                     Debug.logInfo("Scheduling Job : " + job, module);
                     String pJobId = job.getString("parentJobId");
                     if (pJobId == null) {
@@ -261,12 +258,12 @@ public final class JobManager {
                     job.set("cancelDateTime", now);
                     delegator.store(job);
                     rescheduled++;
+                } catch (GenericEntityException e) {
+                    Debug.logWarning(e, module);
                 }
-                if (Debug.infoOn())
-                    Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module);
-            } catch (GenericEntityException e) {
-                Debug.logError(e, module);
             }
+            if (Debug.infoOn())
+                Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module);
         } else {
             if (Debug.infoOn())
                 Debug.logInfo("No crashed jobs to re-schedule", module);

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370566&r1=1370565&r2=1370566&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Tue Aug  7 22:10:27 2012
@@ -203,7 +203,11 @@ public final class JobPoller implements
      */
     public void queueNow(Job job) throws InvalidJobException {
         job.queue();
-        this.executor.execute(new JobInvoker(job));
+        try {
+            this.executor.execute(new JobInvoker(job));
+        } catch (Exception e) {
+            job.deQueue();
+        }
     }
 
     public void run() {

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java Tue Aug  7 22:10:27 2012
@@ -20,7 +20,6 @@ package org.ofbiz.service.job;
 
 import java.io.IOException;
 import java.sql.Timestamp;
-import com.ibm.icu.util.Calendar;
 import java.util.Date;
 import java.util.Map;
 
@@ -28,14 +27,12 @@ import javax.xml.parsers.ParserConfigura
 
 import javolution.util.FastMap;
 
+import org.apache.commons.lang.StringUtils;
 import org.ofbiz.base.util.Debug;
 import org.ofbiz.base.util.UtilDateTime;
 import org.ofbiz.base.util.UtilGenerics;
 import org.ofbiz.base.util.UtilProperties;
 import org.ofbiz.base.util.UtilValidate;
-import org.ofbiz.service.calendar.RecurrenceInfoException;
-import org.ofbiz.service.calendar.TemporalExpression;
-import org.ofbiz.service.calendar.TemporalExpressionWorker;
 import org.ofbiz.entity.Delegator;
 import org.ofbiz.entity.GenericEntityException;
 import org.ofbiz.entity.GenericValue;
@@ -47,10 +44,13 @@ import org.ofbiz.service.DispatchContext
 import org.ofbiz.service.GenericRequester;
 import org.ofbiz.service.ServiceUtil;
 import org.ofbiz.service.calendar.RecurrenceInfo;
+import org.ofbiz.service.calendar.RecurrenceInfoException;
+import org.ofbiz.service.calendar.TemporalExpression;
+import org.ofbiz.service.calendar.TemporalExpressionWorker;
 import org.ofbiz.service.config.ServiceConfigUtil;
 import org.xml.sax.SAXException;
 
-import org.apache.commons.lang.StringUtils;
+import com.ibm.icu.util.Calendar;
 
 /**
  * A {@link Job} that is backed by the entity engine. Job data is stored
@@ -100,14 +100,15 @@ public class PersistedServiceJob extends
             // job not available
             throw new InvalidJobException("Job [" + getJobId() + "] is not available");
         } else {
-            // set the start time to now
-            jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
-            jobValue.set("statusId", "SERVICE_RUNNING");
+            jobValue.set("statusId", "SERVICE_QUEUED");
             try {
                 jobValue.store();
             } catch (GenericEntityException e) {
                 throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + "]; not running!", e);
             }
+            if (Debug.verboseOn()) {
+                Debug.logVerbose("Placing job [" + getJobId() + "] in queue", module);
+            }
         }
     }
 
@@ -129,6 +130,16 @@ public class PersistedServiceJob extends
             // This condition isn't possible, but we will leave it here.
             throw new InvalidJobException("Job has been accepted by a different instance!");
         }
+        jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
+        jobValue.set("statusId", "SERVICE_RUNNING");
+        try {
+            jobValue.store();
+        } catch (GenericEntityException e) {
+            throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + "]; not running!", e);
+        }
+        if (Debug.verboseOn()) {
+            Debug.logVerbose("Job [" + getJobId() + "] running", module);
+        }
         // configure any additional recurrences
         long maxRecurrenceCount = -1;
         long currentRecurrenceCount = 0;
@@ -331,4 +342,24 @@ public class PersistedServiceJob extends
         }
         return null;
     }
+
+    @Override
+    public void deQueue() throws InvalidJobException {
+        if (currentState != State.QUEUED) {
+            throw new InvalidJobException("Illegal state change");
+        }
+        currentState = State.CREATED;
+        try {
+            jobValue.refresh();
+            jobValue.set("startDateTime", null);
+            jobValue.set("runByInstanceId", null);
+            jobValue.set("statusId", "SERVICE_PENDING");
+            jobValue.store();
+        } catch (GenericEntityException e) {
+            throw new InvalidJobException("Unable to dequeue job [" + getJobId() + "]", e);
+        }
+        if (Debug.verboseOn()) {
+            Debug.logVerbose("Job [" + getJobId() + "] not queued, rescheduling", module);
+        }
+    }
 }