Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Base collector #99

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 55 additions & 65 deletions doc/implementing_a_collector.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,48 +4,48 @@
This sequance diagram shows the high level flow of data from the SUT to User and contains
```mermaid
sequenceDiagram
participant User
participant Runner
participant Collector
participant Fetcher
participant SUT
participant callback

User ->>+ Runner: Runs binary
note left of Runner: runner.Run()

Runner ->> Runner: Gather collectors
note right of Runner: GetCollectorsToRun()

Runner ->> Collector: Starts collector
note over Runner,Collector: Runner.start() calls Collector.Start()

Runner ->> Runner: Spawns Runner.Poller() goroutine for collector
loop Runner.Poller(): Until user interupts or number of polls is reached
Runner ->>+ Collector: Calls Poll
note over Runner,Collector: Runner.Poller() calls Collector.Poll()

Collector ->>+ Fetcher: Requests Data
note over Collector,Fetcher: Collector.poll() calls function <br> in devices submodule which <br> inturn calls Fetcher.Fetch()
Fetcher ->>+ SUT: Runs commands
note left of SUT: Fetcher.Fetch() calls runCommands()

SUT ->>- Fetcher: Command result
note left of SUT: runCommands() returns result
Fetcher ->>- Collector: Returns results
note left of Fetcher: Fetcher.Fetch() unpacks results into a struct


Collector ->> callback: sends data to callback
note left of callback: Callback.Call()
callback ->> User: Presents formatted data to user
Collector ->>- Runner: Sends poll sucess/failure via results channel
note left of Collector: resultsChan <- PollResult{CollectorName, Errors}

Runner ->> Runner: Reacts to failures
end
Runner ->>- Collector: Stops collector
note left of Collector: Runner.cleanUpAll() calls Collector.CleanUp()
participant User
participant Runner
participant Collector
participant Fetcher
participant SUT
participant callback

User ->>+ Runner: Runs binary
note left of Runner: runner.Run()

Runner ->> Runner: Gather collectors
note right of Runner: GetCollectorsToRun()

Runner ->> Collector: Starts collector
note over Runner,Collector: Runner.start() calls Collector.Start()

Runner ->> Runner: Spawns Runner.Poller() goroutine for collector
loop Runner.Poller(): Until user interupts or number of polls is reached
Runner ->>+ Collector: Calls Poll
note over Runner,Collector: Runner.Poller() calls Collector.Poll()

Collector ->>+ Fetcher: Requests Data
note over Collector,Fetcher: Collector.poll() calls function <br> in devices submodule which <br> inturn calls Fetcher.Fetch()
Fetcher ->>+ SUT: Runs commands
note left of SUT: Fetcher.Fetch() calls runCommands()

SUT ->>- Fetcher: Command result
note left of SUT: runCommands() returns result
Fetcher ->>- Collector: Returns results
note left of Fetcher: Fetcher.Fetch() unpacks results into a struct


Collector ->> callback: sends data to callback
note left of callback: Callback.Call()
callback ->> User: Presents formatted data to user
Collector ->>- Runner: Sends poll sucess/failure via results channel
note left of Collector: resultsChan <- PollResult{CollectorName, Errors}

Runner ->> Runner: Reacts to failures
end
Runner ->>- Collector: Stops collector
note left of Collector: Runner.cleanUpAll() calls Collector.CleanUp()
```

## Step by step
Expand All @@ -60,12 +60,10 @@ An example of a very simple collector:
In `collectors/collectors.go` any arguments additional should be added to the `CollectionConstuctor`
```go
...

type CollectionConstuctor struct {
...
Msg string
...
Msg string
}

...
```

Expand All @@ -84,6 +82,7 @@ const (
AnnouncementMsg = "custom-anouncement"
)

// Reporting message
type AnnouncementMessage struct {
Msg string `json:"msg"`
}
Expand All @@ -98,22 +97,10 @@ func (annMsg *AnnouncementMessage) GetAnalyserFormat() (*callbacks.AnalyserFor
return &formatted, nil
}

// Collector
type AnnouncementCollector struct {
callback callbacks.Callback
*baseCollector
msg string
pollInterval int
}

func (announcer *AnnouncementCollector) GetPollInterval() int {
return announcer.pollInterval
}

func (announcer *AnnouncementCollector) IsAnnouncer() bool {
return true
}

func (announcer *AnnouncementCollector) Start() error {
return nil
}

func (announcer *AnnouncementCollector) Poll(resultsChan chan PollResult, wg *utils.WaitGroupCount) {
Expand All @@ -134,12 +121,15 @@ func (announcer *AnnouncementCollector) Poll(resultsChan chan PollResult, wg *ut
}
}

func (announcer *AnnouncementCollector) CleanUp() error {
return nil
}

func NewAnnouncementCollector(constuctor *CollectionConstuctor) (Collector, error) {
announcer := AnnouncementCollector{callback:constructor.Callback, msg:constructor.Msg}
announcer := AnnouncementCollector{
baseCollector: newBaseCollector(
constructor.PollInterval,
false,
constructor.Callback,
),
msg:constructor.Msg,
}
return &announcer, nil
}

Expand Down
14 changes: 7 additions & 7 deletions doc/implimenting_a_fetcher.md → doc/implementing_a_fetcher.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ func (uptime *MyUptime) GetAnalyserFormat() {
func GetUptime(ctx clients.ContainerContext) (MyUptime, error) {
uptime := MyUptime{}
uptimeFetcher.Fetch(ctx, &uptime)
if err != nil {
log.Debugf("failed to fetch uptime %s", err.Error())
if err != nil {
log.Debugf("failed to fetch uptime %s", err.Error())
return gpsNav, err
}
return uptime, nil
Expand All @@ -53,11 +53,11 @@ func GetUptime(ctx clients.ContainerContext) (MyUptime, error) {
If you wish to extract/filter/transform the raw data you can define a post processing function as follows
```go
type MyUptime struct {
Raw string `fetcherKey:"MyUptimeValue"`
Raw string `fetcherKey:"MyUptimeValue"`
Uptime string `fetcherKey:"Uptime"`
Load1 string `fetcherKey:"load1"`
Load5 string `fetcherKey:"load5"`
Load15 string `fetcherKey:"load15"`
Load1 string `fetcherKey:"load1"`
Load5 string `fetcherKey:"load5"`
Load15 string `fetcherKey:"load15"`
}

var (
Expand All @@ -71,7 +71,7 @@ var (
)

func init(){
uptimeFetcher = NewFetcher()
uptimeFetcher = NewFetcher()
err := uptimeFetcher.AddNewCommand("MyUptimeValue", "uptime", true)
if err != nil {
panic("Failed to add uptime command to fetcher")
Expand Down
38 changes: 38 additions & 0 deletions pkg/collectors/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,41 @@ type PollResult struct {
CollectorName string
Errors []error
}

type baseCollector struct {
callback callbacks.Callback
isAnnouncer bool
running bool
pollInterval int
}

func (base *baseCollector) GetPollInterval() int {
return base.pollInterval
}

func (base *baseCollector) IsAnnouncer() bool {
return base.isAnnouncer
}

func (base *baseCollector) Start() error {
base.running = true
return nil
}

func (base *baseCollector) CleanUp() error {
base.running = false
return nil
}

func newBaseCollector(
pollInterval int,
isAnnouncer bool,
callback callbacks.Callback,
) *baseCollector {
return &baseCollector{
callback: callback,
isAnnouncer: isAnnouncer,
running: false,
pollInterval: pollInterval,
}
}
32 changes: 8 additions & 24 deletions pkg/collectors/dev_info_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ import (
"fmt"
"os"
"sync"
"sync/atomic"
"time"

log "github.com/sirupsen/logrus"

"github.com/redhat-partner-solutions/vse-sync-collection-tools/pkg/callbacks"
"github.com/redhat-partner-solutions/vse-sync-collection-tools/pkg/clients"
"github.com/redhat-partner-solutions/vse-sync-collection-tools/pkg/collectors/contexts"
"github.com/redhat-partner-solutions/vse-sync-collection-tools/pkg/collectors/devices"
Expand All @@ -21,32 +19,21 @@ import (
)

type DevInfoCollector struct {
callback callbacks.Callback
*baseCollector
ctx clients.ContainerContext
devInfo *devices.PTPDeviceInfo
quit chan os.Signal
erroredPolls chan PollResult
requiresFetch chan bool
ctx clients.ContainerContext
interfaceName string
count uint32
running bool
wg sync.WaitGroup
pollInterval int
}

const (
DevInfoCollectorName = "DevInfo"
DeviceInfo = "device-info"
)

func (ptpDev *DevInfoCollector) GetPollInterval() int {
return ptpDev.pollInterval
}

func (ptpDev *DevInfoCollector) IsAnnouncer() bool {
return true
}

// Start sets up the collector so it is ready to be polled
func (ptpDev *DevInfoCollector) Start() error {
ptpDev.running = true
Expand Down Expand Up @@ -109,7 +96,6 @@ func (ptpDev *DevInfoCollector) poll() error {
func (ptpDev *DevInfoCollector) Poll(resultsChan chan PollResult, wg *utils.WaitGroupCount) {
defer func() {
wg.Done()
atomic.AddUint32(&ptpDev.count, 1)
}()
errorsToReturn := make([]error, 0)
err := ptpDev.poll()
Expand All @@ -130,10 +116,6 @@ func (ptpDev *DevInfoCollector) CleanUp() error {
return nil
}

func (ptpDev *DevInfoCollector) GetPollCount() int {
return int(atomic.LoadUint32(&ptpDev.count))
}

func verify(ptpDevInfo *devices.PTPDeviceInfo, constructor *CollectionConstructor) error {
checkErrors := make([]error, 0)
checks := []validations.Validation{
Expand Down Expand Up @@ -189,15 +171,17 @@ func NewDevInfoCollector(constructor *CollectionConstructor) (Collector, error)
}

collector := DevInfoCollector{
interfaceName: constructor.PTPInterface,
baseCollector: newBaseCollector(
constructor.DevInfoAnnouceInterval,
true,
constructor.Callback,
),
ctx: ctx,
running: false,
callback: constructor.Callback,
interfaceName: constructor.PTPInterface,
devInfo: &ptpDevInfo,
quit: make(chan os.Signal),
erroredPolls: constructor.ErroredPolls,
requiresFetch: make(chan bool, 1),
pollInterval: constructor.DevInfoAnnouceInterval,
}

return &collector, nil
Expand Down
35 changes: 7 additions & 28 deletions pkg/collectors/dpll_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,23 @@ import (
"errors"
"fmt"

"github.com/redhat-partner-solutions/vse-sync-collection-tools/pkg/callbacks"
"github.com/redhat-partner-solutions/vse-sync-collection-tools/pkg/clients"
"github.com/redhat-partner-solutions/vse-sync-collection-tools/pkg/collectors/contexts"
"github.com/redhat-partner-solutions/vse-sync-collection-tools/pkg/collectors/devices"
"github.com/redhat-partner-solutions/vse-sync-collection-tools/pkg/utils"
)

type DPLLCollector struct {
callback callbacks.Callback
*baseCollector
ctx clients.ContainerContext
interfaceName string
running bool
pollInterval int
}

const (
DPLLCollectorName = "DPLL"
DPLLInfo = "dpll-info"
)

func (dpll *DPLLCollector) GetPollInterval() int {
return dpll.pollInterval
}

func (dpll *DPLLCollector) IsAnnouncer() bool {
return false
}

// Start sets up the collector so it is ready to be polled
func (dpll *DPLLCollector) Start() error {
dpll.running = true
return nil
}

// polls for the dpll info then passes it to the callback
func (dpll *DPLLCollector) poll() error {
dpllInfo, err := devices.GetDevDPLLInfo(dpll.ctx, dpll.interfaceName)
Expand Down Expand Up @@ -71,12 +54,6 @@ func (dpll *DPLLCollector) Poll(resultsChan chan PollResult, wg *utils.WaitGroup
}
}

// CleanUp stops a running collector
func (dpll *DPLLCollector) CleanUp() error {
dpll.running = false
return nil
}

// Returns a new DPLLCollector from the CollectionConstuctor Factory
func NewDPLLCollector(constructor *CollectionConstructor) (Collector, error) {
ctx, err := contexts.GetPTPDaemonContext(constructor.Clientset)
Expand All @@ -89,11 +66,13 @@ func NewDPLLCollector(constructor *CollectionConstructor) (Collector, error) {
}

collector := DPLLCollector{
interfaceName: constructor.PTPInterface,
baseCollector: newBaseCollector(
constructor.PollInterval,
false,
constructor.Callback,
),
ctx: ctx,
running: false,
callback: constructor.Callback,
pollInterval: constructor.PollInterval,
interfaceName: constructor.PTPInterface,
}

dpllFSExists, err := devices.IsDPLLFileSystemPresent(collector.ctx, collector.interfaceName)
Expand Down
Loading
Loading