Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the initial raw decoding #168

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 0 additions & 54 deletions deviceList.go

This file was deleted.

79 changes: 79 additions & 0 deletions device_tracker/device_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package device_tracker

import (
"context"
"time"

"github.com/jellydator/ttlcache/v3"
)

type DeviceTracker interface {
// UpdateDevice location updates a device location. If any of deviceId, lat,
// and lng are their zero values, no update will occur.
UpdateDeviceLocation(deviceId string, lat, lon float64, scanContext string)
// GetAllDevices returns info about all devices.
GetAllDevices() map[string]ApiDeviceLocation
// Run runs the automatic cleanup process, blocking until `ctx` is cancelled.
Run(ctx context.Context)
}

type ApiDeviceLocation struct {
Latitude float64 `json:"latitude"`
Longitude float64 `json:"longitude"`
LastUpdate int64 `json:"last_update"`
ScanContext string `json:"scan_context"`
}

type DeviceLocation struct {
Latitude float64
Longitude float64
LastUpdate int64
ScanContext string
}

type deviceTracker struct {
maxDeviceTTL time.Duration
deviceLocation *ttlcache.Cache[string, DeviceLocation]
}

func (tracker *deviceTracker) UpdateDeviceLocation(deviceId string, lat, lon float64, scanContext string) {
if lat == 0 || lon == 0 || deviceId == "" {
return
}
tracker.deviceLocation.Set(deviceId, DeviceLocation{
Latitude: lat,
Longitude: lon,
LastUpdate: time.Now().Unix(),
ScanContext: scanContext,
}, tracker.maxDeviceTTL)
}

func (tracker *deviceTracker) GetAllDevices() map[string]ApiDeviceLocation {
locations := map[string]ApiDeviceLocation{}
for _, key := range tracker.deviceLocation.Items() {
deviceLocation := key.Value()
locations[key.Key()] = ApiDeviceLocation(deviceLocation)
}
return locations
}

func (tracker *deviceTracker) Run(ctx context.Context) {
ctx, cancel_fn := context.WithCancel(ctx)
defer cancel_fn()
go func() {
defer tracker.deviceLocation.Stop()
<-ctx.Done()
}()
tracker.deviceLocation.Start()
}

func NewDeviceTracker(maxDeviceTTLHours int) DeviceTracker {
maxDeviceTTL := time.Hour * time.Duration(maxDeviceTTLHours)
tracker := &deviceTracker{
maxDeviceTTL: maxDeviceTTL,
deviceLocation: ttlcache.New[string, DeviceLocation](
ttlcache.WithTTL[string, DeviceLocation](maxDeviceTTL),
),
}
return tracker
}
56 changes: 4 additions & 52 deletions grpc_server_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"context"
"golbat/config"
pb "golbat/grpc"

_ "google.golang.org/grpc/encoding/gzip" // Install the gzip compressor
"google.golang.org/grpc/metadata"
"time"
)

// server is used to implement helloworld.GreeterServer.
Expand All @@ -24,59 +24,11 @@ func (s *grpcRawServer) SubmitRawProto(ctx context.Context, in *pb.RawProtoReque
}
}

uuid := in.DeviceId
account := in.Username
level := int(in.TrainerLevel)
scanContext := ""
if in.ScanContext != nil {
scanContext = *in.ScanContext
}

latTarget, lonTarget := float64(in.LatTarget), float64(in.LonTarget)
globalHaveAr := in.HaveAr
var protoData []ProtoData

for _, v := range in.Contents {

inboundRawData := ProtoData{
Method: int(v.Method),
Account: account,
Level: level,
ScanContext: scanContext,
Lat: latTarget,
Lon: lonTarget,
Data: v.ResponsePayload,
Request: v.RequestPayload,
Uuid: uuid,
HaveAr: func() *bool {
if v.HaveAr != nil {
return v.HaveAr
}
return globalHaveAr
}(),
}

protoData = append(protoData, inboundRawData)
}

protoData := rawProtoDecoder.GetProtoDataFromGRPC(in)
// Process each proto in a packet in sequence, but in a go-routine
go func() {
timeout := 5 * time.Second
if config.Config.Tuning.ExtendedTimeout {
timeout = 30 * time.Second
}
go rawProtoDecoder.Decode(context.Background(), protoData, decode)

for _, entry := range protoData {
// provide independent cancellation contexts for each proto decode
ctx, cancel := context.WithTimeout(context.Background(), timeout)
decode(ctx, entry.Method, &entry)
cancel()
}
}()

if latTarget != 0 && lonTarget != 0 && uuid != "" {
UpdateDeviceLocation(uuid, latTarget, lonTarget, scanContext)
}
deviceTracker.UpdateDeviceLocation(protoData.Uuid, protoData.Lat(), protoData.Lon(), protoData.ScanContext)

return &pb.RawProtoResponse{Message: "Processed"}, nil
}
Loading