Mercurial > hg > mpdl-group
diff software/eXist/mpdl-modules/src/de/mpg/mpiwg/berlin/mpdl/schedule/MpdlChainScheduler.java @ 0:408254cf2f1d
Erstellung
author | Josef Willenborg <jwillenborg@mpiwg-berlin.mpg.de> |
---|---|
date | Wed, 24 Nov 2010 17:24:23 +0100 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/software/eXist/mpdl-modules/src/de/mpg/mpiwg/berlin/mpdl/schedule/MpdlChainScheduler.java Wed Nov 24 17:24:23 2010 +0100 @@ -0,0 +1,243 @@ +package de.mpg.mpiwg.berlin.mpdl.schedule; + +import java.net.URL; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Queue; + +import org.apache.log4j.Logger; +import org.quartz.JobDataMap; +import org.quartz.JobDetail; +import org.quartz.JobExecutionContext; +import org.quartz.JobListener; +import org.quartz.SchedulerException; +import org.quartz.SimpleTrigger; +import org.quartz.Trigger; +import org.quartz.impl.StdSchedulerFactory; + +import de.mpg.mpiwg.berlin.mpdl.exception.ApplicationException; + +public class MpdlChainScheduler { + private static MpdlChainScheduler instance; + private static String CRUD_JOB = "MPDL_CRUD_JOB"; + private static String CRUD_TRIGGER = "MPDL_CRUD_TRIGGER"; + private static String CRUD_GROUP = "MPDL_CRUD_GROUP"; + private static Logger LOGGER = Logger.getLogger(MpdlChainScheduler.class); // Logs to EXIST_HOME/webapp/WEB-INF/logs/exist.log + private org.quartz.Scheduler scheduler; + private JobListener jobListener; + private Queue<MpdlDocOperation> docOperationQueue = new PriorityQueue<MpdlDocOperation>(); + private HashMap<Integer, MpdlDocOperation> finishedDocOperations = new HashMap<Integer, MpdlDocOperation>(); + private boolean operationInProgress = false; + private int jobOrderId = 0; + + public static MpdlChainScheduler getInstance() throws ApplicationException { + if (instance == null) { + instance = new MpdlChainScheduler(); + instance.init(); + } + return instance; + } + + public MpdlDocOperation doOperation(MpdlDocOperation docOperation) throws ApplicationException { + jobOrderId++; + docOperation.setOrderId(jobOrderId); + queueOperation(docOperation); + scheduleNextOperation(); + return docOperation; + } + + public void finishOperation(MpdlDocOperation docOperation) throws ApplicationException { + operationInProgress = false; + Date now = new Date(); + docOperation.setEnd(now); + docOperation.setStatus("finished"); + int jobId = new Integer(docOperation.getOrderId()); + finishedDocOperations.put(jobId, docOperation); + log(docOperation); + // schedule next job if there is one + scheduleNextOperation(); + } + + private void log(MpdlDocOperation docOperation) { + Date startTime = docOperation.getStart(); + Date endTime = docOperation.getEnd(); + long executionTime = -1; + if (startTime != null && endTime != null) + executionTime = (endTime.getTime() - startTime.getTime()); + String jobInfo = "MPDL: Document operation " + docOperation.toString() + ": started at: " + startTime + + " and ended at: " + endTime + " (needed time: " + executionTime + " ms)"; + LOGGER.info(jobInfo); + } + + public synchronized void scheduleNextOperation() throws ApplicationException { + if (isOperationInProgress()) { + // nothing, operation has to wait + } else { + MpdlDocOperation docOperation = docOperationQueue.poll(); + if (docOperation == null) { + // if queue is empty then do nothing (there are no more operations to execute) + } else { + Date now = new Date(); + operationInProgress = true; + docOperation.setStart(now); + scheduleJob(docOperation, now); + } + } + } + + public ArrayList<MpdlDocOperation> getDocOperations() throws ApplicationException { + ArrayList<MpdlDocOperation> docOperations = new ArrayList<MpdlDocOperation>(); + try { + // first: all finished jobs + Collection<MpdlDocOperation> finiDocOperations = finishedDocOperations.values(); + docOperations.addAll(finiDocOperations); + // second: all currently executed jobs + if (operationInProgress) { + List<JobExecutionContext> currentJobs = (List<JobExecutionContext>) scheduler.getCurrentlyExecutingJobs(); + Iterator<JobExecutionContext> iter = currentJobs.iterator(); + while (iter.hasNext()) { + JobExecutionContext jobExecutionContext = iter.next(); + MpdlDocOperation docOperation = getDocOperation(jobExecutionContext); + if (docOperation != null) { + docOperations.add(docOperation); + } + } + } + // third: all queued jobs + Iterator<MpdlDocOperation> iter = docOperationQueue.iterator(); + while (iter.hasNext()) { + MpdlDocOperation docOperation = iter.next(); + docOperations.add(docOperation); + } + } catch (SchedulerException e) { + LOGGER.error(e.getMessage()); + throw new ApplicationException(e); + } + return docOperations; + } + + public MpdlDocOperation getDocOperation(int jobId) throws ApplicationException { + MpdlDocOperation docOperation = null; + try { + // first try: looks into currently executing jobs + if (operationInProgress) { + List<JobExecutionContext> currentJobs = (List<JobExecutionContext>) scheduler.getCurrentlyExecutingJobs(); + Iterator<JobExecutionContext> iter = currentJobs.iterator(); + while (iter.hasNext()) { + JobExecutionContext jobExecutionContext = iter.next(); + docOperation = getDocOperation(jobExecutionContext); + if (docOperation != null) { + int dopOpJobId = docOperation.getOrderId(); + if (jobId == dopOpJobId) + return docOperation; + } + } + } + // second try: look into finished jobs + docOperation = finishedDocOperations.get(new Integer(jobId)); + if (docOperation != null) { + return docOperation; + } + // third try: look into queued jobs + Iterator<MpdlDocOperation> iter = docOperationQueue.iterator(); + while (iter.hasNext()) { + docOperation = iter.next(); + if (docOperation.getOrderId() == jobId) + return docOperation; + } + } catch (SchedulerException e) { + LOGGER.error(e.getMessage()); + throw new ApplicationException(e); + } + // if not found return null + return null; + } + + public MpdlDocOperation getDocOperation(JobExecutionContext jobExecutionContext) { + MpdlDocOperation docOperation = null; + if (jobExecutionContext != null) { + JobDetail job = jobExecutionContext.getJobDetail(); + JobDataMap parameters = job.getJobDataMap(); + docOperation = (MpdlDocOperation) parameters.get("operation"); + } + return docOperation; + } + + private void queueOperation(MpdlDocOperation docOperation) { + int operationsBefore = docOperationQueue.size(); + if (operationsBefore == 0) + docOperation.setStatus("waiting in operation queue"); + else + docOperation.setStatus("waiting in operation queue: " + operationsBefore + " operations heve to be executed before this operation"); + docOperationQueue.offer(docOperation); + } + + private synchronized boolean isOperationInProgress() { + return operationInProgress; + } + + private void scheduleJob(MpdlDocOperation docOperation, Date fireTime) throws ApplicationException { + try { + int jobId = docOperation.getOrderId(); + String jobName = CRUD_JOB + "-id-" + jobId + "-timeId-" + fireTime; + JobDetail job = new JobDetail(jobName, CRUD_GROUP, MpdlDocJob.class); + JobDataMap parameters = new JobDataMap(); + parameters.put("operation", docOperation); + job.setJobDataMap(parameters); + job.addJobListener(jobListener.getName()); + String triggerName = CRUD_TRIGGER + "-id-" + jobId + "-timeId-" + fireTime; + Trigger trigger = new SimpleTrigger(triggerName, CRUD_GROUP, fireTime); + scheduler.scheduleJob(job, trigger); + String jobInfo = "MPDL: Schedule document operation: " + docOperation.toString() + ": done at: " + fireTime.toString(); + LOGGER.info(jobInfo); + } catch (SchedulerException e) { + LOGGER.error(e.getMessage()); + throw new ApplicationException(e); + } + } + + private void init() throws ApplicationException { + try { + if (scheduler == null) { + String quartzPath = getQuartzPath(); + StdSchedulerFactory schedulerFactory = new StdSchedulerFactory(quartzPath); + scheduler = schedulerFactory.getScheduler(); + jobListener = new MpdlChainSchedulerListener(); + scheduler.addJobListener(jobListener); + scheduler.start(); + LOGGER.info("MPDL: Started Quartz scheduler factory: " + quartzPath); + } + } catch (SchedulerException e) { + LOGGER.error(e.getMessage()); + throw new ApplicationException(e); + } + } + + public void end() throws ApplicationException { + try { + if (scheduler != null) { + scheduler.shutdown(); + } + String quartzPath = getQuartzPath(); + LOGGER.info("MPDL: Ended Quartz scheduler factory: " + quartzPath); + } catch (SchedulerException e) { + LOGGER.error(e.getMessage()); + throw new ApplicationException(e); + } + } + + private String getQuartzPath() { + URL quartzUrl = MpdlChainScheduler.class.getResource("quartz.properties"); + String quartzPath = quartzUrl.getPath(); + if (quartzPath.indexOf(".jar!") != -1) { + int beginIndex = quartzPath.indexOf(".jar!") + 6; + quartzPath = quartzPath.substring(beginIndex); + } + return quartzPath; + } +} \ No newline at end of file