diff --git a/front/text/line.go b/front/text/line.go index 7fd2a5f3..b838a300 100644 --- a/front/text/line.go +++ b/front/text/line.go @@ -1,5 +1,5 @@ /* -Copyright 2024 Dima Krasner +Copyright 2024, 2025 Dima Krasner Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import ( "io" "net" "slices" + "time" ) // LineWriter wraps an [io.Writer] with line-based buffering and writes in a separate routine. @@ -33,7 +34,10 @@ type LineWriter struct { err error } -const bufferSize = 16 +const ( + bufferSize = 16 + flushInterval = time.Millisecond * 100 +) func LineBuffered(inner io.Writer) *LineWriter { w := &LineWriter{ @@ -45,50 +49,71 @@ func LineBuffered(inner io.Writer) *LineWriter { go func() { lines := 0 - for { - buf, ok := <-w.c - if !ok { - break - } + t := time.NewTicker(flushInterval) + defer t.Stop() - if len(buf) == 0 { - continue - } + loop: + for { + select { + case buf, ok := <-w.c: + if !ok { + break loop + } - for { if len(buf) == 0 { - break + continue } - i := bytes.IndexByte(buf, '\n') - if i == -1 { - w.buffer.Write(buf) - break - } + for { + if len(buf) == 0 { + break + } - w.buffer.Write(buf[:i+1]) - lines++ + i := bytes.IndexByte(buf, '\n') + if i == -1 { + w.buffer.Write(buf) + t.Stop() + break + } - if lines == bufferSize { - _, err := w.inner.Write(w.buffer.Bytes()) - if err != nil { - w.done <- err + w.buffer.Write(buf[:i+1]) + lines++ - // continue reading until closed, to unblock the writing routine - for { - if _, ok := <-w.c; !ok { - break - } + // flush if we have $bufferSize lines in the buffer + if lines == bufferSize { + if _, err := w.inner.Write(w.buffer.Bytes()); err != nil { + w.stop(err) + return } - return + w.buffer.Reset() + lines = 0 + + t.Stop() + } else if lines > 0 { + t.Reset(flushInterval) } - w.buffer.Reset() - lines = 0 + buf = buf[i+1:] } - buf = buf[i+1:] + // flush if we havea lines waiting for >=$flushInterval in the buffer + case <-t.C: + buf := w.buffer.Bytes() + + if len(buf) == 0 || buf[len(buf)-1] != '\n' { + continue + } + + if _, err := w.inner.Write(buf); err != nil { + w.stop(err) + return + } + + w.buffer.Reset() + lines = 0 + + t.Stop() } } @@ -105,6 +130,17 @@ func LineBuffered(inner io.Writer) *LineWriter { return w } +func (w *LineWriter) stop(err error) { + // continue reading until closed, to unblock the writing routine + for { + if _, ok := <-w.c; !ok { + break + } + } + + w.done <- err +} + func (w *LineWriter) Unwrap() io.Writer { return w.inner }