Skip to content
Snippets Groups Projects
Commit 05081fcf authored by cvictory's avatar cvictory
Browse files

fix review issue

parent 482671fc
No related branches found
No related tags found
No related merge requests found
......@@ -48,7 +48,8 @@ var (
)
var (
attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY}
attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY,
constant.PATH_KEY, constant.VERSION_KEY}
)
// DubboInvoker is implement of protocol.Invoker. A dubboInvoker refer to one service and ip.
......@@ -97,8 +98,6 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
inv := invocation.(*invocation_impl.RPCInvocation)
// init param
inv.SetAttachments(constant.PATH_KEY, di.GetUrl().GetParam(constant.INTERFACE_KEY, ""))
inv.SetAttachments(constant.VERSION_KEY, di.GetUrl().GetParam(constant.VERSION_KEY, ""))
for _, k := range attachmentKey {
if v := di.GetUrl().GetParam(k, ""); len(v) > 0 {
inv.SetAttachments(k, v)
......@@ -134,7 +133,7 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
}
if result.Err == nil {
result.Rest = inv.Reply()
result.Attrs = result.Attachments()
result.Attrs = rest.Attrs
}
logger.Debugf("result.Err: %v, result.Rest: %v", result.Err, result.Rest)
......
......@@ -21,6 +21,7 @@ import (
"context"
"fmt"
"sync"
"time"
)
import (
......@@ -49,6 +50,7 @@ var (
// Make the connection can be shared.
// It will create one connection for one address (ip+port)
exchangeClientMap *sync.Map = new(sync.Map)
exchangeLock *sync.Map = new(sync.Map)
)
func init() {
......@@ -93,6 +95,7 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker {
exchangeClient := getExchangeClient(url)
if exchangeClient == nil {
logger.Warnf("can't dial the server: %+v", url.Location)
return nil
}
invoker := NewDubboInvoker(url, exchangeClient)
......@@ -178,14 +181,40 @@ func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult
func getExchangeClient(url common.URL) *remoting.ExchangeClient {
clientTmp, ok := exchangeClientMap.Load(url.Location)
if !ok {
exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
}), config.GetConsumerConfig().ConnectTimeout, false)
var exchangeClientTmp *remoting.ExchangeClient
func() {
// lock for NewExchangeClient and store into map.
_, loaded := exchangeLock.LoadOrStore(url.Location, 0x00)
// unlock
defer exchangeLock.Delete(url.Location)
if loaded {
// retry for 5 times.
for i := 0; i < 5; i++ {
if clientTmp, ok = exchangeClientMap.Load(url.Location); ok {
break
} else {
// if cannot get, sleep a while.
time.Sleep(time.Duration(i*100) * time.Millisecond)
}
}
return
}
// new ExchangeClient
exchangeClientTmp = remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
}), config.GetConsumerConfig().ConnectTimeout, false)
// input store
if exchangeClientTmp != nil {
exchangeClientMap.Store(url.Location, exchangeClientTmp)
}
}()
if exchangeClientTmp != nil {
exchangeClientMap.Store(url.Location, exchangeClientTmp)
return exchangeClientTmp
}
return exchangeClientTmp
}
// cannot dial the server
if clientTmp == nil {
return nil
}
exchangeClient, ok := clientTmp.(*remoting.ExchangeClient)
if !ok {
......
......@@ -53,7 +53,7 @@ func (ji *JsonrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invoca
url := ji.GetUrl()
req := ji.client.NewRequest(url, inv.MethodName(), inv.Arguments())
ctxNew := context.WithValue(ctx, constant.DUBBOGO_CTX_KEY, map[string]string{
"X-Proxy-ID": "dubbogo",
"X-Proxy-Id": "dubbogo",
"X-Services": url.Path,
"X-Method": inv.MethodName(),
})
......
......@@ -30,7 +30,7 @@ import (
var (
// generate request ID for global use
sequence atomic.Uint64
sequence atomic.Int64
)
func init() {
......@@ -38,12 +38,13 @@ func init() {
sequence.Store(0)
}
func SequenceId() uint64 {
// increse 2 for every request.
func SequenceId() int64 {
// increse 2 for every request as the same before.
// We expect that the request from client to server, the requestId is even; but from server to client, the requestId is odd.
return sequence.Add(2)
}
// Request ...
// this is request for transport layer
type Request struct {
ID int64
// protocol version
......@@ -54,19 +55,18 @@ type Request struct {
Data interface{}
TwoWay bool
Event bool
// it is used to judge the request is unbroken
// broken bool
}
// NewRequest
// NewRequest aims to create Request.
// The ID is auto increase.
func NewRequest(version string) *Request {
return &Request{
ID: int64(SequenceId()),
ID: SequenceId(),
Version: version,
}
}
// Response ...
// this is response for transport layer
type Response struct {
ID int64
Version string
......@@ -77,7 +77,7 @@ type Response struct {
Result interface{}
}
// NewResponse
// NewResponse aims to create Response
func NewResponse(id int64, version string) *Response {
return &Response{
ID: id,
......@@ -117,7 +117,8 @@ type PendingResponse struct {
Done chan struct{}
}
// NewPendingResponse ...
// NewPendingResponse aims to create PendingResponse.
// Id is always from ID of Request
func NewPendingResponse(id int64) *PendingResponse {
return &PendingResponse{
seq: id,
......@@ -131,7 +132,8 @@ func (r *PendingResponse) SetResponse(response *Response) {
r.response = response
}
// GetCallResponse ...
// GetCallResponse is used for callback of async.
// It is will return AsyncCallbackResponse.
func (r PendingResponse) GetCallResponse() common.CallbackResponse {
return AsyncCallbackResponse{
Cause: r.Err,
......
......@@ -51,10 +51,14 @@ type Client interface {
// This is abstraction level. it is like facade.
type ExchangeClient struct {
// connect server timeout
ConnectTimeout time.Duration
address string
client Client
init bool
// to dial server address. The format: ip:port
address string
// the client that will deal with the transport. It is interface, and it will use gettyClient by default.
client Client
// the tag for init.
init bool
}
// handle the message from server
......@@ -114,11 +118,16 @@ func (client *ExchangeClient) Request(invocation *protocol.Invocation, url commo
AddPendingResponse(rsp)
err := client.client.Request(request, timeout, rsp)
// request error
if err != nil {
result.Err = err
return err
}
result.Rest = rsp.response.Result
if resultTmp, ok := rsp.response.Result.(*protocol.RPCResult); ok {
result.Rest = resultTmp.Rest
result.Attrs = resultTmp.Attrs
result.Err = resultTmp.Err
}
return nil
}
......
......@@ -26,11 +26,11 @@ import (
"time"
)
import (
hessian "github.com/apache/dubbo-go-hessian2"
perrors "github.com/pkg/errors"
)
import (
hessian "github.com/apache/dubbo-go-hessian2"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
......
......@@ -88,7 +88,7 @@ func initClient(protocol string) {
rand.Seed(time.Now().UnixNano())
}
// SetClientConf: config ClientConf
// Config ClientConf
func SetClientConf(c ClientConfig) {
clientConf = &c
err := clientConf.CheckValidity()
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment