comparison software/mpdl-services/mpiwg-mpdl-cms/src/de/mpg/mpiwg/berlin/mpdl/cms/scheduler/CmsChainScheduler.java @ 23:e845310098ba

diverse Korrekturen
author Josef Willenborg <jwillenborg@mpiwg-berlin.mpg.de>
date Tue, 27 Nov 2012 12:35:19 +0100
parents
children
comparison
equal deleted inserted replaced
22:6a45a982c333 23:e845310098ba
1 package de.mpg.mpiwg.berlin.mpdl.cms.scheduler;
2
3 import java.net.URL;
4 import java.util.ArrayList;
5 import java.util.Collection;
6 import java.util.Date;
7 import java.util.HashMap;
8 import java.util.Iterator;
9 import java.util.List;
10 import java.util.PriorityQueue;
11 import java.util.Queue;
12
13 import java.util.logging.Logger;
14 import org.quartz.JobDataMap;
15 import org.quartz.JobDetail;
16 import org.quartz.JobExecutionContext;
17 import org.quartz.JobListener;
18 import org.quartz.SchedulerException;
19 import org.quartz.SimpleTrigger;
20 import org.quartz.Trigger;
21 import org.quartz.impl.StdSchedulerFactory;
22
23 import de.mpg.mpiwg.berlin.mpdl.exception.ApplicationException;
24
25 public class CmsChainScheduler {
26 private static CmsChainScheduler instance;
27 private static String CRUD_JOB = "MPDL_CRUD_JOB";
28 private static String CRUD_TRIGGER = "MPDL_CRUD_TRIGGER";
29 private static String CRUD_GROUP = "MPDL_CRUD_GROUP";
30 private static Logger LOGGER = Logger.getLogger(CmsDocJob.class.getName());
31 private org.quartz.Scheduler scheduler;
32 private JobListener jobListener;
33 private Queue<CmsDocOperation> docOperationQueue = new PriorityQueue<CmsDocOperation>();
34 private HashMap<Integer, CmsDocOperation> finishedDocOperations = new HashMap<Integer, CmsDocOperation>();
35 private boolean operationInProgress = false;
36 private int jobOrderId = 0;
37
38 public static CmsChainScheduler getInstance() throws ApplicationException {
39 if (instance == null) {
40 instance = new CmsChainScheduler();
41 instance.init();
42 }
43 return instance;
44 }
45
46 public CmsDocOperation doOperation(CmsDocOperation docOperation) throws ApplicationException {
47 jobOrderId++;
48 docOperation.setOrderId(jobOrderId);
49 queueOperation(docOperation);
50 scheduleNextOperation();
51 return docOperation;
52 }
53
54 public void finishOperation(CmsDocOperation docOperation) throws ApplicationException {
55 operationInProgress = false;
56 Date now = new Date();
57 docOperation.setEnd(now);
58 docOperation.setStatus("finished");
59 int jobId = new Integer(docOperation.getOrderId());
60 finishedDocOperations.put(jobId, docOperation);
61 log(docOperation);
62 // schedule next job if there is one
63 scheduleNextOperation();
64 }
65
66 private void log(CmsDocOperation docOperation) {
67 Date startTime = docOperation.getStart();
68 Date endTime = docOperation.getEnd();
69 long executionTime = -1;
70 if (startTime != null && endTime != null)
71 executionTime = (endTime.getTime() - startTime.getTime());
72 String jobInfo = "Document operation " + docOperation.toString() + ": started at: " + startTime +
73 " and ended at: " + endTime + " (needed time: " + executionTime + " ms)";
74 LOGGER.info(jobInfo);
75 }
76
77 public synchronized void scheduleNextOperation() throws ApplicationException {
78 if (isOperationInProgress()) {
79 // nothing, operation has to wait
80 } else {
81 CmsDocOperation docOperation = docOperationQueue.poll();
82 if (docOperation == null) {
83 // if queue is empty then do nothing (there are no more operations to execute)
84 } else {
85 Date now = new Date();
86 operationInProgress = true;
87 docOperation.setStart(now);
88 scheduleJob(docOperation, now);
89 }
90 }
91 }
92
93 public ArrayList<CmsDocOperation> getDocOperations() throws ApplicationException {
94 ArrayList<CmsDocOperation> docOperations = new ArrayList<CmsDocOperation>();
95 try {
96 // first: all finished jobs
97 Collection<CmsDocOperation> finiDocOperations = finishedDocOperations.values();
98 docOperations.addAll(finiDocOperations);
99 // second: all currently executed jobs
100 if (operationInProgress) {
101 List<JobExecutionContext> currentJobs = (List<JobExecutionContext>) scheduler.getCurrentlyExecutingJobs();
102 Iterator<JobExecutionContext> iter = currentJobs.iterator();
103 while (iter.hasNext()) {
104 JobExecutionContext jobExecutionContext = iter.next();
105 CmsDocOperation docOperation = getDocOperation(jobExecutionContext);
106 if (docOperation != null) {
107 docOperations.add(docOperation);
108 }
109 }
110 }
111 // third: all queued jobs
112 Iterator<CmsDocOperation> iter = docOperationQueue.iterator();
113 while (iter.hasNext()) {
114 CmsDocOperation docOperation = iter.next();
115 docOperations.add(docOperation);
116 }
117 } catch (SchedulerException e) {
118 LOGGER.severe(e.getMessage());
119 throw new ApplicationException(e);
120 }
121 return docOperations;
122 }
123
124 public CmsDocOperation getDocOperation(int jobId) throws ApplicationException {
125 CmsDocOperation docOperation = null;
126 try {
127 // first try: looks into currently executing jobs
128 if (operationInProgress) {
129 List<JobExecutionContext> currentJobs = (List<JobExecutionContext>) scheduler.getCurrentlyExecutingJobs();
130 Iterator<JobExecutionContext> iter = currentJobs.iterator();
131 while (iter.hasNext()) {
132 JobExecutionContext jobExecutionContext = iter.next();
133 docOperation = getDocOperation(jobExecutionContext);
134 if (docOperation != null) {
135 int dopOpJobId = docOperation.getOrderId();
136 if (jobId == dopOpJobId)
137 return docOperation;
138 }
139 }
140 }
141 // second try: look into finished jobs
142 docOperation = finishedDocOperations.get(new Integer(jobId));
143 if (docOperation != null) {
144 return docOperation;
145 }
146 // third try: look into queued jobs
147 Iterator<CmsDocOperation> iter = docOperationQueue.iterator();
148 while (iter.hasNext()) {
149 docOperation = iter.next();
150 if (docOperation.getOrderId() == jobId)
151 return docOperation;
152 }
153 } catch (SchedulerException e) {
154 LOGGER.severe(e.getMessage());
155 throw new ApplicationException(e);
156 }
157 // if not found return null
158 return null;
159 }
160
161 public CmsDocOperation getDocOperation(JobExecutionContext jobExecutionContext) {
162 CmsDocOperation docOperation = null;
163 if (jobExecutionContext != null) {
164 JobDetail job = jobExecutionContext.getJobDetail();
165 JobDataMap parameters = job.getJobDataMap();
166 docOperation = (CmsDocOperation) parameters.get("operation");
167 }
168 return docOperation;
169 }
170
171 private void queueOperation(CmsDocOperation docOperation) {
172 int operationsBefore = docOperationQueue.size();
173 if (operationsBefore == 0)
174 docOperation.setStatus("waiting in operation queue");
175 else
176 docOperation.setStatus("waiting in operation queue: " + operationsBefore + " operations heve to be executed before this operation");
177 docOperationQueue.offer(docOperation);
178 }
179
180 private synchronized boolean isOperationInProgress() {
181 return operationInProgress;
182 }
183
184 private void scheduleJob(CmsDocOperation docOperation, Date fireTime) throws ApplicationException {
185 try {
186 int jobId = docOperation.getOrderId();
187 String jobName = CRUD_JOB + "-id-" + jobId + "-timeId-" + fireTime;
188 JobDetail job = new JobDetail(jobName, CRUD_GROUP, CmsDocJob.class);
189 JobDataMap parameters = new JobDataMap();
190 parameters.put("operation", docOperation);
191 job.setJobDataMap(parameters);
192 job.addJobListener(jobListener.getName());
193 String triggerName = CRUD_TRIGGER + "-id-" + jobId + "-timeId-" + fireTime;
194 Trigger trigger = new SimpleTrigger(triggerName, CRUD_GROUP, fireTime);
195 scheduler.scheduleJob(job, trigger);
196 String jobInfo = "Schedule document operation: " + docOperation.toString() + ": done at: " + fireTime.toString();
197 LOGGER.info(jobInfo);
198 } catch (SchedulerException e) {
199 LOGGER.severe(e.getMessage());
200 throw new ApplicationException(e);
201 }
202 }
203
204 private void init() throws ApplicationException {
205 try {
206 if (scheduler == null) {
207 String quartzPath = getQuartzPath();
208 StdSchedulerFactory schedulerFactory = new StdSchedulerFactory(quartzPath);
209 scheduler = schedulerFactory.getScheduler();
210 jobListener = new CmsChainSchedulerListener();
211 scheduler.addJobListener(jobListener);
212 scheduler.start();
213 LOGGER.info("Started Quartz scheduler factory: " + quartzPath);
214 }
215 } catch (SchedulerException e) {
216 LOGGER.severe(e.getMessage());
217 throw new ApplicationException(e);
218 }
219 }
220
221 public void end() throws ApplicationException {
222 try {
223 if (scheduler != null) {
224 scheduler.shutdown();
225 }
226 String quartzPath = getQuartzPath();
227 LOGGER.info("Ended Quartz scheduler factory: " + quartzPath);
228 } catch (SchedulerException e) {
229 LOGGER.severe(e.getMessage());
230 throw new ApplicationException(e);
231 }
232 }
233
234 private String getQuartzPath() {
235 URL quartzUrl = CmsChainScheduler.class.getResource("quartz.properties");
236 String quartzPath = quartzUrl.getPath();
237 if (quartzPath.indexOf(".jar!") != -1) {
238 int beginIndex = quartzPath.indexOf(".jar!") + 6;
239 quartzPath = quartzPath.substring(beginIndex);
240 }
241 return quartzPath;
242 }
243 }