diff --git a/protocol/dubbo/dubbo_codec.go b/protocol/dubbo/dubbo_codec.go index c1f8231385dd788e3900eeb8db7f7b62d08bc498..a7f265c3bb189dc72a50d1f1302a5613f7cc45db 100644 --- a/protocol/dubbo/dubbo_codec.go +++ b/protocol/dubbo/dubbo_codec.go @@ -82,7 +82,6 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, resp *remoting.Response) err p.Body = &hessian.Response{RspObj: pendingRsp.Reply} } } - // read body err = codec.ReadBody(p.Body) return perrors.WithStack(err) diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go index 3bf03419239e85a5d71baded13965cb78f158231..4fd2492a7591a5529a83ecc6ef8ed13c34545ea2 100644 --- a/protocol/dubbo/dubbo_protocol.go +++ b/protocol/dubbo/dubbo_protocol.go @@ -94,7 +94,11 @@ func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker { // ConnectTimeout: config.GetConsumerConfig().ConnectTimeout, // RequestTimeout: requestTimeout, //})) - invoker := NewDubboInvoker(url, getExchangeClient(url)) + exchangeClient := getExchangeClient(url) + if exchangeClient == nil { + return nil + } + invoker := NewDubboInvoker(url, exchangeClient) dp.SetInvokers(invoker) logger.Infof("Refer service: %s", url.String()) return invoker @@ -180,7 +184,9 @@ func getExchangeClient(url common.URL) *remoting.ExchangeClient { exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{ ConnectTimeout: config.GetConsumerConfig().ConnectTimeout, }), config.GetConsumerConfig().ConnectTimeout) - exchangeClientMap.Store(url.Location, exchangeClientTmp) + if exchangeClientTmp != nil { + exchangeClientMap.Store(url.Location, exchangeClientTmp) + } return exchangeClientTmp } @@ -189,8 +195,9 @@ func getExchangeClient(url common.URL) *remoting.ExchangeClient { exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{ ConnectTimeout: config.GetConsumerConfig().ConnectTimeout, }), config.GetConsumerConfig().ConnectTimeout) - exchangeClientMap.Store(url.Location, exchangeClientTmp) - + if exchangeClientTmp != nil { + exchangeClientMap.Store(url.Location, exchangeClientTmp) + } return exchangeClientTmp } return exchangeClient diff --git a/remoting/exchange_client.go b/remoting/exchange_client.go index 2a0e6cfd799f4b75de59b2dd8eed7a3426e66036..e1778ed3a4260e7fc59450add4544ae15b9483dc 100644 --- a/remoting/exchange_client.go +++ b/remoting/exchange_client.go @@ -26,7 +26,7 @@ type Client interface { SetResponseHandler(responseHandler ResponseHandler) //invoke once for connection //ConfigClient() - Connect(url common.URL) + Connect(url common.URL) error Close() Request(request *Request, timeout time.Duration, response *PendingResponse) error } @@ -42,7 +42,13 @@ func NewExchangeClient(url common.URL, client Client, connectTimeout time.Durati client: client, } client.SetExchangeClient(exchangeClient) - client.Connect(url) + if client.Connect(url) != nil { + //retry for a while + time.Sleep(1 * time.Second) + if client.Connect(url) != nil { + return nil + } + } client.SetResponseHandler(exchangeClient) return exchangeClient } diff --git a/remoting/exchange_server.go b/remoting/exchange_server.go index 4aae39fd83f688983b00655c24360abf59b38ecd..dd6e76087f2508318dd42b8f116bf83a4c656b99 100644 --- a/remoting/exchange_server.go +++ b/remoting/exchange_server.go @@ -22,9 +22,9 @@ func NewExchangeServer(url common.URL, server Server) *ExchangeServer { } func (server *ExchangeServer) Start() { - (server.Server).Start() + server.Server.Start() } func (server *ExchangeServer) Stop() { - (server.Server).Stop() + server.Server.Stop() } diff --git a/remoting/getty/getty_client.go b/remoting/getty/getty_client.go index 8544b8e17f4b93cd3a0bd1d47c5e8b96b7002e2e..ccf8881bfe6bbefca6980638ccf014283af635cb 100644 --- a/remoting/getty/getty_client.go +++ b/remoting/getty/getty_client.go @@ -149,7 +149,7 @@ func (c *Client) SetResponseHandler(responseHandler remoting.ResponseHandler) { c.responseHandler = responseHandler } -func (c *Client) Connect(url common.URL) { +func (c *Client) Connect(url common.URL) error { initClient(url.Protocol) c.conf = *clientConf // new client @@ -157,6 +157,9 @@ func (c *Client) Connect(url common.URL) { // codec c.codec = remoting.GetCodec(url.Protocol) c.addr = url.Location + _, _, err := c.selectSession(c.addr) + logger.Error("try to connect server %v failed for %v", url.Location, err) + return err } func (c *Client) Close() { @@ -225,10 +228,5 @@ func (c *Client) heartbeat(session getty.Session) error { func (c *Client) transfer(session getty.Session, request *remoting.Request, timeout time.Duration) error { err := session.WritePkg(request, timeout) - //if rsp != nil { // cond2 - // // cond2 should not merged with cond1. cause the response package may be returned very - // // soon and it will be handled by other goroutine. - // rsp.ReadStart = time.Now() - //} return perrors.WithStack(err) }