Skip to content
This repository has been archived by the owner on Aug 31, 2022. It is now read-only.
/ gobolt Public archive

Commit

Permalink
Pass new transaction timeout and metadata to seabolt
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-ince committed Sep 19, 2018
1 parent 5c4c9ef commit 2442af8
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 42 deletions.
2 changes: 1 addition & 1 deletion cmd/go-bolt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func executeQuery() {
}

start = time.Now()
runMsg, err := conn.Run(query, &map[string]interface{}{})
runMsg, err := conn.Run(query, nil, nil, 0, nil)
if err != nil {
panic(err)
}
Expand Down
102 changes: 77 additions & 25 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import "C"
import (
"errors"
"fmt"
"time"
"unsafe"
)

Expand All @@ -38,10 +39,10 @@ type Connection interface {
RemoteAddress() string
Server() string

Begin(bookmarks []string) (RequestHandle, error)
Begin(bookmarks []string, txTimeout time.Duration, txMetadata map[string]interface{}) (RequestHandle, error)
Commit() (RequestHandle, error)
Rollback() (RequestHandle, error)
Run(cypher string, args *map[string]interface{}) (RequestHandle, error)
Run(cypher string, args map[string]interface{}, bookmarks []string, txTimeout time.Duration, txMetadata map[string]interface{}) (RequestHandle, error)
PullAll() (RequestHandle, error)
DiscardAll() (RequestHandle, error)
Reset() (RequestHandle, error)
Expand Down Expand Up @@ -81,16 +82,41 @@ func (connection *neo4jConnection) Server() string {
return C.GoString(server)
}

func (connection *neo4jConnection) Begin(bookmarks []string) (RequestHandle, error) {
bookmarks_list := connection.valueSystem.valueToConnector(bookmarks)
defer C.BoltValue_destroy(bookmarks_list)
func (connection *neo4jConnection) Begin(bookmarks []string, txTimeout time.Duration, txMetadata map[string]interface{}) (RequestHandle, error) {
var res C.int

res := C.BoltConnection_set_begin_tx_bookmark(connection.cInstance, bookmarks_list)
if res!= C.BOLT_SUCCESS {
return -1, newConnectionError(connection, "unable to set bookmarks for BEGIN message")
res = C.BoltConnection_clear_begin(connection.cInstance)
if res != C.BOLT_SUCCESS {
return -1, newConnectionError(connection, "unable to clear BEGIN message")
}

if len(bookmarks) > 0 {
bookmarks_value := connection.valueSystem.valueToConnector(bookmarks)
res := C.BoltConnection_set_begin_bookmarks(connection.cInstance, bookmarks_value)
C.BoltValue_destroy(bookmarks_value)
if res != C.BOLT_SUCCESS {
return -1, newConnectionError(connection, "unable to set bookmarks for BEGIN message")
}
}

if txTimeout > 0 {
timeOut := C.int64_t(txTimeout / time.Millisecond)
res := C.BoltConnection_set_begin_tx_timeout(connection.cInstance, timeOut)
if res != C.BOLT_SUCCESS {
return -1, newConnectionError(connection, "unable to set tx timeout for BEGIN message")
}
}

if len(txMetadata) > 0 {
metadata_value := connection.valueSystem.valueToConnector(txMetadata)
res := C.BoltConnection_set_begin_tx_metadata(connection.cInstance, metadata_value)
C.BoltValue_destroy(metadata_value)
if res != C.BOLT_SUCCESS {
return -1, newConnectionError(connection, "unable to set tx metadata for BEGIN message")
}
}

res = C.BoltConnection_load_begin_tx_request(connection.cInstance)
res = C.BoltConnection_load_begin_request(connection.cInstance)
if res != C.BOLT_SUCCESS {
return -1, newConnectionError(connection, "unable to generate BEGIN message")
}
Expand All @@ -116,35 +142,61 @@ func (connection *neo4jConnection) Rollback() (RequestHandle, error) {
return RequestHandle(C.BoltConnection_last_request(connection.cInstance)), nil
}

func (connection *neo4jConnection) Run(cypher string, params *map[string]interface{}) (RequestHandle, error) {
stmt := C.CString(cypher)
defer C.free(unsafe.Pointer(stmt))
func (connection *neo4jConnection) Run(cypher string, params map[string]interface{}, bookmarks []string, txTimeout time.Duration, txMetadata map[string]interface{}) (RequestHandle, error) {
var res C.int

var actualParams map[string]interface{}
if params == nil {
actualParams = map[string]interface{}(nil)
} else {
actualParams = *params
res = C.BoltConnection_clear_run(connection.cInstance)
if res != C.BOLT_SUCCESS {
return -1, newConnectionError(connection, "unable to clear RUN message")
}

res := C.BoltConnection_set_run_cypher(connection.cInstance, stmt, C.size_t(len(cypher)), C.int32_t(len(actualParams)))
cypherStr := C.CString(cypher)
res = C.BoltConnection_set_run_cypher(connection.cInstance, cypherStr, C.size_t(len(cypher)), C.int32_t(len(params)))
C.free(unsafe.Pointer(cypherStr))
if res != C.BOLT_SUCCESS {
return -1, newConnectionError(connection, "unable to set cypher statement")
}

i := 0
for k, v := range actualParams {
index := C.int32_t(i)
key := C.CString(k)
var index C.int32_t = 0
for paramName, paramValue := range params {
paramNameLen := C.size_t(len(paramName))
paramNameStr := C.CString(paramName)

boltValue := C.BoltConnection_set_run_cypher_parameter(connection.cInstance, index, key, C.size_t(len(k)))
boltValue := C.BoltConnection_set_run_cypher_parameter(connection.cInstance, index, paramNameStr, paramNameLen)
C.free(unsafe.Pointer(paramNameStr))
if boltValue == nil {
return -1, newConnectionError(connection, "unable to get cypher statement parameter value to set")
}

connection.valueSystem.valueAsConnector(boltValue, v)
connection.valueSystem.valueAsConnector(boltValue, paramValue)

index++
}

if len(bookmarks) > 0 {
bookmarks_value := connection.valueSystem.valueToConnector(bookmarks)
res := C.BoltConnection_set_run_bookmarks(connection.cInstance, bookmarks_value)
C.BoltValue_destroy(bookmarks_value)
if res != C.BOLT_SUCCESS {
return -1, newConnectionError(connection, "unable to set bookmarks for RUN message")
}
}

if txTimeout > 0 {
timeOut := C.int64_t(txTimeout / time.Millisecond)
res := C.BoltConnection_set_run_tx_timeout(connection.cInstance, timeOut)
if res != C.BOLT_SUCCESS {
return -1, newConnectionError(connection, "unable to set tx timeout for RUN message")
}
}

i++
if len(txMetadata) > 0 {
metadata_value := connection.valueSystem.valueToConnector(txMetadata)
res := C.BoltConnection_set_run_tx_metadata(connection.cInstance, metadata_value)
C.BoltValue_destroy(metadata_value)
if res != C.BOLT_SUCCESS {
return -1, newConnectionError(connection, "unable to set tx metadata for RUN message")
}
}

res = C.BoltConnection_load_run_request(connection.cInstance)
Expand Down
30 changes: 14 additions & 16 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,37 +164,27 @@ func NewConnector(uri *url.URL, authToken map[string]interface{}, config *Config
transport = C.BOLT_SECURE_SOCKET
}

userAgent := C.CString("Go Driver/1.7")
defer C.free(unsafe.Pointer(userAgent))

userAgentStr := C.CString("Go Driver/1.7")
routingContextValue := valueSystem.valueToConnector(uri.Query())
defer C.BoltValue_destroy(routingContextValue)

hostname, port := C.CString(uri.Hostname()), C.CString(uri.Port())
defer C.free(unsafe.Pointer(hostname))
defer C.free(unsafe.Pointer(port))

address := C.BoltAddress_create(hostname, port)

authTokenBoltValue := valueSystem.valueToConnector(authToken)
defer C.BoltValue_destroy(authTokenBoltValue)
hostnameStr, portStr := C.CString(uri.Hostname()), C.CString(uri.Port())
address := C.BoltAddress_create(hostnameStr, portStr)
authTokenValue := valueSystem.valueToConnector(authToken)

key := startupLibrary()

cLogger := registerLogging(key, config.Log)
cResolver := registerResolver(key, config.AddressResolver)

cConfig := C.struct_BoltConfig{
mode: mode,
transport: transport,
user_agent: userAgent,
user_agent: userAgentStr,
routing_context: routingContextValue,
address_resolver: cResolver,
log: cLogger,
max_pool_size: C.uint(config.MaxPoolSize),
}

cInstance := C.BoltConnector_create(address, authTokenBoltValue, &cConfig)
cInstance := C.BoltConnector_create(address, authTokenValue, &cConfig)
conn := &neo4jConnector{
key: key,
uri: uri,
Expand All @@ -205,6 +195,14 @@ func NewConnector(uri *url.URL, authToken map[string]interface{}, config *Config
cInstance: cInstance,
cLogger: cLogger,
}

// do cleanup
C.free(unsafe.Pointer(userAgentStr))
C.free(unsafe.Pointer(hostnameStr))
C.free(unsafe.Pointer(portStr))
C.BoltValue_destroy(routingContextValue)
C.BoltValue_destroy(authTokenValue)

return conn, nil
}

Expand Down

0 comments on commit 2442af8

Please sign in to comment.