Skip to content

Commit

Permalink
feat: add getServiceNames api for data observability #290
Browse files Browse the repository at this point in the history
  • Loading branch information
sunface committed Oct 17, 2023
1 parent 2b80cd3 commit 50cdb9b
Show file tree
Hide file tree
Showing 14 changed files with 342 additions and 215 deletions.
15 changes: 14 additions & 1 deletion query/internal/plugins/builtin/observability/api/api.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
package api

import (
ch "github.com/ClickHouse/clickhouse-go/v2"
"github.com/DataObserve/datav/query/pkg/models"
"github.com/gin-gonic/gin"
)

const (
TestDatasourceAPI = "testDatasource"
TestDatasourceAPI = "testDatasource"
GetServiceInfoListAPI = "getServiceInfoList"
GetServiceNamesAPI = "getServiceNames"
)

var APIRoutes = map[string]func(c *gin.Context, ds *models.Datasource, conn ch.Conn) models.PluginResult{
GetServiceInfoListAPI: GetServiceInfoList,
GetServiceNamesAPI: GetServiceNames,
}
84 changes: 84 additions & 0 deletions query/internal/plugins/builtin/observability/api/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package api

import (
"fmt"

ch "github.com/ClickHouse/clickhouse-go/v2"
"github.com/DataObserve/datav/query/pkg/colorlog"
"github.com/DataObserve/datav/query/pkg/models"
"github.com/gin-gonic/gin"
)

var logger = colorlog.RootLogger.New("logger", "observability")

type ServiceNameRes struct {
ServiceName string `ch:"serviceName"`
}

func GetServiceNames(c *gin.Context, ds *models.Datasource, conn ch.Conn) models.PluginResult {
start := c.Query("start")
end := c.Query("end")
query := fmt.Sprintf("SELECT serviceName FROM signoz_traces.distributed_signoz_index_v2 WHERE timestamp >= %s AND timestamp <= %s GROUP BY serviceName", start, end)

var res []ServiceNameRes
err := conn.Select(c.Request.Context(), &res, query)
if err != nil {
return models.GenPluginResult(models.PluginStatusError, err.Error(), nil)
}

logger.Info("Query service names", "query", query)

columns := []string{"service"}
data := make([][]interface{}, 0)
for _, v := range res {
data = append(data, []interface{}{v.ServiceName})
}

return models.GenPluginResult(models.PluginStatusSuccess, "", models.PluginResultData{
Columns: columns,
Data: data,
})
}

func GetServiceInfoList(c *gin.Context, ds *models.Datasource, conn ch.Conn) models.PluginResult {
fmt.Println("here33333")
// rows, err := conn.Query(c.Request.Context(), query)
// if err != nil {
// colorlog.RootLogger.Info("Error query clickhouse :", "error", err, "ds_id", ds.Id, "query:", query)
// return models.GenPluginResult(models.PluginStatusError, err.Error(), nil)
// }
// defer rows.Close()

// columns := rows.Columns()
// columnTypes := rows.ColumnTypes()
// types := make(map[string]string)
// data := make([][]interface{}, 0)
// for rows.Next() {
// v := make([]interface{}, len(columns))
// for i := range v {
// t := columnTypes[i].ScanType()
// v[i] = reflect.New(t).Interface()

// tp := t.String()
// if tp == "time.Time" {
// types[columns[i]] = "time"
// }
// }

// err = rows.Scan(v...)
// if err != nil {
// colorlog.RootLogger.Info("Error scan clickhouse :", "error", err, "ds_id", ds.Id)
// continue
// }

// for i, v0 := range v {
// v1, ok := v0.(*time.Time)
// if ok {
// v[i] = v1.Unix()
// }
// }

// data = append(data, v)
// }
return models.GenPluginResult(models.PluginStatusSuccess, "", nil)
}
10 changes: 0 additions & 10 deletions query/internal/plugins/builtin/observability/api/testDatasource.go

This file was deleted.

57 changes: 9 additions & 48 deletions query/internal/plugins/builtin/observability/observability.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package clickhouse

import (
"reflect"
"sync"
"time"

ch "github.com/ClickHouse/clickhouse-go/v2"
"github.com/DataObserve/datav/query/internal/plugins/builtin/observability/api"
Expand Down Expand Up @@ -41,54 +39,17 @@ func (p *ObservabilityPlugin) Query(c *gin.Context, ds *models.Datasource) model
conns[ds.Id] = conn
connsLock.Unlock()
}

rows, err := conn.Query(c.Request.Context(), query)
if err != nil {
colorlog.RootLogger.Info("Error query clickhouse :", "error", err, "ds_id", ds.Id, "query:", query)
return models.GenPluginResult(models.PluginStatusError, err.Error(), nil)
}
defer rows.Close()

columns := rows.Columns()
columnTypes := rows.ColumnTypes()
types := make(map[string]string)
data := make([][]interface{}, 0)
for rows.Next() {
v := make([]interface{}, len(columns))
for i := range v {
t := columnTypes[i].ScanType()
v[i] = reflect.New(t).Interface()

tp := t.String()
if tp == "time.Time" {
types[columns[i]] = "time"
}
route, ok := api.APIRoutes[query]
if ok {
res := route(c, ds, conn)
return models.PluginResult{
Status: models.PluginStatusSuccess,
Error: "",
Data: res,
}

err = rows.Scan(v...)
if err != nil {
colorlog.RootLogger.Info("Error scan clickhouse :", "error", err, "ds_id", ds.Id)
continue
}

for i, v0 := range v {
v1, ok := v0.(*time.Time)
if ok {
v[i] = v1.Unix()
}
}

data = append(data, v)
} else {
return models.GenPluginResult(models.PluginStatusError, "api not found", nil)
}

return models.PluginResult{
Status: models.PluginStatusSuccess,
Error: "",
Data: map[string]interface{}{
"columns": columns,
"data": data,
"types": types,
}}
}

func init() {
Expand Down
8 changes: 4 additions & 4 deletions query/internal/plugins/external/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ func (p *ClickHousePlugin) Query(c *gin.Context, ds *models.Datasource) models.P
return models.PluginResult{
Status: models.PluginStatusSuccess,
Error: "",
Data: map[string]interface{}{
"columns": columns,
"data": data,
"types": types,
Data: models.PluginResultData{
Columns: columns,
Data: data,
ColumnTypes: types,
}}
}

Expand Down
13 changes: 13 additions & 0 deletions query/pkg/models/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,19 @@ type PluginResult struct {
Data interface{} `json:"data,omitempty"`
}

const (
PluginResultFormatTable = "table"
PluginResultFormatMetrics = "metrics"
PluginResultFormatTrace = "traces"
PluginResultFormatLog = "logs"
)

type PluginResultData struct {
Columns []string `json:"columns"`
Data [][]interface{} `json:"data"`
ColumnTypes map[string]string `json:"types,omitempty"`
}

type Plugin interface {
Query(c *gin.Context, ds *Datasource) PluginResult
}
Expand Down
8 changes: 7 additions & 1 deletion ui/src/types/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,11 @@ export interface DatasourcePluginComponents {
export interface QueryPluginResult {
status: "success" | "error"
error: string
data: any
data: QueryPluginData
}

export interface QueryPluginData {
columns: string[]
data: any[][]
types: Record<string,string>
}
132 changes: 131 additions & 1 deletion ui/src/utils/plugins.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,138 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import { PanelQuery } from "types/dashboard"
import { QueryPluginData } from "types/plugin"
import { FieldType, SeriesData } from "types/seriesData"
import { jsonToEqualPairs1, parseLegendFormat } from "./format"
import { alignTimeSeriesData } from "./seriesData"
import { isEmpty } from "./validate"
import { replaceWithVariables } from "./variable"

export const isPluginDisabled = (p) => {
if (p && p.settings?.disabled) {
return p.settings.disabled()
}
}
}



export const queryPluginDataToTimeSeries = (data: QueryPluginData, query: PanelQuery) => {
const seriesMap: Record<string,SeriesData> = {}
const formats = parseLegendFormat(query.legend)

for (var i=0;i<data.data.length;i++) {
const row = data.data[i]
const labels = {}
let timeValue;
let timeFieldName;
let value;
let valueFieldName;
row.forEach((v, i) => {
const labelName = data.columns[i]
const valueType = data.types[labelName] ?? typeof v as any
if (valueType == FieldType.Time) {
if (!timeValue) {
timeValue = v
timeFieldName = labelName
}
} else if (valueType == FieldType.Number) {
if (!value) {
value = v
valueFieldName = labelName
}
} else {
labels[labelName] = v
}
})

if (!timeFieldName) {
return []
}

let seriesName;
if (isEmpty(labels)) {
seriesName = query.id
} else {
seriesName = jsonToEqualPairs1(labels)
}

const series = seriesMap[seriesName]
if (!series) {
seriesMap[seriesName] = {
queryId: query.id,
name: seriesName,
labels: labels,
fields: [
{
name: timeFieldName,
type: FieldType.Time,
values: [timeValue]
},
{
name: valueFieldName,
type: FieldType.Number,
values: [value]
},
]
}
} else {
series.fields[0].values.push(timeValue)
series.fields[1].values.push(value)
}
}


const res = Object.values(seriesMap)
for (const s of res) {
if (!isEmpty(query.legend)) {
s.name = query.legend
if (!isEmpty(formats)) {
for (const format of formats) {
const l = s.labels[format]
if (l) {
s.name= s.name.replaceAll(`{{${format}}}`, l)
}
}
}
// replace ${xxx} format with corresponding variables
s.name= replaceWithVariables(s.name)
}

}


const seriesList = Object.values(seriesMap)
alignTimeSeriesData(seriesList)

return seriesList
}


export const queryPluginDataToTable= (data: QueryPluginData, query: PanelQuery) => {
const series: SeriesData = {
queryId: query.id,
name: isEmpty(query.legend) ? query.id.toString() : query.legend,
fields: []
}

data.columns.forEach((c,i) => {
series.fields.push({
name: c,
values: []
})
})

data.data.forEach((row,i) => {
row.forEach((v,i) => {
const f = series.fields[i]
if (!f.type && data.types) {
f.type = data.types[f.name] ?? typeof v as any
}
f.values.push(v)
})
})

return [series]
}

Loading

0 comments on commit 50cdb9b

Please sign in to comment.