Mercurial > hg > mpdl-group
comparison software/eXist/mpdl-modules/src/de/mpg/mpiwg/berlin/mpdl/schedule/MpdlChainScheduler.java @ 0:408254cf2f1d
Erstellung
author | Josef Willenborg <jwillenborg@mpiwg-berlin.mpg.de> |
---|---|
date | Wed, 24 Nov 2010 17:24:23 +0100 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:408254cf2f1d |
---|---|
1 package de.mpg.mpiwg.berlin.mpdl.schedule; | |
2 | |
3 import java.net.URL; | |
4 import java.util.ArrayList; | |
5 import java.util.Collection; | |
6 import java.util.Date; | |
7 import java.util.HashMap; | |
8 import java.util.Iterator; | |
9 import java.util.List; | |
10 import java.util.PriorityQueue; | |
11 import java.util.Queue; | |
12 | |
13 import org.apache.log4j.Logger; | |
14 import org.quartz.JobDataMap; | |
15 import org.quartz.JobDetail; | |
16 import org.quartz.JobExecutionContext; | |
17 import org.quartz.JobListener; | |
18 import org.quartz.SchedulerException; | |
19 import org.quartz.SimpleTrigger; | |
20 import org.quartz.Trigger; | |
21 import org.quartz.impl.StdSchedulerFactory; | |
22 | |
23 import de.mpg.mpiwg.berlin.mpdl.exception.ApplicationException; | |
24 | |
25 public class MpdlChainScheduler { | |
26 private static MpdlChainScheduler instance; | |
27 private static String CRUD_JOB = "MPDL_CRUD_JOB"; | |
28 private static String CRUD_TRIGGER = "MPDL_CRUD_TRIGGER"; | |
29 private static String CRUD_GROUP = "MPDL_CRUD_GROUP"; | |
30 private static Logger LOGGER = Logger.getLogger(MpdlChainScheduler.class); // Logs to EXIST_HOME/webapp/WEB-INF/logs/exist.log | |
31 private org.quartz.Scheduler scheduler; | |
32 private JobListener jobListener; | |
33 private Queue<MpdlDocOperation> docOperationQueue = new PriorityQueue<MpdlDocOperation>(); | |
34 private HashMap<Integer, MpdlDocOperation> finishedDocOperations = new HashMap<Integer, MpdlDocOperation>(); | |
35 private boolean operationInProgress = false; | |
36 private int jobOrderId = 0; | |
37 | |
38 public static MpdlChainScheduler getInstance() throws ApplicationException { | |
39 if (instance == null) { | |
40 instance = new MpdlChainScheduler(); | |
41 instance.init(); | |
42 } | |
43 return instance; | |
44 } | |
45 | |
46 public MpdlDocOperation doOperation(MpdlDocOperation docOperation) throws ApplicationException { | |
47 jobOrderId++; | |
48 docOperation.setOrderId(jobOrderId); | |
49 queueOperation(docOperation); | |
50 scheduleNextOperation(); | |
51 return docOperation; | |
52 } | |
53 | |
54 public void finishOperation(MpdlDocOperation docOperation) throws ApplicationException { | |
55 operationInProgress = false; | |
56 Date now = new Date(); | |
57 docOperation.setEnd(now); | |
58 docOperation.setStatus("finished"); | |
59 int jobId = new Integer(docOperation.getOrderId()); | |
60 finishedDocOperations.put(jobId, docOperation); | |
61 log(docOperation); | |
62 // schedule next job if there is one | |
63 scheduleNextOperation(); | |
64 } | |
65 | |
66 private void log(MpdlDocOperation docOperation) { | |
67 Date startTime = docOperation.getStart(); | |
68 Date endTime = docOperation.getEnd(); | |
69 long executionTime = -1; | |
70 if (startTime != null && endTime != null) | |
71 executionTime = (endTime.getTime() - startTime.getTime()); | |
72 String jobInfo = "MPDL: Document operation " + docOperation.toString() + ": started at: " + startTime + | |
73 " and ended at: " + endTime + " (needed time: " + executionTime + " ms)"; | |
74 LOGGER.info(jobInfo); | |
75 } | |
76 | |
77 public synchronized void scheduleNextOperation() throws ApplicationException { | |
78 if (isOperationInProgress()) { | |
79 // nothing, operation has to wait | |
80 } else { | |
81 MpdlDocOperation docOperation = docOperationQueue.poll(); | |
82 if (docOperation == null) { | |
83 // if queue is empty then do nothing (there are no more operations to execute) | |
84 } else { | |
85 Date now = new Date(); | |
86 operationInProgress = true; | |
87 docOperation.setStart(now); | |
88 scheduleJob(docOperation, now); | |
89 } | |
90 } | |
91 } | |
92 | |
93 public ArrayList<MpdlDocOperation> getDocOperations() throws ApplicationException { | |
94 ArrayList<MpdlDocOperation> docOperations = new ArrayList<MpdlDocOperation>(); | |
95 try { | |
96 // first: all finished jobs | |
97 Collection<MpdlDocOperation> finiDocOperations = finishedDocOperations.values(); | |
98 docOperations.addAll(finiDocOperations); | |
99 // second: all currently executed jobs | |
100 if (operationInProgress) { | |
101 List<JobExecutionContext> currentJobs = (List<JobExecutionContext>) scheduler.getCurrentlyExecutingJobs(); | |
102 Iterator<JobExecutionContext> iter = currentJobs.iterator(); | |
103 while (iter.hasNext()) { | |
104 JobExecutionContext jobExecutionContext = iter.next(); | |
105 MpdlDocOperation docOperation = getDocOperation(jobExecutionContext); | |
106 if (docOperation != null) { | |
107 docOperations.add(docOperation); | |
108 } | |
109 } | |
110 } | |
111 // third: all queued jobs | |
112 Iterator<MpdlDocOperation> iter = docOperationQueue.iterator(); | |
113 while (iter.hasNext()) { | |
114 MpdlDocOperation docOperation = iter.next(); | |
115 docOperations.add(docOperation); | |
116 } | |
117 } catch (SchedulerException e) { | |
118 LOGGER.error(e.getMessage()); | |
119 throw new ApplicationException(e); | |
120 } | |
121 return docOperations; | |
122 } | |
123 | |
124 public MpdlDocOperation getDocOperation(int jobId) throws ApplicationException { | |
125 MpdlDocOperation docOperation = null; | |
126 try { | |
127 // first try: looks into currently executing jobs | |
128 if (operationInProgress) { | |
129 List<JobExecutionContext> currentJobs = (List<JobExecutionContext>) scheduler.getCurrentlyExecutingJobs(); | |
130 Iterator<JobExecutionContext> iter = currentJobs.iterator(); | |
131 while (iter.hasNext()) { | |
132 JobExecutionContext jobExecutionContext = iter.next(); | |
133 docOperation = getDocOperation(jobExecutionContext); | |
134 if (docOperation != null) { | |
135 int dopOpJobId = docOperation.getOrderId(); | |
136 if (jobId == dopOpJobId) | |
137 return docOperation; | |
138 } | |
139 } | |
140 } | |
141 // second try: look into finished jobs | |
142 docOperation = finishedDocOperations.get(new Integer(jobId)); | |
143 if (docOperation != null) { | |
144 return docOperation; | |
145 } | |
146 // third try: look into queued jobs | |
147 Iterator<MpdlDocOperation> iter = docOperationQueue.iterator(); | |
148 while (iter.hasNext()) { | |
149 docOperation = iter.next(); | |
150 if (docOperation.getOrderId() == jobId) | |
151 return docOperation; | |
152 } | |
153 } catch (SchedulerException e) { | |
154 LOGGER.error(e.getMessage()); | |
155 throw new ApplicationException(e); | |
156 } | |
157 // if not found return null | |
158 return null; | |
159 } | |
160 | |
161 public MpdlDocOperation getDocOperation(JobExecutionContext jobExecutionContext) { | |
162 MpdlDocOperation docOperation = null; | |
163 if (jobExecutionContext != null) { | |
164 JobDetail job = jobExecutionContext.getJobDetail(); | |
165 JobDataMap parameters = job.getJobDataMap(); | |
166 docOperation = (MpdlDocOperation) parameters.get("operation"); | |
167 } | |
168 return docOperation; | |
169 } | |
170 | |
171 private void queueOperation(MpdlDocOperation docOperation) { | |
172 int operationsBefore = docOperationQueue.size(); | |
173 if (operationsBefore == 0) | |
174 docOperation.setStatus("waiting in operation queue"); | |
175 else | |
176 docOperation.setStatus("waiting in operation queue: " + operationsBefore + " operations heve to be executed before this operation"); | |
177 docOperationQueue.offer(docOperation); | |
178 } | |
179 | |
180 private synchronized boolean isOperationInProgress() { | |
181 return operationInProgress; | |
182 } | |
183 | |
184 private void scheduleJob(MpdlDocOperation docOperation, Date fireTime) throws ApplicationException { | |
185 try { | |
186 int jobId = docOperation.getOrderId(); | |
187 String jobName = CRUD_JOB + "-id-" + jobId + "-timeId-" + fireTime; | |
188 JobDetail job = new JobDetail(jobName, CRUD_GROUP, MpdlDocJob.class); | |
189 JobDataMap parameters = new JobDataMap(); | |
190 parameters.put("operation", docOperation); | |
191 job.setJobDataMap(parameters); | |
192 job.addJobListener(jobListener.getName()); | |
193 String triggerName = CRUD_TRIGGER + "-id-" + jobId + "-timeId-" + fireTime; | |
194 Trigger trigger = new SimpleTrigger(triggerName, CRUD_GROUP, fireTime); | |
195 scheduler.scheduleJob(job, trigger); | |
196 String jobInfo = "MPDL: Schedule document operation: " + docOperation.toString() + ": done at: " + fireTime.toString(); | |
197 LOGGER.info(jobInfo); | |
198 } catch (SchedulerException e) { | |
199 LOGGER.error(e.getMessage()); | |
200 throw new ApplicationException(e); | |
201 } | |
202 } | |
203 | |
204 private void init() throws ApplicationException { | |
205 try { | |
206 if (scheduler == null) { | |
207 String quartzPath = getQuartzPath(); | |
208 StdSchedulerFactory schedulerFactory = new StdSchedulerFactory(quartzPath); | |
209 scheduler = schedulerFactory.getScheduler(); | |
210 jobListener = new MpdlChainSchedulerListener(); | |
211 scheduler.addJobListener(jobListener); | |
212 scheduler.start(); | |
213 LOGGER.info("MPDL: Started Quartz scheduler factory: " + quartzPath); | |
214 } | |
215 } catch (SchedulerException e) { | |
216 LOGGER.error(e.getMessage()); | |
217 throw new ApplicationException(e); | |
218 } | |
219 } | |
220 | |
221 public void end() throws ApplicationException { | |
222 try { | |
223 if (scheduler != null) { | |
224 scheduler.shutdown(); | |
225 } | |
226 String quartzPath = getQuartzPath(); | |
227 LOGGER.info("MPDL: Ended Quartz scheduler factory: " + quartzPath); | |
228 } catch (SchedulerException e) { | |
229 LOGGER.error(e.getMessage()); | |
230 throw new ApplicationException(e); | |
231 } | |
232 } | |
233 | |
234 private String getQuartzPath() { | |
235 URL quartzUrl = MpdlChainScheduler.class.getResource("quartz.properties"); | |
236 String quartzPath = quartzUrl.getPath(); | |
237 if (quartzPath.indexOf(".jar!") != -1) { | |
238 int beginIndex = quartzPath.indexOf(".jar!") + 6; | |
239 quartzPath = quartzPath.substring(beginIndex); | |
240 } | |
241 return quartzPath; | |
242 } | |
243 } |