Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "TEPHRA-263 Enforce TTL, regardless of any in-progress transac… #66

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 7 additions & 23 deletions tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package org.apache.tephra.util;

import com.google.common.primitives.Longs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TransactionType;
Expand Down Expand Up @@ -63,7 +61,9 @@ public class TxUtils {
* @return The oldest timestamp that will be visible for the given transaction and TTL configuration
*/
public static long getOldestVisibleTimestamp(Map<byte[], Long> ttlByFamily, Transaction tx) {
return getOldestVisibleTimestamp(ttlByFamily, tx, false);
long maxTTL = getMaxTTL(ttlByFamily);
// we know that data will not be cleaned up while this tx is running up to this point as janitor uses it
return maxTTL < Long.MAX_VALUE ? tx.getVisibilityUpperBound() - maxTTL * TxConstants.MAX_TX_PER_MS : 0;
}

/**
Expand All @@ -75,28 +75,12 @@ public static long getOldestVisibleTimestamp(Map<byte[], Long> ttlByFamily, Tran
* @return The oldest timestamp that will be visible for the given transaction and TTL configuration
*/
public static long getOldestVisibleTimestamp(Map<byte[], Long> ttlByFamily, Transaction tx, boolean readNonTxnData) {
long maxTTL = getMaxTTL(ttlByFamily);
if (maxTTL == Long.MAX_VALUE) {
return 0;
if (readNonTxnData) {
long maxTTL = getMaxTTL(ttlByFamily);
return maxTTL < Long.MAX_VALUE ? System.currentTimeMillis() - maxTTL : 0;
}
return getOldestVisibleTimestamp(maxTTL, tx, readNonTxnData);
}

/**
* Returns the oldest visible timestamp for the given transaction, based on the TTL configured. If the TTL is
* negative or zero, the oldest visible timestamp will be {@code 0}.
* @param ttl TTL value (in milliseconds)
* @param tx The current transaction
* @param readNonTxnData indicates that the timestamp returned should allow reading non-transactional data
* @return The oldest timestamp that will be visible for the given transaction and TTL configuration
*/
public static long getOldestVisibleTimestamp(long ttl, Transaction tx, boolean readNonTxnData) {
if (ttl <= 0) {
return 0;
}
long ttlFactor = readNonTxnData ? 1 : TxConstants.MAX_TX_PER_MS;
// if the computed ttl is negative, return 0 because timestamps can not be negative
return Math.max(0, tx.getTransactionId() - ttl * ttlFactor);
return getOldestVisibleTimestamp(ttlByFamily, tx);
}

/**
Expand Down
35 changes: 0 additions & 35 deletions tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,9 @@
package org.apache.tephra.util;

import org.apache.tephra.Transaction;
import org.apache.tephra.TxConstants;
import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;

/**
Expand Down Expand Up @@ -59,34 +54,4 @@ public void testPruneUpperBound() {
tx = new Transaction(100, 110, new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS);
Assert.assertEquals(99, TxUtils.getPruneUpperBound(tx));
}

@Test
public void testTTL() {
long txIdsPerSecond = 1000 * TxConstants.MAX_TX_PER_MS;
byte[] family = new byte[] { 'd' };
long currentTxTimeSeconds = 100;
Transaction tx = new Transaction(100 * txIdsPerSecond, currentTxTimeSeconds * txIdsPerSecond,
new long[] {10 * txIdsPerSecond, 30 * txIdsPerSecond},
new long[] {80 * txIdsPerSecond, 90 * txIdsPerSecond},
80 * txIdsPerSecond);
int ttlSeconds = 60;
Map<byte[], Long> ttlByFamily = Collections.singletonMap(family, ttlSeconds * 1000L);
// ttl should only be impacted by the current transaction's id, and not by any older, in-progress transactions
Assert.assertEquals((currentTxTimeSeconds - ttlSeconds) * txIdsPerSecond,
TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx));
}

@Test
public void testLargeTTL() {
long txIdsPerSecond = 1000 * TxConstants.MAX_TX_PER_MS;
byte[] family = new byte[] { 'd' };
long currentTxTimeSeconds = 100;
Transaction tx = new Transaction(100 * txIdsPerSecond, currentTxTimeSeconds * txIdsPerSecond,
new long[] { }, new long[] { }, 100);
// ~100 years, so that the computed start timestamp is prior to 0 (epoch)
long ttlSeconds = TimeUnit.DAYS.toSeconds(365 * 100);
Map<byte[], Long> ttlByFamily = Collections.singletonMap(family, ttlSeconds * 1000L);
// oldest visible timestamp should be 0, not negative, because HBase timestamps can not be negative
Assert.assertEquals(0, TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ public class TransactionFilters {
*/
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType) {
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false,
scanType, null));
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null));
}

/**
Expand All @@ -51,15 +50,13 @@ public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlBy
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
* these will be interpreted as "delete" markers and the column will be filtered out
* @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
* @param scanType the type of scan operation being performed
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData,
scanType, cellFilter));
ScanType scanType, @Nullable Filter cellFilter) {
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti
* @param type the type of scan operation being performed
*/
protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) {
return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter);
return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class TransactionVisibilityFilter extends FilterBase {
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType) {
this(tx, ttlByFamily, allowEmptyValues, false, scanType, null);
this(tx, ttlByFamily, allowEmptyValues, scanType, null);
}

/**
Expand All @@ -80,20 +80,19 @@ public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
* these will be interpreted as "delete" markers and the column will be filtered out
* @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
* @param scanType the type of scan operation being performed
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType, @Nullable Filter cellFilter) {
this.tx = tx;
this.oldestTsByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
long familyTTL = ttlEntry.getValue();
oldestTsByFamily.put(ttlEntry.getKey(),
TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData));
familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS);
}
this.allowEmptyValues = allowEmptyValues;
this.clearDeletes =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException {
TxFilterFactory txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter);
return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter);
}
};
runFilteringTest(txFilterFactory,
Expand All @@ -94,7 +94,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException {
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter);
return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter);
}
};
runFilteringTest(txFilterFactory,
Expand All @@ -114,7 +114,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException {
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter);
return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter);
}
};
runFilteringTest(txFilterFactory,
Expand All @@ -134,7 +134,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException {
txFilterFactory = new TxFilterFactory() {
@Override
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter);
return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter);
}
};
runFilteringTest(txFilterFactory,
Expand Down Expand Up @@ -294,9 +294,8 @@ public void testTTLFiltering() throws Exception {
ttls.put(FAM2, 30L);
ttls.put(FAM3, 0L);

long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS;
// we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out
Transaction tx = new Transaction(now, now, new long[0], new long[0], now);
Transaction tx = txManager.startShort();
long now = tx.getVisibilityUpperBound();
Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN);
assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now)));
Expand Down Expand Up @@ -354,7 +353,7 @@ private interface TxFilterFactory {
private class CustomTxFilter extends TransactionVisibilityFilter {
public CustomTxFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, ScanType scanType,
@Nullable Filter cellFilter) {
super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter);
super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ public class TransactionFilters {
*/
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType) {
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false,
scanType, null));
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null));
}

/**
Expand All @@ -51,15 +50,13 @@ public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlBy
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
* these will be interpreted as "delete" markers and the column will be filtered out
* @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
* @param scanType the type of scan operation being performed
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData,
scanType, cellFilter));
ScanType scanType, @Nullable Filter cellFilter) {
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti
* @param type the type of scan being performed
*/
protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) {
return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter);
return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class TransactionVisibilityFilter extends FilterBase {
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType) {
this(tx, ttlByFamily, allowEmptyValues, false, scanType, null);
this(tx, ttlByFamily, allowEmptyValues, scanType, null);
}

/**
Expand All @@ -80,20 +80,19 @@ public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
* these will be interpreted as "delete" markers and the column will be filtered out
* @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
* @param scanType the type of scan operation being performed
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
ScanType scanType, @Nullable Filter cellFilter) {
this.tx = tx;
this.oldestTsByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
long familyTTL = ttlEntry.getValue();
oldestTsByFamily.put(ttlEntry.getKey(),
TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData));
familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS);
}
this.allowEmptyValues = allowEmptyValues;
this.clearDeletes =
Expand Down
Loading