diff --git a/cmd/gosky/debug.go b/cmd/gosky/debug.go index c29ca404d..b4dd1f597 100644 --- a/cmd/gosky/debug.go +++ b/cmd/gosky/debug.go @@ -48,6 +48,7 @@ var debugCmd = &cli.Command{ compareStreamsCmd, debugGetRepoCmd, debugCompareReposCmd, + compareStreams2Cmd, }, } @@ -293,6 +294,98 @@ var debugStreamCmd = &cli.Command{ }, } +var compareStreams2Cmd = &cli.Command{ + Name: "compare-streams2", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "host1", + Required: true, + }, + &cli.StringFlag{ + Name: "host2", + Required: true, + }, + }, + ArgsUsage: ``, + Action: func(cctx *cli.Context) error { + h1 := cctx.String("host1") + h2 := cctx.String("host2") + + url1 := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos", h1) + url2 := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos", h2) + + d := websocket.DefaultDialer + + var buflk sync.Mutex + buffers := []map[string]string{ + make(map[string]string), + make(map[string]string), + } + + // Create two goroutines for reading events from two URLs + for i, url := range []string{url1, url2} { + go func(i int, url string) { + + oi := (i + 1) % 2 + con, _, err := d.Dial(url, http.Header{}) + if err != nil { + log.Fatalf("Dial failure on url%d: %s", i+1, err) + } + + ctx := context.TODO() + rsc := &events.RepoStreamCallbacks{ + RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { + buflk.Lock() + if buffers[oi][evt.Repo] == evt.Rev { + delete(buffers[oi], evt.Repo) + delete(buffers[i], evt.Repo) + } else { + buffers[i][evt.Repo] = evt.Rev + } + buflk.Unlock() + return nil + }, + // TODO: all the other Repo* event types + Error: func(evt *events.ErrorFrame) error { + return fmt.Errorf("%s: %s", evt.Error, evt.Message) + }, + } + seqScheduler := sequential.NewScheduler(fmt.Sprintf("debug-stream-%d", i+1), rsc.EventHandler) + if err := events.HandleRepoStream(ctx, con, seqScheduler); err != nil { + log.Fatalf("HandleRepoStream failure on url%d: %s", i+1, err) + } + }(i, url) + } + + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT) + + // Compare events from the two URLs + for { + select { + case <-time.Tick(time.Second / 2): + buflk.Lock() + a := len(buffers[0]) + b := len(buffers[1]) + buflk.Unlock() + + fmt.Println(a, b) + case <-ch: + //printDetailedDelta() + /* + b, err := json.Marshal(buffers) + if err != nil { + return err + } + + fmt.Println(string(b)) + */ + return nil + } + } + }, +} + var compareStreamsCmd = &cli.Command{ Name: "compare-streams", Flags: []cli.Flag{