From e0572c496cbdb2f0296519338b94ce07dbd27f5b Mon Sep 17 00:00:00 2001 From: Ali Anwar Date: Thu, 21 Sep 2017 17:55:01 -0700 Subject: [PATCH] Revert "TEPHRA-263 Enforce TTL, regardless of any in-progress transactions. Also Handle the case where TTL is longer than the duration from beginning of epoch to now." This reverts commit a346efe7560cc3b96349263669c4e15951ad401b. --- .../java/org/apache/tephra/util/TxUtils.java | 30 ++++------------ .../org/apache/tephra/util/TxUtilsTest.java | 35 ------------------- .../hbase/coprocessor/TransactionFilters.java | 9 ++--- .../coprocessor/TransactionProcessor.java | 2 +- .../TransactionVisibilityFilter.java | 9 +++-- .../TransactionVisibilityFilterTest.java | 15 ++++---- .../hbase/coprocessor/TransactionFilters.java | 9 ++--- .../coprocessor/TransactionProcessor.java | 2 +- .../TransactionVisibilityFilter.java | 7 ++-- .../TransactionVisibilityFilterTest.java | 15 ++++---- .../hbase/coprocessor/TransactionFilters.java | 9 ++--- .../coprocessor/TransactionProcessor.java | 2 +- .../TransactionVisibilityFilter.java | 7 ++-- .../TransactionVisibilityFilterTest.java | 15 ++++---- .../hbase/coprocessor/TransactionFilters.java | 9 ++--- .../coprocessor/TransactionProcessor.java | 2 +- .../TransactionVisibilityFilter.java | 7 ++-- .../TransactionVisibilityFilterTest.java | 15 ++++---- .../hbase/coprocessor/TransactionFilters.java | 9 ++--- .../coprocessor/TransactionProcessor.java | 2 +- .../TransactionVisibilityFilter.java | 11 +++--- .../TransactionVisibilityFilterTest.java | 15 ++++---- .../hbase/coprocessor/TransactionFilters.java | 9 ++--- .../coprocessor/TransactionProcessor.java | 2 +- .../TransactionVisibilityFilter.java | 11 +++--- .../TransactionVisibilityFilterTest.java | 15 ++++---- 26 files changed, 96 insertions(+), 177 deletions(-) diff --git a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java index 5eb2b3ec..aaca23d1 100644 --- a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java +++ b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java @@ -19,8 +19,6 @@ package org.apache.tephra.util; import com.google.common.primitives.Longs; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionManager; import org.apache.tephra.TransactionType; @@ -63,7 +61,9 @@ public class TxUtils { * @return The oldest timestamp that will be visible for the given transaction and TTL configuration */ public static long getOldestVisibleTimestamp(Map ttlByFamily, Transaction tx) { - return getOldestVisibleTimestamp(ttlByFamily, tx, false); + long maxTTL = getMaxTTL(ttlByFamily); + // we know that data will not be cleaned up while this tx is running up to this point as janitor uses it + return maxTTL < Long.MAX_VALUE ? tx.getVisibilityUpperBound() - maxTTL * TxConstants.MAX_TX_PER_MS : 0; } /** @@ -75,28 +75,12 @@ public static long getOldestVisibleTimestamp(Map ttlByFamily, Tran * @return The oldest timestamp that will be visible for the given transaction and TTL configuration */ public static long getOldestVisibleTimestamp(Map ttlByFamily, Transaction tx, boolean readNonTxnData) { - long maxTTL = getMaxTTL(ttlByFamily); - if (maxTTL == Long.MAX_VALUE) { - return 0; + if (readNonTxnData) { + long maxTTL = getMaxTTL(ttlByFamily); + return maxTTL < Long.MAX_VALUE ? System.currentTimeMillis() - maxTTL : 0; } - return getOldestVisibleTimestamp(maxTTL, tx, readNonTxnData); - } - /** - * Returns the oldest visible timestamp for the given transaction, based on the TTL configured. If the TTL is - * negative or zero, the oldest visible timestamp will be {@code 0}. - * @param ttl TTL value (in milliseconds) - * @param tx The current transaction - * @param readNonTxnData indicates that the timestamp returned should allow reading non-transactional data - * @return The oldest timestamp that will be visible for the given transaction and TTL configuration - */ - public static long getOldestVisibleTimestamp(long ttl, Transaction tx, boolean readNonTxnData) { - if (ttl <= 0) { - return 0; - } - long ttlFactor = readNonTxnData ? 1 : TxConstants.MAX_TX_PER_MS; - // if the computed ttl is negative, return 0 because timestamps can not be negative - return Math.max(0, tx.getTransactionId() - ttl * ttlFactor); + return getOldestVisibleTimestamp(ttlByFamily, tx); } /** diff --git a/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java b/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java index b172e89e..db687fe7 100644 --- a/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java @@ -19,14 +19,9 @@ package org.apache.tephra.util; import org.apache.tephra.Transaction; -import org.apache.tephra.TxConstants; import org.junit.Assert; import org.junit.Test; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.TimeUnit; - import static org.junit.Assert.assertEquals; /** @@ -59,34 +54,4 @@ public void testPruneUpperBound() { tx = new Transaction(100, 110, new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS); Assert.assertEquals(99, TxUtils.getPruneUpperBound(tx)); } - - @Test - public void testTTL() { - long txIdsPerSecond = 1000 * TxConstants.MAX_TX_PER_MS; - byte[] family = new byte[] { 'd' }; - long currentTxTimeSeconds = 100; - Transaction tx = new Transaction(100 * txIdsPerSecond, currentTxTimeSeconds * txIdsPerSecond, - new long[] {10 * txIdsPerSecond, 30 * txIdsPerSecond}, - new long[] {80 * txIdsPerSecond, 90 * txIdsPerSecond}, - 80 * txIdsPerSecond); - int ttlSeconds = 60; - Map ttlByFamily = Collections.singletonMap(family, ttlSeconds * 1000L); - // ttl should only be impacted by the current transaction's id, and not by any older, in-progress transactions - Assert.assertEquals((currentTxTimeSeconds - ttlSeconds) * txIdsPerSecond, - TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx)); - } - - @Test - public void testLargeTTL() { - long txIdsPerSecond = 1000 * TxConstants.MAX_TX_PER_MS; - byte[] family = new byte[] { 'd' }; - long currentTxTimeSeconds = 100; - Transaction tx = new Transaction(100 * txIdsPerSecond, currentTxTimeSeconds * txIdsPerSecond, - new long[] { }, new long[] { }, 100); - // ~100 years, so that the computed start timestamp is prior to 0 (epoch) - long ttlSeconds = TimeUnit.DAYS.toSeconds(365 * 100); - Map ttlByFamily = Collections.singletonMap(family, ttlSeconds * 1000L); - // oldest visible timestamp should be 0, not negative, because HBase timestamps can not be negative - Assert.assertEquals(0, TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx)); - } } diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java index c46082fc..0ca9f9c2 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java @@ -40,8 +40,7 @@ public class TransactionFilters { */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false, - scanType, null)); + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null)); } /** @@ -51,15 +50,13 @@ public static Filter getVisibilityFilter(Transaction tx, Map ttlBy * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out - * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, - scanType, cellFilter)); + ScanType scanType, @Nullable Filter cellFilter) { + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter)); } } diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index d78ccea9..10ecfa40 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti * @param type the type of scan operation being performed */ protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) { - return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter); + return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); } /** diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index d24f5042..36752686 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -70,7 +70,7 @@ public class TransactionVisibilityFilter extends FilterBase { */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - this(tx, ttlByFamily, allowEmptyValues, false, scanType, null); + this(tx, ttlByFamily, allowEmptyValues, scanType, null); } /** @@ -80,20 +80,19 @@ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out - * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ - public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { + public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, + ScanType scanType, @Nullable Filter cellFilter) { this.tx = tx; this.oldestTsByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); for (Map.Entry ttlEntry : ttlByFamily.entrySet()) { long familyTTL = ttlEntry.getValue(); oldestTsByFamily.put(ttlEntry.getKey(), - TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData)); + familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS); } this.allowEmptyValues = allowEmptyValues; this.clearDeletes = diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index 7286a2a3..4d34ed9b 100644 --- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -74,7 +74,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { TxFilterFactory txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter); } }; runFilteringTest(txFilterFactory, @@ -94,7 +94,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter); } }; runFilteringTest(txFilterFactory, @@ -114,7 +114,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter); } }; runFilteringTest(txFilterFactory, @@ -134,7 +134,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter); } }; runFilteringTest(txFilterFactory, @@ -294,9 +294,8 @@ public void testTTLFiltering() throws Exception { ttls.put(FAM2, 30L); ttls.put(FAM3, 0L); - long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; - // we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out - Transaction tx = new Transaction(now, now, new long[0], new long[0], now); + Transaction tx = txManager.startShort(); + long now = tx.getVisibilityUpperBound(); Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN); assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now))); @@ -354,7 +353,7 @@ private interface TxFilterFactory { private class CustomTxFilter extends TransactionVisibilityFilter { public CustomTxFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType, @Nullable Filter cellFilter) { - super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter); + super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter); } @Override diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java index c46082fc..0ca9f9c2 100644 --- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java @@ -40,8 +40,7 @@ public class TransactionFilters { */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false, - scanType, null)); + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null)); } /** @@ -51,15 +50,13 @@ public static Filter getVisibilityFilter(Transaction tx, Map ttlBy * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out - * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, - scanType, cellFilter)); + ScanType scanType, @Nullable Filter cellFilter) { + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter)); } } diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 435ae02a..17d55a4e 100644 --- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti * @param type the type of scan being performed */ protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) { - return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter); + return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); } /** diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index 3e3db595..9a617a97 100644 --- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -70,7 +70,7 @@ public class TransactionVisibilityFilter extends FilterBase { */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - this(tx, ttlByFamily, allowEmptyValues, false, scanType, null); + this(tx, ttlByFamily, allowEmptyValues, scanType, null); } /** @@ -80,20 +80,19 @@ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out - * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { + ScanType scanType, @Nullable Filter cellFilter) { this.tx = tx; this.oldestTsByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); for (Map.Entry ttlEntry : ttlByFamily.entrySet()) { long familyTTL = ttlEntry.getValue(); oldestTsByFamily.put(ttlEntry.getKey(), - TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData)); + familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS); } this.allowEmptyValues = allowEmptyValues; this.clearDeletes = diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index 49de0eba..3352eef4 100644 --- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -74,7 +74,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { TxFilterFactory txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter); } }; runFilteringTest(txFilterFactory, @@ -94,7 +94,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter); } }; runFilteringTest(txFilterFactory, @@ -114,7 +114,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter); } }; runFilteringTest(txFilterFactory, @@ -134,7 +134,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter); } }; runFilteringTest(txFilterFactory, @@ -293,9 +293,8 @@ public void testTTLFiltering() throws Exception { ttls.put(FAM2, 30L); ttls.put(FAM3, 0L); - long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; - // we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out - Transaction tx = new Transaction(now, now, new long[0], new long[0], now); + Transaction tx = txManager.startShort(); + long now = tx.getVisibilityUpperBound(); Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN); assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now))); @@ -353,7 +352,7 @@ private interface TxFilterFactory { private class CustomTxFilter extends TransactionVisibilityFilter { public CustomTxFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType, @Nullable Filter cellFilter) { - super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter); + super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter); } @Override diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java index c46082fc..0ca9f9c2 100644 --- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java @@ -40,8 +40,7 @@ public class TransactionFilters { */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false, - scanType, null)); + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null)); } /** @@ -51,15 +50,13 @@ public static Filter getVisibilityFilter(Transaction tx, Map ttlBy * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out - * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, - scanType, cellFilter)); + ScanType scanType, @Nullable Filter cellFilter) { + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter)); } } diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index ab2ac8d4..ca960526 100644 --- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti * @param type the type of scan being performed */ protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) { - return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter); + return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); } /** diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index 9e324787..9825deda 100644 --- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -73,7 +73,7 @@ public class TransactionVisibilityFilter extends FilterBase { */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - this(tx, ttlByFamily, allowEmptyValues, false, scanType, null); + this(tx, ttlByFamily, allowEmptyValues, scanType, null); } /** @@ -83,20 +83,19 @@ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out - * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { + ScanType scanType, @Nullable Filter cellFilter) { this.tx = tx; this.oldestTsByFamily = Maps.newTreeMap(); for (Map.Entry ttlEntry : ttlByFamily.entrySet()) { long familyTTL = ttlEntry.getValue(); oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()), - TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData)); + familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS); } this.allowEmptyValues = allowEmptyValues; this.clearDeletes = diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index d4e4ed16..c27a10d2 100644 --- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -74,7 +74,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { TxFilterFactory txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter); } }; runFilteringTest(txFilterFactory, @@ -94,7 +94,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter); } }; runFilteringTest(txFilterFactory, @@ -114,7 +114,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter); } }; runFilteringTest(txFilterFactory, @@ -134,7 +134,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter); } }; runFilteringTest(txFilterFactory, @@ -293,9 +293,8 @@ public void testTTLFiltering() throws Exception { ttls.put(FAM2, 30L); ttls.put(FAM3, 0L); - long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; - // we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out - Transaction tx = new Transaction(now, now, new long[0], new long[0], now); + Transaction tx = txManager.startShort(); + long now = tx.getVisibilityUpperBound(); Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN); assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now))); @@ -353,7 +352,7 @@ private interface TxFilterFactory { private class CustomTxFilter extends TransactionVisibilityFilter { public CustomTxFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType, @Nullable Filter cellFilter) { - super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter); + super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter); } @Override diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java index c46082fc..0ca9f9c2 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java @@ -40,8 +40,7 @@ public class TransactionFilters { */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false, - scanType, null)); + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null)); } /** @@ -51,15 +50,13 @@ public static Filter getVisibilityFilter(Transaction tx, Map ttlBy * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out - * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, - scanType, cellFilter)); + ScanType scanType, @Nullable Filter cellFilter) { + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter)); } } diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 7325a7a1..263ee98e 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti * @param type the type of scan being performed */ protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) { - return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter); + return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); } /** diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index 9e324787..9825deda 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -73,7 +73,7 @@ public class TransactionVisibilityFilter extends FilterBase { */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - this(tx, ttlByFamily, allowEmptyValues, false, scanType, null); + this(tx, ttlByFamily, allowEmptyValues, scanType, null); } /** @@ -83,20 +83,19 @@ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out - * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { + ScanType scanType, @Nullable Filter cellFilter) { this.tx = tx; this.oldestTsByFamily = Maps.newTreeMap(); for (Map.Entry ttlEntry : ttlByFamily.entrySet()) { long familyTTL = ttlEntry.getValue(); oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()), - TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData)); + familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS); } this.allowEmptyValues = allowEmptyValues; this.clearDeletes = diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index 28dfba83..4b2b40c3 100644 --- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -74,7 +74,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { TxFilterFactory txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter); } }; runFilteringTest(txFilterFactory, @@ -94,7 +94,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter); } }; runFilteringTest(txFilterFactory, @@ -114,7 +114,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter); } }; runFilteringTest(txFilterFactory, @@ -134,7 +134,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter); } }; runFilteringTest(txFilterFactory, @@ -293,9 +293,8 @@ public void testTTLFiltering() throws Exception { ttls.put(FAM2, 30L); ttls.put(FAM3, 0L); - long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; - // we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out - Transaction tx = new Transaction(now, now, new long[0], new long[0], now); + Transaction tx = txManager.startShort(); + long now = tx.getVisibilityUpperBound(); Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN); assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now))); @@ -353,7 +352,7 @@ private interface TxFilterFactory { private class CustomTxFilter extends TransactionVisibilityFilter { public CustomTxFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType, @Nullable Filter cellFilter) { - super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter); + super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter); } @Override diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java index c46082fc..0ca9f9c2 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java @@ -40,8 +40,7 @@ public class TransactionFilters { */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false, - scanType, null)); + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null)); } /** @@ -51,15 +50,13 @@ public static Filter getVisibilityFilter(Transaction tx, Map ttlBy * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out - * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, - scanType, cellFilter)); + ScanType scanType, @Nullable Filter cellFilter) { + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter)); } } diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 848cb1fb..553f5989 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti * @param type the type of scan being performed */ protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) { - return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter); + return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); } /** diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index bd3c719e..5ad7c29c 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -70,8 +70,8 @@ public class TransactionVisibilityFilter extends FilterBase { * @param scanType the type of scan operation being performed */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - ScanType scanType) { - this(tx, ttlByFamily, allowEmptyValues, false, scanType, null); + ScanType scanType) { + this(tx, ttlByFamily, allowEmptyValues, scanType, null); } /** @@ -81,20 +81,19 @@ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out - * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ - public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { + public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, + ScanType scanType, @Nullable Filter cellFilter) { this.tx = tx; this.oldestTsByFamily = Maps.newTreeMap(); for (Map.Entry ttlEntry : ttlByFamily.entrySet()) { long familyTTL = ttlEntry.getValue(); oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()), - TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData)); + familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS); } this.allowEmptyValues = allowEmptyValues; this.clearDeletes = diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index b6650f61..1b02609f 100644 --- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -74,7 +74,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { TxFilterFactory txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter); } }; runFilteringTest(txFilterFactory, @@ -94,7 +94,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter); } }; runFilteringTest(txFilterFactory, @@ -114,7 +114,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter); } }; runFilteringTest(txFilterFactory, @@ -134,7 +134,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter); } }; runFilteringTest(txFilterFactory, @@ -294,9 +294,8 @@ public void testTTLFiltering() throws Exception { ttls.put(FAM2, 30L); ttls.put(FAM3, 0L); - long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; - // we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out - Transaction tx = new Transaction(now, now, new long[0], new long[0], now); + Transaction tx = txManager.startShort(); + long now = tx.getVisibilityUpperBound(); Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN); assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now))); @@ -354,7 +353,7 @@ private interface TxFilterFactory { private class CustomTxFilter extends TransactionVisibilityFilter { public CustomTxFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType, @Nullable Filter cellFilter) { - super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter); + super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter); } @Override diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java index c46082fc..0ca9f9c2 100644 --- a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java +++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java @@ -40,8 +40,7 @@ public class TransactionFilters { */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false, - scanType, null)); + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null)); } /** @@ -51,15 +50,13 @@ public static Filter getVisibilityFilter(Transaction tx, Map ttlBy * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out - * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, - scanType, cellFilter)); + ScanType scanType, @Nullable Filter cellFilter) { + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter)); } } diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 848cb1fb..553f5989 100644 --- a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti * @param type the type of scan being performed */ protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) { - return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter); + return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); } /** diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index 81eb604c..40e2c37a 100644 --- a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -70,8 +70,8 @@ public class TransactionVisibilityFilter extends FilterBase { * @param scanType the type of scan operation being performed */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - ScanType scanType) { - this(tx, ttlByFamily, allowEmptyValues, false, scanType, null); + ScanType scanType) { + this(tx, ttlByFamily, allowEmptyValues, scanType, null); } /** @@ -81,20 +81,19 @@ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out - * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ - public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { + public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, + ScanType scanType, @Nullable Filter cellFilter) { this.tx = tx; this.oldestTsByFamily = Maps.newTreeMap(); for (Map.Entry ttlEntry : ttlByFamily.entrySet()) { long familyTTL = ttlEntry.getValue(); oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()), - TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData)); + familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS); } this.allowEmptyValues = allowEmptyValues; this.clearDeletes = diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index b6650f61..1b02609f 100644 --- a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -74,7 +74,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { TxFilterFactory txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter); } }; runFilteringTest(txFilterFactory, @@ -94,7 +94,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter); } }; runFilteringTest(txFilterFactory, @@ -114,7 +114,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter); } }; runFilteringTest(txFilterFactory, @@ -134,7 +134,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter); } }; runFilteringTest(txFilterFactory, @@ -294,9 +294,8 @@ public void testTTLFiltering() throws Exception { ttls.put(FAM2, 30L); ttls.put(FAM3, 0L); - long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; - // we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out - Transaction tx = new Transaction(now, now, new long[0], new long[0], now); + Transaction tx = txManager.startShort(); + long now = tx.getVisibilityUpperBound(); Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN); assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now))); @@ -354,7 +353,7 @@ private interface TxFilterFactory { private class CustomTxFilter extends TransactionVisibilityFilter { public CustomTxFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType, @Nullable Filter cellFilter) { - super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter); + super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter); } @Override