Skip to content

Commit

Permalink
Merge pull request #70 from thehubbleproject/add-sync
Browse files Browse the repository at this point in the history
Revive sync
  • Loading branch information
vaibhavchellani authored Nov 11, 2020
2 parents f51eab3 + 66af639 commit 5a9dc97
Show file tree
Hide file tree
Showing 13 changed files with 499 additions and 1,603 deletions.
91 changes: 17 additions & 74 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,17 @@ package aggregator

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/BOPR/common"
"github.com/BOPR/config"
"github.com/BOPR/core"
"github.com/BOPR/wallet"
"github.com/kilic/bn254/bls"
)

const (
AggregatingService = "aggregator"
COMMITMENT_SIZE = 1
)

var (
ErrNoTxsFound = errors.New("no tx found")
)

// Aggregator is the service which is supposed to create batches
Expand Down Expand Up @@ -92,6 +84,12 @@ func (a *Aggregator) startAggregating(ctx context.Context, interval time.Duratio
for {
select {
case <-ticker.C:
if isCatchingUp, err := core.IsCatchingUp(); err != nil {
return
} else if isCatchingUp {
a.Logger.Info("Commander catching up, aborting aggregation till next poll")
return
}
a.wg.Wait()
a.wg.Add(1)
go a.pickBatch()
Expand All @@ -108,90 +106,35 @@ func (a *Aggregator) pickBatch() {
if err != nil {
fmt.Println("Error while popping txs from mempool", "Error", err)
}
a.ProcessAndSubmitBatch(txs)
a.processAndSubmitBatch(txs)
}

func (a *Aggregator) ProcessAndSubmitBatch(txs []core.Tx) {
func (a *Aggregator) processAndSubmitBatch(txs []core.Tx) {
a.Logger.Info("Processing new batch", "numberOfTxs", len(txs))

// Step-2
commitments, err := a.ProcessTx(txs)
commitments, err := a.processTxs(txs)
if err != nil {
fmt.Println("Error while processing tx", "error", err)
return
}

// Step-3
// Submit all commitments on-chain
a.LoadedBazooka.SubmitBatch(commitments)
txHash, err := a.LoadedBazooka.SubmitBatch(commitments)
if err != nil {
fmt.Println("Error while submitting batch", "error", err)
return
}
}

// ProcessTx fetches all the data required to validate tx from smart contact
// and calls the proccess tx function to return the updated balance root and accounts
func (a *Aggregator) ProcessTx(txs []core.Tx) (commitments []core.Commitment, err error) {
if len(txs) == 0 {
return commitments, ErrNoTxsFound
}
for i, tx := range txs {
a.Logger.Info("Processing transaction", "txNumber", i, "of", len(txs))

rootAcc, err := a.DB.GetRoot()
if err != nil {
return commitments, err
}
a.Logger.Debug("Latest root", "root", rootAcc.Hash)
currentRoot, err := core.HexToByteArray(rootAcc.Hash)
if err != nil {
return commitments, err
}
fromStateProof, toStateProof, txDBConn, err := tx.GetVerificationData()
if err != nil {
a.Logger.Error("Unable to create verification data", "error", err)
return commitments, err
}

newRoot, err := a.LoadedBazooka.ProcessTx(currentRoot, tx, fromStateProof, toStateProof)
if err != nil {
a.Logger.Error("Error processing tx", "tx", tx.String(), "error", err)
if txDBConn.Instance != nil {
txDBConn.Instance.Rollback()
txDBConn.Close()
}
return commitments, err
}
if txDBConn.Instance != nil {
txDBConn.Instance.Commit()
txDBConn.Close()
}

if i%COMMITMENT_SIZE == 0 {
txInCommitment := txs[i : i+COMMITMENT_SIZE]
a.Logger.Info("Preparing a commitment", "NumOfTxs", len(txInCommitment), "type", txs[0].Type, "totalCommitmentsYet", len(commitments))
aggregatedSig, err := aggregateSignatures(txInCommitment)
if err != nil {
return commitments, err
}
commitment := core.Commitment{Txs: txInCommitment, UpdatedRoot: newRoot, BatchType: tx.Type, AggregatedSignature: aggregatedSig.ToBytes()}
commitments = append(commitments, commitment)
}
currentRoot = newRoot
lastCommitment := commitments[len(commitments)-1]
newBatch := core.NewBatch(lastCommitment.UpdatedRoot.String(), config.GlobalCfg.OperatorAddress, txHash, lastCommitment.BatchType, core.BATCH_BROADCASTED)
err = a.DB.AddNewBatch(newBatch)
if err != nil {
return
}
return commitments, nil
}

// generates aggregated signature for commitment
func aggregateSignatures(txs []core.Tx) (aggregatedSig bls.Signature, err error) {
var signatures []*bls.Signature
for _, tx := range txs {
sig, err := wallet.BytesToSignature(tx.Signature)
if err != nil {
return aggregatedSig, err
}
signatures = append(signatures, &sig)
}
return wallet.NewAggregateSignature(signatures)
func (a *Aggregator) processTxs(txs []core.Tx) (commitments []core.Commitment, err error) {
return core.ProcessTxs(a.DB, a.LoadedBazooka, txs, false)
}
3 changes: 1 addition & 2 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ func startCmd() *cobra.Command {
}

logger.Info("Starting coordinator with sync and aggregator enabled", "lastSyncedEthBlock",
syncStatus.LastEthBlockBigInt().String(),
"lastSyncedBatch", syncStatus.LastBatchRecorded)
syncStatus.LastEthBlockBigInt().String())

// go routine to catch signal
catchSignal := make(chan os.Signal, 1)
Expand Down
1 change: 1 addition & 0 deletions cmd/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func dummyTransfer() *cobra.Command {
}

fmt.Println("Transaction sent!", "Hash", txHash)

return nil
},
}
Expand Down
Loading

0 comments on commit 5a9dc97

Please sign in to comment.