Skip to content

Commit

Permalink
Merge pull request #82 from optiopay/add_version
Browse files Browse the repository at this point in the history
Made `version` member of Request and Response in order to avoid passi…
  • Loading branch information
e-max authored Jan 25, 2018
2 parents 1a3dfe1 + 58c29f8 commit cec0458
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 84 deletions.
16 changes: 8 additions & 8 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (c *connection) Metadata(req *proto.MetadataReq) (*proto.MetadataResp, erro
return nil, fmt.Errorf("wait for response: %s", err)
}

if _, err := req.WriteTo(c.rw, req.Version); err != nil {
if _, err := req.WriteTo(c.rw); err != nil {
c.logger.Error("msg", "cannot write", "error", err)
c.releaseWaiter(req.CorrelationID)
return nil, err
Expand All @@ -211,7 +211,7 @@ func (c *connection) Produce(req *proto.ProduceReq) (*proto.ProduceResp, error)
}

if req.RequiredAcks == proto.RequiredAcksNone {
_, err := req.WriteTo(c.rw, req.Version)
_, err := req.WriteTo(c.rw)
return nil, err
}

Expand All @@ -221,7 +221,7 @@ func (c *connection) Produce(req *proto.ProduceReq) (*proto.ProduceResp, error)
return nil, fmt.Errorf("wait for response: %s", err)
}

if _, err := req.WriteTo(c.rw, req.Version); err != nil {
if _, err := req.WriteTo(c.rw); err != nil {
c.logger.Error("msg", "cannot write", "error", err)
c.releaseWaiter(req.CorrelationID)
return nil, err
Expand All @@ -247,7 +247,7 @@ func (c *connection) Fetch(req *proto.FetchReq) (*proto.FetchResp, error) {
return nil, fmt.Errorf("wait for response: %s", err)
}

if _, err := req.WriteTo(c.rw, req.Version); err != nil {
if _, err := req.WriteTo(c.rw); err != nil {
c.logger.Error("msg", "cannot write", "error", err)
c.releaseWaiter(req.CorrelationID)
return nil, err
Expand Down Expand Up @@ -301,7 +301,7 @@ func (c *connection) Offset(req *proto.OffsetReq) (*proto.OffsetResp, error) {
// TODO(husio) documentation is not mentioning this directly, but I assume
// -1 is for non node clients
req.ReplicaID = -1
if _, err := req.WriteTo(c.rw, req.Version); err != nil {
if _, err := req.WriteTo(c.rw); err != nil {
c.logger.Error("msg", "cannot write", "error", err)
c.releaseWaiter(req.CorrelationID)
return nil, err
Expand All @@ -323,7 +323,7 @@ func (c *connection) ConsumerMetadata(req *proto.ConsumerMetadataReq) (*proto.Co
c.logger.Error("msg", "failed waiting for response", "error", err)
return nil, fmt.Errorf("wait for response: %s", err)
}
if _, err := req.WriteTo(c.rw, req.Version); err != nil {
if _, err := req.WriteTo(c.rw); err != nil {
c.logger.Error("msg", "cannot write", "error", err)
c.releaseWaiter(req.CorrelationID)
return nil, err
Expand All @@ -345,7 +345,7 @@ func (c *connection) OffsetCommit(req *proto.OffsetCommitReq) (*proto.OffsetComm
c.logger.Error("msg", "failed waiting for response", "error", err)
return nil, fmt.Errorf("wait for response: %s", err)
}
if _, err := req.WriteTo(c.rw, req.Version); err != nil {
if _, err := req.WriteTo(c.rw); err != nil {
c.logger.Error("msg", "cannot write", "error", err)
c.releaseWaiter(req.CorrelationID)
return nil, err
Expand All @@ -367,7 +367,7 @@ func (c *connection) OffsetFetch(req *proto.OffsetFetchReq) (*proto.OffsetFetchR
c.logger.Error("msg", "failed waiting for response", "error", err)
return nil, fmt.Errorf("wait for response: %s", err)
}
if _, err := req.WriteTo(c.rw, req.Version); err != nil {
if _, err := req.WriteTo(c.rw); err != nil {
c.logger.Error("msg", "cannot write", "error", err)
c.releaseWaiter(req.CorrelationID)
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

type serializableMessage interface {
Bytes(int16) ([]byte, error)
Bytes() ([]byte, error)
}

func testServer(messages ...serializableMessage) (net.Listener, error) {
Expand All @@ -22,7 +22,7 @@ func testServer(messages ...serializableMessage) (net.Listener, error) {

responses := make([][]byte, len(messages))
for i, m := range messages {
b, err := m.Bytes(proto.KafkaV0)
b, err := m.Bytes()
if err != nil {
_ = ln.Close()
return nil, err
Expand Down Expand Up @@ -68,7 +68,7 @@ func testServer2() (net.Listener, chan serializableMessage, error) {
defer func() { _ = cli.Close() }()

for msg := range msgs {
b, err := msg.Bytes(proto.KafkaV0)
b, err := msg.Bytes()
if err != nil {
panic(err)
}
Expand Down
6 changes: 6 additions & 0 deletions kafkatest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ func (s *Server) handleProduceRequest(nodeID int32, conn net.Conn, req *proto.Pr
defer s.mu.Unlock()

resp := &proto.ProduceResp{
Version: req.Version,
CorrelationID: req.CorrelationID,
Topics: make([]proto.ProduceRespTopic, len(req.Topics)),
}
Expand Down Expand Up @@ -380,6 +381,7 @@ func (s *Server) fetchRequest(req *proto.FetchReq) (response, int) {
var messagesNum int

resp := &proto.FetchResp{
Version: req.Version,
CorrelationID: req.CorrelationID,
Topics: make([]proto.FetchRespTopic, len(req.Topics)),
}
Expand Down Expand Up @@ -430,6 +432,7 @@ func (s *Server) handleOffsetRequest(nodeID int32, conn net.Conn, req *proto.Off
defer s.mu.RUnlock()

resp := &proto.OffsetResp{
Version: req.Version,
CorrelationID: req.CorrelationID,
Topics: make([]proto.OffsetRespTopic, len(req.Topics)),
}
Expand Down Expand Up @@ -462,6 +465,7 @@ func (s *Server) handleConsumerMetadataRequest(nodeID int32, conn net.Conn, req
port, _ := strconv.Atoi(addrps[1])

return &proto.ConsumerMetadataResp{
Version: req.Version,
CorrelationID: req.CorrelationID,
CoordinatorID: 0,
CoordinatorHost: addrps[0],
Expand Down Expand Up @@ -496,6 +500,7 @@ func (s *Server) handleOffsetFetchRequest(nodeID int32, conn net.Conn, req *prot
defer s.mu.RUnlock()

resp := &proto.OffsetFetchResp{
Version: req.Version,
CorrelationID: req.CorrelationID,
Topics: make([]proto.OffsetFetchRespTopic, len(req.Topics)),
}
Expand All @@ -518,6 +523,7 @@ func (s *Server) handleOffsetCommitRequest(nodeID int32, conn net.Conn, req *pro
defer s.mu.Unlock()

resp := &proto.OffsetCommitResp{
Version: req.Version,
CorrelationID: req.CorrelationID,
Topics: make([]proto.OffsetCommitRespTopic, len(req.Topics)),
}
Expand Down
Loading

0 comments on commit cec0458

Please sign in to comment.