Skip to content

Commit

Permalink
Add report ownership
Browse files Browse the repository at this point in the history
  • Loading branch information
tmiddlet2666 committed Sep 10, 2024
1 parent cdee274 commit ff88f93
Show file tree
Hide file tree
Showing 12 changed files with 619 additions and 6 deletions.
12 changes: 12 additions & 0 deletions pkg/cmd/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,9 @@ var (
extendClientParam bool
grpcClientParam bool
profileFirstParam bool
machineParam string
rackParam string
siteParam string
skipMavenDepsParam bool
backupLogFilesParam bool
validPersistenceModes = []string{"on-demand", "active", "active-backup", "active-async"}
Expand Down Expand Up @@ -1450,6 +1453,9 @@ func init() {
createClusterCmd.Flags().Int32VarP(&jmxRemotePortParam, jmxPortArg, "J", 0, jmxPortMessage)
createClusterCmd.Flags().StringVarP(&jmxRemoteHostParam, jmxHostArg, "j", "", jmxHostMessage)
createClusterCmd.Flags().BoolVarP(&profileFirstParam, profileFirstArg, "F", false, profileFirstMessage)
createClusterCmd.Flags().StringVarP(&machineParam, machineArg, "", "", machineMessage)
createClusterCmd.Flags().StringVarP(&rackParam, rackArg, "", "", rackMessage)
createClusterCmd.Flags().StringVarP(&siteParam, siteArg, "", "", siteMessage)

stopClusterCmd.Flags().BoolVarP(&automaticallyConfirm, "yes", "y", false, confirmOptionMessage)

Expand All @@ -1465,6 +1471,9 @@ func init() {
startClusterCmd.Flags().StringVarP(&jmxRemoteHostParam, jmxHostArg, "j", "", jmxHostMessage)
startClusterCmd.Flags().BoolVarP(&profileFirstParam, profileFirstArg, "F", false, profileFirstMessage)
startClusterCmd.Flags().BoolVarP(&backupLogFilesParam, backupLogFilesArg, "B", false, backupLogFilesMessage)
startClusterCmd.Flags().StringVarP(&machineParam, machineArg, "", "", machineMessage)
startClusterCmd.Flags().StringVarP(&rackParam, rackArg, "", "", rackMessage)
startClusterCmd.Flags().StringVarP(&siteParam, siteArg, "", "", siteMessage)

startConsoleCmd.Flags().StringVarP(&heapMemoryParam, heapMemoryArg, "M", defaultHeap, heapMemoryMessage)
startConsoleCmd.Flags().Int32VarP(&logLevelParam, logLevelArg, "l", 5, logLevelMessage)
Expand Down Expand Up @@ -1493,6 +1502,9 @@ func init() {
scaleClusterCmd.Flags().StringVarP(&profileValueParam, profileArg, "P", "", profileMessage)
scaleClusterCmd.Flags().StringVarP(&serverStartClassParam, startClassArg, "S", "", startClassMessage)
scaleClusterCmd.Flags().BoolVarP(&backupLogFilesParam, backupLogFilesArg, "B", false, backupLogFilesMessage)
scaleClusterCmd.Flags().StringVarP(&machineParam, machineArg, "", "", machineMessage)
scaleClusterCmd.Flags().StringVarP(&rackParam, rackArg, "", "", rackMessage)
scaleClusterCmd.Flags().StringVarP(&siteParam, siteArg, "", "", siteMessage)
}

// sanitizeConnectionName sanitizes a cluster connection
Expand Down
12 changes: 12 additions & 0 deletions pkg/cmd/cluster_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,18 @@ func getCacheServerArgs(connection ClusterConnection, member string, httpPort in
}
}

if machineParam != "" {
baseArgs = append(baseArgs, fmt.Sprintf("-Dcoherence.machine=%s", machineParam))
}

if rackParam != "" {
baseArgs = append(baseArgs, fmt.Sprintf("-Dcoherence.rack=%s", rackParam))
}

if siteParam != "" {
baseArgs = append(baseArgs, fmt.Sprintf("-Dcoherence.site=%s", siteParam))
}

// if default heap is overridden, then use this
if heapMemoryParam != defaultHeap {
heap = heapMemoryParam
Expand Down
77 changes: 75 additions & 2 deletions pkg/cmd/formatting.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ const (
NameColumn = "NAME"
publisherColumn = "PUBLISHER"
receiverColumn = "RECEIVER"
machineColumn = "MACHINE"
rackColumn = "RACK"
siteColumn = "SITE"
avgSize = "AVG SIZE"
avgApply = "AVG APPLY"
avgBacklogDelay = "AVG BACKLOG DELAY"
Expand Down Expand Up @@ -579,6 +582,76 @@ func FormatTopicsSummary(topicDetails []config.TopicDetail) string {
return table.String()
}

// FormatPartitionOwnership returns the partition ownership in column formatted output.
func FormatPartitionOwnership(partitionDetails map[int]*config.PartitionOwnership) string {
var (
ownershipCount = len(partitionDetails)
keys = make([]int, 0)
header = []string{MemberColumn, "PRIMARIES", "BACKUPS", "PRIMARY PARTITIONS"}
)
if ownershipCount == 0 {
return ""
}

// get and sort the keys
for k := range partitionDetails {
keys = append(keys, k)
}
sort.Ints(keys)

// get the backup-count
backupCount := utils.GetBackupCount(partitionDetails)

if OutputFormat == constants.WIDE {
header = []string{MemberColumn, machineColumn, rackColumn, siteColumn, "PRIMARIES", "BACKUPS", "PRIMARY PARTITIONS"}
}

// build the header for the backups
for i := 0; i < backupCount; i++ {
header = append(header, fmt.Sprintf("BACKUP %d", i+1))
}

table := newFormattedTable().WithAlignment(generateColumnFormats(backupCount)...).WithHeader(header...)

for j := 0; j < len(keys); j++ {
key := keys[j]
value := partitionDetails[key]

memberID := "Orphaned"
if value.MemberID != -1 {
memberID = fmt.Sprintf("%v", value.MemberID)
}

table.AddRow(memberID)

if OutputFormat == constants.WIDE {
table.AddColumnsToRow(value.Machine, value.Rack, value.Site)
}

table.AddColumnsToRow(formatSmallInteger(int32(value.PrimaryPartitions)),
formatSmallInteger(int32(value.BackupPartitions)))

// add primaries and backups
for i := 0; i <= backupCount; i++ {
table.AddColumnsToRow(utils.FormatPartitions(value.PartitionMap[i]))
}
}

return table.String()
}

func generateColumnFormats(count int) []string {
result := []string{R, R, R, L}
if OutputFormat == constants.WIDE {
result = []string{R, L, L, L, R, R, L}
}

for i := 0; i < count; i++ {
result = append(result, L)
}
return result
}

// FormatTopicsSubscribers returns the topics subscriber details in column formatted output
func FormatTopicsSubscribers(topicsSubscribers []config.TopicsSubscriberDetail) string {
var (
Expand Down Expand Up @@ -1388,7 +1461,7 @@ func FormatMembers(members []config.Member, verbose bool, storageMap map[int]boo
WithAlignment(finalAlignment...)

if OutputFormat == constants.WIDE {
table.AddHeaderColumns("MACHINE", "RACK", "SITE", publisherColumn, receiverColumn)
table.AddHeaderColumns(machineColumn, rackColumn, siteColumn, publisherColumn, receiverColumn)
table.AddFormattingFunction(9, networkStatsFormatter)
table.AddFormattingFunction(10, networkStatsFormatter)
}
Expand Down Expand Up @@ -1813,7 +1886,7 @@ func FormatMachines(machines []config.Machine) string {
return strings.Compare(machines[p].MachineName, machines[q].MachineName) < 0
})

table := newFormattedTable().WithHeader("MACHINE", "PROCESSORS", "LOAD", "TOTAL MEMORY", "FREE MEMORY",
table := newFormattedTable().WithHeader(machineColumn, "PROCESSORS", "LOAD", "TOTAL MEMORY", "FREE MEMORY",
"% FREE", "OS", "ARCH", "VERSION").WithAlignment(L, R, R, R, R, R, L, L, L)
table.AddFormattingFunction(5, machineMemoryFormatting)

Expand Down
56 changes: 56 additions & 0 deletions pkg/cmd/monitor_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ var validPanels = []panelImpl{
createContentPanel(8, "services", "Services", "show services", servicesContent, servicesPanelData),
createContentPanel(8, "service-members", "Service Members (%SERVICE)", "show service members", serviceMembersContent, servicesPanelData),
createContentPanel(8, "service-distributions", "Service Distributions (%SERVICE)", "show service distributions", serviceDistributionsContent, servicesPanelData),
createContentPanel(8, "service-ownership", "Service Ownership (%SERVICE)", "show service ownership", serviceOwnershipContent, servicesPanelData),
createContentPanel(8, "service-storage", "Service Storage", "show service storage", serviceStorageContent, servicesPanelData),
createContentPanel(8, "topic-members", "Topic Members (%SERVICE/%TOPIC)", "show topic members", topicMembersContent, topicsPanelData),
createContentPanel(8, "subscribers", "Topic Subscribers (%SERVICE/%TOPIC)", "show topic subscribers", topicSubscribersContent, topicsPanelData),
Expand Down Expand Up @@ -639,6 +640,61 @@ var serviceDistributionsContent = func(dataFetcher fetcher.Fetcher, _ clusterSum
return strings.Split(distributions.ScheduledDistributions, "\n"), nil
}

var serviceOwnershipContent = func(dataFetcher fetcher.Fetcher, _ clusterSummaryInfo) ([]string, error) {
var (
membersResult []byte
memberNodeID string
membersDetails = config.ServiceMemberDetails{}
)

if serviceName == "" {
return emptyStringArray, errSelectService
}

servicesResult, err := GetDistributedServices(dataFetcher)
if err != nil {
return emptyStringArray, err
}

if !utils.SliceContains(servicesResult, serviceName) {
return emptyStringArray, fmt.Errorf(unableToFindService, serviceName)
}

// find storage member node
membersResult, err = dataFetcher.GetServiceMembersDetailsJSON(serviceName)
if err != nil {
return emptyStringArray, err
}

if len(membersResult) != 0 {
err = json.Unmarshal(membersResult, &membersDetails)
if err != nil {
return emptyStringArray, utils.GetError("unable to unmarshall members ownership", err)
}

for _, v := range membersDetails.Services {
memberNodeID = v.NodeID
break
}

var ownershipData []byte

ownershipData, err = dataFetcher.GetServiceOwnershipJSON(serviceName, memberNodeID)
if err != nil {
return emptyStringArray, err
}

result, err := getOwnershipData(dataFetcher, ownershipData)
if err != nil {
return emptyStringArray, err
}

return strings.Split(FormatPartitionOwnership(result), "\n"), nil
}

return noContentArray, nil
}

var serviceStorageContent = func(dataFetcher fetcher.Fetcher, _ clusterSummaryInfo) ([]string, error) {
storageSummary, err := getServiceStorageDetails(dataFetcher)

Expand Down
7 changes: 7 additions & 0 deletions pkg/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ const (
healthPortMessage = "starting port for health"
jmxPortMessage = "remote JMX port for management member"
jmxHostMessage = "remote JMX RMI host for management member"
machineMessage = "the machine name to use"
rackMessage = "the rack name to use"
siteMessage = "the site name to use"
cacheConfigMessage = "cache configuration file"
operationalConfigMessage = "override override file"
cacheConfigArg = "cache-config"
Expand All @@ -94,6 +97,9 @@ const (
healthPortArg = "health-port"
jmxPortArg = "jmx-port"
jmxHostArg = "jmx-host"
machineArg = "machine"
rackArg = "rack"
siteArg = "site"
logLevelMessage = "coherence log level"
profileMessage = "profile to add to cluster startup command line"
backupLogFilesMessage = "backup old cache server log files"
Expand Down Expand Up @@ -507,6 +513,7 @@ func Initialize(command *cobra.Command) *cobra.Command {
getCmd.AddCommand(getProxyConnectionsCmd)
getCmd.AddCommand(getUseGradleCmd)
getCmd.AddCommand(getServiceStorageCmd)
getCmd.AddCommand(getServiceOwnershipCmd)
getCmd.AddCommand(getCacheStoresCmd)
getCmd.AddCommand(getColorCmd)
getCmd.AddCommand(getNetworkStatsCmd)
Expand Down
Loading

0 comments on commit ff88f93

Please sign in to comment.