Skip to content

Commit

Permalink
Ruby-dig-like for replay uri and query body string (#15)
Browse files Browse the repository at this point in the history
* fix: URI selector with get-in
* feat: query body attr accessor
* feat: get-in replay-for-impact
* feat: deep replay with selectors
  • Loading branch information
dainiusjocas authored Sep 24, 2021
1 parent 757ffcf commit fc84164
Show file tree
Hide file tree
Showing 8 changed files with 242 additions and 59 deletions.
113 changes: 113 additions & 0 deletions docs/replay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# Replay

This document describes group os supported operations that are about replaying queries from a query log.
There are 3 replay operations:

- replay,
- deep-replay,
- replay-for-impact.

The idea behind them all is to take some queries from a query log and re-run those (modified) queries against some Elasticsearch cluster.
This document describes each operation.

## `replay`

```shell
$ print $(ket replay --docs)
"Take some queries from an Elasticsearch cluster (transform them) replay the queries
to another Elasticsearch cluster, and store the responses in yet another Elasticsearch
cluster."
```

Configuration:
```shell
ket replay --defaults | jq
=>
{
"max_docs": 10,
"source": {
"remote": {
"host": "http://localhost:9200"
},
"index": "query_logs_index"
},
"replay": {
"description": "Description of the query replay.",
"uri_attr": "uri",
"repeats": 1,
"concurrency": 1,
"uri-transforms": [],
"query-transforms": [],
"id": "id-of-the-replay",
"connection.url": "http://localhost:9200",
"replay_data_attr": "replay",
"query_attr": "request"
},
"sink": {
"connection.url": "http://localhost:9200",
"dest.index": "replay_sink_index",
"batch.size": 50
}
}
```
Properties under the `replay` key:
- `description`: a plain text description of the replay, will be stored in the sink index;
- `uri_attr`: a key (or path) under which an uri is expected to be found in the `query_log` document.
- `repeats`: how many times to replay the same query from the `query_log`
- `concurrency`: the maximum number of concurrent requests to the `replay` cluster at any given moment.
- `uri-transforms`: a list of changes applied to the `uri` string before constructing request to the `replay` cluster.
- `query-transforms`: a list of scrips that are applied to the query body before constructing request to the `replay`
- `id`: a string identifier of the replay, will be stored in the sink index;
- `connection.url`: what is the hostname of the `replay` cluster.
- `replay_data_attr`: under which attribute to store the replay data in the `sink`.
- `query_attr`: a key (or path) under which the raw query string is expected to be found in the `query_log` document.

"Path" is a list that describes "how to get value from a nested Json".
The "key (or path)" has the same semantics as [Ruby dig method](https://apidock.com/ruby/v2_5_5/Hash/dig).
When only string is provided (i.e. only a key) it is treated as a path with one element, e.g.:
`"uri_attr": "uri"` is equal to ` "uri_attr": ["uri"]`.

`uri-transforms` is a list of string transformations,
where a part of the string matched by a regex under `match`
is replaced by a string under `replacement`, e.g.:

```json
[
{
"match" : "_count\\?",
"replacement" : "_search?size=0&"
}
]
```

`query-transforms` is a list with transformation scripts
to be applied on the raw query Json, e.g.:
```json
[
{
"lang" : "js",
"script" : "(q) => Object.assign(q, {'_source': true})"
}
]
```

## `deep-replay`

```shell
$ printf "$(ket deep-replay --docs)\n"
"From an Elasticsearch cluster takes some queries, replays them
to (another) Elasticsearch cluster with top-k results where k might be very big, like 1M.
Each hit with the metadata is written to a specified Kafka topic.
URIs can be transformed, queries can be transformed."
```

## `replay-for-impact`

```shell
$ printf "$(ket replay-for-impact --docs)\n"
"Fetches baseline query and for a list of query transforms and values, generates variations of the query,
then invokes _rank_eval API for metrics on what is the impact of the query transforms to the ranking.
Impact is defined as 1 minus precision-at-K."
```

This is the trickiest operation
7 changes: 4 additions & 3 deletions src/replay/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
[sink.elasticsearch.index :as es-sink]
[source.elasticsearch :as es]
[replay.transform.uri :as transform-uri]
[replay.transform.query :as transform-query])
[replay.transform.query :as transform-query]
[replay.transform.selector :as selector])
(:import (java.time Instant)
(java.util UUID)))

Expand Down Expand Up @@ -39,10 +40,10 @@
(let [replay-conf (:replay conf)
es-host (:connection.url replay-conf)
transform-fn (transform-query/transform-fn (:query-transforms replay-conf))
original-query-key (keyword (:query_attr replay-conf))]
query-selector (selector/path->selector (:query_attr replay-conf))]
(fn [{source :_source :as input-doc} channel]
(let [endpoint (transform-uri/construct-endpoint source replay-conf)
original-query (get source original-query-key)
^String original-query (get-in source query-selector)
query (transform-fn original-query)
start (System/currentTimeMillis)]
(http/request
Expand Down
65 changes: 32 additions & 33 deletions src/replay/deep.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
[core.async :as async]
[core.json :as json]
[source.elasticsearch :as es]
[sink :as sink])
[sink :as sink]
[replay.transform.selector :as selector]
[replay.transform.uri :as transform-uri])
(:import (java.time Instant)))

(def DEFAULT_DEPTH 1)
(def DEFAULT_PAGE_SIZE 5000)
(def DEFAULT_PAGE_SIZE 3000)

(defn prepare-query [query replay-conf]
(-> query
Expand All @@ -17,50 +19,47 @@
(assoc :explain true)
(assoc :sort ["_score" {:created_at "desc"}])))

(defn additional-data [query-log-attrs query-log-entry-source]
(loop [[attr & attrs] query-log-attrs
acc {}]
(if attr
(recur attrs
(assoc acc (:key attr)
(get-in query-log-entry-source
(selector/path->selector
(:selector attr)))))
acc)))

(defn query-es-afn [conf]
(let [replay-conf (:replay conf)
depth (or (:depth replay-conf) DEFAULT_DEPTH)
query-log-host (-> conf :source :remote :host)
dest-es-host (:connection.url replay-conf)
doc-fetch-strategy (or (:doc-fetch-strategy replay-conf) :search-after-with-pit)]
doc-fetch-strategy (or (:doc-fetch-strategy replay-conf) :search-after-with-pit)
query-selector (selector/path->selector (:query_attr replay-conf))
query-log-attrs (partial additional-data (:query-log-attrs replay-conf))]
(fn [query-log-entry channel]
(let [index-name (or (:target-index replay-conf)
(-> query-log-entry :fields :uri.index first))
query (-> query-log-entry :_source :request json/decode (prepare-query replay-conf))
(let [query-log-entry-source (get query-log-entry :_source)
raw-endpoint (transform-uri/construct-endpoint query-log-entry-source replay-conf)
^String index-name (or (:target-index replay-conf)
(transform-uri/get-index-or-alias raw-endpoint))
^String raw-query (get-in query-log-entry-source query-selector)
query (-> raw-query json/decode (prepare-query replay-conf))
hits (es/fetch {:max_docs depth
:source {:remote {:host dest-es-host}
:index index-name
:query query
:strategy doc-fetch-strategy}})]
(sink/store! (map (fn [resp rank]
{:key (format "%s:%s:%s" (:id replay-conf) (:_id query-log-entry) rank)
:value {:replay_id (:id replay-conf)
:query_log_host query-log-host
:query_log_id (:_id query-log-entry)
:x_user_id (-> query-log-entry
:_source
:request_headers
:x-user-id
first)
:x_search_session_id (-> query-log-entry
:_source
:request_headers
:x-search-session-id
first)
:x_anon_id (-> query-log-entry
:_source
:request_headers
:x-anon-id
first)
:query_body (-> query-log-entry :_source :request)
:query-timestamp (str (Instant/ofEpochMilli
(-> query-log-entry
:_source
:header.timestamp)))
:replay-timestamp (str (Instant/now))
:replay_host dest-es-host
:rank rank
:hit resp}
:value (merge {:replay_id (:id replay-conf)
:query_log_host query-log-host
:query_log_id (:_id query-log-entry)
:replay-timestamp (str (Instant/now))
:replay_host dest-es-host
:rank rank
:hit resp}
(query-log-attrs query-log-entry-source))
:headers {}})
hits (range))
conf)
Expand Down
30 changes: 16 additions & 14 deletions src/replay/impact.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@
[sink.elasticsearch.index :as es-sink]
[source.elasticsearch :as es]
[replay.transform.uri :as transform.uri]
[replay.transform.impact :as impact-transform]))
[replay.transform.impact :as impact-transform]
[replay.transform.uri :as transform-uri]
[replay.transform.selector :as selector]))

(set! *warn-on-reflection* true)

; https://www.elastic.co/guide/en/elasticsearch/reference/current/search-rank-eval.html

(defn get-index-or-alias [endpoint]
(last (re-find #"^/(.*)/_search" endpoint)))

(defn prepare-endpoint
"Prepares the endpoint for PIT queries: remove preference, routing, index name."
[^String endpoint]
Expand Down Expand Up @@ -138,17 +137,20 @@
(select-keys (assoc provided-metric-config :k k)
(keys default-metric-config)))}))

(defn measure-impact [opts query-log-entry]
(let [target-es-host (get-in opts [:replay :connection.url])
raw-endpoint (get-in query-log-entry [:_source :uri])
target-index (or (get-in opts [:replay :target-index]) (get-index-or-alias raw-endpoint))
k (get-top-k opts)
query-body (json/decode (get-in query-log-entry [:_source :request]))
metric (get-metric opts)
pit (assoc (pit/init target-es-host target-index opts) :keep_alive "30s")
(defn measure-impact [replay-conf query-log-entry]
(let [target-es-host (get-in replay-conf [:replay :connection.url])
query-log-entry-source (get-in query-log-entry [:_source])
raw-endpoint (transform-uri/construct-endpoint query-log-entry-source (:replay replay-conf))
target-index (or (get-in replay-conf [:replay :target-index])
(transform-uri/get-index-or-alias raw-endpoint))
k (get-top-k replay-conf)
query-selector (selector/path->selector (:query_attr (:replay replay-conf)))
query-body (json/decode (get-in query-log-entry-source query-selector))
metric (get-metric replay-conf)
pit (assoc (pit/init target-es-host target-index replay-conf) :keep_alive "30s")
baseline-ratings-url (format "%s%s" target-es-host (prepare-endpoint raw-endpoint))
baseline-ratings (get-baseline-ratings baseline-ratings-url query-body pit k (get-in opts [:replay :ignore-timeouts]))
grouped-variations (get-grouped-query-variations query-body opts k)
baseline-ratings (get-baseline-ratings baseline-ratings-url query-body pit k (get-in replay-conf [:replay :ignore-timeouts]))
grouped-variations (get-grouped-query-variations query-body replay-conf k)
rank-eval-resp (query-rank-eval-api target-es-host target-index baseline-ratings grouped-variations metric pit)]
(log/infof "RFI metric used: '%s'" metric)
(construct-rfi-records rank-eval-resp query-log-entry grouped-variations baseline-ratings k)))
Expand Down
16 changes: 16 additions & 0 deletions src/replay/transform/selector.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
(ns replay.transform.selector)

(defn path->selector
"path-to-data can be either a String or an array for get-in.
When array then keywordize strings while integers should be preserved
Returns an array that can be used as get-in ks param"
[path-to-data]
(if (string? path-to-data)
[(keyword path-to-data)]
(mapv (fn [selector]
(if (string? selector)
(keyword selector)
selector)) path-to-data)))

(comment
(replay.transform.selector/path->selector ["a" 1 "b"]))
44 changes: 39 additions & 5 deletions src/replay/transform/uri.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
(ns replay.transform.uri
(:require [clojure.string :as str]))
(:require [clojure.string :as str]
[replay.transform.selector :as selector]))

(defn transform-uri [uri transforms]
(defn transform-uri [^String uri transforms]
(reduce (fn [uri {:keys [match replacement]}]
(str/replace uri (re-pattern match) replacement)) uri transforms))

Expand All @@ -11,12 +12,45 @@
[{:match "bar"
:replacement "moo"}]))

(defn construct-endpoint [doc replay-conf]
(defn extract-uri [doc replay-conf]
(let [uri-attr-path (:uri_attr replay-conf)
; uri-attr-path should can be either string of a list of keys to get-in
uri-selector (selector/path->selector uri-attr-path)]
(get-in doc uri-selector)))

(defn construct-endpoint
"Either a hardcoded uri or transformed uri from the original query."
[doc replay-conf]
(or (:uri replay-conf)
(let [uri (get doc (keyword (:uri_attr replay-conf)))]
(let [uri (extract-uri doc replay-conf)]
(transform-uri uri (:uri-transforms replay-conf)))))

(comment
(replay.transform.uri/construct-endpoint
{:uri "/foo/bar/baz"}
{:uri_attr "uri"
:uri-transforms [{:match "bar"
:replacement "XXXX"}]})

(replay.transform.uri/construct-endpoint
{:elasticsearch {:request {:uri "/foo/bar/baz"}}}
{:uri_attr ["elasticsearch" "request" "uri"]
:uri-transforms [{:match "bar"
:replacement "XXXX"}]}))

(defn transform
"Applies string transformations in order on the uri."
[{:keys [uri transforms] :as request}]
[{:keys [^String uri transforms] :as request}]
(assoc request :transformed-uri (transform-uri uri transforms)))

(comment
(replay.transform.uri/transform
{:uri "/foo/bar/baz"
:transforms [{:match "bar"
:replacement "XXXX"}]}))

(defn get-index-or-alias
"Given Elasticsearch uri, extracts index or alias name"
[endpoint]
(or (last (re-find #"^/(.*)/_search" endpoint))
(last (re-find #"^https?://.+(:.+)?/(.*)/_search" endpoint))))
4 changes: 0 additions & 4 deletions test/replay/impact_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@
(:require [clojure.test :refer [deftest is testing]]
[replay.impact :as impact]))

(deftest index-name-extraction
(let [uri "/index-name/_search?preference=7c5fe2d7-d313-4362-a62f-4c1e10e999fd"]
(is (= "index-name" (impact/get-index-or-alias uri)))))

(deftest url-transformations
(let [uri "/index-name/_search?preference=7c5fe2d7-d313-4362-a62f-4c1e10e999fd"]
(is (= "/_search" (impact/prepare-endpoint uri))))
Expand Down
22 changes: 22 additions & 0 deletions test/replay/transform/uri_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,26 @@
(let [doc {:foo "/foo/_count?q=elastic"}
replay-conf {:uri_attr "foo"
:uri-transforms [{:match "_count\\?" :replacement "_search?size=0&"}]}]
(is (= "/foo/_search?size=0&q=elastic" (transform-uri/construct-endpoint doc replay-conf)))))
(testing "uri.attr is a path into a nested map"
(let [doc {:bar {:foo "/foo/_count?q=elastic"}}
replay-conf {:uri_attr ["bar" "foo"]
:uri-transforms [{:match "_count\\?" :replacement "_search?size=0&"}]}]
(is (= "/foo/_search?size=0&q=elastic" (transform-uri/construct-endpoint doc replay-conf)))))

(testing "uri.attr is a path into a nested map and array"
(let [doc {:bar [{:no-foo "XXXXXXX"} {:foo "/foo/_count?q=elastic"}]}
replay-conf {:uri_attr ["bar" 1 "foo"]
:uri-transforms [{:match "_count\\?" :replacement "_search?size=0&"}]}]
(is (= "/foo/_search?size=0&q=elastic" (transform-uri/construct-endpoint doc replay-conf))))))

(deftest index-name-extraction
(testing "without host"
(let [uri "/index-name/_search?preference=7c5fe2d7-d313-4362-a62f-4c1e10e999fd"]
(is (= "index-name" (transform-uri/get-index-or-alias uri)))))
(testing "with host"
(let [uri "http://localhost/index-name/_search?preference=7c5fe2d7-d313-4362-a62f-4c1e10e999fd"]
(is (= "index-name" (transform-uri/get-index-or-alias uri)))))
(testing "with host and port"
(let [uri "http://localhost:9200/index-name/_search?preference=7c5fe2d7-d313-4362-a62f-4c1e10e999fd"]
(is (= "index-name" (transform-uri/get-index-or-alias uri))))))

0 comments on commit fc84164

Please sign in to comment.