view software/eXist/mpdl-modules/src/de/mpg/mpiwg/berlin/mpdl/schedule/MpdlChainScheduler.java @ 0:408254cf2f1d

Erstellung
author Josef Willenborg <jwillenborg@mpiwg-berlin.mpg.de>
date Wed, 24 Nov 2010 17:24:23 +0100
parents
children
line wrap: on
line source

package de.mpg.mpiwg.berlin.mpdl.schedule;

import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

import org.apache.log4j.Logger;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobListener;
import org.quartz.SchedulerException;
import org.quartz.SimpleTrigger;
import org.quartz.Trigger;
import org.quartz.impl.StdSchedulerFactory;

import de.mpg.mpiwg.berlin.mpdl.exception.ApplicationException;

public class MpdlChainScheduler {
  private static MpdlChainScheduler instance;
  private static String CRUD_JOB = "MPDL_CRUD_JOB";
  private static String CRUD_TRIGGER = "MPDL_CRUD_TRIGGER";
  private static String CRUD_GROUP = "MPDL_CRUD_GROUP";
  private static Logger LOGGER = Logger.getLogger(MpdlChainScheduler.class); // Logs to EXIST_HOME/webapp/WEB-INF/logs/exist.log
  private org.quartz.Scheduler scheduler;
  private JobListener jobListener;
  private Queue<MpdlDocOperation> docOperationQueue = new PriorityQueue<MpdlDocOperation>();
  private HashMap<Integer, MpdlDocOperation> finishedDocOperations = new HashMap<Integer, MpdlDocOperation>();
  private boolean operationInProgress = false;
  private int jobOrderId = 0;
  
  public static MpdlChainScheduler getInstance() throws ApplicationException {
    if (instance == null) {
      instance = new MpdlChainScheduler();
      instance.init();
    }
    return instance;
  }

  public MpdlDocOperation doOperation(MpdlDocOperation docOperation) throws ApplicationException {
    jobOrderId++;
    docOperation.setOrderId(jobOrderId);
    queueOperation(docOperation);
    scheduleNextOperation();
    return docOperation;
  }
  
  public void finishOperation(MpdlDocOperation docOperation) throws ApplicationException {
    operationInProgress = false;
    Date now = new Date();
    docOperation.setEnd(now);
    docOperation.setStatus("finished");
    int jobId = new Integer(docOperation.getOrderId());
    finishedDocOperations.put(jobId, docOperation);
    log(docOperation);
    // schedule next job if there is one
    scheduleNextOperation();
  }
  
  private void log(MpdlDocOperation docOperation) {
    Date startTime = docOperation.getStart();
    Date endTime = docOperation.getEnd();
    long executionTime = -1;
    if (startTime != null && endTime != null)
      executionTime = (endTime.getTime() - startTime.getTime());
    String jobInfo = "MPDL: Document operation " + docOperation.toString() + ": started at: " + startTime + 
      " and ended at: " + endTime + " (needed time: " + executionTime + " ms)";
    LOGGER.info(jobInfo);
  }
  
  public synchronized void scheduleNextOperation() throws ApplicationException {
    if (isOperationInProgress()) {
      // nothing, operation has to wait
    } else {
      MpdlDocOperation docOperation = docOperationQueue.poll();
      if (docOperation == null) {
        // if queue is empty then do nothing (there are no more operations to execute)
      } else {
        Date now = new Date();
        operationInProgress = true;
        docOperation.setStart(now);
        scheduleJob(docOperation, now);
      }
    }
  }
  
  public ArrayList<MpdlDocOperation> getDocOperations() throws ApplicationException {
    ArrayList<MpdlDocOperation> docOperations = new ArrayList<MpdlDocOperation>();
    try {
      // first: all finished jobs
      Collection<MpdlDocOperation> finiDocOperations = finishedDocOperations.values();
      docOperations.addAll(finiDocOperations);
      // second: all currently executed jobs
      if (operationInProgress) {
        List<JobExecutionContext> currentJobs = (List<JobExecutionContext>) scheduler.getCurrentlyExecutingJobs();
        Iterator<JobExecutionContext> iter = currentJobs.iterator();
        while (iter.hasNext()) {
          JobExecutionContext jobExecutionContext = iter.next();
          MpdlDocOperation docOperation = getDocOperation(jobExecutionContext);
          if (docOperation != null) {
            docOperations.add(docOperation);
          }
        }
      }
      // third: all queued jobs
      Iterator<MpdlDocOperation> iter = docOperationQueue.iterator();
      while (iter.hasNext()) {
        MpdlDocOperation docOperation = iter.next();
        docOperations.add(docOperation);
      }
    } catch (SchedulerException e) {
      LOGGER.error(e.getMessage());
      throw new ApplicationException(e);
    }
    return docOperations;
  }
    
  public MpdlDocOperation getDocOperation(int jobId) throws ApplicationException {
    MpdlDocOperation docOperation = null;
    try {
      // first try: looks into currently executing jobs
      if (operationInProgress) {
        List<JobExecutionContext> currentJobs = (List<JobExecutionContext>) scheduler.getCurrentlyExecutingJobs();
        Iterator<JobExecutionContext> iter = currentJobs.iterator();
        while (iter.hasNext()) {
          JobExecutionContext jobExecutionContext = iter.next();
          docOperation = getDocOperation(jobExecutionContext);
          if (docOperation != null) {
            int dopOpJobId = docOperation.getOrderId();
            if (jobId == dopOpJobId)
              return docOperation;
          }
        }
      }
      // second try: look into finished jobs
      docOperation = finishedDocOperations.get(new Integer(jobId));
      if (docOperation != null) {
        return docOperation;
      }
      // third try: look into queued jobs
      Iterator<MpdlDocOperation> iter = docOperationQueue.iterator();
      while (iter.hasNext()) {
        docOperation = iter.next();
        if (docOperation.getOrderId() == jobId)
          return docOperation;
      }
    } catch (SchedulerException e) {
      LOGGER.error(e.getMessage());
      throw new ApplicationException(e);
    }
    // if not found return null
    return null;
  }
  
  public MpdlDocOperation getDocOperation(JobExecutionContext jobExecutionContext) {
    MpdlDocOperation docOperation = null;
    if (jobExecutionContext != null) {
      JobDetail job = jobExecutionContext.getJobDetail();
      JobDataMap parameters = job.getJobDataMap();
      docOperation = (MpdlDocOperation) parameters.get("operation");
    }
    return docOperation;
  }
  
  private void queueOperation(MpdlDocOperation docOperation) {
    int operationsBefore = docOperationQueue.size();
    if (operationsBefore == 0)
     docOperation.setStatus("waiting in operation queue");
    else 
      docOperation.setStatus("waiting in operation queue: " + operationsBefore + " operations heve to be executed before this operation");
    docOperationQueue.offer(docOperation);
  }
  
  private synchronized boolean isOperationInProgress() {
    return operationInProgress;  
  }
  
  private void scheduleJob(MpdlDocOperation docOperation, Date fireTime) throws ApplicationException {
    try {
      int jobId = docOperation.getOrderId();
      String jobName = CRUD_JOB + "-id-" + jobId + "-timeId-" + fireTime;
      JobDetail job = new JobDetail(jobName, CRUD_GROUP, MpdlDocJob.class);
      JobDataMap parameters = new JobDataMap();
      parameters.put("operation", docOperation);
      job.setJobDataMap(parameters);
      job.addJobListener(jobListener.getName());        
      String triggerName = CRUD_TRIGGER + "-id-" + jobId + "-timeId-" + fireTime;
      Trigger trigger = new SimpleTrigger(triggerName, CRUD_GROUP, fireTime);
      scheduler.scheduleJob(job, trigger);
      String jobInfo = "MPDL: Schedule document operation: " + docOperation.toString() + ": done at: " + fireTime.toString();
      LOGGER.info(jobInfo);
    } catch (SchedulerException e) {
      LOGGER.error(e.getMessage());
      throw new ApplicationException(e);
    }
  }
  
  private void init() throws ApplicationException {
    try {
      if (scheduler == null) {
        String quartzPath = getQuartzPath();
        StdSchedulerFactory schedulerFactory = new StdSchedulerFactory(quartzPath);
        scheduler = schedulerFactory.getScheduler();
        jobListener = new MpdlChainSchedulerListener();
        scheduler.addJobListener(jobListener);
        scheduler.start();
        LOGGER.info("MPDL: Started Quartz scheduler factory: " + quartzPath);
      } 
    } catch (SchedulerException e) {
      LOGGER.error(e.getMessage());
      throw new ApplicationException(e);
    }
  }
  
  public void end() throws ApplicationException {
    try {
      if (scheduler != null) {
        scheduler.shutdown();
      }
      String quartzPath = getQuartzPath();
      LOGGER.info("MPDL: Ended Quartz scheduler factory: " + quartzPath);
    } catch (SchedulerException e) {
      LOGGER.error(e.getMessage());
      throw new ApplicationException(e);
    }
  }

  private String getQuartzPath() {
    URL quartzUrl = MpdlChainScheduler.class.getResource("quartz.properties");
    String quartzPath = quartzUrl.getPath();
    if (quartzPath.indexOf(".jar!") != -1) {
      int beginIndex = quartzPath.indexOf(".jar!") + 6;
      quartzPath = quartzPath.substring(beginIndex);
    }
    return quartzPath;    
  }
}