Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Neo4j Connector #12

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ A collection of Datastore related Flogo contributions. This repository consists
### Activities
* [couchbase](activity/couchbase): Couchbase
* [mongodb](activity/mongodb): MongoDB
* [neo4j](activity/neo4j): Neo4j

## Contributing and support

Expand Down
70 changes: 70 additions & 0 deletions neo4j/activity/executeCypherQuery/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
<!--
title: Neo4j Execute Cypher Query
weight: 4622
-->
# Neo4j Execute Cypher Query
This activity allows you to query Neo4j Graph DB using Cypher Query Language

## Installation

### Flogo CLI
```bash
flogo install github.com/project-flogo/datastore-contrib/neo4j/activity/executeCypherQuery
```

## Configuration

### Settings:
| Name | Type | Description
| :--- | :--- | :---
| connection | connection | Choose a Neo4j connection from the drop down - ***REQUIRED***

### Input:

| Name | Type | Description
| :--- | :--- | :---
| cypherQuery | string | The Cypher Query to execute


### Output:

| Name | Type | Description
| :--- | :--- | :---
| output | any | Returns cypher query execution response

## Example


```json
{
"id": "executeCypherQuery_2",
"name": "Neo4j Execute Cypher Query",
"description": "Neo4j Execute Cypher Query activity",
"activity": {
"ref": "#executeCypherQuery",
"input": {
"cypherQuery": "MATCH (n) RETURN n LIMIT 25"
},
"settings": {
"accessMode": "Read",
"databaseName": "neo4j",
"connection": "conn://neo4jcon"
}
}
}

"connections": {
"neo4jcon": {
"ref": "github.com/project-flogo/datastore-contrib/neo4j/connection",
"settings": {
"name": "neo4jcon",
"description": "",
"connectionURI": "bolt://localhost:7687",
"credType": "None",
"username": "",
"password": ""
}
}
}

```
122 changes: 122 additions & 0 deletions neo4j/activity/executeCypherQuery/activity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package executeCypherQuery

import (
"github.com/neo4j/neo4j-go-driver/neo4j"
"github.com/project-flogo/core/activity"
"github.com/project-flogo/core/data/coerce"
"github.com/project-flogo/core/data/metadata"
"github.com/project-flogo/core/support/log"
)

var logquery = log.ChildLogger(log.RootLogger(), "neo4j-executecyppherquery")

func init() {
err := activity.Register(&Activity{}, New)
if err != nil {
logquery.Errorf("Neo4j Execute Query Activity init error : %s ", err.Error())
}
}

// New functioncommon
func New(ctx activity.InitContext) (activity.Activity, error) {
settings := &Settings{}
err := metadata.MapToStruct(ctx.Settings(), settings, true)
if err != nil {
return nil, err
}
if settings.Connection != "" {

neo4jcon, toConnerr := coerce.ToConnection(settings.Connection)
if toConnerr != nil {
return nil, toConnerr
}
driver := neo4jcon.GetConnection().(neo4j.Driver)
accessMode := neo4j.AccessModeRead
if settings.AccessMode != "Read" {
accessMode = neo4j.AccessModeWrite
}
act := &Activity{driver: driver, accessMode: accessMode, databaseName: settings.DatabaseName}
return act, nil
}
return nil, nil
}

// Activity is a stub for your Activity implementation
type Activity struct {
driver neo4j.Driver
accessMode neo4j.AccessMode
databaseName string
}

var activityMd = activity.ToMetadata(&Input{}, &Output{})

// Metadata implements activity.Activity.Metadata
func (a *Activity) Metadata() *activity.Metadata {
return activityMd
}

//Cleanup method
func (a *Activity) Cleanup() error {
logquery.Debugf("cleaning up Neo4j activity")
return nil
}

type NodeOutput struct {
Id int64
Labels []string
Props map[string]interface{}
}

// Eval implements activity.Activity.Eval
func (a *Activity) Eval(context activity.Context) (done bool, err error) {
logquery.Debugf("Executing neo4j cypher query Activity")

input := &Input{}
err = context.GetInputObject(input)
if err != nil {
return true, nil
}

sessionConfig := neo4j.SessionConfig{AccessMode: a.accessMode, DatabaseName: a.databaseName}
session, err := a.driver.NewSession(sessionConfig)
if err != nil {
logquery.Errorf("===session error==", err)
return false, err
}

result, err := session.Run(input.CypherQuery, input.QueryParams)
if err != nil {
return false, err
}

//nodeList := []NodeOutput{}
nodeList := []interface{}{}
for result.Next() {
keys := result.Record().Keys()
for i, _ := range keys {
record := result.Record().GetByIndex(i)
switch record.(type) {
case neo4j.Node:
node := record.(neo4j.Node)
nodeOutput := NodeOutput{Id: node.Id(),
Labels: node.Labels(),
Props: node.Props(),
}
nodeList = append(nodeList, nodeOutput)
case string:
node := record.(string)
nodeList = append(nodeList, node)
case int64:
node := record.(int64)
nodeList = append(nodeList, node)
case float64:
node := record.(float64)
nodeList = append(nodeList, node)
}
}
}
context.SetOutput("response", nodeList)
session.Close()

return true, nil
}
172 changes: 172 additions & 0 deletions neo4j/activity/executeCypherQuery/activity_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package executeCypherQuery

import (
"encoding/json"
"fmt"
"io/ioutil"
"testing"

"github.com/project-flogo/core/activity"
"github.com/project-flogo/core/data/mapper"
"github.com/project-flogo/core/data/resolve"
"github.com/project-flogo/core/support"
"github.com/project-flogo/core/support/log"
"github.com/project-flogo/core/support/test"
_ "github.com/project-flogo/datastore-contrib/neo4j/connection"
"github.com/stretchr/testify/assert"
)

var activityMetadata *activity.Metadata

var settingsRead = `{
"settings": {
"connection": {
"id": "e1e890d0-de91-11e9-aef0-13201957902e",
"name": "neo4jcon",
"ref": "github.com/project-flogo/datastore-contrib/neo4j/connection",
"settings": {
"name": "neo4jcon",
"description": "",
"connectionURI": "bolt://localhost:7687",
"credType": "None",
"username": "",
"password": ""
}
},
"databaseName": "neo4j",
"accessMode": "Read"
}
}`

var settingsWrite = `{
"settings": {
"connection": {
"id": "e1e890d0-de91-11e9-aef0-13201957902e",
"name": "neo4jcon",
"ref": "github.com/project-flogo/datastore-contrib/neo4j/connection",
"settings": {
"name": "neo4jcon",
"description": "",
"connectionURI": "bolt://localhost:7687",
"credType": "None",
"username": "",
"password": ""
}
},
"databaseName": "neo4j",
"accessMode": "Write"
}
}`

func getActivityMetadata() *activity.Metadata {

if activityMetadata == nil {
jsonMetadataBytes, err := ioutil.ReadFile("activity.json")
if err != nil {
panic("No Json Metadata found for activity.json path")
}

activityMetadata = activity.ToMetadata(string(jsonMetadataBytes))
}

return activityMetadata
}
func TestMatchQuery(t *testing.T) {
log.RootLogger().Info("****TEST : Executing start****")
m := make(map[string]interface{})
err1 := json.Unmarshal([]byte(settingsRead), &m)
assert.Nil(t, err1)
mf := mapper.NewFactory(resolve.GetBasicResolver())

support.RegisterAlias("connection", "connection", "github.com/project-flogo/datastore-contrib/neo4j/connection")
fmt.Println("=======Settings========", m["settings"])
iCtx := test.NewActivityInitContext(m["settings"], mf)
act, err := New(iCtx)
assert.Nil(t, err)
tc := test.NewActivityContext(act.Metadata())
//tc.SetInput("cypherQuery", "MATCH (n:Movie) RETURN n LIMIT 25")
tc.SetInput("cypherQuery", "MATCH (n) RETURN n LIMIT 25")
//tc.SetInput("cypherQuery", "MATCH (p:Person)-[:ACTED_IN]->(n:Movie) RETURN p LIMIT 25")
_, err = act.Eval(tc)
// Getting outputs
testOutput := tc.GetOutput("response")
jsonOutput, _ := json.Marshal(testOutput)
log.RootLogger().Infof("jsonOutput is : %s", string(jsonOutput))
log.RootLogger().Info("****TEST : Executing ends****")
assert.Nil(t, err)
}

func TestCreateQuery(t *testing.T) {
log.RootLogger().Info("****TEST : Executing start****")
m := make(map[string]interface{})
err1 := json.Unmarshal([]byte(settingsWrite), &m)
assert.Nil(t, err1)
mf := mapper.NewFactory(resolve.GetBasicResolver())

support.RegisterAlias("connection", "connection", "github.com/project-flogo/datastore-contrib/neo4j/connection")
fmt.Println("=======Settings========", m["settings"])
iCtx := test.NewActivityInitContext(m["settings"], mf)
act, err := New(iCtx)
assert.Nil(t, err)
tc := test.NewActivityContext(act.Metadata())
//tc.SetInput("cypherQuery", "MATCH (n:Movie) RETURN n LIMIT 25")
tc.SetInput("cypherQuery", "CREATE (n:Item { id: $id, name: $name }) RETURN n.id, n.name")
tc.SetInput("queryParams", map[string]interface{}{"id": 11, "name": "Neel"})
_, err = act.Eval(tc)
// Getting outputs
testOutput := tc.GetOutput("response")
jsonOutput, _ := json.Marshal(testOutput)
log.RootLogger().Infof("jsonOutput is : %s", string(jsonOutput))
log.RootLogger().Info("****TEST : Executing ends****")
assert.Nil(t, err)
}

func TestUpdateQuery(t *testing.T) {
log.RootLogger().Info("****TEST : Executing start****")
m := make(map[string]interface{})
err1 := json.Unmarshal([]byte(settingsWrite), &m)
assert.Nil(t, err1)
mf := mapper.NewFactory(resolve.GetBasicResolver())

support.RegisterAlias("connection", "connection", "github.com/project-flogo/datastore-contrib/neo4j/connection")
fmt.Println("=======Settings========", m["settings"])
iCtx := test.NewActivityInitContext(m["settings"], mf)
act, err := New(iCtx)
assert.Nil(t, err)
tc := test.NewActivityContext(act.Metadata())
//tc.SetInput("cypherQuery", "MATCH (n:Movie) RETURN n LIMIT 25")
tc.SetInput("cypherQuery", "MATCH (p:Person {name: 'Tom Cruise'}) SET p.born = 2020 RETURN p")
//tc.SetInput("queryParams", map[string]interface{}{"id": 11, "name": "Neel"})
_, err = act.Eval(tc)
// Getting outputs
testOutput := tc.GetOutput("response")
jsonOutput, _ := json.Marshal(testOutput)
log.RootLogger().Infof("jsonOutput is : %s", string(jsonOutput))
log.RootLogger().Info("****TEST : Executing ends****")
assert.Nil(t, err)
}

func TestDeleteQuery(t *testing.T) {
log.RootLogger().Info("****TEST : Executing start****")
m := make(map[string]interface{})
err1 := json.Unmarshal([]byte(settingsWrite), &m)
assert.Nil(t, err1)
mf := mapper.NewFactory(resolve.GetBasicResolver())

support.RegisterAlias("connection", "connection", "github.com/project-flogo/datastore-contrib/neo4j/connection")
fmt.Println("=======Settings========", m["settings"])
iCtx := test.NewActivityInitContext(m["settings"], mf)
act, err := New(iCtx)
assert.Nil(t, err)
tc := test.NewActivityContext(act.Metadata())
//tc.SetInput("cypherQuery", "MATCH (n:Movie) RETURN n LIMIT 25")
tc.SetInput("cypherQuery", "MATCH (p:Person {name: 'Jack Nicholson'}) DETACH DELETE p")
//tc.SetInput("queryParams", map[string]interface{}{"id": 11, "name": "Neel"})
_, err = act.Eval(tc)
// Getting outputs
testOutput := tc.GetOutput("response")
jsonOutput, _ := json.Marshal(testOutput)
log.RootLogger().Infof("jsonOutput is : %s", string(jsonOutput))
log.RootLogger().Info("****TEST : Executing ends****")
assert.Nil(t, err)
}
Loading