Skip to content
Snippets Groups Projects
Commit 377271a9 authored by vito.he's avatar vito.he
Browse files

Merge remote-tracking branch 'apache/master' into config_center

parents a48bf9af 3db9caf2
No related branches found
No related tags found
No related merge requests found
......@@ -50,7 +50,7 @@ func (lb *leastActiveLoadBalance) Select(invokers []protocol.Invoker, invocation
var (
leastActive int32 = -1 // The least active value of all invokers
totalWeight int64 = 0 // The number of invokers having the same least active value (LEAST_ACTIVE)
firstWeight int64 = 0 // Initial value, used for comparision
firstWeight int64 = 0 // Initial value, used for comparison
leastIndexes = make([]int, count) // The index of invokers having the same least active value (LEAST_ACTIVE)
leastCount = 0 // The number of invokers having the same least active value (LEAST_ACTIVE)
sameWeight = true // Every invoker has the same weight value?
......
......@@ -112,7 +112,8 @@ func Test_Export(t *testing.T) {
doinit()
extension.SetProtocol("registry", GetProtocol)
for _, service := range providerConfig.Services {
for i := 0; i < len(providerConfig.Services); i++ {
service := providerConfig.Services[i]
service.Implement(&MockService{})
service.Export()
assert.Condition(t, func() bool {
......
......@@ -82,6 +82,11 @@ func init() {
func SetClientConf(c ClientConfig) {
clientConf = &c
err := clientConf.CheckValidity()
if err != nil {
logger.Warnf("[ClientConfig CheckValidity] error: %v", err)
return
}
}
func GetClientConf() ClientConfig {
......@@ -148,14 +153,13 @@ type Client struct {
pool *gettyRPCClientPool
sequence atomic.Uint64
pendingLock sync.RWMutex
pendingResponses map[SequenceType]*PendingResponse
pendingResponses *sync.Map
}
func NewClient() *Client {
c := &Client{
pendingResponses: make(map[SequenceType]*PendingResponse),
pendingResponses: new(sync.Map),
conf: *clientConf,
}
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
......@@ -201,13 +205,6 @@ func (c *Client) AsyncCall(addr string, svcUrl common.URL, method string, args i
return perrors.WithStack(c.call(CT_TwoWay, addr, svcUrl, method, args, reply, callback, copts))
}
func (c *Client) GetPendingResponse(seq SequenceType) *PendingResponse {
c.pendingLock.RLock()
defer c.pendingLock.RUnlock()
return c.pendingResponses[SequenceType(seq)]
}
func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string,
args, reply interface{}, callback AsyncCallback, opts CallOptions) error {
......@@ -330,20 +327,16 @@ func (c *Client) transfer(session getty.Session, pkg *DubboPackage,
}
func (c *Client) addPendingResponse(pr *PendingResponse) {
c.pendingLock.Lock()
defer c.pendingLock.Unlock()
c.pendingResponses[SequenceType(pr.seq)] = pr
c.pendingResponses.Store(SequenceType(pr.seq), pr)
}
func (c *Client) removePendingResponse(seq SequenceType) *PendingResponse {
c.pendingLock.Lock()
defer c.pendingLock.Unlock()
if c.pendingResponses == nil {
return nil
}
if presp, ok := c.pendingResponses[seq]; ok {
delete(c.pendingResponses, seq)
return presp
if presp, ok := c.pendingResponses.Load(seq); ok {
c.pendingResponses.Delete(seq)
return presp.(*PendingResponse)
}
return nil
}
......@@ -40,7 +40,7 @@ func TestClient_CallOneway(t *testing.T) {
proto, url := InitTest(t)
c := &Client{
pendingResponses: make(map[SequenceType]*PendingResponse),
pendingResponses: new(sync.Map),
conf: *clientConf,
}
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
......@@ -57,7 +57,7 @@ func TestClient_Call(t *testing.T) {
proto, url := InitTest(t)
c := &Client{
pendingResponses: make(map[SequenceType]*PendingResponse),
pendingResponses: new(sync.Map),
conf: *clientConf,
}
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
......@@ -118,7 +118,7 @@ func TestClient_AsyncCall(t *testing.T) {
proto, url := InitTest(t)
c := &Client{
pendingResponses: make(map[SequenceType]*PendingResponse),
pendingResponses: new(sync.Map),
conf: *clientConf,
}
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
......
......@@ -40,9 +40,9 @@ const (
type CallType int32
const (
CT_UNKOWN CallType = 0
CT_OneWay CallType = 1
CT_TwoWay CallType = 2
CT_UNKNOWN CallType = 0
CT_OneWay CallType = 1
CT_TwoWay CallType = 2
)
////////////////////////////////////////////
......@@ -88,11 +88,11 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error {
return perrors.Errorf("opts[0] is not of type *Client")
}
pendingRsp := client.GetPendingResponse(SequenceType(p.Header.ID))
if pendingRsp == nil {
pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID))
if !ok {
return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID)
} else {
p.Body = &hessian.Response{RspObj: pendingRsp.reply}
p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).reply}
}
}
......
......@@ -36,7 +36,7 @@ func TestDubboInvoker_Invoke(t *testing.T) {
proto, url := InitTest(t)
c := &Client{
pendingResponses: make(map[SequenceType]*PendingResponse),
pendingResponses: new(sync.Map),
conf: *clientConf,
}
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
......
......@@ -48,7 +48,7 @@ func TestDubboProtocol_Export(t *testing.T) {
eq := exporter.GetInvoker().GetUrl().URLEqual(url)
assert.True(t, eq)
// second service: the same path and the diferent version
// second service: the same path and the different version
url2, err := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&"+
......
......@@ -190,7 +190,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
p, ok := pkg.(*DubboPackage)
if !ok {
logger.Errorf("illegal packge{%#v}", pkg)
logger.Errorf("illegal package{%#v}", pkg)
return
}
p.Header.ResponseStatus = hessian.Response_OK
......
......@@ -46,8 +46,6 @@ func NewRpcClientPackageHandler(client *Client) *RpcClientPackageHandler {
}
func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
p.client.pendingLock.RLock()
defer p.client.pendingLock.RUnlock()
pkg := &DubboPackage{}
buf := bytes.NewBuffer(data)
......
......@@ -152,7 +152,9 @@ func (s *Server) handlePkg(conn net.Conn) {
timeout, err := time.ParseDuration(reqHeader["Timeout"])
if err == nil {
httpTimeout = timeout
ctx, _ = context.WithTimeout(ctx, httpTimeout)
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, httpTimeout)
defer cancel()
}
delete(reqHeader, "Timeout")
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment