Skip to content

Commit

Permalink
Full Sync: Don't allow more than one request to enqueue (#14039)
Browse files Browse the repository at this point in the history
  • Loading branch information
lezama authored Nov 15, 2019
1 parent ade7b28 commit 28904d8
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 125 deletions.
8 changes: 0 additions & 8 deletions packages/sync/src/class-listener.php
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,6 @@ public function action_handler( ...$args ) {
public function bulk_enqueue_full_sync_actions( $action_name, $args_array ) {
$queue = $this->get_full_sync_queue();

/*
* Periodically check the size of the queue, and disable adding to it if
* it exceeds some limit AND the oldest item exceeds the age limit (i.e. sending has stopped).
*/
if ( ! $this->can_add_to_queue( $queue ) ) {
return;
}

/*
* If we add any items to the queue, we should try to ensure that our script
* can't be killed before they are sent.
Expand Down
65 changes: 65 additions & 0 deletions packages/sync/src/class-lock.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
<?php
/**
* Lock class.
*
* @package automattic/jetpack-sync
*/

namespace Automattic\Jetpack\Sync;

/**
* Lock class
*/
class Lock {
/**
* Prefix of the blog lock transient.
*
* @access public
*
* @var string
*/
const LOCK_PREFIX = 'jp_sync_lock_';

/**
* Default Lifetime of the lock.
*
* @access public
*
* @var int
*/
const LOCK_TRANSIENT_EXPIRY = 15; // Seconds.

/**
* Attempt to lock.
*
* @access public
*
* @param string $name lock name.
* @param int $expiry lock duration in seconds.
*
* @return boolean True if succeeded, false otherwise.
*/
public function attempt( $name, $expiry = self::LOCK_TRANSIENT_EXPIRY ) {
$name = self::LOCK_PREFIX . $name;
$locked_time = get_option( $name );
if ( $locked_time ) {
if ( microtime( true ) < $locked_time ) {
return false;
}
}
update_option( $name, microtime( true ) + $expiry );

return true;
}

/**
* Remove the lock.
*
* @access public
*
* @param string $name lock name.
*/
public function remove( $name ) {
delete_option( self::LOCK_PREFIX . $name );
}
}
204 changes: 87 additions & 117 deletions packages/sync/src/modules/class-full-sync.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
namespace Automattic\Jetpack\Sync\Modules;

use Automattic\Jetpack\Sync\Listener;
use Automattic\Jetpack\Sync\Lock;
use Automattic\Jetpack\Sync\Modules;
use Automattic\Jetpack\Sync\Queue;
use Automattic\Jetpack\Sync\Settings;
Expand All @@ -31,14 +32,13 @@ class Full_Sync extends Module {
*/
const STATUS_OPTION_PREFIX = 'jetpack_sync_full_';


/**
* Timeout between the previous and the next allowed full sync.
*
* @todo Remove this as it's no longer used since https://github.com/Automattic/jetpack/pull/4561
* Enqueue Lock name.
*
* @var int
* @var string
*/
const FULL_SYNC_TIMEOUT = 3600;
const ENQUEUE_LOCK_NAME = 'full_sync_enqueue';

/**
* Sync module name.
Expand Down Expand Up @@ -163,7 +163,7 @@ public function start( $module_configs = null ) {
*/
do_action( 'jetpack_full_sync_start', $full_sync_config, $range, $empty );

$this->continue_enqueuing( $full_sync_config, $enqueue_status );
$this->continue_enqueuing( $full_sync_config );

return true;
}
Expand All @@ -174,78 +174,108 @@ public function start( $module_configs = null ) {
* @access public
*
* @param array $configs Full sync configuration for all sync modules.
* @param array $enqueue_status Current status of the queue, indexed by sync modules.
*/
public function continue_enqueuing( $configs = null, $enqueue_status = null ) {
if ( ! $this->is_started() || $this->get_status_option( 'queue_finished' ) ) {
public function continue_enqueuing( $configs = null ) {
if ( ! $this->is_started() || ! ( new Lock() )->attempt( self::ENQUEUE_LOCK_NAME ) || $this->get_status_option( 'queue_finished' ) ) {
return;
}

// If full sync queue is full, don't enqueue more items.
$max_queue_size_full_sync = Settings::get_setting( 'max_queue_size_full_sync' );
$full_sync_queue = new Queue( 'full_sync' );
$this->enqueue( $configs );

$available_queue_slots = $max_queue_size_full_sync - $full_sync_queue->size();
( new Lock() )->remove( self::ENQUEUE_LOCK_NAME );
}

if ( $available_queue_slots <= 0 ) {
return;
} else {
$remaining_items_to_enqueue = min( Settings::get_setting( 'max_enqueue_full_sync' ), $available_queue_slots );
}
/**
* Get Modules that are configured to Full Sync and haven't finished enqueuing
*
* @param array $configs Full sync configuration for all sync modules.
*
* @return array
*/
public function get_remaining_modules_to_enqueue( $configs ) {
$enqueue_status = $this->get_enqueue_status();
return array_filter(
Modules::get_modules(),
/**
* Select configured and not finished modules.
*
* @var $module Module
* @return bool
*/
function ( $module ) use ( $configs, $enqueue_status ) {
// Skip module if not configured for this sync or module is done.
if ( ! isset( $configs[ $module->name() ] ) ) {
return false;
}
if ( ! $configs[ $module->name() ] ) {
return false;
}
if ( isset( $enqueue_status[ $module->name() ][2] ) ) {
if ( true === $enqueue_status[ $module->name() ][2] ) {
return false;
}
}

return true;
}
);
}

/**
* Enqueue the next items to sync.
*
* @access public
*
* @param array $configs Full sync configuration for all sync modules.
*/
public function enqueue( $configs = null ) {
if ( ! $configs ) {
$configs = $this->get_config();
}

if ( ! $enqueue_status ) {
$enqueue_status = $this->get_enqueue_status();
$enqueue_status = $this->get_enqueue_status();
$full_sync_queue = new Queue( 'full_sync' );
$available_queue_slots = Settings::get_setting( 'max_queue_size_full_sync' ) - $full_sync_queue->size();

if ( $available_queue_slots <= 0 ) {
return;
}

$modules = Modules::get_modules();
$modules_processed = 0;
foreach ( $modules as $module ) {
$module_name = $module->name();

// Skip module if not configured for this sync or module is done.
if ( ! isset( $configs[ $module_name ] )
|| // No module config.
! $configs[ $module_name ]
|| // No enqueue status.
! $enqueue_status[ $module_name ]
|| // Finished enqueuing this module.
true === $enqueue_status[ $module_name ][2] ) {
$modules_processed ++;
continue;
}
$remaining_items_to_enqueue = min( Settings::get_setting( 'max_enqueue_full_sync' ), $available_queue_slots );

list( $items_enqueued, $next_enqueue_state ) = $module->enqueue_full_sync_actions( $configs[ $module_name ], $remaining_items_to_enqueue, $enqueue_status[ $module_name ][2] );
/**
* If a module exits early (e.g. because it ran out of full sync queue slots, or we ran out of request time)
* then it should exit early
*/
foreach ( $this->get_remaining_modules_to_enqueue( $configs ) as $module ) {
list( $items_enqueued, $next_enqueue_state ) = $module->enqueue_full_sync_actions( $configs[ $module->name() ], $remaining_items_to_enqueue, $enqueue_status[ $module->name() ][2] );

$enqueue_status[ $module_name ][2] = $next_enqueue_state;
$enqueue_status[ $module->name() ][2] = $next_enqueue_state;

// If items were processed, subtract them from the limit.
if ( ! is_null( $items_enqueued ) && $items_enqueued > 0 ) {
$enqueue_status[ $module_name ][1] += $items_enqueued;
$remaining_items_to_enqueue -= $items_enqueued;
$enqueue_status[ $module->name() ][1] += $items_enqueued;
$remaining_items_to_enqueue -= $items_enqueued;
}

if ( true === $next_enqueue_state ) {
$modules_processed ++;
}
// Stop processing if we've reached our limit of items to enqueue.
if ( 0 >= $remaining_items_to_enqueue ) {
break;
if ( 0 >= $remaining_items_to_enqueue || true !== $next_enqueue_state ) {
$this->set_enqueue_status( $enqueue_status );
return;
}
}

$this->queue_full_sync_end( $configs );
$this->set_enqueue_status( $enqueue_status );
}

if ( count( $modules ) > $modules_processed ) {
return;
}

// Setting autoload to true means that it's faster to check whether we should continue enqueuing.
$this->update_status_option( 'queue_finished', time(), true );

/**
* Enqueue 'jetpack_full_sync_end' and update 'queue_finished' status.
*
* @access public
*
* @param array $configs Full sync configuration for all sync modules.
*/
public function queue_full_sync_end( $configs ) {
$range = $this->get_content_range( $configs );

/**
Expand All @@ -259,6 +289,9 @@ public function continue_enqueuing( $configs = null, $enqueue_status = null ) {
* @param array $range Range of the sync items, containing min and max IDs for some item types.
*/
do_action( 'jetpack_full_sync_end', '', $range );

// Setting autoload to true means that it's faster to check whether we should continue enqueuing.
$this->update_status_option( 'queue_finished', time(), true );
}

/**
Expand Down Expand Up @@ -537,6 +570,7 @@ public function clear_status() {
public function reset_data() {
$this->clear_status();
$this->delete_config();
( new Lock() )->remove( self::ENQUEUE_LOCK_NAME );

$listener = Listener::get_instance();
$listener->get_full_sync_queue()->reset();
Expand Down Expand Up @@ -636,68 +670,4 @@ private function get_config() {
return \Jetpack_Options::get_raw_option( 'jetpack_sync_full_config' );
}

/**
* Update an option manually to bypass filters and caching.
*
* @access private
*
* @param string $name Option name.
* @param mixed $value Option value.
* @return int The number of updated rows in the database.
*/
private function write_option( $name, $value ) {
// We write our own option updating code to bypass filters/caching/etc on set_option/get_option.
global $wpdb;
$serialized_value = maybe_serialize( $value );

/**
* Try updating, if no update then insert
* TODO: try to deal with the fact that unchanged values can return updated_num = 0
* below we used "insert ignore" to at least suppress the resulting error.
*/
$updated_num = $wpdb->query(
$wpdb->prepare(
"UPDATE $wpdb->options SET option_value = %s WHERE option_name = %s",
$serialized_value,
$name
)
);

if ( ! $updated_num ) {
$updated_num = $wpdb->query(
$wpdb->prepare(
"INSERT IGNORE INTO $wpdb->options ( option_name, option_value, autoload ) VALUES ( %s, %s, 'no' )",
$name,
$serialized_value
)
);
}
return $updated_num;
}

/**
* Update an option manually to bypass filters and caching.
*
* @access private
*
* @param string $name Option name.
* @param mixed $default Default option value.
* @return mixed Option value.
*/
private function read_option( $name, $default = null ) {
global $wpdb;
$value = $wpdb->get_var(
$wpdb->prepare(
"SELECT option_value FROM $wpdb->options WHERE option_name = %s LIMIT 1",
$name
)
);
$value = maybe_unserialize( $value );

if ( null === $value && null !== $default ) {
return $default;
}

return $value;
}
}
Loading

0 comments on commit 28904d8

Please sign in to comment.