comparison software/eXist/mpdl-modules/src/de/mpg/mpiwg/berlin/mpdl/schedule/MpdlChainScheduler.java @ 0:408254cf2f1d

Erstellung
author Josef Willenborg <jwillenborg@mpiwg-berlin.mpg.de>
date Wed, 24 Nov 2010 17:24:23 +0100
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:408254cf2f1d
1 package de.mpg.mpiwg.berlin.mpdl.schedule;
2
3 import java.net.URL;
4 import java.util.ArrayList;
5 import java.util.Collection;
6 import java.util.Date;
7 import java.util.HashMap;
8 import java.util.Iterator;
9 import java.util.List;
10 import java.util.PriorityQueue;
11 import java.util.Queue;
12
13 import org.apache.log4j.Logger;
14 import org.quartz.JobDataMap;
15 import org.quartz.JobDetail;
16 import org.quartz.JobExecutionContext;
17 import org.quartz.JobListener;
18 import org.quartz.SchedulerException;
19 import org.quartz.SimpleTrigger;
20 import org.quartz.Trigger;
21 import org.quartz.impl.StdSchedulerFactory;
22
23 import de.mpg.mpiwg.berlin.mpdl.exception.ApplicationException;
24
25 public class MpdlChainScheduler {
26 private static MpdlChainScheduler instance;
27 private static String CRUD_JOB = "MPDL_CRUD_JOB";
28 private static String CRUD_TRIGGER = "MPDL_CRUD_TRIGGER";
29 private static String CRUD_GROUP = "MPDL_CRUD_GROUP";
30 private static Logger LOGGER = Logger.getLogger(MpdlChainScheduler.class); // Logs to EXIST_HOME/webapp/WEB-INF/logs/exist.log
31 private org.quartz.Scheduler scheduler;
32 private JobListener jobListener;
33 private Queue<MpdlDocOperation> docOperationQueue = new PriorityQueue<MpdlDocOperation>();
34 private HashMap<Integer, MpdlDocOperation> finishedDocOperations = new HashMap<Integer, MpdlDocOperation>();
35 private boolean operationInProgress = false;
36 private int jobOrderId = 0;
37
38 public static MpdlChainScheduler getInstance() throws ApplicationException {
39 if (instance == null) {
40 instance = new MpdlChainScheduler();
41 instance.init();
42 }
43 return instance;
44 }
45
46 public MpdlDocOperation doOperation(MpdlDocOperation docOperation) throws ApplicationException {
47 jobOrderId++;
48 docOperation.setOrderId(jobOrderId);
49 queueOperation(docOperation);
50 scheduleNextOperation();
51 return docOperation;
52 }
53
54 public void finishOperation(MpdlDocOperation docOperation) throws ApplicationException {
55 operationInProgress = false;
56 Date now = new Date();
57 docOperation.setEnd(now);
58 docOperation.setStatus("finished");
59 int jobId = new Integer(docOperation.getOrderId());
60 finishedDocOperations.put(jobId, docOperation);
61 log(docOperation);
62 // schedule next job if there is one
63 scheduleNextOperation();
64 }
65
66 private void log(MpdlDocOperation docOperation) {
67 Date startTime = docOperation.getStart();
68 Date endTime = docOperation.getEnd();
69 long executionTime = -1;
70 if (startTime != null && endTime != null)
71 executionTime = (endTime.getTime() - startTime.getTime());
72 String jobInfo = "MPDL: Document operation " + docOperation.toString() + ": started at: " + startTime +
73 " and ended at: " + endTime + " (needed time: " + executionTime + " ms)";
74 LOGGER.info(jobInfo);
75 }
76
77 public synchronized void scheduleNextOperation() throws ApplicationException {
78 if (isOperationInProgress()) {
79 // nothing, operation has to wait
80 } else {
81 MpdlDocOperation docOperation = docOperationQueue.poll();
82 if (docOperation == null) {
83 // if queue is empty then do nothing (there are no more operations to execute)
84 } else {
85 Date now = new Date();
86 operationInProgress = true;
87 docOperation.setStart(now);
88 scheduleJob(docOperation, now);
89 }
90 }
91 }
92
93 public ArrayList<MpdlDocOperation> getDocOperations() throws ApplicationException {
94 ArrayList<MpdlDocOperation> docOperations = new ArrayList<MpdlDocOperation>();
95 try {
96 // first: all finished jobs
97 Collection<MpdlDocOperation> finiDocOperations = finishedDocOperations.values();
98 docOperations.addAll(finiDocOperations);
99 // second: all currently executed jobs
100 if (operationInProgress) {
101 List<JobExecutionContext> currentJobs = (List<JobExecutionContext>) scheduler.getCurrentlyExecutingJobs();
102 Iterator<JobExecutionContext> iter = currentJobs.iterator();
103 while (iter.hasNext()) {
104 JobExecutionContext jobExecutionContext = iter.next();
105 MpdlDocOperation docOperation = getDocOperation(jobExecutionContext);
106 if (docOperation != null) {
107 docOperations.add(docOperation);
108 }
109 }
110 }
111 // third: all queued jobs
112 Iterator<MpdlDocOperation> iter = docOperationQueue.iterator();
113 while (iter.hasNext()) {
114 MpdlDocOperation docOperation = iter.next();
115 docOperations.add(docOperation);
116 }
117 } catch (SchedulerException e) {
118 LOGGER.error(e.getMessage());
119 throw new ApplicationException(e);
120 }
121 return docOperations;
122 }
123
124 public MpdlDocOperation getDocOperation(int jobId) throws ApplicationException {
125 MpdlDocOperation docOperation = null;
126 try {
127 // first try: looks into currently executing jobs
128 if (operationInProgress) {
129 List<JobExecutionContext> currentJobs = (List<JobExecutionContext>) scheduler.getCurrentlyExecutingJobs();
130 Iterator<JobExecutionContext> iter = currentJobs.iterator();
131 while (iter.hasNext()) {
132 JobExecutionContext jobExecutionContext = iter.next();
133 docOperation = getDocOperation(jobExecutionContext);
134 if (docOperation != null) {
135 int dopOpJobId = docOperation.getOrderId();
136 if (jobId == dopOpJobId)
137 return docOperation;
138 }
139 }
140 }
141 // second try: look into finished jobs
142 docOperation = finishedDocOperations.get(new Integer(jobId));
143 if (docOperation != null) {
144 return docOperation;
145 }
146 // third try: look into queued jobs
147 Iterator<MpdlDocOperation> iter = docOperationQueue.iterator();
148 while (iter.hasNext()) {
149 docOperation = iter.next();
150 if (docOperation.getOrderId() == jobId)
151 return docOperation;
152 }
153 } catch (SchedulerException e) {
154 LOGGER.error(e.getMessage());
155 throw new ApplicationException(e);
156 }
157 // if not found return null
158 return null;
159 }
160
161 public MpdlDocOperation getDocOperation(JobExecutionContext jobExecutionContext) {
162 MpdlDocOperation docOperation = null;
163 if (jobExecutionContext != null) {
164 JobDetail job = jobExecutionContext.getJobDetail();
165 JobDataMap parameters = job.getJobDataMap();
166 docOperation = (MpdlDocOperation) parameters.get("operation");
167 }
168 return docOperation;
169 }
170
171 private void queueOperation(MpdlDocOperation docOperation) {
172 int operationsBefore = docOperationQueue.size();
173 if (operationsBefore == 0)
174 docOperation.setStatus("waiting in operation queue");
175 else
176 docOperation.setStatus("waiting in operation queue: " + operationsBefore + " operations heve to be executed before this operation");
177 docOperationQueue.offer(docOperation);
178 }
179
180 private synchronized boolean isOperationInProgress() {
181 return operationInProgress;
182 }
183
184 private void scheduleJob(MpdlDocOperation docOperation, Date fireTime) throws ApplicationException {
185 try {
186 int jobId = docOperation.getOrderId();
187 String jobName = CRUD_JOB + "-id-" + jobId + "-timeId-" + fireTime;
188 JobDetail job = new JobDetail(jobName, CRUD_GROUP, MpdlDocJob.class);
189 JobDataMap parameters = new JobDataMap();
190 parameters.put("operation", docOperation);
191 job.setJobDataMap(parameters);
192 job.addJobListener(jobListener.getName());
193 String triggerName = CRUD_TRIGGER + "-id-" + jobId + "-timeId-" + fireTime;
194 Trigger trigger = new SimpleTrigger(triggerName, CRUD_GROUP, fireTime);
195 scheduler.scheduleJob(job, trigger);
196 String jobInfo = "MPDL: Schedule document operation: " + docOperation.toString() + ": done at: " + fireTime.toString();
197 LOGGER.info(jobInfo);
198 } catch (SchedulerException e) {
199 LOGGER.error(e.getMessage());
200 throw new ApplicationException(e);
201 }
202 }
203
204 private void init() throws ApplicationException {
205 try {
206 if (scheduler == null) {
207 String quartzPath = getQuartzPath();
208 StdSchedulerFactory schedulerFactory = new StdSchedulerFactory(quartzPath);
209 scheduler = schedulerFactory.getScheduler();
210 jobListener = new MpdlChainSchedulerListener();
211 scheduler.addJobListener(jobListener);
212 scheduler.start();
213 LOGGER.info("MPDL: Started Quartz scheduler factory: " + quartzPath);
214 }
215 } catch (SchedulerException e) {
216 LOGGER.error(e.getMessage());
217 throw new ApplicationException(e);
218 }
219 }
220
221 public void end() throws ApplicationException {
222 try {
223 if (scheduler != null) {
224 scheduler.shutdown();
225 }
226 String quartzPath = getQuartzPath();
227 LOGGER.info("MPDL: Ended Quartz scheduler factory: " + quartzPath);
228 } catch (SchedulerException e) {
229 LOGGER.error(e.getMessage());
230 throw new ApplicationException(e);
231 }
232 }
233
234 private String getQuartzPath() {
235 URL quartzUrl = MpdlChainScheduler.class.getResource("quartz.properties");
236 String quartzPath = quartzUrl.getPath();
237 if (quartzPath.indexOf(".jar!") != -1) {
238 int beginIndex = quartzPath.indexOf(".jar!") + 6;
239 quartzPath = quartzPath.substring(beginIndex);
240 }
241 return quartzPath;
242 }
243 }