
Queueing with Phalcon and Beanstalk

Hello

I'm about to implement a queue of jobs in my application. Any information, suggestions and ideas are welcome :)

I'm using Phalcon 3.0.3 and PHP 7.0.14 (Debian).

First of all: I'm totally new to job queueing.

I read the docs: https://docs.phalcon.io/en/latest/reference/queue.html

My application is multi-module:

  • 2 web applications
  • An API application
  • CLI application with Tasks for cronjobs

All in the same project, so they can share the same Models and libraries (they're all part of the same thing).

Let's say I need to process videos (like in the Phalcon example): currently this is done by the VideoTask in the CLI application. Every minute the CLI application runs "video_task process", and that action checks whether any video is ready to be processed and processes it.

I'd like to turn this behaviour into a queue-oriented one. How can I implement this inside my application?

1. Every time a user uploads a video, I have to run this:

$queue->put(
    [
        "processVideo" => 4871,
    ]
);

Now, we are inside a Controller in one of the Web modules of the application.

  • Is everything all right so far?

2. There is a script somewhere that implements the while() in which I execute the job "processVideo"

  • Is this script running "forever"? I mean, is it a PHP script that runs an infinite loop?
  • I need to have all my application Models, libraries, etc., ready to use inside this script: where do I have to put this script? Is it a new "part" of my application together with the "CLI", "API", and "Web" ones?
  • How can I make sure that it starts at system startup? How can I check if it's running at a given time? What if it crashes or shuts down? How can I run it again?

3. Have a look at the example:

while (($job = $queue->peekReady()) !== false) {
    $message = $job->getBody();

    var_dump($message);

    $job->delete();
}

  • Is the condition $queue->peekReady() !== false always true? If not (and I don't think it is), do I have to put the code above inside a while (true) {} loop?

4. Load balancing

Let's say I want to make sure that the server doesn't process more than 3 videos every 10 minutes. So if I have processed a video at 2:23, another one at 2:25 and a third one at 2:26, and I receive a new job at 2:28, I want the queue to wait until 2:33 before executing that job (and any others that arrive).

  • Is it possible to implement this behaviour? How (do you have any docs link I can read)?

Thank you very much for your help.



85.5k
edited Jan '17
  1. Pretty much, but you need to specify a tube.
  2. a. Yes, it forks processes and uses a random sleep so it doesn't create load on the system.
     b. I have this layout:

-app
    - modules
    - config
    - models
    - form and so on
- cli <--- in my CLI app I use my Phalcon app config, include the vendor autoload from my main app, and register my namespaces with the Phalcon loader once again (I guess you could create a helper class for that)

     c. Well... I can help if you are on a Debian system; otherwise I am not an expert.

  3. Pretty much (I haven't dug into that).

  4. Well, each job creates a new process, so the kernel will probably load-balance it for you, but that is not a 100% working solution.

    Many people use Beanstalk. I am a Redis fanboy, so I use https://github.com/chrisboulton/php-resque.

    The difference in implementation is quite big, and there is one very strange thing you have to do if you are using Redis xD

    I am going to bed in ~20 mins, but I will answer tomorrow if you have other questions, or if you want me to explain what a huge mess I made to implement a 90% perfect system :D
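
Point 1 above says a tube has to be specified before putting a job. A minimal sketch of what that could look like, assuming Phalcon 3.x, where Phalcon\Queue\Beanstalk provides choose() and put(). The tube name "videos", the option values, and the FakeQueue stand-in (used only so the sketch runs without a beanstalkd server) are all assumptions for illustration:

```php
<?php

// Accepts any object exposing choose() and put(), such as
// Phalcon\Queue\Beanstalk in Phalcon 3.x.
function enqueueVideo($queue, int $videoId)
{
    // "videos" is a hypothetical tube name; put() writes to the chosen tube.
    $queue->choose('videos');

    return $queue->put(
        ['processVideo' => $videoId],
        // Assumed values: ttr is the number of seconds a worker may hold the job.
        ['priority' => 100, 'delay' => 0, 'ttr' => 3600]
    );
}

// In-memory stand-in so the sketch runs on its own. It is NOT a real client.
class FakeQueue
{
    public $tube = 'default';
    public $jobs = [];

    public function choose($tube)
    {
        $this->tube = $tube;
        return $tube;
    }

    public function put($data, array $options = [])
    {
        $this->jobs[] = ['tube' => $this->tube, 'data' => $data, 'options' => $options];
        return count($this->jobs); // fake job id
    }
}

// In the real controller this would be something like:
// $queue = new Phalcon\Queue\Beanstalk(['host' => '127.0.0.1', 'port' => 11300]);
$queue = new FakeQueue();
$jobId = enqueueVideo($queue, 4871);
```

On the consuming side, the worker would presumably need to watch the same tube before reserving jobs, otherwise it only sees the default tube.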



12.9k

Thank you very much.

Actually I was thinking: let's say I implement a properly structured service that starts, stops and reports the status of a PHP script. This PHP script is part of the CLI app and implements an infinite loop for serving jobs in the queue.

OK, but what's the point of implementing the queue itself? Once the user uploads a video I save its data in the database, then the PHP script with the infinite loop checks the DB table, finds out that there is a new video, and processes it.

I mean: the DB table actually IS the queue.

Am I wrong?

I found this: https://collaboradev.com/2011/03/31/php-daemons-tutorial/

With a solution like the one in the tutorial I can have total control of the PHP "service": I can implement a cronjob (CLI app) that checks every minute whether the script is active and running (service php-queue status) and, if not, launches it again.



85.5k

Is your server Mac or Debian/Ubuntu?



85.5k
cat /etc/systemd/system/phpworkers.service

[Unit]
Description=Php workers
Wants=network.target
After=network.target

[Service]
User=www-data
Group=www-data
Restart=always
StandardOutput=syslog
StandardError=syslog
ExecStart=/home/user/repos/workers/sites_workers.sh

[Install]
WantedBy=multi-user.target

systemctl enable phpworkers

cat /home/user/repos/workers/sites_workers.sh
#!/bin/bash

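# Start both workers in parallel and wait for them to exit,
# so systemd (Restart=always) can restart the script afterwards.
/opt/php-workers/bin/php /var/www/whatever/workers/run_worker.php >> /var/log/workers/whatever_worker.log &
/opt/php-workers/bin/php /var/www/whatever2/workers/run_worker.php >> /var/log/workers/whatever_worker.log &
wait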

This is the basic loop I have running in the background (Phalcon CLI app):

// reserve() works in a loop by default: it keeps reading the buffer
// and reserves jobs from the queue as they arrive
while (($job = $this->messageBroker->reserve())) {
    // data from the message broker service
    $message = $job->getBody();

    // get the job ID from the MQ daemon
    $jid = $job->getId();

    // do something...

    // delete the job from the message broker queue
    $del = $job->delete();
}
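
For point 4 of the original question (no more than 3 videos every 10 minutes), a worker loop like the one above could consult a small sliding-window limiter before it takes the next job. What follows is only a sketch in plain PHP, not something Phalcon or Beanstalk provides: the class SlidingWindowLimiter and its methods are hypothetical names, and the state lives in memory, so it assumes a single long-running worker process.

```php
<?php

// Hypothetical helper: allows at most $limit job starts per $window seconds.
final class SlidingWindowLimiter
{
    /** @var int[] Unix timestamps of recent job starts, oldest first */
    private $starts = [];
    private $limit;
    private $window;

    public function __construct(int $limit, int $window)
    {
        $this->limit  = $limit;
        $this->window = $window;
    }

    // Seconds to wait before another job may start at time $now (0 = start now).
    public function secondsUntilAllowed(int $now): int
    {
        // Forget starts that have fallen out of the window.
        while ($this->starts && $this->starts[0] <= $now - $this->window) {
            array_shift($this->starts);
        }

        if (count($this->starts) < $this->limit) {
            return 0;
        }

        // A slot frees up when the limit-th most recent start leaves the window.
        $blocking = $this->starts[count($this->starts) - $this->limit];

        return $blocking + $this->window - $now;
    }

    // Call this when a job actually starts.
    public function recordStart(int $now)
    {
        $this->starts[] = $now;
    }
}

// The example from the question: videos at 2:23, 2:25 and 2:26, new job at 2:28.
$limiter = new SlidingWindowLimiter(3, 600);
$base    = strtotime('2017-01-01 02:00:00 UTC');

foreach ([23, 25, 26] as $minute) {
    $limiter->recordStart($base + $minute * 60);
}

$wait = $limiter->secondsUntilAllowed($base + 28 * 60);
echo $wait, "\n"; // prints 300, i.e. wait until 2:33
```

Inside the worker, one option would be to call sleep() for the returned number of seconds before reserving the next job, then call recordStart(time()) once processing begins. With several worker processes the timestamps would have to be kept somewhere shared, such as the database.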