Gearman Distributed Task Processing System (Part 8): Development Guide (笑天居士's blog)

Tags: Architecture, Gearman Distributed Task Processing System, Gearman

This post focuses on the C library and on developing clients and workers with it, using libgearman 0.14 for C as the reference.

Client API

Client initialization and destruction

gearman_client_st *gearman_client_create(gearman_client_st *client);

void gearman_client_free(gearman_client_st *client);

gearman_return_t gearman_client_add_server(gearman_client_st *client, const char *host, in_port_t port);

gearman_return_t gearman_client_add_servers(gearman_client_st *client, const char *servers); (adds several gearman job servers in one call)
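
gearman_client_add_servers() takes the same SERVER[:PORT][,SERVER[:PORT]]... list format that the worker header documents further down. To make that format concrete, here is a small parser sketch. parse_server_list() and struct server_entry are hypothetical names, not libgearman functions; the sketch assumes hostnames or IPv4 addresses only (no IPv6 literals) and assumes 4730, the port used in the examples below, as the default when a port is omitted.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper, NOT part of libgearman: splits a
 * "SERVER[:PORT][,SERVER[:PORT]]..." list into host/port pairs.
 * Assumes hostnames or IPv4 addresses only (no IPv6 literals). */
#define DEFAULT_GEARMAN_PORT 4730 /* assumed default, matches the examples */
#define MAX_HOST_LEN 255

struct server_entry {
    char host[MAX_HOST_LEN + 1];
    unsigned port;
};

/* Returns the number of entries written to out (at most max),
 * or -1 if the list is malformed or has more than max entries. */
static int parse_server_list(const char *list, struct server_entry *out, int max)
{
    int count = 0;
    const char *p = list;

    while (*p != '\0') {
        const char *end = strchr(p, ',');
        size_t len = end ? (size_t)(end - p) : strlen(p);
        const char *colon = memchr(p, ':', len);
        size_t host_len = colon ? (size_t)(colon - p) : len;

        if (host_len == 0 || host_len > MAX_HOST_LEN || count == max)
            return -1;

        memcpy(out[count].host, p, host_len);
        out[count].host[host_len] = '\0';
        out[count].port = DEFAULT_GEARMAN_PORT;

        if (colon) {
            char *stop;
            unsigned long port = strtoul(colon + 1, &stop, 10);
            /* the port must be digits that end exactly at the entry boundary */
            if (stop == colon + 1 || stop != p + len || port == 0 || port > 65535)
                return -1;
            out[count].port = (unsigned)port;
        }
        count++;
        p = end ? end + 1 : p + len;
    }
    return count;
}
```

With libgearman you would simply pass the whole string, e.g. gearman_client_add_servers(&client, "10.0.0.1,10.0.0.2:7003"), and let the library do the splitting.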


Synchronous operation

void * gearman_client_do(gearman_client_st *client, const char *function_name, const char *unique, const void *workload, size_t workload_size, size_t *result_size, gearman_return_t *ret_ptr);

Where:

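i. unique: an optional unique key the client attaches to the job it submits; the default is NULL. The job server uses it to merge identical jobs (same function name and same unique key) that are already queued or running.

ii. workload & workload_size: the job's input data and its size in bytes.

iii. result_size [out]: the size of the returned data.

iv. ret_ptr [out]: the Gearman return status. The official documentation describes the possible statuses as follows: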

In the case of GEARMAN_WORK_DATA, GEARMAN_WORK_WARNING, or GEARMAN_WORK_STATUS, the caller should take any actions to handle the event and then call this function again. This may happen multiple times until a GEARMAN_WORK_ERROR, GEARMAN_WORK_FAIL, or GEARMAN_SUCCESS (work complete) is returned. For GEARMAN_WORK_DATA or GEARMAN_WORK_WARNING, the result_size will be set to the intermediate data chunk being returned and an allocated data buffer will be returned. For GEARMAN_WORK_STATUS, the caller can use gearman_client_do_status() to get the current tasks status.

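In short, only GEARMAN_WORK_ERROR, GEARMAN_WORK_FAIL and GEARMAN_SUCCESS are final results (any other error code, such as GEARMAN_TIMEOUT, also ends the call). The remaining codes are intermediate results, and you have to call the function again to receive the rest. Intermediate results are not limited to asynchronous calls: they appear whenever the worker sends partial data, warnings or status updates while it runs, for example with gearman_job_send_data() or gearman_job_send_status().

v. Return value: the start address of the returned data. The caller owns this buffer and must free() it after use, otherwise memory leaks.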
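
Because the returned buffer is described only by result_size, it is safest to treat it as binary data rather than assume a terminating NUL. The helper below (result_to_string() is a hypothetical name, not part of libgearman) copies such a length-delimited buffer into a new NUL-terminated string; the original buffer still has to be freed separately.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: copies a length-delimited buffer (such as the data and
 * result_size returned by gearman_client_do()) into a new NUL-terminated
 * string. Returns NULL if allocation fails; the caller frees the copy. */
static char *result_to_string(const void *data, size_t size)
{
    char *s = malloc(size + 1);
    if (s == NULL)
        return NULL;
    if (size > 0)
        memcpy(s, data, size);
    s[size] = '\0';
    return s;
}
```

At the call site this would look like char *text = result_to_string(result, result_size); free(result); followed later by free(text). If you only need to print the data, printf("%.*s", (int)result_size, result) avoids the copy altogether.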

void gearman_set_timeout(gearman_universal_st *gearman, int timeout);

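Sets how long a gearman_client_do call may wait without a response before it times out. The value is in milliseconds, and a negative value means wait indefinitely. On a client you would normally call the wrapper gearman_client_set_timeout(), as the example program below does with 15000 (15 seconds).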


Asynchronous callback operation

Gearman adds tasks to a gearman_client_st with gearman_client_add_task(), registers callback functions with gearman_client_set_created_fn(), gearman_client_set_complete_fn() and similar functions, and runs all the tasks in the gearman_client_st with gearman_client_run_tasks().
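
The add-task / register-callback / run pattern can be illustrated without a job server. The following is a simplified, hypothetical model: client_t, task_t, client_add_task(), client_set_complete_fn() and client_run_tasks() only mimic the shape of the libgearman calls named above, the worker is faked locally, and tasks run one after another, whereas gearman_client_run_tasks() talks to the job servers and fires callbacks as events arrive. The point is the flow: each task carries its own context pointer, and the completion callback reads the result and the context from the task it is given.

```c
#include <stdio.h>
#include <string.h>

/* Simplified, hypothetical model; none of these names belong to libgearman. */
#define MAX_TASKS 8

typedef struct task {
    const char *function_name;
    const char *workload;
    void *context;              /* per-task user pointer, like add_task's context */
    char result[64];
} task_t;

typedef int (complete_fn)(task_t *task);   /* 0 = keep going, nonzero = stop */

typedef struct client {
    task_t tasks[MAX_TASKS];
    int ntasks;
    complete_fn *on_complete;
} client_t;

static task_t *client_add_task(client_t *c, void *context,
                               const char *fn, const char *workload)
{
    if (c->ntasks == MAX_TASKS)
        return NULL;
    task_t *t = &c->tasks[c->ntasks++];
    t->function_name = fn;
    t->workload = workload;
    t->context = context;
    t->result[0] = '\0';
    return t;
}

static void client_set_complete_fn(client_t *c, complete_fn *fn)
{
    c->on_complete = fn;
}

/* Local stand-in for a remote worker: appends "->result" to the workload. */
static void fake_worker(task_t *t)
{
    snprintf(t->result, sizeof t->result, "%s->result", t->workload);
}

/* Processes every queued task, then fires the completion callback for it. */
static int client_run_tasks(client_t *c)
{
    for (int i = 0; i < c->ntasks; i++) {
        fake_worker(&c->tasks[i]);
        if (c->on_complete && c->on_complete(&c->tasks[i]) != 0)
            return -1;
    }
    return 0;
}

/* Example callback: counts completions through the task's context pointer. */
static int count_completions(task_t *task)
{
    int *counter = task->context;
    (*counter)++;
    return 0;
}
```

In libgearman the callback registered with gearman_client_set_complete_fn() receives a gearman_task_st pointer instead of this model's task_t.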


Asynchronous background operation

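The job runs in the background, and the client periodically polls its status: once the job has finished, the client stops waiting; otherwise it keeps polling.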

gearman_return_t gearman_client_do_background(gearman_client_st *client, const char *function_name, const char *unique, const void *workload, size_t workload_size, char *job_handle);

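i. job_handle [out]: the identifier (handle) of the submitted job, written into a caller-supplied buffer of GEARMAN_JOB_HANDLE_SIZE bytes.

ii. Return value: the return status.

gearman_return_t gearman_client_job_status(gearman_client_st *client, gearman_job_handle_t job_handle, bool *is_known, bool *is_running, uint32_t *numerator, uint32_t *denominator);

Lets the client query the status of a job running in the background: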

i. is_known [out]: optional; whether the job server still knows about this job handle

ii. is_running [out]: optional; whether a worker is currently running the job

iii. numerator [out]: optional; numerator of the last progress report sent by the worker

iv. denominator [out]: optional; denominator of the last progress report sent by the worker

PS: Background operation does not seem to work very well, and I have never figured out how to get the result of a background job. This has always puzzled me.
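
About the PS: as far as I know, the job server does not store the result of a background job. Whatever the worker returns is discarded, and the client can only ask for the job's status with gearman_client_job_status(). Once the job has finished, the server forgets it, so is_known turns false. If the client needs the output, a common workaround is for the worker to write it to storage both sides can reach (a database or cache, say), keyed by the job handle (the worker sees it via gearman_job_handle()) or by the unique key. The polling loop below is a sketch of the status side only: status_fn, poll_background_job() and the fake_status() test double are hypothetical, shaped after the outputs of gearman_client_job_status().

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical callback shaped like the outputs of gearman_client_job_status():
 * fills in the four status fields, returns 0 on success, nonzero on error. */
typedef int (status_fn)(void *ctx, bool *is_known, bool *is_running,
                        uint32_t *numerator, uint32_t *denominator);

enum poll_outcome { POLL_DONE, POLL_ERROR, POLL_GAVE_UP };

/* Polls up to max_polls times. A job the server no longer knows about is
 * treated as finished (or never submitted), since finished jobs are dropped. */
static enum poll_outcome poll_background_job(status_fn *get_status, void *ctx,
                                             int max_polls, int *polls_used)
{
    for (int i = 1; i <= max_polls; i++) {
        bool known, running;
        uint32_t num, den;
        *polls_used = i;
        if (get_status(ctx, &known, &running, &num, &den) != 0)
            return POLL_ERROR;
        if (!known)
            return POLL_DONE;
        /* Still queued (!running) or running; num/den is the last reported
           progress. A real loop would sleep() here between polls. */
    }
    return POLL_GAVE_UP;
}

/* Test double: reports "known and running" for `remaining` polls, then unknown. */
struct fake_job { int remaining; };

static int fake_status(void *ctx, bool *is_known, bool *is_running,
                       uint32_t *numerator, uint32_t *denominator)
{
    struct fake_job *job = ctx;
    *is_known = job->remaining > 0;
    *is_running = *is_known;
    *numerator = 0;
    *denominator = 0;
    if (job->remaining > 0)
        job->remaining--;
    return 0;
}
```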


gearman_client_st options

gearman_client_st supports the following three runtime options:

i. GEARMAN_CLIENT_NON_BLOCKING: the client runs in non-blocking mode.

ii. GEARMAN_CLIENT_FREE_TASKS: tasks are freed automatically once they finish.

iii. GEARMAN_CLIENT_UNBUFFERED_RESULT: Allow the client to read data in chunks rather than have the library buffer the entire data result and pass that back.

Options can be added, removed and checked with gearman_client_add_options(), gearman_client_remove_options(), gearman_client_has_option() and related functions.
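
Client options are bit flags, so add, remove and has behave like ordinary bitmask operations. The model below uses made-up DEMO_* constants and demo_* functions rather than the real GEARMAN_CLIENT_* values. It also shows a difference that matters for the example program below, which calls gearman_client_set_options(): my understanding is that set replaces the whole user-settable option set, while add only turns on the given bits.

```c
#include <stdbool.h>

/* Hypothetical flag values, NOT libgearman's; the real constants come from
 * <libgearman/gearman.h>. This only models how an option bitmask behaves. */
enum demo_client_options {
    DEMO_NON_BLOCKING      = 1 << 0,
    DEMO_FREE_TASKS        = 1 << 1,
    DEMO_UNBUFFERED_RESULT = 1 << 2
};

typedef struct { unsigned options; } demo_client;

/* replaces the whole option set */
static void demo_set_options(demo_client *c, unsigned o)    { c->options = o; }
/* turns the given bits on, leaves the others alone */
static void demo_add_options(demo_client *c, unsigned o)    { c->options |= o; }
/* turns the given bits off, leaves the others alone */
static void demo_remove_options(demo_client *c, unsigned o) { c->options &= ~o; }
/* true only if every requested bit is set */
static bool demo_has_option(const demo_client *c, unsigned o)
{
    return (c->options & o) == o;
}
```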


Worker API

/**
 * Initialize a worker structure. Always check the return value even if passing
 * in a pre-allocated structure. Some other initialization may have failed. It
 * is not required to memset() a structure before providing it.
 *
 * @param[in] worker Caller allocated structure, or NULL to allocate one.
 * @return On success, a pointer to the (possibly allocated) structure. On
 *  failure this will be NULL.
 */
GEARMAN_API
gearman_worker_st *gearman_worker_create(gearman_worker_st *worker);


/**
 * Free resources used by a worker structure.
 *
 * @param[in] worker Structure previously initialized with
 *  gearman_worker_create() or gearman_worker_clone().
 */
GEARMAN_API
void gearman_worker_free(gearman_worker_st *worker);


/**
 * Add a job server to a worker. This goes into a list of servers that can be
 * used to run tasks. No socket I/O happens here, it is just added to a list.
 *
 * @param[in] worker Structure previously initialized with
 *  gearman_worker_create() or gearman_worker_clone().
 * @param[in] host Hostname or IP address (IPv4 or IPv6) of the server to add.
 * @param[in] port Port of the server to add.
 * @return Standard gearman return value.
 */
GEARMAN_API
gearman_return_t gearman_worker_add_server(gearman_worker_st *worker,
                                           const char *host, in_port_t port);


/**
 * Add a list of job servers to a worker. The format for the server list is:
 * SERVER[:PORT][,SERVER[:PORT]]...
 * Some examples are:
 * 10.0.0.1,10.0.0.2,10.0.0.3
 * localhost:1234,jobserver2.domain.com:7003,10.0.0.3
 *
 * @param[in] worker Structure previously initialized with
 *  gearman_worker_create() or gearman_worker_clone().
 * @param[in] servers Server list described above.
 * @return Standard gearman return value.
 */
GEARMAN_API
gearman_return_t gearman_worker_add_servers(gearman_worker_st *worker,
                                            const char *servers);
                                            
/**
 * Register and add callback function for worker. To remove functions that have
 * been added, call gearman_worker_unregister() or
 * gearman_worker_unregister_all().
 *
 * @param[in] worker Structure previously initialized with
 *  gearman_worker_create() or gearman_worker_clone().
 * @param[in] function_name Function name to register.
 * @param[in] timeout Optional timeout (in seconds) that specifies the maximum
 *  time a job should take. This is enforced on the job server. A value of 0 means
 *  an infinite time.
 * @param[in] function Function to run when there is a job ready.
 * @param[in] context Argument to pass into the callback function.
 * @return Standard gearman return value.
 */
GEARMAN_API
gearman_return_t gearman_worker_add_function(gearman_worker_st *worker,
                                             const char *function_name,
                                             uint32_t timeout,
                                             gearman_worker_fn *function,
                                             void *context);


/**
 * Wait for a job and call the appropriate callback function when it gets one.
 *
 * @param[in] worker Structure previously initialized with
 *  gearman_worker_create() or gearman_worker_clone().
 * @return Standard gearman return value.
 */
GEARMAN_API
gearman_return_t gearman_worker_work(gearman_worker_st *worker);


/**
 * See gearman_universal_set_timeout() for details.
 */
GEARMAN_API
void gearman_worker_set_timeout(gearman_worker_st *worker, int timeout);
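
gearman_worker_add_function() ties a function name to a callback and its context. The worker announces that name to the job server, which then hands it only jobs for registered names, and gearman_worker_work() waits for one job and calls the matching callback. The registry-and-dispatch part can be modelled in plain C as below. All demo_* names are hypothetical; demo_worker_fn receives the workload directly instead of a gearman_job_st, but like gearman_worker_fn it returns a malloc'd buffer and reports its length through result_size.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical model of what gearman_worker_add_function() sets up. */
typedef void *(demo_worker_fn)(const void *workload, size_t workload_size,
                               void *context, size_t *result_size);

#define MAX_FUNCTIONS 16

struct demo_function { const char *name; demo_worker_fn *fn; void *context; };
struct demo_worker   { struct demo_function funcs[MAX_FUNCTIONS]; int count; };

static int demo_add_function(struct demo_worker *w, const char *name,
                             demo_worker_fn *fn, void *context)
{
    if (w->count == MAX_FUNCTIONS)
        return -1;
    w->funcs[w->count++] = (struct demo_function){ name, fn, context };
    return 0;
}

/* Dispatches one job by function name: returns the callback's malloc'd
 * result, or NULL (with *result_size = 0) if the name is not registered. */
static void *demo_dispatch(struct demo_worker *w, const char *name,
                           const void *workload, size_t size, size_t *result_size)
{
    for (int i = 0; i < w->count; i++)
        if (strcmp(w->funcs[i].name, name) == 0)
            return w->funcs[i].fn(workload, size, w->funcs[i].context, result_size);
    *result_size = 0;
    return NULL;
}

/* Example callback: returns an uppercase copy of the workload (ASCII only). */
static void *upper_fn(const void *workload, size_t size, void *context,
                      size_t *result_size)
{
    (void)context;
    char *out = malloc(size ? size : 1);
    if (out == NULL) {
        *result_size = 0;
        return NULL;
    }
    for (size_t i = 0; i < size; i++) {
        char ch = ((const char *)workload)[i];
        out[i] = (ch >= 'a' && ch <= 'z') ? (char)(ch - 'a' + 'A') : ch;
    }
    *result_size = size;
    return out;
}
```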


Examples

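In the following example, jfy_client sends a string such as "test" to the function jfytest, and jfy_worker replies with the same string followed by "->result" (for example "test->result").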

/*
  gearman client test program
  gcc -o jfy_client jfy_client.c -I/usr/local/gearman/include -L/usr/local/gearman/lib -lgearman
  ./jfy_client "this is a test"
*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <libgearman/gearman.h>

int main(int argc, char *argv[])
{
  int i;
  gearman_return_t ret;
  gearman_client_st client;
  char *result;
  size_t result_size;
  uint32_t numerator;
  uint32_t denominator;

  char *host = "localhost", *port = "4730";

  if (argc < 2)
  {
    fprintf(stderr, "usage: %s <workload>\n", argv[0]);
    exit(1);
  }

  if (gearman_client_create(&client) == NULL)
  {
    fprintf(stderr, "Memory allocation failure on client creation\n");
    exit(1);
  }

  gearman_client_set_options(&client, GEARMAN_CLIENT_FREE_TASKS);
  gearman_client_set_timeout(&client, 15000);

  ret= gearman_client_add_server(&client, host, atoi(port));
  if (ret != GEARMAN_SUCCESS)
  {
    fprintf(stderr, "%s\n", gearman_client_error(&client));
    exit(1);
  }

  for (i=0;i<10;i++)
  {
    result= (char *)gearman_client_do(&client, "jfytest", NULL,
                                      (void *)argv[1],
                                      (size_t)strlen(argv[1]),
                                      &result_size, &ret);
    if (ret == GEARMAN_WORK_DATA)
    {
      printf("Data=%.*s\n", (int)result_size, result);
      free(result);
    }
    else if (ret == GEARMAN_WORK_STATUS)
    {
      gearman_client_do_status(&client, &numerator, &denominator);
      printf("Status: %u/%u\n", numerator, denominator);
    }
    else if (ret == GEARMAN_SUCCESS)
    {
      /* result holds result_size bytes and need not be NUL-terminated, so
         print it with a precision instead of copying it into a fixed buffer */
      printf("result_size=%d,result=%.*s=\n", (int)result_size,
             (int)result_size, result ? result : "");
      free(result);
    }
    else if (ret == GEARMAN_WORK_FAIL)
      fprintf(stderr, "Work failed\n");
    else if (ret == GEARMAN_TIMEOUT)
    {
      fprintf(stderr, "Work timeout\n");
    } else {
      fprintf(stderr, "%d,%s\n", gearman_client_errno(&client), gearman_client_error(&client));
    }
    printf("sleep 5s ...\n");
    sleep(5);
  }

  gearman_client_free(&client);

  return 0;
}

/*
  gearman worker test program
  gcc -o jfy_worker jfy_worker.c -I/usr/local/gearman/include -L/usr/local/gearman/lib -lgearman
  ./jfy_worker
*/

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <libgearman/gearman.h>

static void *jfytest(gearman_job_st *job, void *context, size_t *result_size,
                     gearman_return_t *ret_ptr);

int main(int argc, char *argv[])
{
  gearman_return_t ret;
  gearman_worker_st worker;

  char *host = "localhost", *port = "4730";

  if (gearman_worker_create(&worker) == NULL)
  {
    fprintf(stderr, "Memory allocation failure on worker creation\n");
    exit(1);
  }

  ret= gearman_worker_add_server(&worker, host, atoi(port));
  if (ret != GEARMAN_SUCCESS)
  {
    printf("%s\n", gearman_worker_error(&worker));
    exit(1);
  }

  ret= gearman_worker_add_function(&worker, "jfytest", 0, jfytest, NULL);
  if (ret != GEARMAN_SUCCESS)
  {
    printf("%s\n", gearman_worker_error(&worker));
    exit(1);
  }

  printf("wait job ...\n");
  while (1)
  {
    ret= gearman_worker_work(&worker);
    if (ret != GEARMAN_SUCCESS)
    {
      printf("%s\n", gearman_worker_error(&worker));
      break;
    }
  }

  gearman_worker_free(&worker);

  return 0;
}

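static void *jfytest(gearman_job_st *job, void *context, size_t *result_size,
                     gearman_return_t *ret_ptr)
{
  static const char suffix[]= "->result";
  const char *workload;
  size_t workload_size;
  char *request, *result;

  (void)context;

  workload= (const char *)gearman_job_workload(job);
  workload_size= gearman_job_workload_size(job);

  /* the workload is length-delimited, not NUL-terminated: copy it into a
     buffer sized from workload_size and terminate it explicitly */
  request= malloc(workload_size + 1);
  if (request == NULL)
  {
    printf("malloc request:%d\n", errno);
    *ret_ptr= GEARMAN_WORK_FAIL;
    return NULL;
  }
  if (workload_size > 0)
    memcpy(request, workload, workload_size);
  request[workload_size]= '\0';
  printf("job=%s,workload_size=%zu,request=%s\n", gearman_job_handle(job),
         workload_size, request);

  /* room for the request, the suffix and the terminating NUL */
  result= malloc(workload_size + sizeof(suffix));
  if (result == NULL)
  {
    printf("malloc result:%d\n", errno);
    free(request);
    *ret_ptr= GEARMAN_WORK_FAIL;
    return NULL;
  }

  sprintf(result, "%s%s", request, suffix);
  free(request);

  *result_size= strlen(result);
  *ret_ptr= GEARMAN_SUCCESS;

  printf("job=%s,result_size=%zu,result=%s\n", gearman_job_handle(job),
         *result_size, result);

  /* libgearman sends *result_size bytes of this buffer back to the client
     and then frees it */
  return result;
}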


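The next example is written in PHP (based on the PHP Gearman reference documentation). The client sends "Hello!".

There are two worker programs: one blocking and one non-blocking.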

<?php
/*
 * send "Hello!"
 */

echo "Starting\n";

# Create our client object.
$gmclient= new GearmanClient();

# Add default server (localhost).
$gmclient->addServer();

echo "Sending job\n";

# Send reverse job
do
{
  $result= $gmclient->do("reverse", "Hello!");
  # Check for various return packets and errors.
  switch($gmclient->returnCode())
  {
    case GEARMAN_WORK_DATA:
      echo "Data: $result\n";
      break;
    case GEARMAN_WORK_STATUS:
      list($numerator, $denominator)= $gmclient->doStatus();
      echo "Status: $numerator/$denominator complete\n";
      break;
    case GEARMAN_SUCCESS:
      break;
    default:
      echo "RET: " . $gmclient->returnCode() . "\n";
      exit;
  }
}
while($gmclient->returnCode() != GEARMAN_SUCCESS);
echo "Success: $result\n";

?>


<?php
/*
 * Blocking worker: turns "Hello!" into "!olleH"
 */

echo "Starting\n";

# Create our worker object.
$gmworker= new GearmanWorker();
$gmworker->setTimeout(5000);

# Add default server (localhost).
$gmworker->addServer();

# Register function "reverse" with the server. Change the worker function to
# "reverse_fn_fast" for a faster worker with no output.
$gmworker->addFunction("reverse", "reverse_fn");

print "Waiting for job...\n";
while($gmworker->work())
{
  if ($gmworker->returnCode() != GEARMAN_SUCCESS)
  {
    echo "return_code: " . $gmworker->returnCode() . "\n";
    break;
  }
  echo "Received and processed a job!\n";
}

function reverse_fn($job)
{
  echo "Received job: " . $job->handle() . "\n";

  $workload= $job->workload();
  $workload_size= $job->workloadSize();

  echo "Workload: $workload ($workload_size)\n";

  # This status loop is not needed, just showing how it works
  for ($x= 0; $x < $workload_size; $x++)
  {
    echo "Sending status: $x/$workload_size complete\n";
    /*
    $job->sendStatus($x, $workload_size);
    sleep(1);
    */
  }

  $result= strrev($workload);
  echo "Result: $result\n";

  # Return what we want to send back to the client.
  return $result;
}

# A much simpler and less verbose version of the above function would be:
function reverse_fn_fast($job)
{
  return strrev($job->workload());
}

?>
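
For comparison, here is the same reversal written in C, so that it could be used in a C worker like jfy_worker. reverse_bytes() is a hypothetical helper; like PHP's strrev() it reverses bytes, so multi-byte UTF-8 text would come out scrambled. It returns a malloc'd buffer of exactly size bytes with no terminating NUL, which matches what a gearman_worker_fn callback returns.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: byte-wise reverse of a length-delimited workload,
 * like PHP's strrev(). Returns a malloc'd buffer of `size` bytes
 * (not NUL-terminated), or NULL if allocation fails. */
static void *reverse_bytes(const void *workload, size_t size)
{
    const unsigned char *in = workload;
    unsigned char *out = malloc(size ? size : 1);
    if (out == NULL)
        return NULL;
    for (size_t i = 0; i < size; i++)
        out[i] = in[size - 1 - i];
    return out;
}
```

Inside such a callback it would be called as reverse_bytes(gearman_job_workload(job), gearman_job_workload_size(job)), with *result_size set to the workload size and *ret_ptr to GEARMAN_SUCCESS.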

<?php
/*
 * Non-blocking worker: turns "Hello!" into "!olleH"
 */

echo "Starting\n";

# Create our worker object.
$gmworker= new GearmanWorker();
$gmworker->setTimeout(1000);
$gmworker->addOptions(GEARMAN_WORKER_NON_BLOCKING); # Make the worker non-blocking

# Add default server (localhost).
$gmworker->addServer();

# Register function "reverse" with the server. Change the worker function to
# "reverse_fn_fast" for a faster worker with no output.
$gmworker->addFunction("reverse", "reverse_fn");

print "Waiting for job...\n";

while ( ($ret = $gmworker->work()) || $gmworker->returnCode() == GEARMAN_IO_WAIT || $gmworker->returnCode() == GEARMAN_NO_JOBS) {
  echo "return_code: " . $gmworker->returnCode() . "\n";
  if ($gmworker->returnCode() == GEARMAN_SUCCESS) {
    continue;
  }
  if ( !$gmworker->wait() ) {
    echo "return_code: " . $gmworker->returnCode() . "\n";
    if ($gmworker->returnCode() == GEARMAN_NO_ACTIVE_FDS) {
      # We are not connected to any servers, so wait a bit before
      # trying to reconnect.
      sleep(5);
      continue;
    }
    break;
  }
}

function reverse_fn($job)
{
  echo "Received job: " . $job->handle() . "\n";

  $workload= $job->workload();
  $workload_size= $job->workloadSize();

  echo "Workload: $workload ($workload_size)\n";

  # This status loop is not needed, just showing how it works
  for ($x= 0; $x < $workload_size; $x++)
  {
    echo "Sending status: $x/$workload_size complete\n";
    /*
    $job->sendStatus($x, $workload_size);
    sleep(1);
    */
  }

  $result= strrev($workload);
  echo "Result: $result\n";

  # Return what we want to send back to the client.
  return $result;
}

# A much simpler and less verbose version of the above function would be:
function reverse_fn_fast($job)
{
  return strrev($job->workload());
}

?>
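
The control flow of the non-blocking worker loop is easy to misread, so here it is restated as two small C decision functions. The NB_* status and action names are hypothetical stand-ins for the GEARMAN_* return codes used in the PHP code: after work(), success means try work() again, GEARMAN_IO_WAIT or GEARMAN_NO_JOBS means block in wait(), and anything else ends the loop; when wait() fails, GEARMAN_NO_ACTIVE_FDS (no server connection) means sleep and retry, and anything else ends the loop.

```c
/* Hypothetical stand-ins for the GEARMAN_* codes used by the PHP worker. */
enum nb_status { NB_SUCCESS, NB_IO_WAIT, NB_NO_JOBS, NB_NO_ACTIVE_FDS, NB_OTHER_ERROR };
enum nb_action { NB_WORK_AGAIN, NB_WAIT, NB_SLEEP_AND_RETRY, NB_STOP };

/* What the loop does after work() returned status s. */
static enum nb_action after_work(enum nb_status s)
{
    switch (s) {
    case NB_SUCCESS:
        return NB_WORK_AGAIN;      /* a job was processed; look for another */
    case NB_IO_WAIT:
    case NB_NO_JOBS:
        return NB_WAIT;            /* nothing to do yet; block in wait() */
    default:
        return NB_STOP;
    }
}

/* What the loop does after wait() failed with status s. */
static enum nb_action after_failed_wait(enum nb_status s)
{
    return s == NB_NO_ACTIVE_FDS ? NB_SLEEP_AND_RETRY : NB_STOP;
}
```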

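Copyright notice: this is an original article by the blogger, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.
Original article: https://blog.csdn.net/xtjsxtj/article/details/16339039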
