Skip to content

Commit

Permalink
feat: Run SQL queries on backend
Browse files Browse the repository at this point in the history
  • Loading branch information
jrgns committed Oct 29, 2024
1 parent 1f19cd1 commit 175d1fe
Show file tree
Hide file tree
Showing 17 changed files with 249 additions and 31 deletions.
87 changes: 87 additions & 0 deletions pkg/framer/execute_query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package framer

import (
"context"
"strconv"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/iotsitewise"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/iot-sitewise-datasource/pkg/framer/fields"
"github.com/grafana/iot-sitewise-datasource/pkg/models"
"github.com/grafana/iot-sitewise-datasource/pkg/sitewise/resource"
)

type Rows []*iotsitewise.Row

type QueryResults iotsitewise.ExecuteQueryOutput

func (a QueryResults) Frames(_ context.Context, _ resource.ResourceProvider) (data.Frames, error) {
length := len(a.Rows)
f := make([]*data.Field, 0)

for _, col := range a.Columns {
f = append(f, fields.DatumField(*col, length))
}

for i, row := range a.Rows {
for j, datum := range row.Data {
if datum.ScalarValue == nil {
backend.Logger.Debug("nil datum")
continue
}

err := SetValue(a.Columns[j], *datum.ScalarValue, f[j], i)
if err != nil {
backend.Logger.Debug("Error setting value", "error", err)
}
}
}

frame := data.NewFrame("", f...)

frame.Meta = &data.FrameMeta{
Custom: models.SitewiseCustomMeta{
NextToken: aws.StringValue(a.NextToken),
},
}

return data.Frames{frame}, nil
}

func SetValue(col *iotsitewise.ColumnInfo, scalarValue string, field *data.Field, index int) error {
typeConverter := map[string]func(string) (interface{}, error){
"BOOLEAN": func(s string) (interface{}, error) {
return strconv.ParseBool(s)
},
"INTEGER": func(s string) (interface{}, error) {
return strconv.ParseInt(s, 10, 64)
},
"STRING": func(s string) (interface{}, error) {
if col.Name != nil && *col.Name == "event_timestamp" {
if t, err := strconv.ParseInt(s, 10, 64); err == nil {
return time.Unix(0, t*int64(time.Nanosecond)), nil
}
}
return s, nil
},
"DOUBLE": func(s string) (interface{}, error) {
return strconv.ParseFloat(s, 64)
},
}

converter, exists := typeConverter[*col.Type.ScalarType]
if !exists {
return nil // or return an error if you want to handle unsupported types
}

value, err := converter(scalarValue)
if err != nil {
return err
}

field.Set(index, value)
return nil
}
4 changes: 3 additions & 1 deletion pkg/framer/fields/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ func FieldTypeForPropertyValue(property *iotsitewise.DescribeAssetPropertyOutput
return data.FieldTypeString
case "STRUCT":
return data.FieldTypeString
default:
case "DOUBLE":
return data.FieldTypeFloat64
default:
return data.FieldTypeString
}
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/framer/list_asset_properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
type AssetProperties iotsitewise.ListAssetPropertiesOutput

type assetPropertySummaryFields struct {
Id *data.Field
Name *data.Field
Id *data.Field
Name *data.Field
}

func (f *assetPropertySummaryFields) fields() data.Fields {
Expand All @@ -27,8 +27,8 @@ func (f *assetPropertySummaryFields) fields() data.Fields {

func newAssetPropertySummaryFields(length int) *assetPropertySummaryFields {
return &assetPropertySummaryFields{
Id: fields.IdField(length),
Name: fields.NameField(length),
Id: fields.IdField(length),
Name: fields.NameField(length),
}
}

Expand All @@ -39,7 +39,7 @@ func (a AssetProperties) Frames(_ context.Context, _ resource.ResourceProvider)

for i, assetProperty := range a.AssetPropertySummaries {
assetPropertyFields.Id.Set(i, *assetProperty.Id)
assetPropertyFields.Name.Set(i, *assetProperty.Path[len(assetProperty.Path) - 1].Name)
assetPropertyFields.Name.Set(i, *assetProperty.Path[len(assetProperty.Path)-1].Name)
}

frame := data.NewFrame("", assetPropertyFields.fields()...)
Expand All @@ -51,4 +51,4 @@ func (a AssetProperties) Frames(_ context.Context, _ resource.ResourceProvider)
}

return data.Frames{frame}, nil
}
}
11 changes: 11 additions & 0 deletions pkg/models/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,14 @@ func GetDescribeAssetModelQuery(dq *backend.DataQuery) (*DescribeAssetModelQuery

return query, nil
}

func GetExecuteQuery(dq *backend.DataQuery) (*ExecuteQuery, error) {
backend.Logger.Debug("Running GetExecuteQuery", "JSON", dq.JSON)
query := &ExecuteQuery{}
if err := json.Unmarshal(dq.JSON, &query); err != nil {
return nil, err
}

query.QueryType = dq.QueryType
return query, nil
}
1 change: 1 addition & 0 deletions pkg/models/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
QueryTypeDescribeAssetModel = "DescribeAssetModel"
QueryTypeListAssetProperties = "ListAssetProperties"
QueryTypeListTimeSeries = "ListTimeSeries"
QueryTypeExecuteQuery = "ExecuteQuery"
)

const (
Expand Down
1 change: 1 addition & 0 deletions pkg/server/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ type Datasource interface {
HandleListAssociatedAssetsQuery(ctx context.Context, req *backend.QueryDataRequest, query *models.ListAssociatedAssetsQuery) (data.Frames, error)
HandleDescribeAssetModelQuery(ctx context.Context, req *backend.QueryDataRequest, query *models.DescribeAssetModelQuery) (data.Frames, error)
HandleListTimeSeriesQuery(ctx context.Context, req *backend.QueryDataRequest, query *models.ListTimeSeriesQuery) (data.Frames, error)
HandleExecuteQuery(ctx context.Context, req *backend.QueryDataRequest, query *models.ExecuteQuery) (data.Frames, error)
}
27 changes: 26 additions & 1 deletion pkg/server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math"

"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/iot-sitewise-datasource/pkg/models"
)
Expand Down Expand Up @@ -65,6 +66,10 @@ func (s *Server) HandleDescribeAssetModel(ctx context.Context, req *backend.Quer
return processQueries(ctx, req, s.handleDescribeAssetModelQuery), nil
}

func (s *Server) HandleExecuteQuery(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
return processQueries(ctx, req, s.handleExecuteQuery), nil
}

func (s *Server) handleInterpolatedPropertyValueQuery(ctx context.Context, req *backend.QueryDataRequest, q backend.DataQuery) backend.DataResponse {
query, err := models.GetAssetPropertyValueQuery(&q)
if err != nil {
Expand Down Expand Up @@ -242,7 +247,7 @@ func (s *Server) handleListTimeSeriesQuery(ctx context.Context, req *backend.Que
}

frames, err := s.Datasource.HandleListTimeSeriesQuery(ctx, req, query)

if err != nil {
return DataResponseErrorRequestFailed(err)
}
Expand Down Expand Up @@ -305,3 +310,23 @@ func (s *Server) handleDescribeAssetModelQuery(ctx context.Context, req *backend
Error: nil,
}
}

func (s *Server) handleExecuteQuery(ctx context.Context, req *backend.QueryDataRequest, q backend.DataQuery) backend.DataResponse {
log.DefaultLogger.FromContext(ctx).Debug("Running S.handleExecuteQuery")
query, err := models.GetExecuteQuery(&q)
if err != nil {
backend.Logger.Warn("Error un-marshalling query", "error", err)
return DataResponseErrorUnmarshal(err)
}

frames, err := s.Datasource.HandleExecuteQuery(ctx, req, query)
if err != nil {
backend.Logger.Warn("Error executing query", "error", err)
return DataResponseErrorRequestFailed(err)
}

return backend.DataResponse{
Frames: frames,
Error: nil,
}
}
3 changes: 3 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/grafana/iot-sitewise-datasource/pkg/sitewise"

"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"

"github.com/grafana/iot-sitewise-datasource/pkg/models"

Expand Down Expand Up @@ -64,11 +65,13 @@ func getQueryHandlers(s *Server) *datasource.QueryTypeMux {
mux.HandleFunc(models.QueryTypeDescribeAsset, s.HandleDescribeAsset)
mux.HandleFunc(models.QueryTypeListAssetProperties, s.HandleListAssetProperties)
mux.HandleFunc(models.QueryTypeListTimeSeries, s.HandleListTimeSeries)
mux.HandleFunc(models.QueryTypeExecuteQuery, s.HandleExecuteQuery)

return mux
}

func NewServerInstance(ctx context.Context, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
log.DefaultLogger.FromContext(ctx).Debug("Creating new Server Instance")
ds, err := sitewise.NewDatasource(ctx, settings)
if err != nil {
return nil, err
Expand Down
34 changes: 34 additions & 0 deletions pkg/sitewise/api/execute_query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package api

import (
"context"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/iotsitewise"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/iot-sitewise-datasource/pkg/framer"
"github.com/grafana/iot-sitewise-datasource/pkg/models"
"github.com/grafana/iot-sitewise-datasource/pkg/sitewise/client"
)

func ExecuteQuery(ctx context.Context, client client.ExecuteQueryClient, query models.ExecuteQuery) (*framer.QueryResults, error) {
backend.Logger.FromContext(ctx).Debug("Running ExecuteQuery", "query", query.QueryStatement)
input := &iotsitewise.ExecuteQueryInput{
QueryStatement: aws.String(query.QueryStatement),
}
if query.NextToken != "" {
input.NextToken = aws.String(query.NextToken)
}

resp, err := client.ExecuteQueryWithContext(ctx, input)

if err != nil {
return nil, err
}

return &framer.QueryResults{
Rows: resp.Rows,
Columns: resp.Columns,
NextToken: resp.NextToken,
}, nil
}
34 changes: 34 additions & 0 deletions pkg/sitewise/api/execute_query_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package api_test

import (
"context"
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/iotsitewise"
"github.com/grafana/iot-sitewise-datasource/pkg/models"
"github.com/grafana/iot-sitewise-datasource/pkg/sitewise/api"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type fakeExecuteQueryClient struct {
assetId string
}

func (f *fakeExecuteQueryClient) ExecuteQueryWithContext(ctx aws.Context, input *iotsitewise.ExecuteQueryInput, opts ...request.Option) (*iotsitewise.ExecuteQueryOutput, error) {
retVal := iotsitewise.ExecuteQueryOutput{NextToken: aws.String("bar")} // Fixme
return &retVal, nil
}

func TestExecuteQuery(t *testing.T) {
client := fakeExecuteQueryClient{}
query := models.ExecuteQuery{
BaseQuery: models.BaseQuery{AssetIds: []string{"foo"}},
}
framer, err := api.ExecuteQuery(context.Background(), &client, query)
require.NoError(t, err)
assert.Equal(t, "foo", client.assetId)
assert.Equal(t, "bar", *framer.NextToken)
}
20 changes: 20 additions & 0 deletions pkg/sitewise/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/aws/aws-sdk-go/service/iotsitewise"
"github.com/aws/aws-sdk-go/service/iotsitewise/iotsitewiseiface"
"github.com/grafana/grafana-aws-sdk/pkg/awsds"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/iot-sitewise-datasource/pkg/models"
)

Expand All @@ -34,6 +35,10 @@ type ListAssetPropertiesClient interface {
ListAssetPropertiesWithContext(aws.Context, *iotsitewise.ListAssetPropertiesInput, ...request.Option) (*iotsitewise.ListAssetPropertiesOutput, error)
}

type ExecuteQueryClient interface {
ExecuteQueryWithContext(aws.Context, *iotsitewise.ExecuteQueryInput, ...request.Option) (*iotsitewise.ExecuteQueryOutput, error)
}

type sitewiseClient struct {
iotsitewiseiface.IoTSiteWiseAPI
}
Expand Down Expand Up @@ -303,3 +308,18 @@ func GetClient(region string, settings models.AWSSiteWiseDataSourceSetting, prov
})
return &sitewiseClient{c}, nil
}

func ExecuteQuery(ctx context.Context, client ExecuteQueryClient, query models.ExecuteQuery) (*iotsitewise.ExecuteQueryOutput, error) {
backend.Logger.Error("Execute Query", "query", query.QueryStatement)
input := &iotsitewise.ExecuteQueryInput{
QueryStatement: &query.QueryStatement,
NextToken: &query.NextToken,
}

output, err := client.ExecuteQueryWithContext(ctx, input)
if err != nil {
return nil, err
}

return output, err
}
7 changes: 7 additions & 0 deletions pkg/sitewise/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,3 +233,10 @@ func (ds *Datasource) HandleListAssetPropertiesQuery(ctx context.Context, req *b
return api.ListAssetProperties(ctx, sw, *query)
})
}

func (ds *Datasource) HandleExecuteQuery(ctx context.Context, req *backend.QueryDataRequest, query *models.ExecuteQuery) (data.Frames, error) {
log.DefaultLogger.FromContext(ctx).Debug("Running DS.HandleExecuteQuery")
return ds.invoke(ctx, req, &query.BaseQuery, func(ctx context.Context, sw client.SitewiseClient) (framer.Framer, error) {
return api.ExecuteQuery(ctx, sw, *query)
})
}
4 changes: 1 addition & 3 deletions src/SitewiseQueryEditor.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export function SitewiseQueryEditor(props: Props) {
});

if (newEditorMode === QueryEditorMode.Code) {
query.queryType = QueryType.SQL;
query.queryType = QueryType.ExecuteQuery;
}
changeEditorMode(query, newEditorMode, onChange);
},
Expand Down Expand Up @@ -64,9 +64,7 @@ export function SitewiseQueryEditor(props: Props) {
<QueryEditorHeader<DataSource, SitewiseQuery, SitewiseOptions>
{...props}
enableRunButton
showAsyncQueryButtons
extraHeaderElementRight={<QueryEditorModeToggle mode={editorMode!} onChange={onEditorModeChange} />}
// cancel={props.datasource.cancel} TODO: Implement cancel
/>
<Space v={0.5} />
<EditorRows>
Expand Down
Loading

0 comments on commit 175d1fe

Please sign in to comment.