Skip to content
Snippets Groups Projects
Commit c08374e1 authored by 邹毅贤's avatar 邹毅贤
Browse files

Merge branch 'develop' into feature/addRouter

parents 6054f9ff c291d2c1
No related branches found
No related tags found
No related merge requests found
......@@ -28,4 +28,14 @@ The title format of the pull request `MUST` follow the following rules:
### 3.1 log
> 1 when logging the function's input parameter, you should add '@' before input parameter name.
>- 1 when logging the function's input parameter, you should add '@' before input parameter name.
### 3.2 naming
>- 1 do not use an underscore in package name, such as `filter_impl`.
>- 2 do not use an underscore in constants, such as `DUBBO_PROTOCOL`. use 'DubboProtocol' instead.
### 3.3 comment
>- 1 there should be comment for every export func/var.
>- 2 the comment should begin with function name/var name.
\ No newline at end of file
......@@ -24,6 +24,7 @@ import (
)
import (
"github.com/opentracing/opentracing-go"
perrors "github.com/pkg/errors"
)
......@@ -72,6 +73,10 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
inv.SetAttachments(k, v)
}
}
// put the ctx into attachment
di.appendCtx(ctx, inv)
url := di.GetUrl()
// async
async, err := strconv.ParseBool(inv.AttachmentsByKey(constant.ASYNC_KEY, "false"))
......@@ -112,3 +117,17 @@ func (di *DubboInvoker) Destroy() {
}
})
}
// Finally, I made the decision that I don't provide a general way to transfer the whole context
// because it could be misused. If the context contains to many key-value pairs, the performance will be much lower.
func (di *DubboInvoker) appendCtx(ctx context.Context, inv *invocation_impl.RPCInvocation) {
// inject opentracing ctx
currentSpan := opentracing.SpanFromContext(ctx)
if currentSpan != nil {
carrier := opentracing.TextMapCarrier(inv.Attachments())
err := opentracing.GlobalTracer().Inject(currentSpan.Context(), opentracing.TextMap, carrier)
if err != nil {
logger.Errorf("Could not inject the span context into attachments: %v", err)
}
}
}
......@@ -25,6 +25,7 @@ import (
)
import (
"github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
)
......@@ -81,6 +82,11 @@ func TestDubboInvoker_Invoke(t *testing.T) {
res = invoker.Invoke(context.Background(), inv)
assert.EqualError(t, res.Error(), "request need @response")
// testing appendCtx
span, ctx := opentracing.StartSpanFromContext(context.Background(), "TestOperation")
invoker.Invoke(ctx, inv)
span.Finish()
// destroy
lock.Lock()
proto.Destroy()
......
......@@ -29,6 +29,7 @@ import (
import (
"github.com/apache/dubbo-go-hessian2"
"github.com/dubbogo/getty"
"github.com/opentracing/opentracing-go"
perrors "github.com/pkg/errors"
)
......@@ -63,9 +64,9 @@ func (s *rpcSession) GetReqNum() int32 {
return atomic.LoadInt32(&s.reqNum)
}
////////////////////////////////////////////
// //////////////////////////////////////////
// RpcClientHandler
////////////////////////////////////////////
// //////////////////////////////////////////
// RpcClientHandler ...
type RpcClientHandler struct {
......@@ -157,9 +158,9 @@ func (h *RpcClientHandler) OnCron(session getty.Session) {
h.conn.pool.rpcClient.heartbeat(session)
}
////////////////////////////////////////////
// //////////////////////////////////////////
// RpcServerHandler
////////////////////////////////////////////
// //////////////////////////////////////////
// RpcServerHandler ...
type RpcServerHandler struct {
......@@ -284,7 +285,10 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
args := p.Body.(map[string]interface{})["args"].([]interface{})
inv := invocation.NewRPCInvocation(p.Service.Method, args, attachments)
result := invoker.Invoke(context.Background(), inv)
ctx := rebuildCtx(inv)
result := invoker.Invoke(ctx, inv)
if err := result.Error(); err != nil {
p.Header.ResponseStatus = hessian.Response_OK
p.Body = hessian.NewResponse(nil, err, result.Attachments())
......@@ -327,6 +331,21 @@ func (h *RpcServerHandler) OnCron(session getty.Session) {
}
}
// rebuildCtx rebuild the context by attachment.
// Once we decided to transfer more context's key-value, we should change this.
// now we only support rebuild the tracing context
func rebuildCtx(inv *invocation.RPCInvocation) context.Context {
ctx := context.Background()
// actually, if user do not use any opentracing framework, the err will not be nil.
spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap,
opentracing.TextMapCarrier(inv.Attachments()))
if err == nil {
ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX, spanCtx)
}
return ctx
}
func reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) {
resp := &DubboPackage{
Header: hessian.DubboHeader{
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dubbo
import (
"testing"
)
import (
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/mocktracer"
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol/invocation"
)
// test rebuild the ctx
func TestRebuildCtx(t *testing.T) {
opentracing.SetGlobalTracer(mocktracer.New())
attach := make(map[string]string, 10)
attach[constant.VERSION_KEY] = "1.0"
attach[constant.GROUP_KEY] = "MyGroup"
inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach)
// attachment doesn't contains any tracing key-value pair,
ctx := rebuildCtx(inv)
assert.NotNil(t, ctx)
assert.Nil(t, ctx.Value(constant.TRACING_REMOTE_SPAN_CTX))
span, ctx := opentracing.StartSpanFromContext(ctx, "Test-Client")
opentracing.GlobalTracer().Inject(span.Context(), opentracing.TextMap,
opentracing.TextMapCarrier(inv.Attachments()))
// rebuild the context success
inv = invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach)
ctx = rebuildCtx(inv)
span.Finish()
assert.NotNil(t, ctx)
assert.NotNil(t, ctx.Value(constant.TRACING_REMOTE_SPAN_CTX))
}
......@@ -323,7 +323,6 @@ func (r *zkRegistry) register(c common.URL) error {
if c.Path == "" || len(c.Methods) == 0 {
return perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods)
}
// 先创建服务下面的provider node
dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER])
r.cltLock.Lock()
err = r.client.Create(dubboPath)
......
......@@ -43,6 +43,7 @@ const (
var (
errNilZkClientConn = perrors.New("zookeeperclient{conn} is nil")
errNilChildren = perrors.Errorf("has none children")
)
// ZookeeperClient ...
......@@ -513,7 +514,7 @@ func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event,
return nil, nil, perrors.Errorf("path{%s} has none children", path)
}
if len(children) == 0 {
return nil, nil, perrors.Errorf("path{%s} has none children", path)
return nil, nil, errNilChildren
}
return children, event, nil
......@@ -544,7 +545,7 @@ func (z *ZookeeperClient) GetChildren(path string) ([]string, error) {
return nil, perrors.Errorf("path{%s} has none children", path)
}
if len(children) == 0 {
return nil, perrors.Errorf("path{%s} has none children", path)
return nil, errNilChildren
}
return children, nil
......
......@@ -133,3 +133,12 @@ func TestRegisterTempSeq(t *testing.T) {
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
}
func Test_UnregisterEvent(t *testing.T) {
client := &ZookeeperClient{}
client.eventRegistry = make(map[string][]*chan struct{})
array := []*chan struct{}{}
array = append(array, new(chan struct{}))
client.eventRegistry["test"] = array
client.UnregisterEvent("test", new(chan struct{}))
}
......@@ -19,7 +19,6 @@ package zookeeper
import (
"path"
"strings"
"sync"
"time"
)
......@@ -111,8 +110,17 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
newChildren, err := l.client.GetChildren(zkPath)
if err != nil {
logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
return
if err == errNilChildren {
content, _, err := l.client.Conn.Get(zkPath)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", zkPath, perrors.WithStack(err))
} else {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeUpdate, Content: string(content)})
}
} else {
logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
}
}
// a node was added -- listen the new node
......@@ -272,51 +280,6 @@ func timeSecondDuration(sec int) time.Duration {
// |
// --------> ListenServiceNodeEvent
func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) {
var (
err error
dubboPath string
children []string
)
zkPath = strings.ReplaceAll(zkPath, "$", "%24")
l.pathMapLock.Lock()
_, ok := l.pathMap[zkPath]
l.pathMapLock.Unlock()
if ok {
logger.Warnf("@zkPath %s has already been listened.", zkPath)
return
}
l.pathMapLock.Lock()
l.pathMap[zkPath] = struct{}{}
l.pathMapLock.Unlock()
logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath)
children, err = l.client.GetChildren(zkPath)
if err != nil {
children = nil
logger.Warnf("fail to get children of zk path{%s}", zkPath)
}
for _, c := range children {
// listen l service node
dubboPath = path.Join(zkPath, c)
content, _, err := l.client.Conn.Get(dubboPath)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err))
}
if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
continue
}
logger.Infof("listen dubbo service key{%s}", dubboPath)
go func(zkPath string, listener remoting.DataListener) {
if l.ListenServiceNodeEvent(zkPath) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath, listener)
}
logger.Infof("listen dubbo path{%s}", zkPath)
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(zkPath, listener)
......
......@@ -97,12 +97,11 @@ func TestListener(t *testing.T) {
listener := NewZkEventListener(client)
dataListener := &mockDataListener{client: client, changedData: changedData, wait: &wait}
listener.ListenServiceEvent("/dubbo", dataListener)
time.Sleep(1 * time.Second)
_, err := client.Conn.Set("/dubbo/dubbo.properties", []byte(changedData), 1)
assert.NoError(t, err)
wait.Wait()
assert.Equal(t, changedData, dataListener.eventList[1].Content)
client.Close()
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment