Skip to content
This repository has been archived by the owner on Aug 31, 2022. It is now read-only.
/ gobolt Public archive

Commit

Permalink
Apply changes related to seabolt's encapsulation
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-ince committed Oct 25, 2018
1 parent 3c94e8f commit 0a4bbb3
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 149 deletions.
60 changes: 7 additions & 53 deletions cmd/go-bolt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@ import (
"errors"
"flag"
"fmt"
"os"
"time"

"github.com/neo4j-drivers/gobolt"
"net/url"
"os"
"strings"
)

Expand All @@ -39,7 +37,6 @@ var (
query string
mode string
debug bool
stats bool
)

func executeQuery() {
Expand All @@ -50,7 +47,6 @@ func executeQuery() {

logger := simpleLogger(logLevelDebug, os.Stderr)

start := time.Now()
connector, err := gobolt.NewConnector(parsedURI, map[string]interface{}{
"scheme": "basic",
"principal": username,
Expand All @@ -60,68 +56,38 @@ func executeQuery() {
panic(err)
}
defer connector.Close()
elapsed := time.Since(start)
if stats {
logger.Infof("NewConnector took %s", elapsed)
}

accessMode := gobolt.AccessModeWrite
if strings.ToLower(mode) == "read" {
accessMode = gobolt.AccessModeRead
}

start = time.Now()
conn, err := connector.Acquire(accessMode)
if err != nil {
panic(err)
}
defer conn.Close()
elapsed = time.Since(start)
if stats {
logger.Infof("Acquire took %s", elapsed)
}

start = time.Now()
runMsg, err := conn.Run(query, nil, nil, 0, nil)
if err != nil {
panic(err)
}
elapsed = time.Since(start)
if stats {
logger.Infof("Run took %s", elapsed)
}

start = time.Now()
pullAllMsg, err := conn.PullAll()
if err != nil {
panic(err)
}
elapsed = time.Since(start)
if stats {
logger.Infof("PullAll took %s", elapsed)
}

start = time.Now()
err = conn.Flush()
if err != nil {
panic(err)
}
elapsed = time.Since(start)
if stats {
logger.Infof("Flush took %s", elapsed)
}

start = time.Now()
records, err := conn.FetchSummary(runMsg)
if records != 0 {
panic(errors.New("unexpected summary fetch return"))
}
elapsed = time.Since(start)
if stats {
logger.Infof("FetchSummary took %s", elapsed)
}

start = time.Now()
fields, err := conn.Fields()
if err != nil {
panic(err)
Expand All @@ -135,12 +101,7 @@ func executeQuery() {
fmt.Print(fields[i])
}
fmt.Println()
elapsed = time.Since(start)
if stats {
logger.Infof("Summary processing took %s", elapsed)
}

start = time.Now()
for {
fetch, err := conn.Fetch(pullAllMsg)
if err != nil {
Expand All @@ -165,25 +126,19 @@ func executeQuery() {

fmt.Println()
}
elapsed = time.Since(start)
if stats {
logger.Infof("Result processing took %s", elapsed)
}
}

func main() {
flag.Parse()
executeQuery()

if stats {
current, peak, events := gobolt.GetAllocationStats()
current, peak, events := gobolt.GetAllocationStats()

fmt.Fprintf(os.Stderr, "=====================================\n")
fmt.Fprintf(os.Stderr, "current allocation : %d bytes\n", current)
fmt.Fprintf(os.Stderr, "peak allocation : %d bytes\n", peak)
fmt.Fprintf(os.Stderr, "allocation events : %d\n", events)
fmt.Fprintf(os.Stderr, "=====================================\n")
}
fmt.Fprintf(os.Stderr, "=====================================\n")
fmt.Fprintf(os.Stderr, "current allocation : %d bytes\n", current)
fmt.Fprintf(os.Stderr, "peak allocation : %d bytes\n", peak)
fmt.Fprintf(os.Stderr, "allocation events : %d\n", events)
fmt.Fprintf(os.Stderr, "=====================================\n")
}

func init() {
Expand All @@ -195,5 +150,4 @@ func init() {
flag.StringVar(&query, "query", "UNWIND RANGE(1,1000) AS N RETURN N", "cypher query to run")
flag.StringVar(&mode, "mode", "write", "access mode for routing mode (read or write)")
flag.BoolVar(&debug, "debug", true, "whether to use debug logging")
flag.BoolVar(&stats, "stats", true, "whether to dump allocation stats on exit")
}
12 changes: 7 additions & 5 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ func (connection *neo4jConnection) Id() string {
}

func (connection *neo4jConnection) RemoteAddress() string {
connectedAddress := connection.cInstance.resolved_address
connectedAddress := C.BoltConnection_remote_endpoint(connection.cInstance)
if connectedAddress == nil {
return "UNKNOWN"
}

return fmt.Sprintf("%s:%s", C.GoString(connectedAddress.host), C.GoString(connectedAddress.port))
return fmt.Sprintf("%s:%s", C.GoString(C.BoltAddress_host(connectedAddress)), C.GoString(C.BoltAddress_port(connectedAddress)))
}

func (connection *neo4jConnection) Server() string {
Expand Down Expand Up @@ -228,7 +228,9 @@ func (connection *neo4jConnection) DiscardAll() (RequestHandle, error) {
}

func (connection *neo4jConnection) assertReadyState() error {
if connection.cInstance.status != C.BOLT_READY {
cStatus := C.BoltConnection_status(connection.cInstance)

if C.BoltStatus_get_state(cStatus) != C.BOLT_CONNECTION_STATE_READY {
return newError(connection, "unexpected connection state")
}

Expand All @@ -245,7 +247,7 @@ func (connection *neo4jConnection) Flush() error {
}

func (connection *neo4jConnection) Fetch(request RequestHandle) (FetchType, error) {
res := C.BoltConnection_fetch(connection.cInstance, C.bolt_request(request))
res := C.BoltConnection_fetch(connection.cInstance, C.BoltRequest(request))

if err := connection.assertReadyState(); err != nil {
return FetchTypeError, err
Expand All @@ -255,7 +257,7 @@ func (connection *neo4jConnection) Fetch(request RequestHandle) (FetchType, erro
}

func (connection *neo4jConnection) FetchSummary(request RequestHandle) (int, error) {
res := C.BoltConnection_fetch_summary(connection.cInstance, C.bolt_request(request))
res := C.BoltConnection_fetch_summary(connection.cInstance, C.BoltRequest(request))
if res < 0 {
return -1, newError(connection, "unable to fetch summary")
}
Expand Down
88 changes: 40 additions & 48 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ type neo4jConnector struct {
authToken map[string]interface{}
config Config

cAddress *C.struct_BoltAddress
cInstance *C.struct_BoltConnector
cAddress *C.BoltAddress
cInstance *C.BoltConnector
cLogger *C.struct_BoltLog
cResolver *C.struct_BoltAddressResolver

Expand Down Expand Up @@ -122,7 +122,7 @@ func (conn *neo4jConnector) Acquire(mode AccessMode) (Connection, error) {
cMode = C.BOLT_ACCESS_MODE_READ
}

cResult := C.BoltConnector_acquire(conn.cInstance, cMode)
cResult := C.BoltConnector_acquire(conn.cInstance, C.BoltAccessMode(cMode))
if cResult.connection == nil {
codeText := C.GoString(C.BoltError_get_string(cResult.connection_error))
context := C.GoString(cResult.connection_error_ctx)
Expand All @@ -140,9 +140,9 @@ func (conn *neo4jConnector) release(connection *neo4jConnection) error {

// GetAllocationStats returns statistics about seabolt (C) allocations
func GetAllocationStats() (int64, int64, int64) {
current := C.BoltMem_current_allocation()
peak := C.BoltMem_peak_allocation()
events := C.BoltMem_allocation_events()
current := C.BoltStat_memory_allocation_current()
peak := C.BoltStat_memory_allocation_peak()
events := C.BoltStat_memory_allocation_events()

return int64(current), int64(peak), int64(events)
}
Expand All @@ -160,11 +160,10 @@ func NewConnector(uri *url.URL, authToken map[string]interface{}, config *Config
}
}

cTrust := (*C.struct_BoltTrust)(C.malloc(C.sizeof_struct_BoltTrust))
cTrust.certs = nil
cTrust.certs_len = 0
cTrust.skip_verify = 0
cTrust.skip_verify_hostname = 0
cTrust := C.BoltTrust_create()
C.BoltTrust_set_certs(cTrust, nil, 0)
C.BoltTrust_set_skip_verify(cTrust, 0)
C.BoltTrust_set_skip_verify_hostname(cTrust, 0)

certsBuf, err := pemEncodeCerts(config.TLSCertificates)
if err != nil {
Expand All @@ -173,38 +172,34 @@ func NewConnector(uri *url.URL, authToken map[string]interface{}, config *Config

if certsBuf != nil {
certsBytes := certsBuf.String()
cTrust.certs = C.CString(certsBytes)
cTrust.certs_len = C.int32_t(certsBuf.Len())
C.BoltTrust_set_certs(cTrust, C.CString(certsBytes), C.int(certsBuf.Len()))
}

if config.TLSSkipVerify {
cTrust.skip_verify = 1
C.BoltTrust_set_skip_verify(cTrust, 1)
}

if config.TLSSkipVerifyHostname {
cTrust.skip_verify_hostname = 1
C.BoltTrust_set_skip_verify_hostname(cTrust, 1)
}

cSocketOpts := (*C.struct_BoltSocketOptions)(C.malloc(C.sizeof_struct_BoltSocketOptions))
cSocketOpts.connect_timeout = C.int(config.SockConnectTimeout / time.Millisecond)
cSocketOpts.recv_timeout = C.int(config.SockRecvTimeout / time.Millisecond)
cSocketOpts.send_timeout = C.int(config.SockSendTimeout / time.Millisecond)
cSocketOpts.keepalive = 0

if config.SockKeepalive {
cSocketOpts.keepalive = 1
cSocketOpts := C.BoltSocketOptions_create()
C.BoltSocketOptions_set_connect_timeout(cSocketOpts, C.int(config.SockConnectTimeout/time.Millisecond))
C.BoltSocketOptions_set_keep_alive(cSocketOpts, 1)
if !config.SockKeepalive {
C.BoltSocketOptions_set_keep_alive(cSocketOpts, 0)
}

valueSystem := createValueSystem(config)

var mode uint32 = C.BOLT_DIRECT
var mode uint32 = C.BOLT_MODE_DIRECT
if uri.Scheme == "bolt+routing" {
mode = C.BOLT_ROUTING
mode = C.BOLT_MODE_ROUTING
}

var transport uint32 = C.BOLT_SOCKET
var transport uint32 = C.BOLT_TRANSPORT_PLAINTEXT
if config.Encryption {
transport = C.BOLT_SECURE_SOCKET
transport = C.BOLT_TRANSPORT_ENCRYPTED
}

userAgentStr := C.CString("Go Driver/1.7")
Expand All @@ -217,21 +212,21 @@ func NewConnector(uri *url.URL, authToken map[string]interface{}, config *Config

cLogger := registerLogging(key, config.Log)
cResolver := registerResolver(key, config.AddressResolver)
cConfig := C.struct_BoltConfig{
mode: mode,
transport: transport,
trust: cTrust,
user_agent: userAgentStr,
routing_context: routingContextValue,
address_resolver: cResolver,
log: cLogger,
max_pool_size: C.int(config.MaxPoolSize),
max_connection_lifetime: C.int(config.MaxConnLifetime / time.Millisecond),
max_connection_acquire_time: C.int(config.ConnAcquisitionTimeout / time.Millisecond),
sock_opts: cSocketOpts,
}

cInstance := C.BoltConnector_create(address, authTokenValue, &cConfig)
cConfig := C.BoltConfig_create()
C.BoltConfig_set_mode(cConfig, C.BoltMode(mode))
C.BoltConfig_set_transport(cConfig, C.BoltTransport(transport))
C.BoltConfig_set_trust(cConfig, cTrust)
C.BoltConfig_set_user_agent(cConfig, userAgentStr)
C.BoltConfig_set_routing_context(cConfig, routingContextValue)
C.BoltConfig_set_address_resolver(cConfig, cResolver)
C.BoltConfig_set_log(cConfig, cLogger)
C.BoltConfig_set_max_pool_size(cConfig, C.int(config.MaxPoolSize))
C.BoltConfig_set_max_connection_life_time(cConfig, C.int(config.MaxConnLifetime/time.Millisecond))
C.BoltConfig_set_max_connection_acquisition_time(cConfig, C.int(config.ConnAcquisitionTimeout/time.Millisecond))
C.BoltConfig_set_socket_options(cConfig, cSocketOpts)

cInstance := C.BoltConnector_create(address, authTokenValue, cConfig)
conn := &neo4jConnector{
key: key,
uri: uri,
Expand All @@ -249,18 +244,15 @@ func NewConnector(uri *url.URL, authToken map[string]interface{}, config *Config
C.free(unsafe.Pointer(portStr))
C.BoltValue_destroy(routingContextValue)
C.BoltValue_destroy(authTokenValue)

if cTrust.certs != nil {
C.free(unsafe.Pointer(cTrust.certs))
}
C.free(unsafe.Pointer(cTrust))
C.free(unsafe.Pointer(cSocketOpts))
C.BoltTrust_destroy(cTrust)
C.BoltSocketOptions_destroy(cSocketOpts)
C.BoltConfig_destroy(cConfig)

return conn, nil
}

func createValueSystem(config *Config) *boltValueSystem {
valueHandlersBySignature := make(map[int8]ValueHandler, len(config.ValueHandlers))
valueHandlersBySignature := make(map[int16]ValueHandler, len(config.ValueHandlers))
valueHandlersByType := make(map[reflect.Type]ValueHandler, len(config.ValueHandlers))
for _, handler := range config.ValueHandlers {
for _, readSignature := range handler.ReadableStructs() {
Expand Down
12 changes: 8 additions & 4 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,10 @@ func (failure *defaultGenericError) Error() string {
}

func newError(connection *neo4jConnection, description string) error {
if connection.cInstance.error == C.BOLT_SERVER_FAILURE {
cStatus := C.BoltConnection_status(connection.cInstance)
errorCode := C.BoltStatus_get_error(cStatus)

if errorCode == C.BOLT_SERVER_FAILURE {
failure, err := connection.valueSystem.valueAsDictionary(C.BoltConnection_failure(connection.cInstance))
if err != nil {
return connection.valueSystem.genericErrorFactory("unable to construct database error: %s", err.Error())
Expand Down Expand Up @@ -178,10 +181,11 @@ func newError(connection *neo4jConnection, description string) error {
return connection.valueSystem.databaseErrorFactory(classification, code, message)
}

codeText := C.GoString(C.BoltError_get_string(connection.cInstance.error))
context := C.GoString(connection.cInstance.error_ctx)
state := C.BoltStatus_get_state(cStatus)
errorText := C.GoString(C.BoltError_get_string(errorCode))
context := C.GoString(C.BoltStatus_get_error_context(cStatus))

return connection.valueSystem.connectorErrorFactory(int(connection.cInstance.status), int(connection.cInstance.error), codeText, context, description)
return connection.valueSystem.connectorErrorFactory(int(state), int(errorCode), errorText, context, description)
}

func newGenericError(format string, args ...interface{}) GenericError {
Expand Down
Loading

0 comments on commit 0a4bbb3

Please sign in to comment.