Phalcon and Beanstalkd: How to use it?

Dear all,

I am using Phalcon to develop micro applications and want to queue a job that calls an HTTP API:

    $queue = new Phalcon\Queue\Beanstalk(array(
        'host' => '127.0.0.1',
        'port' => '11300',
    ));
    $queue->put(array('$job->createJob($job->getId())' => 4871));

How can I call a function that lives inside my Job model?

    public function createJob($job_id) {
        //access http
    }
  1. The official documentation mentions 4871. What does 4871 stand for?
  2. How can I pass parameters to createJob when putting a job in the queue?
edited Jun '14

The official documentation mentions 4871. What does 4871 stand for?

4871, as shown in the documentation, is just an example value for the job.

//Insert the job in the queue
$queue->put(array('processVideo' => 4871));

So when you're processing the Beanstalk tube, 'processVideo' would be the 'action', and 4871 would be the value for that action.

How could I pass parameters to createJob via putting queue?

I hope the above has answered your question.

Your code

$queue->put(array('$job->createJob($job->getId())' => 4871));

does not logically make sense. The action should not be the job ID, because your actual Beanstalk worker/job processor would not know to expect incremental IDs as the action.
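
To pass more than one parameter, the value does not have to be a single number; it can be an array. Below is a minimal sketch of that idea. The helper name buildJobBody, the 'action'/'params' keys and the example URL are assumptions made for illustration, not part of Phalcon. serialize()/unserialize() stands in for the round trip through the queue so the snippet runs without a Beanstalkd server.

```php
<?php
// Hypothetical helper: bundle an action name with the parameters it needs.
function buildJobBody($action, array $params)
{
    return array('action' => $action, 'params' => $params);
}

$body = buildJobBody('createJob', array(
    'job_id' => 4871,
    'url'    => 'https://example.com/api', // placeholder URL
));

// With a connected queue this would be: $queue->put($body);
// Here serialize()/unserialize() simulates the trip through the queue.
$received = unserialize(serialize($body));

echo $received['action'], ' ', $received['params']['job_id'], PHP_EOL;
```

The worker then reads the action name and the parameters from the job body and decides what to call.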



18.6k
edited Oct '14

This:

public static function createJob($job_id) {
        echo 'Job...';
}

$queue = new Phalcon\Queue\Beanstalk(array(
            'host' => 'localhost',
            'port' => '11300',
));
$queue->put(array('createJob' => 4871));

Where can I define createJob? I am using a Micro application to build an API, and I need a message queue (MQ) for it.



18.6k
edited Jun '14

Can anyone help me?



2.6k
edited Jun '14

Beanstalkd is only the queue; processing the actual job is up to your implementation.

With $queue->put(); you can put data which holds information on what to process. This is what your program is going to use to actually process the job.

In the Phalcon examples, an array is given with "processVideo" => 4871. When the job is retrieved from Beanstalk, this data is available from $job->getBody(). Your implementation should know how to handle it. To extend the example a bit more:

while (($job = $queue->peekReady()) !== false) {

    $jobBody = $job->getBody();

    if(isset($jobBody['processVideo'])) {
        $videoId = $jobBody['processVideo']; //4871 in this example.

        //Do the actual job using $videoId
        $myProcessor->convertVideoById($videoId);
    }

    //Remove the job from the queue
    $job->delete();
}

The data you put in $queue->put(); is up to you to decide, and so is the way you handle it. Beanstalk only provides the queue, not the handling of the job.
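
As for where createJob can be defined: one option is to keep it on your own class and let the worker look it up by action name. The following is only a sketch of that approach. The Job class, the $handlers map and dispatchJob are hypothetical names, and the hard-coded array stands in for what $job->getBody() would return in a real worker.

```php
<?php
// Hypothetical job model; the real createJob() would call the HTTP API.
class Job
{
    public static function createJob($jobId)
    {
        return 'created job ' . $jobId;
    }
}

// Map each action name found in a job body to the callable that handles it.
$handlers = array(
    'createJob' => array('Job', 'createJob'),
);

// Run the handler for every known action in the body; unknown actions are skipped.
function dispatchJob(array $jobBody, array $handlers)
{
    $results = array();
    foreach ($jobBody as $action => $value) {
        if (isset($handlers[$action])) {
            $results[$action] = call_user_func($handlers[$action], $value);
        }
    }
    return $results;
}

// In a worker, the first argument would come from $job->getBody().
$results = dispatchJob(array('createJob' => 4871), $handlers);
echo $results['createJob'], PHP_EOL;
```

Adding a new job type then only means adding one entry to the map.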



18.6k
edited Jun '14

"Beanstalk only provides the queue, not the handling of the job." I know, thanks! To be direct: I want to use curl with an MQ: it calls HTTP and waits for the reply. How can I program it? I do not want users waiting a long time just for the curl request...



2.6k

I want to use curl with an MQ: it calls HTTP and waits for the reply. How can I program it? I do not want users waiting a long time just for the curl request...

I don't understand. These two requirements conflict:

  1. Call HTTP and wait for the reply
  2. Do not want users waiting a long time just for the curl request

Do you want to put something in the queue over HTTP and return a response right away so the request doesn't time out, and then, when the job actually completes, respond to the caller with the result?



18.6k

Users access my API, and the API then calls an external HTTP address with parameters using curl. But an HTTP call has to wait for the response: if the other web page is slow, my API will be slow too, so I need to queue these calls.

Sorry for my bad English ^_^!!!



2.6k

This is not really Phalcon related but here it goes anyway. This shows a simple example of using the queue to generate a response. Your API endpoint only creates a job in the queue and can return immediately.

When the item is picked from the queue, the request to the external API is made, and when a response is received the worker calls the callback URL supplied by the original caller. This example could be improved by splitting the job in two: one job for retrieving from the external API and a separate job for the callback. That way the external API isn't called again when the callback fails. It also allows multiple callback attempts by keeping a counter of failed deliveries.

ApiController

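public function myApiEndpointAction($myArgument, $callbackUrl) {
    //Assumes a connected Phalcon\Queue\Beanstalk instance is registered in the DI as 'queue'
    $queue = $this->getDI()->get('queue');
    $queue->put(array(
        'jobName' => $myArgument,
        'callbackUrl' => $callbackUrl
    ));

    //Response is Phalcon\Http\Response
    return new Response("Item put in queue", 201);
}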

Worker

while (($job = $queue->peekReady()) !== false) {

    $jobBody = $job->getBody();

    if(isset($jobBody['jobName'])) {
        $myArgument = $jobBody['jobName']; 
        $callbackUrl = $jobBody['callbackUrl']; 

        //Call external API
        $external = curl_init();
        curl_setopt_array($external, array(
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_URL => 'https://external.api.com'
        ));

        //The response from the external API
        $result = curl_exec($external);
        if($result === false) {
            //External API call failed
            //Don't delete the job; it will be picked up again on the next run
            curl_close($external);
            return;
        }
        curl_close($external);

        //Return the response to the original caller
        $callback = curl_init();
        curl_setopt_array($callback, array(
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_URL => $callbackUrl, //The URL which the original API caller gave
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => array(
                'data' => $result //The response from the external API call
            )
        ));
        $delivered = curl_exec($callback);
        if($delivered === false) {
            curl_close($callback);
            return;
        }
        curl_close($callback);
    }

    //Remove the job from the queue
    $job->delete();
}
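
The split into two jobs mentioned above could look roughly like the sketch below. It only covers the decision of which job to queue next; the curl calls and the $queue->put() calls are left out so it runs on its own. The 'stage' and 'attempts' keys, the nextJob function and the limit of 3 attempts are all assumptions made for this example.

```php
<?php
// Assumed limit on callback delivery attempts; not from the original example.
const MAX_CALLBACK_ATTEMPTS = 3;

// Decide which job (if any) should be queued next, given the job that was
// just processed and whether it succeeded. Returns null when nothing follows.
function nextJob(array $job, $succeeded, $result = null)
{
    if ($job['stage'] === 'fetch') {
        if (!$succeeded) {
            return $job; // retry the external API call (no limit in this sketch)
        }
        // Fetch done: hand the result over to a separate callback job.
        return array(
            'stage'       => 'callback',
            'callbackUrl' => $job['callbackUrl'],
            'result'      => $result,
            'attempts'    => 0,
        );
    }

    // Callback stage.
    if ($succeeded) {
        return null; // delivered, nothing left to do
    }
    $job['attempts']++;
    // Retry only the callback; give up once the limit is reached.
    return $job['attempts'] < MAX_CALLBACK_ATTEMPTS ? $job : null;
}

$fetch = array(
    'stage'       => 'fetch',
    'argument'    => 'abc',
    'callbackUrl' => 'https://client.example/callback', // placeholder URL
);
$callback = nextJob($fetch, true, 'payload');
$retry    = nextJob($callback, false);

echo $callback['stage'], ' ', $retry['attempts'], PHP_EOL;
```

In a worker, whatever nextJob returns would be put back on the queue, and the processed job deleted.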

What is the best way to set the number of workers?

@Phalcon, tubes can be used to sort jobs into different categories, right?

For example, jobs related to sending email go to an "email" tube, and jobs related to sending SMS go to an "sms" tube.

I don't really understand how to use it.

Do I need to use $queue->choose("email") or $queue->watch("email") when putting a job in the queue?

And how do I peekReady jobs from just one tube? For example, I want to get the job list from only the "email" tube. How?

Thanks.