From 4f25a91d31c7a5d03234bcffcd38ff088a8f06bc Mon Sep 17 00:00:00 2001
From: Ryan Wynn
Date: Sun, 14 Aug 2016 05:01:38 +0000
Subject: [PATCH] improve logging and add support for the ingest-attachment
 plugin in es 5

---
 README.md    | 119 +++++++++++++++++++++++++++++++++---
 monstache.go | 169 ++++++++++++++++++++++++++++++++++++++-------------
 2 files changed, 238 insertions(+), 50 deletions(-)

diff --git a/README.md b/README.md
index 1f89ffc..11162c4 100644
--- a/README.md
+++ b/README.md
@@ -50,6 +50,8 @@ A sample TOML config file looks like this:
     namespace-exclude-regex = "^mydb.ignorecollection$"
     gtm-channel-size = 200
     index-files = true
+    file-highlighting = true
+    file-namespaces = ["users.fs.files"]
     verbose = true
 
 All options in the config file above also work if passed explicitly by the same name to the monstache command
@@ -73,6 +75,8 @@ The following defaults are used for missing config values:
     namespace-exclude-regex -> nil
     gtm-channel-size -> 100
     index-files -> false
+    file-highlighting -> false
+    file-namespaces -> nil
     verbose -> false
 
 When `resume` is true, monstache writes the timestamp of mongodb operations it has successfully synced to elasticsearch
@@ -111,6 +115,13 @@ For versions of elasticsearch prior to version 5, you should install the
 [mapper-attachments](https://www.elastic.co/guide/en/elasticsearch/plugins/master/mapper-attachments.html) plugin. In version 5 or greater
 of elasticsearch the mapper-attachments plugin is deprecated and you should install the
 [ingest-attachment](https://www.elastic.co/guide/en/elasticsearch/plugins/master/ingest-attachment.html) plugin instead.
 For further information on how to configure monstache to index content from gridfs, see the section [Indexing Gridfs Files](#files).
 
+The `file-namespaces` config must be set when `index-files` is enabled. `file-namespaces` must be set to an array of mongodb
+namespace strings. Files uploaded through gridfs to any of the namespaces in `file-namespaces` will be retrieved and their
+raw content indexed into elasticsearch via either the mapper-attachments or ingest-attachment plugin.
+
+When `file-highlighting` is true monstache will enable the ability to return highlighted keywords in the extracted text of files
+for queries on files which were indexed in elasticsearch from gridfs.
+
 When `verbose` is true monstache will enable debug logging including a trace of requests to elasticsearch
 
 When `elasticsearch-retry-seconds` is greater than 0 a failed request to elasticsearch will retry the request after the given number of seconds
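(An aside, not part of the patch: the `toml` struct tags added to monstache.go below imply the BurntSushi/toml package, so the two new settings can be smoke-tested with a stand-alone sketch like the following, using a hypothetical cut-down config struct.)

    package main

    import (
        "fmt"

        "github.com/BurntSushi/toml"
    )

    type fileConfig struct {
        IndexFiles       bool     `toml:"index-files"`
        FileHighlighting bool     `toml:"file-highlighting"`
        FileNamespaces   []string `toml:"file-namespaces"`
    }

    func main() {
        var c fileConfig
        blob := `
    index-files = true
    file-highlighting = true
    file-namespaces = ["users.fs.files"]
    `
        if _, err := toml.Decode(blob, &c); err != nil {
            panic(err)
        }
        // {IndexFiles:true FileHighlighting:true FileNamespaces:[users.fs.files]}
        fmt.Printf("%+v\n", c)
    }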
@@ -217,23 +228,27 @@ namespace of all collections which will hold GridFS files. For example in your T
 
     file-namespaces = ["users.fs.files", "posts.fs.files"]
 
+    file-highlighting = true
+
 The above configuration tells monstache that you wish to index the raw content of GridFS files in the `users` and `posts`
 mongodb databases. By default, mongodb uses a bucket named `fs`, so if you just use the defaults your collection name will be
 `fs.files`. However, if you have customized the bucket name, then your file collection would be something like `mybucket.files`
 and the entire namespace would be `users.mybucket.files`.
 
 When you configure monstache this way it will perform an additional operation at startup to ensure the destination indexes in
-elasticsearch have a field named `filecontent` with a type mapping of `attachment`.
+elasticsearch have a field named `file` with a type mapping of `attachment`.
 
 For the example TOML configuration above, monstache would initialize 2 indices in preparation for indexing into
 elasticsearch by issuing the following REST commands:
 
+For elasticsearch versions prior to version 5...
+
     POST /users
     {
       "mappings": {
         "fs.files": {
           "properties": {
-            "filecontent": { "type": "attachment" }
+            "file": { "type": "attachment" }
     }}}}
 
     POST /posts
@@ -241,27 +256,115 @@ elasticsearch by issuing the following REST commands:
     {
       "mappings": {
         "fs.files": {
           "properties": {
-            "filecontent": { "type": "attachment" }
+            "file": { "type": "attachment" }
     }}}}
 
+For elasticsearch version 5 and above...
+
+    PUT /_ingest/pipeline/attachment
+    {
+      "description" : "Extract file information",
+      "processors" : [
+        {
+          "attachment" : {
+            "field" : "file"
+          }
+        }
+      ]
+    }
+
 When a file is inserted into mongodb via GridFS, monstache will detect the new file, use the mongodb api to retrieve the raw
-content, and index a document into elasticsearch with the raw content stored in a `filecontent` field as a base64
+content, and index a document into elasticsearch with the raw content stored in a `file` field as a base64
 encoded string. The elasticsearch plugin will then extract text content from the raw content using
-[Apache Tika](https://tika.apache.org/)
-, tokenize the text content, and allow you to query on the content of the file.
+[Apache Tika](https://tika.apache.org/), tokenize the text content, and allow you to query on the content of the file.
 
 To test this feature of monstache you can simply use the
 [mongofiles](https://docs.mongodb.com/manual/reference/program/mongofiles/)
 command to quickly add a file to mongodb via GridFS. Continuing the example above one could issue the following command to put a
 file named `resume.docx` into GridFS and after a short time this file should be searchable in elasticsearch in the index `users`
 under the type `fs.files`.
 
     mongofiles -d users put resume.docx
 
 After a short time you should be able to query the contents of resume.docx in the users index in elasticsearch
 
     curl -XGET 'http://localhost:9200/users/fs.files/_search?q=golang'
 
+If you would like to see the text extracted by Apache Tika you can project the appropriate sub-field
+
+For elasticsearch versions prior to version 5...
+
+    curl localhost:9200/users/fs.files/_search?pretty -d '{
+        "fields": [ "file.content" ],
+        "query": {
+            "match": {
+                "_all": "golang"
+            }
+        }
+    }'
+
+For elasticsearch version 5 and above...
+
+    curl localhost:9200/users/fs.files/_search?pretty -d '{
+        "_source": [ "attachment.content" ],
+        "query": {
+            "match": {
+                "_all": "golang"
+            }
+        }
+    }'
+
+When `file-highlighting` is enabled you can add a highlight clause to your query
+
+For elasticsearch versions prior to version 5...
+
+    curl localhost:9200/users/fs.files/_search?pretty -d '{
+        "fields": ["file.content"],
+        "query": {
+            "match": {
+                "file.content": "golang"
+            }
+        },
+        "highlight": {
+            "fields": {
+                "file.content": {
+                }
+            }
+        }
+    }'
+
+For elasticsearch version 5 and above...
+
+    curl localhost:9200/users/fs.files/_search?pretty -d '{
+        "_source": ["attachment.content"],
+        "query": {
+            "match": {
+                "attachment.content": "golang"
+            }
+        },
+        "highlight": {
+            "fields": {
+                "attachment.content": {
+                }
+            }
+        }
+    }'
+
+The highlight response will contain emphasis on the matching terms
+
+For elasticsearch versions prior to version 5...
+
+    "hits" : [ {
+      "highlight" : {
+        "file.content" : [ "I like to program in <em>golang</em>.\n\n" ]
+      }
+    } ]
+
+For elasticsearch version 5 and above...
+
+    "hits" : [{
+      "highlight" : {
+        "attachment.content" : [ "I like to program in <em>golang</em>." ]
+      }
+    }]
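(Aside, not part of the patch: the curl recipes above translate directly to Go for anyone testing from code. A minimal sketch, assuming a local elasticsearch 5 node and a file already indexed from gridfs.)

    package main

    import (
        "bytes"
        "fmt"
        "io/ioutil"
        "net/http"
    )

    func main() {
        // The same highlight query as the elasticsearch 5 curl example above.
        query := []byte(`{
            "_source": ["attachment.content"],
            "query": { "match": { "attachment.content": "golang" } },
            "highlight": { "fields": { "attachment.content": {} } }
        }`)
        resp, err := http.Post("http://localhost:9200/users/fs.files/_search",
            "application/json", bytes.NewReader(query))
        if err != nil {
            panic(err)
        }
        defer resp.Body.Close()
        body, _ := ioutil.ReadAll(resp.Body)
        fmt.Println(string(body)) // highlight fragments arrive wrapped in <em> tags
    }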
+ + "hits" : [{ + "highlight" : { + "attachment.content" : [ "I like to program in golang." ] + } + }] diff --git a/monstache.go b/monstache.go index aec7ea7..5c68e1e 100644 --- a/monstache.go +++ b/monstache.go @@ -19,16 +19,20 @@ import ( "gopkg.in/mgo.v2/bson" "io" "io/ioutil" + "log" "net" "net/http" "os" "os/signal" "regexp" + "strconv" "strings" "syscall" "time" ) +var infoLog *log.Logger = log.New(os.Stdout, "INFO ", log.Flags()) + var mapEnvs map[string]*executionEnv var mapIndexTypes map[string]*indexTypeMapping var fileNamespaces map[string]bool @@ -69,12 +73,14 @@ type configOptions struct { Resume bool Replay bool IndexFiles bool `toml:"index-files"` + FileHighlighting bool `toml:"file-highlighting"` ElasticMaxConns int `toml:"elasticsearch-max-conns"` ElasticRetrySeconds int `toml:"elasticsearch-retry-seconds"` ElasticMaxDocs int `toml:"elasticsearch-max-docs"` ElasticMaxBytes int `toml:"elasticsearch-max-bytes"` ElasticMaxSeconds int `toml:"elasticsearch-max-seconds"` - ChannelSize int `toml:"gtm-channel-size"` + ElasticMajorVersion int + ChannelSize int `toml:"gtm-channel-size"` ConfigFile string Script []javascript Mapping []indexTypeMapping @@ -91,41 +97,110 @@ func TestElasticSearchConn(conn *elastigo.Conn, configuration *configOptions) (e if err == nil { version := result["version"].(map[string]interface{}) if version == nil { - err = errors.New("ERROR: Unable to determine elasticsearch version") + err = errors.New("Unable to determine elasticsearch version") } else { - number := version["number"] - if number == nil { - err = errors.New("ERROR: Unable to determine elasticsearch version") + number := version["number"].(string) + if number == "" { + err = errors.New("Unable to determine elasticsearch version") } else if configuration.Verbose { - fmt.Println(fmt.Sprintf("Successfully connected to elasticsearch version %s", number)) + infoLog.Printf("Successfully connected to elasticsearch version %s", number) + } + versionParts := strings.Split(number, ".") + if len(versionParts) > 0 { + version, err := strconv.Atoi(versionParts[0]) + if err == nil { + configuration.ElasticMajorVersion = version + } + + } else { + err = errors.New("Unable to parse elasticsearch version") } } } return } -func EnsureFileMapping(conn *elastigo.Conn, namespace string) (err error) { +func IngestAttachment(conn *elastigo.Conn, esIndex string, esType string, esId string, data map[string]interface{}) (err error) { + var body []byte + args := map[string]interface{}{ + "pipeline": "attachment", + } + body, err = json.Marshal(data) + if err == nil { + _, err = conn.DoCommand("PUT", fmt.Sprintf("/%s/%s/%s", esIndex, esType, esId), args, string(body)) + } + return err +} + +func EnsureFileMapping(conn *elastigo.Conn, namespace string, configuration *configOptions) (err error) { + if configuration.ElasticMajorVersion < 5 { + return EnsureFileMappingMapperAttachment(conn, namespace, configuration) + } else { + return EnsureFileMappingIngestAttachment(conn, namespace, configuration) + } +} + +func EnsureFileMappingIngestAttachment(conn *elastigo.Conn, namespace string, configuration *configOptions) (err error) { + var body []byte + pipeline := map[string]interface{}{ + "description": "Extract file information", + "processors": [1]map[string]interface{}{ + map[string]interface{}{ + "attachment": map[string]interface{}{ + "field": "file", + }, + }, + }, + } + body, err = json.Marshal(pipeline) + if err == nil { + _, err = conn.DoCommand("PUT", "/_ingest/pipeline/attachment", nil, string(body)) + } + return 
@@ -202,7 +277,7 @@ func AddFileContent(session *mgo.Session, op *gtm.Op) (err error) {
 		return err
 	}
 	writer.Flush()
-	op.Data["filecontent"] = base64.StdEncoding.EncodeToString(buff.Bytes())
+	op.Data["file"] = base64.StdEncoding.EncodeToString(buff.Bytes())
 	return err
 }
 
@@ -256,6 +331,7 @@ func (configuration *configOptions) ParseCommandLineFlags() *configOptions {
 	flag.BoolVar(&configuration.Resume, "resume", false, "True to capture the last timestamp of this run and resume on a subsequent run")
 	flag.BoolVar(&configuration.Replay, "replay", false, "True to replay all events from the oplog and index them in elasticsearch")
 	flag.BoolVar(&configuration.IndexFiles, "index-files", false, "True to index gridfs files into elasticsearch. Requires the elasticsearch mapper-attachments (deprecated) or ingest-attachment plugin")
+	flag.BoolVar(&configuration.FileHighlighting, "file-highlighting", false, "True to enable the ability to highlight search terms for a file query")
 	flag.StringVar(&configuration.ResumeName, "resume-name", "", "Name under which to load/store the resume state. Defaults to 'default'")
 	flag.StringVar(&configuration.NsRegex, "namespace-regex", "", "A regex which is matched against an operation's namespace (<database>.<collection>). Only operations which match are synced to elasticsearch")
 	flag.StringVar(&configuration.NsExcludeRegex, "namespace-exclude-regex", "", "A regex which is matched against an operation's namespace (<database>.<collection>). Only operations which do not match are synced to elasticsearch")
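(Aside, not part of the patch: the document shape AddFileContent produces is easy to reproduce outside monstache. A sketch with a hypothetical local file; monstache itself streams the bytes out of gridfs, but the resulting field is the same.)

    package main

    import (
        "encoding/base64"
        "encoding/json"
        "fmt"
        "io/ioutil"
    )

    func main() {
        raw, err := ioutil.ReadFile("resume.docx")
        if err != nil {
            panic(err)
        }
        doc := map[string]interface{}{
            "file": base64.StdEncoding.EncodeToString(raw),
        }
        body, _ := json.Marshal(doc)
        // Something like {"file":"UEsDBBQ..."}, ready for either attachment plugin.
        fmt.Println(string(body))
    }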
@@ -352,6 +428,9 @@ func (configuration *configOptions) LoadConfigFile() *configOptions {
 		if !configuration.IndexFiles && tomlConfig.IndexFiles {
 			configuration.IndexFiles = true
 		}
+		if !configuration.FileHighlighting && tomlConfig.FileHighlighting {
+			configuration.FileHighlighting = true
+		}
 		if !configuration.Replay && tomlConfig.Replay {
 			configuration.Replay = true
 		}
@@ -439,15 +518,14 @@ func (configuration *configOptions) ConfigHttpTransport() error {
 }
 
 func TraceRequest(method, url, body string) {
-	fmt.Println(fmt.Sprintf("<%s> request sent to <%s>",
-		method, url))
+	infoLog.Printf("%s request sent to %s", method, url)
 	if body != "" {
-		fmt.Println("request body was ...")
-		fmt.Println(body)
+		infoLog.Printf("request body: %s", body)
 	}
 }
 
 func main() {
+	log.SetPrefix("ERROR ")
 	sigs := make(chan os.Signal, 1)
 	done := make(chan bool, 1)
 
@@ -457,14 +535,11 @@
 	configuration.ParseCommandLineFlags().LoadConfigFile().SetDefaults()
 	if err := configuration.ConfigHttpTransport(); err != nil {
-		fmt.Println("ERROR: Unable to configure HTTP transport")
-		panic(err)
+		log.Panicf("Unable to configure HTTP transport: %s", err)
 	}
 	mongo, err := configuration.DialMongo()
 	if err != nil {
-		fmt.Println(fmt.Sprintf("ERROR: Unable to connect to mongodb using URL <%s>",
-			configuration.MongoUrl))
-		panic(err)
+		log.Panicf("Unable to connect to mongodb using URL %s: %s", configuration.MongoUrl, err)
 	}
 	defer mongo.Close()
 	mongo.SetMode(mgo.Monotonic, true)
 
@@ -477,9 +552,8 @@
 		elastic.RequestTracer = TraceRequest
 	}
 	if err := TestElasticSearchConn(elastic, configuration); err != nil {
-		fmt.Println(fmt.Sprintf("ERROR: Unable to validate connection to elasticsearch using Protocol ( %s ) Host ( %s ) Port ( %s ) Username ( %s ) Password ( %s )",
-			elastic.Protocol, elastic.Domain, elastic.Port, elastic.Username, elastic.Password))
-		panic(err)
+		log.Panicf("Unable to validate connection to elasticsearch using %s://%s:%s: %s",
+			elastic.Protocol, elastic.Domain, elastic.Port, err)
 	}
 	indexer := elastic.NewBulkIndexerErrors(configuration.ElasticMaxConns, configuration.ElasticRetrySeconds)
 	if configuration.ElasticMaxDocs != 0 {
@@ -527,11 +601,10 @@
 	if configuration.IndexFiles {
 		if len(configuration.FileNamespaces) == 0 {
-			fmt.Println("Configuration error: file indexing is ON but no file namespaces are configured")
-			os.Exit(1)
+			log.Fatalln("File indexing is ON but no file namespaces are configured")
 		}
 		for _, namespace := range configuration.FileNamespaces {
-			if err := EnsureFileMapping(elastic, namespace); err != nil {
+			if err := EnsureFileMapping(elastic, namespace, configuration); err != nil {
 				panic(err)
 			}
 		}
 	}
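(Aside, not part of the patch: the net effect of the logging changes is a two-logger split, sketched stand-alone below.)

    package main

    import (
        "log"
        "os"
    )

    // As in the patch: informational messages go to stdout with an INFO prefix,
    // while the default logger, reserved for failures, gets an ERROR prefix and
    // keeps writing to stderr.
    var infoLog = log.New(os.Stdout, "INFO ", log.Flags())

    func main() {
        log.SetPrefix("ERROR ")
        infoLog.Printf("Successfully connected to elasticsearch version %s", "5.0.0")
        log.Println("Unable to determine elasticsearch version")
    }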
@@ -559,11 +632,11 @@
 			os.Exit(exitStatus)
 		case err = <-errs:
 			exitStatus = 1
-			fmt.Println(fmt.Sprintf("ERROR: %s", err))
+			log.Println(err)
 		case indexErr := <-indexer.ErrorChannel:
 			errs <- indexErr.Err
 		case op := <-ops:
-			indexed, objectId, indexType := false, OpIdToString(op), IndexTypeMapping(op)
+			ingestAttachment, indexed, objectId, indexType := false, false, OpIdToString(op), IndexTypeMapping(op)
 			if op.IsDelete() {
 				indexer.Delete(indexType.Index, indexType.Type, objectId)
 				indexed = true
...
 					if err := AddFileContent(mongo, op); err != nil {
 						errs <- err
 					}
+					if configuration.ElasticMajorVersion >= 5 {
+						ingestAttachment = true
+					}
 				}
 			}
 			PrepareDataForIndexing(op.Data)
 			if err := MapData(op); err == nil {
-				if err := indexer.Index(indexType.Index,
-					indexType.Type, objectId, "", "", nil, op.Data); err == nil {
-					indexed = true
+				if ingestAttachment {
+					if err := IngestAttachment(elastic, indexType.Index, indexType.Type, objectId, op.Data); err == nil {
+						indexed = true
+					} else {
+						errs <- err
+					}
 				} else {
-					errs <- err
+					if err := indexer.Index(indexType.Index, indexType.Type, objectId, "", "", nil, op.Data); err == nil {
+						indexed = true
+					} else {
+						errs <- err
+					}
 				}
 			} else {
 				errs <- err
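(Aside, not part of the patch: the ingest path above reduces to a single-document PUT with the pipeline query parameter. A stand-alone sketch with a hypothetical index, type, and id.)

    package main

    import (
        "bytes"
        "fmt"
        "io/ioutil"
        "net/http"
    )

    func main() {
        // The pipeline parameter routes the document through the "attachment"
        // ingest pipeline that monstache registers at startup.
        url := "http://localhost:9200/users/fs.files/someid?pipeline=attachment"
        doc := bytes.NewBufferString(`{"file":"SGVsbG8gZ29sYW5n"}`) // base64 of "Hello golang"
        req, err := http.NewRequest("PUT", url, doc)
        if err != nil {
            panic(err)
        }
        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            panic(err)
        }
        defer resp.Body.Close()
        out, _ := ioutil.ReadAll(resp.Body)
        fmt.Println(resp.Status, string(out))
    }

Files take this single-document route instead of the bulk indexer presumably because elastigo's bulk API offers no way to pass the pipeline parameter.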