001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb.disk.util;
018
019import java.io.DataInput;
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.UTFDataFormatException;
023
024import org.apache.activemq.util.ByteSequence;
025
026/**
027 * Optimized ByteArrayInputStream that can be used more than once
028 */
029public final class DataByteArrayInputStream extends InputStream implements DataInput, AutoCloseable {
030
031    private byte[] buf;
032    private int pos;
033    private int offset;
034    private int length;
035
036    private byte[] work;
037
038    /**
039     * Creates a <code>StoreByteArrayInputStream</code>.
040     *
041     * @param buf the input buffer.
042     */
043    public DataByteArrayInputStream(byte buf[]) {
044        this.buf = buf;
045        this.pos = 0;
046        this.offset = 0;
047        this.length = buf.length;
048        this.work = new byte[8];
049    }
050
051    /**
052     * Creates a <code>StoreByteArrayInputStream</code>.
053     *
054     * @param sequence the input buffer.
055     */
056    public DataByteArrayInputStream(ByteSequence sequence) {
057        this.buf = sequence.getData();
058        this.offset = sequence.getOffset();
059        this.pos =  this.offset;
060        this.length = sequence.length;
061        this.work = new byte[8];
062    }
063
064    /**
065     * Creates <code>WireByteArrayInputStream</code> with a minmalist byte
066     * array
067     */
068    public DataByteArrayInputStream() {
069        this(new byte[0]);
070    }
071
072    /**
073     * @return the size
074     */
075    public int size() {
076        return pos - offset;
077    }
078
079    /**
080     * @return the underlying data array
081     */
082    public byte[] getRawData() {
083        return buf;
084    }
085
086    /**
087     * reset the <code>StoreByteArrayInputStream</code> to use an new byte
088     * array
089     *
090     * @param newBuff
091     */
092    public void restart(byte[] newBuff) {
093        buf = newBuff;
094        pos = 0;
095        length = newBuff.length;
096    }
097
098    public void restart() {
099        pos = 0;
100        length = buf.length;
101    }
102
103    /**
104     * reset the <code>StoreByteArrayInputStream</code> to use an new
105     * ByteSequence
106     *
107     * @param sequence
108     */
109    public void restart(ByteSequence sequence) {
110        this.buf = sequence.getData();
111        this.pos = sequence.getOffset();
112        this.length = sequence.getLength();
113    }
114
115    /**
116     * re-start the input stream - reusing the current buffer
117     *
118     * @param size
119     */
120    public void restart(int size) {
121        if (buf == null || buf.length < size) {
122            buf = new byte[size];
123        }
124        restart(buf);
125        this.length = size;
126    }
127
128    /**
129     * Reads the next byte of data from this input stream. The value byte is
130     * returned as an <code>int</code> in the range <code>0</code> to
131     * <code>255</code>. If no byte is available because the end of the
132     * stream has been reached, the value <code>-1</code> is returned.
133     * <p>
134     * This <code>read</code> method cannot block.
135     *
136     * @return the next byte of data, or <code>-1</code> if the end of the
137     *         stream has been reached.
138     */
139    @Override
140    public int read() {
141        return (pos < length) ? (buf[pos++] & 0xff) : -1;
142    }
143
144    /**
145     * Reads up to <code>len</code> bytes of data into an array of bytes from
146     * this input stream.
147     *
148     * @param b the buffer into which the data is read.
149     * @param off the start offset of the data.
150     * @param len the maximum number of bytes read.
151     * @return the total number of bytes read into the buffer, or
152     *         <code>-1</code> if there is no more data because the end of the
153     *         stream has been reached.
154     */
155    @Override
156    public int read(byte b[], int off, int len) {
157        if (b == null) {
158            throw new NullPointerException();
159        }
160        if (pos >= length) {
161            return -1;
162        }
163        if (pos + len > length) {
164            len = length - pos;
165        }
166        if (len <= 0) {
167            return 0;
168        }
169        System.arraycopy(buf, pos, b, off, len);
170        pos += len;
171        return len;
172    }
173
174    /**
175     * @return the number of bytes that can be read from the input stream
176     *         without blocking.
177     */
178    @Override
179    public int available() {
180        return length - pos;
181    }
182
183    @Override
184    public void readFully(byte[] b) {
185        read(b, 0, b.length);
186    }
187
188    @Override
189    public void readFully(byte[] b, int off, int len) {
190        read(b, off, len);
191    }
192
193    @Override
194    public int skipBytes(int n) {
195        if (pos + n > length) {
196            n = length - pos;
197        }
198        if (n < 0) {
199            return 0;
200        }
201        pos += n;
202        return n;
203    }
204
205    @Override
206    public boolean readBoolean() {
207        return read() != 0;
208    }
209
210    @Override
211    public byte readByte() {
212        return (byte)read();
213    }
214
215    @Override
216    public int readUnsignedByte() {
217        return read();
218    }
219
220    @Override
221    public short readShort() {
222        this.read(work, 0, 2);
223        return (short) (((work[0] & 0xff) << 8) | (work[1] & 0xff));
224    }
225
226    @Override
227    public int readUnsignedShort() {
228        this.read(work, 0, 2);
229        return ((work[0] & 0xff) << 8) | (work[1] & 0xff);
230    }
231
232    @Override
233    public char readChar() {
234        this.read(work, 0, 2);
235        return (char) (((work[0] & 0xff) << 8) | (work[1] & 0xff));
236    }
237
238    @Override
239    public int readInt() {
240        this.read(work, 0, 4);
241        return ((work[0] & 0xff) << 24) | ((work[1] & 0xff) << 16) |
242               ((work[2] & 0xff) << 8) | (work[3] & 0xff);
243    }
244
245    @Override
246    public long readLong() {
247        this.read(work, 0, 8);
248
249        int i1 = ((work[0] & 0xff) << 24) | ((work[1] & 0xff) << 16) |
250            ((work[2] & 0xff) << 8) | (work[3] & 0xff);
251        int i2 = ((work[4] & 0xff) << 24) | ((work[5] & 0xff) << 16) |
252            ((work[6] & 0xff) << 8) | (work[7] & 0xff);
253
254        return ((i1 & 0xffffffffL) << 32) | (i2 & 0xffffffffL);
255    }
256
257    @Override
258    public float readFloat() throws IOException {
259        return Float.intBitsToFloat(readInt());
260    }
261
262    @Override
263    public double readDouble() throws IOException {
264        return Double.longBitsToDouble(readLong());
265    }
266
267    @Override
268    public String readLine() {
269        int start = pos;
270        while (pos < length) {
271            int c = read();
272            if (c == '\n') {
273                break;
274            }
275            if (c == '\r') {
276                c = read();
277                if (c != '\n' && c != -1) {
278                    pos--;
279                }
280                break;
281            }
282        }
283        return new String(buf, start, pos);
284    }
285
286    @Override
287    public String readUTF() throws IOException {
288        int length = readUnsignedShort();
289        int endPos = pos + length;
290        int count = 0, a;
291        char[] characters = new char[length];
292        while (pos < endPos) {
293            if ((characters[count] = (char) buf[pos++]) < '\u0080')
294                count++;
295            else if (((a = characters[count]) & 0xE0) == 0xC0) {
296                if (pos >= endPos) {
297                    throw new UTFDataFormatException("bad string");
298                }
299                int b = buf[pos++];
300                if ((b & 0xC0) != 0x80) {
301                    throw new UTFDataFormatException("bad string");
302                }
303                characters[count++] = (char) (((a & 0x1F) << 6) | (b & 0x3F));
304            } else if ((a & 0xf0) == 0xe0) {
305                if (pos + 1 >= endPos) {
306                    throw new UTFDataFormatException("bad string");
307                }
308                int b = buf[pos++];
309                int c = buf[pos++];
310                if (((b & 0xC0) != 0x80) || ((c & 0xC0) != 0x80)) {
311                    throw new UTFDataFormatException("bad string");
312                }
313                characters[count++] = (char) (((a & 0x0F) << 12) | ((b & 0x3F) << 6) | (c & 0x3F));
314            } else {
315                throw new UTFDataFormatException("bad string");
316            }
317        }
318        return new String(characters, 0, count);
319    }
320
321    public int getPos() {
322        return pos;
323    }
324
325    public void setPos(int pos) {
326        this.pos = pos;
327    }
328
329    public int getLength() {
330        return length;
331    }
332
333    public void setLength(int length) {
334        this.length = length;
335    }
336}