-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor - Move command execution responsibility to CommandHandler from IOThread #1358
Refactor - Move command execution responsibility to CommandHandler from IOThread #1358
Conversation
@soumya-codes @lucifercr07 - please review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code changes look great Prateek. I have done a lot of nit picking around naming convention. Please feel free to open a separate PR to address them.
@@ -104,15 +104,16 @@ func (manager *ShardManager) GetShard(id ShardID) *ShardThread { | |||
return nil | |||
} | |||
|
|||
// RegisterIOThread registers a io-thread with all Shards present in the ShardManager. | |||
func (manager *ShardManager) RegisterIOThread(id string, request, processing chan *ops.StoreResponse) { | |||
// RegisterCommandHandler registers a command handler with all Shards present in the ShardManager. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as earlier, what do you think of naming Manager to Registar/Registry? This name provides the precise nature of the work done by this type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me think on this, will discuss with you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, minor comments.
internal/server/resp/server.go
Outdated
func GenerateUniqueIOThreadID() string { | ||
count := atomic.AddUint64(&ioThreadCounter, 1) | ||
timestamp := time.Now().UnixNano()/int64(time.Millisecond) - startTime | ||
return fmt.Sprintf("W-%d-%d", timestamp, count) | ||
} | ||
|
||
func GenerateUniqueCommandHandlerID() string { | ||
count := atomic.AddUint64(&cmdHandlerCounter, 1) | ||
timestamp := time.Now().UnixNano()/int64(time.Millisecond) - startTime | ||
return fmt.Sprintf("W-%d-%d", timestamp, count) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we optimise this as below? Also any reasons why we're keeping ID format for cmdHandler and IOThread same?
func GenerateUniqueID(prefix string, counter *uint64) string {
count := atomic.AddUint64(counter, 1)
timestamp := time.Now().UnixMilli() - startTime
return fmt.Sprintf("%s-%d-%d", prefix, timestamp, count)
}
func GenerateUniqueIOThreadID() string {
return GenerateUniqueID("W", &ioThreadCounter)
}
func GenerateUniqueCommandHandlerID() string {
return GenerateUniqueID("W", &cmdHandlerCounter)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes made this change. Now using "I" for io thread and "C" for command handler.
internal/commandhandler/manager.go
Outdated
m.mu.Lock() | ||
defer m.mu.Unlock() | ||
|
||
if m.CommandHandlerCount() >= m.maxCmdHandlers { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When this returns error we've already registered an iothread, may be we can move this check earlier?
Also this'd initiate a server shutdown I believe would that be okay? Shouldn't we just drop the new client gracefully and continue server ops?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch - thanks!
Moved registrations earlier. Now goroutines are started if both registrations are successful.
Also, logged error in registrations, connection request is dropped and server continues to run
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the changes, minor comments.
internal/commandhandler/manager.go
Outdated
} else if responseChan != nil && preprocessingChan == nil { | ||
m.ShardManager.RegisterCommandHandler(cmdHandler.ID(), responseChan, nil) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe this can be placed before this if else condition and each of the two channels can be checked and initialized separately. It would avoid errors where either one of the two channels is somehow uninitialized and doesn't fit into the existing if-else logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@JyotinderSingh - One clarification. Currently AcceptConnectionRequests
is the only one using RegisterCommandHandler
function and it is initializing both channels always. I am not sure why this check exist in the first place.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed the entire if else check and added a check on respChan. Please see this comment
… usage due to padding
…arting goroutines
Let's fix the linter errors before merging |
@JyotinderSingh - fixed. |
TODO
[x] Investigate CopyTest failure
[ ] Investigate SDK tests failure - we decided to move ahead without resolving this to unblock others.
This PR is a preparatory step towards solving nearly 50-60% CPU utilisation due to context shift between user space and kernel space for frequent read and write calls.
As first step, this PR moves command execution logic out of IOThread to a new entity
CommandHandler
. IOThread is now responsible for reading from and writing to the client connection, and CommandHandler is responsible for execution of command by interacting with ShardManager and WatchManager.Currently, there is 1:1 mapping between IOThread and CommandHandler, and both of them are spawned by Resp Server. IOThread and CommandHandler communicate using 3 channels
ioThreadReadChan
- to send command from IOThread to CommandHandlerioThreadWriteChan
- to send response from CommandHandler to IOThreadioThreadErrChan
- to send connection error signal from IOThread to CommandHandler, so that CommandHandler shuts down when IOHandler does