|
Author: adrianc
Date: Sat Aug 4 21:24:27 2012 New Revision: 1369467 URL: http://svn.apache.org/viewvc?rev=1369467&view=rev Log: Refactored Job implementations so they implement a state machine. This approach makes the code much simpler. Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/AbstractJob.java ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobInvoker.java ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/AbstractJob.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/AbstractJob.java?rev=1369467&r1=1369466&r2=1369467&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/AbstractJob.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/AbstractJob.java Sat Aug 4 21:24:27 2012 @@ -18,68 +18,43 @@ *******************************************************************************/ package org.ofbiz.service.job; +import org.ofbiz.base.util.Assert; + /** - * Abstract Service Job - Invokes a service + * Abstract Job. */ -@SuppressWarnings("serial") public abstract class AbstractJob implements Job { - public static final String module = AbstractJob.class.getName(); - - protected long runtime = -1; - protected long sequence = 0; - private String jobId; - private String jobName; - private boolean queued = false; + private final String jobId; + private final String jobName; + protected State currentState = State.CREATED; protected AbstractJob(String jobId, String jobName) { + Assert.notNull("jobId", jobId, "jobName", jobName); this.jobId = jobId; this.jobName = jobName; } - /** - * Returns the time to run in milliseconds. - */ - public long getRuntime() { - return runtime; - } - - /** - * Returns true if this job is still valid. - */ - public boolean isValid() { - if (runtime > 0) - return true; - return false; + @Override + public State currentState() { + return currentState; } - /** - * Returns the ID of this Job. - */ + @Override public String getJobId() { return this.jobId; } - /** - * Returns the name of this Job. - */ + @Override public String getJobName() { return this.jobName; } - /** - * Flags this job as 'is-queued' - */ + @Override public void queue() throws InvalidJobException { - this.queued = true; - } - - /** - * Executes the Job. - */ - public abstract void exec() throws InvalidJobException; - - public boolean isQueued() { - return queued; + if (currentState != State.CREATED) { + throw new InvalidJobException("Illegal state change"); + } + this.currentState = State.QUEUED; } } Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java?rev=1369467&r1=1369466&r2=1369467&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java Sat Aug 4 21:24:27 2012 @@ -18,43 +18,38 @@ *******************************************************************************/ package org.ofbiz.service.job; +import java.io.Serializable; import java.util.Map; +import org.ofbiz.base.util.Assert; import org.ofbiz.base.util.Debug; import org.ofbiz.service.DispatchContext; import org.ofbiz.service.GenericRequester; import org.ofbiz.service.LocalDispatcher; import org.ofbiz.service.ServiceUtil; +import org.ofbiz.service.job.Job.State; /** * Generic Service Job - A generic async-service Job. */ @SuppressWarnings("serial") -public class GenericServiceJob extends AbstractJob { +public class GenericServiceJob extends AbstractJob implements Serializable { public static final String module = GenericServiceJob.class.getName(); - protected transient GenericRequester requester = null; - protected transient DispatchContext dctx = null; - - private String service = null; - private Map<String, Object> context = null; + protected final transient GenericRequester requester; + protected final transient DispatchContext dctx; + private final String service; + private final Map<String, Object> context; + protected long runtime = System.currentTimeMillis(); public GenericServiceJob(DispatchContext dctx, String jobId, String jobName, String service, Map<String, Object> context, GenericRequester req) { super(jobId, jobName); + Assert.notNull("dctx", dctx); this.dctx = dctx; this.service = service; this.context = context; this.requester = req; - runtime = System.currentTimeMillis(); - } - - protected GenericServiceJob(String jobId, String jobName) { - super(jobId, jobName); - this.dctx = null; - this.requester = null; - this.service = null; - this.context = null; } /** @@ -62,6 +57,10 @@ public class GenericServiceJob extends A */ @Override public void exec() throws InvalidJobException { + if (currentState != State.QUEUED) { + throw new InvalidJobException("Illegal state change"); + } + currentState = State.RUNNING; init(); Map<String, Object> result = null; @@ -105,8 +104,11 @@ public class GenericServiceJob extends A * Method is called after the service has finished. */ protected void finish(Map<String, Object> result) throws InvalidJobException { + if (currentState != State.RUNNING) { + throw new InvalidJobException("Illegal state change"); + } + currentState = State.FINISHED; if (Debug.verboseOn()) Debug.logVerbose("Async-Service finished.", module); - runtime = 0; } /** @@ -114,8 +116,11 @@ public class GenericServiceJob extends A * @param t Throwable */ protected void failed(Throwable t) throws InvalidJobException { + if (currentState != State.RUNNING) { + throw new InvalidJobException("Illegal state change"); + } + currentState = State.FAILED; Debug.logError(t, "Async-Service failed.", module); - runtime = 0; } /** @@ -133,4 +138,14 @@ public class GenericServiceJob extends A protected String getServiceName() throws InvalidJobException { return service; } + + @Override + public long getRuntime() { + return runtime; + } + + @Override + public boolean isValid() { + return currentState == State.CREATED; + } } Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java?rev=1369467&r1=1369466&r2=1369467&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java Sat Aug 4 21:24:27 2012 @@ -18,41 +18,46 @@ *******************************************************************************/ package org.ofbiz.service.job; -import java.io.Serializable; - /** - * Job Interface + * A scheduled job. */ -public interface Job extends Serializable { +public interface Job { + + public static enum State {CREATED, QUEUED, RUNNING, FINISHED, FAILED}; + + /** + * Returns the current state of this job. + */ + State currentState(); /** - * Executes the Job. + * Executes this Job. */ - public void exec() throws InvalidJobException; + void exec() throws InvalidJobException; /** * Returns the ID of this Job. */ - public String getJobId(); + String getJobId(); /** * Returns the name of this Job. */ - public String getJobName(); + String getJobName(); /** * Returns the time to run in milliseconds. */ - public long getRuntime(); + long getRuntime(); /** - * Returns true if this job is still valid. + * Returns true if this job is ready to be queued. */ - public boolean isValid(); + boolean isValid(); /** - * Flags this job as 'is-queued' + * Transitions the job to the queued state. */ - public void queue() throws InvalidJobException; + void queue() throws InvalidJobException; } Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobInvoker.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobInvoker.java?rev=1369467&r1=1369466&r2=1369467&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobInvoker.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobInvoker.java Sat Aug 4 21:24:27 2012 @@ -20,6 +20,7 @@ package org.ofbiz.service.job; import java.util.Date; +import org.ofbiz.base.util.Assert; import org.ofbiz.base.util.Debug; import org.ofbiz.entity.transaction.GenericTransactionException; import org.ofbiz.entity.transaction.TransactionUtil; @@ -31,18 +32,15 @@ public class JobInvoker implements Runna public static final String module = JobInvoker.class.getName(); - private Date created = null; + private final Date created = new Date(); private long jobStart; - - private Job currentJob = null; + private final Job currentJob; public JobInvoker(Job job) { - this.created = new Date(); + Assert.notNull("job", job); this.currentJob = job; } - protected JobInvoker() {} - /** * Gets the time when this thread was created. * @return Time in milliseconds when this was created. @@ -69,11 +67,7 @@ public class JobInvoker implements Runna * @return String ID of the current running job. */ public String getJobId() { - if (this.currentJob != null) { - return this.currentJob.getJobId(); - } else { - return "WARNING: Invalid Job!"; - } + return this.currentJob.getJobId(); } /** @@ -81,11 +75,7 @@ public class JobInvoker implements Runna * @return String name of the current running job. */ public String getJobName() { - if (this.currentJob != null) { - return this.currentJob.getJobName(); - } else { - return "WARNING: Invalid Job!"; - } + return this.currentJob.getJobName(); } /** @@ -93,31 +83,26 @@ public class JobInvoker implements Runna * @return The name of the service being run. */ public String getServiceName() { - String serviceName = null; - if (this.currentJob != null) { - if (this.currentJob instanceof GenericServiceJob) { - GenericServiceJob gsj = (GenericServiceJob) this.currentJob; - try { - serviceName = gsj.getServiceName(); - } catch (InvalidJobException e) { - Debug.logError(e, module); - } + if (this.currentJob instanceof GenericServiceJob) { + GenericServiceJob gsj = (GenericServiceJob) this.currentJob; + try { + return gsj.getServiceName(); + } catch (InvalidJobException e) { + Debug.logWarning(e, module); } } - return serviceName; + return null; } public void run() { // setup the current job settings this.jobStart = System.currentTimeMillis(); - // execute the job try { this.currentJob.exec(); } catch (InvalidJobException e) { Debug.logWarning(e.getMessage(), module); } - // sanity check; make sure we don't have any transactions in place try { // roll back current TX first @@ -125,7 +110,6 @@ public class JobInvoker implements Runna Debug.logWarning("*** NOTICE: JobInvoker finished w/ a transaction in place! Rolling back.", module); TransactionUtil.rollback(); } - // now resume/rollback any suspended txs if (TransactionUtil.suspendedTransactionsHeld()) { int suspended = TransactionUtil.cleanSuspendedTransactions(); Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1369467&r1=1369466&r2=1369467&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Sat Aug 4 21:24:27 2012 @@ -155,13 +155,19 @@ public final class JobManager { public synchronized List<Job> poll() { assertIsRunning(); + DispatchContext dctx = getDispatcher().getDispatchContext(); + if (dctx == null) { + Debug.logError("Unable to locate DispatchContext object; not running job!", module); + return null; + } List<Job> poll = FastList.newInstance(); // sort the results by time List<String> order = UtilMisc.toList("runTime"); // basic query List<EntityExpr> expressions = UtilMisc.toList(EntityCondition.makeCondition("runTime", EntityOperator.LESS_THAN_EQUAL_TO, UtilDateTime.nowTimestamp()), - EntityCondition.makeCondition("startDateTime", EntityOperator.EQUALS, null), EntityCondition.makeCondition("cancelDateTime", - EntityOperator.EQUALS, null), EntityCondition.makeCondition("runByInstanceId", EntityOperator.EQUALS, null)); + EntityCondition.makeCondition("startDateTime", EntityOperator.EQUALS, null), + EntityCondition.makeCondition("cancelDateTime", EntityOperator.EQUALS, null), + EntityCondition.makeCondition("runByInstanceId", EntityOperator.EQUALS, null)); // limit to just defined pools List<String> pools = ServiceConfigUtil.getRunPools(); List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null)); @@ -174,62 +180,41 @@ public final class JobManager { EntityCondition baseCondition = EntityCondition.makeCondition(expressions); EntityCondition poolCondition = EntityCondition.makeCondition(poolsExpr, EntityOperator.OR); EntityCondition mainCondition = EntityCondition.makeCondition(UtilMisc.toList(baseCondition, poolCondition)); - // we will loop until we have no more to do - boolean pollDone = false; - while (!pollDone) { - boolean beganTransaction = false; - try { - beganTransaction = TransactionUtil.begin(); - if (!beganTransaction) { - Debug.logError("Unable to poll for jobs; transaction was not started by this process", module); - return null; - } - List<Job> localPoll = FastList.newInstance(); - // first update the jobs w/ this instance running information - delegator.storeByCondition("JobSandbox", updateFields, mainCondition); - // now query all the 'queued' jobs for this instance - List<GenericValue> jobEnt = delegator.findByAnd("JobSandbox", updateFields, order, false); - // jobEnt = delegator.findByCondition("JobSandbox", mainCondition, null, order); - if (UtilValidate.isNotEmpty(jobEnt)) { - for (GenericValue v : jobEnt) { - DispatchContext dctx = getDispatcher().getDispatchContext(); - if (dctx == null) { - Debug.logError("Unable to locate DispatchContext object; not running job!", module); - continue; - } - Job job = new PersistedServiceJob(dctx, v, null); // TODO fix the requester - try { - job.queue(); - localPoll.add(job); - } catch (InvalidJobException e) { - Debug.logError(e, module); - } - } - } else { - pollDone = true; - } - // nothing should go wrong at this point, so add to the general list - poll.addAll(localPoll); - } catch (Throwable t) { - // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation - String errMsg = "Error in polling JobSandbox: [" + t.toString() + "]. Rolling back transaction."; - Debug.logError(t, errMsg, module); - try { - // only rollback the transaction if we started one... - TransactionUtil.rollback(beganTransaction, errMsg, t); - } catch (GenericEntityException e2) { - Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module); - } - } finally { - try { - // only commit the transaction if we started one... but make sure we try - TransactionUtil.commit(beganTransaction); - } catch (GenericTransactionException e) { - String errMsg = "Transaction error trying to commit when polling and updating the JobSandbox: " + e.toString(); - // we don't really want to do anything different, so just log and move on - Debug.logError(e, errMsg, module); + boolean beganTransaction = false; + try { + beganTransaction = TransactionUtil.begin(); + if (!beganTransaction) { + Debug.logError("Unable to poll for jobs; transaction was not started by this process", module); + return null; + } + // first update the jobs w/ this instance running information + delegator.storeByCondition("JobSandbox", updateFields, mainCondition); + // now query all the 'queued' jobs for this instance + List<GenericValue> jobEnt = delegator.findByAnd("JobSandbox", updateFields, order, false); + if (UtilValidate.isNotEmpty(jobEnt)) { + for (GenericValue v : jobEnt) { + poll.add(new PersistedServiceJob(dctx, v, null)); // TODO fix the requester } } + } catch (Throwable t) { + // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation + String errMsg = "Error in polling JobSandbox: [" + t.toString() + "]. Rolling back transaction."; + Debug.logError(t, errMsg, module); + try { + // only rollback the transaction if we started one... + TransactionUtil.rollback(beganTransaction, errMsg, t); + } catch (GenericEntityException e2) { + Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module); + } + } finally { + try { + // only commit the transaction if we started one... but make sure we try + TransactionUtil.commit(beganTransaction); + } catch (GenericTransactionException e) { + String errMsg = "Transaction error trying to commit when polling and updating the JobSandbox: " + e.toString(); + // we don't really want to do anything different, so just log and move on + Debug.logError(e, errMsg, module); + } } return poll; } Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1369467&r1=1369466&r2=1369467&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Sat Aug 4 21:24:27 2012 @@ -161,9 +161,11 @@ public final class JobPoller implements /** * Adds a job to the RUN queue. + * @throws InvalidJobException if the job is in an invalid state. * @throws RejectedExecutionException if the poller is stopped. */ - public void queueNow(Job job) { + public void queueNow(Job job) throws InvalidJobException { + job.queue(); this.executor.execute(new JobInvoker(job)); } @@ -177,11 +179,11 @@ public final class JobPoller implements try { // grab a list of jobs to run. List<Job> pollList = jm.poll(); - // Debug.logInfo("Received poll list from JobManager [" + pollList.size() + "]", module); for (Job job : pollList) { - if (job.isValid()) { + try { queueNow(job); - // Debug.logInfo("Job [" + job.getJobId() + "] is queued", module); + } catch (InvalidJobException e) { + Debug.logError(e, module); } } // NOTE: using sleep instead of wait for stricter locking Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1369467&r1=1369466&r2=1369467&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java Sat Aug 4 21:24:27 2012 @@ -52,7 +52,8 @@ import org.xml.sax.SAXException; import org.apache.commons.lang.StringUtils; /** - * Entity Service Job - Store => Schedule => Run + * A {@link Job} that is backed by the entity engine. Job data is stored + * in the JobSandbox entity. */ @SuppressWarnings("serial") public class PersistedServiceJob extends GenericServiceJob { @@ -60,7 +61,6 @@ public class PersistedServiceJob extends public static final String module = PersistedServiceJob.class.getName(); private transient Delegator delegator = null; - private Timestamp storedDate = null; private long nextRecurrence = -1; private long maxRetry = -1; private long currentRetryCount = 0; @@ -73,11 +73,9 @@ public class PersistedServiceJob extends * @param req */ public PersistedServiceJob(DispatchContext dctx, GenericValue jobValue, GenericRequester req) { - super(jobValue.getString("jobId"), jobValue.getString("jobName")); + super(dctx, jobValue.getString("jobId"), jobValue.getString("jobName"), null, null, req); this.delegator = dctx.getDelegator(); - this.requester = req; - this.dctx = dctx; - this.storedDate = jobValue.getTimestamp("runTime"); + Timestamp storedDate = jobValue.getTimestamp("runTime"); this.runtime = storedDate.getTime(); this.maxRetry = jobValue.get("maxRetry") != null ? jobValue.getLong("maxRetry").longValue() : -1; Long retryCount = jobValue.getLong("currentRetryCount"); @@ -87,51 +85,36 @@ public class PersistedServiceJob extends // backward compatibility this.currentRetryCount = PersistedServiceJob.getRetries(jobValue, this.delegator); } - - // Debug.logInfo("=============== New PersistedServiceJob, delegator from dctx is [" + dctx.getDelegator().getDelegatorName() + "] and delegator from jobValue is [" + jobValue.getDelegator().getDelegatorName() + "]", module); } @Override public void queue() throws InvalidJobException { super.queue(); - // refresh the job object GenericValue jobValue = null; try { jobValue = this.getJob(); jobValue.refresh(); } catch (GenericEntityException e) { - runtime = -1; - throw new InvalidJobException("Unable to refresh Job object", e); + throw new InvalidJobException("Unable to refresh JobSandbox value", e); } - - // make sure it isn't already set/cancelled - if (runtime != -1) { - Timestamp cancelTime = jobValue.getTimestamp("cancelDateTime"); - Timestamp startTime = jobValue.getTimestamp("startDateTime"); - if (cancelTime != null || startTime != null) { - // job not available - runtime = -1; - throw new InvalidJobException("Job [" + getJobId() + "] is not available"); - - } else { - // set the start time to now - jobValue.set("startDateTime", UtilDateTime.nowTimestamp()); - jobValue.set("statusId", "SERVICE_RUNNING"); - try { - jobValue.store(); - } catch (GenericEntityException e) { - runtime = -1; - throw new InvalidJobException("Unable to set the startDateTime on the current job [" + getJobId() + "]; not running!", e); - - } + Timestamp cancelTime = jobValue.getTimestamp("cancelDateTime"); + Timestamp startTime = jobValue.getTimestamp("startDateTime"); + if (cancelTime != null || startTime != null) { + // job not available + throw new InvalidJobException("Job [" + getJobId() + "] is not available"); + } else { + // set the start time to now + jobValue.set("startDateTime", UtilDateTime.nowTimestamp()); + jobValue.set("statusId", "SERVICE_RUNNING"); + try { + jobValue.store(); + } catch (GenericEntityException e) { + throw new InvalidJobException("Unable to set the startDateTime on the current job [" + getJobId() + "]; not running!", e); } } } - /** - * @see org.ofbiz.service.job.GenericServiceJob#init() - */ @Override protected void init() throws InvalidJobException { super.init(); @@ -217,19 +200,13 @@ public class PersistedServiceJob extends } } - /** - * @see org.ofbiz.service.job.GenericServiceJob#finish(Map) - */ @Override protected void finish(Map<String, Object> result) throws InvalidJobException { super.finish(result); // set the finish date GenericValue job = getJob(); - String status = job.getString("statusId"); - if (status == null || "SERVICE_RUNNING".equals(status)) { - job.set("statusId", "SERVICE_FINISHED"); - } + job.set("statusId", "SERVICE_FINISHED"); job.set("finishDateTime", UtilDateTime.nowTimestamp()); String jobResult = null; if (ServiceUtil.isError(result)) { @@ -247,9 +224,6 @@ public class PersistedServiceJob extends } } - /** - * @see org.ofbiz.service.job.GenericServiceJob#failed(Throwable) - */ @Override protected void failed(Throwable t) throws InvalidJobException { super.failed(t); @@ -284,9 +258,6 @@ public class PersistedServiceJob extends } } - /** - * @see org.ofbiz.service.job.GenericServiceJob#getServiceName() - */ @Override protected String getServiceName() throws InvalidJobException { GenericValue jobObj = getJob(); @@ -296,9 +267,6 @@ public class PersistedServiceJob extends return jobObj.getString("serviceName"); } - /** - * @see org.ofbiz.service.job.GenericServiceJob#getContext() - */ @Override protected Map<String, Object> getContext() throws InvalidJobException { Map<String, Object> context = null; @@ -341,7 +309,6 @@ public class PersistedServiceJob extends private GenericValue getJob() throws InvalidJobException { try { GenericValue jobObj = delegator.findOne("JobSandbox", false, "jobId", getJobId()); - if (jobObj == null) { throw new InvalidJobException("Job [" + getJobId() + "] came back null from datasource from delegator " + delegator.getDelegatorName()); } |
| Free forum by Nabble | Edit this page |
