From 95367157bc89da0590d318274f134e315be2ae29 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Tue, 8 Oct 2024 19:26:55 +0000 Subject: [PATCH 1/4] Delete jobs CLI --- cmd.go | 57 +++++++++++++++++++++++++++++- indexer/db.go | 67 +++++++++++++++++++++++++++++++----- synchronizer/synchronizer.go | 8 +++-- 3 files changed, 121 insertions(+), 11 deletions(-) diff --git a/cmd.go b/cmd.go index c658762..469532b 100644 --- a/cmd.go +++ b/cmd.go @@ -856,15 +856,70 @@ func CreateDatabaseOperationCommand() *cobra.Command { }, } - createJobsCommand.Flags().StringVar(&jobChain, "jobChain", "ethereum", "The blockchain to crawl (default: ethereum)") + createJobsCommand.Flags().StringVar(&jobChain, "chain", "ethereum", "The blockchain to crawl (default: ethereum)") createJobsCommand.Flags().StringVar(&address, "address", "", "The address to create jobs for") createJobsCommand.Flags().StringVar(&abiFile, "abi-file", "", "The path to the ABI file") createJobsCommand.Flags().StringVar(&customerId, "customer-id", "", "The customer ID to create jobs for (default: '')") createJobsCommand.Flags().StringVar(&userId, "user-id", "00000000-0000-0000-0000-000000000000", "The user ID to create jobs for (default: '00000000-0000-0000-0000-000000000000')") createJobsCommand.Flags().Uint64Var(&deployBlock, "deploy-block", 0, "The block number to deploy contract (default: 0)") + var jobIds, jobAddresses, jobCustomerIds []string + + deleteJobsCommand := &cobra.Command{ + Use: "delete-jobs", + Short: "Delete existing jobs", + PreRunE: func(cmd *cobra.Command, args []string) error { + indexerErr := indexer.CheckVariablesForIndexer() + if indexerErr != nil { + return indexerErr + } + + indexer.InitDBConnection() + + blockchainErr := seer_blockchain.CheckVariablesForBlockchains() + if blockchainErr != nil { + return blockchainErr + } + + return nil + }, + RunE: func(cmd *cobra.Command, args []string) error { + abiJobs, selectJobsErr := indexer.DBConnection.SelectAbiJobs(jobChain, jobAddresses, jobCustomerIds, false, false, []string{}) + if selectJobsErr != nil { + return fmt.Errorf("error selecting ABI jobs: %w", selectJobsErr) + } + + var jobIds []string + abiJobChains := make(map[string]int) + for _, abiJob := range abiJobs { + jobIds = append(jobIds, abiJob.ID) + if _, ok := abiJobChains[abiJob.Chain]; !ok { + abiJobChains[abiJob.Chain] = 0 + } + abiJobChains[abiJob.Chain]++ + } + fmt.Printf("Found %d total:\n", len(jobIds)) + for k, v := range abiJobChains { + fmt.Printf("- %s - %d jobs\n", k, v) + } + + deleteJobsErr := indexer.DBConnection.DeleteJobs(jobIds) + if deleteJobsErr != nil { + return deleteJobsErr + } + + return nil + }, + } + + deleteJobsCommand.Flags().StringVar(&jobChain, "chain", "", "The blockchain to crawl") + deleteJobsCommand.Flags().StringSliceVar(&jobIds, "job-ids", []string{}, "The list of job UUIDs separated by coma") + deleteJobsCommand.Flags().StringSliceVar(&jobAddresses, "addresses", []string{}, "The list of addresses created jobs for separated by coma") + deleteJobsCommand.Flags().StringSliceVar(&jobCustomerIds, "customer-ids", []string{}, "The list of customer IDs created jobs for separated by coma") + indexCommand.AddCommand(deploymentBlocksCommand) indexCommand.AddCommand(createJobsCommand) + indexCommand.AddCommand(deleteJobsCommand) databaseCmd.AddCommand(indexCommand) return databaseCmd diff --git a/indexer/db.go b/indexer/db.go index 9acdcc6..e6640cd 100644 --- a/indexer/db.go +++ b/indexer/db.go @@ -1131,12 +1131,12 @@ func (p *PostgreSQLpgx) UpdateAbiJobsStatus(blockchain string) error { return nil } -func (p *PostgreSQLpgx) SelectAbiJobs(blockchain string, addresses []string, customersIds []string, autoJobs bool) ([]CustomerUpdates, map[string]AbiJobsDeployInfo, error) { +func (p *PostgreSQLpgx) SelectAbiJobs(blockchain string, addresses []string, customersIds []string, autoJobs, isDeployBlockNotNull bool, abiTypes []string) ([]AbiJob, error) { pool := p.GetPool() conn, err := pool.Acquire(context.Background()) if err != nil { - return nil, nil, err + return nil, err } defer conn.Release() @@ -1144,16 +1144,32 @@ func (p *PostgreSQLpgx) SelectAbiJobs(blockchain string, addresses []string, cus queryArgs := make(pgx.NamedArgs) - queryArgs["chain"] = blockchain - queryBuilder.WriteString(` SELECT id, address, user_id, customer_id, abi_selector, chain, abi_name, status, historical_crawl_status, progress, moonworm_task_pickedup, '[' || abi || ']' as abi, (abi::jsonb)->>'type' AS abiType, created_at, updated_at, deployment_block_number FROM abi_jobs - WHERE chain = @chain AND ((abi::jsonb)->>'type' = 'function' or (abi::jsonb)->>'type' = 'event') and deployment_block_number is not null + WHERE true `) + if len(abiTypes) != 0 { + var abiConditions []string + for _, abiType := range abiTypes { + abiConditions = append(abiConditions, fmt.Sprintf("(abi::jsonb)->>'type' = '%s'", abiType)) + } + + queryBuilder.WriteString(fmt.Sprintf("AND (%s) ", strings.Join(abiConditions, " or "))) + } + + if isDeployBlockNotNull { + queryBuilder.WriteString(" AND deployment_block_number IS NOT null") + } + + if blockchain != "" { + queryBuilder.WriteString(" AND chain = @chain ") + queryArgs["chain"] = blockchain + } + if autoJobs { queryBuilder.WriteString(" AND historical_crawl_status != 'done' ") } @@ -1166,7 +1182,7 @@ func (p *PostgreSQLpgx) SelectAbiJobs(blockchain string, addresses []string, cus for i, address := range addresses { addressBytes, err := decodeAddress(address) if err != nil { - return nil, nil, err + return nil, err } addressesBytes[i] = addressBytes // Assign directly to the index } @@ -1182,15 +1198,19 @@ func (p *PostgreSQLpgx) SelectAbiJobs(blockchain string, addresses []string, cus rows, err := conn.Query(context.Background(), queryBuilder.String(), queryArgs) if err != nil { log.Println("Error querying ABI jobs from database", err) - return nil, nil, err + return nil, err } abiJobs, err := pgx.CollectRows(rows, pgx.RowToStructByName[AbiJob]) if err != nil { log.Println("Error collecting ABI jobs rows", err) - return nil, nil, err + return nil, err } + return abiJobs, nil +} + +func ConvertToCustomerUpdatedAndDeployBlockDicts(abiJobs []AbiJob) ([]CustomerUpdates, map[string]AbiJobsDeployInfo, error) { if len(abiJobs) == 0 { return []CustomerUpdates{}, map[string]AbiJobsDeployInfo{}, nil } @@ -1522,3 +1542,34 @@ func (p *PostgreSQLpgx) CreateJobsFromAbi(chain string, address string, abiFile return nil } + +func (p *PostgreSQLpgx) DeleteJobs(jobIds []string) error { + if len(jobIds) == 0 { + log.Println("Nothing to delete") + return nil + } + + pool := p.GetPool() + + conn, err := pool.Acquire(context.Background()) + if err != nil { + return err + } + defer conn.Release() + + var queryBuilder strings.Builder + queryBuilder.WriteString("DELETE FROM abi_jobs WHERE id = ANY(@jobIds)") + + queryArgs := make(pgx.NamedArgs) + queryArgs["jobIds"] = jobIds + + _, delErr := conn.Query(context.Background(), queryBuilder.String(), queryArgs) + if delErr != nil { + log.Printf("Error querying ABI jobs from database, err %v", delErr) + return delErr + } + + log.Printf("Deleted %d jobs", len(jobIds)) + + return nil +} diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 415536c..58b5483 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -430,9 +430,13 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []s } // Retrieve customer updates and deployment blocks - customerUpdates, addressesAbisInfo, err := indexer.DBConnection.SelectAbiJobs(d.blockchain, addresses, customerIds, autoJobs) + abiJobs, selectJobsErr := indexer.DBConnection.SelectAbiJobs(d.blockchain, addresses, customerIds, autoJobs, true, []string{"function", "event"}) + if selectJobsErr != nil { + return fmt.Errorf("error selecting ABI jobs: %w", selectJobsErr) + } + customerUpdates, addressesAbisInfo, err := indexer.ConvertToCustomerUpdatedAndDeployBlockDicts(abiJobs) if err != nil { - return fmt.Errorf("error selecting ABI jobs: %w", err) + return fmt.Errorf("error parsing ABI jobs: %w", err) } fmt.Printf("Found %d customer updates\n", len(customerUpdates)) From df9c8ce8ffbfda50c40ecabadb770bc8896fce80 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Wed, 9 Oct 2024 12:14:48 +0000 Subject: [PATCH 2/4] Prompt for delete CLI --- cmd.go | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/cmd.go b/cmd.go index 469532b..bee1051 100644 --- a/cmd.go +++ b/cmd.go @@ -1,6 +1,7 @@ package main import ( + "bufio" "context" "encoding/json" "errors" @@ -864,6 +865,7 @@ func CreateDatabaseOperationCommand() *cobra.Command { createJobsCommand.Flags().Uint64Var(&deployBlock, "deploy-block", 0, "The block number to deploy contract (default: 0)") var jobIds, jobAddresses, jobCustomerIds []string + var silentFlag bool deleteJobsCommand := &cobra.Command{ Use: "delete-jobs", @@ -903,6 +905,25 @@ func CreateDatabaseOperationCommand() *cobra.Command { fmt.Printf("- %s - %d jobs\n", k, v) } + output := "no" + if silentFlag { + output = "yes" + } else { + var promptErr error + output, promptErr = StringPrompt("Continue? (y/yes)") + if promptErr != nil { + return promptErr + } + } + + switch output { + case "y": + case "yes": + default: + fmt.Println("Canceled") + return nil + } + deleteJobsErr := indexer.DBConnection.DeleteJobs(jobIds) if deleteJobsErr != nil { return deleteJobsErr @@ -916,6 +937,7 @@ func CreateDatabaseOperationCommand() *cobra.Command { deleteJobsCommand.Flags().StringSliceVar(&jobIds, "job-ids", []string{}, "The list of job UUIDs separated by coma") deleteJobsCommand.Flags().StringSliceVar(&jobAddresses, "addresses", []string{}, "The list of addresses created jobs for separated by coma") deleteJobsCommand.Flags().StringSliceVar(&jobCustomerIds, "customer-ids", []string{}, "The list of customer IDs created jobs for separated by coma") + deleteJobsCommand.Flags().BoolVar(&silentFlag, "silent", false, "Set this flag to run command without prompt") indexCommand.AddCommand(deploymentBlocksCommand) indexCommand.AddCommand(createJobsCommand) @@ -1214,3 +1236,17 @@ func CreateEVMGenerateCommand() *cobra.Command { return evmGenerateCmd } + +func StringPrompt(label string) (string, error) { + var output string + r := bufio.NewReader(os.Stdin) + + fmt.Fprint(os.Stderr, label+" ") + var readErr error + output, readErr = r.ReadString('\n') + if readErr != nil { + return "", readErr + } + + return strings.TrimSpace(output), nil +} From 50210eb608d7ea6a7c4a74189346890f9609760b Mon Sep 17 00:00:00 2001 From: kompotkot Date: Wed, 9 Oct 2024 18:01:59 +0000 Subject: [PATCH 3/4] Copy job CLI --- cmd.go | 98 ++++++++++++++++++++++++++++++++++++---------- indexer/db.go | 72 ++++++++++++++++++++++++++++++++++ version/version.go | 2 +- 3 files changed, 151 insertions(+), 21 deletions(-) diff --git a/cmd.go b/cmd.go index bee1051..3e55b08 100644 --- a/cmd.go +++ b/cmd.go @@ -823,15 +823,14 @@ func CreateDatabaseOperationCommand() *cobra.Command { return blockchainErr } - return nil - }, - RunE: func(cmd *cobra.Command, args []string) error { - - // check if the chain is supported + // Check if the chain is supported if _, ok := seer_blockchain.BlockchainURLs[jobChain]; !ok { return fmt.Errorf("chain %s is not supported", jobChain) } + return nil + }, + RunE: func(cmd *cobra.Command, args []string) error { client, clientErr := seer_blockchain.NewClient(jobChain, seer_blockchain.BlockchainURLs[jobChain], 30) if clientErr != nil { return clientErr @@ -857,7 +856,7 @@ func CreateDatabaseOperationCommand() *cobra.Command { }, } - createJobsCommand.Flags().StringVar(&jobChain, "chain", "ethereum", "The blockchain to crawl (default: ethereum)") + createJobsCommand.Flags().StringVar(&jobChain, "chain", "", "The blockchain") createJobsCommand.Flags().StringVar(&address, "address", "", "The address to create jobs for") createJobsCommand.Flags().StringVar(&abiFile, "abi-file", "", "The path to the ABI file") createJobsCommand.Flags().StringVar(&customerId, "customer-id", "", "The customer ID to create jobs for (default: '')") @@ -891,19 +890,7 @@ func CreateDatabaseOperationCommand() *cobra.Command { return fmt.Errorf("error selecting ABI jobs: %w", selectJobsErr) } - var jobIds []string - abiJobChains := make(map[string]int) - for _, abiJob := range abiJobs { - jobIds = append(jobIds, abiJob.ID) - if _, ok := abiJobChains[abiJob.Chain]; !ok { - abiJobChains[abiJob.Chain] = 0 - } - abiJobChains[abiJob.Chain]++ - } - fmt.Printf("Found %d total:\n", len(jobIds)) - for k, v := range abiJobChains { - fmt.Printf("- %s - %d jobs\n", k, v) - } + jobIds := indexer.GetJobIds(abiJobs, false) output := "no" if silentFlag { @@ -933,15 +920,86 @@ func CreateDatabaseOperationCommand() *cobra.Command { }, } - deleteJobsCommand.Flags().StringVar(&jobChain, "chain", "", "The blockchain to crawl") + deleteJobsCommand.Flags().StringVar(&jobChain, "chain", "", "The blockchain") deleteJobsCommand.Flags().StringSliceVar(&jobIds, "job-ids", []string{}, "The list of job UUIDs separated by coma") deleteJobsCommand.Flags().StringSliceVar(&jobAddresses, "addresses", []string{}, "The list of addresses created jobs for separated by coma") deleteJobsCommand.Flags().StringSliceVar(&jobCustomerIds, "customer-ids", []string{}, "The list of customer IDs created jobs for separated by coma") deleteJobsCommand.Flags().BoolVar(&silentFlag, "silent", false, "Set this flag to run command without prompt") + var sourceCustomerId, destCustomerId string + + copyJobsCommand := &cobra.Command{ + Use: "copy-jobs", + Short: "Copy jobs between customers", + PreRunE: func(cmd *cobra.Command, args []string) error { + indexerErr := indexer.CheckVariablesForIndexer() + if indexerErr != nil { + return indexerErr + } + + indexer.InitDBConnection() + + blockchainErr := seer_blockchain.CheckVariablesForBlockchains() + if blockchainErr != nil { + return blockchainErr + } + + if sourceCustomerId == "" || destCustomerId == "" { + return fmt.Errorf("values for --source-customer-id and --dest-customer-id should be set") + } + + // Check if the chain is supported + if _, ok := seer_blockchain.BlockchainURLs[jobChain]; !ok { + return fmt.Errorf("chain %s is not supported", jobChain) + } + + return nil + }, + RunE: func(cmd *cobra.Command, args []string) error { + abiJobs, selectJobsErr := indexer.DBConnection.SelectAbiJobs(jobChain, []string{}, []string{sourceCustomerId}, false, false, []string{}) + if selectJobsErr != nil { + return fmt.Errorf("error selecting ABI jobs: %w", selectJobsErr) + } + + indexer.GetJobIds(abiJobs, false) + + output := "no" + if silentFlag { + output = "yes" + } else { + var promptErr error + output, promptErr = StringPrompt("Continue? (y/yes)") + if promptErr != nil { + return promptErr + } + } + + switch output { + case "y": + case "yes": + default: + fmt.Println("Canceled") + return nil + } + + copyErr := indexer.DBConnection.CopyAbiJobs(sourceCustomerId, destCustomerId, abiJobs) + if copyErr != nil { + return copyErr + } + + return nil + }, + } + + copyJobsCommand.Flags().StringVar(&jobChain, "chain", "", "The blockchain to crawl") + copyJobsCommand.Flags().StringVar(&sourceCustomerId, "source-customer-id", "", "Source customer ID with jobs to copy") + copyJobsCommand.Flags().StringVar(&destCustomerId, "dest-customer-id", "", "Destination customer ID where to copy jobs") + copyJobsCommand.Flags().BoolVar(&silentFlag, "silent", false, "Set this flag to run command without prompt") + indexCommand.AddCommand(deploymentBlocksCommand) indexCommand.AddCommand(createJobsCommand) indexCommand.AddCommand(deleteJobsCommand) + indexCommand.AddCommand(copyJobsCommand) databaseCmd.AddCommand(indexCommand) return databaseCmd diff --git a/indexer/db.go b/indexer/db.go index e6640cd..30ad211 100644 --- a/indexer/db.go +++ b/indexer/db.go @@ -1210,6 +1210,78 @@ func (p *PostgreSQLpgx) SelectAbiJobs(blockchain string, addresses []string, cus return abiJobs, nil } +func GetJobIds(abiJobs []AbiJob, isSilent bool) []string { + var jobIds []string + abiJobChains := make(map[string]int) + for _, abiJob := range abiJobs { + jobIds = append(jobIds, abiJob.ID) + if _, ok := abiJobChains[abiJob.Chain]; !ok { + abiJobChains[abiJob.Chain] = 0 + } + abiJobChains[abiJob.Chain]++ + } + + if !isSilent { + fmt.Printf("Found %d total:\n", len(jobIds)) + for k, v := range abiJobChains { + fmt.Printf("- %s - %d jobs\n", k, v) + } + } + + return jobIds +} + +func (p *PostgreSQLpgx) CopyAbiJobs(sourceCustomerId, destCustomerId string, abiJobs []AbiJob) error { + pool := p.GetPool() + + ctx := context.Background() + + conn, err := pool.Acquire(ctx) + if err != nil { + return err + } + defer conn.Release() + + tx, err := conn.Begin(ctx) + if err != nil { + return err + } + defer tx.Rollback(ctx) + + _, prepErr := tx.Prepare(ctx, "insertAbiJob", ` + INSERT INTO abi_jobs (id, address, user_id, customer_id, abi_selector, chain, abi_name, status, historical_crawl_status, progress, moonworm_task_pickedup, abi, deployment_block_number, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, now(), now()) + `) + if prepErr != nil { + return err + } + + for _, abiJob := range abiJobs { + jobID := uuid.New() + + if len(abiJob.Abi) <= 2 || abiJob.Abi[0] != '[' || abiJob.Abi[len(abiJob.Abi)-1] != ']' { + log.Printf("Passed ABI job, incorrect format: %s", abiJob.Abi) + continue + } + abi := abiJob.Abi[1 : len(abiJob.Abi)-1] + abiBytes := []byte(abi) + + _, execErr := tx.Exec(ctx, "insertAbiJob", jobID, abiJob.Address, abiJob.UserID, destCustomerId, abiJob.AbiSelector, abiJob.Chain, abiJob.AbiName, "true", "pending", 0, false, abiBytes, abiJob.DeploymentBlockNumber) + if execErr != nil { + return execErr + } + } + + commitErr := tx.Commit(ctx) + if commitErr != nil { + return commitErr + } + + log.Printf("Copied %d ABI jobs from customer %s to %s.", len(abiJobs), sourceCustomerId, destCustomerId) + + return nil +} + func ConvertToCustomerUpdatedAndDeployBlockDicts(abiJobs []AbiJob) ([]CustomerUpdates, map[string]AbiJobsDeployInfo, error) { if len(abiJobs) == 0 { return []CustomerUpdates{}, map[string]AbiJobsDeployInfo{}, nil diff --git a/version/version.go b/version/version.go index 73d2f4f..9c975c4 100644 --- a/version/version.go +++ b/version/version.go @@ -1,3 +1,3 @@ package version -var SeerVersion string = "0.2.1" +var SeerVersion string = "0.3.1" From 956371fa2e9994cbb3c64cf396edd31592708ca2 Mon Sep 17 00:00:00 2001 From: kompotkot Date: Thu, 10 Oct 2024 11:42:46 +0000 Subject: [PATCH 4/4] Null deployment block number when copy abi jobs --- indexer/db.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/indexer/db.go b/indexer/db.go index 30ad211..1846681 100644 --- a/indexer/db.go +++ b/indexer/db.go @@ -1249,8 +1249,8 @@ func (p *PostgreSQLpgx) CopyAbiJobs(sourceCustomerId, destCustomerId string, abi defer tx.Rollback(ctx) _, prepErr := tx.Prepare(ctx, "insertAbiJob", ` - INSERT INTO abi_jobs (id, address, user_id, customer_id, abi_selector, chain, abi_name, status, historical_crawl_status, progress, moonworm_task_pickedup, abi, deployment_block_number, created_at, updated_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, now(), now()) + INSERT INTO abi_jobs (id, address, user_id, customer_id, abi_selector, chain, abi_name, status, historical_crawl_status, progress, moonworm_task_pickedup, abi, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, now(), now()) `) if prepErr != nil { return err @@ -1266,7 +1266,7 @@ func (p *PostgreSQLpgx) CopyAbiJobs(sourceCustomerId, destCustomerId string, abi abi := abiJob.Abi[1 : len(abiJob.Abi)-1] abiBytes := []byte(abi) - _, execErr := tx.Exec(ctx, "insertAbiJob", jobID, abiJob.Address, abiJob.UserID, destCustomerId, abiJob.AbiSelector, abiJob.Chain, abiJob.AbiName, "true", "pending", 0, false, abiBytes, abiJob.DeploymentBlockNumber) + _, execErr := tx.Exec(ctx, "insertAbiJob", jobID, abiJob.Address, abiJob.UserID, destCustomerId, abiJob.AbiSelector, abiJob.Chain, abiJob.AbiName, "true", "pending", 0, false, abiBytes) if execErr != nil { return execErr }