diff --git a/docs/en_US/guide/sources/builtin/http_pull.md b/docs/en_US/guide/sources/builtin/http_pull.md index 59b22750c1..643edae65e 100644 --- a/docs/en_US/guide/sources/builtin/http_pull.md +++ b/docs/en_US/guide/sources/builtin/http_pull.md @@ -133,6 +133,8 @@ Key dynamic properties include: - `PullTime`: The timestamp of the current pull time in int64 format. - `LastPullTime`: The timestamp of the last pull time in int64 format. +- Properties from oAuth: The properties from the oAuth response body. For example, if access request return json + body `{"token": "xxxxxx"}`. Then you can use `{{.token}}` to get the token. For HTTP services that allow time-based filtering, `PullTime` and `LastPullTime` can be harnessed for incremental data pulls. Depending on how the service accepts time parameters: diff --git a/docs/zh_CN/guide/sources/builtin/http_pull.md b/docs/zh_CN/guide/sources/builtin/http_pull.md index eee5f0c7e7..04633939d8 100644 --- a/docs/zh_CN/guide/sources/builtin/http_pull.md +++ b/docs/zh_CN/guide/sources/builtin/http_pull.md @@ -132,6 +132,8 @@ OAuth 2.0 是一个授权协议,让 API 客户端有限度地访问网络服 - `PullTime`:本次拉取的 int64 格式时间戳。 - `LastPullTime`:上次拉取的 int64 格式时间戳。 +- 来自 oAuth 的属性:oAuth 返回体的属性也可以使用。 例如,假设返回体为 `{"token": "xxxxxx"}`,则可通过 `{{.token}}` 访问 + token 。 若目标 HTTP 服务支持过滤开始和结束时间,可以使用这两个属性来实现增量拉取。 diff --git a/internal/io/http/httppull_source.go b/internal/io/http/httppull_source.go index afe703a4a4..3afb760261 100644 --- a/internal/io/http/httppull_source.go +++ b/internal/io/http/httppull_source.go @@ -86,8 +86,17 @@ func (hps *PullSource) doPull(ctx api.StreamContext, rcvTime time.Time, omd5 *st // only update last pull time when there is no error hps.t.PullTime = rcvTime.UnixMilli() } + // Parse body which may contain dynamic time range and tokens, so merge them + var tempProps map[string]any + if hps.tokens != nil { + tempProps = hps.tokens + } else { + tempProps = make(map[string]any) + } + tempProps["LastPullTime"] = hps.t.LastPullTime + tempProps["PullTime"] = hps.t.PullTime // Parse url which may contain dynamic time range - url, err := ctx.ParseTemplate(hps.config.Url, hps.t) + url, err := ctx.ParseTemplate(hps.config.Url, tempProps) if err != nil { return []api.SourceTuple{ &xsql.ErrorSourceTuple{ @@ -104,7 +113,7 @@ func (hps *PullSource) doPull(ctx api.StreamContext, rcvTime time.Time, omd5 *st ctx.GetLogger().Warnf("Refresh HTTP pull token error: %v", err) } } - headers, err := hps.parseHeaders(ctx, hps.tokens) + headers, err := hps.parseHeaders(ctx, tempProps) if err != nil { return []api.SourceTuple{ &xsql.ErrorSourceTuple{ @@ -112,7 +121,7 @@ func (hps *PullSource) doPull(ctx api.StreamContext, rcvTime time.Time, omd5 *st }, } } - body, err := ctx.ParseTemplate(hps.config.Body, hps.t) + body, err := ctx.ParseTemplate(hps.config.Body, tempProps) if err != nil { return []api.SourceTuple{ &xsql.ErrorSourceTuple{ diff --git a/internal/io/http/httppull_source_test.go b/internal/io/http/httppull_source_test.go index 96e0bccea6..9faf9ebf2f 100644 --- a/internal/io/http/httppull_source_test.go +++ b/internal/io/http/httppull_source_test.go @@ -257,6 +257,42 @@ func mockAuthServer() *httptest.Server { jsonOut(w, out) }).Methods(http.MethodPost) + router.HandleFunc("/data6", func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "Failed to read request body", http.StatusBadRequest) + return + } + + // Create a Person struct to hold the JSON data + var ddd struct { + Device string `json:"device"` + Token string `json:"token"` + } + + // Unmarshal the JSON data into the Person struct + err = json.Unmarshal(body, &ddd) + if err != nil { + http.Error(w, "Failed to parse JSON", http.StatusBadRequest) + return + } + + if ddd.Token != DefaultToken { + http.Error(w, "invalid token", http.StatusBadRequest) + } + + out := &struct { + DeviceId string `json:"device_id"` + Temperature float64 `json:"temperature"` + Humidity float64 `json:"humidity"` + }{ + DeviceId: "device1", + Temperature: 25.5, + Humidity: 60.0, + } + jsonOut(w, out) + }).Methods(http.MethodPost) + server := httptest.NewUnstartedServer(router) err := server.Listener.Close() if err != nil { @@ -932,6 +968,42 @@ func TestPullWithAuth(t *testing.T) { mock.TestSourceOpen(r, exp, t) } +func TestPullBodyAuth(t *testing.T) { + r := &PullSource{} + server := mockAuthServer() + server.Start() + defer server.Close() + err := r.Configure("data6", map[string]interface{}{ + "method": "POST", + "body": `{"device": "d1", "token": "{{.token}}"}`, + "url": "http://localhost:52345/", + "interval": 100, + "oAuth": map[string]interface{}{ + "access": map[string]interface{}{ + "url": "http://localhost:52345/token", + "body": "{\"username\": \"admin\",\"password\": \"0000\"}", + "expire": "10", + }, + "refresh": map[string]interface{}{ + "url": "http://localhost:52345/refresh", + "headers": map[string]interface{}{ + "Authorization": "Bearer {{.token}}", + "RefreshToken": "{{.refresh_token}}", + }, + }, + }, + }) + if err != nil { + t.Errorf(err.Error()) + return + } + mc := conf.Clock + exp := []api.SourceTuple{ + api.NewDefaultSourceTupleWithTime(map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}, map[string]interface{}{}, mc.Now()), + } + mock.TestSourceOpen(r, exp, t) +} + func TestPullIncremental(t *testing.T) { conf.IsTesting = false conf.InitClock() @@ -1069,9 +1141,7 @@ func TestPullErrorTest(t *testing.T) { name: "wrong body template", conf: map[string]interface{}{"url": "http://localhost:52345/data4", "interval": 10, "body": `{"device": "d1", "start": {{.LastPullTime}}, "end": {{.pullTime}}}`}, exp: []api.SourceTuple{ - &xsql.ErrorSourceTuple{ - Error: errors.New("parse body {\"device\": \"d1\", \"start\": {{.LastPullTime}}, \"end\": {{.pullTime}}} error template: sink:1:54: executing \"sink\" at <.pullTime>: can't evaluate field pullTime in type *http.pullTimeMeta"), - }, + api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "", "humidity": float64(0), "temperature": float64(0)}}, map[string]interface{}{}, time.UnixMilli(143)), }, }, { name: "wrong response",