|
Author: adrianc
Date: Sat Aug 4 12:00:07 2012 New Revision: 1369298 URL: http://svn.apache.org/viewvc?rev=1369298&view=rev Log: A bunch of fixes for JobManager and JobPoller: 1. Fixed some concurrency issues during object construction. 2. Cleaned up some feature-envy code. 3. Added code for orderly startup and shutdown. Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceContainer.java ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceContainer.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceContainer.java?rev=1369298&r1=1369297&r2=1369298&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceContainer.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceContainer.java Sat Aug 4 12:00:07 2012 @@ -18,16 +18,17 @@ *******************************************************************************/ package org.ofbiz.service; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + import org.ofbiz.base.container.Container; import org.ofbiz.base.container.ContainerConfig; import org.ofbiz.base.container.ContainerException; import org.ofbiz.base.util.Debug; import org.ofbiz.base.util.UtilValidate; import org.ofbiz.entity.Delegator; - -import java.util.Collections; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import org.ofbiz.service.job.JobManager; /** * A container for the service engine. @@ -64,6 +65,7 @@ public class ServiceContainer implements @Override public void stop() throws ContainerException { + JobManager.shutDown(); Set<String> dispatcherNames = getAllDispatcherNames(); for (String dispatcherName: dispatcherNames) { deregister(dispatcherName); Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1369298&r1=1369297&r2=1369298&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Sat Aug 4 12:00:07 2012 @@ -21,14 +21,15 @@ package org.ofbiz.service.job; import java.io.IOException; import java.sql.Timestamp; import java.util.Date; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.RejectedExecutionException; import javolution.util.FastList; +import org.ofbiz.base.util.Assert; import org.ofbiz.base.util.Debug; -import org.ofbiz.base.util.GeneralRuntimeException; import org.ofbiz.base.util.UtilDateTime; import org.ofbiz.base.util.UtilMisc; import org.ofbiz.base.util.UtilProperties; @@ -54,18 +55,31 @@ import org.ofbiz.service.config.ServiceC /** * JobManager */ -public class JobManager { +public final class JobManager { public static final String module = JobManager.class.getName(); public static final String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0"); - public static final Map<String, Object> updateFields = UtilMisc.<String, Object> toMap("runByInstanceId", instanceId, "statusId", "SERVICE_QUEUED"); - private static final Map<String, JobManager> registeredManagers = new HashMap<String, JobManager>(); + public static final Map<String, Object> updateFields = UtilMisc.<String, Object>toMap("runByInstanceId", instanceId, "statusId", "SERVICE_QUEUED"); + private static final ConcurrentHashMap<String, JobManager> registeredManagers = new ConcurrentHashMap<String, JobManager>(); + private static boolean isShutDown = false; - public synchronized static JobManager getInstance(Delegator delegator, boolean enabled) { + private static void assertIsRunning() { + if (isShutDown) { + throw new IllegalStateException("OFBiz shutting down"); + } + } + + public static JobManager getInstance(Delegator delegator, boolean enablePoller) { + assertIsRunning(); + Assert.notNull("delegator", delegator); JobManager jm = JobManager.registeredManagers.get(delegator.getDelegatorName()); if (jm == null) { - jm = new JobManager(delegator, enabled); - JobManager.registeredManagers.put(delegator.getDelegatorName(), jm); + jm = new JobManager(delegator); + JobManager.registeredManagers.putIfAbsent(delegator.getDelegatorName(), jm); + jm = JobManager.registeredManagers.get(delegator.getDelegatorName()); + if (enablePoller) { + jm.enablePoller(); + } } return jm; } @@ -79,7 +93,6 @@ public class JobManager { return null; } GenericValue ri = job.getRelatedOne("RecurrenceInfo", false); - if (ri != null) { return new RecurrenceInfo(ri); } else { @@ -89,30 +102,35 @@ public class JobManager { return null; } } catch (GenericEntityException e) { - e.printStackTrace(); Debug.logError(e, "Problem getting RecurrenceInfo entity from JobSandbox", module); } catch (RecurrenceInfoException re) { - re.printStackTrace(); Debug.logError(re, "Problem creating RecurrenceInfo instance: " + re.getMessage(), module); } return null; } - protected Delegator delegator; - protected JobPoller jp; - - private JobManager(Delegator delegator, boolean enabled) { - if (delegator == null) { - throw new GeneralRuntimeException("ERROR: null delegator passed, cannot create JobManager"); + public static void shutDown() { + isShutDown = true; + for (JobManager jm : registeredManagers.values()) { + jm.shutdown(); } + } + + private final Delegator delegator; + private final JobPoller jp; + private boolean pollerEnabled = false; + + private JobManager(Delegator delegator) { this.delegator = delegator; - jp = new JobPoller(this, enabled); + jp = new JobPoller(this); } - @Override - public void finalize() throws Throwable { - this.shutdown(); - super.finalize(); + private synchronized void enablePoller() { + if (!pollerEnabled) { + pollerEnabled = true; + reloadCrashedJobs(); + jp.enable(); + } } /** Returns the Delegator. */ @@ -136,6 +154,7 @@ public class JobManager { } public synchronized List<Job> poll() { + assertIsRunning(); List<Job> poll = FastList.newInstance(); // sort the results by time List<String> order = UtilMisc.toList("runTime"); @@ -215,8 +234,7 @@ public class JobManager { return poll; } - public synchronized void reloadCrashedJobs() { - String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0"); + private void reloadCrashedJobs() { List<GenericValue> crashed = null; List<EntityExpr> exprs = UtilMisc.toList(EntityCondition.makeCondition("runByInstanceId", instanceId)); exprs.add(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING")); @@ -231,8 +249,7 @@ public class JobManager { int rescheduled = 0; for (GenericValue job : crashed) { Timestamp now = UtilDateTime.nowTimestamp(); - Debug.log("Scheduling Job : " + job, module); - + Debug.logInfo("Scheduling Job : " + job, module); String pJobId = job.getString("parentJobId"); if (pJobId == null) { pJobId = job.getString("jobId"); @@ -245,12 +262,10 @@ public class JobManager { newJob.set("startDateTime", null); newJob.set("runByInstanceId", null); delegator.createSetNextSeqId(newJob); - // set the cancel time on the old job to the same as the re-schedule time job.set("statusId", "SERVICE_CRASHED"); job.set("cancelDateTime", now); delegator.store(job); - rescheduled++; } if (Debug.infoOn()) @@ -264,8 +279,12 @@ public class JobManager { } } - /** Queues a Job to run now. */ + /** Queues a Job to run now. + * @throws IllegalStateException if the Job Manager is shut down. + * @throws RejectedExecutionException if the poller is stopped. + */ public void runJob(Job job) throws JobManagerException { + assertIsRunning(); if (job.isValid()) { jp.queueNow(job); } @@ -400,10 +419,6 @@ public class JobManager { */ public void schedule(String jobName, String poolName, String serviceName, Map<String, ? extends Object> context, long startTime, int frequency, int interval, int count, long endTime, int maxRetry) throws JobManagerException { - if (delegator == null) { - Debug.logWarning("No delegator referenced; cannot schedule job.", module); - return; - } // persist the context String dataId = null; try { @@ -445,13 +460,11 @@ public class JobManager { * The time in milliseconds the service should expire *@param maxRetry * The max number of retries on failure (-1 for no max) + * @throws IllegalStateException if the Job Manager is shut down. */ public void schedule(String jobName, String poolName, String serviceName, String dataId, long startTime, int frequency, int interval, int count, long endTime, int maxRetry) throws JobManagerException { - if (delegator == null) { - Debug.logWarning("No delegator referenced; cannot schedule job.", module); - return; - } + assertIsRunning(); // create the recurrence String infoId = null; if (frequency > -1 && count != 0) { @@ -491,11 +504,9 @@ public class JobManager { /** Close out the scheduler thread. */ public void shutdown() { - if (jp != null) { - Debug.logInfo("Stopping the JobManager...", module); - jp.stop(); - jp = null; - } + Debug.logInfo("Stopping the JobManager...", module); + registeredManagers.remove(delegator.getDelegatorName(), this); + jp.stop(); Debug.logInfo("JobManager stopped.", module); } Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1369298&r1=1369297&r2=1369298&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Sat Aug 4 12:00:07 2012 @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -35,7 +36,7 @@ import org.apache.commons.lang.math.Numb /** * JobPoller - Polls for persisted jobs to run. */ -public class JobPoller implements Runnable { +public final class JobPoller implements Runnable { public static final String module = JobPoller.class.getName(); public static final int MIN_THREADS = 1; @@ -43,12 +44,10 @@ public class JobPoller implements Runnab public static final int POLL_WAIT = 20000; public static final long THREAD_TTL = 18000000; - private JobManager jm = null; - private ThreadPoolExecutor executor = null; - private String name = null; - - protected JobPoller() { - } + private final JobManager jm; + private final ThreadPoolExecutor executor; + private final String name; + private boolean enabled = false; /** * Creates a new JobScheduler @@ -56,14 +55,16 @@ public class JobPoller implements Runnab * @param jm * JobManager associated with this scheduler */ - public JobPoller(JobManager jm, boolean enabled) { - this.name = (jm.getDelegator() != null ? jm.getDelegator().getDelegatorName() : "NA"); + public JobPoller(JobManager jm) { + this.name = jm.getDelegator().getDelegatorName(); this.jm = jm; this.executor = new ThreadPoolExecutor(minThreads(), maxThreads(), getTTL(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new JobInvokerThreadFactory(this.name), new ThreadPoolExecutor.AbortPolicy()); - if (enabled) { - // re-load crashed jobs - this.jm.reloadCrashedJobs(); + } + + public synchronized void enable() { + if (!enabled) { + enabled = true; // start the thread only if polling is enabled if (pollEnabled()) { // create the poller thread @@ -83,7 +84,7 @@ public class JobPoller implements Runnab } public Map<String, Object> getPoolState() { - Map poolState = new HashMap(); + Map<String, Object> poolState = new HashMap<String, Object>(); poolState.put("pollerName", this.name); poolState.put("pollerThreadName", "OFBiz-JobPoller-" + this.name); poolState.put("invokerThreadNameFormat", "OFBiz-JobInvoker-" + this.name + "-<SEQ>"); @@ -95,19 +96,17 @@ public class JobPoller implements Runnab poolState.put("greatestNumberOfInvokerThreads", this.executor.getLargestPoolSize()); poolState.put("numberOfCompletedTasks", this.executor.getCompletedTaskCount()); BlockingQueue<Runnable> queue = this.executor.getQueue(); - List taskList = new ArrayList(); - Map taskInfo = null; + List<Map<String, Object>> taskList = new ArrayList<Map<String, Object>>(); + Map<String, Object> taskInfo = null; for (Runnable task : queue) { - if (task instanceof JobInvoker) { - JobInvoker jobInvoker = (JobInvoker) task; - taskInfo = new HashMap(); - taskInfo.put("id", jobInvoker.getJobId()); - taskInfo.put("name", jobInvoker.getJobName()); - taskInfo.put("serviceName", jobInvoker.getServiceName()); - taskInfo.put("time", jobInvoker.getTime()); - taskInfo.put("runtime", jobInvoker.getCurrentRuntime()); - taskList.add(taskInfo); - } + JobInvoker jobInvoker = (JobInvoker) task; + taskInfo = new HashMap<String, Object>(); + taskInfo.put("id", jobInvoker.getJobId()); + taskInfo.put("name", jobInvoker.getJobName()); + taskInfo.put("serviceName", jobInvoker.getServiceName()); + taskInfo.put("time", jobInvoker.getTime()); + taskInfo.put("runtime", jobInvoker.getCurrentRuntime()); + taskList.add(taskInfo); } poolState.put("taskList", taskList); return poolState; @@ -147,12 +146,6 @@ public class JobPoller implements Runnab String enabled = ServiceConfigUtil.getElementAttr("thread-pool", "poll-enabled"); if (enabled.equalsIgnoreCase("false")) return false; - - // also make sure we have a delegator to use for polling - if (jm.getDelegator() == null) { - Debug.logWarning("No delegator referenced; not starting job poller.", module); - return false; - } return true; } @@ -167,7 +160,8 @@ public class JobPoller implements Runnab } /** - * Adds a job to the RUN queue + * Adds a job to the RUN queue. + * @throws RejectedExecutionException if the poller is stopped. */ public void queueNow(Job job) { this.executor.execute(new JobInvoker(job)); @@ -197,6 +191,7 @@ public class JobPoller implements Runnab stop(); } } + Debug.logInfo("JobPoller " + this.name + " thread terminated.", module); } /** |
| Free forum by Nabble | Edit this page |
