Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Search service backend implementation #2225

Open
wants to merge 79 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
79 commits
Select commit Hold shift + click to select a range
24d4930
Add search dsl skeleton
kjanosz Jun 12, 2017
5ad133f
Add initial implementation
kjanosz Jun 12, 2017
8a36eda
Implement range
kjanosz Jun 13, 2017
3d3fe62
Add basic test
kjanosz Jun 13, 2017
321faa6
Merge remote-tracking branch 'origin/master' into search-service/dsl-…
kjanosz Jun 13, 2017
a39a8b6
Add search-service to builder
kjanosz Jun 13, 2017
6466188
Adjust Makefile
kjanosz Jun 13, 2017
1fff179
Add neq function
kjanosz Jun 13, 2017
6eef884
Move query dsl
kjanosz Jun 13, 2017
ca6c627
Interpret match function
kjanosz Jun 13, 2017
d427ed1
Add simple range function implementation
kjanosz Jun 13, 2017
f9229ac
Apply small changes
kjanosz Jun 14, 2017
88dde11
Add ping endpoint
kjanosz Jun 16, 2017
8ea6b32
Update CI files
kjanosz Jun 16, 2017
5a3418c
Merge pull request #2221 from FoxComm/search-service/ping-endpoint
michalrus Jun 16, 2017
5becf5f
Apply review suggestions
kjanosz Jun 19, 2017
d6e1ce4
Merge pull request #2136 from FoxComm/search-service/dsl-skeleton
michalrus Jun 19, 2017
2041ae2
Move files around
kjanosz Jun 19, 2017
173968b
Rename search service in builder.py
kjanosz Jun 19, 2017
0412240
Merge pull request #2239 from FoxComm/search-service/rename
Black-Phoenix Jun 19, 2017
c5cf633
Add dsl interpreter
kjanosz Jun 20, 2017
eb4f8d3
Fix image name
kjanosz Jun 20, 2017
4633769
Merge pull request #2251 from FoxComm/search-service/docker-fix
michalrus Jun 20, 2017
dc0f94b
Make query interpreter extending function
kjanosz Jun 20, 2017
b1725f4
Add JAVA_OPTS to search service docker image cmd
kjanosz Jun 20, 2017
9f614ba
Apply PR suggestions
kjanosz Jun 20, 2017
e0bf2d2
Merge pull request #2255 from FoxComm/search-service/docker-cmd-fix
eugene-sy Jun 20, 2017
9c778c8
Bring back jackson dependency
kjanosz Jun 20, 2017
e9e0d31
Make QueryInterpreter extend function
kjanosz Jun 21, 2017
7bff2c7
Move query interpreter entirely to dedicated package
kjanosz Jun 21, 2017
dc0f333
Prefix agni endpoints with 'api'
kjanosz Jun 21, 2017
f2a53dc
Merge pull request #2271 from FoxComm/search-service/api-endpoint
Black-Phoenix Jun 21, 2017
1ee9df7
Add raw query function
kjanosz Jun 22, 2017
60b03ca
Add exists function
kjanosz Jun 22, 2017
27e23e1
Check formatting on `make build`
kjanosz Jun 22, 2017
6e6d063
Merge pull request #2279 from FoxComm/search-service/check-formatting
michalrus Jun 22, 2017
b26c25d
Merge pull request #2250 from FoxComm/search-service/interpreter-draft
aafa Jun 22, 2017
538fa97
Add nested query function
kjanosz Jun 22, 2017
0532f25
Merge remote-tracking branch 'origin/master' into feature/search-service
kjanosz Jun 23, 2017
5d0f86d
Add configs for the search service
Jun 14, 2017
d4b912b
Update configs
Jun 14, 2017
efc7cca
Spacing
Jun 14, 2017
00902d8
Add hotfix for loading new rsyslog config
Jun 14, 2017
25e377e
More configs
Jun 15, 2017
f652db8
Add extra configs
Jun 19, 2017
634a7e1
Fix builder.py and vars in tabernacle
Jun 19, 2017
e7b3418
Change nginx proxy
Jun 19, 2017
1380abe
Support nested fields
kjanosz Jun 23, 2017
5f7e0e7
Enforce more purity in QueryInterpreter
kjanosz Jun 26, 2017
555b4cb
Change nginx proxy settings + minor improvements
Jun 26, 2017
1846cd8
Add hotfixes
Jun 26, 2017
b6b192d
Testing buildkite
Jun 26, 2017
3775779
Revert "Testing buildkite"
Jun 26, 2017
8aa7360
Support boost per context
kjanosz Jun 26, 2017
d30d799
Add max depth check for bool query
kjanosz Jun 27, 2017
e0f7e2e
Add translate endpoint
kjanosz Jun 27, 2017
d432337
Simplify spawning local agni instance
aafa Jun 28, 2017
271afa1
PR suggestions addressed
aafa Jun 28, 2017
d6d8f47
Merge pull request #2321 from FoxComm/agni/make-run
kjanosz Jun 28, 2017
38fab80
Fix some query functions
kjanosz Jun 28, 2017
54c553b
Merge pull request #2286 from FoxComm/search-service/query-dsl-improv…
aafa Jun 29, 2017
0958904
Merge branch 'master' into feature/search-service
aafa Jun 29, 2017
cba912e
Merge branch 'master' into agni/tabernacle-configs
Jun 30, 2017
4748a10
Merge branch 'master' into feature/search-service
Jun 30, 2017
3ecdd63
Merge branch 'feature/search-service' into agni/tabernacle-configs
Jun 30, 2017
b3d2d2c
Revert the changes for Makefile
Jun 30, 2017
534bd88
Change cpu and mem
Jun 30, 2017
5a78c93
Set JAVA_OPTS
Jun 30, 2017
740f45d
Assign a random port to agni
Jun 30, 2017
adaa616
Remove port from healthChecks
Jun 30, 2017
6c467f3
Remove another ports field
Jun 30, 2017
e83bddd
Fix the typo
Jun 30, 2017
976241c
wire tables to agni with raw queries
eugene-sy Jun 30, 2017
08eb325
fix lint
eugene-sy Jun 30, 2017
c852c04
Another typo
Jun 30, 2017
3ab111e
Merge pull request #2182 from FoxComm/agni/tabernacle-configs
eugene-sy Jul 3, 2017
8b70e61
use class instead of plain functions
eugene-sy Jul 5, 2017
b59f2db
get rid of posts to ES
eugene-sy Jul 5, 2017
24ba4af
Merge pull request #2326 from FoxComm/feature/wire-ashes-to-agni
Jan 17, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions agni/.env.local.sample
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export SEARCH_SERVER=elasticsearch://10.240.0.18:9300
export PORT=9000
File renamed without changes.
25 changes: 16 additions & 9 deletions search-service/Makefile → agni/Makefile
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
include ../makelib
header = $(call baseheader, $(1), search-service)
header = $(call baseheader, $(1), agni)

DOCKER_REPO ?= $(DOCKER_STAGE_REPO)
DOCKER_IMAGE ?= search-service
DOCKER_IMAGE ?= agni
DOCKER_TAG ?= master
SBT_CMD = sbt -DDOCKER_REPO=$(DOCKER_REPO) -DDOCKER_TAG=${DOCKER_TAG}

autoformat-check:
../utils/scalafmt/scalafmt.sh --test

run:
$(call header, Run locally)
sbt '~api/re-start'

clean:
$(call header, Cleaning)
${SBT_CMD} 'clean'
${SBT_CMD} '; clean'

build: fmt-check
$(call header, Building)
Expand All @@ -22,17 +29,17 @@ fmt-check:

test:
$(call header, Testing)
${SBT_CMD} 'test'
${SBT_CMD} '; test'

docker:
$(call header, Dockerizing)
${SBT_CMD} 'api/docker'
${SBT_CMD} '; api/docker'

docker-push:
$(call header, Registering)
${SBT_CMD} 'api/dockerPush'
${SBT_CMD} '; api/dockerPush'

docker-build:
${SBT_CMD} 'api/dockerBuildAndPush'
docker-all:
${SBT_CMD} '; api/dockerBuildAndPush'

.PHONY: clean build fmt fmt-check test docker docker-push docker-build
.PHONY: clean build fmt fmt-check test docker docker-push docker-all
54 changes: 54 additions & 0 deletions agni/api/app/foxcomm/agni/api/Api.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package foxcomm.agni.api

import com.twitter.finagle.Http
import com.twitter.finagle.http.Status
import com.twitter.util.Await
import foxcomm.agni._
import foxcomm.agni.dsl.query._
import foxcomm.agni.interpreter.es.queryInterpreter
import foxcomm.utils.finch._
import io.circe.generic.extras.auto._
import io.finch._
import io.finch.circe._
import monix.execution.Scheduler
import org.elasticsearch.common.ValidationException

object Api extends App {
def endpoints(searchService: SearchService)(implicit s: Scheduler) =
post("api" :: "search" :: "translate" :: jsonBody[SearchPayload.fc]) { (searchPayload: SearchPayload.fc) ⇒
searchService
.translate(searchPayload = searchPayload)
.map(Ok)
.toTwitterFuture
} :+: post(
"api" :: "search" :: string :: string :: param("size")
.as[Int] :: paramOption("from").as[Int] :: jsonBody[SearchPayload]) {
(searchIndex: String, searchType: String, size: Int, from: Option[Int], searchQuery: SearchPayload) ⇒
searchService
.searchFor(searchIndex = searchIndex,
searchType = searchType,
searchPayload = searchQuery,
searchSize = size,
searchFrom = from)
.map(Ok)
.toTwitterFuture
} :+: get("ping") {
Ok("pong")
}

def errorHandler[A]: PartialFunction[Throwable, Output[A]] = {
case ex: ValidationException ⇒ Output.failure(ex, Status.BadRequest)
case ex: Exception ⇒ Output.failure(ex, Status.InternalServerError)
case ex ⇒ Output.failure(new RuntimeException(ex), Status.InternalServerError)
}

implicit val s: Scheduler = Scheduler.global
val config = AppConfig.load()
val svc = SearchService.fromConfig(config, queryInterpreter)

Await.result(
Http.server
.withStreaming(enabled = true)
.serve(s"${config.http.interface}:${config.http.port}",
endpoints(svc).handle(errorHandler).toServiceAs[Application.Json]))
}
14 changes: 5 additions & 9 deletions search-service/build.sbt → agni/build.sbt
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
import sbtassembly.AssemblyKeys.assemblyExcludedJars
import sbtassembly.{MergeStrategy, PathList}

name := "search-service"

version := "0.1-SNAPSHOT"

lazy val core = (project in file("core"))
.settings(Settings.common)
.settings(
libraryDependencies ++= Dependencies.core ++ Dependencies.es ++ Dependencies.circe
libraryDependencies ++= Dependencies.core ++ Dependencies.es ++ Dependencies.circe ++ Dependencies.monix ++ Dependencies.test.core
)

lazy val finch = (project in file("finch"))
.settings(Settings.common)
.settings(
libraryDependencies ++= Dependencies.finch ++ Dependencies.circe :+ Dependencies.jwt
libraryDependencies ++= Dependencies.finch ++ Dependencies.circe ++ Dependencies.jwt ++ Dependencies.monix
)

lazy val api = (project in file("api"))
Expand All @@ -24,8 +19,9 @@ lazy val api = (project in file("api"))
libraryDependencies ++= Dependencies.finch
)
.settings(
mainClass in assembly := Some("foxcomm.search.api.Api"),
assemblyJarName in assembly := "search-service.jar",
Settings.appName := "agni",
mainClass in assembly := Some("foxcomm.agni.api.Api"),
assemblyJarName in assembly := s"${Settings.appName.value}.jar",
assemblyMergeStrategy in assembly := {
case PathList("BUILD") ⇒ MergeStrategy.discard
case PathList("META-INF", "io.netty.versions.properties") ⇒ MergeStrategy.discard
Expand Down
36 changes: 36 additions & 0 deletions agni/core/app/foxcomm/agni/AppConfig.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package foxcomm.agni

import cats.data.NonEmptyList
import com.typesafe.config.ConfigFactory
import java.net.InetSocketAddress
import pureconfig._
import scala.util.Try

final case class AppConfig(http: AppConfig.Http, elasticsearch: AppConfig.ElasticSearch)

@SuppressWarnings(Array("org.wartremover.warts.Equals"))
object AppConfig {
implicit val readHostConfig: ConfigReader[NonEmptyList[InetSocketAddress]] =
ConfigReader.fromNonEmptyStringTry(s ⇒
Try {
val withoutPrefix = s.stripPrefix("elasticsearch://")
val hosts = withoutPrefix.split(',').map { host ⇒
val parts = host.split(':')
require(parts.length == 2,
"ElasticSearch uri must be in format elasticsearch://host:port,host:port,...")
new InetSocketAddress(parts(0), parts(1).toInt)
}
require(hosts.length >= 1, "At least single ElasticSearch host should be specified")
NonEmptyList.fromListUnsafe(hosts.toList)
})

final case class Http(interface: String, port: Int)

final case class ElasticSearch(host: NonEmptyList[InetSocketAddress], cluster: String)

def load(): AppConfig = {
val config =
ConfigFactory.systemProperties.withFallback(ConfigFactory.load())
loadConfigOrThrow[AppConfig](config, "app")
}
}
108 changes: 108 additions & 0 deletions agni/core/app/foxcomm/agni/SearchService.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package foxcomm.agni

import cats.implicits._
import foxcomm.agni.interpreter.es._
import io.circe._
import io.circe.jawn.parseByteBuffer
import monix.eval.{Coeval, Task}
import org.elasticsearch.action.search.{SearchAction, SearchRequestBuilder, SearchResponse}
import org.elasticsearch.client.Client
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.common.xcontent.{ToXContent, XContentFactory}
import org.elasticsearch.index.query.QueryBuilder
import org.elasticsearch.search.SearchHit

@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
class SearchService private (client: Client, qi: ESQueryInterpreter) {
import SearchService.ExtractJsonObject

def translate(searchPayload: SearchPayload.fc): Task[Json] = {
def buildJson(qb: QueryBuilder): Coeval[Json] =
Coeval.eval {
val builder = XContentFactory.jsonBuilder()
builder.prettyPrint()
builder.startObject()
builder.field("query")
qb.toXContent(builder, ToXContent.EMPTY_PARAMS)
builder.endObject()
parseByteBuffer(builder.bytes().toChannelBuffer.toByteBuffer)
.fold(Coeval.raiseError(_), Coeval.eval(_))
}.flatten

for {
builder ← qi(searchPayload.query).task
json ← buildJson(builder).task
} yield json
}

def searchFor(searchIndex: String,
searchType: String,
searchPayload: SearchPayload,
searchSize: Int,
searchFrom: Option[Int]): Task[SearchResult] = {
def prepareBuilder: Coeval[SearchRequestBuilder] = Coeval.eval {
val builder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
builder
.setIndices(searchIndex)
.setTypes(searchType)
.setSize(searchSize)
searchFrom.foreach(builder.setFrom)
searchPayload.fields.foreach(fs ⇒ builder.setFetchSource(fs.toList.toArray, Array.empty[String]))
builder
}

def evalQuery(builder: SearchRequestBuilder): Coeval[SearchRequestBuilder] = searchPayload match {
case SearchPayload.es(query, _) ⇒
Coeval.eval(builder.setQuery(Json.fromJsonObject(query).toBytes))
case SearchPayload.fc(query, _) ⇒
qi(query).map(builder.setQuery)
}

def setupBuilder: Task[SearchRequestBuilder] = (prepareBuilder flatMap evalQuery).task

for {
builder ← setupBuilder
request = builder.request()
response ← async[SearchResponse, SearchResult](client.search(request, _))
} yield {
val hits = response.getHits
SearchResult(
result = hits
.hits()
.view
.collect {
case ExtractJsonObject(obj) ⇒ obj
}
.toList,
pagination = SearchPagination(total = hits.totalHits()),
maxScore = hits.getMaxScore
)
}
}
}

object SearchService {
object ExtractJsonObject {
def unapply(hit: SearchHit): Option[JsonObject] =
parseByteBuffer(hit.sourceRef.toChannelBuffer.toByteBuffer).toOption
.flatMap(_.asObject)
}

def apply(client: Client, qi: ESQueryInterpreter): SearchService =
new SearchService(client, qi)

def fromConfig(config: AppConfig, qi: ESQueryInterpreter): SearchService = {
val esConfig = config.elasticsearch
val settings =
Settings.settingsBuilder().put("cluster.name", esConfig.cluster).build()
val client = TransportClient
.builder()
.settings(settings)
.build()
.addTransportAddresses(esConfig.host.toList.map(new InetSocketTransportAddress(_)): _*)

apply(client, qi)
}
}
7 changes: 7 additions & 0 deletions agni/core/app/foxcomm/agni/dsl/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package foxcomm.agni

import io.circe.generic.extras.Configuration

package object dsl {
implicit def configuration: Configuration = foxcomm.agni.configuration
}
Loading