I want to use a queue for each job class even when using Sucker Punch with Active Job #224

Open
ledsun opened this issue Nov 19, 2018 · 0 comments


ledsun commented Nov 19, 2018

The README says that "each job acts as its own queue".
However, when Sucker Punch is used through Active Job, all jobs are executed in a single queue.

Cause

I think this happens because SuckerPunchAdapter calls perform_async on the JobWrapper class:
https://github.com/rails/rails/blob/b2eb1d1c55a59fee1e6c4cba7030d8ceb524267c/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb#L24

JobWrapper.perform_async job.serialize

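Sucker Punch looks up the queue by the name of the class that receives perform_async (self.to_s), so every job enqueued through JobWrapper shares the JobWrapper queue: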
https://github.com/brandonhilkert/sucker_punch/blob/master/lib/sucker_punch/job.rb#L37

SuckerPunch::Queue.find_or_create(self.to_s, num_workers, num_jobs_max)
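To illustrate why every job ends up in one queue, here is a minimal sketch that does not use Sucker Punch itself. FakeQueueRegistry, FakeJob, SearchJob, and MailJob are hypothetical stand-ins; the only behaviour taken from the line above is that the queue is found by self.to_s of the class that receives perform_async.

```ruby
# Hypothetical stand-in for SuckerPunch::Queue: queue name => posted arguments.
module FakeQueueRegistry
  QUEUES = Hash.new { |hash, name| hash[name] = [] }

  def self.find_or_create(name)
    QUEUES[name]
  end
end

# Hypothetical stand-in for SuckerPunch::Job: the queue name is the receiver's class name.
module FakeJob
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    def perform_async(*args)
      FakeQueueRegistry.find_or_create(to_s) << args
    end
  end
end

class JobWrapper
  include FakeJob
end

class SearchJob
  include FakeJob
end

class MailJob
  include FakeJob
end

# What the adapter does today: both jobs go through JobWrapper,
# so both are posted to the single "JobWrapper" queue.
JobWrapper.perform_async({ "job_class" => "SearchJob" })
JobWrapper.perform_async({ "job_class" => "MailJob" })

# Calling perform_async on each job class instead yields one queue per class.
SearchJob.perform_async("query")
MailJob.perform_async("address")

puts FakeQueueRegistry::QUEUES.keys.inspect
```

In this sketch the first two calls share one registry entry, while the last two each create their own.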

Hypothesis

If the adapter were changed as follows, each job could run in its own queue.

job.class.perform_async job.arguments

However, this change makes the ActiveJob log incomplete.
The job start log, like the following, would be lost.

[ActiveJob] [SearchJob] [2163535d-688d-494a-9433-5a8c9057f392] Performing SearchJob (Job ID: 2163535d-688d-494a-9433-5a8c9057f392) from SuckerPunch(SearchJob) with arguments: "b8f04e7e-69c9-4dc4-9ea6-955f00193512"

If the job is executed through ActiveJob::Base.execute as follows, the start log is output.

Base.execute job.serialize

If the class that receives perform_async and the block to be executed could be specified separately, it should be possible both to run each job in its own queue and to keep the ActiveJob log.

job.class.perform_async { Base.execute job.serialize }

Solution

I think the following modifications are necessary.
SuckerPunch::Job.perform_async executes the block when it is called with one. When there is no block, it instantiates its own class and performs the job.

def perform_async(*args, &block)
  return unless SuckerPunch::RUNNING.true?
  queue = SuckerPunch::Queue.find_or_create(self.to_s, num_workers, num_jobs_max)
  if block
    # Run the caller's block on this class's queue.
    queue.post(args) { |job_args| __run_perform(*job_args, &block) }
  else
    # Default behaviour: instantiate the job class and perform it.
    queue.post(args) { |job_args| __run_perform(*job_args) { self.new.perform(*job_args) } }
  end
end

And the __run_perform method executes the received block.

# args is still accepted so that the exception handler below can receive it.
def __run_perform(*args, &block)
  SuckerPunch::Counter::Busy.new(self.to_s).increment
  result = block.call
  SuckerPunch::Counter::Processed.new(self.to_s).increment
  result
rescue => ex
  SuckerPunch::Counter::Failed.new(self.to_s).increment
  SuckerPunch.exception_handler.call(ex, self, args)
ensure
  SuckerPunch::Counter::Busy.new(self.to_s).decrement
end
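
The intended behaviour of the two methods above can be checked without Sucker Punch in a small sketch. Everything here is an assumption made for illustration: ToyJob, its PROCESSED and FAILED counters, SearchJob, and MailJob are hypothetical, and blocks run inline instead of being posted to a thread pool, so perform_async returns the result directly.

```ruby
# Hypothetical, synchronous model of the proposal: no threads, no Sucker Punch.
module ToyJob
  # Class name => number of jobs that finished or failed under that name.
  PROCESSED = Hash.new(0)
  FAILED = Hash.new(0)

  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    # With a block, run the block under this class's name;
    # without one, instantiate the class and call #perform.
    def perform_async(*args, &block)
      if block
        __run_perform(*args, &block)
      else
        __run_perform(*args) { new.perform(*args) }
      end
    end

    # Executes the received block and counts the outcome per class.
    def __run_perform(*args, &block)
      result = block.call
      PROCESSED[to_s] += 1
      result
    rescue StandardError
      FAILED[to_s] += 1
      nil
    end
  end
end

class SearchJob
  include ToyJob

  def perform(term)
    "searched #{term}"
  end
end

class MailJob
  include ToyJob

  def perform(address)
    raise ArgumentError, "no address" if address.nil?
    "mailed #{address}"
  end
end

SearchJob.perform_async("ruby")                       # default path
SearchJob.perform_async { "wrapped by the adapter" }  # block path
MailJob.perform_async(nil)                            # failure is counted for MailJob only

puts ToyJob::PROCESSED.inspect
puts ToyJob::FAILED.inspect
```

Both SearchJob calls are counted under SearchJob, whether or not a block was given, and the MailJob failure does not touch SearchJob's counters. That is the per-class separation the proposal is after.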

Conclusions

I think this is a good idea. If you agree, I will open a pull request.
Please let me know if you need anything such as tests.
