
Commit

closes #2 and closes #3
rwynn committed Jun 13, 2016
1 parent 7af6608 commit 41faee1
Showing 2 changed files with 188 additions and 27 deletions.
78 changes: 77 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -43,6 +43,7 @@ A sample TOML config file looks like this:
namespace-regex = "^mydb.mycollection$"
namespace-exclude-regex = "^mydb.ignorecollection$"
gtm-channel-size = 200
index-files = true


All options in the config file above also work if passed explicitly by the same name to the monstache command
@@ -61,8 +62,9 @@ The following defaults are used for missing config values:
namespace-regex -> nil
namespace-exclude-regex -> nil
gtm-channel-size -> 100
index-files -> false

When `resume` is true, monstache writes the timestamp of mongodb operations it has succefully synced to elasticsearch
When `resume` is true, monstache writes the timestamp of mongodb operations it has successfully synced to elasticsearch
to the collection `monstache.monstache`. It also reads this value from that collection when it starts in order to replay
events which it might have missed because monstache was stopped. monstache uses the value of `resume-name` as a key when
storing and retrieving timestamps. If `resume` is true but `resume-name` is not supplied this key defaults to `default`.
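As a minimal illustration, a config that resumes under a custom key might look like the fragment below (the `resume-name` value is hypothetical):

```toml
resume = true
# timestamps are stored in the monstache.monstache collection under this key;
# omit it and the key falls back to "default"
resume-name = "myresume"
```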
@@ -91,6 +93,13 @@ are processed at once a larger channel size may prevent blocking in gtm.
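The effect of a larger channel size can be sketched in plain Go; the buffer size 3 and the operation names below are illustrative only:

```go
package main

import "fmt"

// queueOps shows how a buffered channel lets a producer (the oplog
// tailer) enqueue several operations before the consumer (the indexer)
// reads any of them, which is what a larger gtm-channel-size buys you.
func queueOps() []string {
	ops := make(chan string, 3) // buffered: three sends won't block
	ops <- "insert"
	ops <- "update"
	ops <- "delete"
	close(ops)
	var drained []string
	for op := range ops {
		drained = append(drained, op)
	}
	return drained
}

func main() {
	fmt.Println(queueOps())
}
```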
When `mongo-pem-file` is supplied monstache will use the supplied file path to add a local certificate to x509 cert
pool when connecting to mongodb. This should only be used when mongodb is configured with SSL enabled.

When `index-files` is true monstache will index the raw content of files stored in GridFS into elasticsearch as an attachment type.
By default `index-files` is false meaning that monstache will only index metadata associated with files stored in GridFS.
In order for `index-files` to index the raw content of files stored in GridFS you must install a plugin for elasticsearch.
For versions of elasticsearch prior to version 5, you should install the [mapper-attachments](https://www.elastic.co/guide/en/elasticsearch/plugins/2.3/mapper-attachments.html) plugin. In version 5 or greater
of elasticsearch the mapper-attachments plugin is deprecated and you should install the [ingest-attachment](https://www.elastic.co/guide/en/elasticsearch/plugins/master/ingest-attachment.html) plugin instead.
For further information on how to configure monstache to index content from GridFS, see the section [Indexing GridFS Files](#files).
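For reference, the usual plugin install commands are shown below, run from the elasticsearch home directory; exact paths may differ depending on how elasticsearch was installed:

```sh
# elasticsearch 2.x
bin/plugin install mapper-attachments

# elasticsearch 5.x and later
bin/elasticsearch-plugin install ingest-attachment
```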

### Config Syntax ###

For information on the syntax of the mongodb URL see [Standard Connection String Format](https://docs.mongodb.com/v3.0/reference/connection-string/#standard-connection-string-format)
@@ -164,3 +173,70 @@ use closures to maintain state between invocations of your mapping function.

Finally, since Otto makes it so easy, the venerable [Underscore](http://underscorejs.org/) library is included for you at
no extra charge. Feel free to abuse the power of the `_`.

<a name="files"></a>
### Indexing GridFS Files ###

As of version 1.1 monstache supports indexing the raw content of files stored in GridFS into elasticsearch for full
text search. This feature requires that you install an elasticsearch plugin which enables the field type `attachment`.
For versions of elasticsearch prior to version 5 you should install the
[mapper-attachments](https://www.elastic.co/guide/en/elasticsearch/plugins/2.3/mapper-attachments.html) plugin.
For version 5 or later of elasticsearch you should instead install the
[ingest-attachment](https://www.elastic.co/guide/en/elasticsearch/plugins/master/ingest-attachment.html) plugin.

Once you have installed the appropriate plugin for elasticsearch, getting file content from GridFS into elasticsearch is
as simple as configuring monstache. You will want to enable the `index-files` option and also tell monstache the
namespace of all collections which will hold GridFS files. For example in your TOML config file,

index-files = true

file-namespaces = ["users.fs.files", "posts.fs.files"]

The above configuration tells monstache that you wish to index the raw content of GridFS files in the `users` and `posts`
mongodb databases. By default, mongodb uses a bucket named `fs`, so if you just use the defaults your collection name will
be `fs.files`. However, if you have customized the bucket name, then your file collection would be something like `mybucket.files`
and the entire namespace would be `users.mybucket.files`.
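The index and type that monstache derives from a namespace follow the database-to-index and collection-to-type convention described above, which can be sketched like this:

```go
package main

import (
	"fmt"
	"strings"
)

// splitNamespace splits a mongodb namespace on the first dot:
// the database becomes the elasticsearch index, and the collection
// (here the GridFS files collection) becomes the type.
func splitNamespace(namespace string) (index, esType string) {
	parts := strings.SplitN(namespace, ".", 2)
	return parts[0], parts[1]
}

func main() {
	index, esType := splitNamespace("users.mybucket.files")
	fmt.Println(index, esType)
}
```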

When you configure monstache this way it will perform an additional operation at startup to ensure the destination indexes in
elasticsearch have a field named `filecontent` with a type mapping of `attachment`.

For the example TOML configuration above, monstache would initialize 2 indices in preparation for indexing into
elasticsearch by issuing the following REST commands:

POST /users
{
"mappings": {
"fs.files": {
"properties": {
"filecontent": { "type": "attachment" }
}}}}

POST /posts
{
"mappings": {
"fs.files": {
"properties": {
"filecontent": { "type": "attachment" }
}}}}


When a file is inserted into mongodb via GridFS, monstache will detect the new file, use the mongodb api to retrieve the raw
content, and index a document into elasticsearch with the raw content stored in a `filecontent` field as a base64
encoded string. The elasticsearch plugin will then extract text content from the raw content using
[Apache Tika](https://tika.apache.org/), tokenize the text content, and allow you to query on the content of the file.
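The encoding step can be sketched in isolation; the bytes below stand in for the raw file content read from GridFS:

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// encodeFileContent base64-encodes raw file bytes, the form the
// attachment type expects in the filecontent field.
func encodeFileContent(raw []byte) string {
	return base64.StdEncoding.EncodeToString(raw)
}

func main() {
	raw := []byte("hello world") // stand-in for bytes copied out of GridFS
	fmt.Println(encodeFileContent(raw))
}
```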

To test this feature of monstache you can simply use the [mongofiles](https://docs.mongodb.com/manual/reference/program/mongofiles/)
command to quickly add a file to mongodb via GridFS. Continuing the example above, one could issue the following command to put a
file named `resume.docx` into GridFS. After a short time this file should be searchable in elasticsearch in the index `users`
under the type `fs.files`.


mongofiles -d users put resume.docx


After a short time you should be able to query the contents of `resume.docx` in the `users` index in elasticsearch:

curl -XGET 'http://localhost:9200/users/fs.files/_search?q=golang'


137 changes: 111 additions & 26 deletions monstache.go
@@ -1,28 +1,34 @@
package main

import (
"bufio"
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"errors"
"flag"
"fmt"
"github.com/BurntSushi/toml"
elastigo "github.com/mattbaird/elastigo/lib"
"github.com/rwynn/gtm"
"github.com/robertkrimen/otto"
_ "github.com/robertkrimen/otto/underscore"
"github.com/rwynn/gtm"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"io"
"io/ioutil"
"net"
"os"
"os/signal"
"regexp"
"strings"
"syscall"
)

var mapEnvs map[string]*executionEnv
var mapIndexTypes map[string]*indexTypeMapping
var fileNamespaces map[string]bool

var chunksRegex = regexp.MustCompile("\\.chunks$")
var systemsRegex = regexp.MustCompile("system\\..+$")
@@ -33,50 +39,78 @@ const elasticMaxConnsDefault int = 10
const gtmChannelSizeDefault int = 100

type executionEnv struct {
Vm *otto.Otto
Vm *otto.Otto
Script string
}

type javascript struct {
Namespace string
Script string
Script string
}

type indexTypeMapping struct {
Namespace string
Index string
Type string
Index string
Type string
}

type configOptions struct {
MongoUrl string `toml:"mongo-url"`
MongoPemFile string `toml:"mongo-pem-file"`
MongoPemFile string `toml:"mongo-pem-file"`
ElasticUrl string `toml:"elasticsearch-url"`
ResumeName string `toml:"resume-name"`
NsRegex string `toml:"namespace-regex"`
NsRegex string `toml:"namespace-regex"`
NsExcludeRegex string `toml:"namespace-exclude-regex"`
Resume bool
Replay bool
ElasticMaxConns int `toml:"elasticsearch-max-conns"`
ChannelSize int `toml:"gtm-channel-size"`
IndexFiles bool `toml:"index-files"`
ElasticMaxConns int `toml:"elasticsearch-max-conns"`
ChannelSize int `toml:"gtm-channel-size"`
ConfigFile string
Script []javascript
Mapping []indexTypeMapping
Script []javascript
Mapping []indexTypeMapping
FileNamespaces []string `toml:"file-namespaces"`
}

func EnsureFileMapping(conn *elastigo.Conn, namespace string) (err error) {
parts := strings.SplitN(namespace, ".", 2)
esIndex, esType := parts[0], parts[1]
if m := mapIndexTypes[namespace]; m != nil {
esIndex, esType = m.Index, m.Type
}
if exists, _ := conn.ExistsIndex(esIndex, "", nil); exists {
_, err = conn.DoCommand("PUT", fmt.Sprintf("/%s/%s/_mapping", esIndex, esType), nil, fmt.Sprintf(`
{ "%s": {
"properties": {
"filecontent": { "type": "attachment" }
}}}
`, esType))
} else {
_, err = conn.DoCommand("PUT", fmt.Sprintf("/%s", esIndex), nil, fmt.Sprintf(`
{
"mappings": {
"%s": {
"properties": {
"filecontent": { "type": "attachment" }
}}}}
`, esType))
}
return err
}

func DefaultIndexTypeMapping(op *gtm.Op) *indexTypeMapping {
return &indexTypeMapping{
Namespace: op.Namespace,
Index: op.GetDatabase(),
Type: op.GetCollection(),
Index: op.GetDatabase(),
Type: op.GetCollection(),
}
}

func IndexTypeMapping(op *gtm.Op) *indexTypeMapping {
mapping := DefaultIndexTypeMapping(op)
if mapIndexTypes != nil {
if m := mapIndexTypes[op.Namespace]; m != nil {
mapping = m;
mapping = m
}
}
return mapping
Expand All @@ -101,7 +135,7 @@ func MapData(op *gtm.Op) error {
val, err := env.Vm.Call("module.exports", op.Data, op.Data)
if err != nil {
return err
} else if (!val.IsObject()) {
} else if !val.IsObject() {
return errors.New("exported function must return an object")
}
data, err := val.Export()
@@ -122,6 +156,24 @@ func PrepareDataForIndexing(data map[string]interface{}) {
delete(data, "_source")
}

func AddFileContent(session *mgo.Session, op *gtm.Op) (err error) {
var buff bytes.Buffer
writer, db, bucket := bufio.NewWriter(&buff), session.DB(op.GetDatabase()), strings.SplitN(op.GetCollection(), ".", 2)[0]
file, err := db.GridFS(bucket).OpenId(op.Data["_id"])
if file != nil {
defer file.Close()
}
if err != nil {
return err
}
if _, err = io.Copy(writer, file); err != nil {
return err
}
writer.Flush()
op.Data["filecontent"] = base64.StdEncoding.EncodeToString(buff.Bytes())
return err
}

func NotMonstache(op *gtm.Op) bool {
return op.GetDatabase() != "monstache"
}
@@ -165,6 +217,7 @@ func (configuration *configOptions) ParseCommandLineFlags() *configOptions {
flag.StringVar(&configuration.ConfigFile, "f", "", "Location of configuration file")
flag.BoolVar(&configuration.Resume, "resume", false, "True to capture the last timestamp of this run and resume on a subsequent run")
flag.BoolVar(&configuration.Replay, "replay", false, "True to replay all events from the oplog and index them in elasticsearch")
flag.BoolVar(&configuration.IndexFiles, "index-files", false, "True to index gridfs files into elasticsearch. Requires the elasticsearch mapper-attachments (deprecated) or ingest-attachment plugin")
flag.StringVar(&configuration.ResumeName, "resume-name", "", "Name under which to load/store the resume state. Defaults to 'default'")
flag.StringVar(&configuration.NsRegex, "namespace-regex", "", "A regex which is matched against an operation's namespace (<database>.<collection>). Only operations which match are synched to elasticsearch")
flag.StringVar(&configuration.NsExcludeRegex, "namespace-exclude-regex", "", "A regex which is matched against an operation's namespace (<database>.<collection>). Only operations which do not match are synched to elasticsearch")
@@ -179,8 +232,8 @@ func (configuration *configOptions) LoadIndexTypes() {
if m.Namespace != "" && m.Index != "" && m.Type != "" {
mapIndexTypes[m.Namespace] = &indexTypeMapping{
Namespace: m.Namespace,
Index: m.Index,
Type: m.Type,
Index: m.Index,
Type: m.Type,
}
} else {
panic("mappings must specify namespace, index, and type attributes")
@@ -195,7 +248,7 @@ func (configuration *configOptions) LoadScripts() {
for _, s := range configuration.Script {
if s.Namespace != "" && s.Script != "" {
env := &executionEnv{
Vm: otto.New(),
Vm: otto.New(),
Script: s.Script,
}
if err := env.Vm.Set("module", make(map[string]interface{})); err != nil {
@@ -240,6 +293,9 @@ func (configuration *configOptions) LoadConfigFile() *configOptions {
if configuration.ChannelSize == 0 {
configuration.ChannelSize = tomlConfig.ChannelSize
}
if !configuration.IndexFiles && tomlConfig.IndexFiles {
configuration.IndexFiles = true
}
if !configuration.Replay && tomlConfig.Replay {
configuration.Replay = true
}
@@ -255,12 +311,24 @@
if configuration.NsExcludeRegex == "" {
configuration.NsExcludeRegex = tomlConfig.NsExcludeRegex
}
if configuration.IndexFiles {
configuration.FileNamespaces = tomlConfig.FileNamespaces
tomlConfig.LoadGridFsConfig()
}
tomlConfig.LoadScripts()
tomlConfig.LoadIndexTypes()
}
return configuration
}

func (configuration *configOptions) LoadGridFsConfig() *configOptions {
fileNamespaces = make(map[string]bool)
for _, namespace := range configuration.FileNamespaces {
fileNamespaces[namespace] = true
}
return configuration
}

func (configuration *configOptions) SetDefaults() *configOptions {
if configuration.MongoUrl == "" {
configuration.MongoUrl = mongoUrlDefault
@@ -285,7 +353,7 @@ func (configuration *configOptions) DialMongo() (*mgo.Session, error) {
} else {
return nil, err
}
tlsConfig := &tls.Config{ RootCAs: certs }
tlsConfig := &tls.Config{RootCAs: certs}
dialInfo, err := mgo.ParseURL(configuration.MongoUrl)
if err != nil {
return nil, err
@@ -356,8 +424,20 @@ func main() {
}
}

if configuration.IndexFiles {
if len(configuration.FileNamespaces) == 0 {
fmt.Println("Configuration error: file indexing is ON but no file namespaces are configured")
os.Exit(1)
}
for _, namespace := range configuration.FileNamespaces {
if err := EnsureFileMapping(elastic, namespace); err != nil {
panic(err)
}
}
}

var filter gtm.OpFilter = nil
filterChain := []gtm.OpFilter{ NotMonstache, NotSystem, NotChunks }
filterChain := []gtm.OpFilter{NotMonstache, NotSystem, NotChunks}
if configuration.NsRegex != "" {
filterChain = append(filterChain, FilterWithRegex(configuration.NsRegex))
}
@@ -367,8 +447,8 @@
filter = gtm.ChainOpFilters(filterChain...)

ops, errs := gtm.Tail(mongo, &gtm.Options{
After: after,
Filter: filter,
After: after,
Filter: filter,
ChannelSize: configuration.ChannelSize,
})
exitStatus := 0
@@ -380,13 +460,18 @@
exitStatus = 1
fmt.Println(err)
case op := <-ops:
indexed := false
objectId := OpIdToString(op)
indexType := IndexTypeMapping(op)
indexed, objectId, indexType := false, OpIdToString(op), IndexTypeMapping(op)
if op.IsDelete() {
indexer.Delete(indexType.Index, indexType.Type, objectId)
indexed = true
} else {
} else if op.Data != nil {
if configuration.IndexFiles {
if fileNamespaces[op.Namespace] {
if err := AddFileContent(mongo, op); err != nil {
errs <- err
}
}
}
PrepareDataForIndexing(op.Data)
if err := MapData(op); err == nil {
if err := indexer.Index(indexType.Index, indexType.Type, objectId, "", "", nil, op.Data); err == nil {
