Creating a Beanstalk background worker

Hello,

I've read the Beanstalk documentation, as well as a similar question. My scenario is the following:

  • an apache/Phalcon micro-app receives jobs from another server
  • I run the jobs using PHP/libvirt/ssh
  • I create some package and keep it locally for serving

At no moment is there a view, HTML or CSS used. This is all for back-end processing. I've followed the guide to creating a Beanstalk queue and adding objects. Within the same class is also the process_job method.

use Phalcon\Queue\Beanstalk;

class master
{
    // Web side: push one job per KVM onto the queue.
    public static function make_jobs($kvms)
    {
        $queue = new Beanstalk(array('host' => 'localhost', 'port' => '11300'));
        foreach ($kvms as $kvm) {
            $job = new queue_job($kvm);
            $queue->put(array('process_job' => $job));
        }
    }

    // Worker side: handle a single job taken from the queue.
    public static function process_job($job)
    {
        print_r($job);
    }
}

What is not clear from either the documentation or the forum questions is where to set up the background worker. How can I dispatch either an event listener that is alerted to new Beanstalk jobs, or a handler that loops indefinitely waiting for Beanstalk jobs, similar to the example from the documentation:

while (($job = $queue->peekReady()) !== false) {
    $jobBody = $job->getBody();
    // ... do the actual job which will take some time
    $job->delete();
}

Any help is greatly appreciated.

edited Oct '16

Working with background jobs through a message broker involves two parts. First, you put your messages (payloads) into the broker. This usually happens at runtime: for instance, your page sends an email after user registration, so instead of waiting 3-5 seconds for an external SMTP service to process the mail, you put the job into the background using Beanstalk. Then some other app (usually a CLI app running in the background as a daemon) fetches the job and processes it, i.e. it contacts the external SMTP server, does the TLS handshake, authenticates, sends the mail and receives the status.

I'll post you an example from my app later, now I need to go.

The best approach is simply to use a CLI task.

@Jonathan OK, that does make sense, so I'm looking at two separate applications, a web micro-app which receives the jobs, and then a background daemon which processes them?

edited Oct '16

Not receives; it puts them:

The web micro-app puts jobs into the queue. A background CLI daemon (or several daemons) receives and processes them.
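To make the split concrete, here is a minimal sketch of the two halves. It runs without a beanstalkd server because `InMemoryTube` is a hypothetical in-memory stand-in, not Phalcon's class; `enqueueKvmJobs()` and `drainTube()` are likewise illustrative names. In a real setup the producer would live in the web micro-app and the consumer in a separate CLI process.

```php
<?php
// Hypothetical in-memory stand-in for a Beanstalk tube (assumption, not Phalcon's API).
final class InMemoryTube
{
    /** @var string[] encoded job bodies, oldest first */
    private array $ready = [];

    public function put(array $payload): void
    {
        // A broker stores bytes, so the payload is encoded explicitly.
        $this->ready[] = json_encode($payload, JSON_THROW_ON_ERROR);
    }

    public function take(): ?array
    {
        $body = array_shift($this->ready); // null when the tube is empty
        return $body === null ? null : json_decode($body, true, 512, JSON_THROW_ON_ERROR);
    }
}

// Producer side: what the web micro-app does when a request arrives.
function enqueueKvmJobs(InMemoryTube $tube, array $kvms): int
{
    foreach ($kvms as $kvm) {
        $tube->put(['action' => 'process_job', 'kvm' => $kvm]);
    }
    return count($kvms);
}

// Consumer side: what the CLI daemon does in its own process.
function drainTube(InMemoryTube $tube, callable $handler): int
{
    $handled = 0;
    while (($job = $tube->take()) !== null) {
        $handler($job);
        $handled++;
    }
    return $handled;
}

$tube = new InMemoryTube();
enqueueKvmJobs($tube, ['vm-a', 'vm-b']);

$seen = [];
$handled = drainTube($tube, function (array $job) use (&$seen): void {
    $seen[] = $job['kvm'];
});
```

The producer never waits for the work to finish; it only records what has to be done, and the consumer picks the jobs up in arrival order.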

That is what I ended up doing, thanks :-)

edited Oct '16

Well, I've run into a logic issue.

Whereas the first time I put jobs into the beanstalk queue, I can remove them, the second time I put jobs in it, the while loop appears to get stuck/frozen.

Is this intended behaviour? I think my misconception is from the fact that the while loop appears to return the same job over and over again.

edited Oct '16

What do you mean? Here is an example of my CLI task for handling a queue tube:

<?php
/**
 * Created by PhpStorm.
 * User: Wojtek
 * Date: 2016-08-20
 * Time: 13:54
 */

use Phalcon\Cli\Task;
use Phalcon\Logger\Adapter\File;
use Phalcon\Queue\Beanstalk;

/**
 * Class LogTask
 *
 * @package Suzuki\Cli\Task
 */
class LogTask extends Task
{
    /**
     * @var Beanstalk
     */
    protected $queue;

    /**
     * @var File
     */
    protected $loggerBeforeDispatch;

    /**
     * @var File
     */
    protected $loggerBeforeException;

    /**
     * @var File
     */
    protected $loggerBeforeSendRequest;

    /**
     * @var File
     */
    protected $loggerSqlDebug;

    /**
     * @var File
     */
    protected $loggerPerformance;

    /**
     * @var File
     */
    protected $loggerModel;

    /**
     * @var File
     */
    protected $mailer;

    /**
     * Sets services
     */
    public function initialize()
    {
        $this->queue = $this->di->get('queue');
        $this->loggerBeforeDispatch = $this->di->get('loggerBeforeDispatch');
        $this->loggerBeforeException = $this->di->get('loggerBeforeException');
        $this->loggerBeforeSendRequest = $this->di->get('loggerBeforeSendRequest');
        $this->loggerSqlDebug = $this->di->get('loggerSqlDebug');
        $this->loggerPerformance = $this->di->get('loggerPerformance');
        $this->loggerModel = $this->di->get('loggerModel');
        $this->mailer = $this->di->get('mailer');
    }

    /**
     * Handle task
     */
    public function logAction()
    {
        $this->queue->watch('log');
        while ($this->queue->statsTube('log')["current-jobs-ready"] > 0 && ($job = $this->queue->reserve())) {
            $body = $job->getBody();
            if (isset($body['type'])) {
                switch ($body['type']) {
                    case 'beforeException':
                        $this->loggerBeforeException->error($body['message']);
                        break;
                    case 'beforeDispatch':
                        $this->loggerBeforeDispatch->info($body['message']);
                        break;
                    case 'beforeSendRequest':
                        $this->loggerBeforeSendRequest->info($body['message']);
                        break;
                    case 'beforeQuery':
                        $this->loggerSqlDebug->info($body['message']);
                        break;
                    case 'performance':
                        $this->loggerPerformance->info($body['message']);
                        break;
                    case 'modelException':
                        $this->loggerModel->info($body['message']);
                        break;
                    default:
                        break;
                }
            }
            $job->delete();
        }
    }
}

I don't have any problems with this. Are you sure you delete the job after handling it?



Accepted answer
edited Oct '16

Yeah, that's a common issue when you start using Beanstalk.

This is the basic distinction you need to make:

// peekReady() only inspects the next ready job; it does not reserve it,
// so the same job comes back on every call until it is deleted
while (($job = $this->messageBroker->peekReady()) !== false)

// reserve() waits for a job by default and reserves it for this worker,
// so each call hands out a different job from the queue
while (($job = $this->messageBroker->reserve()))
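The distinction can be seen in a toy model. The sketch below is not Phalcon's class: `ToyTube` and its methods are hypothetical stand-ins that mimic only the ready and reserved job states, and its `reserve()` returns null instead of blocking when nothing is ready.

```php
<?php
// Toy model of a tube (assumption for illustration, not Phalcon's API).
final class ToyTube
{
    private array $ready = [];    // id => body, in arrival order
    private array $reserved = []; // id => body, currently held by a worker
    private int $nextId = 1;

    public function put(string $body): int
    {
        $id = $this->nextId++;
        $this->ready[$id] = $body;
        return $id;
    }

    // Like peek-ready: look at the next ready job but leave it in the ready state.
    public function peekReady(): ?int
    {
        return array_key_first($this->ready);
    }

    // Like reserve: move the next ready job to the reserved state and hand it out.
    public function reserve(): ?int
    {
        $id = array_key_first($this->ready);
        if ($id === null) {
            return null;
        }
        $this->reserved[$id] = $this->ready[$id];
        unset($this->ready[$id]);
        return $id;
    }

    public function delete(int $id): void
    {
        unset($this->ready[$id], $this->reserved[$id]);
    }
}

$tube = new ToyTube();
$tube->put('job A');
$tube->put('job B');

// Peeking twice without deleting yields the same job both times.
$peekedTwice = [$tube->peekReady(), $tube->peekReady()];

// Reserving twice yields two different jobs, even before either is deleted.
$reservedTwice = [$tube->reserve(), $tube->reserve()];
```

A loop built on peeking therefore only makes progress if every iteration deletes the job it looked at, while a loop built on reserving moves forward on each call.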

edited Oct '16

Thank you both for your help!

I ended up consuming the queue, and creating processes (https://github.com/iFixit/forker) to handle each job. My problem was that if I had no available slot for a job, I left it in the queue, and then the loop would "freeze".

while (($job = $this->messageBroker->peekReady()) !== false) {
    if (can_execute($job)) {
        // fork, exec 
        $job->delete();
    }
}

So in the above case, if I couldn't execute I'd simply leave it there. When new jobs arrived in the queue (from the web interface) then I got a weird behaviour, where the same previous job was constantly looped over.
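One way to avoid spinning on a job that cannot run yet is to make it invisible for a while; the beanstalkd protocol supports this by releasing a reserved job with a delay. The sketch below models only that effect with a logical clock. The job arrays, `releaseWithDelay()` and `runTicks()` are hypothetical names for illustration and do not talk to a real queue.

```php
<?php
// Hypothetical model: a job is an array with 'id' and 'readyAt' (a logical tick).
// releaseWithDelay() mimics the effect of releasing a reserved job with a delay.
function releaseWithDelay(array $job, int $now, int $delay): array
{
    $job['readyAt'] = $now + $delay;
    return $job;
}

function runTicks(array $jobs, callable $canExecute, int $ticks): array
{
    $log = [];
    for ($now = 0; $now < $ticks; $now++) {
        // foreach works on a snapshot, so edits to $jobs apply from the next tick.
        foreach ($jobs as $i => $job) {
            if ($job['readyAt'] > $now) {
                continue; // still delayed, not visible to the worker
            }
            if ($canExecute($job, $now)) {
                $log[] = "tick $now: ran {$job['id']}";
                unset($jobs[$i]); // equivalent of deleting the job
            } else {
                $log[] = "tick $now: deferred {$job['id']}";
                $jobs[$i] = releaseWithDelay($job, $now, 2);
            }
        }
    }
    return $log;
}

$jobs = [
    ['id' => 'a', 'readyAt' => 0],
    ['id' => 'b', 'readyAt' => 0],
];

// Assume job 'a' has no free slot until tick 2; job 'b' can always run.
$canExecute = function (array $job, int $now): bool {
    return $job['id'] !== 'a' || $now >= 2;
};

$log = runTicks($jobs, $canExecute, 4);
```

Job `b` runs at tick 0 even though job `a` is still waiting for a slot, which is the behaviour the peek-based loop could not provide.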

I solved it by simply launching every job as a suspended fork-exec and letting the semaphores deal with allocation issues. I'm not sure it is an ideal solution, but since this is an alpha version it will have to do for now.

while (($job = $this->messageBroker->peekReady()) !== false) {
    exec_suspend($job);
    $job->delete();
}

I am, however, still a bit confused about the difference between reserve() and peekReady().

Why don't you try the reserve() method? I posted the difference in my earlier answer.

@Jonathan I've tried it, many thanks!

@Jonathan, I used reserve() this way:

$queue->watch("my_tube");
while (($job = $queue->reserve())) {
    // handle the job, then delete it
    $job->delete();
}

This runs in a CLI task. When I start the task it works fine, and the process is put in the background. After a few days, the process stopped working.

Do you have any idea why?

Do I need a while(true) block so that my task never stops?

Thanks in advance for your help.

edited Mar '17

Because there are no jobs :) Add a cron job to run your CLI task.

edited Mar '17

Even without any jobs it should not die like that.

while ($this->messageBroker->reserve()) {
    // handle queue...
}

You don't need to watch().
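A loop of the form `while ($queue->reserve())` ends as soon as one call returns a falsy value, which can happen on a timeout or a dropped connection. As a hedged sketch of the alternative asked about above, the loop below keeps polling instead of exiting. `runWorker()` is a hypothetical helper: `$reserve` stands in for whatever call fetches the next job (for example a reserve call with a timeout, if the client in use supports one) and is assumed to return false when nothing arrived; `$maxIdlePolls` exists only so the example terminates.

```php
<?php
/**
 * Hypothetical worker loop (illustrative, not part of Phalcon).
 * $reserve returns a job or false; $handle processes one job.
 * Pass null as $maxIdlePolls to loop forever, as a daemon would.
 */
function runWorker(callable $reserve, callable $handle, ?int $maxIdlePolls = null): int
{
    $handled = 0;
    $idle = 0;
    while ($maxIdlePolls === null || $idle < $maxIdlePolls) {
        $job = $reserve();
        if ($job === false) {
            $idle++; // nothing arrived: keep looping instead of exiting
            continue;
        }
        $idle = 0;
        try {
            $handle($job);
            $handled++;
        } catch (Throwable $e) {
            // One bad job should not kill the daemon; log it and move on.
            error_log('job failed: ' . $e->getMessage());
        }
    }
    return $handled;
}

// Scripted stand-in for the queue: three jobs, then two empty polls.
$script = ['a', 'bad', 'b', false, false];
$reserve = function () use (&$script) {
    return $script ? array_shift($script) : false;
};

$processed = [];
$handle = function ($job) use (&$processed): void {
    if ($job === 'bad') {
        throw new RuntimeException('cannot process ' . $job);
    }
    $processed[] = $job;
};

$handled = runWorker($reserve, $handle, 2);
```

Even with a loop like this, a process supervisor or a cron check that restarts the task is a reasonable safety net, since the process can still die for reasons outside the loop.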

Bottom line: the sad fact is that beanstalkd has been abandoned; development stalled 3 years ago :/