Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WithContext methods to the Fluent struct including handling of context deadlines #86

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

arnogeurts
Copy link

Fixes #85

This pull requests adds PostWithContext, PostWithTimeAndContext, EncodeAndPostDataWithContext, EncodeDataWithContext to the Fluent struct. The context deadlines and cancellations are handled at different levels in the code to make sure no unnecessary work is done.

@arnogeurts arnogeurts force-pushed the add-with-context-methods branch from 2b2330c to 5e1b54c Compare September 14, 2020 12:46
@tagomoris
Copy link
Member

Ah, I need time to review this change. I'll take time for this later, but please ping me if there will be no further comments in weeks.

Copy link
Member

@tagomoris tagomoris left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just checked only the diff lines. Will check the entire code (not only diff) later.

default:
return fmt.Errorf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit)
}
return nil
}

// appendBufferWithFeedback appends data to buffer and waits for the result
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

appendBufferWithFeedback did you update the name after writing this comment?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did, will change the comment :)

case f.pending <- bufferInput{msg: msg, result: result}:
// don't do anything
case <-msg.ctx.Done():
// because the result channel is not used, it can safely be returned to the sync pool.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I love this kind of comment 👍

@@ -386,7 +444,11 @@ func (f *Fluent) run() {
emitEventDrainMsg.Do(func() { fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339)) })
continue
}
err := f.write(entry)
err := f.write(entry.msg)
if entry.result != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any cases to get nil on entry.result?

Copy link
Author

@arnogeurts arnogeurts Oct 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, so in this change the sync flow and the async flow get very similar. Both write to the pending channel and have a separate go-routine taking entries of that channel and writing the data to the TCP connection.

In case of the async flow no feedback is required, so there will be no result channel in the entry. As you can see in appendBuffer an entry is written to the channel without a result channel.

In case of the sync flow, a result channel is added to the entry, so the feedback can be returned to the caller. And as you can see appendBufferBlocking puts the result channel in the entry and waits on the feedback (hence the "blocking").

Comment on lines +171 to +172
f.wg.Add(1)
go f.run()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those should be only for Async, right?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tagomoris
Copy link
Member

Sorry, I couldn't take the time for further review. Let me try in this week or the next week...

waitDuration := time.Duration(waitTime) * time.Millisecond
if deadline, hasDeadLine := msg.ctx.Deadline(); hasDeadLine && deadline.Before(time.Now().Add(waitDuration)) {
// the context deadline is within the wait time, so after the sleep the deadline will have been
// exceeded. It is a waste of time to wait on that.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it wait until the deadline?
This code is to return DeadlineExceeded when now + waitDuration > deadline but the DeadlineExceeded should occur when the current time is after the deadline.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If deadline.Before(time.Now()), it should return DeadlineExceeded immediately.
If deadline.Before(time.Now().Add(waitDuration), it should sleep until the deadline.
MaxRetryWait may be configured as minutes or hours, so the current code could cause unexpectedly early return of errors.

data []byte
ack string
}

type bufferInput struct {
msg *msgToSend
result chan<- error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be syncWriteResult or a similar one to show it's only about sync-write.

wg sync.WaitGroup
resultPool sync.Pool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Want a different name for this too, like result above. The current name looks so confusing to me.

@@ -78,17 +79,24 @@ func NewErrUnknownNetwork(network string) error {
}

type msgToSend struct {
ctx context.Context
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ctx is referred only in f.write(), and its argument is only the msg. Putting contexts into msgToSend only for passing it to f.write() looks too much to me (because msgToSend is a message, not any other internal structure!)
How about containing ctx in bufferInput? (And add an argument ctx to f.write()?)
It's a structure of msg and internal objects, so it seems a better struct to have the context.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do so, EncodeDataWithContext is not needed anymore, right?
That function looks only for putting contexts to msgToSend structure.

})
}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add another test case for the context timeout, via sync and async logger?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add WithContext methods to Fluent struct
3 participants