Mercurial > hg > mpdl-group
view software/eXist/mpdl-modules/src/de/mpg/mpiwg/berlin/mpdl/schedule/MpdlChainScheduler.java @ 0:408254cf2f1d
Erstellung
author | Josef Willenborg <jwillenborg@mpiwg-berlin.mpg.de> |
---|---|
date | Wed, 24 Nov 2010 17:24:23 +0100 |
parents | |
children |
line wrap: on
line source
package de.mpg.mpiwg.berlin.mpdl.schedule; import java.net.URL; import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.PriorityQueue; import java.util.Queue; import org.apache.log4j.Logger; import org.quartz.JobDataMap; import org.quartz.JobDetail; import org.quartz.JobExecutionContext; import org.quartz.JobListener; import org.quartz.SchedulerException; import org.quartz.SimpleTrigger; import org.quartz.Trigger; import org.quartz.impl.StdSchedulerFactory; import de.mpg.mpiwg.berlin.mpdl.exception.ApplicationException; public class MpdlChainScheduler { private static MpdlChainScheduler instance; private static String CRUD_JOB = "MPDL_CRUD_JOB"; private static String CRUD_TRIGGER = "MPDL_CRUD_TRIGGER"; private static String CRUD_GROUP = "MPDL_CRUD_GROUP"; private static Logger LOGGER = Logger.getLogger(MpdlChainScheduler.class); // Logs to EXIST_HOME/webapp/WEB-INF/logs/exist.log private org.quartz.Scheduler scheduler; private JobListener jobListener; private Queue<MpdlDocOperation> docOperationQueue = new PriorityQueue<MpdlDocOperation>(); private HashMap<Integer, MpdlDocOperation> finishedDocOperations = new HashMap<Integer, MpdlDocOperation>(); private boolean operationInProgress = false; private int jobOrderId = 0; public static MpdlChainScheduler getInstance() throws ApplicationException { if (instance == null) { instance = new MpdlChainScheduler(); instance.init(); } return instance; } public MpdlDocOperation doOperation(MpdlDocOperation docOperation) throws ApplicationException { jobOrderId++; docOperation.setOrderId(jobOrderId); queueOperation(docOperation); scheduleNextOperation(); return docOperation; } public void finishOperation(MpdlDocOperation docOperation) throws ApplicationException { operationInProgress = false; Date now = new Date(); docOperation.setEnd(now); docOperation.setStatus("finished"); int jobId = new Integer(docOperation.getOrderId()); finishedDocOperations.put(jobId, docOperation); log(docOperation); // schedule next job if there is one scheduleNextOperation(); } private void log(MpdlDocOperation docOperation) { Date startTime = docOperation.getStart(); Date endTime = docOperation.getEnd(); long executionTime = -1; if (startTime != null && endTime != null) executionTime = (endTime.getTime() - startTime.getTime()); String jobInfo = "MPDL: Document operation " + docOperation.toString() + ": started at: " + startTime + " and ended at: " + endTime + " (needed time: " + executionTime + " ms)"; LOGGER.info(jobInfo); } public synchronized void scheduleNextOperation() throws ApplicationException { if (isOperationInProgress()) { // nothing, operation has to wait } else { MpdlDocOperation docOperation = docOperationQueue.poll(); if (docOperation == null) { // if queue is empty then do nothing (there are no more operations to execute) } else { Date now = new Date(); operationInProgress = true; docOperation.setStart(now); scheduleJob(docOperation, now); } } } public ArrayList<MpdlDocOperation> getDocOperations() throws ApplicationException { ArrayList<MpdlDocOperation> docOperations = new ArrayList<MpdlDocOperation>(); try { // first: all finished jobs Collection<MpdlDocOperation> finiDocOperations = finishedDocOperations.values(); docOperations.addAll(finiDocOperations); // second: all currently executed jobs if (operationInProgress) { List<JobExecutionContext> currentJobs = (List<JobExecutionContext>) scheduler.getCurrentlyExecutingJobs(); Iterator<JobExecutionContext> iter = currentJobs.iterator(); while (iter.hasNext()) { JobExecutionContext jobExecutionContext = iter.next(); MpdlDocOperation docOperation = getDocOperation(jobExecutionContext); if (docOperation != null) { docOperations.add(docOperation); } } } // third: all queued jobs Iterator<MpdlDocOperation> iter = docOperationQueue.iterator(); while (iter.hasNext()) { MpdlDocOperation docOperation = iter.next(); docOperations.add(docOperation); } } catch (SchedulerException e) { LOGGER.error(e.getMessage()); throw new ApplicationException(e); } return docOperations; } public MpdlDocOperation getDocOperation(int jobId) throws ApplicationException { MpdlDocOperation docOperation = null; try { // first try: looks into currently executing jobs if (operationInProgress) { List<JobExecutionContext> currentJobs = (List<JobExecutionContext>) scheduler.getCurrentlyExecutingJobs(); Iterator<JobExecutionContext> iter = currentJobs.iterator(); while (iter.hasNext()) { JobExecutionContext jobExecutionContext = iter.next(); docOperation = getDocOperation(jobExecutionContext); if (docOperation != null) { int dopOpJobId = docOperation.getOrderId(); if (jobId == dopOpJobId) return docOperation; } } } // second try: look into finished jobs docOperation = finishedDocOperations.get(new Integer(jobId)); if (docOperation != null) { return docOperation; } // third try: look into queued jobs Iterator<MpdlDocOperation> iter = docOperationQueue.iterator(); while (iter.hasNext()) { docOperation = iter.next(); if (docOperation.getOrderId() == jobId) return docOperation; } } catch (SchedulerException e) { LOGGER.error(e.getMessage()); throw new ApplicationException(e); } // if not found return null return null; } public MpdlDocOperation getDocOperation(JobExecutionContext jobExecutionContext) { MpdlDocOperation docOperation = null; if (jobExecutionContext != null) { JobDetail job = jobExecutionContext.getJobDetail(); JobDataMap parameters = job.getJobDataMap(); docOperation = (MpdlDocOperation) parameters.get("operation"); } return docOperation; } private void queueOperation(MpdlDocOperation docOperation) { int operationsBefore = docOperationQueue.size(); if (operationsBefore == 0) docOperation.setStatus("waiting in operation queue"); else docOperation.setStatus("waiting in operation queue: " + operationsBefore + " operations heve to be executed before this operation"); docOperationQueue.offer(docOperation); } private synchronized boolean isOperationInProgress() { return operationInProgress; } private void scheduleJob(MpdlDocOperation docOperation, Date fireTime) throws ApplicationException { try { int jobId = docOperation.getOrderId(); String jobName = CRUD_JOB + "-id-" + jobId + "-timeId-" + fireTime; JobDetail job = new JobDetail(jobName, CRUD_GROUP, MpdlDocJob.class); JobDataMap parameters = new JobDataMap(); parameters.put("operation", docOperation); job.setJobDataMap(parameters); job.addJobListener(jobListener.getName()); String triggerName = CRUD_TRIGGER + "-id-" + jobId + "-timeId-" + fireTime; Trigger trigger = new SimpleTrigger(triggerName, CRUD_GROUP, fireTime); scheduler.scheduleJob(job, trigger); String jobInfo = "MPDL: Schedule document operation: " + docOperation.toString() + ": done at: " + fireTime.toString(); LOGGER.info(jobInfo); } catch (SchedulerException e) { LOGGER.error(e.getMessage()); throw new ApplicationException(e); } } private void init() throws ApplicationException { try { if (scheduler == null) { String quartzPath = getQuartzPath(); StdSchedulerFactory schedulerFactory = new StdSchedulerFactory(quartzPath); scheduler = schedulerFactory.getScheduler(); jobListener = new MpdlChainSchedulerListener(); scheduler.addJobListener(jobListener); scheduler.start(); LOGGER.info("MPDL: Started Quartz scheduler factory: " + quartzPath); } } catch (SchedulerException e) { LOGGER.error(e.getMessage()); throw new ApplicationException(e); } } public void end() throws ApplicationException { try { if (scheduler != null) { scheduler.shutdown(); } String quartzPath = getQuartzPath(); LOGGER.info("MPDL: Ended Quartz scheduler factory: " + quartzPath); } catch (SchedulerException e) { LOGGER.error(e.getMessage()); throw new ApplicationException(e); } } private String getQuartzPath() { URL quartzUrl = MpdlChainScheduler.class.getResource("quartz.properties"); String quartzPath = quartzUrl.getPath(); if (quartzPath.indexOf(".jar!") != -1) { int beginIndex = quartzPath.indexOf(".jar!") + 6; quartzPath = quartzPath.substring(beginIndex); } return quartzPath; } }