Skip to content

Commit

Permalink
lexer parser added
Browse files Browse the repository at this point in the history
  • Loading branch information
TusharMohapatra07 committed Nov 24, 2024
1 parent 2f9d9ce commit d9535b1
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 49 deletions.
20 changes: 12 additions & 8 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
inputconfig:
csvsourcefilename: sample.csv
inputmethod: CSV
collection: sampledata
credentialfileaddr: firebaseConfig.json
document: "1"
inputmethod: Firebase
outputconfig:
csvdestinationfilename: samplenew.csv
outputmethod: CSV
cronjob:
cronexpression: "@every 5s" # Every 5 seconds
jobname: "Data Fetch and Process"
task: "process_csv"
collection: abcd
connstring: mongodb://localhost:27017/
database: test1
outputmethod: MongoDB
transformations: |
ADD_FIELD("processed_at", '2004-10-22')
validations: |
FIELD("age") RANGE(30, 35)
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ toolchain go1.22.9

require (
firebase.google.com/go v3.13.0+incompatible
github.com/golang/mock v1.1.1
github.com/jlaffaye/ftp v0.2.0
github.com/manifoldco/promptui v0.9.0
github.com/pkg/sftp v1.13.7
Expand Down
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1 h1:G5FRp8JnTd7RQH5kemVNlMeyXQAztQ3mOWV95KxsXH8=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down
64 changes: 25 additions & 39 deletions integrations/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,60 +109,46 @@ func (m MongoDBDestination) SendData(data interface{}, req interfaces.Request) e
}
logger.Infof("Connecting to MongoDB destination...")

// Initialize MongoDB client
clientOptions := options.Client().ApplyURI(req.TargetMongoDBConnString)
client, err := mongo.Connect(context.TODO(), clientOptions)
if err != nil {
return err
return fmt.Errorf("failed to connect to MongoDB: %w", err)
}
defer func() {
if err = client.Disconnect(context.TODO()); err != nil {
logger.Errorf("Error disconnecting MongoDB client: %v", err)
}
}()

collection := client.Database(req.TargetMongoDBDatabase).Collection(req.TargetMongoDBCollection)

// Assert that data is a slice of bson.M
dataSlice, ok := data.([]bson.M)
if !ok {
dataSlice, _ = TransformDataToBSON(data)
// Transform data to BSON
bsonData, err := TransformDataToBSON(data)
if err != nil {
return fmt.Errorf("data transformation failed: %w", err)
}

// Buffered channel for sending documents
dataChannel := make(chan bson.M, bufferSize)
errorChannel := make(chan error, bufferSize)
var wg sync.WaitGroup

// Goroutines for worker pool
for i := 0; i < bufferSize; i++ { // Worker pool
wg.Add(1)
go func() {
defer wg.Done()
for doc := range dataChannel {
if _, err := collection.InsertOne(context.TODO(), doc); err != nil {
logger.Errorf("Error inserting into collection %s: %v", req.TargetMongoDBCollection, err)
errorChannel <- err
} else {
logger.Infof("Data sent to MongoDB target collection %s: %v", req.TargetMongoDBCollection, doc)
}
}
}()
}
// Access database and collection
collection := client.Database(req.TargetMongoDBDatabase).Collection(req.TargetMongoDBCollection)

// Feed data into the channel
go func() {
for _, doc := range dataSlice {
dataChannel <- doc
// Insert data into MongoDB
if len(bsonData) == 1 {
// Insert a single document
_, err = collection.InsertOne(context.TODO(), bsonData[0])
if err != nil {
return fmt.Errorf("failed to insert document: %w", err)
}
close(dataChannel)
}()

wg.Wait()
close(errorChannel)

// Check for errors in the error channel
if len(errorChannel) > 0 {
return errors.New("one or more errors occurred while inserting data")
} else {
// Insert multiple documents
docs := make([]interface{}, len(bsonData))
for i, doc := range bsonData {
docs[i] = doc
}
_, err = collection.InsertMany(context.TODO(), docs)
if err != nil {
return fmt.Errorf("failed to insert documents: %w", err)
}
logger.Infof("Successfully inserted %d documents into MongoDB collection %s", len(bsonData), req.TargetMongoDBCollection)
}

return nil
Expand Down
76 changes: 76 additions & 0 deletions language/lexer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package language

import (
"fmt"
"regexp"
"strings"
)

// TokenType names the lexical category of a Token. Its underlying type is
// string, so values can be compared directly against string literals.
type TokenType string

// Token categories. Of these, Tokenize in this file only has patterns for
// TokenField, TokenCondition, TokenValue, TokenLogical and TokenSeparator.
const (
// TokenField is a field reference of the form FIELD("name").
TokenField TokenType = "FIELD"
// TokenCondition is one of the keywords TYPE, RANGE, MATCHES, IN or REQUIRED.
TokenCondition TokenType = "CONDITION"
// TokenOperator has no matching pattern in Tokenize.
// NOTE(review): presumably reserved for comparison operators — confirm.
TokenOperator TokenType = "OPERATOR"
// TokenValue is a double- or single-quoted string, a run of digits and
// dots, or a parenthesised list.
TokenValue TokenType = "VALUE"
// TokenLogical is one of the keywords AND, OR or NOT.
TokenLogical TokenType = "LOGICAL"
// TokenSeparator is a comma.
TokenSeparator TokenType = "SEPARATOR"
// TokenTransform has no matching pattern in Tokenize.
// NOTE(review): presumably meant for transformation rules — confirm.
TokenTransform TokenType = "TRANSFORM"
// TokenInvalid has no matching pattern in Tokenize, which reports
// unrecognised input through its error return instead.
TokenInvalid TokenType = "INVALID"
)

// Token is a single lexical unit produced by Tokenize.
type Token struct {
// Type is the category of the token.
Type TokenType
// Value is the matched source text exactly as it appeared in the input,
// e.g. the whole FIELD("name") expression for a field token, or a string
// literal together with its quotes.
Value string
}

// Lexer tokenizes rule text. NewLexer fills in both fields, but the Tokenize
// method in this file works on its own argument and does not read them.
type Lexer struct {
// input is the rule text, whitespace-trimmed by NewLexer.
input string
// pos is set to 0 by NewLexer.
pos int
}

// NewLexer returns a Lexer holding input with surrounding whitespace removed
// and its position reset to the start.
func NewLexer(input string) *Lexer {
	lx := new(Lexer)
	lx.input = strings.TrimSpace(input)
	lx.pos = 0
	return lx
}

// Tokenize splits the input into tokens
func (l *Lexer) Tokenize(input string) ([]Token, error) {
var tokens []Token
pos := 0
patterns := map[TokenType]*regexp.Regexp{
TokenField: regexp.MustCompile(`^FIELD\("([^"]+)"\)`), // Match FIELD("field_name")
TokenCondition: regexp.MustCompile(`^(TYPE|RANGE|MATCHES|IN|REQUIRED)`), // Custom conditions
TokenValue: regexp.MustCompile(`^"([^"]*)"|'([^']*)'|[\d\.]+|\([^)]*\)`), // Match strings, numbers, lists
TokenLogical: regexp.MustCompile(`^(AND|OR|NOT)`), // Logical operators
TokenSeparator: regexp.MustCompile(`^,`), // Separators
}

for pos < len(input) {
input = strings.TrimSpace(input[pos:])
pos = 0

matched := false
for tokenType, pattern := range patterns {
if loc := pattern.FindStringIndex(input); loc != nil && loc[0] == 0 {
value := input[loc[0]:loc[1]]
tokens = append(tokens, Token{Type: tokenType, Value: value})
pos += len(value)
matched = true
break
}
}

if !matched {
return nil, fmt.Errorf("unexpected token at: %s", input)
}
}

return tokens, nil
}
62 changes: 62 additions & 0 deletions language/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package language

import (
"errors"
"fmt"
)

// Node represents a node in the Abstract Syntax Tree (AST) built by ParseRules.
type Node struct {
// Type is the node kind. ParseRules uses "ROOT" for the tree root,
// "EXPRESSION" for each rule, and "FIELD", "CONDITION" and "VALUE" for the
// leaves of an expression.
Type TokenType
// Value is the token text carried by a leaf node; ParseRules leaves it
// empty on ROOT and EXPRESSION nodes.
Value string
// Children are the sub-nodes, in the order they were appended.
Children []*Node
}

// Parser turns lexer tokens into an AST for validation and transformation
// rules. It has no fields, so a Parser carries no state between calls.
type Parser struct{}

// NewParser returns a ready-to-use Parser. The type has no fields, so there
// is nothing to configure.
func NewParser() *Parser {
	return new(Parser)
}

// ParseRules builds a flat AST from a token stream.
//
// A FIELD token selects the field that the following conditions apply to.
// Each CONDITION token must be followed by a VALUE token; the pair becomes an
// "EXPRESSION" node with three children (FIELD, CONDITION, VALUE) appended to
// the "ROOT" node. Several conditions may follow one field.
//
// It returns an error when:
//   - fewer than three tokens are supplied,
//   - a condition appears before any field has been seen,
//   - a condition is not followed by a value token,
//   - any token other than a field or condition appears where a rule is
//     expected.
func (p *Parser) ParseRules(tokens []Token) (*Node, error) {
	if len(tokens) < 3 {
		return nil, errors.New("insufficient parameters")
	}

	root := &Node{Type: "ROOT", Children: []*Node{}}
	var (
		currentField string
		haveField    bool
	)

	for i := 0; i < len(tokens); i++ {
		token := tokens[i]

		switch token.Type {
		case "FIELD":
			// Remember the field; the conditions that follow refer to it.
			currentField = token.Value
			haveField = true

		case "CONDITION":
			// A condition is meaningless without a field to apply it to.
			if !haveField {
				return nil, fmt.Errorf("condition %s has no preceding field", token.Value)
			}
			// The operand must exist and be a value, so a stray keyword
			// or the next FIELD is never swallowed as the operand.
			if i+1 >= len(tokens) || tokens[i+1].Type != "VALUE" {
				return nil, errors.New("expected value after condition")
			}
			value := tokens[i+1]

			root.Children = append(root.Children, &Node{
				Type: "EXPRESSION",
				Children: []*Node{
					{Type: "FIELD", Value: currentField},
					{Type: "CONDITION", Value: token.Value},
					{Type: "VALUE", Value: value.Value},
				},
			})

			// Skip the value token consumed above.
			i++

		default:
			return nil, fmt.Errorf("unexpected token: %s", token.Value)
		}
	}

	return root, nil
}
3 changes: 3 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,5 +229,8 @@ func mapConfigToRequest(config map[string]interface{}) interfaces.Request {
SFTPPassword: getStringField(config, "password", ""),
WebSocketSourceURL: getStringField(config, "url", ""),
WebSocketDestURL: getStringField(config, "url", ""),
CredentialFileAddr: getStringField(config, "credentialfileaddr", ""),
Collection: getStringField(config, "collection", ""),
Document: getStringField(config, "document", ""),
}
}

0 comments on commit d9535b1

Please sign in to comment.