From ba2161cd0004e2cded4b2015821f8f38b833636c Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Tue, 30 Jan 2018 22:19:49 +0100 Subject: [PATCH] Add Ruby progress callback + customise our GIL unblock function (#151) We should provide some ability for users to re-acquire the GIL and call Ruby callback blocks. We will start with a progress callback which can be used for progress bars and speed computation. We also need to alter the way we unblock the thread so that we correctly handle signals. This adds the following API: sess.progress_callback = ->(download_total, download_now, upload_total, upload_now) { progress_bar.update(download_total) # ...or other interesting actions } The GIL will only be reacquired if this is configured on the Session/Request objects, otherwise we proceed through the same fast path as before with the GIL unlocked. --- CHANGELOG.md | 2 + ext/patron/session_ext.c | 82 ++++++++++++++++++++++++++++++------- lib/patron/request.rb | 4 +- lib/patron/session.rb | 6 +++ spec/session_spec.rb | 53 ++++++++++++++++++++++++ spec/support/test_server.rb | 5 ++- 6 files changed, 133 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b66c15..99daf16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Added `Session#progress_callback` which accepts a callable object, which can be used to report session progress during request + execution. * Fixed parsing of response headers when multiple responses are involved (redirect chains and HTTP proxies) ### 0.10.0 diff --git a/ext/patron/session_ext.c b/ext/patron/session_ext.c index df18781..41c1b31 100644 --- a/ext/patron/session_ext.c +++ b/ext/patron/session_ext.c @@ -40,7 +40,12 @@ struct patron_curl_state { membuffer header_buffer; membuffer body_buffer; size_t download_byte_limit; + VALUE user_progress_blk; int interrupt; + size_t dltotal; + size_t dlnow; + size_t ultotal; + size_t ulnow; }; @@ -69,16 +74,45 @@ static size_t file_write_handler(void* stream, size_t size, size_t nmemb, FILE* } } +static int call_user_rb_progress_blk(void* vd_curl_state) { + struct patron_curl_state* state = (struct patron_curl_state*)vd_curl_state; + // Invoke the block with the array + VALUE blk_result = rb_funcall(state->user_progress_blk, + rb_intern("call"), 4, + LONG2NUM(state->dltotal), + LONG2NUM(state->dlnow), + LONG2NUM(state->ultotal), + LONG2NUM(state->ulnow)); + return 0; +} + + /* A non-zero return value from the progress handler will terminate the current * request. We use this fact in order to interrupt any request when either the * user calls the "interrupt" method on the session or when the Ruby interpreter * is attempting to exit. */ -static int session_progress_handler(void *clientp, size_t dltotal, size_t dlnow, size_t ultotal, size_t ulnow) { +static int session_progress_handler(void* clientp, size_t dltotal, size_t dlnow, size_t ultotal, size_t ulnow) { struct patron_curl_state* state = (struct patron_curl_state*) clientp; - UNUSED_ARGUMENT(dlnow); - UNUSED_ARGUMENT(ultotal); - UNUSED_ARGUMENT(ulnow); + state->dltotal = dltotal; + state->dlnow = dlnow; + state->ultotal = ultotal; + state->ulnow = ulnow; + + // If a progress proc has been set, re-acquire the GIL and call it using + // `call_user_rb_progress_blk`. TODO: use the retval of that proc + // to permit premature abort + if(RTEST(state->user_progress_blk)) { + // Even though it is not documented, rb_thread_call_with_gvl is available even when + // rb_thread_call_without_gvl is not. See https://bugs.ruby-lang.org/issues/5543#note-4 + // > rb_thread_call_with_gvl() is globally-visible (but not in headers) + // > for 1.9.3: https://bugs.ruby-lang.org/issues/4328 + #if (defined(HAVE_TBR) || defined(HAVE_TCWOGVL)) && defined(USE_TBR) + rb_thread_call_with_gvl((void *(*)(void *)) call_user_rb_progress_blk, (void*)state); + #else + call_user_rb_progress_blk((void*)state); + #endif + } // Set the interrupt if the download byte limit has been reached if(state->download_byte_limit != 0 && (dltotal > state->download_byte_limit)) { @@ -410,6 +444,7 @@ static void set_options_from_request(VALUE self, VALUE request) { VALUE action_name = rb_funcall(request, rb_intern("action"), 0); VALUE a_c_encoding = rb_funcall(request, rb_intern("automatic_content_encoding"), 0); VALUE download_byte_limit = rb_funcall(request, rb_intern("download_byte_limit"), 0); + VALUE maybe_progress_proc = rb_funcall(request, rb_intern("progress_callback"), 0); if (RTEST(download_byte_limit)) { state->download_byte_limit = FIX2INT(download_byte_limit); @@ -417,6 +452,12 @@ static void set_options_from_request(VALUE self, VALUE request) { state->download_byte_limit = 0; } + if (rb_obj_is_proc(maybe_progress_proc)) { + state->user_progress_blk = maybe_progress_proc; + } else { + state->user_progress_blk = Qnil; + } + headers = rb_funcall(request, rb_intern("headers"), 0); if (RTEST(headers)) { if (rb_type(headers) != T_HASH) { @@ -718,6 +759,17 @@ static VALUE select_error(CURLcode code) { return error; } + +/* Uses as the unblocking function when the thread running Patron gets + signaled. The important difference with session_interrupt is that we + are not allowed to touch any Ruby structures while outside the GIL, + but we _are_ permitted to touch our internal curl state struct +*/ +void session_ubf_abort(void* patron_state) { + struct patron_curl_state* state = (struct patron_curl_state*) patron_state; + state->interrupt = INTERRUPT_ABORT; +} + /* Perform the actual HTTP request by calling libcurl. */ static VALUE perform_request(VALUE self) { struct patron_curl_state *state = get_patron_curl_state(self); @@ -745,17 +797,17 @@ static VALUE perform_request(VALUE self) { } #if (defined(HAVE_TBR) || defined(HAVE_TCWOGVL)) && defined(USE_TBR) -#if defined(HAVE_TCWOGVL) - ret = (CURLcode) rb_thread_call_without_gvl( - (void *(*)(void *)) curl_easy_perform, curl, - RUBY_UBF_IO, 0 - ); -#else - ret = (CURLcode) rb_thread_blocking_region( - (rb_blocking_function_t*) curl_easy_perform, curl, - RUBY_UBF_IO, 0 - ); -#endif + #if defined(HAVE_TCWOGVL) + ret = (CURLcode) rb_thread_call_without_gvl( + (void *(*)(void *)) curl_easy_perform, curl, + session_ubf_abort, (void*)state + ); + #else + ret = (CURLcode) rb_thread_blocking_region( + (rb_blocking_function_t*) curl_easy_perform, curl, + session_ubf_abort, (void*)state + ); + #endif #else ret = curl_easy_perform(curl); #endif diff --git a/lib/patron/request.rb b/lib/patron/request.rb index 7e10836..9414f32 100644 --- a/lib/patron/request.rb +++ b/lib/patron/request.rb @@ -25,13 +25,13 @@ def initialize :ignore_content_length, :multipart, :action, :timeout, :connect_timeout, :max_redirects, :headers, :auth_type, :upload_data, :buffer_size, :cacert, :ssl_version, :http_version, :automatic_content_encoding, :force_ipv4, :download_byte_limit, - :low_speed_time, :low_speed_limit + :low_speed_time, :low_speed_limit, :progress_callback ] WRITER_VARS = [ :url, :username, :password, :file_name, :proxy, :proxy_type, :insecure, :ignore_content_length, :multipart, :cacert, :ssl_version, :http_version, :automatic_content_encoding, :force_ipv4, :download_byte_limit, - :low_speed_time, :low_speed_limit + :low_speed_time, :low_speed_limit, :progress_callback ] attr_reader *READER_VARS diff --git a/lib/patron/session.rb b/lib/patron/session.rb index 0e87729..d9be1aa 100644 --- a/lib/patron/session.rb +++ b/lib/patron/session.rb @@ -102,6 +102,11 @@ class Session private :handle_request, :add_cookie_file, :set_debug_file + # @return [#call, nil] callable object that will be called with 4 arguments + # during request/response execution - `dltotal`, `dlnow`, `ultotal`, `ulnow`. + # All these arguments are in bytes. + attr_accessor :progress_callback + # Create a new Session object for performing requests. # # @param args[Hash] options for the Session (same names as the writable attributes of the Session) @@ -372,6 +377,7 @@ def build_request(action, url, headers, options = {}) req.ignore_content_length = options.fetch :ignore_content_length, self.ignore_content_length req.buffer_size = options.fetch :buffer_size, self.buffer_size req.download_byte_limit = options.fetch :download_byte_limit, self.download_byte_limit + req.progress_callback = options.fetch :progress_callback, self.progress_callback req.multipart = options[:multipart] req.upload_data = options[:data] req.file_name = options[:file] diff --git a/spec/session_spec.rb b/spec/session_spec.rb index c5c4c76..d2b27e1 100644 --- a/spec/session_spec.rb +++ b/spec/session_spec.rb @@ -178,6 +178,59 @@ expect {@session.get("/slow")}.to raise_error(Patron::TimeoutError) end + it "is able to terminate the thread that is running a slow request using Thread#kill (uses the custom unblock)" do + t = Thread.new do + session = Patron::Session.new + session.timeout = 40 + session.base_url = "http://localhost:9001" + session.get("/slow") + end + + # Our test server starts sending the body only after 20 seconds. We should be able to abort + # using a signal during that time. + started = Time.now.to_i + sleep 5 # Less than what it takes for the server to respond + t.kill # Kill the thread forcibly + t.join # wrap up the thread. If Patron is still busy there, this join call will still take 15s. + + delta_s = Time.now.to_i - started + expect(delta_s).to be_within(2).of(5) + end + + it "is able to terminate the thread that is running a slow request" do + t = Thread.new do + trap('SIGINT') do + exit # exit the thread + end + session = Patron::Session.new + session.timeout = 40 + session.base_url = "http://localhost:9001" + session.get("/slow") + end + + # Our test server starts sending the body only after 20 seconds. We should be able to abort + # using a signal during that time. + started = Time.now.to_i + sleep 5 # Less than what it takes for the server to respond + Process.kill("INT", Process.pid) # Signal ourselves... + t.join # wrap up the thread. If Patron is still busy there, this join call will still take 15s. + delta_s = Time.now.to_i - started + expect(delta_s).to be_within(2).of(5) + end + + it "receives progress callbacks" do + session = Patron::Session.new + session.timeout = 40 + session.base_url = "http://localhost:9001" + callback_args = [] + session.progress_callback = Proc.new {|dltotal, dlnow, ultotal, ulnow| + callback_args << [dltotal, dlnow, ultotal, ulnow] + } + session.get("/slow") + + expect(callback_args).not_to be_empty + end + it "should follow redirects by default" do @session.max_redirects = 1 response = @session.get("/redirect") diff --git a/spec/support/test_server.rb b/spec/support/test_server.rb index f14c81e..2da5674 100644 --- a/spec/support/test_server.rb +++ b/spec/support/test_server.rb @@ -82,9 +82,10 @@ def do_GET(req,res) class SlowServlet < HTTPServlet::AbstractServlet def do_GET(req,res) - sleep 3 res.header['Content-Type'] = 'text/plain' - res.body = 'beep' + res.body << 'x' + sleep 20 + res.body << 'rest of body' end end