Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete and copy jobs CLI #87

Merged
merged 5 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 155 additions & 6 deletions cmd.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bufio"
"context"
"encoding/json"
"errors"
Expand Down Expand Up @@ -822,15 +823,14 @@ func CreateDatabaseOperationCommand() *cobra.Command {
return blockchainErr
}

return nil
},
RunE: func(cmd *cobra.Command, args []string) error {

// check if the chain is supported
// Check if the chain is supported
if _, ok := seer_blockchain.BlockchainURLs[jobChain]; !ok {
return fmt.Errorf("chain %s is not supported", jobChain)
}

return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
client, clientErr := seer_blockchain.NewClient(jobChain, seer_blockchain.BlockchainURLs[jobChain], 30)
if clientErr != nil {
return clientErr
Expand All @@ -856,15 +856,150 @@ func CreateDatabaseOperationCommand() *cobra.Command {
},
}

createJobsCommand.Flags().StringVar(&jobChain, "jobChain", "ethereum", "The blockchain to crawl (default: ethereum)")
createJobsCommand.Flags().StringVar(&jobChain, "chain", "", "The blockchain")
createJobsCommand.Flags().StringVar(&address, "address", "", "The address to create jobs for")
createJobsCommand.Flags().StringVar(&abiFile, "abi-file", "", "The path to the ABI file")
createJobsCommand.Flags().StringVar(&customerId, "customer-id", "", "The customer ID to create jobs for (default: '')")
createJobsCommand.Flags().StringVar(&userId, "user-id", "00000000-0000-0000-0000-000000000000", "The user ID to create jobs for (default: '00000000-0000-0000-0000-000000000000')")
createJobsCommand.Flags().Uint64Var(&deployBlock, "deploy-block", 0, "The block number to deploy contract (default: 0)")

var jobIds, jobAddresses, jobCustomerIds []string
var silentFlag bool

deleteJobsCommand := &cobra.Command{
Use: "delete-jobs",
Short: "Delete existing jobs",
PreRunE: func(cmd *cobra.Command, args []string) error {
indexerErr := indexer.CheckVariablesForIndexer()
if indexerErr != nil {
return indexerErr
}

indexer.InitDBConnection()

blockchainErr := seer_blockchain.CheckVariablesForBlockchains()
if blockchainErr != nil {
return blockchainErr
}

return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
abiJobs, selectJobsErr := indexer.DBConnection.SelectAbiJobs(jobChain, jobAddresses, jobCustomerIds, false, false, []string{})
if selectJobsErr != nil {
return fmt.Errorf("error selecting ABI jobs: %w", selectJobsErr)
}

jobIds := indexer.GetJobIds(abiJobs, false)

output := "no"
if silentFlag {
output = "yes"
} else {
var promptErr error
output, promptErr = StringPrompt("Continue? (y/yes)")
if promptErr != nil {
return promptErr
}
}

switch output {
case "y":
case "yes":
default:
fmt.Println("Canceled")
return nil
}

deleteJobsErr := indexer.DBConnection.DeleteJobs(jobIds)
if deleteJobsErr != nil {
return deleteJobsErr
}

return nil
},
}

deleteJobsCommand.Flags().StringVar(&jobChain, "chain", "", "The blockchain")
deleteJobsCommand.Flags().StringSliceVar(&jobIds, "job-ids", []string{}, "The list of job UUIDs separated by coma")
deleteJobsCommand.Flags().StringSliceVar(&jobAddresses, "addresses", []string{}, "The list of addresses created jobs for separated by coma")
deleteJobsCommand.Flags().StringSliceVar(&jobCustomerIds, "customer-ids", []string{}, "The list of customer IDs created jobs for separated by coma")
deleteJobsCommand.Flags().BoolVar(&silentFlag, "silent", false, "Set this flag to run command without prompt")

var sourceCustomerId, destCustomerId string

copyJobsCommand := &cobra.Command{
Use: "copy-jobs",
Short: "Copy jobs between customers",
PreRunE: func(cmd *cobra.Command, args []string) error {
indexerErr := indexer.CheckVariablesForIndexer()
if indexerErr != nil {
return indexerErr
}

indexer.InitDBConnection()

blockchainErr := seer_blockchain.CheckVariablesForBlockchains()
if blockchainErr != nil {
return blockchainErr
}

if sourceCustomerId == "" || destCustomerId == "" {
return fmt.Errorf("values for --source-customer-id and --dest-customer-id should be set")
}

// Check if the chain is supported
if _, ok := seer_blockchain.BlockchainURLs[jobChain]; !ok {
return fmt.Errorf("chain %s is not supported", jobChain)
}

return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
abiJobs, selectJobsErr := indexer.DBConnection.SelectAbiJobs(jobChain, []string{}, []string{sourceCustomerId}, false, false, []string{})
if selectJobsErr != nil {
return fmt.Errorf("error selecting ABI jobs: %w", selectJobsErr)
}

indexer.GetJobIds(abiJobs, false)

output := "no"
if silentFlag {
output = "yes"
} else {
var promptErr error
output, promptErr = StringPrompt("Continue? (y/yes)")
if promptErr != nil {
return promptErr
}
}

switch output {
case "y":
case "yes":
default:
fmt.Println("Canceled")
return nil
}

copyErr := indexer.DBConnection.CopyAbiJobs(sourceCustomerId, destCustomerId, abiJobs)
if copyErr != nil {
return copyErr
}

return nil
},
}

copyJobsCommand.Flags().StringVar(&jobChain, "chain", "", "The blockchain to crawl")
copyJobsCommand.Flags().StringVar(&sourceCustomerId, "source-customer-id", "", "Source customer ID with jobs to copy")
copyJobsCommand.Flags().StringVar(&destCustomerId, "dest-customer-id", "", "Destination customer ID where to copy jobs")
copyJobsCommand.Flags().BoolVar(&silentFlag, "silent", false, "Set this flag to run command without prompt")

indexCommand.AddCommand(deploymentBlocksCommand)
indexCommand.AddCommand(createJobsCommand)
indexCommand.AddCommand(deleteJobsCommand)
indexCommand.AddCommand(copyJobsCommand)
databaseCmd.AddCommand(indexCommand)

return databaseCmd
Expand Down Expand Up @@ -1159,3 +1294,17 @@ func CreateEVMGenerateCommand() *cobra.Command {

return evmGenerateCmd
}

func StringPrompt(label string) (string, error) {
var output string
r := bufio.NewReader(os.Stdin)

fmt.Fprint(os.Stderr, label+" ")
var readErr error
output, readErr = r.ReadString('\n')
if readErr != nil {
return "", readErr
}

return strings.TrimSpace(output), nil
}
139 changes: 131 additions & 8 deletions indexer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1131,29 +1131,45 @@ func (p *PostgreSQLpgx) UpdateAbiJobsStatus(blockchain string) error {
return nil
}

func (p *PostgreSQLpgx) SelectAbiJobs(blockchain string, addresses []string, customersIds []string, autoJobs bool) ([]CustomerUpdates, map[string]AbiJobsDeployInfo, error) {
func (p *PostgreSQLpgx) SelectAbiJobs(blockchain string, addresses []string, customersIds []string, autoJobs, isDeployBlockNotNull bool, abiTypes []string) ([]AbiJob, error) {
pool := p.GetPool()

conn, err := pool.Acquire(context.Background())
if err != nil {
return nil, nil, err
return nil, err
}
defer conn.Release()

var queryBuilder strings.Builder

queryArgs := make(pgx.NamedArgs)

queryArgs["chain"] = blockchain

queryBuilder.WriteString(`
SELECT id, address, user_id, customer_id, abi_selector, chain, abi_name, status,
historical_crawl_status, progress, moonworm_task_pickedup, '[' || abi || ']' as abi,
(abi::jsonb)->>'type' AS abiType, created_at, updated_at, deployment_block_number
FROM abi_jobs
WHERE chain = @chain AND ((abi::jsonb)->>'type' = 'function' or (abi::jsonb)->>'type' = 'event') and deployment_block_number is not null
WHERE true
`)

if len(abiTypes) != 0 {
var abiConditions []string
for _, abiType := range abiTypes {
abiConditions = append(abiConditions, fmt.Sprintf("(abi::jsonb)->>'type' = '%s'", abiType))
}

queryBuilder.WriteString(fmt.Sprintf("AND (%s) ", strings.Join(abiConditions, " or ")))
}

if isDeployBlockNotNull {
queryBuilder.WriteString(" AND deployment_block_number IS NOT null")
}

if blockchain != "" {
queryBuilder.WriteString(" AND chain = @chain ")
queryArgs["chain"] = blockchain
}

if autoJobs {
queryBuilder.WriteString(" AND historical_crawl_status != 'done' ")
}
Expand All @@ -1166,7 +1182,7 @@ func (p *PostgreSQLpgx) SelectAbiJobs(blockchain string, addresses []string, cus
for i, address := range addresses {
addressBytes, err := decodeAddress(address)
if err != nil {
return nil, nil, err
return nil, err
}
addressesBytes[i] = addressBytes // Assign directly to the index
}
Expand All @@ -1182,15 +1198,91 @@ func (p *PostgreSQLpgx) SelectAbiJobs(blockchain string, addresses []string, cus
rows, err := conn.Query(context.Background(), queryBuilder.String(), queryArgs)
if err != nil {
log.Println("Error querying ABI jobs from database", err)
return nil, nil, err
return nil, err
}

abiJobs, err := pgx.CollectRows(rows, pgx.RowToStructByName[AbiJob])
if err != nil {
log.Println("Error collecting ABI jobs rows", err)
return nil, nil, err
return nil, err
}

return abiJobs, nil
}

func GetJobIds(abiJobs []AbiJob, isSilent bool) []string {
var jobIds []string
abiJobChains := make(map[string]int)
for _, abiJob := range abiJobs {
jobIds = append(jobIds, abiJob.ID)
if _, ok := abiJobChains[abiJob.Chain]; !ok {
abiJobChains[abiJob.Chain] = 0
}
abiJobChains[abiJob.Chain]++
}

if !isSilent {
fmt.Printf("Found %d total:\n", len(jobIds))
for k, v := range abiJobChains {
fmt.Printf("- %s - %d jobs\n", k, v)
}
}

return jobIds
}

func (p *PostgreSQLpgx) CopyAbiJobs(sourceCustomerId, destCustomerId string, abiJobs []AbiJob) error {
pool := p.GetPool()

ctx := context.Background()

conn, err := pool.Acquire(ctx)
if err != nil {
return err
}
defer conn.Release()

tx, err := conn.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)

_, prepErr := tx.Prepare(ctx, "insertAbiJob", `
INSERT INTO abi_jobs (id, address, user_id, customer_id, abi_selector, chain, abi_name, status, historical_crawl_status, progress, moonworm_task_pickedup, abi, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, now(), now())
`)
if prepErr != nil {
return err
}

for _, abiJob := range abiJobs {
jobID := uuid.New()

if len(abiJob.Abi) <= 2 || abiJob.Abi[0] != '[' || abiJob.Abi[len(abiJob.Abi)-1] != ']' {
log.Printf("Passed ABI job, incorrect format: %s", abiJob.Abi)
continue
}
abi := abiJob.Abi[1 : len(abiJob.Abi)-1]
abiBytes := []byte(abi)

_, execErr := tx.Exec(ctx, "insertAbiJob", jobID, abiJob.Address, abiJob.UserID, destCustomerId, abiJob.AbiSelector, abiJob.Chain, abiJob.AbiName, "true", "pending", 0, false, abiBytes)
if execErr != nil {
return execErr
}
}

commitErr := tx.Commit(ctx)
if commitErr != nil {
return commitErr
}

log.Printf("Copied %d ABI jobs from customer %s to %s.", len(abiJobs), sourceCustomerId, destCustomerId)

return nil
}

func ConvertToCustomerUpdatedAndDeployBlockDicts(abiJobs []AbiJob) ([]CustomerUpdates, map[string]AbiJobsDeployInfo, error) {
if len(abiJobs) == 0 {
return []CustomerUpdates{}, map[string]AbiJobsDeployInfo{}, nil
}
Expand Down Expand Up @@ -1522,3 +1614,34 @@ func (p *PostgreSQLpgx) CreateJobsFromAbi(chain string, address string, abiFile
return nil

}

func (p *PostgreSQLpgx) DeleteJobs(jobIds []string) error {
if len(jobIds) == 0 {
log.Println("Nothing to delete")
return nil
}

pool := p.GetPool()

conn, err := pool.Acquire(context.Background())
if err != nil {
return err
}
defer conn.Release()

var queryBuilder strings.Builder
queryBuilder.WriteString("DELETE FROM abi_jobs WHERE id = ANY(@jobIds)")

queryArgs := make(pgx.NamedArgs)
queryArgs["jobIds"] = jobIds

_, delErr := conn.Query(context.Background(), queryBuilder.String(), queryArgs)
if delErr != nil {
log.Printf("Error querying ABI jobs from database, err %v", delErr)
return delErr
}

log.Printf("Deleted %d jobs", len(jobIds))

return nil
}
Loading
Loading