diff --git a/examples/dubbo/go-client/app/client.go b/examples/dubbo/go-client/app/client.go index 7cd002b765144ae7d98e96514207787ff33f671b..9206448ed7157fb2e1dcedf06de73b7af65d6c16 100644 --- a/examples/dubbo/go-client/app/client.go +++ b/examples/dubbo/go-client/app/client.go @@ -80,7 +80,7 @@ func main() { } func initProfiling() { - if !support.GetProviderConfig().Pprof_Enabled { + if !support.GetConsumerConfig().Pprof_Enabled { return } const ( @@ -96,7 +96,7 @@ func initProfiling() { if err != nil { panic("cat not get local ip!") } - addr = ip + ":" + strconv.Itoa(support.GetProviderConfig().Pprof_Port) + addr = ip + ":" + strconv.Itoa(support.GetConsumerConfig().Pprof_Port) log.Info("App Profiling startup on address{%v}", addr+PprofPath) go func() { diff --git a/examples/dubbo/go-client/benchmark/benchmark.go b/examples/dubbo/go-client/benchmark/benchmark.go index 586b73af21a1fee17c821d81b2dc49eb8ff5fdee..94bc864c294d8ba05b6067f1301f46cd311e2a34 100644 --- a/examples/dubbo/go-client/benchmark/benchmark.go +++ b/examples/dubbo/go-client/benchmark/benchmark.go @@ -4,13 +4,19 @@ import ( "context" "errors" "flag" + "fmt" "log" + "net/http" + _ "net/http/pprof" + + "strconv" "sync" "sync/atomic" "time" ) import ( + "github.com/AlexStocks/goext/net" "github.com/dubbogo/hessian2" "github.com/montanaflynn/stats" ) @@ -35,6 +41,7 @@ var total = flag.Int("n", 1, "total requests for all clients") var survivalTimeout int = 10e9 func main() { + initProfiling() flag.Parse() conc, tn, err := checkArgs(*concurrency, *total) @@ -164,3 +171,27 @@ func checkArgs(c, n int) (int, int, error) { } return c, n, nil } + +func initProfiling() { + if !support.GetConsumerConfig().Pprof_Enabled { + return + } + const ( + PprofPath = "/debug/pprof/" + ) + var ( + err error + ip string + addr string + ) + + ip, err = gxnet.GetLocalIP() + if err != nil { + panic("can not get local ip!") + } + addr = ip + ":" + strconv.Itoa(support.GetConsumerConfig().Pprof_Port) + fmt.Println(addr) + go func() { + http.ListenAndServe(addr, nil) + }() +} diff --git a/examples/dubbo/go-client/profiles/dev/client.yml b/examples/dubbo/go-client/profiles/dev/client.yml index 4b9005472db8486f36dc7bac31edbacc63caac08..a845418f883f73bb666692840131337e41ac95b0 100644 --- a/examples/dubbo/go-client/profiles/dev/client.yml +++ b/examples/dubbo/go-client/profiles/dev/client.yml @@ -47,6 +47,7 @@ references: protocol_conf: dubbo: + reconnect_interval: 0 connection_number: 2 heartbeat_period: "5s" session_timeout: "20s" diff --git a/examples/dubbo/go-client/profiles/release/client.yml b/examples/dubbo/go-client/profiles/release/client.yml index 4b9005472db8486f36dc7bac31edbacc63caac08..a845418f883f73bb666692840131337e41ac95b0 100644 --- a/examples/dubbo/go-client/profiles/release/client.yml +++ b/examples/dubbo/go-client/profiles/release/client.yml @@ -47,6 +47,7 @@ references: protocol_conf: dubbo: + reconnect_interval: 0 connection_number: 2 heartbeat_period: "5s" session_timeout: "20s" diff --git a/examples/dubbo/go-client/profiles/test/client.yml b/examples/dubbo/go-client/profiles/test/client.yml index 4b9005472db8486f36dc7bac31edbacc63caac08..a845418f883f73bb666692840131337e41ac95b0 100644 --- a/examples/dubbo/go-client/profiles/test/client.yml +++ b/examples/dubbo/go-client/profiles/test/client.yml @@ -47,6 +47,7 @@ references: protocol_conf: dubbo: + reconnect_interval: 0 connection_number: 2 heartbeat_period: "5s" session_timeout: "20s" diff --git a/examples/jsonrpc/go-client/profiles/dev/client.yml b/examples/jsonrpc/go-client/profiles/dev/client.yml index a897aaeeb0e23c8cc8ef50a41eea51f22343bbb4..03ffb22ab66ea2f058dde903016337dd774a481b 100644 --- a/examples/jsonrpc/go-client/profiles/dev/client.yml +++ b/examples/jsonrpc/go-client/profiles/dev/client.yml @@ -5,9 +5,9 @@ pprof_enabled : true pprof_port : 10086 # client -request_timeout : "100ms" +request_timeout : "3s" # connect timeout -connect_timeout : "100ms" +connect_timeout : "3s" # application config application_config: diff --git a/examples/jsonrpc/go-client/profiles/release/client.yml b/examples/jsonrpc/go-client/profiles/release/client.yml index b2d10c65ebf999bc80c72737084de2aefd48ae16..49cf4696700fd802872f0796f662fb6c6be8aa0b 100644 --- a/examples/jsonrpc/go-client/profiles/release/client.yml +++ b/examples/jsonrpc/go-client/profiles/release/client.yml @@ -5,9 +5,9 @@ pprof_enabled : true pprof_port : 10086 # client -request_timeout : "100ms" +request_timeout : "3s" # connect timeout -connect_timeout : "100ms" +connect_timeout : "3s" # application config application_config: diff --git a/examples/jsonrpc/go-client/profiles/test/client.yml b/examples/jsonrpc/go-client/profiles/test/client.yml index b2d10c65ebf999bc80c72737084de2aefd48ae16..49cf4696700fd802872f0796f662fb6c6be8aa0b 100644 --- a/examples/jsonrpc/go-client/profiles/test/client.yml +++ b/examples/jsonrpc/go-client/profiles/test/client.yml @@ -5,9 +5,9 @@ pprof_enabled : true pprof_port : 10086 # client -request_timeout : "100ms" +request_timeout : "3s" # connect timeout -connect_timeout : "100ms" +connect_timeout : "3s" # application config application_config: diff --git a/go.mod b/go.mod index 06a84aa4e3cbc10da7bd3dcd45d7d54a3ed2a192..b327d5f2824aeb0a07a0876dbe7e7df4e606df2f 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,7 @@ module github.com/dubbo/go-for-apache-dubbo require ( - github.com/AlexStocks/getty v0.0.0-20190331201845-1ca64ac5a589 + github.com/AlexStocks/getty v0.0.0-20190513191741-cb811d1c31dd github.com/AlexStocks/goext v0.3.2 github.com/AlexStocks/log4go v1.0.2 github.com/dubbogo/hessian2 v0.0.0-20190410112310-f093e4436e31 diff --git a/go.sum b/go.sum index b3853bbd54abd82aa2ae7f953fb75f3490f627ab..c8127d6d20459ef44d3e20f91230e61b3b041a59 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,6 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -github.com/AlexStocks/getty v0.0.0-20190331201845-1ca64ac5a589 h1:iat4jfMomN+G0SqwLJRUM2iha0LHwX+VpdT8PR8NihA= -github.com/AlexStocks/getty v0.0.0-20190331201845-1ca64ac5a589/go.mod h1:n25mdqPgFi06sWL6mZTjm1hBIZuKwgXUVXAX+KGB97U= +github.com/AlexStocks/getty v0.0.0-20190513191741-cb811d1c31dd h1:kLcmlBGDmo9S+VSmRnDTUFCq3wvlIh9CMd4zLWqY1+4= +github.com/AlexStocks/getty v0.0.0-20190513191741-cb811d1c31dd/go.mod h1:n25mdqPgFi06sWL6mZTjm1hBIZuKwgXUVXAX+KGB97U= github.com/AlexStocks/goext v0.3.2 h1:Bn4C+R6/E5Yjk2Uc/voawtbGv91x9aCid92xwYL2AS0= github.com/AlexStocks/goext v0.3.2 h1:Bn4C+R6/E5Yjk2Uc/voawtbGv91x9aCid92xwYL2AS0= github.com/AlexStocks/goext v0.3.2/go.mod h1:3M5j9Pjge4CdkNg2WIjRLUeoPedJHHKwkkglDGSl3Hc= diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go index 6d274b5396f7d64546c511a11b0bab5ea9cd4a2c..43bdb3b08fec0d9d4f1674e4ea573fa159348b8a 100644 --- a/protocol/dubbo/codec.go +++ b/protocol/dubbo/codec.go @@ -67,9 +67,14 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error { if len(opts) != 0 { // for client if client, ok := opts[0].(*Client); ok { + + r := client.pendingResponses[SequenceType(p.Header.ID)] + if r == nil { + return fmt.Errorf("pendingResponses[%v] = nil", p.Header.ID) + } p.Body = client.pendingResponses[SequenceType(p.Header.ID)].reply } else { - return fmt.Errorf("pendingResponses[%v] = nil", p.Header.ID) + return fmt.Errorf("opts[0] is not *Client") } } diff --git a/protocol/dubbo/config.go b/protocol/dubbo/config.go index 3d0d5116c49cea3b7035b1aeee7d79f66802dde7..b0859681bb1a48c2eb77da49176a9200fbefc5df 100644 --- a/protocol/dubbo/config.go +++ b/protocol/dubbo/config.go @@ -57,6 +57,8 @@ type ( //Host string `default:"127.0.0.1" yaml:"host" json:"host,omitempty"` //ProfilePort int `default:"10086" yaml:"profile_port" json:"profile_port,omitempty"` + ReconnectInterval int `default:"0" yaml:"reconnect_interval" json:"reconnect_interval,omitempty"` + // session pool ConnectionNum int `default:"16" yaml:"connection_number" json:"connection_number,omitempty"` diff --git a/protocol/dubbo/pool.go b/protocol/dubbo/pool.go index 1f9828d7b7f28bbe0758a3c0a686fe992fa67133..651733ba3fbc175adad3a86a0c1617c0a720adfb 100644 --- a/protocol/dubbo/pool.go +++ b/protocol/dubbo/pool.go @@ -40,6 +40,7 @@ func newGettyRPCClientConn(pool *gettyRPCClientPool, protocol, addr string) (*ge gettyClient: getty.NewTCPClient( getty.WithServerAddress(addr), getty.WithConnectionNumber((int)(pool.rpcClient.conf.ConnectionNum)), + getty.WithReconnectInterval(pool.rpcClient.conf.ReconnectInterval), ), } c.gettyClient.RunEventLoop(c.newSession) @@ -225,7 +226,7 @@ type gettyRPCClientPool struct { ttl int64 // 每个gettyRPCClient的有效期时间. pool对象会在getConn时执行ttl检查 sync.Mutex - connMap map[string][]*gettyRPCClient // 从[]*gettyRPCClient 可见key是连接地址,而value是对应这个地址的连接数组 + conns []*gettyRPCClient // 从[]*gettyRPCClient 可见key是连接地址,而value是对应这个地址的连接数组 } func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) *gettyRPCClientPool { @@ -233,39 +234,33 @@ func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) * rpcClient: rpcClient, size: size, ttl: int64(ttl.Seconds()), - connMap: make(map[string][]*gettyRPCClient), + conns: []*gettyRPCClient{}, } } func (p *gettyRPCClientPool) close() { p.Lock() - connMap := p.connMap - p.connMap = nil + conns := p.conns + p.conns = nil p.Unlock() - for _, connArray := range connMap { - for _, conn := range connArray { - conn.close() - } + for _, conn := range conns { + conn.close() } } func (p *gettyRPCClientPool) getGettyRpcClient(protocol, addr string) (*gettyRPCClient, error) { - key := GenerateEndpointAddr(protocol, addr) - p.Lock() defer p.Unlock() - if p.connMap == nil { + if p.conns == nil { return nil, errClientPoolClosed } - connArray := p.connMap[key] now := time.Now().Unix() - for len(connArray) > 0 { - conn := connArray[len(connArray)-1] - connArray = connArray[:len(connArray)-1] - p.connMap[key] = connArray + for len(p.conns) > 0 { + conn := p.conns[len(p.conns)-1] + p.conns = p.conns[:len(p.conns)-1] if d := now - conn.created; d > p.ttl { conn.close() // -> pool.remove(c) @@ -288,20 +283,17 @@ func (p *gettyRPCClientPool) release(conn *gettyRPCClient, err error) { return } - key := GenerateEndpointAddr(conn.protocol, conn.addr) - p.Lock() defer p.Unlock() - if p.connMap == nil { + if p.conns == nil { return } - connArray := p.connMap[key] - if len(connArray) >= p.size { + if len(p.conns) >= p.size { conn.close() return } - p.connMap[key] = append(connArray, conn) + p.conns = append(p.conns, conn) } func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) { @@ -309,19 +301,16 @@ func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) { return } - key := GenerateEndpointAddr(conn.protocol, conn.addr) - - p.Lock() - defer p.Unlock() - if p.connMap == nil { + //p.Lock() + //defer p.Unlock() + if p.conns == nil { return } - connArray := p.connMap[key] - if len(connArray) > 0 { - for idx, c := range connArray { + if len(p.conns) > 0 { + for idx, c := range p.conns { if conn == c { - p.connMap[key] = append(connArray[:idx], connArray[idx+1:]...) + p.conns = append(p.conns[:idx], p.conns[idx+1:]...) break } } diff --git a/protocol/jsonrpc/http.go b/protocol/jsonrpc/http.go index 7e19e3742aa0c74bc14e611b3bd439dad305188d..3bdaae56ec846bde5c6ed5f7c7fbb7cc2ee7a313 100644 --- a/protocol/jsonrpc/http.go +++ b/protocol/jsonrpc/http.go @@ -155,7 +155,7 @@ func (c *HTTPClient) Do(addr, path string, httpHeader http.Header, body []byte) t = time.Now().Add(timeout) } - conn.SetReadDeadline(t) + conn.SetDeadline(t) } setNetConnTimeout(tcpConn, c.options.HTTPTimeout) diff --git a/protocol/jsonrpc/server.go b/protocol/jsonrpc/server.go index 11ca9f3198ec112037f740274b8e908e73619e80..9c924c20c511054c3f6cc7af1df5d7bf199dd90b 100644 --- a/protocol/jsonrpc/server.go +++ b/protocol/jsonrpc/server.go @@ -67,7 +67,7 @@ func (s *Server) handlePkg(conn net.Conn) { conn.Close() }() - setReadTimeout := func(conn net.Conn, timeout time.Duration) { + setTimeout := func(conn net.Conn, timeout time.Duration) { t := time.Time{} if timeout > time.Duration(0) { t = time.Now().Add(timeout) @@ -101,6 +101,7 @@ func (s *Server) handlePkg(conn net.Conn) { bufReader := bufio.NewReader(conn) r, err := http.ReadRequest(bufReader) if err != nil { + log.Warn("[ReadRequest] error: %v", err) return } @@ -123,7 +124,7 @@ func (s *Server) handlePkg(conn net.Conn) { httpTimeout := s.timeout contentType := reqHeader["Content-Type"] if contentType != "application/json" && contentType != "application/json-rpc" { - setReadTimeout(conn, httpTimeout) + setTimeout(conn, httpTimeout) r.Header.Set("Content-Type", "text/plain") if errRsp := sendErrorResp(r.Header, []byte(jerrors.ErrorStack(err))); errRsp != nil { log.Warn("sendErrorResp(header:%#v, error:%s) = error:%s", @@ -141,7 +142,7 @@ func (s *Server) handlePkg(conn net.Conn) { } delete(reqHeader, "Timeout") } - setReadTimeout(conn, httpTimeout) + setTimeout(conn, httpTimeout) if err := serveRequest(ctx, reqHeader, reqBody, conn, s.exporter); err != nil { if errRsp := sendErrorResp(r.Header, []byte(jerrors.ErrorStack(err))); errRsp != nil { @@ -157,15 +158,13 @@ func (s *Server) handlePkg(conn net.Conn) { func accept(listener net.Listener, fn func(net.Conn)) error { var ( - err error - c net.Conn ok bool ne net.Error tmpDelay time.Duration ) for { - c, err = listener.Accept() + c, err := listener.Accept() if err != nil { if ne, ok = err.(net.Error); ok && ne.Temporary() { if tmpDelay != 0 {