Skip to content

Commit

Permalink
Update Spark key negotiation protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
srowen committed Aug 21, 2021
1 parent 163fbd2 commit 4be5660
Show file tree
Hide file tree
Showing 13 changed files with 432 additions and 615 deletions.
4 changes: 4 additions & 0 deletions common/network-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-crypto</artifactId>
</dependency>
<dependency>
<groupId>com.google.crypto.tink</groupId>
<artifactId>tink</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,15 @@ private void doSparkAuth(TransportClient client, Channel channel)

String secretKey = secretKeyHolder.getSecretKey(appId);
try (AuthEngine engine = new AuthEngine(appId, secretKey, conf)) {
ClientChallenge challenge = engine.challenge();
AuthMessage challenge = engine.challenge();
ByteBuf challengeData = Unpooled.buffer(challenge.encodedLength());
challenge.encode(challengeData);

ByteBuffer responseData =
client.sendRpcSync(challengeData.nioBuffer(), conf.authRTTimeoutMs());
ServerResponse response = ServerResponse.decodeMessage(responseData);
AuthMessage response = AuthMessage.decodeMessage(responseData);

engine.validate(response);
engine.deriveSessionCipher(challenge, response);
engine.sessionCipher().addToChannel(channel);
}
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,65 +21,55 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import org.apache.spark.network.protocol.Encodable;
import org.apache.spark.network.protocol.Encoders;

/**
* Server's response to client's challenge.
* A message sent in the forward secure authentication protocol, containing an app ID, a salt for
* key derivation, and an encrypted payload.
*
* Please see crypto/README.md for more details.
* Please see crypto/README.md for more details of implementation.
*/
public class ServerResponse implements Encodable {
class AuthMessage implements Encodable {
/** Serialization tag used to catch incorrect payloads. */
private static final byte TAG_BYTE = (byte) 0xFB;

public final byte[] response;
public final byte[] nonce;
public final byte[] inputIv;
public final byte[] outputIv;
public final String appId;
public final byte[] salt;
public final byte[] ciphertext;

public ServerResponse(
byte[] response,
byte[] nonce,
byte[] inputIv,
byte[] outputIv) {
this.response = response;
this.nonce = nonce;
this.inputIv = inputIv;
this.outputIv = outputIv;
AuthMessage(String appId, byte[] salt, byte[] ciphertext) {
this.appId = appId;
this.salt = salt;
this.ciphertext = ciphertext;
}

@Override
public int encodedLength() {
return 1 +
Encoders.ByteArrays.encodedLength(response) +
Encoders.ByteArrays.encodedLength(nonce) +
Encoders.ByteArrays.encodedLength(inputIv) +
Encoders.ByteArrays.encodedLength(outputIv);
Encoders.Strings.encodedLength(appId) +
Encoders.ByteArrays.encodedLength(salt) +
Encoders.ByteArrays.encodedLength(ciphertext);
}

@Override
public void encode(ByteBuf buf) {
buf.writeByte(TAG_BYTE);
Encoders.ByteArrays.encode(buf, response);
Encoders.ByteArrays.encode(buf, nonce);
Encoders.ByteArrays.encode(buf, inputIv);
Encoders.ByteArrays.encode(buf, outputIv);
Encoders.Strings.encode(buf, appId);
Encoders.ByteArrays.encode(buf, salt);
Encoders.ByteArrays.encode(buf, ciphertext);
}

public static ServerResponse decodeMessage(ByteBuffer buffer) {
public static AuthMessage decodeMessage(ByteBuffer buffer) {
ByteBuf buf = Unpooled.wrappedBuffer(buffer);

if (buf.readByte() != TAG_BYTE) {
throw new IllegalArgumentException("Expected ServerResponse, received something else.");
throw new IllegalArgumentException("Expected ClientChallenge, received something else.");
}

return new ServerResponse(
Encoders.ByteArrays.decode(buf),
Encoders.ByteArrays.decode(buf),
Encoders.ByteArrays.decode(buf),
Encoders.ByteArrays.decode(buf));
return new AuthMessage(
Encoders.Strings.decode(buf), // AppID
Encoders.ByteArrays.decode(buf), // Salt
Encoders.ByteArrays.decode(buf)); // Ciphertext
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ protected boolean doAuthChallenge(
int position = message.position();
int limit = message.limit();

ClientChallenge challenge;
AuthMessage challenge;
try {
challenge = ClientChallenge.decodeMessage(message);
challenge = AuthMessage.decodeMessage(message);
LOG.debug("Received new auth challenge for client {}.", channel.remoteAddress());
} catch (RuntimeException e) {
if (conf.saslFallback()) {
Expand All @@ -113,7 +113,7 @@ protected boolean doAuthChallenge(
"Trying to authenticate non-registered app %s.", challenge.appId);
LOG.debug("Authenticating challenge for app {}.", challenge.appId);
engine = new AuthEngine(challenge.appId, secret, conf);
ServerResponse response = engine.respond(challenge);
AuthMessage response = engine.response(challenge);
ByteBuf responseData = Unpooled.buffer(response.encodedLength());
response.encode(responseData);
callback.onSuccess(responseData.nioBuffer());
Expand Down

This file was deleted.

Loading

0 comments on commit 4be5660

Please sign in to comment.