Skip to content

Commit

Permalink
PropertyAlias: Add support for unassociated streams (#231)
Browse files Browse the repository at this point in the history
* Add support for unassociated streams
* Prepare 1.12.0
  • Loading branch information
idastambuk authored Oct 24, 2023
1 parent 8f0c951 commit 3289943
Show file tree
Hide file tree
Showing 30 changed files with 4,785 additions and 57 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

All notable changes to this project will be documented in this file.

## v1.12.0
- Query by property alias: Add support for unassociated streams in [#231](https://github.com/grafana/iot-sitewise-datasource/pull/231)

## v1.11.1
- Revert "Replace deprecated setVariableQueryEditor with CustomVariableSupport" in [#229](https://github.com/grafana/iot-sitewise-datasource/pull/229)

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "grafana-iot-sitewise-datasource",
"version": "1.11.1",
"version": "1.12.0",
"description": "View IoT Sitewise data in grafana",
"scripts": {
"build": "webpack -c ./.config/webpack/webpack.config.ts --env production",
Expand Down
8 changes: 6 additions & 2 deletions pkg/framer/property_value_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,16 @@ func (p AssetPropertyValueHistory) Frames(ctx context.Context, resources resourc
}

func (p AssetPropertyValueHistory) Frame(ctx context.Context, property *iotsitewise.DescribeAssetPropertyOutput, h []*iotsitewise.AssetPropertyValue) (*data.Frame, error) {

length := len(h)

// TODO: make this work with the API instead of ad-hoc dataType inference
// https://github.com/grafana/iot-sitewise-datasource/issues/98#issuecomment-892947756
if *property.AssetProperty.DataType == *aws.String("?") {
property.AssetProperty.DataType = aws.String(getPropertyVariantValueType(h[0].Value))
if length != 0 {
property.AssetProperty.DataType = aws.String(getPropertyVariantValueType(h[0].Value))
} else {
property.AssetProperty.DataType = aws.String("")
}
}

timeField := fields.TimeField(length)
Expand Down
20 changes: 17 additions & 3 deletions pkg/resource/query_resouce_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/aws/aws-sdk-go/service/iotsitewise"
"github.com/grafana/iot-sitewise-datasource/pkg/models"
"github.com/grafana/iot-sitewise-datasource/pkg/util"
)

type queryResourceProvider struct {
Expand Down Expand Up @@ -56,13 +57,26 @@ func (rp *queryResourceProvider) Property(ctx context.Context) (*iotsitewise.Des

func (rp *queryResourceProvider) Properties(ctx context.Context) (map[string]*iotsitewise.DescribeAssetPropertyOutput, error) {
properties := map[string]*iotsitewise.DescribeAssetPropertyOutput{}
for _, id := range rp.baseQuery.AssetIds {
prop, err := rp.resources.Property(ctx, id, rp.baseQuery.PropertyId, rp.baseQuery.PropertyAlias)
// if the query for a PropertyAlias doesn't have an assetId or propertyId, it means it's a disassociated stream
// in that case, we call Property() with empty values, which will set AssetProperty.Name to the alias
// and will set the EntryId to the alias (to access values in results)
if len(rp.baseQuery.AssetIds) == 0 && rp.baseQuery.PropertyId == "" && rp.baseQuery.PropertyAlias != "" {
prop, err := rp.resources.Property(ctx, "", "", rp.baseQuery.PropertyAlias)
if err != nil {
return nil, err
}
properties[id] = prop
entryId := util.GetEntryId(rp.baseQuery)
properties[*entryId] = prop
} else {
for _, id := range rp.baseQuery.AssetIds {
prop, err := rp.resources.Property(ctx, id, rp.baseQuery.PropertyId, rp.baseQuery.PropertyAlias)
if err != nil {
return nil, err
}
properties[id] = prop
}
}

return properties, nil
}

Expand Down
126 changes: 117 additions & 9 deletions pkg/server/test/property_value_aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ import (
"github.com/grafana/iot-sitewise-datasource/pkg/sitewise/client/mocks"
)

func TestPropertyValueAggregate(t *testing.T) {
type test struct {
name string
query string
isExpression bool
expectedMaxPages int
expectedMaxResults int
expectedDescribeTimeSeriesWithContextArgs *iotsitewise.DescribeTimeSeriesInput
}
type test struct {
name string
query string
isExpression bool
expectedMaxPages int
expectedMaxResults int
expectedDescribeTimeSeriesWithContextArgs *iotsitewise.DescribeTimeSeriesInput
}

func TestPropertyValueAggregate(t *testing.T) {
tests := []test{
{
name: "query by asset id and property id",
Expand Down Expand Up @@ -195,4 +195,112 @@ func TestPropertyValueAggregate(t *testing.T) {
}
}

func TestPropertyValueAggregateWithDisassociatedStream(t *testing.T) {
tc := test{
// an disassociated stream will return nil in DescribeTimeSeriesWithContext for assetId and propertyId
name: "query by property alias of an disassociated stream",
query: `{
"region":"us-west-2",
"propertyAlias":"/amazon/renton/1/rpm",
"aggregates":["SUM"],
"resolution":"1m"
}`,
expectedDescribeTimeSeriesWithContextArgs: &iotsitewise.DescribeTimeSeriesInput{Alias: Pointer("/amazon/renton/1/rpm")},
expectedMaxPages: 1,
expectedMaxResults: 0,
}

t.Run(tc.name, func(t *testing.T) {
mockSw := &mocks.SitewiseClient{}

if tc.expectedDescribeTimeSeriesWithContextArgs != nil {
alias := Pointer("/amazon/renton/1/rpm")
var assetId *string
var propertyId *string

mockSw.On("DescribeTimeSeriesWithContext", mock.Anything, mock.Anything).Return(&iotsitewise.DescribeTimeSeriesOutput{
Alias: alias,
AssetId: assetId,
PropertyId: propertyId,
}, nil)
}
mockSw.On(
"BatchGetAssetPropertyAggregatesPageAggregation",
mock.Anything,
mock.MatchedBy(func(input *iotsitewise.BatchGetAssetPropertyAggregatesInput) bool {
entries := *input.Entries[0]
return *entries.EntryId == "_amazon_renton_1_rpm" &&
*entries.PropertyAlias == "/amazon/renton/1/rpm" &&
*entries.AggregateTypes[0] == "SUM"

}),
tc.expectedMaxPages,
tc.expectedMaxResults,
).Return(&iotsitewise.BatchGetAssetPropertyAggregatesOutput{
NextToken: Pointer("some-next-token"),
SuccessEntries: []*iotsitewise.BatchGetAssetPropertyAggregatesSuccessEntry{{
AggregatedValues: []*iotsitewise.AggregatedValue{{
Timestamp: Pointer(time.Date(2021, 2, 1, 16, 27, 0, 0, time.UTC)),
Value: &iotsitewise.Aggregates{Sum: Pointer(1688.6)},
}},
EntryId: aws.String("_amazon_renton_1_rpm"),
}},
}, nil)

srvr := &server.Server{Datasource: mockedDatasource(mockSw).(*sitewise.Datasource)}

sitewise.GetCache = func() *cache.Cache {
return cache.New(cache.DefaultExpiration, cache.NoExpiration)
}

query := &backend.QueryDataRequest{
PluginContext: backend.PluginContext{},
Queries: []backend.DataQuery{
{
RefID: "A",
QueryType: models.QueryTypePropertyAggregate,
TimeRange: timeRange,
JSON: []byte(tc.query),
},
},
}

if tc.isExpression {
query.Headers = map[string]string{"http_X-Grafana-From-Expr": "true"}
}

qdr, err := srvr.HandlePropertyAggregate(context.Background(), query)
require.Nil(t, err)
_, ok := qdr.Responses["A"]
require.True(t, ok)
require.NotNil(t, qdr.Responses["A"].Frames[0])

expectedFrame := data.NewFrame("/amazon/renton/1/rpm",
data.NewField("time", nil, []time.Time{time.Date(2021, 2, 1, 16, 27, 0, 0, time.UTC)}),
data.NewField("sum", nil, []float64{1688.6}),
).SetMeta(&data.FrameMeta{
Custom: models.SitewiseCustomMeta{
NextToken: "some-next-token",
Resolution: "1m",
Aggregates: []string{models.AggregateSum},
},
})
if diff := cmp.Diff(expectedFrame, qdr.Responses["A"].Frames[0], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}

mockSw.AssertExpectations(t)
if tc.expectedDescribeTimeSeriesWithContextArgs != nil {
mockSw.AssertCalled(t,
"DescribeTimeSeriesWithContext",
mock.Anything,
tc.expectedDescribeTimeSeriesWithContextArgs,
)
}
mockSw.AssertNotCalled(t, "DescribeAssetPropertyWithContext", mock.Anything, mock.Anything)

})

}

func Pointer[T any](v T) *T { return &v }
Loading

0 comments on commit 3289943

Please sign in to comment.