Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Phalcon\Queue\Beanstalk enhancements #1650

Merged
merged 3 commits into from Dec 11, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,10 @@
- Optimized Phalcon\Paginator\Adapter\NativeArray (#1653)
- Phalcon\Queue:
- Fixed bug in Phalcon\Queue\Beanstalk::read() (#1348, #1612)
- Bug fixes in beanstalkd protocol implementation
- Bug fixes in beanstalkd protocol implementation (#1650)
- Optimizations (#1621)
- Added peekDelayed() and peekburied() to Phalcon\Queue\Beanstalk (#1650)
- Added kick(), bury(), release(), touch() to Phalcon\Queue\Beanstalk\Job (#1650)
- Phalcon\Security:
- Phalcon\Security\Exception inherits from Phalcon\Exception, not from \Phalcon\DI\Exception
- Added Phalcon\Security::computeHmac() (#1347)
Expand Down
104 changes: 87 additions & 17 deletions ext/queue/beanstalk.c
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, watch){

PHALCON_OBS_VAR(status);
phalcon_array_fetch_long(&status, response, 0, PH_NOISY);
if (PHALCON_IS_STRING(status, "WATCH")) {
if (PHALCON_IS_STRING(status, "WATCHING")) {
PHALCON_OBS_VAR(watching_tube);
phalcon_array_fetch_long(&watching_tube, response, 1, PH_NOISY);
RETURN_CCTOR(watching_tube);
Expand All @@ -378,15 +378,42 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, watch){
RETURN_MM_FALSE;
}

static void phalcon_queue_beanstalk_peek_common(zval *return_value, zval *this_ptr, zval *response TSRMLS_DC)
{
zval *job_id, *length, *serialized = NULL, *body;

if (!phalcon_array_isset_long_fetch(&job_id, response, 1)) {
job_id = PHALCON_GLOBAL(z_null);
}

if (!phalcon_array_isset_long_fetch(&length, response, 2)) {
length = PHALCON_GLOBAL(z_null);
}

phalcon_call_method_params(serialized, &serialized, this_ptr, SL("read"), zend_inline_hash_func(SS("read")) TSRMLS_CC, 1, length);
if (EG(exception)) {
return;
}

MAKE_STD_ZVAL(body);
phalcon_unserialize(body, serialized TSRMLS_CC);
zval_ptr_dtor(&serialized);
if (Z_REFCOUNT_P(body) >= 1) {
Z_DELREF_P(body);
}

object_init_ex(return_value, phalcon_queue_beanstalk_job_ce);
phalcon_call_method_params(NULL, NULL, return_value, SL("__construct"), zend_inline_hash_func(SS("__construct")) TSRMLS_CC, 3, this_ptr, job_id, body);
}

/**
* Inspect the next ready job.
*
* @return boolean|Phalcon\Queue\Beanstalk\Job
*/
PHP_METHOD(Phalcon_Queue_Beanstalk, peekReady){

zval *command, *response, *status, *job_id, *length;
zval *serialized_body, *body;
zval *command, *response, *status;

PHALCON_MM_GROW();

Expand All @@ -400,26 +427,69 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, peekReady){
PHALCON_OBS_VAR(status);
phalcon_array_fetch_long(&status, response, 0, PH_NOISY);
if (PHALCON_IS_STRING(status, "FOUND")) {
PHALCON_OBS_VAR(job_id);
phalcon_array_fetch_long(&job_id, response, 1, PH_NOISY);

PHALCON_OBS_VAR(length);
phalcon_array_fetch_long(&length, response, 2, PH_NOISY);

PHALCON_INIT_VAR(serialized_body);
phalcon_call_method_p1(serialized_body, this_ptr, "read", length);

PHALCON_INIT_VAR(body);
phalcon_unserialize(body, serialized_body TSRMLS_CC);
object_init_ex(return_value, phalcon_queue_beanstalk_job_ce);
phalcon_call_method_p3_noret(return_value, "__construct", this_ptr, job_id, body);

phalcon_queue_beanstalk_peek_common(return_value, getThis(), response TSRMLS_CC);
RETURN_MM();
}

RETURN_MM_FALSE;
}

/**
* Return the delayed job with the shortest delay left
*
* @return boolean|Phalcon\Queue\Beanstalk\Job
*/
PHP_METHOD(Phalcon_Queue_Beanstalk, peekDelayed){

zval *command, *response, *status;

PHALCON_MM_GROW();

PHALCON_INIT_VAR(command);
ZVAL_STRING(command, "peek-delayed", 1);
phalcon_call_method_p1_noret(this_ptr, "write", command);

PHALCON_INIT_VAR(response);
phalcon_call_method(response, this_ptr, "readstatus");

PHALCON_OBS_VAR(status);
phalcon_array_fetch_long(&status, response, 0, PH_NOISY);
if (PHALCON_IS_STRING(status, "FOUND")) {
phalcon_queue_beanstalk_peek_common(return_value, getThis(), response TSRMLS_CC);
RETURN_MM();
}

RETURN_MM_FALSE;
}

/**
* Return the next job in the list of buried jobs
*
* @return boolean|Phalcon\Queue\Beanstalk\Job
*/
PHP_METHOD(Phalcon_Queue_Beanstalk, peekBuried){

zval *command, *response, *status;

PHALCON_MM_GROW();

PHALCON_INIT_VAR(command);
ZVAL_STRING(command, "peek-buried", 1);
phalcon_call_method_p1_noret(this_ptr, "write", command);

PHALCON_INIT_VAR(response);
phalcon_call_method(response, this_ptr, "readstatus");

PHALCON_OBS_VAR(status);
phalcon_array_fetch_long(&status, response, 0, PH_NOISY);
if (PHALCON_IS_STRING(status, "FOUND")) {
phalcon_queue_beanstalk_peek_common(return_value, getThis(), response TSRMLS_CC);
RETURN_MM();
}

RETURN_MM_FALSE;
}

/**
* Reads the latest status from the Beanstalkd server
*
Expand Down
4 changes: 4 additions & 0 deletions ext/queue/beanstalk.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ PHP_METHOD(Phalcon_Queue_Beanstalk, reserve);
PHP_METHOD(Phalcon_Queue_Beanstalk, choose);
PHP_METHOD(Phalcon_Queue_Beanstalk, watch);
PHP_METHOD(Phalcon_Queue_Beanstalk, peekReady);
PHP_METHOD(Phalcon_Queue_Beanstalk, peekDelayed);
PHP_METHOD(Phalcon_Queue_Beanstalk, peekBuried);
PHP_METHOD(Phalcon_Queue_Beanstalk, readStatus);
PHP_METHOD(Phalcon_Queue_Beanstalk, read);
PHP_METHOD(Phalcon_Queue_Beanstalk, write);
Expand Down Expand Up @@ -68,6 +70,8 @@ PHALCON_INIT_FUNCS(phalcon_queue_beanstalk_method_entry){
PHP_ME(Phalcon_Queue_Beanstalk, choose, arginfo_phalcon_queue_beanstalk_choose, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk, watch, arginfo_phalcon_queue_beanstalk_watch, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk, peekReady, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk, peekDelayed, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk, peekBuried, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk, readStatus, NULL, ZEND_ACC_PROTECTED)
PHP_ME(Phalcon_Queue_Beanstalk, read, arginfo_phalcon_queue_beanstalk_read, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk, write, NULL, ZEND_ACC_PROTECTED)
Expand Down
147 changes: 146 additions & 1 deletion ext/queue/beanstalk/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ PHP_METHOD(Phalcon_Queue_Beanstalk_Job, getBody){
/**
* Removes a job from the server entirely
*
* @param string $id
* @return boolean
*/
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, delete){
Expand Down Expand Up @@ -132,6 +131,152 @@ PHP_METHOD(Phalcon_Queue_Beanstalk_Job, delete){
RETURN_MM_FALSE;
}

/**
* The release command puts a reserved job back into the ready queue (and marks
* its state as "ready") to be run by any client. It is normally used when the job
* fails because of a transitory error.
*
* @return boolean
*/
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, release){

zval *priority = NULL, *delay = NULL;
zval *id, *command, *queue, *response, *status;

phalcon_fetch_params(0, 0, 2, &priority, &delay);

PHALCON_MM_GROW();

if (!priority) {
PHALCON_INIT_VAR(priority);
ZVAL_LONG(priority, 100);
}

if (!delay) {
delay = PHALCON_GLOBAL(z_zero);
}

id = phalcon_fetch_nproperty_this(this_ptr, SL("_id"), PH_NOISY_CC);
queue = phalcon_fetch_nproperty_this(this_ptr, SL("_queue"), PH_NOISY_CC);

PHALCON_ALLOC_GHOST_ZVAL(command);
PHALCON_CONCAT_SVSVSV(command, "release ", id, " ", priority, " ", delay);
phalcon_call_method_p1_noret(queue, "write", command);

PHALCON_INIT_VAR(response);
phalcon_call_method(response, queue, "readstatus");

PHALCON_OBS_VAR(status);
phalcon_array_fetch_long(&status, response, 0, PH_NOISY);
if (PHALCON_IS_STRING(status, "RELEASED")) {
RETURN_MM_TRUE;
}

RETURN_MM_FALSE;
}

/**
* The bury command puts a job into the "buried" state. Buried jobs are put into
* a FIFO linked list and will not be touched by the server again until a client
* kicks them with the "kick" command.
*
* @return boolean
*/
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, bury){

zval *priority = NULL;
zval *id, *command, *queue, *response, *status;

phalcon_fetch_params(0, 0, 1, &priority);

PHALCON_MM_GROW();

if (!priority) {
PHALCON_INIT_VAR(priority);
ZVAL_LONG(priority, 100);
}

id = phalcon_fetch_nproperty_this(this_ptr, SL("_id"), PH_NOISY_CC);
queue = phalcon_fetch_nproperty_this(this_ptr, SL("_queue"), PH_NOISY_CC);

PHALCON_ALLOC_GHOST_ZVAL(command);
PHALCON_CONCAT_SVSV(command, "bury ", id, " ", priority);
phalcon_call_method_p1_noret(queue, "write", command);

PHALCON_INIT_VAR(response);
phalcon_call_method(response, queue, "readstatus");

PHALCON_OBS_VAR(status);
phalcon_array_fetch_long(&status, response, 0, PH_NOISY);
if (PHALCON_IS_STRING(status, "BURIED")) {
RETURN_MM_TRUE;
}

RETURN_MM_FALSE;
}

/**
* The bury command puts a job into the "buried" state. Buried jobs are put into
* a FIFO linked list and will not be touched by the server again until a client
* kicks them with the "kick" command.
*
* @return boolean
*/
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, touch){

zval *id, *command, *queue, *response, *status;

PHALCON_MM_GROW();

id = phalcon_fetch_nproperty_this(this_ptr, SL("_id"), PH_NOISY_CC);
queue = phalcon_fetch_nproperty_this(this_ptr, SL("_queue"), PH_NOISY_CC);

PHALCON_ALLOC_GHOST_ZVAL(command);
PHALCON_CONCAT_SV(command, "touch ", id);
phalcon_call_method_p1_noret(queue, "write", command);

PHALCON_INIT_VAR(response);
phalcon_call_method(response, queue, "readstatus");

PHALCON_OBS_VAR(status);
phalcon_array_fetch_long(&status, response, 0, PH_NOISY);
if (PHALCON_IS_STRING(status, "TOUCHED")) {
RETURN_MM_TRUE;
}

RETURN_MM_FALSE;
}

/**
* Move the job to the ready queue if it is delayed or buried.
*
* @return boolean
*/
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, kick){

zval *id, *command, *queue, *response, *status;

PHALCON_MM_GROW();

id = phalcon_fetch_nproperty_this(this_ptr, SL("_id"), PH_NOISY_CC);
queue = phalcon_fetch_nproperty_this(this_ptr, SL("_queue"), PH_NOISY_CC);

PHALCON_ALLOC_GHOST_ZVAL(command);
PHALCON_CONCAT_SV(command, "kick-job ", id);
phalcon_call_method_p1_noret(queue, "write", command);

PHALCON_INIT_VAR(response);
phalcon_call_method(response, queue, "readstatus");

PHALCON_OBS_VAR(status);
phalcon_array_fetch_long(&status, response, 0, PH_NOISY);
if (PHALCON_IS_STRING(status, "KICKED")) {
RETURN_MM_TRUE;
}

RETURN_MM_FALSE;
}

PHP_METHOD(Phalcon_Queue_Beanstalk_Job, __wakeup) {

zval *id = phalcon_fetch_nproperty_this(this_ptr, SL("_id"), PH_NOISY_CC);
Expand Down
8 changes: 8 additions & 0 deletions ext/queue/beanstalk/job.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ PHP_METHOD(Phalcon_Queue_Beanstalk_Job, __construct);
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, getId);
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, getBody);
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, delete);
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, release);
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, bury);
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, touch);
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, kick);
PHP_METHOD(Phalcon_Queue_Beanstalk_Job, __wakeup);

ZEND_BEGIN_ARG_INFO_EX(arginfo_phalcon_queue_beanstalk_job___construct, 0, 0, 3)
Expand All @@ -38,6 +42,10 @@ PHALCON_INIT_FUNCS(phalcon_queue_beanstalk_job_method_entry){
PHP_ME(Phalcon_Queue_Beanstalk_Job, getId, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk_Job, getBody, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk_Job, delete, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk_Job, release, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk_Job, bury, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk_Job, touch, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk_Job, kick, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Phalcon_Queue_Beanstalk_Job, __wakeup, NULL, ZEND_ACC_PUBLIC)
PHP_FE_END
};
Loading