001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.broker.scheduler.memory;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Date;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027import java.util.Timer;
028import java.util.TimerTask;
029import java.util.TreeMap;
030import java.util.concurrent.CopyOnWriteArrayList;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.locks.ReentrantReadWriteLock;
033
034import javax.jms.MessageFormatException;
035
036import org.apache.activemq.broker.scheduler.CronParser;
037import org.apache.activemq.broker.scheduler.Job;
038import org.apache.activemq.broker.scheduler.JobListener;
039import org.apache.activemq.broker.scheduler.JobScheduler;
040import org.apache.activemq.broker.scheduler.JobSupport;
041import org.apache.activemq.util.ByteSequence;
042import org.apache.activemq.util.IdGenerator;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * Implements an in-memory JobScheduler instance.
048 */
049public class InMemoryJobScheduler implements JobScheduler {
050
051    private static final Logger LOG = LoggerFactory.getLogger(InMemoryJobScheduler.class);
052
053    private static final IdGenerator ID_GENERATOR = new IdGenerator();
054
055    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
056    private final String name;
057    private final TreeMap<Long, ScheduledTask> jobs = new TreeMap<>();
058    private final AtomicBoolean started = new AtomicBoolean(false);
059    private final AtomicBoolean dispatchEnabled = new AtomicBoolean(false);
060    private final List<JobListener> jobListeners = new CopyOnWriteArrayList<>();
061    private final Timer timer = new Timer();
062
063    public InMemoryJobScheduler(String name) {
064        this.name = name;
065    }
066
067    @Override
068    public String getName() throws Exception {
069        return name;
070    }
071
072    public void start() throws Exception {
073        if (started.compareAndSet(false, true)) {
074            startDispatching();
075            LOG.trace("JobScheduler[{}] started", name);
076        }
077    }
078
079    public void stop() throws Exception {
080        if (started.compareAndSet(true, false)) {
081            stopDispatching();
082            timer.cancel();
083            jobs.clear();
084            LOG.trace("JobScheduler[{}] stopped", name);
085        }
086    }
087
088    public boolean isStarted() {
089        return started.get();
090    }
091
092    public boolean isDispatchEnabled() {
093        return dispatchEnabled.get();
094    }
095
096    @Override
097    public void startDispatching() throws Exception {
098        dispatchEnabled.set(true);
099    }
100
101    @Override
102    public void stopDispatching() throws Exception {
103        dispatchEnabled.set(false);
104    }
105
106    @Override
107    public void addListener(JobListener listener) throws Exception {
108        this.jobListeners.add(listener);
109    }
110
111    @Override
112    public void removeListener(JobListener listener) throws Exception {
113        this.jobListeners.remove(listener);
114    }
115
116    @Override
117    public void schedule(String jobId, ByteSequence payload, long delay) throws Exception {
118        doSchedule(jobId, payload, "", 0, delay, 0);
119    }
120
121    @Override
122    public void schedule(String jobId, ByteSequence payload, String cronEntry) throws Exception {
123        doSchedule(jobId, payload, cronEntry, 0, 0, 0);
124    }
125
126    @Override
127    public void schedule(String jobId, ByteSequence payload, String cronEntry, long delay, long period, int repeat) throws Exception {
128        doSchedule(jobId, payload, cronEntry, delay, period, repeat);
129    }
130
131    @Override
132    public void remove(long time) throws Exception {
133        doRemoveRange(time, time);
134    }
135
136    @Override
137    public void remove(String jobId) throws Exception {
138        doRemoveJob(jobId);
139    }
140
141    @Override
142    public void removeAllJobs() throws Exception {
143        doRemoveRange(0, Long.MAX_VALUE);
144    }
145
146    @Override
147    public void removeAllJobs(long start, long finish) throws Exception {
148        doRemoveRange(start, finish);
149    }
150
151    @Override
152    public long getNextScheduleTime() throws Exception {
153        long nextExecutionTime = -1L;
154
155        lock.readLock().lock();
156        try {
157            if (!jobs.isEmpty()) {
158                nextExecutionTime = jobs.entrySet().iterator().next().getKey();
159            }
160        } finally {
161            lock.readLock().unlock();
162        }
163        return nextExecutionTime;
164    }
165
166    @Override
167    public List<Job> getNextScheduleJobs() throws Exception {
168        List<Job> result = new ArrayList<>();
169        lock.readLock().lock();
170        try {
171            if (!jobs.isEmpty()) {
172                result.addAll(jobs.entrySet().iterator().next().getValue().getAllJobs());
173            }
174        } finally {
175            lock.readLock().unlock();
176        }
177        return result;
178    }
179
180    @Override
181    public List<Job> getAllJobs() throws Exception {
182        final List<Job> result = new ArrayList<>();
183        this.lock.readLock().lock();
184        try {
185            for (Map.Entry<Long, ScheduledTask> entry : jobs.entrySet()) {
186                result.addAll(entry.getValue().getAllJobs());
187            }
188        } finally {
189            this.lock.readLock().unlock();
190        }
191
192        return result;
193    }
194
195    @Override
196    public List<Job> getAllJobs(long start, long finish) throws Exception {
197        final List<Job> result = new ArrayList<>();
198        this.lock.readLock().lock();
199        try {
200            for (Map.Entry<Long, ScheduledTask> entry : jobs.entrySet()) {
201                long jobTime = entry.getKey();
202                if (start <= jobTime && jobTime <= finish) {
203                    result.addAll(entry.getValue().getAllJobs());
204                }
205            }
206        } finally {
207            this.lock.readLock().unlock();
208        }
209        return result;
210    }
211
212    @Override
213    public int hashCode() {
214        return name.hashCode();
215    }
216
217    @Override
218    public String toString() {
219        return "JobScheduler: " + name;
220    }
221
222    private void doSchedule(final String jobId, final ByteSequence payload, final String cronEntry, long delay, long period, int repeat) throws IOException {
223        long startTime = System.currentTimeMillis();
224        long executionTime = 0;
225        // round startTime - so we can schedule more jobs at the same time
226        startTime = ((startTime + 500) / 500) * 500;
227
228        if (cronEntry != null && cronEntry.length() > 0) {
229            try {
230                executionTime = CronParser.getNextScheduledTime(cronEntry, startTime);
231            } catch (MessageFormatException e) {
232                throw new IOException(e.getMessage());
233            }
234        }
235
236        if (executionTime == 0) {
237            // start time not set by CRON - so it it to the current time
238            executionTime = startTime;
239        }
240
241        if (delay > 0) {
242            executionTime += delay;
243        } else {
244            executionTime += period;
245        }
246
247        InMemoryJob newJob = new InMemoryJob(jobId);
248        newJob.setStart(startTime);
249        newJob.setCronEntry(cronEntry);
250        newJob.setDelay(delay);
251        newJob.setPeriod(period);
252        newJob.setRepeat(repeat);
253        newJob.setNextTime(executionTime);
254        newJob.setPayload(payload.getData());
255
256        LOG.trace("JobScheduler adding job[{}] to fire at: {}", jobId, JobSupport.getDateTime(executionTime));
257
258        lock.writeLock().lock();
259        try {
260            ScheduledTask task = jobs.get(executionTime);
261            if (task == null) {
262                task = new ScheduledTask(executionTime);
263                task.add(newJob);
264                jobs.put(task.getExecutionTime(), task);
265                timer.schedule(task, new Date(newJob.getNextTime()));
266            } else {
267                task.add(newJob);
268            }
269        } finally {
270            lock.writeLock().unlock();
271        }
272    }
273
274    private void doReschedule(InMemoryJob job, long nextExecutionTime) {
275        job.setNextTime(nextExecutionTime);
276        job.incrementExecutionCount();
277        if (!job.isCron()) {
278            job.decrementRepeatCount();
279        }
280
281        LOG.trace("JobScheduler rescheduling job[{}] to fire at: {}", job.getJobId(), JobSupport.getDateTime(nextExecutionTime));
282
283        lock.writeLock().lock();
284        try {
285            ScheduledTask task = jobs.get(nextExecutionTime);
286            if (task == null) {
287                task = new ScheduledTask(nextExecutionTime);
288                task.add(job);
289                jobs.put(task.getExecutionTime(), task);
290                timer.schedule(task, new Date(task.getExecutionTime()));
291            } else {
292                task.add(job);
293            }
294        } finally {
295            lock.writeLock().unlock();
296        }
297    }
298
299    private void doRemoveJob(String jobId) throws IOException {
300        this.lock.writeLock().lock();
301        try {
302            Iterator<Map.Entry<Long, ScheduledTask>> scheduled = jobs.entrySet().iterator();
303            while (scheduled.hasNext()) {
304                Map.Entry<Long, ScheduledTask> entry = scheduled.next();
305                ScheduledTask task = entry.getValue();
306                if (task.remove(jobId)) {
307                    LOG.trace("JobScheduler removing job[{}]", jobId);
308                    if (task.isEmpty()) {
309                        task.cancel();
310                        scheduled.remove();
311                    }
312                    return;
313                }
314            }
315        } finally {
316            this.lock.writeLock().unlock();
317        }
318    }
319
320    private void doRemoveRange(long start, long end) throws IOException {
321        this.lock.writeLock().lock();
322        try {
323            Iterator<Map.Entry<Long, ScheduledTask>> scheduled = jobs.entrySet().iterator();
324            while (scheduled.hasNext()) {
325                Map.Entry<Long, ScheduledTask> entry = scheduled.next();
326                long executionTime = entry.getKey();
327                if (start <= executionTime && executionTime <= end) {
328                    ScheduledTask task = entry.getValue();
329                    task.cancel();
330                    scheduled.remove();
331                }
332
333                // Don't look beyond the end range.
334                if (end < executionTime) {
335                    break;
336                }
337            }
338        } finally {
339            this.lock.writeLock().unlock();
340        }
341    }
342
343    private boolean canDispatch() {
344        return isStarted() && isDispatchEnabled();
345    }
346
347    private long calculateNextExecutionTime(InMemoryJob job, long currentTime, int repeat) throws MessageFormatException {
348        long result = currentTime;
349        String cron = job.getCronEntry();
350        if (cron != null && cron.length() > 0) {
351            result = CronParser.getNextScheduledTime(cron, result);
352        } else if (job.getRepeat() != 0) {
353            result += job.getPeriod();
354        }
355        return result;
356    }
357
358    private void dispatch(InMemoryJob job) throws IllegalStateException, IOException {
359        if (canDispatch()) {
360            LOG.debug("Firing: {}", job);
361            for (JobListener l : jobListeners) {
362                l.scheduledJob(job.getJobId(), new ByteSequence(job.getPayload()));
363            }
364        }
365    }
366
367    /*
368     * A TimerTask instance that can aggregate the execution of a number
369     * scheduled Jobs and handle rescheduling the jobs that require it.
370     */
371    private class ScheduledTask extends TimerTask {
372
373        private final Map<String, InMemoryJob> jobs = new TreeMap<>();
374        private final long executionTime;
375
376        public ScheduledTask(long executionTime) {
377            this.executionTime = executionTime;
378        }
379
380        public long getExecutionTime() {
381            return executionTime;
382        }
383
384        /**
385         * @return a Collection containing all the managed jobs for this task.
386         */
387        public Collection<InMemoryJob> getAllJobs() {
388            return new ArrayList<>(jobs.values());
389        }
390
391        /**
392         * @return true if the internal list of jobs has become empty.
393         */
394        public boolean isEmpty() {
395            return jobs.isEmpty();
396        }
397
398        /**
399         * Adds the job to the internal list of scheduled Jobs managed by this task.
400         *
401         * @param newJob
402         *        the new job to add to the list of Jobs.
403         */
404        public void add(InMemoryJob newJob) {
405            this.jobs.put(newJob.getJobId(), newJob);
406        }
407
408        /**
409         * Removes the job from the internal list of scheduled Jobs managed by this task.
410         *
411         * @param jobId
412         *        the job ID to remove from the list of Jobs.
413         *
414         * @return true if the job was removed from the list of managed jobs.
415         */
416        public boolean remove(String jobId) {
417            return jobs.remove(jobId) != null;
418        }
419
420        @Override
421        public void run() {
422            if (!isStarted()) {
423                return;
424            }
425
426            try {
427                long currentTime = System.currentTimeMillis();
428                lock.writeLock().lock();
429                try {
430                    // Remove this entry as it will now fire any scheduled jobs, if new
431                    // jobs or rescheduled jobs land in the same time slot we want them
432                    // to go into a new ScheduledTask in the Timer instance.
433                    InMemoryJobScheduler.this.jobs.remove(executionTime);
434                } finally {
435                    lock.writeLock().unlock();
436                }
437
438                long nextExecutionTime = 0;
439
440                for (InMemoryJob job : jobs.values()) {
441
442                    if (!isStarted()) {
443                        break;
444                    }
445
446                    int repeat = job.getRepeat();
447                    nextExecutionTime = calculateNextExecutionTime(job, currentTime, repeat);
448                    if (!job.isCron()) {
449                        dispatch(job);
450                        if (repeat != 0) {
451                            // Reschedule for the next time, the scheduler will take care of
452                            // updating the repeat counter on the update.
453                            doReschedule(job, nextExecutionTime);
454                        }
455                    } else {
456                        if (repeat == 0) {
457                            // This is a non-repeating Cron entry so we can fire and forget it.
458                            dispatch(job);
459                        }
460
461                        if (nextExecutionTime > currentTime) {
462                            // Reschedule the cron job as a new event, if the cron entry signals
463                            // a repeat then it will be stored separately and fired as a normal
464                            // event with decrementing repeat.
465                            doReschedule(job, nextExecutionTime);
466
467                            if (repeat != 0) {
468                                // we have a separate schedule to run at this time
469                                // so the cron job is used to set of a separate schedule
470                                // hence we won't fire the original cron job to the
471                                // listeners but we do need to start a separate schedule
472                                String jobId = ID_GENERATOR.generateId();
473                                ByteSequence payload = new ByteSequence(job.getPayload());
474                                schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat());
475                            }
476                        }
477                    }
478                }
479            } catch (Throwable e) {
480                LOG.error("Error while processing scheduled job(s).", e);
481            }
482        }
483    }
484}