Skip to content
Snippets Groups Projects
Commit c67d34a9 authored by cvictory's avatar cvictory
Browse files

fix review issue

parent 9bc37f33
No related branches found
No related tags found
No related merge requests found
......@@ -82,7 +82,6 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, resp *remoting.Response) err
p.Body = &hessian.Response{RspObj: pendingRsp.Reply}
}
}
// read body
err = codec.ReadBody(p.Body)
return perrors.WithStack(err)
......
......@@ -94,7 +94,11 @@ func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker {
// ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
// RequestTimeout: requestTimeout,
//}))
invoker := NewDubboInvoker(url, getExchangeClient(url))
exchangeClient := getExchangeClient(url)
if exchangeClient == nil {
return nil
}
invoker := NewDubboInvoker(url, exchangeClient)
dp.SetInvokers(invoker)
logger.Infof("Refer service: %s", url.String())
return invoker
......@@ -180,7 +184,9 @@ func getExchangeClient(url common.URL) *remoting.ExchangeClient {
exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
}), config.GetConsumerConfig().ConnectTimeout)
exchangeClientMap.Store(url.Location, exchangeClientTmp)
if exchangeClientTmp != nil {
exchangeClientMap.Store(url.Location, exchangeClientTmp)
}
return exchangeClientTmp
}
......@@ -189,8 +195,9 @@ func getExchangeClient(url common.URL) *remoting.ExchangeClient {
exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
}), config.GetConsumerConfig().ConnectTimeout)
exchangeClientMap.Store(url.Location, exchangeClientTmp)
if exchangeClientTmp != nil {
exchangeClientMap.Store(url.Location, exchangeClientTmp)
}
return exchangeClientTmp
}
return exchangeClient
......
......@@ -26,7 +26,7 @@ type Client interface {
SetResponseHandler(responseHandler ResponseHandler)
//invoke once for connection
//ConfigClient()
Connect(url common.URL)
Connect(url common.URL) error
Close()
Request(request *Request, timeout time.Duration, response *PendingResponse) error
}
......@@ -42,7 +42,13 @@ func NewExchangeClient(url common.URL, client Client, connectTimeout time.Durati
client: client,
}
client.SetExchangeClient(exchangeClient)
client.Connect(url)
if client.Connect(url) != nil {
//retry for a while
time.Sleep(1 * time.Second)
if client.Connect(url) != nil {
return nil
}
}
client.SetResponseHandler(exchangeClient)
return exchangeClient
}
......
......@@ -22,9 +22,9 @@ func NewExchangeServer(url common.URL, server Server) *ExchangeServer {
}
func (server *ExchangeServer) Start() {
(server.Server).Start()
server.Server.Start()
}
func (server *ExchangeServer) Stop() {
(server.Server).Stop()
server.Server.Stop()
}
......@@ -149,7 +149,7 @@ func (c *Client) SetResponseHandler(responseHandler remoting.ResponseHandler) {
c.responseHandler = responseHandler
}
func (c *Client) Connect(url common.URL) {
func (c *Client) Connect(url common.URL) error {
initClient(url.Protocol)
c.conf = *clientConf
// new client
......@@ -157,6 +157,9 @@ func (c *Client) Connect(url common.URL) {
// codec
c.codec = remoting.GetCodec(url.Protocol)
c.addr = url.Location
_, _, err := c.selectSession(c.addr)
logger.Error("try to connect server %v failed for %v", url.Location, err)
return err
}
func (c *Client) Close() {
......@@ -225,10 +228,5 @@ func (c *Client) heartbeat(session getty.Session) error {
func (c *Client) transfer(session getty.Session, request *remoting.Request, timeout time.Duration) error {
err := session.WritePkg(request, timeout)
//if rsp != nil { // cond2
// // cond2 should not merged with cond1. cause the response package may be returned very
// // soon and it will be handled by other goroutine.
// rsp.ReadStart = time.Now()
//}
return perrors.WithStack(err)
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment