技术标签: go
转载自:http://daizuozhuo.github.io/golang-rpc-practice/
一直用Golang标准库里的的RPC package来进行远程调用,简单好用. 但是随着任务数量的增大, 发现简单的像包里面的示例那样的代码出现了各种各样的问题,下面就把我踩过的一些坑记录一下吧. 首先是最初使用的文档里的版本,使用HTTP来发送请求.
server.go
func ListenRPC() {
rpc.Register(NewWorker())
rpc.HandleHTTP()
l, e := net.Listen("tcp", ":4200")
if e != nil {
log.Fatal("Error: listen 4200 error:", e)
}
go http.Serve(l, nil)
}
client.go
func call(srv string, rpcname string, args interface{}, reply interface{}) error {
c, errx := rpc.DialHTTP("tcp", srv+":4200")
if errx != nil {
return fmt.Errorf("ConnectError: %s", errx.Error())
}
defer c.Close()
return c.Call(rpcname, args, reply)
}
这样四五台机器的情况是够用了, 但是后来集群的机器增加到了十二台, 当请求大了之后发现总有很多任务卡住,通过call函数发送任务之后总会有没有返回的情况. 于是转而直接用tcp,效率有很大提升.
server.go
func ListenRPC() {
rpc.Register(NewWorker())
l, e := net.Listen("tcp", ":4200")
if e != nil {
log.Fatal("Error: listen 4200 error:", e)
}
go func() {
for {
conn, err := l.Accept()
if err != nil {
log.Print("Error: accept rpc connection", err.Error())
continue
}
go rpc.ServeConn(conn)
}
}()
}
client.go
func call(srv string, rpcname string, args interface{}, reply interface{}) error {
c, errx := rpc.Dial("tcp", srv+":4200")
if errx != nil {
return fmt.Errorf("ConnectError: %s", errx.Error())
}
defer c.Close()
return c.Call(rpcname, args, reply)
}
这样局面有所改观,但是还是有任务卡住,概率大概是0.01%, 也就是一万个call里会有一个没有响应. 仔细研究后发现这个rpc package有两大坑:
rpc包里的rpc.Dial函数没有timeout, 系统默认是没有timeout的,所以在这里可能卡住.所以我们可以采用net包里的 net.DialTimeout函数.
rpc包里默认使用gobCodec来编码解码, 这里io可能会卡住而不返回错误,所以我们要自己编写加入timeout的codec. 注意server这边读写都有timeout,但是client这边只有写有timeout,因为读的话并不能预知任务完成的时间. 于是就有了接下来这个版本的rpc,几十万个任务下来没有任何问题.
完整的代码可以在在github rpc-example上下载.
server.go
func TimeoutCoder(f func(interface{}) error, e interface{}, msg string) error {
echan := make(chan error, 1)
go func() { echan <- f(e) }()
select {
case e := <-echan:
return e
case <-time.After(time.Minute):
return fmt.Errorf("Timeout %s", msg)
}
}
type gobServerCodec struct {
rwc io.ReadWriteCloser
dec *gob.Decoder
enc *gob.Encoder
encBuf *bufio.Writer
closed bool
}
func (c *gobServerCodec) ReadRequestHeader(r *rpc.Request) error {
return TimeoutCoder(c.dec.Decode, r, "server read request header")
}
func (c *gobServerCodec) ReadRequestBody(body interface{}) error {
return TimeoutCoder(c.dec.Decode, body, "server read request body")
}
func (c *gobServerCodec) WriteResponse(r *rpc.Response, body interface{}) (err error) {
if err = TimeoutCoder(c.enc.Encode, r, "server write response"); err != nil {
if c.encBuf.Flush() == nil {
log.Println("rpc: gob error encoding response:", err)
c.Close()
}
return
}
if err = TimeoutCoder(c.enc.Encode, body, "server write response body"); err != nil {
if c.encBuf.Flush() == nil {
log.Println("rpc: gob error encoding body:", err)
c.Close()
}
return
}
return c.encBuf.Flush()
}
func (c *gobServerCodec) Close() error {
if c.closed {
// Only call c.rwc.Close once; otherwise the semantics are undefined.
return nil
}
c.closed = true
return c.rwc.Close()
}
func ListenRPC() {
rpc.Register(NewWorker())
l, e := net.Listen("tcp", ":4200")
if e != nil {
log.Fatal("Error: listen 4200 error:", e)
}
go func() {
for {
conn, err := l.Accept()
if err != nil {
log.Print("Error: accept rpc connection", err.Error())
continue
}
go func(conn net.Conn) {
buf := bufio.NewWriter(conn)
srv := &gobServerCodec{
rwc: conn,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
encBuf: buf,
}
err = rpc.ServeRequest(srv)
if err != nil {
log.Print("Error: server rpc request", err.Error())
}
srv.Close()
}(conn)
}
}()
}
client.go
type gobClientCodec struct {
rwc io.ReadWriteCloser
dec *gob.Decoder
enc *gob.Encoder
encBuf *bufio.Writer
}
func (c *gobClientCodec) WriteRequest(r *rpc.Request, body interface{}) (err error) {
if err = TimeoutCoder(c.enc.Encode, r, "client write request"); err != nil {
return
}
if err = TimeoutCoder(c.enc.Encode, body, "client write request body"); err != nil {
return
}
return c.encBuf.Flush()
}
func (c *gobClientCodec) ReadResponseHeader(r *rpc.Response) error {
return c.dec.Decode(r)
}
func (c *gobClientCodec) ReadResponseBody(body interface{}) error {
return c.dec.Decode(body)
}
func (c *gobClientCodec) Close() error {
return c.rwc.Close()
}
func call(srv string, rpcname string, args interface{}, reply interface{}) error {
conn, err := net.DialTimeout("tcp", srv+":4200", time.Second*10)
if err != nil {
return fmt.Errorf("ConnectError: %s", err.Error())
}
encBuf := bufio.NewWriter(conn)
codec := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
c := rpc.NewClientWithCodec(codec)
err = c.Call(rpcname, args, reply)
errc := c.Close()
if err != nil && errc != nil {
return fmt.Errorf("%s %s", err, errc)
}
if err != nil {
return err
} else {
return errc
}
}
https://blog.csdn.net/kisssun0608/article/details/45338655版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u010113156/article/details/45338655根据Hadoop官网的相关介绍和实际使用中的软件集,将Hadoop生态圈的主要软件工具简单介绍下,拓展对整个...
struts2提供了annotation来代替配置文件,并且说:"It is great start."。我试用了Annoation风格的Action配置,感觉并不是很好用,尤其在比较复杂的Action中,反而不如XML直观明了。甚至,我不喜欢使用带有{}的简化配置,还是传统的方式更加一目了然。尤其是配置配置Action的时候需要在web.xml中注明packages,简直是...今天终于找到...
AbstractWith the ubiquity of real-time data,organizations need streaming systems that are scalable, easy to use, and easyto integrate into business applications. Structured Streaming is a newhigh-leve...
上文链接:第几天修改版【蓝桥杯真题】(c++实现)乘积尾零如下的10行数据,每行有10个整数,请你求出它们的乘积的末尾有多少个零?5650 4542 3554 473 946 4114 3871 9073 90 43292758 7949 6113 5659 5245 7432 3051 4434 6704 35949937 1173 6866 3397 4759 7557 3070 ...
工作了半年后,觉得自己的能力毫无提升的方式?作做好本职工作之外,不妨多去接一些私活,除了提升自己的技能之外,也让收入再上一层,何乐不为?平台0.程序员客栈https://www.proginn.com/主要雇佣BAT级别开发者为创业者开发产品。远程工作是一种趋势,解放了时间,解放了束缚,充分利用自己的时间。各尽其能,各负其责。不说其他,钱是保...
js和css动画使用setTimeout()或者setInterval()使用这两个函数定时调用一段代码。这是其原理。目的,重复修改内联样式,达到动画的效果通过在相同的时间内构造出一帧帧的内容,然后让其在函数的作用下不断的改变css的值,达到动画的效果js写css动画//将e转化为相对定位的元素,使得其可以左右移动//第一个参数为元素对象或者元素的id//如果第二个参数是函数,以e为参数,它...
QT中QMainWindow、QWidget、QDialog简述在分享所有基础知识之前,很有必要在这里介绍下常用的窗口-QWidget、QDialog、QMainWindow。熟悉Qt的同学都应该知道,在新建Qt Widgets项目进行类信息选择时会碰到它们,没错,很巧,每次都能碰到。所以,在以后的Qt开发中,我们会经常和它们打交道。常言道,知己知彼,百战不殆。只有了解它们、...
最近脑抽,用win32 API写了一个数据处理平台,将各个窗口定义为Class进行自管理,但是由于类常规成员函数不能作为窗口函数,只能将窗口函数定义为静态函数,这样写则后续在窗口函数中引用的函数都要定义为静态函数,且不能引用类中的成员变量,非常麻烦,通过调研(竟然在20年前的贴子中找到了一个方法,但只是思路,无法直接使用)可以在createstruct中将成员函数传递给窗口,实现如下,包含一个窗口...
在程序中跳转页面时,网址前多了一串 res://ieframe.dll/navcancl.htm#处理方法和处理思路可查看该博客,是我之前写的一篇博客:C#程序使用Nginx时出现的重定向问题
邮件服务相关MUA:邮件用户代理; 指的就是如Foxmail,outlook,等邮件客户端程序。MDA(Mail Delivery Agent):“邮件投递代理”主要的功能就是将MTA接收的信件依照信件的流向(送到哪里)将该信件放置到本机账户下的邮件文件中(收件箱),或者再经由MTA将信件送到下个MTA。如果信件的流向是到本机,这个邮件代理的功能就不只是将由MTA传来的...
今天做Excel表格的时候,在表格中需要嵌入日历丰富表格的内容。结合网上找到的一些方法,把具体实现步骤整理了一下,加上一些我自己使用到的VB代码,自己留个脚印,以后也会用得上。本文档指导在Excel 2007中启用日历控件,并在指定列中弹出日历选择框的步骤。1. 先需下载并注册安装Excel 2007的日期控件文件MSCAL.OCX。在Microsoft网站或internet上下载日历