|
Author: adrianc
Date: Sat Aug 18 15:21:14 2012 New Revision: 1374596 URL: http://svn.apache.org/viewvc?rev=1374596&view=rev Log: Refactored JobPoller.java - the JobPoller instance is now a singleton. Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java?rev=1374596&r1=1374595&r2=1374596&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java Sat Aug 18 15:21:14 2012 @@ -834,8 +834,6 @@ public class ServiceDispatcher { // shutdown JMS listeners jlf.closeListeners(); } - // shutdown the job scheduler - jm.shutdown(); } // checks if parameters were passed for authentication Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1374596&r1=1374595&r2=1374596&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Sat Aug 18 15:21:14 2012 @@ -59,6 +59,10 @@ import com.ibm.icu.util.Calendar; * {@link #runJob(Job)} method, or schedule a job to be run later by calling the * {@link #schedule(String, String, String, Map, long, int, int, int, long, int)} method. * Scheduled jobs are persisted in the JobSandbox entity. + * <p>A scheduled job's start time is an approximation - the actual start time will depend + * on the job manager/job poller configuration (poll interval) and the load on the server. + * Scheduled jobs might be rescheduled if the server is busy. Therefore, applications + * requiring a precise job start time should use a different method to schedule the job.</p> */ public final class JobManager { @@ -82,13 +86,14 @@ public final class JobManager { public static JobManager getInstance(Delegator delegator, boolean enablePoller) { assertIsRunning(); Assert.notNull("delegator", delegator); - JobManager jm = JobManager.registeredManagers.get(delegator.getDelegatorName()); + JobManager jm = registeredManagers.get(delegator.getDelegatorName()); if (jm == null) { jm = new JobManager(delegator); - JobManager.registeredManagers.putIfAbsent(delegator.getDelegatorName(), jm); - jm = JobManager.registeredManagers.get(delegator.getDelegatorName()); + registeredManagers.putIfAbsent(delegator.getDelegatorName(), jm); + jm = registeredManagers.get(delegator.getDelegatorName()); if (enablePoller) { - jm.enablePoller(); + jm.reloadCrashedJobs(); + JobPoller.registerJobManager(jm); } } return jm; @@ -99,26 +104,14 @@ public final class JobManager { */ public static void shutDown() { isShutDown = true; - for (JobManager jm : registeredManagers.values()) { - jm.shutdown(); - } + JobPoller.getInstance().stop(); } private final Delegator delegator; - private final JobPoller jp; - private boolean pollerEnabled = false; + private boolean crashedJobsReloaded = false; private JobManager(Delegator delegator) { this.delegator = delegator; - jp = new JobPoller(this); - } - - private synchronized void enablePoller() { - if (!pollerEnabled) { - pollerEnabled = true; - reloadCrashedJobs(); - jp.enable(); - } } /** Returns the Delegator. */ @@ -138,7 +131,7 @@ public final class JobManager { * @return List containing a Map of each thread's state. */ public Map<String, Object> getPoolState() { - return jp.getPoolState(); + return JobPoller.getInstance().getPoolState(); } /** @@ -272,7 +265,11 @@ public final class JobManager { return poll; } - private void reloadCrashedJobs() { + private synchronized void reloadCrashedJobs() { + assertIsRunning(); + if (crashedJobsReloaded) { + return; + } List<GenericValue> crashed = null; List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_PENDING"), EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"), @@ -317,6 +314,7 @@ public final class JobManager { if (Debug.infoOn()) Debug.logInfo("No crashed jobs to re-schedule", module); } + crashedJobsReloaded = true; } /** Queues a Job to run now. @@ -326,7 +324,7 @@ public final class JobManager { public void runJob(Job job) throws JobManagerException { assertIsRunning(); if (job.isValid()) { - jp.queueNow(job); + JobPoller.getInstance().queueNow(job); } } @@ -541,13 +539,4 @@ public final class JobManager { throw new JobManagerException(e.getMessage(), e); } } - - /** Close out the scheduler thread. */ - public void shutdown() { - Debug.logInfo("Stopping the JobManager...", module); - registeredManagers.remove(delegator.getDelegatorName(), this); - jp.stop(); - Debug.logInfo("JobManager stopped.", module); - } - } Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1374596&r1=1374595&r2=1374596&view=diff ============================================================================== --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original) +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Sat Aug 18 15:21:14 2012 @@ -19,10 +19,13 @@ package org.ofbiz.service.job; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; @@ -30,13 +33,14 @@ import java.util.concurrent.ThreadPoolEx import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.ofbiz.base.util.Assert; import org.ofbiz.base.util.Debug; import org.ofbiz.service.config.ServiceConfigUtil; /** * Job poller. Queues and runs jobs. */ -public final class JobPoller implements Runnable { +public final class JobPoller { public static final String module = JobPoller.class.getName(); private static final AtomicInteger created = new AtomicInteger(); @@ -45,76 +49,19 @@ public final class JobPoller implements private static final int POLL_WAIT = 30000; // Database polling interval - 30 seconds. private static final int QUEUE_SIZE = 100; private static final long THREAD_TTL = 120000; // Idle thread lifespan - 2 minutes. - - private final JobManager jm; - private final ThreadPoolExecutor executor; - private final String name; - private boolean enabled = false; + private static final ConcurrentHashMap<String, JobManager> jobManagers = new ConcurrentHashMap<String, JobManager>(); + private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(minThreads(), maxThreads(), getTTL(), + TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueSize()), new JobInvokerThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); + private static final JobPoller instance = new JobPoller(); /** - * Creates a new JobScheduler - * - * @param jm - * JobManager associated with this scheduler + * Returns the <code>JobPoller</code> instance. */ - public JobPoller(JobManager jm) { - this.name = jm.getDelegator().getDelegatorName(); - this.jm = jm; - this.executor = new ThreadPoolExecutor(minThreads(), maxThreads(), getTTL(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueSize()), - new JobInvokerThreadFactory(this.name), new ThreadPoolExecutor.AbortPolicy()); - } - - public synchronized void enable() { - if (!enabled) { - enabled = true; - // start the thread only if polling is enabled - if (pollEnabled()) { - // create the poller thread - Thread thread = new Thread(this, "OFBiz-JobPoller-" + this.name); - thread.setDaemon(false); - // start the poller - thread.start(); - } - } + public static JobPoller getInstance() { + return instance; } - /** - * Returns the JobManager - */ - public JobManager getManager() { - return jm; - } - - public Map<String, Object> getPoolState() { - Map<String, Object> poolState = new HashMap<String, Object>(); - poolState.put("pollerName", this.name); - poolState.put("pollerThreadName", "OFBiz-JobPoller-" + this.name); - poolState.put("invokerThreadNameFormat", "OFBiz-JobInvoker-" + this.name + "-<SEQ>"); - poolState.put("keepAliveTimeInSeconds", this.executor.getKeepAliveTime(TimeUnit.SECONDS)); - poolState.put("numberOfCoreInvokerThreads", this.executor.getCorePoolSize()); - poolState.put("currentNumberOfInvokerThreads", this.executor.getPoolSize()); - poolState.put("numberOfActiveInvokerThreads", this.executor.getActiveCount()); - poolState.put("maxNumberOfInvokerThreads", this.executor.getMaximumPoolSize()); - poolState.put("greatestNumberOfInvokerThreads", this.executor.getLargestPoolSize()); - poolState.put("numberOfCompletedTasks", this.executor.getCompletedTaskCount()); - BlockingQueue<Runnable> queue = this.executor.getQueue(); - List<Map<String, Object>> taskList = new ArrayList<Map<String, Object>>(); - Map<String, Object> taskInfo = null; - for (Runnable task : queue) { - JobInvoker jobInvoker = (JobInvoker) task; - taskInfo = new HashMap<String, Object>(); - taskInfo.put("id", jobInvoker.getJobId()); - taskInfo.put("name", jobInvoker.getJobName()); - taskInfo.put("serviceName", jobInvoker.getServiceName()); - taskInfo.put("time", jobInvoker.getTime()); - taskInfo.put("runtime", jobInvoker.getCurrentRuntime()); - taskList.add(taskInfo); - } - poolState.put("taskList", taskList); - return poolState; - } - - private long getTTL() { + private static long getTTL() { String threadTTLAttr = ServiceConfigUtil.getElementAttr("thread-pool", "ttl"); if (!threadTTLAttr.isEmpty()) { try { @@ -129,7 +76,7 @@ public final class JobPoller implements return THREAD_TTL; } - private int maxThreads() { + private static int maxThreads() { String maxThreadsAttr = ServiceConfigUtil.getElementAttr("thread-pool", "max-threads"); if (!maxThreadsAttr.isEmpty()) { try { @@ -144,7 +91,7 @@ public final class JobPoller implements return MAX_THREADS; } - private int minThreads() { + private static int minThreads() { String minThreadsAttr = ServiceConfigUtil.getElementAttr("thread-pool", "min-threads"); if (!minThreadsAttr.isEmpty()) { try { @@ -159,14 +106,7 @@ public final class JobPoller implements return MIN_THREADS; } - private boolean pollEnabled() { - String enabled = ServiceConfigUtil.getElementAttr("thread-pool", "poll-enabled"); - if (enabled.equalsIgnoreCase("false")) - return false; - return true; - } - - private int pollWaitTime() { + private static int pollWaitTime() { String pollIntervalAttr = ServiceConfigUtil.getElementAttr("thread-pool", "poll-db-millis"); if (!pollIntervalAttr.isEmpty()) { try { @@ -181,7 +121,7 @@ public final class JobPoller implements return POLL_WAIT; } - private int queueSize() { + private static int queueSize() { String queueSizeAttr = ServiceConfigUtil.getElementAttr("thread-pool", "jobs"); if (!queueSizeAttr.isEmpty()) { try { @@ -197,51 +137,88 @@ public final class JobPoller implements } /** - * Adds a job to the RUN queue. + * Register a {@link JobManager} with the job poller. + * + * @param jm The <code>JobManager</code> to register. + * @throws IllegalArgumentException if <code>jm</code> is null + */ + public static void registerJobManager(JobManager jm) { + Assert.notNull("jm", jm); + jobManagers.putIfAbsent(jm.getDelegator().getDelegatorName(), jm); + } + + // -------------------------------------- // + + private final Thread jobManagerPollerThread; + + private JobPoller() { + if (pollEnabled()) { + jobManagerPollerThread = new Thread(new JobManagerPoller(), "OFBiz-JobPoller"); + jobManagerPollerThread.setDaemon(false); + jobManagerPollerThread.start(); + } else { + jobManagerPollerThread = null; + } + } + + /** + * Returns a <code>Map</code> containing <code>JobPoller</code> statistics. + */ + public Map<String, Object> getPoolState() { + Map<String, Object> poolState = new HashMap<String, Object>(); + poolState.put("keepAliveTimeInSeconds", executor.getKeepAliveTime(TimeUnit.SECONDS)); + poolState.put("numberOfCoreInvokerThreads", executor.getCorePoolSize()); + poolState.put("currentNumberOfInvokerThreads", executor.getPoolSize()); + poolState.put("numberOfActiveInvokerThreads", executor.getActiveCount()); + poolState.put("maxNumberOfInvokerThreads", executor.getMaximumPoolSize()); + poolState.put("greatestNumberOfInvokerThreads", executor.getLargestPoolSize()); + poolState.put("numberOfCompletedTasks", executor.getCompletedTaskCount()); + BlockingQueue<Runnable> queue = executor.getQueue(); + List<Map<String, Object>> taskList = new ArrayList<Map<String, Object>>(); + Map<String, Object> taskInfo = null; + for (Runnable task : queue) { + JobInvoker jobInvoker = (JobInvoker) task; + taskInfo = new HashMap<String, Object>(); + taskInfo.put("id", jobInvoker.getJobId()); + taskInfo.put("name", jobInvoker.getJobName()); + taskInfo.put("serviceName", jobInvoker.getServiceName()); + taskInfo.put("time", jobInvoker.getTime()); + taskInfo.put("runtime", jobInvoker.getCurrentRuntime()); + taskList.add(taskInfo); + } + poolState.put("taskList", taskList); + return poolState; + } + + private boolean pollEnabled() { + String enabled = ServiceConfigUtil.getElementAttr("thread-pool", "poll-enabled"); + return !"false".equalsIgnoreCase(enabled); + } + + /** + * Adds a job to the job queue. * @throws InvalidJobException if the job is in an invalid state. * @throws RejectedExecutionException if the poller is stopped. */ public void queueNow(Job job) throws InvalidJobException { job.queue(); try { - this.executor.execute(new JobInvoker(job)); + executor.execute(new JobInvoker(job)); } catch (Exception e) { job.deQueue(); } } - public void run() { - try { - // wait 30 seconds before the first poll - Thread.sleep(30000); - while (!executor.isShutdown()) { - int remainingCapacity = executor.getQueue().remainingCapacity(); - if (remainingCapacity > 0) { - List<Job> pollList = jm.poll(remainingCapacity); - for (Job job : pollList) { - try { - queueNow(job); - } catch (InvalidJobException e) { - Debug.logError(e, module); - } - } - } - Thread.sleep(pollWaitTime()); - } - } catch (InterruptedException e) { - Debug.logError(e, module); - stop(); - Thread.currentThread().interrupt(); - } - Debug.logInfo("JobPoller " + this.name + " thread terminated.", module); - } - /** - * Stops the JobPoller + * Stops the <code>JobPoller</code>. This method is called when OFBiz shuts down. + * The <code>JobPoller</code> cannot be restarted. */ - void stop() { - Debug.logInfo("Shutting down thread pool for JobPoller " + this.name, module); - List<Runnable> queuedJobs = this.executor.shutdownNow(); + public void stop() { + Debug.logInfo("Shutting down JobPoller.", module); + if (jobManagerPollerThread != null) { + jobManagerPollerThread.interrupt(); + } + List<Runnable> queuedJobs = executor.shutdownNow(); for (Runnable task : queuedJobs) { try { Job queuedJob = (Job) task; @@ -250,19 +227,64 @@ public final class JobPoller implements Debug.logWarning(e, module); } } - Debug.logInfo("Shutdown completed of thread pool for JobPoller " + this.name, module); + Debug.logInfo("JobPoller shutdown completed.", module); } + private static class JobInvokerThreadFactory implements ThreadFactory { - private class JobInvokerThreadFactory implements ThreadFactory { - private final String poolName; - - public JobInvokerThreadFactory(String poolName) { - this.poolName = poolName; + public Thread newThread(Runnable runnable) { + return new Thread(runnable, "OFBiz-JobQueue-" + created.getAndIncrement()); } + } - public Thread newThread(Runnable runnable) { - return new Thread(runnable, "OFBiz-JobQueue-" + poolName + "-" + created.getAndIncrement()); + // Polls all registered JobManagers for jobs to queue. + private class JobManagerPoller implements Runnable { + + // Do not check for interrupts in this method. The design requires the + // thread to complete the job manager poll uninterrupted. + public void run() { + Debug.logInfo("JobPoller thread started.", module); + try { + // wait 30 seconds before the first poll + Thread.sleep(30000); + while (!executor.isShutdown()) { + int remainingCapacity = executor.getQueue().remainingCapacity(); + if (remainingCapacity > 0) { + // Build "list of lists" + Collection<JobManager> jmCollection = jobManagers.values(); + List<Iterator<Job>> pollResults = new ArrayList<Iterator<Job>>(); + for (JobManager jm : jmCollection) { + pollResults.add(jm.poll(remainingCapacity).iterator()); + } + // Create queue candidate list from "list of lists" + List<Job> queueCandidates = new ArrayList<Job>(); + boolean addingJobs = true; + while (addingJobs) { + addingJobs = false; + for (Iterator<Job> jobIterator : pollResults) { + if (jobIterator.hasNext()) { + queueCandidates.add(jobIterator.next()); + addingJobs = true; + } + } + } + // The candidate list might be larger than the queue remaining capacity, + // but that is okay - the excess jobs will be dequeued and rescheduled. + for (Job job : queueCandidates) { + try { + queueNow(job); + } catch (InvalidJobException e) { + Debug.logError(e, module); + } + } + } + Thread.sleep(pollWaitTime()); + } + } catch (InterruptedException e) { + // Happens when JobPoller shuts down - nothing to do. + Thread.currentThread().interrupt(); + } + Debug.logInfo("JobPoller thread stopped.", module); } } } |
| Free forum by Nabble | Edit this page |
