Skip to content

Commit

Permalink
maybe better compare streams command
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping committed Nov 20, 2024
1 parent f229dc0 commit 0976265
Showing 1 changed file with 93 additions and 0 deletions.
93 changes: 93 additions & 0 deletions cmd/gosky/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var debugCmd = &cli.Command{
compareStreamsCmd,
debugGetRepoCmd,
debugCompareReposCmd,
compareStreams2Cmd,
},
}

Expand Down Expand Up @@ -293,6 +294,98 @@ var debugStreamCmd = &cli.Command{
},
}

var compareStreams2Cmd = &cli.Command{
Name: "compare-streams2",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "host1",
Required: true,
},
&cli.StringFlag{
Name: "host2",
Required: true,
},
},
ArgsUsage: `<cursor>`,
Action: func(cctx *cli.Context) error {
h1 := cctx.String("host1")
h2 := cctx.String("host2")

url1 := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos", h1)
url2 := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos", h2)

d := websocket.DefaultDialer

var buflk sync.Mutex
buffers := []map[string]string{
make(map[string]string),
make(map[string]string),
}

// Create two goroutines for reading events from two URLs
for i, url := range []string{url1, url2} {
go func(i int, url string) {

oi := (i + 1) % 2
con, _, err := d.Dial(url, http.Header{})
if err != nil {
log.Fatalf("Dial failure on url%d: %s", i+1, err)
}

ctx := context.TODO()
rsc := &events.RepoStreamCallbacks{
RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
buflk.Lock()
if buffers[oi][evt.Repo] == evt.Rev {
delete(buffers[oi], evt.Repo)
delete(buffers[i], evt.Repo)
} else {
buffers[i][evt.Repo] = evt.Rev
}
buflk.Unlock()
return nil
},
// TODO: all the other Repo* event types
Error: func(evt *events.ErrorFrame) error {
return fmt.Errorf("%s: %s", evt.Error, evt.Message)
},
}
seqScheduler := sequential.NewScheduler(fmt.Sprintf("debug-stream-%d", i+1), rsc.EventHandler)
if err := events.HandleRepoStream(ctx, con, seqScheduler); err != nil {
log.Fatalf("HandleRepoStream failure on url%d: %s", i+1, err)
}
}(i, url)
}

ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT)

// Compare events from the two URLs
for {
select {
case <-time.Tick(time.Second / 2):
buflk.Lock()
a := len(buffers[0])
b := len(buffers[1])
buflk.Unlock()

fmt.Println(a, b)
case <-ch:
//printDetailedDelta()
/*
b, err := json.Marshal(buffers)
if err != nil {
return err
}
fmt.Println(string(b))
*/
return nil
}
}
},
}

var compareStreamsCmd = &cli.Command{
Name: "compare-streams",
Flags: []cli.Flag{
Expand Down

0 comments on commit 0976265

Please sign in to comment.