Skip to content

Commit

Permalink
fix flow control bug
Browse files Browse the repository at this point in the history
  • Loading branch information
fstab committed Mar 9, 2016
1 parent b89fcfc commit 12d6fcf
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 31 deletions.
11 changes: 6 additions & 5 deletions http2client/internal/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ func findHeader(name string, headers []hpack.HeaderField) string {

// Just a quick implementation to make large downloads work.
// Should be replaced with a more sophisticated flow control strategy
// TODO: This is copy-and-paste from connection
func (c *connection) flowControlForIncomingDataFrame(frame *frames.DataFrame) {
threshold := int64(2 << 13) // size of one frame
c.remainingReceiveWindowSize -= int64(len(frame.Data))
Expand Down Expand Up @@ -345,21 +346,21 @@ func (s *settings) handleSettingsFrame(frame *frames.SettingsFrame) {
}

func (c *connection) handleWindowUpdateFrame(frame *frames.WindowUpdateFrame) {
c.increaseFlowControlWindow(int64(frame.WindowSizeIncrement))
c.increaseSendFlowControlWindow(int64(frame.WindowSizeIncrement))
for _, s := range c.streams {
s.ProcessPendingDataFrames()
}
}

func (c *connection) RemainingFlowControlWindowIsEnough(nBytesToWrite int64) bool {
return c.remainingReceiveWindowSize > nBytesToWrite
func (c *connection) RemainingSendFlowControlWindowIsEnough(nBytesToWrite int64) bool {
return c.remainingSendWindowSize > nBytesToWrite
}

func (c *connection) DecreaseFlowControlWindow(nBytesToWrite int64) {
func (c *connection) DecreaseSendFlowControlWindow(nBytesToWrite int64) {
c.remainingSendWindowSize -= nBytesToWrite
}

func (c *connection) increaseFlowControlWindow(nBytes int64) {
func (c *connection) increaseSendFlowControlWindow(nBytes int64) {
c.remainingSendWindowSize += nBytes
}

Expand Down
58 changes: 32 additions & 26 deletions http2client/internal/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ type Stream interface {

type FlowControlledFrameWriter interface {
Write(frame frames.Frame)
RemainingFlowControlWindowIsEnough(nBytesToWrite int64) bool
DecreaseFlowControlWindow(nBytesToWrite int64)
RemainingSendFlowControlWindowIsEnough(nBytesToWrite int64) bool
DecreaseSendFlowControlWindow(nBytesToWrite int64)
}

type streamError struct {
Expand All @@ -66,8 +66,9 @@ type stream struct {
responseBody bytes.Buffer
err *streamError // RST_STREAM sent or received.
cmd *commands.HttpCommand
initialReceiveWindowSize int64
initialSendWindowSize int64
remainingSendWindowSize int64
initialReceiveWindowSize int64
remainingReceiveWindowSize int64
pendingDataFrameWrites []*frames.DataFrame
streamId uint32
Expand All @@ -81,6 +82,7 @@ func New(streamId uint32, cmd *commands.HttpCommand, initialSendWindowSize uint3
responseHeaders: make([]hpack.HeaderField, 0),
streamId: streamId,
cmd: cmd,
initialSendWindowSize: int64(initialSendWindowSize),
remainingSendWindowSize: int64(initialSendWindowSize),
initialReceiveWindowSize: int64(initialReceiveWindowSize),
remainingReceiveWindowSize: int64(initialReceiveWindowSize),
Expand Down Expand Up @@ -164,14 +166,8 @@ func (s *stream) SendFrame(frame frames.Frame) {
wasClosedBefore := s.state == streamstate.CLOSED
switch frame := frame.(type) {
case *frames.DataFrame:
size := int64(len(frame.Data))
if s.RemainingFlowControlWindowIsEnough(size) {
s.DecreaseFlowControlWindow(size)
streamstate.HandleOutgoingFrame(s, frame)
s.out.Write(frame)
} else {
s.scheduleDataFrameWrite(frame)
}
firstInQueue := len(s.pendingDataFrameWrites) == 0
s.sendDataFrame(frame, firstInQueue)
case *frames.HeadersFrame:
s.addRequestHeaders(frame.Headers...)
streamstate.HandleOutgoingFrame(s, frame)
Expand All @@ -185,12 +181,26 @@ func (s *stream) SendFrame(frame frames.Frame) {
}
}

func (s *stream) RemainingFlowControlWindowIsEnough(nBytesToWrite int64) bool {
return s.remainingReceiveWindowSize > nBytesToWrite && s.out.RemainingFlowControlWindowIsEnough(nBytesToWrite)
// The frame will only be sent immediately if firstInQueue is true.
// If the frame is not firstInQueue, it will be postponed so that the order of outgoing DATA frames is preserved.
func (s *stream) sendDataFrame(frame *frames.DataFrame, firstInQueue bool) {
size := int64(len(frame.Data))
if firstInQueue && s.RemainingSendFlowControlWindowIsEnough(size) {
s.DecreaseSendFlowControlWindow(size)
streamstate.HandleOutgoingFrame(s, frame)
s.out.Write(frame)
} else {
s.scheduleDataFrameWrite(frame)
}
}

func (s *stream) RemainingSendFlowControlWindowIsEnough(nBytesToWrite int64) bool {
fmt.Fprintf(os.Stderr, "Stream: RemainingSendFlowControlWindowIsEnough(%v): %v > %v = %v\n", nBytesToWrite, s.remainingSendWindowSize, nBytesToWrite, s.remainingSendWindowSize > nBytesToWrite)
return s.remainingSendWindowSize > nBytesToWrite && s.out.RemainingSendFlowControlWindowIsEnough(nBytesToWrite)
}

func (s *stream) DecreaseFlowControlWindow(nBytesToWrite int64) {
s.out.DecreaseFlowControlWindow(nBytesToWrite)
func (s *stream) DecreaseSendFlowControlWindow(nBytesToWrite int64) {
s.out.DecreaseSendFlowControlWindow(nBytesToWrite)
s.remainingSendWindowSize -= nBytesToWrite
}

Expand All @@ -202,6 +212,7 @@ func (s *stream) receiveWindowUpdateFrame(frame *frames.WindowUpdateFrame) {

// Just a quick implementation to make large downloads work.
// Should be replaced with a more sophisticated flow control strategy
// TODO: This is copy-and-paste from stream
func (s *stream) flowControlForIncomingDataFrame(frame *frames.DataFrame) {
threshold := int64(2 << 13) // size of one frame
s.remainingReceiveWindowSize -= int64(len(frame.Data))
Expand All @@ -213,25 +224,20 @@ func (s *stream) flowControlForIncomingDataFrame(frame *frames.DataFrame) {
}

func (s *stream) ProcessPendingDataFrames() {
for _, frame := range s.pendingDataFrameWrites {
if !s.RemainingFlowControlWindowIsEnough(int64(len(frame.Data))) {
return // must stop here, because data frames must be sent in the right order
for len(s.pendingDataFrameWrites) > 0 {
nextFrame := s.pendingDataFrameWrites[0]
if !s.RemainingSendFlowControlWindowIsEnough(int64(len(nextFrame.Data))) {
break // must stop here, because data frames must be sent in the right order
}
s.SendFrame(frame)
s.pendingDataFrameWrites = s.pendingDataFrameWrites[1:] // TODO: Memory Leak ???
s.sendDataFrame(nextFrame, true)
}
}

func (s *stream) scheduleDataFrameWrite(frame *frames.DataFrame) {
s.pendingDataFrameWrites = append(s.pendingDataFrameWrites, frame)
}

// called after firstPendingDataFrame() returned != nil, so we know len(s.pendingDataFrames) > 0
func (s *stream) popFirstPendingDataFrameWrite() *frames.DataFrame {
result := s.pendingDataFrameWrites[0]
s.pendingDataFrameWrites = s.pendingDataFrameWrites[1:] // TODO: Will pendingDataFrameWrites[0] ever get free() ?
return result
}

func (s *stream) addRequestHeaders(headers ...hpack.HeaderField) {
for _, header := range headers {
s.requestHeaders = append(s.requestHeaders, header)
Expand Down

0 comments on commit 12d6fcf

Please sign in to comment.