diff --git a/api/api.go b/api/api.go index 47597fc..794cb77 100644 --- a/api/api.go +++ b/api/api.go @@ -183,10 +183,11 @@ type ( } LogConfig struct { - AccountID string `json:"account_id,omitempty"` - IndirectUpload bool `json:"indirect_upload,omitempty"` // Whether to directly upload via signed link or using log service - URL string `json:"url,omitempty"` - Token string `json:"token,omitempty"` + AccountID string `json:"account_id,omitempty"` + IndirectUpload bool `json:"indirect_upload,omitempty"` // Whether to directly upload via signed link or using log service + URL string `json:"url,omitempty"` + Token string `json:"token,omitempty"` + TrimNewLineSuffix bool `json:"trim_new_line_suffix,omitempty"` } JunitReport struct { diff --git a/handler/setup.go b/handler/setup.go index c478f5e..2165985 100644 --- a/handler/setup.go +++ b/handler/setup.go @@ -74,7 +74,7 @@ func HandleSetup(config *config.Config) http.HandlerFunc { log.Out = os.Stdout } else { client := state.GetLogStreamClient() - wc := livelog.New(client, s.LogKey, id, nil) + wc := livelog.New(client, s.LogKey, id, nil, s.LogConfig.TrimNewLineSuffix) defer func() { if err := wc.Close(); err != nil { logrus.WithError(err).Debugln("failed to close log stream") diff --git a/handler/step.go b/handler/step.go index d365aa8..7d8ddec 100644 --- a/handler/step.go +++ b/handler/step.go @@ -77,7 +77,7 @@ func HandleStartStep(config *config.Config) http.HandlerFunc { ctx := r.Context() logger.FromRequest(r).WithField("stage_id", s.StageRuntimeID). WithField("step_id", s.ID).Traceln("starting step execution") - if err := stageData.StepExecutor.StartStep(ctx, &s, stageData.State.GetSecrets(), stageData.State.GetLogStreamClient(), stageData.State.GetTIConfig()); err != nil { + if err := stageData.StepExecutor.StartStep(ctx, &s, stageData.State.GetSecrets(), stageData.State.GetLogStreamClient(), stageData.State.GetTIConfig(), stageData.State.GetLogConfig()); err != nil { WriteError(w, err) } diff --git a/livelog/livelog.go b/livelog/livelog.go index 7f10e52..95497da 100644 --- a/livelog/livelog.go +++ b/livelog/livelog.go @@ -47,23 +47,25 @@ type Writer struct { history []*logstream.Line prev []byte - closed bool - close chan struct{} - ready chan struct{} + closed bool + trimNewLineSuffix bool + close chan struct{} + ready chan struct{} } // New returns a new writer -func New(client logstream.Client, key, name string, nudges []logstream.Nudge) *Writer { +func New(client logstream.Client, key, name string, nudges []logstream.Nudge, trimNewLineSuffix bool) *Writer { b := &Writer{ - client: client, - key: key, - name: name, - now: time.Now(), - limit: defaultLimit, - interval: defaultInterval, - nudges: nudges, - close: make(chan struct{}), - ready: make(chan struct{}, 1), + client: client, + key: key, + name: name, + now: time.Now(), + limit: defaultLimit, + interval: defaultInterval, + nudges: nudges, + close: make(chan struct{}), + ready: make(chan struct{}, 1), + trimNewLineSuffix: trimNewLineSuffix, } go b.Start() return b @@ -105,6 +107,11 @@ func (b *Writer) Write(p []byte) (n int, err error) { if part == "" { continue } + + if b.trimNewLineSuffix { + part = strings.TrimSuffix(part, "\n") + } + line := &logstream.Line{ Level: defaultLevel, Message: part, diff --git a/livelog/livelog_test.go b/livelog/livelog_test.go index e46dac9..bc3ca9e 100644 --- a/livelog/livelog_test.go +++ b/livelog/livelog_test.go @@ -16,7 +16,7 @@ import ( func TestLineWriterSingle(t *testing.T) { client := new(mockClient) - w := New(client, "1", "1", nil) + w := New(client, "1", "1", nil, false) w.SetInterval(time.Duration(0)) w.num = 4 w.Write([]byte("foo\nbar\n")) // nolint:errcheck @@ -40,6 +40,32 @@ func TestLineWriterSingle(t *testing.T) { } } +func TestLineWriterSingleWithTrimNewLineSuffixEnabled(t *testing.T) { + client := new(mockClient) + w := New(client, "1", "1", nil, true) + w.SetInterval(time.Duration(0)) + w.num = 4 + w.Write([]byte("foo\nbar\n")) // nolint:errcheck + + a := w.pending + b := []*logstream.Line{ + {Number: 4, Message: "foo"}, + {Number: 5, Message: "bar"}, + } + if err := compare(a, b); err != nil { + t.Fail() + fmt.Print(a) + t.Log(err) + } + + w.Close() + a = client.uploaded + if err := compare(a, b); err != nil { + t.Fail() + t.Log(err) + } +} + func compare(a, b []*logstream.Line) error { if len(a) != len(b) { return fmt.Errorf("expected size: %d, actual: %d", len(a), len(b)) diff --git a/pipeline/runtime/step_executor.go b/pipeline/runtime/step_executor.go index 9815cbf..e4239ae 100644 --- a/pipeline/runtime/step_executor.go +++ b/pipeline/runtime/step_executor.go @@ -59,7 +59,7 @@ func NewStepExecutor(engine *engine.Engine) *StepExecutor { } } -func (e *StepExecutor) StartStep(ctx context.Context, r *api.StartStepRequest, secrets []string, client logstream.Client, tiConfig *tiCfg.Cfg) error { +func (e *StepExecutor) StartStep(ctx context.Context, r *api.StartStepRequest, secrets []string, client logstream.Client, tiConfig *tiCfg.Cfg, logConfig *api.LogConfig) error { if r.ID == "" { return &errors.BadRequestError{Msg: "ID needs to be set"} } @@ -75,7 +75,7 @@ func (e *StepExecutor) StartStep(ctx context.Context, r *api.StartStepRequest, s e.mu.Unlock() go func() { - state, outputs, artifact, outputV2, optimizationState, stepErr := e.executeStep(r, secrets, client, tiConfig) + state, outputs, artifact, outputV2, optimizationState, stepErr := e.executeStep(r, secrets, client, tiConfig, logConfig) status := StepStatus{Status: Complete, State: state, StepErr: stepErr, Outputs: outputs, Artifact: artifact, OutputV2: outputV2, OptimizationState: optimizationState} e.mu.Lock() e.stepStatus[r.ID] = status @@ -231,13 +231,13 @@ func (e *StepExecutor) executeStepDrone(r *api.StartStepRequest, tiConfig *tiCfg return runStep() } -func (e *StepExecutor) executeStep(r *api.StartStepRequest, secrets []string, client logstream.Client, tiConfig *tiCfg.Cfg) (*runtime.State, map[string]string, []byte, []*api.OutputV2, string, error) { +func (e *StepExecutor) executeStep(r *api.StartStepRequest, secrets []string, client logstream.Client, tiConfig *tiCfg.Cfg, logConfig *api.LogConfig) (*runtime.State, map[string]string, []byte, []*api.OutputV2, string, error) { if r.LogDrone { state, err := e.executeStepDrone(r, tiConfig) return state, nil, nil, nil, "", err } - wc := livelog.New(client, r.LogKey, r.Name, getNudges()) + wc := livelog.New(client, r.LogKey, r.Name, getNudges(), logConfig.TrimNewLineSuffix) wr := logstream.NewReplacer(wc, secrets) err := wr.Open() // nolint:errcheck if err != nil { diff --git a/pipeline/state.go b/pipeline/state.go index 3cfc82a..5b00442 100644 --- a/pipeline/state.go +++ b/pipeline/state.go @@ -73,6 +73,10 @@ func (s *State) GetTIConfig() *tiCfg.Cfg { return &s.tiConfig } +func (s *State) GetLogConfig() *api.LogConfig { + return &s.logConfig +} + func (s *State) GetNetwork() string { return s.network }