From 4bbe5c2938234520b3c1b43bdba7713b62e31018 Mon Sep 17 00:00:00 2001 From: Aaron Kahn Date: Thu, 8 Aug 2024 12:10:42 -0700 Subject: [PATCH 1/6] Update documentation links --- README.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 6f0ea2c..c838889 100644 --- a/README.md +++ b/README.md @@ -24,8 +24,8 @@ The service handles device connectivity as well as receiving and storing transmi - `openssl req -out your-domain.com.csr -key private-key.pem -subj /CN=your-domain.com/ -new` 6. Ensure the generated CSR passes [check_csr.sh](https://github.com/teslamotors/fleet-telemetry/blob/main/tools/check_csr.sh). - `./check_csr.sh your-domain.com.csr` -7. Generate a Partner Authentication Token. ([docs](https://developer.tesla.com/docs/fleet-api#partner-authentication-token)) -8. Register your application with Fleet API by sending `domain` and `csr` to the [register](https://developer.tesla.com/docs/fleet-api#register) endpoint. Use the partner authentication token generated in step 7 as a Bearer token. +7. Generate a Partner Authentication Token. ([docs](https://developer.tesla.com/docs/fleet-api/authentication/partner-tokens)) +8. Register your application with Fleet API by sending `domain` and `csr` to the [register](https://developer.tesla.com/docs/fleet-api/endpoints/partner-endpoints#register) endpoint. Use the partner authentication token generated in step 7 as a Bearer token. 9. Wait for Tesla to process your CSR. This may take up to two weeks. Once complete, you will receive an email from Tesla. The generated certificate will not be sent back to you; it is attached to your account on the backend and is used internally when configuring a vehicle to stream data. 10. Configure your fleet-telemetry server. Full details are described in [install steps](#install-steps). 11. Validate server configuration using [check_server_cert.sh](https://github.com/teslamotors/fleet-telemetry/blob/main/tools/check_server_cert.sh). @@ -34,10 +34,10 @@ The service handles device connectivity as well as receiving and storing transmi - `port`: the port your fleet-telemetry server is available on. Defaults to 443. - `ca`: the full certificate chain used to generate the server's TLS cert/key. - `./check_server_cert.sh validate_server.json` -12. Ensure your virtual key has been added to the vehicle you intend to configure. To add your virtual key to the vehicle, redirect the owner to https://tesla.com/_ak/your-domain.com. If using authorization code flow, the owner of the vehicle must have [authorized your application](https://developer.tesla.com/docs/fleet-api#third-party-token) with `vehicle_device_data` scope before they are able to add your key. -13. Send your configuration to a vehicle. Using a third-party token, send a [fleet_telemetry_config](https://developer.tesla.com/docs/fleet-api#fleet_telemetry_config-create) request. -14. Wait for `synced` to be true when getting [fleet_telemetry_config](https://developer.tesla.com/docs/fleet-api#fleet_telemetry_config-get). -15. At this point, the vehicle should be streaming data to your fleet-telemetry server. If you are not seeing messages come through, call [fleet_telemetry_errors](https://developer.tesla.com/docs/fleet-api#fleet_telemetry_errors). +12. Ensure your virtual key has been added to the vehicle you intend to configure. To add your virtual key to the vehicle, redirect the owner to https://tesla.com/_ak/your-domain.com. If using authorization code flow, the owner of the vehicle must have [authorized your application](https://developer.tesla.com/docs/fleet-api/authentication/third-party-tokens) with `vehicle_device_data` scope before they are able to add your key. +13. Send your configuration to a vehicle. Using a third-party token, send a [fleet_telemetry_config](https://developer.tesla.com/docs/fleet-api/endpoints/vehicle-endpoints#fleet-telemetry-config-create) request. +14. Wait for `synced` to be true when getting [fleet_telemetry_config](https://developer.tesla.com/docs/fleet-api/endpoints/vehicle-endpoints#fleet-telemetry-config-get). +15. At this point, the vehicle should be streaming data to your fleet-telemetry server. If you are not seeing messages come through, call [fleet_telemetry_errors](https://developer.tesla.com/docs/fleet-api/endpoints/partner-endpoints#fleet-telemetry-errors). - If fleet_telemetry_errors is not yielding any results, please reach out to [fleetapisupport@tesla.com](mailto:fleetapisupport@tesla.com). Include your client ID and the VIN you are trying to setup. ### Install on Kubernetes with Helm Chart (recommended) From c6a81c626cb95880696407d51eba5fb79a05c8db Mon Sep 17 00:00:00 2001 From: Aaron Kahn Date: Thu, 15 Aug 2024 12:21:44 -0700 Subject: [PATCH 2/6] Add topics to kafka backends description --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index c838889..f1023de 100644 --- a/README.md +++ b/README.md @@ -150,6 +150,7 @@ Vehicles must be running firmware version 2023.20.6 or later. Some older model ## Backends/dispatchers The following [dispatchers](./telemetry/producer.go#L10-L19) are supported * Kafka (preferred): Configure with the config.json file. See implementation here: [config/config.go](./config/config.go) + * Topics will need to be created for \*prefix\*`_V`, \*prefix\*`_alerts`, and \*prefix\*`_errors`. The default prefix is `tesla` * Kinesis: Configure with standard [AWS env variables and config files](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html). The default AWS credentials and config files are: `~/.aws/credentials` and `~/.aws/config`. * By default, stream names will be \*configured namespace\*_\*topic_name\* ex.: `tesla_V`, `tesla_errors`, `tesla_alerts`, etc * Configure stream names directly by setting the streams config `"kinesis": { "streams": { *topic_name*: stream_name } }` From 58eada85471887c40a410d35ffb6e42ecfe64a35 Mon Sep 17 00:00:00 2001 From: agbpatro Date: Mon, 26 Aug 2024 14:53:01 -0700 Subject: [PATCH 3/6] Support docker publish with latest tag (#202) --- .github/workflows/publish.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 35a52a7..cd79bc7 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -50,4 +50,4 @@ jobs: with: platforms: linux/amd64,linux/arm64 push: true - tags: tesla/fleet-telemetry:${{ steps.release.outputs.version }} + tags: tesla/fleet-telemetry:latest,tesla/fleet-telemetry:${{ steps.release.outputs.version }} From 696f2ffaa4dff8a00d003d4be48ee169d0aeb643 Mon Sep 17 00:00:00 2001 From: Patrick Demers Date: Tue, 27 Aug 2024 11:25:23 -0700 Subject: [PATCH 4/6] improve record log formatting (#200) --- README.md | 3 + config/config.go | 5 +- config/config_initializer.go | 5 +- config/config_initializer_test.go | 3 + datastore/simple/logger.go | 45 +++- datastore/simple/logger_test.go | 137 ++++++++++++ datastore/simple/simple_suite_test.go | 13 ++ datastore/simple/transformers/payload.go | 83 ++++++++ datastore/simple/transformers/payload_test.go | 196 ++++++++++++++++++ .../transformers/transformers_suite_test.go | 13 ++ .../simple/transformers/utilities_test.go | 8 + .../simple/transformers/vehicle_alert.go | 33 +++ .../simple/transformers/vehicle_alert_test.go | 52 +++++ .../simple/transformers/vehicle_error.go | 20 ++ .../simple/transformers/vehicle_error_test.go | 54 +++++ telemetry/record.go | 1 + telemetry/record_test.go | 77 +++++++ 17 files changed, 742 insertions(+), 6 deletions(-) create mode 100644 datastore/simple/logger_test.go create mode 100644 datastore/simple/simple_suite_test.go create mode 100644 datastore/simple/transformers/payload.go create mode 100644 datastore/simple/transformers/payload_test.go create mode 100644 datastore/simple/transformers/transformers_suite_test.go create mode 100644 datastore/simple/transformers/utilities_test.go create mode 100644 datastore/simple/transformers/vehicle_alert.go create mode 100644 datastore/simple/transformers/vehicle_alert_test.go create mode 100644 datastore/simple/transformers/vehicle_error.go create mode 100644 datastore/simple/transformers/vehicle_error_test.go diff --git a/README.md b/README.md index f1023de..f739982 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,9 @@ For ease of installation and operation, run Fleet Telemetry on Kubernetes or a s "flush_period": int - ms flush period } }, + "logger": { + "verbose": bool - include data types in the logs. Only applicable for records of type 'V' + }, "kafka": { // librdkafka kafka config, seen here: https://raw.githubusercontent.com/confluentinc/librdkafka/master/CONFIGURATION.md "bootstrap.servers": "kafka:9092", "queue.buffering.max.messages": 1000000 diff --git a/config/config.go b/config/config.go index 473c5b5..ae6384c 100644 --- a/config/config.go +++ b/config/config.go @@ -76,6 +76,9 @@ type Config struct { // Monitoring defines information for metrics Monitoring *metrics.MonitoringConfig `json:"monitoring,omitempty"` + // LoggerConfig configures the simple logger + LoggerConfig *simple.Config `json:"logger,omitempty"` + // LogLevel set the log-level LogLevel string `json:"log_level,omitempty"` @@ -250,7 +253,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l } producers := make(map[telemetry.Dispatcher]telemetry.Producer) - producers[telemetry.Logger] = simple.NewProtoLogger(logger) + producers[telemetry.Logger] = simple.NewProtoLogger(c.LoggerConfig, logger) requiredDispatchers := make(map[telemetry.Dispatcher][]string) for recordName, dispatchRules := range c.Records { diff --git a/config/config_initializer.go b/config/config_initializer.go index deeef47..ea65e23 100644 --- a/config/config_initializer.go +++ b/config/config_initializer.go @@ -8,6 +8,7 @@ import ( "github.com/sirupsen/logrus/hooks/test" + "github.com/teslamotors/fleet-telemetry/datastore/simple" logrus "github.com/teslamotors/fleet-telemetry/logger" "github.com/teslamotors/fleet-telemetry/metrics" "github.com/teslamotors/fleet-telemetry/telemetry" @@ -41,7 +42,9 @@ func loadApplicationConfig(configFilePath string) (*Config, error) { return nil, err } - config := &Config{} + config := &Config{ + LoggerConfig: &simple.Config{}, + } err = json.NewDecoder(configFile).Decode(&config) if err != nil { return nil, err diff --git a/config/config_initializer_test.go b/config/config_initializer_test.go index 1a2c062..701a31a 100644 --- a/config/config_initializer_test.go +++ b/config/config_initializer_test.go @@ -42,6 +42,7 @@ var _ = Describe("Test application config initialization", func() { Expect(err).NotTo(HaveOccurred()) expectedConfig.MetricCollector = loadedConfig.MetricCollector + expectedConfig.LoggerConfig = loadedConfig.LoggerConfig expectedConfig.AckChan = loadedConfig.AckChan Expect(loadedConfig).To(Equal(expectedConfig)) }) @@ -67,6 +68,8 @@ var _ = Describe("Test application config initialization", func() { loadedConfig, err := loadTestApplicationConfig(TestSmallConfig) Expect(err).NotTo(HaveOccurred()) + Expect(loadedConfig.LoggerConfig).ToNot(BeNil()) + expectedConfig.LoggerConfig = loadedConfig.LoggerConfig expectedConfig.MetricCollector = loadedConfig.MetricCollector expectedConfig.AckChan = loadedConfig.AckChan Expect(loadedConfig).To(Equal(expectedConfig)) diff --git a/datastore/simple/logger.go b/datastore/simple/logger.go index e624bd1..0408dd5 100644 --- a/datastore/simple/logger.go +++ b/datastore/simple/logger.go @@ -1,18 +1,28 @@ package simple import ( + "fmt" + + "github.com/teslamotors/fleet-telemetry/datastore/simple/transformers" logrus "github.com/teslamotors/fleet-telemetry/logger" + "github.com/teslamotors/fleet-telemetry/protos" "github.com/teslamotors/fleet-telemetry/telemetry" ) +type Config struct { + // Verbose controls whether types are explicitly shown in the logs. Only applicable for record type 'V'. + Verbose bool `json:"verbose"` +} + // ProtoLogger is a simple protobuf logger type ProtoLogger struct { + Config *Config logger *logrus.Logger } // NewProtoLogger initializes the parameters for protobuf payload logging -func NewProtoLogger(logger *logrus.Logger) telemetry.Producer { - return &ProtoLogger{logger: logger} +func NewProtoLogger(config *Config, logger *logrus.Logger) telemetry.Producer { + return &ProtoLogger{Config: config, logger: logger} } // SetReliableAckTxType no-op for logger datastore @@ -21,15 +31,42 @@ func (p *ProtoLogger) ProcessReliableAck(entry *telemetry.Record) { // Produce sends the data to the logger func (p *ProtoLogger) Produce(entry *telemetry.Record) { - data, err := entry.GetJSONPayload() + data, err := p.recordToLogMap(entry) if err != nil { p.logger.ErrorLog("json_unmarshal_error", err, logrus.LogInfo{"vin": entry.Vin, "metadata": entry.Metadata()}) return } - p.logger.ActivityLog("logger_json_unmarshal", logrus.LogInfo{"vin": entry.Vin, "metadata": entry.Metadata(), "data": string(data)}) + p.logger.ActivityLog("logger_json_unmarshal", logrus.LogInfo{"vin": entry.Vin, "metadata": entry.Metadata(), "data": data}) } // ReportError noop method func (p *ProtoLogger) ReportError(message string, err error, logInfo logrus.LogInfo) { return } + +// recordToLogMap converts the data of a record to a map or slice of maps +func (p *ProtoLogger) recordToLogMap(record *telemetry.Record) (interface{}, error) { + payload, err := record.GetProtoMessage() + if err != nil { + return nil, err + } + + switch payload := payload.(type) { + case *protos.Payload: + return transformers.PayloadToMap(payload, p.Config.Verbose, p.logger), nil + case *protos.VehicleAlerts: + alertMaps := make([]map[string]interface{}, len(payload.Alerts)) + for i, alert := range payload.Alerts { + alertMaps[i] = transformers.VehicleAlertToMap(alert) + } + return alertMaps, nil + case *protos.VehicleErrors: + errorMaps := make([]map[string]interface{}, len(payload.Errors)) + for i, vehicleError := range payload.Errors { + errorMaps[i] = transformers.VehicleErrorToMap(vehicleError) + } + return errorMaps, nil + default: + return nil, fmt.Errorf("unknown txType: %s", record.TxType) + } +} diff --git a/datastore/simple/logger_test.go b/datastore/simple/logger_test.go new file mode 100644 index 0000000..23fc5a6 --- /dev/null +++ b/datastore/simple/logger_test.go @@ -0,0 +1,137 @@ +package simple_test + +import ( + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/teslamotors/fleet-telemetry/datastore/simple" + logrus "github.com/teslamotors/fleet-telemetry/logger" + "github.com/teslamotors/fleet-telemetry/protos" + "github.com/teslamotors/fleet-telemetry/telemetry" + + "github.com/sirupsen/logrus/hooks/test" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" +) + +var _ = Describe("ProtoLogger", func() { + var ( + protoLogger *simple.ProtoLogger + testLogger *logrus.Logger + hook *test.Hook + config *simple.Config + ) + + BeforeEach(func() { + testLogger, hook = logrus.NoOpLogger() + config = &simple.Config{Verbose: false} + protoLogger = simple.NewProtoLogger(config, testLogger).(*simple.ProtoLogger) + }) + + Describe("NewProtoLogger", func() { + It("creates a new ProtoLogger", func() { + Expect(protoLogger).NotTo(BeNil()) + Expect(protoLogger.Config).To(Equal(config)) + }) + }) + + Describe("ProcessReliableAck", func() { + It("does not panic", func() { + entry := &telemetry.Record{} + Expect(func() { protoLogger.ProcessReliableAck(entry) }).NotTo(Panic()) + }) + }) + + Describe("Produce", func() { + var ( + record *telemetry.Record + ) + + BeforeEach(func() { + payload := &protos.Payload{ + Vin: "TEST123", + CreatedAt: timestamppb.New(time.Unix(0, 0)), + Data: []*protos.Datum{ + { + Key: protos.Field_VehicleName, + Value: &protos.Value{ + Value: &protos.Value_StringValue{StringValue: "TestVehicle"}, + }, + }, + { + Key: protos.Field_Gear, + Value: &protos.Value{ + Value: &protos.Value_ShiftStateValue{ShiftStateValue: protos.ShiftState_ShiftStateD}, + }, + }, + }, + } + payloadBytes, err := proto.Marshal(payload) + Expect(err).NotTo(HaveOccurred()) + + record = &telemetry.Record{ + Vin: "TEST123", + PayloadBytes: payloadBytes, + TxType: "V", + } + }) + + It("logs data", func() { + protoLogger.Produce(record) + + lastLog := hook.LastEntry() + Expect(lastLog.Message).To(Equal("logger_json_unmarshal")) + Expect(lastLog.Data).To(HaveKeyWithValue("vin", "TEST123")) + Expect(lastLog.Data).To(HaveKey("data")) + + data, ok := lastLog.Data["data"].(map[string]interface{}) + Expect(ok).To(BeTrue()) + Expect(data).To(Equal(map[string]interface{}{ + "VehicleName": "TestVehicle", + "Gear": "ShiftStateD", + "Vin": "TEST123", + "CreatedAt": "1970-01-01T00:00:00Z", + })) + }) + + It("logs an error when unmarshaling fails", func() { + record.PayloadBytes = []byte("invalid payload") + protoLogger.Produce(record) + + lastLog := hook.LastEntry() + Expect(lastLog.Message).To(Equal("json_unmarshal_error")) + Expect(lastLog.Data).To(HaveKeyWithValue("vin", "TEST123")) + Expect(lastLog.Data).To(HaveKey("metadata")) + }) + + Context("when verbose set to true", func() { + BeforeEach(func() { + config.Verbose = true + protoLogger = simple.NewProtoLogger(config, testLogger).(*simple.ProtoLogger) + }) + + It("does not include types in the data", func() { + protoLogger.Produce(record) + + data, ok := hook.LastEntry().Data["data"].(map[string]interface{}) + Expect(ok).To(BeTrue()) + Expect(data).To(Equal(map[string]interface{}{ + "VehicleName": map[string]interface{}{"stringValue": "TestVehicle"}, + "Gear": map[string]interface{}{"shiftStateValue": "ShiftStateD"}, + "Vin": "TEST123", + "CreatedAt": "1970-01-01T00:00:00Z", + })) + }) + }) + }) + + Describe("ReportError", func() { + It("succeeds", func() { + Expect(func() { + protoLogger.ReportError("test error", nil, logrus.LogInfo{}) + }).NotTo(Panic()) + }) + }) +}) diff --git a/datastore/simple/simple_suite_test.go b/datastore/simple/simple_suite_test.go new file mode 100644 index 0000000..8b5a634 --- /dev/null +++ b/datastore/simple/simple_suite_test.go @@ -0,0 +1,13 @@ +package simple_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestSimple(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Simple Suite Tests") +} diff --git a/datastore/simple/transformers/payload.go b/datastore/simple/transformers/payload.go new file mode 100644 index 0000000..090053b --- /dev/null +++ b/datastore/simple/transformers/payload.go @@ -0,0 +1,83 @@ +package transformers + +import ( + "time" + + logrus "github.com/teslamotors/fleet-telemetry/logger" + "github.com/teslamotors/fleet-telemetry/protos" +) + +func PayloadToMap(payload *protos.Payload, includeTypes bool, logger *logrus.Logger) map[string]interface{} { + convertedPayload := make(map[string]interface{}, len(payload.Data)+2) + convertedPayload["Vin"] = payload.Vin + convertedPayload["CreatedAt"] = payload.CreatedAt.AsTime().Format(time.RFC3339) + + for _, datum := range payload.Data { + if datum == nil || datum.Value == nil { + logger.ActivityLog("unknown_payload_data_type", logrus.LogInfo{"vin": payload.Vin}) + continue + } + name := protos.Field_name[int32(datum.Key.Number())] + value, ok := transformValue(datum.Value.Value, includeTypes) + if !ok { + logger.ActivityLog("unknown_payload_value_data_type", logrus.LogInfo{"name": name, "vin": payload.Vin}) + continue + } + convertedPayload[name] = value + } + + return convertedPayload +} + +func transformValue(value interface{}, includeTypes bool) (interface{}, bool) { + var outputValue interface{} + var outputType string + + // ordered by expected frequency + switch v := value.(type) { + case *protos.Value_StringValue: + outputType = "stringValue" + outputValue = v.StringValue + case *protos.Value_LocationValue: + outputType = "locationValue" + outputValue = map[string]float64{ + "latitude": v.LocationValue.Latitude, + "longitude": v.LocationValue.Longitude, + } + case *protos.Value_FloatValue: + outputType = "floatValue" + outputValue = v.FloatValue + case *protos.Value_IntValue: + outputType = "intValue" + outputValue = v.IntValue + case *protos.Value_DoubleValue: + outputType = "doubleValue" + outputValue = v.DoubleValue + case *protos.Value_LongValue: + outputType = "longValue" + outputValue = v.LongValue + case *protos.Value_BooleanValue: + outputType = "booleanValue" + outputValue = v.BooleanValue + case *protos.Value_Invalid: + outputType = "invalid" + outputValue = "" + if includeTypes { + outputValue = true + } + case *protos.Value_ShiftStateValue: + outputType = "shiftStateValue" + outputValue = v.ShiftStateValue.String() + case *protos.Value_ChargingValue: + outputType = "chargingValue" + outputValue = v.ChargingValue.String() + default: + return nil, false + } + + if includeTypes { + return map[string]interface{}{outputType: outputValue}, true + } + + return outputValue, true +} diff --git a/datastore/simple/transformers/payload_test.go b/datastore/simple/transformers/payload_test.go new file mode 100644 index 0000000..1e1695c --- /dev/null +++ b/datastore/simple/transformers/payload_test.go @@ -0,0 +1,196 @@ +package transformers_test + +import ( + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/teslamotors/fleet-telemetry/datastore/simple/transformers" + logrus "github.com/teslamotors/fleet-telemetry/logger" + "github.com/teslamotors/fleet-telemetry/protos" + + "google.golang.org/protobuf/types/known/timestamppb" +) + +var _ = Describe("Payload", func() { + logger, _ := logrus.NoOpLogger() + Describe("PayloadToMap", func() { + It("includes the vin and createdAt fields", func() { + now := timestamppb.Now() + payload := &protos.Payload{ + Data: []*protos.Datum{}, + Vin: "TEST123", + CreatedAt: now, + } + result := transformers.PayloadToMap(payload, false, logger) + Expect(result["Vin"]).To(Equal("TEST123")) + Expect(result["CreatedAt"]).To(Equal(now.AsTime().Format(time.RFC3339))) + }) + + It("handles nil data", func() { + now := timestamppb.Now() + payload := &protos.Payload{ + Data: []*protos.Datum{ + nil, + &protos.Datum{ + Value: nil, + }, + &protos.Datum{ + Key: protos.Field_BatteryHeaterOn, + Value: &protos.Value{ + Value: &protos.Value_BooleanValue{BooleanValue: true}, + }, + }, + }, + Vin: "TEST123", + CreatedAt: now, + } + result := transformers.PayloadToMap(payload, false, logger) + Expect(result["Vin"]).To(Equal("TEST123")) + Expect(result["CreatedAt"]).To(Equal(now.AsTime().Format(time.RFC3339))) + Expect(result["BatteryHeaterOn"]).To(Equal(true)) + }) + + DescribeTable("converting datum to key-value pairs", + func(datum *protos.Datum, includeTypes bool, expectedKey string, expectedValue interface{}) { + payload := &protos.Payload{ + Data: []*protos.Datum{datum}, + Vin: "TEST123", + CreatedAt: timestamppb.Now(), + } + result := transformers.PayloadToMap(payload, includeTypes, logger) + Expect(result[expectedKey]).To(Equal(expectedValue)) + }, + Entry("String value with types excluded", + &protos.Datum{ + Key: protos.Field_VehicleName, + Value: &protos.Value{Value: &protos.Value_StringValue{StringValue: "CyberBeast"}}, + }, + excludeTypes, + "VehicleName", + "CyberBeast", + ), + Entry("String value with types included", + &protos.Datum{ + Key: protos.Field_VehicleName, + Value: &protos.Value{Value: &protos.Value_StringValue{StringValue: "CyberBeast"}}, + }, + includeTypes, + "VehicleName", + map[string]interface{}{ + "stringValue": "CyberBeast", + }, + ), + Entry("Integer value with types excluded", + &protos.Datum{ + Key: protos.Field_Odometer, + Value: &protos.Value{Value: &protos.Value_IntValue{IntValue: 50000}}, + }, + excludeTypes, + "Odometer", + int32(50000), + ), + Entry("Integer value with types included", + &protos.Datum{ + Key: protos.Field_Odometer, + Value: &protos.Value{Value: &protos.Value_IntValue{IntValue: 50000}}, + }, + includeTypes, + "Odometer", + map[string]interface{}{ + "intValue": int32(50000), + }, + ), + Entry("Float value with types excluded", + &protos.Datum{ + Key: protos.Field_BatteryLevel, + Value: &protos.Value{Value: &protos.Value_FloatValue{FloatValue: 75.5}}, + }, + excludeTypes, + "BatteryLevel", + float32(75.5), + ), + Entry("Float value with types included", + &protos.Datum{ + Key: protos.Field_BatteryLevel, + Value: &protos.Value{Value: &protos.Value_FloatValue{FloatValue: 75.5}}, + }, + includeTypes, + "BatteryLevel", + map[string]interface{}{ + "floatValue": float32(75.5), + }, + ), + Entry("Boolean value with types excluded", + &protos.Datum{ + Key: protos.Field_SentryMode, + Value: &protos.Value{Value: &protos.Value_BooleanValue{BooleanValue: true}}, + }, + excludeTypes, + "SentryMode", + true, + ), + Entry("Boolean value with types included", + &protos.Datum{ + Key: protos.Field_SentryMode, + Value: &protos.Value{Value: &protos.Value_BooleanValue{BooleanValue: true}}, + }, + includeTypes, + "SentryMode", + map[string]interface{}{ + "booleanValue": true, + }, + ), + Entry("ShiftState with enums as strings and types excluded", + &protos.Datum{ + Key: protos.Field_Gear, + Value: &protos.Value{Value: &protos.Value_ShiftStateValue{ShiftStateValue: protos.ShiftState_ShiftStateD}}, + }, + excludeTypes, + "Gear", + "ShiftStateD", + ), + Entry("ShiftState with types included", + &protos.Datum{ + Key: protos.Field_Gear, + Value: &protos.Value{Value: &protos.Value_ShiftStateValue{ShiftStateValue: protos.ShiftState_ShiftStateD}}, + }, + includeTypes, + "Gear", + map[string]interface{}{ + "shiftStateValue": "ShiftStateD", + }, + ), + Entry("ChargeState with types excluded", + &protos.Datum{ + Key: protos.Field_ChargeState, + Value: &protos.Value{Value: &protos.Value_ChargingValue{ChargingValue: protos.ChargingState_ChargeStateCharging}}, + }, + excludeTypes, + "ChargeState", + "ChargeStateCharging", + ), + Entry("Invalid with types excluded", + &protos.Datum{ + Key: protos.Field_BMSState, + Value: &protos.Value{Value: &protos.Value_Invalid{}}, + }, + excludeTypes, + "BMSState", + "", + ), + Entry("Invalid with types included", + &protos.Datum{ + Key: protos.Field_BMSState, + Value: &protos.Value{Value: &protos.Value_Invalid{}}, + }, + includeTypes, + "BMSState", + map[string]interface{}{ + "invalid": true, + }, + ), + ) + }) +}) diff --git a/datastore/simple/transformers/transformers_suite_test.go b/datastore/simple/transformers/transformers_suite_test.go new file mode 100644 index 0000000..e1ca54f --- /dev/null +++ b/datastore/simple/transformers/transformers_suite_test.go @@ -0,0 +1,13 @@ +package transformers_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestTransformers(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Transformers Suite Tests") +} diff --git a/datastore/simple/transformers/utilities_test.go b/datastore/simple/transformers/utilities_test.go new file mode 100644 index 0000000..b92a1b2 --- /dev/null +++ b/datastore/simple/transformers/utilities_test.go @@ -0,0 +1,8 @@ +package transformers_test + +// Utility constants to make tests more readable + +const ( + includeTypes = true + excludeTypes = false +) diff --git a/datastore/simple/transformers/vehicle_alert.go b/datastore/simple/transformers/vehicle_alert.go new file mode 100644 index 0000000..fd81d5e --- /dev/null +++ b/datastore/simple/transformers/vehicle_alert.go @@ -0,0 +1,33 @@ +package transformers + +import ( + "github.com/teslamotors/fleet-telemetry/protos" +) + +// VehicleAlertToMap converts a VehicleAlert proto message to a map representation +func VehicleAlertToMap(alert *protos.VehicleAlert) map[string]interface{} { + alertMap := map[string]interface{}{ + "Name": alert.Name, + } + + if alert.StartedAt != nil { + alertMap["StartedAt"] = alert.StartedAt.AsTime().Unix() + } + + if alert.EndedAt != nil { + alertMap["EndedAt"] = alert.EndedAt.AsTime().Unix() + } + + if alert.Audiences == nil { + alertMap["Audiences"] = nil + return alertMap + } + + audiences := make([]interface{}, len(alert.Audiences)) + for i, audience := range alert.Audiences { + audiences[i] = audience.String() + } + alertMap["Audiences"] = audiences + + return alertMap +} diff --git a/datastore/simple/transformers/vehicle_alert_test.go b/datastore/simple/transformers/vehicle_alert_test.go new file mode 100644 index 0000000..eecb62b --- /dev/null +++ b/datastore/simple/transformers/vehicle_alert_test.go @@ -0,0 +1,52 @@ +package transformers_test + +import ( + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/teslamotors/fleet-telemetry/datastore/simple/transformers" + "github.com/teslamotors/fleet-telemetry/protos" + + "google.golang.org/protobuf/types/known/timestamppb" +) + +var _ = Describe("VehicleAlert", func() { + Describe("VehicleAlertToMap", func() { + var ( + alert *protos.VehicleAlert + ) + + BeforeEach(func() { + alert = &protos.VehicleAlert{ + Name: "TestAlert", + StartedAt: timestamppb.New(time.Now().Add(-1 * time.Hour)), + EndedAt: timestamppb.New(time.Now()), + Audiences: []protos.Audience{protos.Audience_Customer, protos.Audience_Service}, + } + }) + + It("includes all expected data", func() { + result := transformers.VehicleAlertToMap(alert) + + Expect(result).To(HaveLen(4)) + Expect(result["Name"]).To(Equal("TestAlert")) + Expect(result["StartedAt"]).To(BeNumerically("~", time.Now().Add(-1*time.Hour).Unix(), 1)) + Expect(result["EndedAt"]).To(BeNumerically("~", time.Now().Unix(), 1)) + Expect(result["Audiences"]).To(ConsistOf("Customer", "Service")) + }) + + It("handles missing fields", func() { + alert.EndedAt = nil + alert.Audiences = nil + result := transformers.VehicleAlertToMap(alert) + + Expect(result).To(HaveLen(3)) + Expect(result["Name"]).To(Equal("TestAlert")) + Expect(result["StartedAt"]).To(BeNumerically("~", time.Now().Add(-1*time.Hour).Unix(), 1)) + Expect(result).NotTo(HaveKey("EndedAt")) + Expect(result["Audiences"]).To(BeNil()) + }) + }) +}) diff --git a/datastore/simple/transformers/vehicle_error.go b/datastore/simple/transformers/vehicle_error.go new file mode 100644 index 0000000..eb56665 --- /dev/null +++ b/datastore/simple/transformers/vehicle_error.go @@ -0,0 +1,20 @@ +package transformers + +import ( + "github.com/teslamotors/fleet-telemetry/protos" +) + +// VehicleErrorToMap converts a VehicleError proto message to a map representation +func VehicleErrorToMap(vehicleError *protos.VehicleError) map[string]interface{} { + errorMap := map[string]interface{}{ + "Name": vehicleError.Name, + "Body": vehicleError.Body, + "Tags": vehicleError.Tags, + } + + if vehicleError.CreatedAt != nil { + errorMap["CreatedAt"] = vehicleError.CreatedAt.AsTime().Unix() + } + + return errorMap +} diff --git a/datastore/simple/transformers/vehicle_error_test.go b/datastore/simple/transformers/vehicle_error_test.go new file mode 100644 index 0000000..1160010 --- /dev/null +++ b/datastore/simple/transformers/vehicle_error_test.go @@ -0,0 +1,54 @@ +package transformers_test + +import ( + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/teslamotors/fleet-telemetry/datastore/simple/transformers" + "github.com/teslamotors/fleet-telemetry/protos" + + "google.golang.org/protobuf/types/known/timestamppb" +) + +var _ = Describe("VehicleError", func() { + Describe("VehicleErrorToMap", func() { + var ( + vehicleError *protos.VehicleError + ) + + BeforeEach(func() { + vehicleError = &protos.VehicleError{ + Name: "TestError", + CreatedAt: timestamppb.New(time.Now()), + Tags: map[string]string{"tag1": "value1", "tag2": "value2"}, + Body: "Error details", + } + }) + + It("includes all expected data", func() { + result := transformers.VehicleErrorToMap(vehicleError) + + Expect(result).To(HaveLen(4)) + Expect(result["Name"]).To(Equal("TestError")) + Expect(result["CreatedAt"]).To(BeNumerically("~", time.Now().Unix(), 1)) + Expect(result["Tags"]).To(HaveKeyWithValue("tag1", "value1")) + Expect(result["Tags"]).To(HaveKeyWithValue("tag2", "value2")) + Expect(result["Body"]).To(Equal("Error details")) + }) + + It("handles missing fields", func() { + vehicleError.Tags = nil + vehicleError.Body = "" + + result := transformers.VehicleErrorToMap(vehicleError) + + Expect(result).To(HaveLen(4)) + Expect(result["Name"]).To(Equal("TestError")) + Expect(result["CreatedAt"]).To(BeNumerically("~", time.Now().Unix(), 1)) + Expect(result["Body"]).To(Equal("")) + Expect(result["Tags"]).To(BeEmpty()) + }) + }) +}) diff --git a/telemetry/record.go b/telemetry/record.go index 9099b3b..10dd1c7 100644 --- a/telemetry/record.go +++ b/telemetry/record.go @@ -7,6 +7,7 @@ import ( logrus "github.com/teslamotors/fleet-telemetry/logger" "github.com/teslamotors/fleet-telemetry/protos" + "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" diff --git a/telemetry/record_test.go b/telemetry/record_test.go index 3e5181c..ff1ee5b 100644 --- a/telemetry/record_test.go +++ b/telemetry/record_test.go @@ -346,6 +346,83 @@ func locationDatum(field protos.Field, location *protos.LocationValue) *protos.D } } +func intDatum(field protos.Field, value int32) *protos.Datum { + return &protos.Datum{ + Key: field, + Value: &protos.Value{ + Value: &protos.Value_IntValue{ + IntValue: value, + }, + }, + } +} + +func doubleDatum(field protos.Field, value float64) *protos.Datum { + return &protos.Datum{ + Key: field, + Value: &protos.Value{ + Value: &protos.Value_DoubleValue{ + DoubleValue: value, + }, + }, + } +} + +func boolDatum(field protos.Field, value bool) *protos.Datum { + return &protos.Datum{ + Key: field, + Value: &protos.Value{ + Value: &protos.Value_BooleanValue{ + BooleanValue: value, + }, + }, + } +} + +func floatDatum(field protos.Field, value float32) *protos.Datum { + return &protos.Datum{ + Key: field, + Value: &protos.Value{ + Value: &protos.Value_FloatValue{ + FloatValue: value, + }, + }, + } +} + +func longDatum(field protos.Field, value int64) *protos.Datum { + return &protos.Datum{ + Key: field, + Value: &protos.Value{ + Value: &protos.Value_LongValue{ + LongValue: value, + }, + }, + } +} + +func chargingStateDatum(field protos.Field, value protos.ChargingState) *protos.Datum { + return &protos.Datum{ + Key: field, + Value: &protos.Value{ + Value: &protos.Value_ChargingValue{ + ChargingValue: value, + }, + }, + } +} + +func shiftStateDatum(field protos.Field, value protos.ShiftState) *protos.Datum { + return &protos.Datum{ + Key: field, + Value: &protos.Value{ + Value: &protos.Value_ShiftStateValue{ + ShiftStateValue: value, + }, + }, + } +} + // clone creates a "clean" clone of the given proto.LocationValue so we can use DeepEqual freely. func clone(o *protos.LocationValue) *protos.LocationValue { if o == nil { From b0116cb39efca097ac81d662c8266366f6447f93 Mon Sep 17 00:00:00 2001 From: agbpatro Date: Wed, 28 Aug 2024 10:02:07 -0700 Subject: [PATCH 5/6] Now that we are no longer using json marshalling and unmarshalling, it makes sense to update logging names (#204) --- datastore/simple/logger.go | 5 ++--- datastore/simple/logger_test.go | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/datastore/simple/logger.go b/datastore/simple/logger.go index 0408dd5..263301b 100644 --- a/datastore/simple/logger.go +++ b/datastore/simple/logger.go @@ -33,15 +33,14 @@ func (p *ProtoLogger) ProcessReliableAck(entry *telemetry.Record) { func (p *ProtoLogger) Produce(entry *telemetry.Record) { data, err := p.recordToLogMap(entry) if err != nil { - p.logger.ErrorLog("json_unmarshal_error", err, logrus.LogInfo{"vin": entry.Vin, "metadata": entry.Metadata()}) + p.logger.ErrorLog("record_logging_error", err, logrus.LogInfo{"vin": entry.Vin, "metadata": entry.Metadata()}) return } - p.logger.ActivityLog("logger_json_unmarshal", logrus.LogInfo{"vin": entry.Vin, "metadata": entry.Metadata(), "data": data}) + p.logger.ActivityLog("record_payload", logrus.LogInfo{"vin": entry.Vin, "metadata": entry.Metadata(), "data": data}) } // ReportError noop method func (p *ProtoLogger) ReportError(message string, err error, logInfo logrus.LogInfo) { - return } // recordToLogMap converts the data of a record to a map or slice of maps diff --git a/datastore/simple/logger_test.go b/datastore/simple/logger_test.go index 23fc5a6..87ddef4 100644 --- a/datastore/simple/logger_test.go +++ b/datastore/simple/logger_test.go @@ -82,7 +82,7 @@ var _ = Describe("ProtoLogger", func() { protoLogger.Produce(record) lastLog := hook.LastEntry() - Expect(lastLog.Message).To(Equal("logger_json_unmarshal")) + Expect(lastLog.Message).To(Equal("record_payload")) Expect(lastLog.Data).To(HaveKeyWithValue("vin", "TEST123")) Expect(lastLog.Data).To(HaveKey("data")) @@ -101,7 +101,7 @@ var _ = Describe("ProtoLogger", func() { protoLogger.Produce(record) lastLog := hook.LastEntry() - Expect(lastLog.Message).To(Equal("json_unmarshal_error")) + Expect(lastLog.Message).To(Equal("record_logging_error")) Expect(lastLog.Data).To(HaveKeyWithValue("vin", "TEST123")) Expect(lastLog.Data).To(HaveKey("metadata")) }) From c3daf762faee7905946a4b4da27fa4ff6034765a Mon Sep 17 00:00:00 2001 From: agbpatro Date: Fri, 6 Sep 2024 11:51:32 -0700 Subject: [PATCH 6/6] Remove unused param (#209) --- server/monitoring/profile_server.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/monitoring/profile_server.go b/server/monitoring/profile_server.go index ccbc5b2..88ad794 100644 --- a/server/monitoring/profile_server.go +++ b/server/monitoring/profile_server.go @@ -40,7 +40,7 @@ func (p *profileServer) liveProfiler(config *config.Config) func(w http.Response } // gcStats display GC stats -func (p *profileServer) gcStats(config *config.Config) func(w http.ResponseWriter, r *http.Request) { +func (p *profileServer) gcStats() func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { stats := &debug.GCStats{} debug.ReadGCStats(stats) @@ -59,7 +59,7 @@ func (p *profileServer) gcStats(config *config.Config) func(w http.ResponseWrite // StartProfilerServer initializes the profiler on http func StartProfilerServer(config *config.Config, mux *http.ServeMux, logger *logrus.Logger) { profileServer := &profileServer{} - mux.HandleFunc("/gc_stats", profileServer.gcStats(config)) + mux.HandleFunc("/gc_stats", profileServer.gcStats()) mux.HandleFunc("/live_profiler", profileServer.liveProfiler(config)) logger.ActivityLog("profiler_started", logrus.LogInfo{"port": config.Monitoring.ProfilerPort})