Skip to content

Commit

Permalink
wip: batch entries up to limit the limit specified in the api
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinwcyu committed Sep 18, 2024
1 parent b54865e commit 0fe9c30
Show file tree
Hide file tree
Showing 30 changed files with 1,926 additions and 124 deletions.
3 changes: 2 additions & 1 deletion cspell.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"pkg/**/*_test.go",
"provisioning/**/*.yaml",
"**/testdata/*.json",
"**/testdata/*.jsonc",
"**/dashboards/*.json",
"src/static/**",
"vendor/**",
Expand Down Expand Up @@ -70,7 +71,7 @@
"eslintcache",
"lefthook",
"ssjagad",
"jackspeak",
"jackspeak",
"nvmrc",
"golangci"
]
Expand Down
61 changes: 35 additions & 26 deletions pkg/framer/property_aggregate_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
)

type AssetPropertyAggregatesBatch struct {
Request iotsitewise.BatchGetAssetPropertyAggregatesInput
Response iotsitewise.BatchGetAssetPropertyAggregatesOutput
Requests []iotsitewise.BatchGetAssetPropertyAggregatesInput
Responses []iotsitewise.BatchGetAssetPropertyAggregatesOutput
}

// getAggregationFields enforces ordering of aggregate fields
Expand Down Expand Up @@ -86,40 +86,49 @@ func addAggregateFieldValues(idx int, fields map[string]*data.Field, aggs *iotsi
}

func (a AssetPropertyAggregatesBatch) Frames(ctx context.Context, resources resource.ResourceProvider) (data.Frames, error) {
resp := a.Response
frames := data.Frames{}
successEntriesLength := 0
for _, r := range a.Responses {
successEntriesLength += len(r.SuccessEntries)
}
frames := make(data.Frames, 0, successEntriesLength)

properties, err := resources.Properties(ctx)
if err != nil {
return nil, err
}

for i, e := range resp.SuccessEntries {
property := properties[*e.EntryId]
frame, err := a.Frame(ctx, property, e.AggregatedValues)
if err != nil {
return nil, err
}
frame.Meta = &data.FrameMeta{
Custom: models.SitewiseCustomMeta{
NextToken: aws.StringValue(resp.NextToken),
Resolution: aws.StringValue(a.Request.Entries[i].Resolution),
Aggregates: aws.StringValueSlice(a.Request.Entries[i].AggregateTypes),
},
}
frames = append(frames, frame)
}
for i, r := range a.Responses {
request := a.Requests[i]
for j, e := range r.SuccessEntries {
property := properties[*e.EntryId]
frame, err := a.Frame(ctx, property, e.AggregatedValues)
if err != nil {
return nil, err
}

for _, e := range resp.ErrorEntries {
property := properties[*e.EntryId]
frame := data.NewFrame(getFrameName(property))
if e.ErrorMessage != nil {
frame.Meta = &data.FrameMeta{
Notices: []data.Notice{{Severity: data.NoticeSeverityError, Text: *e.ErrorMessage}},
Custom: models.SitewiseCustomMeta{
NextToken: aws.StringValue(r.NextToken),
EntryId: *e.EntryId,
Resolution: aws.StringValue(request.Entries[j].Resolution),
Aggregates: aws.StringValueSlice(request.Entries[j].AggregateTypes),
},
}
frames = append(frames, frame)
}

for _, e := range r.ErrorEntries {
property := properties[*e.EntryId]
frame := data.NewFrame(getFrameName(property))
if e.ErrorMessage != nil {
frame.Meta = &data.FrameMeta{
Notices: []data.Notice{{Severity: data.NoticeSeverityError, Text: *e.ErrorMessage}},
}
}
frames = append(frames, frame)
}
frames = append(frames, frame)
}

return frames, nil
}

Expand Down Expand Up @@ -152,4 +161,4 @@ func (a AssetPropertyAggregatesBatch) Frame(ctx context.Context, property *iotsi

return frame, nil

}
}
59 changes: 35 additions & 24 deletions pkg/framer/property_value_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,47 +18,58 @@ import (
)

type AssetPropertyValueBatch struct {
*iotsitewise.BatchGetAssetPropertyValueOutput
Responses []*iotsitewise.BatchGetAssetPropertyValueOutput
AnomalyAssetIds []string
SitewiseClient client.SitewiseClient
}

func (p AssetPropertyValueBatch) Frames(ctx context.Context, resources resource.ResourceProvider) (data.Frames, error) {
frames := data.Frames{}
successEntriesLength := 0
for _, r := range p.Responses {
successEntriesLength += len(r.SuccessEntries)
}
frames := make(data.Frames, 0, successEntriesLength)

properties, err := resources.Properties(ctx)
if err != nil {
return nil, err
}

for _, e := range p.SuccessEntries {
property := properties[*e.EntryId]
if util.IsAssetProperty(property) && *property.AssetProperty.DataType == *aws.String("?") && e.AssetPropertyValue != nil {
property.AssetProperty.DataType = aws.String(getPropertyVariantValueType(e.AssetPropertyValue.Value))
}
for _, r := range p.Responses {
for _, e := range r.SuccessEntries {
property := properties[*e.EntryId]
if util.IsAssetProperty(property) && *property.AssetProperty.DataType == *aws.String("?") && e.AssetPropertyValue != nil {
property.AssetProperty.DataType = aws.String(getPropertyVariantValueType(e.AssetPropertyValue.Value))
}

var frame *data.Frame
if property.AssetId != nil && slices.Contains(p.AnomalyAssetIds, *property.AssetId) {
frame, err = p.frameL4ePropertyValue(ctx, property, e.AssetPropertyValue)
if err != nil {
return nil, err
var frame *data.Frame
if property.AssetId != nil && slices.Contains(p.AnomalyAssetIds, *property.AssetId) {
frame, err = p.frameL4ePropertyValue(ctx, property, e.AssetPropertyValue)
if err != nil {
return nil, err
}
} else {
frame = p.framePropertyValue(property, e.AssetPropertyValue)
}
frame.Meta = &data.FrameMeta{
Custom: models.SitewiseCustomMeta{
NextToken: aws.StringValue(r.NextToken),
EntryId: *e.EntryId,
},
}
} else {
frame = p.framePropertyValue(property, e.AssetPropertyValue)
frames = append(frames, frame)
}

frames = append(frames, frame)
}

for _, e := range p.ErrorEntries {
property := properties[*e.EntryId]
frame := data.NewFrame(*property.AssetName)
if e.ErrorMessage != nil {
frame.Meta = &data.FrameMeta{
Notices: []data.Notice{{Severity: data.NoticeSeverityError, Text: *e.ErrorMessage}},
for _, e := range r.ErrorEntries {
property := properties[*e.EntryId]
frame := data.NewFrame(*property.AssetName)
if e.ErrorMessage != nil {
frame.Meta = &data.FrameMeta{
Notices: []data.Notice{{Severity: data.NoticeSeverityError, Text: *e.ErrorMessage}},
}
}
frames = append(frames, frame)
}
frames = append(frames, frame)
}

return frames, nil
Expand Down
62 changes: 31 additions & 31 deletions pkg/framer/property_value_history_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,52 @@ import (
)

type AssetPropertyValueHistoryBatch struct {
*iotsitewise.BatchGetAssetPropertyValueHistoryOutput
Responses []*iotsitewise.BatchGetAssetPropertyValueHistoryOutput
Query models.AssetPropertyValueQuery
AnomalyAssetIds []string
SitewiseClient client.SitewiseClient
}

func (p AssetPropertyValueHistoryBatch) Frames(ctx context.Context, resources resource.ResourceProvider) (data.Frames, error) {
frames := make(data.Frames, 0, len(p.SuccessEntries))
successEntriesLength := 0
for _, r := range p.Responses {
successEntriesLength += len(r.SuccessEntries)
}
frames := make(data.Frames, 0, successEntriesLength)

properties, err := resources.Properties(ctx)
if err != nil {
return frames, err
}

for _, h := range p.SuccessEntries {
frame, err := p.Frame(ctx, properties[*h.EntryId], h.AssetPropertyValueHistory)
if err != nil {
return nil, err
}
if frame != nil {
frames = append(frames, frame)
for _, r := range p.Responses {
for _, s := range r.SuccessEntries {
frame, err := p.Frame(ctx, properties[*s.EntryId], s.AssetPropertyValueHistory)
frame.Meta = &data.FrameMeta{
Custom: models.SitewiseCustomMeta{
NextToken: aws.StringValue(r.NextToken),
EntryId: *s.EntryId,
Resolution: models.PropertyQueryResolutionRaw,
},
}
if err != nil {
return nil, err
}
if frame != nil {
frames = append(frames, frame)
}
}
}

for _, e := range p.ErrorEntries {
property := properties[*e.EntryId]
frame := data.NewFrame(getFrameName(property))
if e.ErrorMessage != nil {
frame.Meta = &data.FrameMeta{
Notices: []data.Notice{{Severity: data.NoticeSeverityError, Text: *e.ErrorMessage}},
for _, e := range r.ErrorEntries {
property := properties[*e.EntryId]
frame := data.NewFrame(getFrameName(property))
if e.ErrorMessage != nil {
frame.Meta = &data.FrameMeta{
Notices: []data.Notice{{Severity: data.NoticeSeverityError, Text: *e.ErrorMessage}},
}
}
frames = append(frames, frame)
}
frames = append(frames, frame)
}

return frames, nil
Expand Down Expand Up @@ -93,13 +107,6 @@ func (p AssetPropertyValueHistoryBatch) framePropertyValues(property *iotsitewis
valueField,
qualityField)

frame.Meta = &data.FrameMeta{
Custom: models.SitewiseCustomMeta{
NextToken: aws.StringValue(p.NextToken),
Resolution: models.PropertyQueryResolutionRaw,
},
}

for i, v := range h {
if v.Value != nil && getPropertyVariantValue(v.Value) != nil {
timeField.Set(i, getTime(v.Timestamp))
Expand Down Expand Up @@ -129,13 +136,6 @@ func (p AssetPropertyValueHistoryBatch) frameL4ePropertyValues(ctx context.Conte
frameName,
dataFields...)

frame.Meta = &data.FrameMeta{
Custom: models.SitewiseCustomMeta{
NextToken: aws.StringValue(p.NextToken),
Resolution: models.PropertyQueryResolutionRaw,
},
}

return frame, nil
}

Expand Down
Loading

0 comments on commit 0fe9c30

Please sign in to comment.