Skip to content

Commit

Permalink
refactor(api/sessionrecording): rename ProtoReader to Reader
Browse files Browse the repository at this point in the history
The name no longer leaks the implementation. Went with Reader over
SessionRecordingReader to prevent a stutter of writing
`sessionrecording.SessionRecordingReader`.
  • Loading branch information
dustinspecker committed Dec 20, 2024
1 parent 205d29f commit 9cf5ab5
Show file tree
Hide file tree
Showing 11 changed files with 30 additions and 30 deletions.
30 changes: 15 additions & 15 deletions api/sessionrecording/session_recording.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ const (
ProtoStreamV1RecordHeaderSize = Int32Size
)

// NewProtoReader returns a new proto reader with slice pool
func NewProtoReader(r io.Reader) *ProtoReader {
return &ProtoReader{
// NewReader returns a new reader with slice pool
func NewReader(r io.Reader) *Reader {
return &Reader{
reader: r,
lastIndex: -1,
}
Expand All @@ -59,8 +59,8 @@ const (
protoReaderStateError = iota
)

// ProtoReader reads protobuf encoding from reader
type ProtoReader struct {
// Reader reads protobuf encoding from reader
type Reader struct {
gzipReader *gzipReader
padding int64
reader io.Reader
Expand All @@ -69,11 +69,11 @@ type ProtoReader struct {
state int
error error
lastIndex int64
stats ProtoReaderStats
stats ReaderStats
}

// ProtoReaderStats contains some reader statistics
type ProtoReaderStats struct {
// ReaderStats contains some reader statistics
type ReaderStats struct {
// SkippedEvents is a counter with encountered
// events recorded several times or events
// that have been out of order as skipped
Expand All @@ -87,7 +87,7 @@ type ProtoReaderStats struct {
}

// ToFields returns a copy of the stats to be used as log fields
func (p ProtoReaderStats) ToFields() map[string]any {
func (p ReaderStats) ToFields() map[string]any {
return map[string]any{
"skipped-events": p.SkippedEvents,
"out-of-order-events": p.OutOfOrderEvents,
Expand All @@ -96,7 +96,7 @@ func (p ProtoReaderStats) ToFields() map[string]any {
}

// Close releases reader resources
func (r *ProtoReader) Close() error {
func (r *Reader) Close() error {
if r.gzipReader != nil {
return r.gzipReader.Close()
}
Expand All @@ -106,7 +106,7 @@ func (r *ProtoReader) Close() error {
// Reset sets reader to read from the new reader
// without resetting the stats, could be used
// to deduplicate the events
func (r *ProtoReader) Reset(reader io.Reader) error {
func (r *Reader) Reset(reader io.Reader) error {
if r.error != nil {
return r.error
}
Expand All @@ -121,19 +121,19 @@ func (r *ProtoReader) Reset(reader io.Reader) error {
return nil
}

func (r *ProtoReader) setError(err error) error {
func (r *Reader) setError(err error) error {
r.state = protoReaderStateError
r.error = err
return err
}

// GetStats returns stats about processed events
func (r *ProtoReader) GetStats() ProtoReaderStats {
func (r *Reader) GetStats() ReaderStats {
return r.stats
}

// Read returns next event or io.EOF in case of the end of the parts
func (r *ProtoReader) Read(ctx context.Context) (apievents.AuditEvent, error) {
func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) {
// periodic checks of context after fixed amount of iterations
// is an extra precaution to avoid
// accidental endless loop due to logic error crashing the system
Expand Down Expand Up @@ -272,7 +272,7 @@ func (r *ProtoReader) Read(ctx context.Context) (apievents.AuditEvent, error) {
}

// ReadAll reads all events until EOF
func (r *ProtoReader) ReadAll(ctx context.Context) ([]apievents.AuditEvent, error) {
func (r *Reader) ReadAll(ctx context.Context) ([]apievents.AuditEvent, error) {
var events []apievents.AuditEvent
for {
event, err := r.Read(ctx)
Expand Down
2 changes: 1 addition & 1 deletion api/sessionrecording/session_recording_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestReadCorruptedRecording(t *testing.T) {
require.NoError(t, err)
defer f.Close()

reader := sessionrecording.NewProtoReader(f)
reader := sessionrecording.NewReader(f)
defer reader.Close()

events, err := reader.ReadAll(ctx)
Expand Down
2 changes: 1 addition & 1 deletion lib/client/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (p *playFromFileStreamer) StreamSessionEvents(
}
defer f.Close()

pr := sessionrecording.NewProtoReader(f)
pr := sessionrecording.NewReader(f)
for i := int64(0); ; i++ {
evt, err := pr.Read(ctx)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion lib/events/auditlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ func (l *AuditLog) StreamSessionEvents(ctx context.Context, sessionID session.ID
return
}

protoReader := sessionrecording.NewProtoReader(rawSession)
protoReader := sessionrecording.NewReader(rawSession)
defer protoReader.Close()

for {
Expand Down
4 changes: 2 additions & 2 deletions lib/events/emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestProtoStreamer(t *testing.T) {
require.NoError(t, err)

for _, part := range parts {
reader := sessionrecording.NewProtoReader(bytes.NewReader(part))
reader := sessionrecording.NewReader(bytes.NewReader(part))
out, err := reader.ReadAll(ctx)
require.NoError(t, err, "part crash %#v", part)
outEvents = append(outEvents, out...)
Expand Down Expand Up @@ -257,7 +257,7 @@ func TestExport(t *testing.T) {
_, err := f.Write(part)
require.NoError(t, err)
}
reader := sessionrecording.NewProtoReader(io.MultiReader(readers...))
reader := sessionrecording.NewReader(io.MultiReader(readers...))
outEvents, err := reader.ReadAll(ctx)
require.NoError(t, err)

Expand Down
4 changes: 2 additions & 2 deletions lib/events/filesessions/fileasync.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (u *Uploader) sessionErrorFilePath(sid session.ID) string {

type upload struct {
sessionID session.ID
reader *sessionrecording.ProtoReader
reader *sessionrecording.Reader
file *os.File
fileUnlockFn func() error
checkpointFile *os.File
Expand Down Expand Up @@ -442,7 +442,7 @@ func (u *Uploader) startUpload(ctx context.Context, fileName string) (err error)

upload := &upload{
sessionID: sessionID,
reader: sessionrecording.NewProtoReader(sessionFile),
reader: sessionrecording.NewReader(sessionFile),
file: sessionFile,
fileUnlockFn: unlock,
}
Expand Down
4 changes: 2 additions & 2 deletions lib/events/filesessions/fileasync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,10 +667,10 @@ func readStream(ctx context.Context, t *testing.T, uploadID string, uploader *ev
require.NoError(t, err)

var outEvents []apievents.AuditEvent
var reader *sessionrecording.ProtoReader
var reader *sessionrecording.Reader
for i, part := range parts {
if i == 0 {
reader = sessionrecording.NewProtoReader(bytes.NewReader(part))
reader = sessionrecording.NewReader(bytes.NewReader(part))
} else {
err := reader.Reset(bytes.NewReader(part))
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion lib/events/playback.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func Export(ctx context.Context, rs io.ReadSeeker, w io.Writer, exportFormat str
}
switch {
case format.Proto:
protoReader := sessionrecording.NewProtoReader(rs)
protoReader := sessionrecording.NewReader(rs)
for {
event, err := protoReader.Read(ctx)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions lib/events/session_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestSessionWriter(t *testing.T) {
require.NoError(t, err)

for _, part := range parts {
reader := sessionrecording.NewProtoReader(bytes.NewReader(part))
reader := sessionrecording.NewReader(bytes.NewReader(part))
out, err := reader.ReadAll(test.ctx)
require.NoError(t, err, "part crash %#v", part)
outEvents = append(outEvents, out...)
Expand Down Expand Up @@ -421,7 +421,7 @@ func (a *sessionWriterTest) collectEvents(t *testing.T) []apievents.AuditEvent {
for _, part := range parts {
readers = append(readers, bytes.NewReader(part))
}
reader := sessionrecording.NewProtoReader(io.MultiReader(readers...))
reader := sessionrecording.NewReader(io.MultiReader(readers...))
outEvents, err := reader.ReadAll(a.ctx)
require.NoError(t, err, "failed to read")
t.Logf("Reader stats :%v", reader.GetStats().ToFields())
Expand Down
2 changes: 1 addition & 1 deletion lib/events/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func TestReadCorruptedRecording(t *testing.T) {
require.NoError(t, err)
defer f.Close()

reader := sessionrecording.NewProtoReader(f)
reader := sessionrecording.NewReader(f)
defer reader.Close()

events, err := reader.ReadAll(ctx)
Expand Down
4 changes: 2 additions & 2 deletions lib/events/test/streamsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func StreamWithParameters(t *testing.T, handler events.MultipartHandler, params
_, err = f.Seek(0, 0)
require.NoError(t, err)

reader := sessionrecording.NewProtoReader(f)
reader := sessionrecording.NewReader(f)
out, err := reader.ReadAll(ctx)
require.NoError(t, err)

Expand Down Expand Up @@ -319,7 +319,7 @@ func StreamResumeWithParameters(t *testing.T, handler events.MultipartHandler, p
_, err = f.Seek(0, 0)
require.NoError(t, err)

reader := sessionrecording.NewProtoReader(f)
reader := sessionrecording.NewReader(f)
out, err := reader.ReadAll(ctx)
require.NoError(t, err)

Expand Down

0 comments on commit 9cf5ab5

Please sign in to comment.