Skip to content

Commit

Permalink
Add root level if statements
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail authored and lucasoares committed Mar 8, 2024
1 parent 35eee03 commit 22eaa3b
Show file tree
Hide file tree
Showing 10 changed files with 601 additions and 172 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ All notable changes to this project will be documented in this file.

### Added

- Field `credit` added to the `amqp_1` input to specify the maximum number of unacknowledged messages the sender can transmit.
- Field `credit` added to the `amqp_1` input to specify the maximum number of unacknowledged messages the sender can transmit.
- Bloblang now supports root-level `if` statements.

### Changed

- The default value of the `amqp_1.credit` input has changed from `1` to `64`
- The default value of the `amqp_1.credit` input has changed from `1` to `64`.

## 4.25.1 - 2024-03-01

Expand Down
101 changes: 24 additions & 77 deletions internal/bloblang/mapping/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"github.com/benthosdev/benthos/v4/internal/value"
)

//------------------------------------------------------------------------------

// Message is an interface type to be given to a query function, it allows the
// function to resolve fields and metadata from a message.
type Message interface {
Expand Down Expand Up @@ -39,34 +37,13 @@ func LineAndColOf(input, clip []rune) (line, col int) {

//------------------------------------------------------------------------------

// Statement describes an isolated mapping statement, where the result of a
// query function is to be mapped according to an Assignment.
type Statement struct {
input []rune
assignment Assignment
query query.Function
}

// NewStatement initialises a new mapping statement from an Assignment and
// query.Function. The input parameter is an optional slice pointing to the
// parsed expression that created the statement.
func NewStatement(input []rune, assignment Assignment, query query.Function) *Statement {
return &Statement{
input: input,
assignment: assignment,
query: query,
}
}

//------------------------------------------------------------------------------

// Executor is a parsed bloblang mapping that can be executed on a Benthos
// message.
type Executor struct {
annotation string
input []rune
maps map[string]query.Function
statements []*Statement
statements []Statement

maxMapStacks int
}
Expand All @@ -77,7 +54,7 @@ const defaultMaxMapStacks = 5000
// and a list of assignments to be executed on each mapping. The input parameter
// is an optional slice pointing to the parsed expression that created the
// executor.
func NewExecutor(annotation string, input []rune, maps map[string]query.Function, statements ...*Statement) *Executor {
func NewExecutor(annotation string, input []rune, maps map[string]query.Function, statements ...Statement) *Executor {
return &Executor{
annotation: annotation,
input: input,
Expand Down Expand Up @@ -179,18 +156,24 @@ func (e *Executor) mapPart(appendTo *message.Part, index int, reference Message)
vars := map[string]any{}

for _, stmt := range e.statements {
res, err := stmt.query.Exec(query.FunctionContext{
err := stmt.Execute(query.FunctionContext{
Maps: e.maps,
Vars: vars,
Index: index,
MsgBatch: reference,
NewMeta: newPart,
NewValue: &newValue,
}.WithValueFunc(lazyValue))
}.WithValueFunc(lazyValue),
AssignmentContext{
Vars: vars,
Meta: newPart,
Value: &newValue,
},
)
if err != nil {
var line int
if len(e.input) > 0 && len(stmt.input) > 0 {
line, _ = LineAndColOf(e.input, stmt.input)
if len(e.input) > 0 && len(stmt.Input()) > 0 {
line, _ = LineAndColOf(e.input, stmt.Input())
}
var ctxErr query.ErrNoContext
if parseErr != nil && errors.As(err, &ctxErr) {
Expand All @@ -202,21 +185,6 @@ func (e *Executor) mapPart(appendTo *message.Part, index int, reference Message)
}
return nil, fmt.Errorf("failed assignment (line %v): %w", line, err)
}
if _, isNothing := res.(value.Nothing); isNothing {
// Skip assignment entirely
continue
}
if err = stmt.assignment.Apply(res, AssignmentContext{
Vars: vars,
Meta: newPart,
Value: &newValue,
}); err != nil {
var line int
if len(e.input) > 0 && len(stmt.input) > 0 {
line, _ = LineAndColOf(e.input, stmt.input)
}
return nil, fmt.Errorf("failed to assign result (line %v): %w", line, err)
}
}

switch newValue.(type) {
Expand Down Expand Up @@ -247,7 +215,7 @@ func (e *Executor) QueryTargets(ctx query.TargetsContext) (query.TargetsContext,

var paths []query.TargetPath
for _, stmt := range e.statements {
_, tmpPaths := stmt.query.QueryTargets(childCtx)
_, tmpPaths := stmt.QueryTargets(childCtx)
paths = append(paths, tmpPaths...)
}

Expand All @@ -259,7 +227,7 @@ func (e *Executor) QueryTargets(ctx query.TargetsContext) (query.TargetsContext,
func (e *Executor) AssignmentTargets() []TargetPath {
var paths []TargetPath
for _, stmt := range e.statements {
paths = append(paths, stmt.assignment.Target())
paths = append(paths, stmt.AssignmentTargets()...)
}
return paths
}
Expand All @@ -275,20 +243,12 @@ func (e *Executor) Exec(ctx query.FunctionContext) (any, error) {
ctx.NewValue = &newObj

for _, stmt := range e.statements {
res, err := stmt.query.Exec(ctx)
if err != nil {
return nil, formatExecErr(err, true, e.input, stmt.input)
}
if _, isNothing := res.(value.Nothing); isNothing {
// Skip assignment entirely
continue
}
if err = stmt.assignment.Apply(res, AssignmentContext{
if err := stmt.Execute(ctx, AssignmentContext{
Vars: ctx.Vars,
// Meta: meta, Prevented for now due to .from(int)
Value: &newObj,
}); err != nil {
return nil, formatExecErr(err, false, e.input, stmt.input)
return nil, formatExecErr(err, e.input, stmt.Input())
}
}

Expand All @@ -298,16 +258,8 @@ func (e *Executor) Exec(ctx query.FunctionContext) (any, error) {
// ExecOnto a provided assignment context.
func (e *Executor) ExecOnto(ctx query.FunctionContext, onto AssignmentContext) error {
for _, stmt := range e.statements {
res, err := stmt.query.Exec(ctx)
if err != nil {
return formatExecErr(err, true, e.input, stmt.input)
}
if _, isNothing := res.(value.Nothing); isNothing {
// Skip assignment entirely
continue
}
if err = stmt.assignment.Apply(res, onto); err != nil {
return formatExecErr(err, false, e.input, stmt.input)
if err := stmt.Execute(ctx, onto); err != nil {
return formatExecErr(err, e.input, stmt.Input())
}
}
return nil
Expand Down Expand Up @@ -336,20 +288,16 @@ func (e *Executor) ToString(ctx query.FunctionContext) (string, error) {
//------------------------------------------------------------------------------

type failedAssignmentErr struct {
line int
onExec bool
err error
line int
err error
}

func (f *failedAssignmentErr) Unwrap() error {
return f.err
}

func (f *failedAssignmentErr) Error() string {
if f.onExec {
return fmt.Sprintf("failed assignment (line %v): %v", f.line, f.err)
}
return fmt.Sprintf("failed to assign result (line %v): %v", f.line, f.err)
return fmt.Sprintf("failed assignment (line %v): %v", f.line, f.err)
}

type errStacks struct {
Expand All @@ -361,7 +309,7 @@ func (e *errStacks) Error() string {
return fmt.Sprintf("entering %v exceeded maximum allowed stacks of %v, this could be due to unbounded recursion", e.annotation, e.maxStacks)
}

func formatExecErr(err error, onExec bool, input, stmtInput []rune) error {
func formatExecErr(err error, input, stmtInput []rune) error {
var u *failedAssignmentErr
if errors.As(err, &u) {
return u
Expand All @@ -378,8 +326,7 @@ func formatExecErr(err error, onExec bool, input, stmtInput []rune) error {
}

return &failedAssignmentErr{
line: line,
onExec: onExec,
err: err,
line: line,
err: err,
}
}
Loading

0 comments on commit 22eaa3b

Please sign in to comment.