Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
TEPHRA-263 Fix test cases and TTL enforcement for tables configured t…
Browse files Browse the repository at this point in the history
…o read non transactional data.
anwar6953 committed Sep 22, 2017
1 parent 28d47c7 commit b15a8f9
Showing 29 changed files with 100 additions and 109 deletions.
11 changes: 8 additions & 3 deletions tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
Original file line number Diff line number Diff line change
@@ -94,9 +94,14 @@ public static long getOldestVisibleTimestamp(long ttl, Transaction tx, boolean r
if (ttl <= 0) {
return 0;
}
long ttlFactor = readNonTxnData ? 1 : TxConstants.MAX_TX_PER_MS;
long oldestVisibleTimestamp;
if (readNonTxnData) {
oldestVisibleTimestamp = getTimestamp(tx.getTransactionId()) - ttl;
} else {
oldestVisibleTimestamp = tx.getTransactionId() - ttl * TxConstants.MAX_TX_PER_MS;
}
// if the computed ttl is negative, return 0 because timestamps can not be negative
return Math.max(0, tx.getTransactionId() - ttl * ttlFactor);
return Math.max(0, oldestVisibleTimestamp);
}

/**
@@ -119,7 +124,7 @@ public static long getMaxVisibleTimestamp(Transaction tx) {
* as being written by this transaction (and therefore visible).
*/
public static Transaction createDummyTransaction(TransactionVisibilityState txVisibilityState) {
return new Transaction(txVisibilityState.getReadPointer(), Long.MAX_VALUE,
return new Transaction(txVisibilityState.getReadPointer(), txVisibilityState.getWritePointer(),
Longs.toArray(txVisibilityState.getInvalid()),
Longs.toArray(txVisibilityState.getInProgress().keySet()),
TxUtils.getFirstShortInProgress(txVisibilityState.getInProgress()), TransactionType.SHORT);
Original file line number Diff line number Diff line change
@@ -40,8 +40,7 @@ public class TransactionFilters {
*/
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType) {
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false,
scanType, null));
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null));
}

/**
@@ -51,15 +50,13 @@ public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlBy
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
* these will be interpreted as "delete" markers and the column will be filtered out
* @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
* @param scanType the type of scan operation being performed
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData,
scanType, cellFilter));
ScanType scanType, @Nullable Filter cellFilter) {
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter));
}
}
Original file line number Diff line number Diff line change
@@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti
* @param type the type of scan operation being performed
*/
protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) {
return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter);
return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
}

/**
Original file line number Diff line number Diff line change
@@ -70,7 +70,7 @@ public class TransactionVisibilityFilter extends FilterBase {
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType) {
this(tx, ttlByFamily, allowEmptyValues, false, scanType, null);
this(tx, ttlByFamily, allowEmptyValues, scanType, null);
}

/**
@@ -80,20 +80,21 @@ public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
* these will be interpreted as "delete" markers and the column will be filtered out
* @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
* @param scanType the type of scan operation being performed
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
ScanType scanType, @Nullable Filter cellFilter) {
this.tx = tx;
this.oldestTsByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
long familyTTL = ttlEntry.getValue();
oldestTsByFamily.put(ttlEntry.getKey(),
TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData));
// we pass false for 'readNonTxData'. For non transactional data, we scale the cell timestamp
// in #filterKeyValue, using TxUtils#getTimestampForTTL(long)
TxUtils.getOldestVisibleTimestamp(familyTTL, tx, false));
}
this.allowEmptyValues = allowEmptyValues;
this.clearDeletes =
Original file line number Diff line number Diff line change
@@ -107,7 +107,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
public static TemporaryFolder tmpFolder = new TemporaryFolder();

private static MiniDFSCluster dfsCluster;

private static final class TestBytes {
private static final byte[] table = Bytes.toBytes("testtable");
private static final byte[] family = Bytes.toBytes("f1");
Original file line number Diff line number Diff line change
@@ -74,7 +74,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException {
TxFilterFactory txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter);
return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -94,7 +94,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException {
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter);
return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -114,7 +114,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException {
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter);
return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -134,7 +134,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException {
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter);
return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -354,7 +354,7 @@ private interface TxFilterFactory {
private class CustomTxFilter extends TransactionVisibilityFilter {
public CustomTxFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, ScanType scanType,
@Nullable Filter cellFilter) {
super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter);
super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter);
}

@Override
Original file line number Diff line number Diff line change
@@ -40,8 +40,7 @@ public class TransactionFilters {
*/
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType) {
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false,
scanType, null));
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null));
}

/**
@@ -51,15 +50,13 @@ public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlBy
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
* these will be interpreted as "delete" markers and the column will be filtered out
* @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
* @param scanType the type of scan operation being performed
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData,
scanType, cellFilter));
ScanType scanType, @Nullable Filter cellFilter) {
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter));
}
}
Original file line number Diff line number Diff line change
@@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti
* @param type the type of scan being performed
*/
protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) {
return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter);
return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
}

/**
Original file line number Diff line number Diff line change
@@ -70,7 +70,7 @@ public class TransactionVisibilityFilter extends FilterBase {
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType) {
this(tx, ttlByFamily, allowEmptyValues, false, scanType, null);
this(tx, ttlByFamily, allowEmptyValues, scanType, null);
}

/**
@@ -80,20 +80,21 @@ public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
* these will be interpreted as "delete" markers and the column will be filtered out
* @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
* @param scanType the type of scan operation being performed
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
ScanType scanType, @Nullable Filter cellFilter) {
this.tx = tx;
this.oldestTsByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
long familyTTL = ttlEntry.getValue();
oldestTsByFamily.put(ttlEntry.getKey(),
TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData));
// we pass false for 'readNonTxData'. For non transactional data, we scale the cell timestamp
// in #filterKeyValue, using TxUtils#getTimestampForTTL(long)
TxUtils.getOldestVisibleTimestamp(familyTTL, tx, false));
}
this.allowEmptyValues = allowEmptyValues;
this.clearDeletes =
Original file line number Diff line number Diff line change
@@ -74,7 +74,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException {
TxFilterFactory txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter);
return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -94,7 +94,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException {
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter);
return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -114,7 +114,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException {
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter);
return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -134,7 +134,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException {
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter);
return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter);
}
};
runFilteringTest(txFilterFactory,
@@ -353,7 +353,7 @@ private interface TxFilterFactory {
private class CustomTxFilter extends TransactionVisibilityFilter {
public CustomTxFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, ScanType scanType,
@Nullable Filter cellFilter) {
super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter);
super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter);
}

@Override
Original file line number Diff line number Diff line change
@@ -40,8 +40,7 @@ public class TransactionFilters {
*/
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType) {
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false,
scanType, null));
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null));
}

/**
@@ -51,15 +50,13 @@ public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlBy
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
* these will be interpreted as "delete" markers and the column will be filtered out
* @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
* @param scanType the type of scan operation being performed
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData,
scanType, cellFilter));
ScanType scanType, @Nullable Filter cellFilter) {
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter));
}
}
Original file line number Diff line number Diff line change
@@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti
* @param type the type of scan being performed
*/
protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) {
return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter);
return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
}

/**
Loading

0 comments on commit b15a8f9

Please sign in to comment.