Skip to content

Commit

Permalink
chore: Remove ifc from interface names (#15072)
Browse files Browse the repository at this point in the history
  • Loading branch information
grobinson-grafana authored Nov 22, 2024
1 parent b406015 commit 3cc3db9
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 28 deletions.
4 changes: 2 additions & 2 deletions pkg/blockbuilder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ type PartitionController interface {
// containing log data and "committed" is the consumer group
type PartitionJobController struct {
stepLen int64
part partition.ReaderIfc
part partition.Reader
backoff backoff.Config
decoder *kafka.Decoder
}

func NewPartitionJobController(
controller partition.ReaderIfc,
controller partition.Reader,
backoff backoff.Config,
) (*PartitionJobController, error) {
decoder, err := kafka.NewDecoder()
Expand Down
4 changes: 2 additions & 2 deletions pkg/kafka/partition/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ type partitionCommitter struct {
lastCommittedOffset prometheus.Gauge

logger log.Logger
reader ReaderIfc
reader Reader
commitFreq time.Duration

toCommit *atomic.Int64
wg sync.WaitGroup
cancel context.CancelFunc
}

func newCommitter(reader ReaderIfc, commitFreq time.Duration, logger log.Logger, reg prometheus.Registerer) *partitionCommitter {
func newCommitter(reader Reader, commitFreq time.Duration, logger log.Logger, reg prometheus.Registerer) *partitionCommitter {
c := &partitionCommitter{
logger: logger,
reader: reader,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kafka/partition/committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestPartitionCommitter(t *testing.T) {
reg := prometheus.NewRegistry()
partitionID := int32(1)
consumerGroup := "test-consumer-group"
reader := newReader(
reader := newStdReader(
client,
kafkaCfg.Topic,
partitionID,
Expand Down
36 changes: 18 additions & 18 deletions pkg/kafka/partition/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Record struct {
Offset int64
}

type ReaderIfc interface {
type Reader interface {
Topic() string
Partition() int32
ConsumerGroup() string
Expand Down Expand Up @@ -91,8 +91,8 @@ func newReaderMetrics(r prometheus.Registerer) *readerMetrics {
}
}

// Reader provides low-level access to Kafka partition reading operations
type Reader struct {
// StdReader provides low-level access to Kafka partition reading operations
type StdReader struct {
client *kgo.Client
topic string
partitionID int32
Expand All @@ -101,13 +101,13 @@ type Reader struct {
logger log.Logger
}

func NewReader(
func NewStdReader(
cfg kafka.Config,
partitionID int32,
instanceID string,
logger log.Logger,
reg prometheus.Registerer,
) (*Reader, error) {
) (*StdReader, error) {
// Create a new Kafka client for this reader
clientMetrics := client.NewReaderClientMetrics("partition-reader", reg)
c, err := client.NewReaderClient(
Expand All @@ -120,7 +120,7 @@ func NewReader(
}

// Create the reader
return newReader(
return newStdReader(
c,
cfg.Topic,
partitionID,
Expand All @@ -130,16 +130,16 @@ func NewReader(
), nil
}

// NewReader creates a new Reader instance
func newReader(
// newStdReader creates a new StdReader instance
func newStdReader(
client *kgo.Client,
topic string,
partitionID int32,
consumerGroup string,
logger log.Logger,
reg prometheus.Registerer,
) *Reader {
return &Reader{
) *StdReader {
return &StdReader{
client: client,
topic: topic,
partitionID: partitionID,
Expand All @@ -150,22 +150,22 @@ func newReader(
}

// Topic returns the topic being read
func (r *Reader) Topic() string {
func (r *StdReader) Topic() string {
return r.topic
}

// Partition returns the partition being read
func (r *Reader) Partition() int32 {
func (r *StdReader) Partition() int32 {
return r.partitionID
}

// ConsumerGroup returns the consumer group
func (r *Reader) ConsumerGroup() string {
func (r *StdReader) ConsumerGroup() string {
return r.consumerGroup
}

// FetchLastCommittedOffset retrieves the last committed offset for this partition
func (r *Reader) FetchLastCommittedOffset(ctx context.Context) (int64, error) {
func (r *StdReader) FetchLastCommittedOffset(ctx context.Context) (int64, error) {
req := kmsg.NewPtrOffsetFetchRequest()
req.Topics = []kmsg.OffsetFetchRequestTopic{{
Topic: r.topic,
Expand Down Expand Up @@ -210,7 +210,7 @@ func (r *Reader) FetchLastCommittedOffset(ctx context.Context) (int64, error) {
}

// FetchPartitionOffset retrieves the offset for a specific position
func (r *Reader) FetchPartitionOffset(ctx context.Context, position SpecialOffset) (int64, error) {
func (r *StdReader) FetchPartitionOffset(ctx context.Context, position SpecialOffset) (int64, error) {
partitionReq := kmsg.NewListOffsetsRequestTopicPartition()
partitionReq.Partition = r.partitionID
partitionReq.Timestamp = int64(position)
Expand Down Expand Up @@ -257,7 +257,7 @@ func (r *Reader) FetchPartitionOffset(ctx context.Context, position SpecialOffse
}

// Poll retrieves the next batch of records from Kafka
func (r *Reader) Poll(ctx context.Context) ([]Record, error) {
func (r *StdReader) Poll(ctx context.Context) ([]Record, error) {
start := time.Now()
fetches := r.client.PollFetches(ctx)
r.metrics.fetchWaitDuration.Observe(time.Since(start).Seconds())
Expand Down Expand Up @@ -303,14 +303,14 @@ func (r *Reader) Poll(ctx context.Context) ([]Record, error) {
return records, nil
}

func (r *Reader) SetOffsetForConsumption(offset int64) {
func (r *StdReader) SetOffsetForConsumption(offset int64) {
r.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
r.topic: {r.partitionID: kgo.NewOffset().At(offset)},
})
}

// Commit commits an offset to the consumer group
func (r *Reader) Commit(ctx context.Context, offset int64) error {
func (r *StdReader) Commit(ctx context.Context, offset int64) error {
admin := kadm.NewClient(r.client)

// Commit the last consumed offset.
Expand Down
6 changes: 3 additions & 3 deletions pkg/kafka/partition/reader_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type ReaderService struct {
services.Service

cfg ReaderConfig
reader ReaderIfc
reader Reader
consumerFactory ConsumerFactory
logger log.Logger
metrics *serviceMetrics
Expand All @@ -82,7 +82,7 @@ func NewReaderService(
) (*ReaderService, error) {

// Create the reader
reader, err := NewReader(
reader, err := NewStdReader(
kafkaCfg,
partitionID,
instanceID,
Expand All @@ -109,7 +109,7 @@ func NewReaderService(

func newReaderServiceFromIfc(
cfg ReaderConfig,
reader ReaderIfc,
reader Reader,
consumerFactory ConsumerFactory,
logger log.Logger,
reg prometheus.Registerer,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kafka/partition/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func readersFromKafkaCfg(
kafkaCfg kafka.Config,
consumerFactory ConsumerFactory,
partition int32,
) (ReaderIfc, *ReaderService) {
) (Reader, *ReaderService) {
partitionReader, err := NewReaderService(
kafkaCfg,
partition,
Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1808,7 +1808,7 @@ func (t *Loki) initBlockBuilder() (services.Service, error) {
return nil, fmt.Errorf("calculating block builder partition ID: %w", err)
}

reader, err := partition.NewReader(
reader, err := partition.NewStdReader(
t.Cfg.KafkaConfig,
ingestPartitionID,
id,
Expand Down

0 comments on commit 3cc3db9

Please sign in to comment.