Skip to content

Commit

Permalink
fix(outputs.bigquery): Correct use of auto-detected project ID (influ…
Browse files Browse the repository at this point in the history
  • Loading branch information
Hipska authored Dec 11, 2023
1 parent 9cf8afc commit fe6e5d8
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 4 deletions.
12 changes: 8 additions & 4 deletions plugins/outputs/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
)

Expand Down Expand Up @@ -75,7 +76,7 @@ func (s *BigQuery) Connect() error {
defer cancel()

// Check if the compact table exists
_, err := s.client.DatasetInProject(s.Project, s.Dataset).Table(s.CompactTable).Metadata(ctx)
_, err := s.client.Dataset(s.Dataset).Table(s.CompactTable).Metadata(ctx)
if err != nil {
return fmt.Errorf("compact table: %w", err)
}
Expand All @@ -102,7 +103,10 @@ func (s *BigQuery) setUpDefaultClient() error {
credentialsOption = option.WithCredentials(creds)
}

client, err := bigquery.NewClient(ctx, s.Project, credentialsOption)
client, err := bigquery.NewClient(ctx, s.Project,
credentialsOption,
option.WithUserAgent(internal.ProductToken()),
)
s.client = client
return err
}
Expand Down Expand Up @@ -136,7 +140,7 @@ func (s *BigQuery) writeCompact(metrics []telegraf.Metric) error {
defer cancel()

// Always returns an instance, even if table doesn't exist (anymore).
inserter := s.client.DatasetInProject(s.Project, s.Dataset).Table(s.CompactTable).Inserter()
inserter := s.client.Dataset(s.Dataset).Table(s.CompactTable).Inserter()

var compactValues []*bigquery.ValuesSaver
for _, m := range metrics {
Expand Down Expand Up @@ -269,7 +273,7 @@ func (s *BigQuery) insertToTable(metricName string, metrics []bigquery.ValueSave
defer cancel()

tableName := s.metricToTable(metricName)
table := s.client.DatasetInProject(s.Project, s.Dataset).Table(tableName)
table := s.client.Dataset(s.Dataset).Table(tableName)
inserter := table.Inserter()

if err := inserter.Put(ctx, metrics); err != nil {
Expand Down
31 changes: 31 additions & 0 deletions plugins/outputs/bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,24 @@ func TestWriteCompact(t *testing.T) {
require.NoError(t, b.Close())
}

func TestAutoDetect(t *testing.T) {
srv := localBigQueryServer(t)
defer srv.Close()

b := &BigQuery{
Dataset: "test-dataset",
Timeout: defaultTimeout,
CompactTable: "test-metrics",
}

credentialsJSON := []byte(`{"type": "service_account", "project_id": "test-project"}`)

require.NoError(t, b.Init())
require.NoError(t, b.setUpTestClientWithJSON(srv.URL, credentialsJSON))
require.NoError(t, b.Connect())
require.NoError(t, b.Close())
}

func (b *BigQuery) setUpTestClient(endpointURL string) error {
noAuth := option.WithoutAuthentication()
endpoint := option.WithEndpoint(endpointURL)
Expand All @@ -228,6 +246,19 @@ func (b *BigQuery) setUpTestClient(endpointURL string) error {
return nil
}

func (b *BigQuery) setUpTestClientWithJSON(endpointURL string, credentialsJSON []byte) error {
noAuth := option.WithoutAuthentication()
endpoint := option.WithEndpoint(endpointURL)
credentials := option.WithCredentialsJSON(credentialsJSON)

ctx := context.Background()

c, err := bigquery.NewClient(ctx, b.Project, credentials, noAuth, endpoint)

b.client = c
return err
}

func localBigQueryServer(t *testing.T) *httptest.Server {
srv := httptest.NewServer(http.NotFoundHandler())

Expand Down

0 comments on commit fe6e5d8

Please sign in to comment.