diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java index 658b34767b594..33df4aca96d2d 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Base64; import java.util.HashMap; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -40,7 +41,9 @@ import org.apache.pulsar.client.api.schema.Field; import org.apache.pulsar.client.api.schema.GenericObject; import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.common.api.EncryptionContext; import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; @@ -87,7 +90,8 @@ public void updateConfig(ClientBuilder clientBuilder, Authentication authenticat * Whether to display BytesMessages in hexdump style, ignored for simple text messages * @return String representation of the message */ - protected String interpretMessage(Message message, boolean displayHex) throws IOException { + protected String interpretMessage(Message message, boolean displayHex, boolean printMetadata) + throws IOException { StringBuilder sb = new StringBuilder(); String properties = Arrays.toString(message.getProperties().entrySet().toArray()); @@ -122,6 +126,45 @@ protected String interpretMessage(Message message, boolean displayHex) throws } sb.append("content:").append(data); + if (printMetadata) { + if (message.getEncryptionCtx().isPresent()) { + EncryptionContext encContext = message.getEncryptionCtx().get(); + if (encContext.getKeys() != null && !encContext.getKeys().isEmpty()) { + sb.append(", "); + sb.append("encryption-keys:").append(", "); + encContext.getKeys().forEach((keyName, keyInfo) -> { + String metadata = Arrays.toString(keyInfo.getMetadata().entrySet().toArray()); + sb.append("name:").append(keyName).append(", ").append("key-value:") + .append(Base64.getEncoder().encode(keyInfo.getKeyValue())).append(", ") + .append("metadata:").append(metadata).append(", "); + + }); + sb.append(", ").append("param:").append(Base64.getEncoder().encode(encContext.getParam())) + .append(", ").append("algorithm:").append(encContext.getAlgorithm()).append(", ") + .append("compression-type:").append(encContext.getCompressionType()).append(", ") + .append("uncompressed-size").append(encContext.getUncompressedMessageSize()).append(", ") + .append("batch-size") + .append(encContext.getBatchSize().isPresent() ? encContext.getBatchSize().get() : 1); + } + } + if (message.hasBrokerPublishTime()) { + sb.append(", ").append("publish-time:").append(DateFormatter.format(message.getPublishTime())); + } + sb.append(", ").append("event-time:").append(DateFormatter.format(message.getEventTime())); + sb.append(", ").append("message-id:").append(message.getMessageId()); + sb.append(", ").append("producer-name:").append(message.getProducerName()); + sb.append(", ").append("sequence-id:").append(message.getSequenceId()); + sb.append(", ").append("replicated-from:").append(message.getReplicatedFrom()); + sb.append(", ").append("redelivery-count:").append(message.getRedeliveryCount()); + sb.append(", ").append("ordering-key:") + .append(message.getOrderingKey() != null ? new String(message.getOrderingKey()) : ""); + sb.append(", ").append("schema-version:") + .append(message.getSchemaVersion() != null ? new String(message.getSchemaVersion()) : ""); + if (message.hasIndex()) { + sb.append(", ").append("index:").append(message.getIndex()); + } + } + return sb.toString(); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java index 46adc45e2ea4d..0f0e2f0a9c813 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java @@ -115,6 +115,9 @@ public class CmdConsume extends AbstractCmdConsume { @Option(names = { "-ca", "--crypto-failure-action" }, description = "Crypto Failure Action") private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL; + @Option(names = { "-mp", "--print-metadata" }, description = "Message metadata") + private boolean printMetadata = false; + public CmdConsume() { // Do nothing super(); @@ -199,7 +202,7 @@ private int consume(String topic) { numMessagesConsumed += 1; if (!hideContent) { System.out.println(MESSAGE_BOUNDARY); - String output = this.interpretMessage(msg, displayHex); + String output = this.interpretMessage(msg, displayHex, printMetadata); System.out.println(output); } else if (numMessagesConsumed % 1000 == 0) { System.out.println("Received " + numMessagesConsumed + " messages"); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java index 51bf2d6898b6b..529d1d9c41272 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java @@ -105,6 +105,9 @@ public class CmdRead extends AbstractCmdConsume { @Option(names = { "-ca", "--crypto-failure-action" }, description = "Crypto Failure Action") private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL; + @Option(names = { "-mp", "--print-metadata" }, description = "Message metadata") + private boolean printMetadata = false; + public CmdRead() { // Do nothing super(); @@ -178,7 +181,7 @@ private int read(String topic) { numMessagesRead += 1; if (!hideContent) { System.out.println(MESSAGE_BOUNDARY); - String output = this.interpretMessage(msg, displayHex); + String output = this.interpretMessage(msg, displayHex, printMetadata); System.out.println(output); } else if (numMessagesRead % 1000 == 0) { System.out.println("Received " + numMessagesRead + " messages");