Skip to content

Commit

Permalink
Merge pull request #192 from fabiojmendes/master
Browse files Browse the repository at this point in the history
Adding support to produce avro encoded messages
  • Loading branch information
birdayz authored Apr 22, 2022
2 parents caf619b + 2e2d87e commit 2abe93c
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 0 deletions.
16 changes: 16 additions & 0 deletions cmd/kaf/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var (
partitionFlag int32
bufferSizeFlag int
inputModeFlag string
avroSchemaID int
)

func init() {
Expand All @@ -46,6 +47,8 @@ func init() {
produceCmd.Flags().StringVar(&timestampFlag, "timestamp", "", "Select timestamp for record")
produceCmd.Flags().Int32VarP(&partitionFlag, "partition", "p", -1, "Partition to produce to")

produceCmd.Flags().IntVarP(&avroSchemaID, "avro-schema-id", "", -1, "Value schema id for avro messsage encoding")

produceCmd.Flags().StringVarP(&inputModeFlag, "input-mode", "", "line", "Scanning input mode: [line|full]")
produceCmd.Flags().IntVarP(&bufferSizeFlag, "line-length-limit", "", 0, "line length limit in line input mode")
}
Expand Down Expand Up @@ -138,6 +141,13 @@ var produceCmd = &cobra.Command{

}

if avroSchemaID != -1 {
schemaCache = getSchemaCache()
if schemaCache == nil {
errorExit("Error getting a instance of schemaCache")
}
}

var headers []sarama.RecordHeader
for _, h := range headerFlag {
v := strings.SplitN(h, ":", 2)
Expand Down Expand Up @@ -166,6 +176,12 @@ var produceCmd = &cobra.Command{
} else {
errorExit("Failed to load payload proto type")
}
} else if avroSchemaID != -1 {
avro, err := schemaCache.EncodeMessage(avroSchemaID, data)
if err != nil {
errorExit("Failed to encode avro", err)
}
data = avro
}

var ts time.Time
Expand Down
27 changes: 27 additions & 0 deletions pkg/avro/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,30 @@ func (c *SchemaCache) DecodeMessage(b []byte) (message []byte, err error) {

return message, nil
}

// EncodeMessage returns a binary representation of an Avro-encoded message.
func (c *SchemaCache) EncodeMessage(schemaID int, json []byte) (message []byte, err error) {
codec, err := c.getCodecForSchemaID(schemaID)
if err != nil {
return nil, err
}

// Creates a header with an initial zero byte and
// the schema id encoded as a big endian uint32
buf := make([]byte, 5)
binary.BigEndian.PutUint32(buf[1:5], uint32(schemaID))

// Convert textual json data to native Go form
native, _, err := codec.NativeFromTextual(json)
if err != nil {
return nil, err
}

// Convert native Go form to binary Avro data
message, err = codec.BinaryFromNative(buf, native)
if err != nil {
return nil, err
}

return message, nil
}

0 comments on commit 2abe93c

Please sign in to comment.