Skip to content

Commit

Permalink
flush buffered output lines after 100ms
Browse files Browse the repository at this point in the history
  • Loading branch information
dimkr committed Jan 16, 2025
1 parent 78ebafb commit 3ec67f5
Showing 1 changed file with 68 additions and 32 deletions.
100 changes: 68 additions & 32 deletions front/text/line.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2024 Dima Krasner
Copyright 2024, 2025 Dima Krasner
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@ import (
"io"
"net"
"slices"
"time"
)

// LineWriter wraps an [io.Writer] with line-based buffering and writes in a separate routine.
Expand All @@ -33,7 +34,10 @@ type LineWriter struct {
err error
}

const bufferSize = 16
const (
bufferSize = 16
flushInterval = time.Millisecond * 100
)

func LineBuffered(inner io.Writer) *LineWriter {
w := &LineWriter{
Expand All @@ -45,50 +49,71 @@ func LineBuffered(inner io.Writer) *LineWriter {
go func() {
lines := 0

for {
buf, ok := <-w.c
if !ok {
break
}
t := time.NewTicker(flushInterval)
defer t.Stop()

if len(buf) == 0 {
continue
}
loop:
for {
select {
case buf, ok := <-w.c:
if !ok {
break loop
}

for {
if len(buf) == 0 {
break
continue
}

i := bytes.IndexByte(buf, '\n')
if i == -1 {
w.buffer.Write(buf)
break
}
for {
if len(buf) == 0 {
break
}

w.buffer.Write(buf[:i+1])
lines++
i := bytes.IndexByte(buf, '\n')
if i == -1 {
w.buffer.Write(buf)
t.Stop()
break
}

if lines == bufferSize {
_, err := w.inner.Write(w.buffer.Bytes())
if err != nil {
w.done <- err
w.buffer.Write(buf[:i+1])
lines++

// continue reading until closed, to unblock the writing routine
for {
if _, ok := <-w.c; !ok {
break
}
// flush if we have $bufferSize lines in the buffer
if lines == bufferSize {
if _, err := w.inner.Write(w.buffer.Bytes()); err != nil {
w.stop(err)
return
}

return
w.buffer.Reset()
lines = 0

t.Stop()
} else if lines > 0 {
t.Reset(flushInterval)
}

w.buffer.Reset()
lines = 0
buf = buf[i+1:]
}

buf = buf[i+1:]
// flush if we havea lines waiting for >=$flushInterval in the buffer
case <-t.C:
buf := w.buffer.Bytes()

if len(buf) == 0 || buf[len(buf)-1] != '\n' {
continue
}

if _, err := w.inner.Write(buf); err != nil {
w.stop(err)
return
}

w.buffer.Reset()
lines = 0

t.Stop()
}
}

Expand All @@ -105,6 +130,17 @@ func LineBuffered(inner io.Writer) *LineWriter {
return w
}

func (w *LineWriter) stop(err error) {
// continue reading until closed, to unblock the writing routine
for {
if _, ok := <-w.c; !ok {
break
}
}

w.done <- err
}

func (w *LineWriter) Unwrap() io.Writer {
return w.inner
}
Expand Down

0 comments on commit 3ec67f5

Please sign in to comment.