001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.util;
018
019
020import java.util.ArrayList;
021import java.util.concurrent.*;
022
023/**
024 * <p>
025 *   Used to implement callback based result passing of a promised computation.
026 *   Can be converted to a future using the future() method.
027 * </p>
028 *
029 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
030 */
031public class Promise<T> extends PromiseCallback<T> {
032
033    ArrayList<PromiseCallback<T>> callbacks = new ArrayList<PromiseCallback<T>>(1);
034    T value;
035    Throwable error;
036    Future<T> future=null;
037
038    private class PromiseFuture extends PromiseCallback<T> implements Future<T> {
039        CountDownLatch latch = new CountDownLatch(1);
040
041        public boolean cancel(boolean mayInterruptIfRunning) {
042            return false;
043        }
044
045        public boolean isCancelled() {
046            return false;
047        }
048
049        public boolean isDone() {
050            return latch.getCount() == 0;
051        }
052
053        public T get() throws InterruptedException, ExecutionException {
054            latch.await();
055            return value();
056        }
057
058        public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
059            if (latch.await(timeout, unit)) {
060                return value();
061            } else {
062                throw new TimeoutException();
063            }
064        }
065
066        public void onComplete(T value, Throwable error) {
067            latch.countDown();
068        }
069
070        private T value() throws ExecutionException {
071            if( error!=null ) {
072                throw new ExecutionException(error);
073            }
074            return value;
075        }
076    }
077
078    public Future<T> future() {
079        if( future == null ) {
080            PromiseFuture future = new PromiseFuture();
081            watch(future);
082            this.future = future;
083        }
084        return future;
085    }
086
087    public void watch(PromiseCallback<T> callback) {
088        if (callback == null)
089            throw new IllegalArgumentException("callback cannot be null");
090        boolean queued = false;
091        synchronized (this) {
092            if (callbacks != null) {
093                callbacks.add(callback);
094                queued = true;
095            }
096        }
097        if (!queued) {
098            callback.onComplete(value, error);
099        }
100    }
101
102    @Override
103    public void onComplete(T value, Throwable error) {
104        if( value!=null && error !=null ) {
105            throw new IllegalArgumentException("You can not have both a vaule and error");
106        }
107        ArrayList<PromiseCallback<T>> callbacks;
108        synchronized (this) {
109            callbacks = this.callbacks;
110            if (callbacks != null) {
111                this.value = value;
112                this.error = error;
113                this.callbacks = null;
114            }
115        }
116        if (callbacks != null) {
117            for (PromiseCallback callback : callbacks) {
118                callback.onComplete(this.value, this.error);
119            }
120        }
121    }
122
123}