Skip to content

Commit

Permalink
Merge pull request #76 from totegamma/feat/refactor-timeline-algorithm
Browse files Browse the repository at this point in the history
Feat/refactor timeline algorithm
  • Loading branch information
totegamma authored Sep 6, 2024
2 parents 3c3f1c3 + b1cd50b commit 6095dfa
Show file tree
Hide file tree
Showing 26 changed files with 2,825 additions and 791 deletions.
11 changes: 10 additions & 1 deletion .github/workflows/docker-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ jobs:
type=raw,value=latest,enable=${{ !contains(env.TAG, 'beta') }}
type=semver,pattern={{raw}}
- name: Set platforms based on tag
id: set-platforms
run: |
if [[ "${{ env.TAG }}" == *"beta"* ]]; then
echo "PLATFORMS=linux/amd64" >> $GITHUB_ENV
else
echo "PLATFORMS=linux/amd64,linux/arm64,linux/arm64/v8" >> $GITHUB_ENV
fi
# Build and push Docker image with Buildx (don't push on PR)
# https://github.com/docker/build-push-action
- name: Build and push Docker image
Expand All @@ -84,7 +93,7 @@ jobs:
context: .
file: ${{ matrix.dockerfile }}
push: ${{ github.event_name != 'pull_request' }}
platforms: linux/amd64,linux/arm64,linux/arm64/v8
platforms: ${{ env.PLATFORMS }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
Expand Down
99 changes: 91 additions & 8 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type Client interface {
GetChunks(ctx context.Context, domain string, timelines []string, queryTime time.Time, opts *Options) (map[string]core.Chunk, error)
GetKey(ctx context.Context, domain, id string, opts *Options) ([]core.Key, error)
GetDomain(ctx context.Context, domain string, opts *Options) (core.Domain, error)
GetChunkItrs(ctx context.Context, domain string, timelines []string, epoch string, opts *Options) (map[string]string, error)
GetChunkBodies(ctx context.Context, domain string, query map[string]string, opts *Options) (map[string]core.Chunk, error)
}

type client struct {
Expand Down Expand Up @@ -215,7 +217,10 @@ func (c *client) GetEntity(ctx context.Context, domain, address string, opts *Op
return core.Entity{}, fmt.Errorf("Domain is offline")
}

response, err := httpRequest[core.Entity](ctx, &c.client, "GET", "https://"+domain+"/api/v1/entity/"+address, "", opts)
url := "https://" + domain + "/api/v1/entity/" + address
span.SetAttributes(attribute.String("url", url))

response, err := httpRequest[core.Entity](ctx, &c.client, "GET", url, "", opts)
if err != nil {
span.RecordError(err)

Expand All @@ -238,7 +243,10 @@ func (c *client) GetMessage(ctx context.Context, domain, id string, opts *Option

}

response, err := httpRequest[core.Message](ctx, &c.client, "GET", "https://"+domain+"/api/v1/message/"+id, "", opts)
url := "https://" + domain + "/api/v1/message/" + id
span.SetAttributes(attribute.String("url", url))

response, err := httpRequest[core.Message](ctx, &c.client, "GET", url, "", opts)
if err != nil {
span.RecordError(err)

Expand All @@ -260,7 +268,10 @@ func (c *client) GetAssociation(ctx context.Context, domain, id string, opts *Op
return core.Association{}, fmt.Errorf("Domain is offline")
}

response, err := httpRequest[core.Association](ctx, &c.client, "GET", "https://"+domain+"/api/v1/association/"+id, "", opts)
url := "https://" + domain + "/api/v1/association/" + id
span.SetAttributes(attribute.String("url", url))

response, err := httpRequest[core.Association](ctx, &c.client, "GET", url, "", opts)
if err != nil {
span.RecordError(err)

Expand All @@ -282,7 +293,10 @@ func (c *client) GetProfile(ctx context.Context, domain, id string, opts *Option
return core.Profile{}, fmt.Errorf("Domain is offline")
}

response, err := httpRequest[core.Profile](ctx, &c.client, "GET", "https://"+domain+"/api/v1/profile/"+id, "", opts)
url := "https://" + domain + "/api/v1/profile/" + id
span.SetAttributes(attribute.String("url", url))

response, err := httpRequest[core.Profile](ctx, &c.client, "GET", url, "", opts)
if err != nil {
span.RecordError(err)

Expand All @@ -304,7 +318,10 @@ func (c *client) GetTimeline(ctx context.Context, domain, id string, opts *Optio
return core.Timeline{}, fmt.Errorf("Domain is offline")
}

response, err := httpRequest[core.Timeline](ctx, &c.client, "GET", "https://"+domain+"/api/v1/timeline/"+id, "", opts)
url := "https://" + domain + "/api/v1/timeline/" + id
span.SetAttributes(attribute.String("url", url))

response, err := httpRequest[core.Timeline](ctx, &c.client, "GET", url, "", opts)
if err != nil {
span.RecordError(err)

Expand All @@ -329,7 +346,67 @@ func (c *client) GetChunks(ctx context.Context, domain string, timelines []strin
timelinesStr := strings.Join(timelines, ",")
timeStr := fmt.Sprintf("%d", queryTime.Unix())

response, err := httpRequest[map[string]core.Chunk](ctx, &c.client, "GET", "https://"+domain+"/api/v1/timelines/chunks?timelines="+timelinesStr+"&time="+timeStr, "", opts)
url := "https://" + domain + "/api/v1/timelines/chunks?timelines=" + timelinesStr + "&time=" + timeStr
span.SetAttributes(attribute.String("url", url))

response, err := httpRequest[map[string]core.Chunk](ctx, &c.client, "GET", url, "", opts)
if err != nil {
span.RecordError(err)

if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
c.lastFailed[domain] = time.Now()
}

return nil, err
}

return *response, nil
}

func (c *client) GetChunkItrs(ctx context.Context, domain string, timelines []string, epoch string, opts *Options) (map[string]string, error) {
ctx, span := tracer.Start(ctx, "Client.GetChunkItrs")
defer span.End()

if !c.IsOnline(domain) {
return nil, fmt.Errorf("Domain is offline")
}

timelinesStr := strings.Join(timelines, ",")

url := "https://" + domain + "/api/v1/chunks/itr?timelines=" + timelinesStr + "&epoch=" + epoch
span.SetAttributes(attribute.String("url", url))

response, err := httpRequest[map[string]string](ctx, &c.client, "GET", url, "", opts)
if err != nil {
span.RecordError(err)

if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
c.lastFailed[domain] = time.Now()
}

return nil, err
}

return *response, nil
}

func (c *client) GetChunkBodies(ctx context.Context, domain string, query map[string]string, opts *Options) (map[string]core.Chunk, error) {
ctx, span := tracer.Start(ctx, "Client.GetChunkBodies")
defer span.End()

if !c.IsOnline(domain) {
return nil, fmt.Errorf("Domain is offline")
}

queries := []string{}
for key, value := range query {
queries = append(queries, key+":"+value)
}

url := "https://" + domain + "/api/v1/chunks/body?query=" + strings.Join(queries, ",")
span.SetAttributes(attribute.String("url", url))

response, err := httpRequest[map[string]core.Chunk](ctx, &c.client, "GET", url, "", opts)
if err != nil {
span.RecordError(err)

Expand All @@ -351,7 +428,10 @@ func (c *client) GetKey(ctx context.Context, domain, id string, opts *Options) (
return nil, fmt.Errorf("Domain is offline")
}

response, err := httpRequest[[]core.Key](ctx, &c.client, "GET", "https://"+domain+"/api/v1/key/"+id, "", opts)
url := "https://" + domain + "/api/v1/key/" + id
span.SetAttributes(attribute.String("url", url))

response, err := httpRequest[[]core.Key](ctx, &c.client, "GET", url, "", opts)
if err != nil {
span.RecordError(err)

Expand All @@ -373,7 +453,10 @@ func (c *client) GetDomain(ctx context.Context, domain string, opts *Options) (c
return core.Domain{}, fmt.Errorf("Domain is offline")
}

response, err := httpRequest[core.Domain](ctx, &c.client, "GET", "https://"+domain+"/api/v1/domain", "", opts)
url := "https://" + domain + "/api/v1/domain"
span.SetAttributes(attribute.String("url", url))

response, err := httpRequest[core.Domain](ctx, &c.client, "GET", url, "", opts)
if err != nil {
span.RecordError(err)

Expand Down
30 changes: 30 additions & 0 deletions client/mock/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 8 additions & 15 deletions cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,28 +218,29 @@ func main() {
defer mc.Close()

client := client.NewClient()
timelineKeeper := timeline.NewKeeper(rdb, mc, client, conconf)

globalPolicy := concurrent.GetDefaultGlobalPolicy()

policy := concurrent.SetupPolicyService(rdb, globalPolicy, conconf)
agent := concurrent.SetupAgent(db, rdb, mc, client, policy, conconf, config.Server.RepositoryPath)
agent := concurrent.SetupAgent(db, rdb, mc, timelineKeeper, client, policy, conconf, config.Server.RepositoryPath)

domainService := concurrent.SetupDomainService(db, client, conconf)
domainHandler := domain.NewHandler(domainService)

userKvService := concurrent.SetupUserkvService(db)
userkvHandler := userkv.NewHandler(userKvService)

messageService := concurrent.SetupMessageService(db, rdb, mc, client, policy, conconf)
messageService := concurrent.SetupMessageService(db, rdb, mc, timelineKeeper, client, policy, conconf)
messageHandler := message.NewHandler(messageService)

associationService := concurrent.SetupAssociationService(db, rdb, mc, client, policy, conconf)
associationService := concurrent.SetupAssociationService(db, rdb, mc, timelineKeeper, client, policy, conconf)
associationHandler := association.NewHandler(associationService)

profileService := concurrent.SetupProfileService(db, rdb, mc, client, policy, conconf)
profileHandler := profile.NewHandler(profileService)

timelineService := concurrent.SetupTimelineService(db, rdb, mc, client, policy, conconf)
timelineService := concurrent.SetupTimelineService(db, rdb, mc, timelineKeeper, client, policy, conconf)
timelineHandler := timeline.NewHandler(timelineService)

entityService := concurrent.SetupEntityService(db, rdb, mc, client, policy, conconf)
Expand All @@ -254,7 +255,7 @@ func main() {
ackService := concurrent.SetupAckService(db, rdb, mc, client, policy, conconf)
ackHandler := ack.NewHandler(ackService)

storeService := concurrent.SetupStoreService(db, rdb, mc, client, policy, conconf, config.Server.RepositoryPath)
storeService := concurrent.SetupStoreService(db, rdb, mc, timelineKeeper, client, policy, conconf, config.Server.RepositoryPath)
storeHandler := store.NewHandler(storeService)

subscriptionService := concurrent.SetupSubscriptionService(db, rdb, mc, client, policy, conconf)
Expand Down Expand Up @@ -383,14 +384,6 @@ func main() {
)
prometheus.MustRegister(resourceCountMetrics)

var timelineRealtimeConnectionMetrics = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "cc_timeline_realtime_connections",
Help: "timeline realtime connections",
},
)
prometheus.MustRegister(timelineRealtimeConnectionMetrics)

go func() {
for {
time.Sleep(15 * time.Second)
Expand Down Expand Up @@ -440,13 +433,13 @@ func main() {
}
resourceCountMetrics.WithLabelValues("timeline").Set(float64(count))

count = timelineService.CurrentRealtimeConnectionCount()
timelineRealtimeConnectionMetrics.Set(float64(count))
timelineService.UpdateMetrics()
}
}()

e.GET("/metrics", echoprometheus.NewHandler())

timelineKeeper.Start(context.Background())
agent.Boot()

port := ":8000"
Expand Down
4 changes: 2 additions & 2 deletions core/dbschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,12 @@ type Timeline struct {
// immutable
type TimelineItem struct {
ResourceID string `json:"resourceID" gorm:"primaryKey;type:char(27);"`
TimelineID string `json:"timelineID" gorm:"primaryKey;type:char(26);"`
TimelineID string `json:"timelineID" gorm:"primaryKey;type:char(26);index:idx_timeline_id_c_date"`
Owner string `json:"owner" gorm:"type:char(42);"`
Author *string `json:"author,omitempty" gorm:"type:char(42);"`
SchemaID uint `json:"-"`
Schema string `json:"schema,omitempty" gorm:"-"`
CDate time.Time `json:"cdate,omitempty" gorm:"->;<-:create;type:timestamp with time zone;not null;default:clock_timestamp()"`
CDate time.Time `json:"cdate,omitempty" gorm:"->;<-:create;type:timestamp with time zone;not null;default:clock_timestamp();index:idx_timeline_id_c_date"`
}

type Ack struct {
Expand Down
23 changes: 21 additions & 2 deletions core/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,40 @@ import (
"time"
)

const (
chunkLength = 600
)

func Time2Chunk(t time.Time) string {
// chunk by 10 minutes
return fmt.Sprintf("%d", (t.Unix()/600)*600)
return fmt.Sprintf("%d", (t.Unix()/chunkLength)*chunkLength)
}

func NextChunk(chunk string) string {
i, _ := strconv.ParseInt(chunk, 10, 64)
return fmt.Sprintf("%d", i+chunkLength)
}

func PrevChunk(chunk string) string {
i, _ := strconv.ParseInt(chunk, 10, 64)
return fmt.Sprintf("%d", i-chunkLength)
}

func Chunk2RecentTime(chunk string) time.Time {
i, _ := strconv.ParseInt(chunk, 10, 64)
return time.Unix(i+600, 0)
return time.Unix(i+chunkLength, 0)
}

func Chunk2ImmediateTime(chunk string) time.Time {
i, _ := strconv.ParseInt(chunk, 10, 64)
return time.Unix(i, 0)
}

func EpochTime(epoch string) time.Time {
i, _ := strconv.ParseInt(epoch, 10, 64)
return time.Unix(i, 0)
}

func TypedIDToType(id string) string {
if len(id) != 27 {
return ""
Expand Down
6 changes: 3 additions & 3 deletions core/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,18 +173,18 @@ type TimelineService interface {
ListTimelineBySchema(ctx context.Context, schema string) ([]Timeline, error)
ListTimelineByAuthor(ctx context.Context, author string) ([]Timeline, error)

GetChunks(ctx context.Context, timelines []string, pivot time.Time) (map[string]Chunk, error)
GetChunksFromRemote(ctx context.Context, host string, timelines []string, pivot time.Time) (map[string]Chunk, error)
GetChunks(ctx context.Context, timelines []string, epoch string) (map[string]Chunk, error)

ListTimelineSubscriptions(ctx context.Context) (map[string]int64, error)
Count(ctx context.Context) (int64, error)
CurrentRealtimeConnectionCount() int64
NormalizeTimelineID(ctx context.Context, timeline string) (string, error)
GetOwners(ctx context.Context, timelines []string) ([]string, error)

Query(ctx context.Context, timelineID, schema, owner, author string, until time.Time, limit int) ([]TimelineItem, error)

Realtime(ctx context.Context, request <-chan []string, response chan<- Event)

UpdateMetrics()
}

type JobService interface {
Expand Down
Loading

0 comments on commit 6095dfa

Please sign in to comment.