diff software/eXist/mpdl-modules/src/de/mpg/mpiwg/berlin/mpdl/schedule/MpdlChainScheduler.java @ 0:408254cf2f1d

author Josef Willenborg <jwillenborg@mpiwg-berlin.mpg.de>
date Wed, 24 Nov 2010 17:24:23 +0100
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/software/eXist/mpdl-modules/src/de/mpg/mpiwg/berlin/mpdl/schedule/MpdlChainScheduler.java	Wed Nov 24 17:24:23 2010 +0100
@@ -0,0 +1,243 @@
+package de.mpg.mpiwg.berlin.mpdl.schedule;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import org.apache.log4j.Logger;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobListener;
+import org.quartz.SchedulerException;
+import org.quartz.SimpleTrigger;
+import org.quartz.Trigger;
+import org.quartz.impl.StdSchedulerFactory;
+import de.mpg.mpiwg.berlin.mpdl.exception.ApplicationException;
+public class MpdlChainScheduler {
+  private static MpdlChainScheduler instance;
+  private static String CRUD_JOB = "MPDL_CRUD_JOB";
+  private static String CRUD_TRIGGER = "MPDL_CRUD_TRIGGER";
+  private static String CRUD_GROUP = "MPDL_CRUD_GROUP";
+  private static Logger LOGGER = Logger.getLogger(MpdlChainScheduler.class); // Logs to EXIST_HOME/webapp/WEB-INF/logs/exist.log
+  private org.quartz.Scheduler scheduler;
+  private JobListener jobListener;
+  private Queue<MpdlDocOperation> docOperationQueue = new PriorityQueue<MpdlDocOperation>();
+  private HashMap<Integer, MpdlDocOperation> finishedDocOperations = new HashMap<Integer, MpdlDocOperation>();
+  private boolean operationInProgress = false;
+  private int jobOrderId = 0;
+  public static MpdlChainScheduler getInstance() throws ApplicationException {
+    if (instance == null) {
+      instance = new MpdlChainScheduler();
+      instance.init();
+    }
+    return instance;
+  }
+  public MpdlDocOperation doOperation(MpdlDocOperation docOperation) throws ApplicationException {
+    jobOrderId++;
+    docOperation.setOrderId(jobOrderId);
+    queueOperation(docOperation);
+    scheduleNextOperation();
+    return docOperation;
+  }
+  public void finishOperation(MpdlDocOperation docOperation) throws ApplicationException {
+    operationInProgress = false;
+    Date now = new Date();
+    docOperation.setEnd(now);
+    docOperation.setStatus("finished");
+    int jobId = new Integer(docOperation.getOrderId());
+    finishedDocOperations.put(jobId, docOperation);
+    log(docOperation);
+    // schedule next job if there is one
+    scheduleNextOperation();
+  }
+  private void log(MpdlDocOperation docOperation) {
+    Date startTime = docOperation.getStart();
+    Date endTime = docOperation.getEnd();
+    long executionTime = -1;
+    if (startTime != null && endTime != null)
+      executionTime = (endTime.getTime() - startTime.getTime());
+    String jobInfo = "MPDL: Document operation " + docOperation.toString() + ": started at: " + startTime + 
+      " and ended at: " + endTime + " (needed time: " + executionTime + " ms)";
+    LOGGER.info(jobInfo);
+  }
+  public synchronized void scheduleNextOperation() throws ApplicationException {
+    if (isOperationInProgress()) {
+      // nothing, operation has to wait
+    } else {
+      MpdlDocOperation docOperation = docOperationQueue.poll();
+      if (docOperation == null) {
+        // if queue is empty then do nothing (there are no more operations to execute)
+      } else {
+        Date now = new Date();
+        operationInProgress = true;
+        docOperation.setStart(now);
+        scheduleJob(docOperation, now);
+      }
+    }
+  }
+  public ArrayList<MpdlDocOperation> getDocOperations() throws ApplicationException {
+    ArrayList<MpdlDocOperation> docOperations = new ArrayList<MpdlDocOperation>();
+    try {
+      // first: all finished jobs
+      Collection<MpdlDocOperation> finiDocOperations = finishedDocOperations.values();
+      docOperations.addAll(finiDocOperations);
+      // second: all currently executed jobs
+      if (operationInProgress) {
+        List<JobExecutionContext> currentJobs = (List<JobExecutionContext>) scheduler.getCurrentlyExecutingJobs();
+        Iterator<JobExecutionContext> iter = currentJobs.iterator();
+        while (iter.hasNext()) {
+          JobExecutionContext jobExecutionContext = iter.next();
+          MpdlDocOperation docOperation = getDocOperation(jobExecutionContext);
+          if (docOperation != null) {
+            docOperations.add(docOperation);
+          }
+        }
+      }
+      // third: all queued jobs
+      Iterator<MpdlDocOperation> iter = docOperationQueue.iterator();
+      while (iter.hasNext()) {
+        MpdlDocOperation docOperation = iter.next();
+        docOperations.add(docOperation);
+      }
+    } catch (SchedulerException e) {
+      LOGGER.error(e.getMessage());
+      throw new ApplicationException(e);
+    }
+    return docOperations;
+  }
+  public MpdlDocOperation getDocOperation(int jobId) throws ApplicationException {
+    MpdlDocOperation docOperation = null;
+    try {
+      // first try: looks into currently executing jobs
+      if (operationInProgress) {
+        List<JobExecutionContext> currentJobs = (List<JobExecutionContext>) scheduler.getCurrentlyExecutingJobs();
+        Iterator<JobExecutionContext> iter = currentJobs.iterator();
+        while (iter.hasNext()) {
+          JobExecutionContext jobExecutionContext = iter.next();
+          docOperation = getDocOperation(jobExecutionContext);
+          if (docOperation != null) {
+            int dopOpJobId = docOperation.getOrderId();
+            if (jobId == dopOpJobId)
+              return docOperation;
+          }
+        }
+      }
+      // second try: look into finished jobs
+      docOperation = finishedDocOperations.get(new Integer(jobId));
+      if (docOperation != null) {
+        return docOperation;
+      }
+      // third try: look into queued jobs
+      Iterator<MpdlDocOperation> iter = docOperationQueue.iterator();
+      while (iter.hasNext()) {
+        docOperation = iter.next();
+        if (docOperation.getOrderId() == jobId)
+          return docOperation;
+      }
+    } catch (SchedulerException e) {
+      LOGGER.error(e.getMessage());
+      throw new ApplicationException(e);
+    }
+    // if not found return null
+    return null;
+  }
+  public MpdlDocOperation getDocOperation(JobExecutionContext jobExecutionContext) {
+    MpdlDocOperation docOperation = null;
+    if (jobExecutionContext != null) {
+      JobDetail job = jobExecutionContext.getJobDetail();
+      JobDataMap parameters = job.getJobDataMap();
+      docOperation = (MpdlDocOperation) parameters.get("operation");
+    }
+    return docOperation;
+  }
+  private void queueOperation(MpdlDocOperation docOperation) {
+    int operationsBefore = docOperationQueue.size();
+    if (operationsBefore == 0)
+     docOperation.setStatus("waiting in operation queue");
+    else 
+      docOperation.setStatus("waiting in operation queue: " + operationsBefore + " operations heve to be executed before this operation");
+    docOperationQueue.offer(docOperation);
+  }
+  private synchronized boolean isOperationInProgress() {
+    return operationInProgress;  
+  }
+  private void scheduleJob(MpdlDocOperation docOperation, Date fireTime) throws ApplicationException {
+    try {
+      int jobId = docOperation.getOrderId();
+      String jobName = CRUD_JOB + "-id-" + jobId + "-timeId-" + fireTime;
+      JobDetail job = new JobDetail(jobName, CRUD_GROUP, MpdlDocJob.class);
+      JobDataMap parameters = new JobDataMap();
+      parameters.put("operation", docOperation);
+      job.setJobDataMap(parameters);
+      job.addJobListener(jobListener.getName());        
+      String triggerName = CRUD_TRIGGER + "-id-" + jobId + "-timeId-" + fireTime;
+      Trigger trigger = new SimpleTrigger(triggerName, CRUD_GROUP, fireTime);
+      scheduler.scheduleJob(job, trigger);
+      String jobInfo = "MPDL: Schedule document operation: " + docOperation.toString() + ": done at: " + fireTime.toString();
+      LOGGER.info(jobInfo);
+    } catch (SchedulerException e) {
+      LOGGER.error(e.getMessage());
+      throw new ApplicationException(e);
+    }
+  }
+  private void init() throws ApplicationException {
+    try {
+      if (scheduler == null) {
+        String quartzPath = getQuartzPath();
+        StdSchedulerFactory schedulerFactory = new StdSchedulerFactory(quartzPath);
+        scheduler = schedulerFactory.getScheduler();
+        jobListener = new MpdlChainSchedulerListener();
+        scheduler.addJobListener(jobListener);
+        scheduler.start();
+        LOGGER.info("MPDL: Started Quartz scheduler factory: " + quartzPath);
+      } 
+    } catch (SchedulerException e) {
+      LOGGER.error(e.getMessage());
+      throw new ApplicationException(e);
+    }
+  }
+  public void end() throws ApplicationException {
+    try {
+      if (scheduler != null) {
+        scheduler.shutdown();
+      }
+      String quartzPath = getQuartzPath();
+      LOGGER.info("MPDL: Ended Quartz scheduler factory: " + quartzPath);
+    } catch (SchedulerException e) {
+      LOGGER.error(e.getMessage());
+      throw new ApplicationException(e);
+    }
+  }
+  private String getQuartzPath() {
+    URL quartzUrl = MpdlChainScheduler.class.getResource("quartz.properties");
+    String quartzPath = quartzUrl.getPath();
+    if (quartzPath.indexOf(".jar!") != -1) {
+      int beginIndex = quartzPath.indexOf(".jar!") + 6;
+      quartzPath = quartzPath.substring(beginIndex);
+    }
+    return quartzPath;    
+  }
\ No newline at end of file