From f8dce6c501a804a32f629f2effbf5416df007634 Mon Sep 17 00:00:00 2001 From: Vikas Kumar Date: Thu, 16 May 2024 10:00:52 +0530 Subject: [PATCH] HADOOP-18851. Performance improvement for DelegationTokenSecretManager (#6803) --- .../AbstractDelegationTokenSecretManager.java | 444 +++++++++++------- 1 file changed, 267 insertions(+), 177 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java index e218bea8000c0..9cf3ccdd445e7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.crypto.SecretKey; @@ -122,10 +123,10 @@ private String formatTokenId(TokenIdent id) { */ private DelegationKey currentKey; - private long keyUpdateInterval; - private long tokenMaxLifetime; - private long tokenRemoverScanInterval; - private long tokenRenewInterval; + private final long keyUpdateInterval; + private final long tokenMaxLifetime; + private final long tokenRemoverScanInterval; + private final long tokenRenewInterval; /** * Whether to store a token's tracking ID in its TokenInformation. * Can be overridden by a subclass. @@ -140,6 +141,8 @@ private String formatTokenId(TokenIdent id) { */ protected Object noInterruptsLock = new Object(); + private final ReentrantReadWriteLock apiLock = new ReentrantReadWriteLock(true); + /** * Create a secret manager * @param delegationKeyUpdateInterval the number of milliseconds for rolling @@ -169,21 +172,29 @@ public AbstractDelegationTokenSecretManager(long delegationKeyUpdateInterval, public void startThreads() throws IOException { Preconditions.checkState(!running); updateCurrentKey(); - synchronized (this) { + this.apiLock.writeLock().lock(); + try { running = true; tokenRemoverThread = new Daemon(new ExpiredTokenRemover()); tokenRemoverThread.start(); + } finally { + this.apiLock.writeLock().unlock(); } } /** * Reset all data structures and mutable state. */ - public synchronized void reset() { - setCurrentKeyId(0); - allKeys.clear(); - setDelegationTokenSeqNum(0); - currentTokens.clear(); + public void reset() { + this.apiLock.writeLock().lock(); + try { + setCurrentKeyId(0); + allKeys.clear(); + setDelegationTokenSeqNum(0); + currentTokens.clear(); + } finally { + this.apiLock.writeLock().unlock(); + } } /** @@ -210,17 +221,27 @@ protected long getTokenRenewInterval() { * @param key delegation key. * @throws IOException raised on errors performing I/O. */ - public synchronized void addKey(DelegationKey key) throws IOException { + public void addKey(DelegationKey key) throws IOException { if (running) // a safety check throw new IOException("Can't add delegation key to a running SecretManager."); - if (key.getKeyId() > getCurrentKeyId()) { - setCurrentKeyId(key.getKeyId()); + this.apiLock.writeLock().lock(); + try { + if (key.getKeyId() > getCurrentKeyId()) { + setCurrentKeyId(key.getKeyId()); + } + allKeys.put(key.getKeyId(), key); + } finally { + this.apiLock.writeLock().unlock(); } - allKeys.put(key.getKeyId(), key); } - public synchronized DelegationKey[] getAllKeys() { - return allKeys.values().toArray(new DelegationKey[0]); + public DelegationKey[] getAllKeys() { + this.apiLock.readLock().lock(); + try { + return allKeys.values().toArray(new DelegationKey[0]); + } finally { + this.apiLock.readLock().unlock(); + } } // HDFS @@ -263,8 +284,13 @@ protected void updateStoredToken(TokenIdent ident, long renewDate) throws IOExce * * @return currentId. */ - protected synchronized int getCurrentKeyId() { - return currentId; + protected int getCurrentKeyId() { + this.apiLock.readLock().lock(); + try { + return currentId; + } finally { + this.apiLock.readLock().unlock(); + } } /** @@ -273,8 +299,13 @@ protected synchronized int getCurrentKeyId() { * * @return currentId. */ - protected synchronized int incrementCurrentKeyId() { - return ++currentId; + protected int incrementCurrentKeyId() { + this.apiLock.writeLock().lock(); + try { + return ++currentId; + } finally { + this.apiLock.writeLock().unlock(); + } } /** @@ -283,8 +314,13 @@ protected synchronized int incrementCurrentKeyId() { * * @param keyId keyId. */ - protected synchronized void setCurrentKeyId(int keyId) { - currentId = keyId; + protected void setCurrentKeyId(int keyId) { + this.apiLock.writeLock().lock(); + try { + currentId = keyId; + } finally { + this.apiLock.writeLock().unlock(); + } } /** @@ -293,8 +329,13 @@ protected synchronized void setCurrentKeyId(int keyId) { * * @return delegationTokenSequenceNumber. */ - protected synchronized int getDelegationTokenSeqNum() { - return delegationTokenSequenceNumber; + protected int getDelegationTokenSeqNum() { + this.apiLock.readLock().lock(); + try { + return delegationTokenSequenceNumber; + } finally { + this.apiLock.readLock().unlock(); + } } /** @@ -303,8 +344,13 @@ protected synchronized int getDelegationTokenSeqNum() { * * @return delegationTokenSequenceNumber. */ - protected synchronized int incrementDelegationTokenSeqNum() { - return ++delegationTokenSequenceNumber; + protected int incrementDelegationTokenSeqNum() { + this.apiLock.writeLock().lock(); + try { + return ++delegationTokenSequenceNumber; + } finally { + this.apiLock.writeLock().unlock(); + } } /** @@ -313,8 +359,13 @@ protected synchronized int incrementDelegationTokenSeqNum() { * * @param seqNum seqNum. */ - protected synchronized void setDelegationTokenSeqNum(int seqNum) { - delegationTokenSequenceNumber = seqNum; + protected void setDelegationTokenSeqNum(int seqNum) { + this.apiLock.writeLock().lock(); + try { + delegationTokenSequenceNumber = seqNum; + } finally { + this.apiLock.writeLock().unlock(); + } } /** @@ -401,34 +452,39 @@ protected void updateToken(TokenIdent ident, * @param renewDate token renew time * @throws IOException raised on errors performing I/O. */ - public synchronized void addPersistedDelegationToken( + public void addPersistedDelegationToken( TokenIdent identifier, long renewDate) throws IOException { if (running) { // a safety check throw new IOException( "Can't add persisted delegation token to a running SecretManager."); } - int keyId = identifier.getMasterKeyId(); - DelegationKey dKey = allKeys.get(keyId); - byte[] password = null; - if (dKey == null) { - LOG.warn("No KEY found for persisted identifier, expiring stored token " - + formatTokenId(identifier)); - // make sure the token is expired - renewDate = 0L; - } else { - password = createPassword(identifier.getBytes(), dKey.getKey()); - } - if (identifier.getSequenceNumber() > getDelegationTokenSeqNum()) { - setDelegationTokenSeqNum(identifier.getSequenceNumber()); - } - if (getTokenInfo(identifier) == null) { - currentTokens.put(identifier, new DelegationTokenInformation(renewDate, - password, getTrackingIdIfEnabled(identifier))); - addTokenForOwnerStats(identifier); - } else { - throw new IOException("Same delegation token being added twice: " - + formatTokenId(identifier)); + this.apiLock.writeLock().lock(); + try { + int keyId = identifier.getMasterKeyId(); + DelegationKey dKey = allKeys.get(keyId); + byte[] password = null; + if (dKey == null) { + LOG.warn("No KEY found for persisted identifier, expiring stored token " + formatTokenId( + identifier)); + // make sure the token is expired + renewDate = 0L; + } else { + password = createPassword(identifier.getBytes(), dKey.getKey()); + } + if (identifier.getSequenceNumber() > getDelegationTokenSeqNum()) { + setDelegationTokenSeqNum(identifier.getSequenceNumber()); + } + if (getTokenInfo(identifier) == null) { + currentTokens.put(identifier, new DelegationTokenInformation(renewDate, password, + getTrackingIdIfEnabled(identifier))); + addTokenForOwnerStats(identifier); + } else { + throw new IOException("Same delegation token being added twice: " + + formatTokenId(identifier)); + } + } finally { + this.apiLock.writeLock().unlock(); } } @@ -441,17 +497,18 @@ private void updateCurrentKey() throws IOException { LOG.info("Updating the current master key for generating delegation tokens"); /* Create a new currentKey with an estimated expiry date. */ int newCurrentId; - synchronized (this) { - newCurrentId = incrementCurrentKeyId(); - } + newCurrentId = incrementCurrentKeyId(); DelegationKey newKey = new DelegationKey(newCurrentId, System .currentTimeMillis() + keyUpdateInterval + tokenMaxLifetime, generateSecret()); //Log must be invoked outside the lock on 'this' logUpdateMasterKey(newKey); - synchronized (this) { + this.apiLock.writeLock().lock(); + try { currentKey = newKey; storeDelegationKey(currentKey); + } finally { + this.apiLock.writeLock().unlock(); } } @@ -461,7 +518,8 @@ private void updateCurrentKey() throws IOException { * @throws IOException raised on errors performing I/O. */ protected void rollMasterKey() throws IOException { - synchronized (this) { + this.apiLock.writeLock().lock(); + try { removeExpiredKeys(); /* set final expiry date for retiring currentKey */ currentKey.setExpiryDate(Time.now() + tokenMaxLifetime); @@ -471,46 +529,59 @@ protected void rollMasterKey() throws IOException { * allKeys just in case. */ updateDelegationKey(currentKey); + } finally { + this.apiLock.writeLock().unlock(); } updateCurrentKey(); } - private synchronized void removeExpiredKeys() { - long now = Time.now(); - for (Iterator> it = allKeys.entrySet() - .iterator(); it.hasNext();) { - Map.Entry e = it.next(); - if (e.getValue().getExpiryDate() < now) { - it.remove(); - // ensure the tokens generated by this current key can be recovered - // with this current key after this current key is rolled - if(!e.getValue().equals(currentKey)) - removeStoredMasterKey(e.getValue()); + private void removeExpiredKeys() { + this.apiLock.writeLock().lock(); + try { + long now = Time.now(); + for (Iterator> it = + allKeys.entrySet().iterator(); it.hasNext();) { + Map.Entry e = it.next(); + if (e.getValue().getExpiryDate() < now) { + it.remove(); + // ensure the tokens generated by this current key can be recovered + // with this current key after this current key is rolled + if (!e.getValue().equals(currentKey)) { + removeStoredMasterKey(e.getValue()); + } + } } + } finally { + this.apiLock.writeLock().unlock(); } } @Override - protected synchronized byte[] createPassword(TokenIdent identifier) { - int sequenceNum; - long now = Time.now(); - sequenceNum = incrementDelegationTokenSeqNum(); - identifier.setIssueDate(now); - identifier.setMaxDate(now + tokenMaxLifetime); - identifier.setMasterKeyId(currentKey.getKeyId()); - identifier.setSequenceNumber(sequenceNum); - LOG.info("Creating password for identifier: " + formatTokenId(identifier) - + ", currentKey: " + currentKey.getKeyId()); - byte[] password = createPassword(identifier.getBytes(), currentKey.getKey()); - DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now - + tokenRenewInterval, password, getTrackingIdIfEnabled(identifier)); + protected byte[] createPassword(TokenIdent identifier) { + this.apiLock.writeLock().lock(); try { - METRICS.trackStoreToken(() -> storeToken(identifier, tokenInfo)); - } catch (IOException ioe) { - LOG.error("Could not store token " + formatTokenId(identifier) + "!!", - ioe); + int sequenceNum; + long now = Time.now(); + sequenceNum = incrementDelegationTokenSeqNum(); + identifier.setIssueDate(now); + identifier.setMaxDate(now + tokenMaxLifetime); + identifier.setMasterKeyId(currentKey.getKeyId()); + identifier.setSequenceNumber(sequenceNum); + LOG.info("Creating password for identifier: " + formatTokenId(identifier) + + ", currentKey: " + currentKey.getKeyId()); + byte[] password = createPassword(identifier.getBytes(), currentKey.getKey()); + DelegationTokenInformation tokenInfo = + new DelegationTokenInformation(now + tokenRenewInterval, password, + getTrackingIdIfEnabled(identifier)); + try { + METRICS.trackStoreToken(() -> storeToken(identifier, tokenInfo)); + } catch (IOException ioe) { + LOG.error("Could not store token " + formatTokenId(identifier) + "!!", ioe); + } + return password; + } finally { + this.apiLock.writeLock().unlock(); } - return password; } @@ -526,7 +597,6 @@ protected synchronized byte[] createPassword(TokenIdent identifier) { */ protected DelegationTokenInformation checkToken(TokenIdent identifier) throws InvalidToken { - assert Thread.holdsLock(this); DelegationTokenInformation info = getTokenInfo(identifier); String err; if (info == null) { @@ -546,9 +616,14 @@ protected DelegationTokenInformation checkToken(TokenIdent identifier) } @Override - public synchronized byte[] retrievePassword(TokenIdent identifier) + public byte[] retrievePassword(TokenIdent identifier) throws InvalidToken { - return checkToken(identifier).getPassword(); + this.apiLock.readLock().lock(); + try { + return checkToken(identifier).getPassword(); + } finally { + this.apiLock.readLock().unlock(); + } } protected String getTrackingIdIfEnabled(TokenIdent ident) { @@ -558,12 +633,17 @@ protected String getTrackingIdIfEnabled(TokenIdent ident) { return null; } - public synchronized String getTokenTrackingId(TokenIdent identifier) { - DelegationTokenInformation info = getTokenInfo(identifier); - if (info == null) { - return null; + public String getTokenTrackingId(TokenIdent identifier) { + this.apiLock.readLock().lock(); + try { + DelegationTokenInformation info = getTokenInfo(identifier); + if (info == null) { + return null; + } + return info.getTrackingId(); + } finally { + this.apiLock.readLock().unlock(); } - return info.getTrackingId(); } /** @@ -572,12 +652,17 @@ public synchronized String getTokenTrackingId(TokenIdent identifier) { * @param password Password in the token. * @throws InvalidToken InvalidToken. */ - public synchronized void verifyToken(TokenIdent identifier, byte[] password) + public void verifyToken(TokenIdent identifier, byte[] password) throws InvalidToken { - byte[] storedPassword = retrievePassword(identifier); - if (!MessageDigest.isEqual(password, storedPassword)) { - throw new InvalidToken("token " + formatTokenId(identifier) - + " is invalid, password doesn't match"); + this.apiLock.readLock().lock(); + try { + byte[] storedPassword = retrievePassword(identifier); + if (!MessageDigest.isEqual(password, storedPassword)) { + throw new InvalidToken("token " + formatTokenId(identifier) + + " is invalid, password doesn't match"); + } + } finally { + this.apiLock.readLock().unlock(); } } @@ -589,57 +674,55 @@ public synchronized void verifyToken(TokenIdent identifier, byte[] password) * @throws InvalidToken if the token is invalid * @throws AccessControlException if the user can't renew token */ - public synchronized long renewToken(Token token, + public long renewToken(Token token, String renewer) throws InvalidToken, IOException { - ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); - DataInputStream in = new DataInputStream(buf); - TokenIdent id = createIdentifier(); - id.readFields(in); - LOG.info("Token renewal for identifier: " + formatTokenId(id) - + "; total currentTokens " + currentTokens.size()); + this.apiLock.writeLock().lock(); + try { + ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); + DataInputStream in = new DataInputStream(buf); + TokenIdent id = createIdentifier(); + id.readFields(in); + LOG.info("Token renewal for identifier: " + formatTokenId(id) + "; total currentTokens " + + currentTokens.size()); + + long now = Time.now(); + if (id.getMaxDate() < now) { + throw new InvalidToken(renewer + " tried to renew an expired token " + formatTokenId(id) + + " max expiration date: " + Time.formatTime(id.getMaxDate()) + " currentTime: " + + Time.formatTime(now)); + } + if ((id.getRenewer() == null) || (id.getRenewer().toString().isEmpty())) { + throw new AccessControlException(renewer + " tried to renew a token " + formatTokenId(id) + + " without a renewer"); + } + if (!id.getRenewer().toString().equals(renewer)) { + throw new AccessControlException(renewer + " tries to renew a token " + formatTokenId(id) + + " with non-matching renewer " + id.getRenewer()); + } + DelegationKey key = getDelegationKey(id.getMasterKeyId()); + if (key == null) { + throw new InvalidToken("Unable to find master key for keyId=" + id.getMasterKeyId() + + " from cache. Failed to renew an unexpired token " + formatTokenId(id) + + " with sequenceNumber=" + id.getSequenceNumber()); + } + byte[] password = createPassword(token.getIdentifier(), key.getKey()); + if (!MessageDigest.isEqual(password, token.getPassword())) { + throw new AccessControlException( + renewer + " is trying to renew a token " + formatTokenId(id) + " with wrong password"); + } + long renewTime = Math.min(id.getMaxDate(), now + tokenRenewInterval); + String trackingId = getTrackingIdIfEnabled(id); + DelegationTokenInformation info = + new DelegationTokenInformation(renewTime, password, trackingId); - long now = Time.now(); - if (id.getMaxDate() < now) { - throw new InvalidToken(renewer + " tried to renew an expired token " - + formatTokenId(id) + " max expiration date: " - + Time.formatTime(id.getMaxDate()) - + " currentTime: " + Time.formatTime(now)); - } - if ((id.getRenewer() == null) || (id.getRenewer().toString().isEmpty())) { - throw new AccessControlException(renewer + - " tried to renew a token " + formatTokenId(id) - + " without a renewer"); - } - if (!id.getRenewer().toString().equals(renewer)) { - throw new AccessControlException(renewer - + " tries to renew a token " + formatTokenId(id) - + " with non-matching renewer " + id.getRenewer()); - } - DelegationKey key = getDelegationKey(id.getMasterKeyId()); - if (key == null) { - throw new InvalidToken("Unable to find master key for keyId=" - + id.getMasterKeyId() - + " from cache. Failed to renew an unexpired token " - + formatTokenId(id) + " with sequenceNumber=" - + id.getSequenceNumber()); - } - byte[] password = createPassword(token.getIdentifier(), key.getKey()); - if (!MessageDigest.isEqual(password, token.getPassword())) { - throw new AccessControlException(renewer - + " is trying to renew a token " - + formatTokenId(id) + " with wrong password"); - } - long renewTime = Math.min(id.getMaxDate(), now + tokenRenewInterval); - String trackingId = getTrackingIdIfEnabled(id); - DelegationTokenInformation info = new DelegationTokenInformation(renewTime, - password, trackingId); - - if (getTokenInfo(id) == null) { - throw new InvalidToken("Renewal request for unknown token " - + formatTokenId(id)); + if (getTokenInfo(id) == null) { + throw new InvalidToken("Renewal request for unknown token " + formatTokenId(id)); + } + METRICS.trackUpdateToken(() -> updateToken(id, info)); + return renewTime; + } finally { + this.apiLock.writeLock().unlock(); } - METRICS.trackUpdateToken(() -> updateToken(id, info)); - return renewTime; } /** @@ -651,37 +734,41 @@ public synchronized long renewToken(Token token, * @throws InvalidToken for invalid token * @throws AccessControlException if the user isn't allowed to cancel */ - public synchronized TokenIdent cancelToken(Token token, + public TokenIdent cancelToken(Token token, String canceller) throws IOException { - ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); - DataInputStream in = new DataInputStream(buf); - TokenIdent id = createIdentifier(); - id.readFields(in); - LOG.info("Token cancellation requested for identifier: " - + formatTokenId(id)); - - if (id.getUser() == null) { - throw new InvalidToken("Token with no owner " + formatTokenId(id)); - } - String owner = id.getUser().getUserName(); - Text renewer = id.getRenewer(); - HadoopKerberosName cancelerKrbName = new HadoopKerberosName(canceller); - String cancelerShortName = cancelerKrbName.getShortName(); - if (!canceller.equals(owner) - && (renewer == null || renewer.toString().isEmpty() || !cancelerShortName - .equals(renewer.toString()))) { - throw new AccessControlException(canceller - + " is not authorized to cancel the token " + formatTokenId(id)); - } - DelegationTokenInformation info = currentTokens.remove(id); - if (info == null) { - throw new InvalidToken("Token not found " + formatTokenId(id)); + this.apiLock.writeLock().lock(); + try { + ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); + DataInputStream in = new DataInputStream(buf); + TokenIdent id = createIdentifier(); + id.readFields(in); + LOG.info("Token cancellation requested for identifier: " + formatTokenId(id)); + + if (id.getUser() == null) { + throw new InvalidToken("Token with no owner " + formatTokenId(id)); + } + String owner = id.getUser().getUserName(); + Text renewer = id.getRenewer(); + HadoopKerberosName cancelerKrbName = new HadoopKerberosName(canceller); + String cancelerShortName = cancelerKrbName.getShortName(); + if (!canceller.equals(owner) && + (renewer == null || renewer.toString().isEmpty() || + !cancelerShortName.equals(renewer.toString()))) { + throw new AccessControlException(canceller + " is not authorized to cancel the token " + + formatTokenId(id)); + } + DelegationTokenInformation info = currentTokens.remove(id); + if (info == null) { + throw new InvalidToken("Token not found " + formatTokenId(id)); + } + METRICS.trackRemoveToken(() -> { + removeTokenForOwnerStats(id); + removeStoredToken(id); + }); + return id; + } finally { + this.apiLock.writeLock().unlock(); } - METRICS.trackRemoveToken(() -> { - removeTokenForOwnerStats(id); - removeStoredToken(id); - }); - return id; } /** @@ -762,7 +849,8 @@ public void readFields(DataInput in) throws IOException { private void removeExpiredToken() throws IOException { long now = Time.now(); Set expiredTokens = new HashSet<>(); - synchronized (this) { + this.apiLock.writeLock().lock(); + try { Iterator> i = getCandidateTokensForCleanup().entrySet().iterator(); while (i.hasNext()) { @@ -774,6 +862,8 @@ private void removeExpiredToken() throws IOException { i.remove(); } } + } finally { + this.apiLock.writeLock().unlock(); } // don't hold lock on 'this' to avoid edit log updates blocking token ops logExpireTokens(expiredTokens); @@ -818,10 +908,10 @@ public void stopThreads() { * is secretMgr running * @return true if secret mgr is running */ - public synchronized boolean isRunning() { + public boolean isRunning() { return running; } - + private class ExpiredTokenRemover extends Thread { private long lastMasterKeyUpdate; private long lastTokenCacheCleanup;