diff --git a/NOTICE.txt b/NOTICE.txt
new file mode 100644
index 00000000..eab39c08
--- /dev/null
+++ b/NOTICE.txt
@@ -0,0 +1,10 @@
+This product is distributed under the GNU General Public License (GPL v2+) (see COPYING).
+
+This product includes software developed by several Apache Software Foundation projects,
+licensed under the Apache License, 2.0, including but not limited to:
+ - Apache Mahout
+ - Apache Hadoop
+
+This product includes a JUnit jar: http://junit.sourceforge.net/
+License: Common Public License - v 1.0 (http://junit.sourceforge.net/cpl-v10.html)
+Copyright (c) 2000-2006, www.hamcrest.org
diff --git a/src/java/com/hadoop/compression/lzo/LzoBasicIndexSerde.java b/src/java/com/hadoop/compression/lzo/LzoBasicIndexSerde.java
new file mode 100644
index 00000000..83fa660a
--- /dev/null
+++ b/src/java/com/hadoop/compression/lzo/LzoBasicIndexSerde.java
@@ -0,0 +1,103 @@
+/*
+ * This file is part of Hadoop-Gpl-Compression.
+ *
+ * Hadoop-Gpl-Compression is free software: you can redistribute it
+ * and/or modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Hadoop-Gpl-Compression is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Hadoop-Gpl-Compression.  If not, see
+ * <http://www.gnu.org/licenses/>.
+ */
+
+package com.hadoop.compression.lzo;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+
+public class LzoBasicIndexSerde implements LzoIndexSerde {
+
+  private static final int BUFFER_CAPACITY = 16 * 1024 * 8; // size for a 4GB file (with 256KB lzo blocks)
+
+  private DataOutputStream os;
+  private DataInputStream is;
+  private ByteBuffer bytesIn;
+  private long firstLong;
+  private int numBlocks = 0;
+  private boolean processedFirstLong = false;
+
+  @Override
+  public boolean accepts(long firstLong) {
+    if (firstLong < 0) {
+      return false;
+    } else {
+      this.firstLong = firstLong;
+      return true;
+    }
+  }
+
+  @Override
+  public void prepareToWrite(DataOutputStream os) throws IOException {
+    this.os = os;
+  }
+
+  @Override
+  public void prepareToRead(DataInputStream is) throws IOException {
+    this.is = is;
+    bytesIn = fillBuffer();
+    numBlocks = bytesIn.remaining() / 8 + 1; // plus one for the first long.
+    processedFirstLong = false;
+  }
+
+  @Override
+  public void writeOffset(long offset) throws IOException {
+    os.writeLong(offset);
+  }
+
+  @Override
+  public void finishWriting() throws IOException {
+    os.close();
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    return !processedFirstLong || (bytesIn != null && bytesIn.hasRemaining());
+  }
+
+  @Override
+  public long next() throws IOException {
+    if (!processedFirstLong) {
+      processedFirstLong = true;
+      return firstLong;
+    }
+    if (bytesIn != null && bytesIn.hasRemaining()) {
+      return bytesIn.getLong();
+    } else {
+      throw new IOException("Attempt to read past the edge of the index.");
+    }
+  }
+
+  private ByteBuffer fillBuffer() throws IOException {
+    DataOutputBuffer bytes = new DataOutputBuffer(BUFFER_CAPACITY);
+    // copy indexIn and close it if finished
+    IOUtils.copyBytes(is, bytes, 4*1024, true);
+    return ByteBuffer.wrap(bytes.getData(), 0, bytes.getLength());
+  }
+
+  @Override
+  public void finishReading() throws IOException {
+    is.close();
+  }
+
+}
diff --git a/src/java/com/hadoop/compression/lzo/LzoIndex.java b/src/java/com/hadoop/compression/lzo/LzoIndex.java
index c5f9059e..f7635cc4 100644
--- a/src/java/com/hadoop/compression/lzo/LzoIndex.java
+++ b/src/java/com/hadoop/compression/lzo/LzoIndex.java
@@ -20,7 +20,7 @@
 import java.io.EOFException;
 import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 
 import org.apache.hadoop.conf.Configurable;
@@ -29,8 +29,6 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
@@ -44,6 +42,13 @@ public class LzoIndex {
 
   private long[] blockPositions_;
 
+  private static ArrayList<Class<? extends LzoIndexSerde>> serdeClasses =
+      new ArrayList<Class<? extends LzoIndexSerde>>();
+  static {
+    serdeClasses.add(LzoBasicIndexSerde.class);
+    serdeClasses.add(LzoTinyOffsetsSerde.class);
+  }
+
   /**
    * Create an empty index, typically indicating no index file exists.
    */
@@ -175,21 +180,29 @@ public static LzoIndex readIndex(FileSystem fs, Path lzoFile) throws IOException
       // return empty index, fall back to the unsplittable mode
       return new LzoIndex();
     }
-
-    int capacity = 16 * 1024 * 8; //size for a 4GB file (with 256KB lzo blocks)
-    DataOutputBuffer bytes = new DataOutputBuffer(capacity);
-
-    // copy indexIn and close it
-    IOUtils.copyBytes(indexIn, bytes, 4*1024, true);
-
-    ByteBuffer bytesIn = ByteBuffer.wrap(bytes.getData(), 0, bytes.getLength());
-    int blocks = bytesIn.remaining()/8;
-    LzoIndex index = new LzoIndex(blocks);
-
-    for (int i = 0; i < blocks; i++) {
-      index.set(i, bytesIn.getLong());
+    long firstLong = indexIn.readLong();
+    LzoIndexSerde serde = null;
+    for (Class<? extends LzoIndexSerde> candidateClass : serdeClasses) {
+      LzoIndexSerde candidate = null;
+      candidate = quietGetInstance(candidateClass);
+      if (candidate.accepts(firstLong)) {
+        serde = candidate;
+        break;
+      }
+    }
+    serde.prepareToRead(indexIn);
+    // Sized for at least 1 256MB HDFS block with 256KB Lzo blocks.
+    // If it's less than that, you shouldn't bother indexing anyway.
+    ArrayList<Long> offsets = new ArrayList<Long>(1024);
+    while (serde.hasNext()) {
+      offsets.add(serde.next());
     }
+    serde.finishReading();
+    LzoIndex index = new LzoIndex(offsets.size());
+    for (int i = 0; i < offsets.size(); i++) {
+      index.set(i, offsets.get(i));
+    }
     return index;
   }
@@ -217,6 +230,8 @@ public static void createIndex(FileSystem fs, Path lzoFile)
     FSDataInputStream is = null;
     FSDataOutputStream os = null;
+    LzoIndexSerde writer = new LzoTinyOffsetsSerde();
+
     Path outputFile = lzoFile.suffix(LZO_INDEX_SUFFIX);
     Path tmpOutputFile = lzoFile.suffix(LZO_TMP_INDEX_SUFFIX);
@@ -226,6 +241,7 @@ public static void createIndex(FileSystem fs, Path lzoFile)
     try {
       is = fs.open(lzoFile);
       os = fs.create(tmpOutputFile);
+      writer.prepareToWrite(os);
       LzopDecompressor decompressor = (LzopDecompressor) codec.createDecompressor();
       // Solely for reading the header
       codec.createInputStream(is, decompressor);
@@ -252,7 +268,7 @@ public static void createIndex(FileSystem fs, Path lzoFile)
             numDecompressedChecksums : numDecompressedChecksums + numCompressedChecksums;
         long pos = is.getPos();
         // write the pos of the block start
-        os.writeLong(pos - 8);
+        writer.writeOffset(pos - 8);
         // seek to the start of the next block, skip any checksums
         is.seek(pos + compressedBlockSize + (4 * numChecksumsToSkip));
       }
@@ -263,7 +279,7 @@ public static void createIndex(FileSystem fs, Path lzoFile)
       if (is != null) {
         is.close();
       }
-
+      writer.finishWriting();
       if (os != null) {
         os.close();
       }
@@ -277,5 +293,17 @@ public static void createIndex(FileSystem fs, Path lzoFile)
       }
     }
   }
+
+  private static LzoIndexSerde quietGetInstance(Class<? extends LzoIndexSerde> klass) throws IOException {
+    LzoIndexSerde instance = null;
+    try {
+      instance = klass.newInstance();
+    } catch (InstantiationException e) {
+      throw new IOException(e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(e);
+    }
+    return instance;
+  }
 }
diff --git a/src/java/com/hadoop/compression/lzo/LzoIndexSerde.java b/src/java/com/hadoop/compression/lzo/LzoIndexSerde.java
new file mode 100644
index 00000000..ab2a29f1
--- /dev/null
+++ b/src/java/com/hadoop/compression/lzo/LzoIndexSerde.java
@@ -0,0 +1,67 @@
+/*
+ * This file is part of Hadoop-Gpl-Compression.
+ *
+ * Hadoop-Gpl-Compression is free software: you can redistribute it
+ * and/or modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Hadoop-Gpl-Compression is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Hadoop-Gpl-Compression.  If not, see
+ * <http://www.gnu.org/licenses/>.
+ */
+
+package com.hadoop.compression.lzo;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public interface LzoIndexSerde {
+
+  /**
+   * Serdes will be tried in order until one is found that accepts
+   * the offered format. A format is determined from the first 8
+   * bytes (represented as a long) written to the index file.
+   * <p>
+   * The first long is somewhat constrained: the topmost bit should be
+   * 1, the next 31 are a version number by which the appropriate SerDe
+   * is decided, and the remaining 32 can hold arbitrary data (a header,
+   * a length of the header, or an offset; up to you).
+   *
+   * @param firstLong the first 8 bytes of the index file, as a long.
+   * @return true if this format is recognized by the SerDe, false otherwise.
+   */
+  public boolean accepts(long firstLong);
+
+  public void prepareToWrite(DataOutputStream os) throws IOException;
+
+  /**
+   * Prepare to read the index. Note that the first 8 bytes will already
+   * have been read from this stream, and passed to accepts() in the form
+   * of a long.
+   * @param is InputStream to read.
+   */
+  public void prepareToRead(DataInputStream is) throws IOException;
+
+  /**
+   * Write the next offset into the file. It is expected that
+   * the offsets are supplied in order. prepareToWrite()
+   * should be called before the first invocation of this method.
+   * @param offset the offset to record.
+   */
+  public void writeOffset(long offset) throws IOException;
+
+  public void finishWriting() throws IOException;
+
+  public void finishReading() throws IOException;
+
+  public boolean hasNext() throws IOException;
+
+  public long next() throws IOException;
+
+}
diff --git a/src/java/com/hadoop/compression/lzo/LzoTinyOffsetsSerde.java b/src/java/com/hadoop/compression/lzo/LzoTinyOffsetsSerde.java
new file mode 100644
index 00000000..36c8b22a
--- /dev/null
+++ b/src/java/com/hadoop/compression/lzo/LzoTinyOffsetsSerde.java
@@ -0,0 +1,141 @@
+/*
+ * This file is part of Hadoop-Gpl-Compression.
+ *
+ * Hadoop-Gpl-Compression is free software: you can redistribute it
+ * and/or modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Hadoop-Gpl-Compression is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Hadoop-Gpl-Compression.  If not, see
+ * <http://www.gnu.org/licenses/>.
+ */
+
+package com.hadoop.compression.lzo;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+
+/**
+ * The index is stored as follows:
+ * <ul>
+ *   <li> 4 bytes: version number (topmost bit 1, then thirty 0s, then 1).
+ *   <li> 4 bytes: first offset (offset from the LZO header).
+ *   <li> 4 bytes: size of the first block.
+ *   <li> Sequence of signed VarInts: for each subsequent block, the delta
+ *        between its size and the size of the first block.
+ * </ul>
+ */
+public class LzoTinyOffsetsSerde implements LzoIndexSerde {
+
+  private DataOutputStream os;
+  private DataInputStream is;
+
+  private boolean readFirstLong = false;
+  private int firstBlockSize;
+  private boolean wroteFirstBlock = false;
+
+  private boolean readInitialOffset = false;
+  private boolean wroteInitialOffset = false;
+  private long currOffset = 0;
+
+  // for hasNext, we have to read the next value
+  // (or, if the buffer hasn't been used, just check it for null)
+  private Integer bufferedOffset = null;
+
+  private static final int VERSION = (1 << 31) + 1;
+
+  @Override
+  public boolean accepts(long firstLong) {
+    if ( ((int) (firstLong >>> 32)) == VERSION) {
+      currOffset = (int) (firstLong << 32 >>> 32);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public void prepareToWrite(DataOutputStream os) throws IOException {
+    this.os = os;
+    os.writeInt(VERSION);
+    wroteFirstBlock = false;
+    wroteInitialOffset = false;
+  }
+
+  @Override
+  public void prepareToRead(DataInputStream is) throws IOException {
+    this.is = is;
+    readFirstLong = false;
+    readInitialOffset = false;
+  }
+
+  @Override
+  public void writeOffset(long offset) throws IOException {
+
+    if (!wroteInitialOffset) {
+      os.writeInt((int) offset);
+      wroteInitialOffset = true;
+    } else if (!wroteFirstBlock) {
+      firstBlockSize = (int) (offset - currOffset);
+      os.writeInt(firstBlockSize);
+      wroteFirstBlock = true;
+    } else {
+      int delta = ((int) (offset - currOffset)) - firstBlockSize;
+      Varint.writeSignedVarInt(delta, os);
+    }
+    currOffset = offset;
+
+  }
+
+  @Override
+  public void finishWriting() throws IOException {
+    os.close();
+  }
+
+  @Override
+  public void finishReading() throws IOException {
+    is.close();
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    if (readInitialOffset && readFirstLong) {
+      if (bufferedOffset == null) {
+        // try to read something. If we hit EOF, we are done.
+        try {
+          bufferedOffset = Varint.readSignedVarInt(is);
+        } catch (EOFException e) {
+          return false;
+        }
+      }
+      return true;
+    }
+    return !readInitialOffset || (!readFirstLong && is.available() != 0);
+  }
+
+  @Override
+  public long next() throws IOException {
+    if (!readInitialOffset) {
+      readInitialOffset = true;
+    } else if (!readFirstLong) {
+      readFirstLong = true;
+      firstBlockSize = is.readInt();
+      currOffset += firstBlockSize;
+    } else {
+      if (bufferedOffset == null) {
+        bufferedOffset = Varint.readSignedVarInt(is);
+      }
+      currOffset += firstBlockSize + bufferedOffset;
+      bufferedOffset = null;
+    }
+    return currOffset;
+  }
+
+}
diff --git a/src/java/com/hadoop/compression/lzo/Varint.java b/src/java/com/hadoop/compression/lzo/Varint.java
new file mode 100644
index 00000000..ad417d22
--- /dev/null
+++ b/src/java/com/hadoop/compression/lzo/Varint.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * The Varint code is taken from the Apache Mahout project under the
+ * Apache Software License.
+ * https://github.com/apache/mahout/blob/trunk/core/src/main/java/org/apache/mahout/math/Varint.java
+ *
+ * We've replaced the guava Preconditions usage with simple
+ * if () { throw IllegalArgumentException } blocks.
+ * Other than that, it's a direct copy as of Feb 14 2012.
+ *
+ */
+
+package com.hadoop.compression.lzo;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+
+/**
+ * <p>Encodes signed and unsigned values using a common variable-length
+ * scheme, found for example in
+ * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">
+ * Google's Protocol Buffers</a>. It uses fewer bytes to encode smaller values,
+ * but will use slightly more bytes to encode large values.</p>
+ *
+ * <p>Signed values are further encoded using so-called zig-zag encoding
+ * in order to make them "compatible" with variable-length encoding.</p>
+ */
+public final class Varint {
+
+  private Varint() {
+  }
+
+  /**
+   * Encodes a value using the variable-length encoding from
+   * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">
+   * Google Protocol Buffers</a>. It uses zig-zag encoding to efficiently
+   * encode signed values. If values are known to be nonnegative,
+   * {@link #writeUnsignedVarLong(long, DataOutput)} should be used.
+   *
+   * @param value value to encode
+   * @param out to write bytes to
+   * @throws IOException if {@link DataOutput} throws {@link IOException}
+   */
+  public static void writeSignedVarLong(long value, DataOutput out) throws IOException {
+    // Great trick from http://code.google.com/apis/protocolbuffers/docs/encoding.html#types
+    writeUnsignedVarLong((value << 1) ^ (value >> 63), out);
+  }
+
+  /**
+   * Encodes a value using the variable-length encoding from
+   * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">
+   * Google Protocol Buffers</a>. Zig-zag is not used, so input must not be negative.
+   * If values can be negative, use {@link #writeSignedVarLong(long, DataOutput)}
+   * instead. This method treats negative input as a large unsigned value.
+   *
+   * @param value value to encode
+   * @param out to write bytes to
+   * @throws IOException if {@link DataOutput} throws {@link IOException}
+   */
+  public static void writeUnsignedVarLong(long value, DataOutput out) throws IOException {
+    while ((value & 0xFFFFFFFFFFFFFF80L) != 0L) {
+      out.writeByte(((int) value & 0x7F) | 0x80);
+      value >>>= 7;
+    }
+    out.writeByte((int) value & 0x7F);
+  }
+
+  /**
+   * @see #writeSignedVarLong(long, DataOutput)
+   */
+  public static void writeSignedVarInt(int value, DataOutput out) throws IOException {
+    // Great trick from http://code.google.com/apis/protocolbuffers/docs/encoding.html#types
+    writeUnsignedVarInt((value << 1) ^ (value >> 31), out);
+  }
+
+  /**
+   * @see #writeUnsignedVarLong(long, DataOutput)
+   */
+  public static void writeUnsignedVarInt(int value, DataOutput out) throws IOException {
+    while ((value & 0xFFFFFF80) != 0L) {
+      out.writeByte((value & 0x7F) | 0x80);
+      value >>>= 7;
+    }
+    out.writeByte(value & 0x7F);
+  }
+
+  /**
+   * @param in to read bytes from
+   * @return decoded value
+   * @throws IOException if {@link DataInput} throws {@link IOException}
+   * @throws IllegalArgumentException if variable-length value does not terminate
+   *  after 9 bytes have been read
+   * @see #writeSignedVarLong(long, DataOutput)
+   */
+  public static long readSignedVarLong(DataInput in) throws IOException {
+    long raw = readUnsignedVarLong(in);
+    // This undoes the trick in writeSignedVarLong()
+    long temp = (((raw << 63) >> 63) ^ raw) >> 1;
+    // This extra step lets us deal with the largest signed values by treating
+    // negative results from the unsigned read methods as large unsigned values.
+    // Must re-flip the top bit if the original read value had it set.
+    return temp ^ (raw & (1L << 63));
+  }
+
+  /**
+   * @param in to read bytes from
+   * @return decoded value
+   * @throws IOException if {@link DataInput} throws {@link IOException}
+   * @throws IllegalArgumentException if variable-length value does not terminate
+   *  after 9 bytes have been read
+   * @see #writeUnsignedVarLong(long, DataOutput)
+   */
+  public static long readUnsignedVarLong(DataInput in) throws IOException {
+    long value = 0L;
+    int i = 0;
+    long b;
+    while (((b = in.readByte()) & 0x80L) != 0) {
+      value |= (b & 0x7F) << i;
+      i += 7;
+      if (i > 63) {
+        throw new IllegalArgumentException("Variable length quantity is too long");
+      }
+    }
+    return value | (b << i);
+  }
+
+  /**
+   * @throws IllegalArgumentException if variable-length value does not terminate
+   *  after 5 bytes have been read
+   * @throws IOException if {@link DataInput} throws {@link IOException}
+   * @see #readSignedVarLong(DataInput)
+   */
+  public static int readSignedVarInt(DataInput in) throws IOException {
+    int raw = readUnsignedVarInt(in);
+    // This undoes the trick in writeSignedVarInt()
+    int temp = (((raw << 31) >> 31) ^ raw) >> 1;
+    // This extra step lets us deal with the largest signed values by treating
+    // negative results from the unsigned read methods as large unsigned values.
+    // Must re-flip the top bit if the original read value had it set.
+    return temp ^ (raw & (1 << 31));
+  }
+
+  /**
+   * @throws IllegalArgumentException if variable-length value does not terminate
+   *  after 5 bytes have been read
+   * @throws IOException if {@link DataInput} throws {@link IOException}
+   * @see #readUnsignedVarLong(DataInput)
+   */
+  public static int readUnsignedVarInt(DataInput in) throws IOException {
+    int value = 0;
+    int i = 0;
+    int b;
+    while (((b = in.readByte()) & 0x80) != 0) {
+      value |= (b & 0x7F) << i;
+      i += 7;
+      if (i > 35) {
+        throw new IllegalArgumentException("Variable length quantity is too long");
+      }
+    }
+    return value | (b << i);
+  }
+
+}
\ No newline at end of file
diff --git a/src/test/com/hadoop/compression/lzo/TestLzoIndexSerde.java b/src/test/com/hadoop/compression/lzo/TestLzoIndexSerde.java
new file mode 100644
index 00000000..42a5f77e
--- /dev/null
+++ b/src/test/com/hadoop/compression/lzo/TestLzoIndexSerde.java
@@ -0,0 +1,69 @@
+/*
+ * This file is part of Hadoop-Gpl-Compression.
+ *
+ * Hadoop-Gpl-Compression is free software: you can redistribute it
+ * and/or modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Hadoop-Gpl-Compression is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Hadoop-Gpl-Compression.  If not, see
+ * <http://www.gnu.org/licenses/>.
+ */
+
+package com.hadoop.compression.lzo;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+public class TestLzoIndexSerde extends TestCase {
+
+  public void testBasicSerde() throws IOException, InstantiationException, IllegalAccessException {
+    testGenericSerde(new LzoBasicIndexSerde());
+  }
+
+  public void testLzoTinyOffsetsSerde() throws IOException, InstantiationException, IllegalAccessException {
+    testGenericSerde(new LzoTinyOffsetsSerde());
+  }
+
+  /**
+   * Ensures that the provided serde can read its own output correctly
+   * @param serde
+   * @throws IOException
+   * @throws IllegalAccessException
+   * @throws InstantiationException
+   */
+  public void testGenericSerde(LzoIndexSerde serde) throws IOException, InstantiationException, IllegalAccessException {
+    long[] expected = { 40L, 500L, 584L, 10017L };
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream os = new DataOutputStream(baos);
+    serde.prepareToWrite(os);
+    for (long val : expected) {
+      serde.writeOffset(val);
+    }
+    serde.finishWriting();
+    serde = serde.getClass().newInstance();
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    DataInputStream is = new DataInputStream(bais);
+    long firstLong = is.readLong();
+    assertTrue("Serde does not accept its own first long", serde.accepts(firstLong));
+    serde.prepareToRead(is);
+    for (long val : expected) {
+      assertTrue("Serde does not return as many values as were written", serde.hasNext());
+      assertEquals("Serde returned wrong offset", val, serde.next());
+    }
+    assertFalse(serde.hasNext());
+
+  }
+}
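
Usage sketch (illustrative, not part of the patch): the snippet below drives the new LzoIndexSerde API end to end the same way LzoIndex.readIndex() and TestLzoIndexSerde do, writing offsets through a serde and then re-detecting the format from the first long before reading them back. The class name, the in-memory streams, and the sample offsets are assumptions made for this example only.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import com.hadoop.compression.lzo.LzoIndexSerde;
import com.hadoop.compression.lzo.LzoTinyOffsetsSerde;

public class LzoIndexSerdeRoundTripSketch {
  public static void main(String[] args) throws IOException {
    // Write a tiny-offsets index for three hypothetical LZO block offsets.
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    LzoIndexSerde writer = new LzoTinyOffsetsSerde();
    writer.prepareToWrite(new DataOutputStream(baos));
    for (long offset : new long[] { 40L, 500L, 584L }) {
      writer.writeOffset(offset);
    }
    writer.finishWriting();

    // Read it back the way LzoIndex.readIndex() does: read the first long,
    // ask the serde whether it accepts that format, then iterate the offsets.
    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(baos.toByteArray()));
    long firstLong = in.readLong();
    LzoIndexSerde reader = new LzoTinyOffsetsSerde();
    if (reader.accepts(firstLong)) {
      reader.prepareToRead(in);
      while (reader.hasNext()) {
        System.out.println(reader.next()); // prints 40, 500, 584
      }
      reader.finishReading();
    }
  }
}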