Skip to content
Snippets Groups Projects
Commit b3fabcee authored by Patrick's avatar Patrick
Browse files

Merge branch 'develop' into rest_protocol

parents 69020c04 1fa4bf66
No related branches found
No related tags found
No related merge requests found
......@@ -41,7 +41,7 @@ func NewJsonrpcInvoker(url common.URL, client *HTTPClient) *JsonrpcInvoker {
}
}
func (ji *JsonrpcInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
func (ji *JsonrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
var (
result protocol.RPCResult
......@@ -50,12 +50,12 @@ func (ji *JsonrpcInvoker) Invoke(invocation protocol.Invocation) protocol.Result
inv := invocation.(*invocation_impl.RPCInvocation)
url := ji.GetUrl()
req := ji.client.NewRequest(url, inv.MethodName(), inv.Arguments())
ctx := context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{
ctxNew := context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{
"X-Proxy-Id": "dubbogo",
"X-Services": url.Path,
"X-Method": inv.MethodName(),
})
result.Err = ji.client.Call(ctx, url, req, inv.Reply())
result.Err = ji.client.Call(ctxNew, url, req, inv.Reply())
if result.Err == nil {
result.Rest = inv.Reply()
}
......
......@@ -60,7 +60,7 @@ func TestJsonrpcInvoker_Invoke(t *testing.T) {
jsonInvoker := NewJsonrpcInvoker(url, client)
user := &User{}
res := jsonInvoker.Invoke(invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("GetUser"), invocation.WithArguments([]interface{}{"1", "username"}),
res := jsonInvoker.Invoke(context.Background(), invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("GetUser"), invocation.WithArguments([]interface{}{"1", "username"}),
invocation.WithReply(user)))
assert.NoError(t, res.Error())
......
......@@ -93,6 +93,8 @@ func (s *Server) handlePkg(conn net.Conn) {
rsp := &http.Response{
Header: header,
StatusCode: 500,
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(body)),
Body: ioutil.NopCloser(bytes.NewReader(body)),
}
......@@ -252,6 +254,8 @@ func serveRequest(ctx context.Context,
rsp := &http.Response{
Header: make(http.Header),
StatusCode: 500,
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(body)),
Body: ioutil.NopCloser(bytes.NewReader(body)),
}
......@@ -276,6 +280,8 @@ func serveRequest(ctx context.Context,
rsp := &http.Response{
Header: make(http.Header),
StatusCode: 200,
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(body)),
Body: ioutil.NopCloser(bytes.NewReader(body)),
}
......@@ -324,7 +330,7 @@ func serveRequest(ctx context.Context,
exporter, _ := jsonrpcProtocol.ExporterMap().Load(path)
invoker := exporter.(*JsonrpcExporter).GetInvoker()
if invoker != nil {
result := invoker.Invoke(invocation.NewRPCInvocation(methodName, args, map[string]string{
result := invoker.Invoke(context.Background(), invocation.NewRPCInvocation(methodName, args, map[string]string{
constant.PATH_KEY: path,
constant.VERSION_KEY: codec.req.Version,
}))
......
......@@ -21,6 +21,7 @@
package mock
import (
"context"
"reflect"
)
......@@ -91,7 +92,7 @@ func (mr *MockInvokerMockRecorder) Destroy() *gomock.Call {
}
// Invoke mocks base method
func (m *MockInvoker) Invoke(arg0 protocol.Invocation) protocol.Result {
func (m *MockInvoker) Invoke(ctx context.Context, arg0 protocol.Invocation) protocol.Result {
ret := m.ctrl.Call(m, "Invoke", arg0)
ret0, _ := ret[0].(protocol.Result)
return ret0
......
......@@ -18,6 +18,7 @@
package protocolwrapper
import (
"context"
"strings"
)
......@@ -102,9 +103,9 @@ func (fi *FilterInvoker) IsAvailable() bool {
return fi.invoker.IsAvailable()
}
func (fi *FilterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
result := fi.filter.Invoke(fi.next, invocation)
return fi.filter.OnResponse(result, fi.invoker, invocation)
func (fi *FilterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
result := fi.filter.Invoke(ctx, fi.next, invocation)
return fi.filter.OnResponse(ctx, result, fi.invoker, invocation)
}
func (fi *FilterInvoker) Destroy() {
......
......@@ -18,6 +18,7 @@
package protocolwrapper
import (
"context"
"net/url"
"testing"
)
......@@ -66,7 +67,7 @@ func init() {
type EchoFilterForTest struct{}
func (ef *EchoFilterForTest) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
func (ef *EchoFilterForTest) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
logger.Infof("invoking echo filter.")
logger.Debugf("%v,%v", invocation.MethodName(), len(invocation.Arguments()))
if invocation.MethodName() == constant.ECHO && len(invocation.Arguments()) == 1 {
......@@ -75,10 +76,10 @@ func (ef *EchoFilterForTest) Invoke(invoker protocol.Invoker, invocation protoco
}
}
return invoker.Invoke(invocation)
return invoker.Invoke(ctx, invocation)
}
func (ef *EchoFilterForTest) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
func (ef *EchoFilterForTest) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
return result
}
......
......@@ -20,6 +20,7 @@ package protocol
import (
"sync"
"sync/atomic"
"time"
)
import (
......@@ -27,18 +28,69 @@ import (
)
var (
methodStatistics sync.Map // url -> { methodName : RpcStatus}
methodStatistics sync.Map // url -> { methodName : RPCStatus}
serviceStatistic sync.Map // url -> RPCStatus
)
type RpcStatus struct {
active int32
type RPCStatus struct {
active int32
failed int32
total int32
totalElapsed int64
failedElapsed int64
maxElapsed int64
failedMaxElapsed int64
succeededMaxElapsed int64
successiveRequestFailureCount int32
lastRequestFailedTimestamp int64
}
func (rpc *RpcStatus) GetActive() int32 {
func (rpc *RPCStatus) GetActive() int32 {
return atomic.LoadInt32(&rpc.active)
}
func GetStatus(url common.URL, methodName string) *RpcStatus {
func (rpc *RPCStatus) GetFailed() int32 {
return atomic.LoadInt32(&rpc.failed)
}
func (rpc *RPCStatus) GetTotal() int32 {
return atomic.LoadInt32(&rpc.total)
}
func (rpc *RPCStatus) GetTotalElapsed() int64 {
return atomic.LoadInt64(&rpc.totalElapsed)
}
func (rpc *RPCStatus) GetFailedElapsed() int64 {
return atomic.LoadInt64(&rpc.failedElapsed)
}
func (rpc *RPCStatus) GetMaxElapsed() int64 {
return atomic.LoadInt64(&rpc.maxElapsed)
}
func (rpc *RPCStatus) GetFailedMaxElapsed() int64 {
return atomic.LoadInt64(&rpc.failedMaxElapsed)
}
func (rpc *RPCStatus) GetSucceededMaxElapsed() int64 {
return atomic.LoadInt64(&rpc.succeededMaxElapsed)
}
func (rpc *RPCStatus) GetLastRequestFailedTimestamp() int64 {
return atomic.LoadInt64(&rpc.lastRequestFailedTimestamp)
}
func (rpc *RPCStatus) GetSuccessiveRequestFailureCount() int32 {
return atomic.LoadInt32(&rpc.successiveRequestFailureCount)
}
func GetURLStatus(url common.URL) *RPCStatus {
rpcStatus, _ := serviceStatistic.LoadOrStore(url.Key(), &RPCStatus{})
return rpcStatus.(*RPCStatus)
}
func GetMethodStatus(url common.URL, methodName string) *RPCStatus {
identifier := url.Key()
methodMap, found := methodStatistics.Load(identifier)
if !found {
......@@ -49,27 +101,53 @@ func GetStatus(url common.URL, methodName string) *RpcStatus {
methodActive := methodMap.(*sync.Map)
rpcStatus, found := methodActive.Load(methodName)
if !found {
rpcStatus = &RpcStatus{}
rpcStatus = &RPCStatus{}
methodActive.Store(methodName, rpcStatus)
}
status := rpcStatus.(*RpcStatus)
status := rpcStatus.(*RPCStatus)
return status
}
func BeginCount(url common.URL, methodName string) {
beginCount0(GetStatus(url, methodName))
beginCount0(GetURLStatus(url))
beginCount0(GetMethodStatus(url, methodName))
}
func EndCount(url common.URL, methodName string) {
endCount0(GetStatus(url, methodName))
func EndCount(url common.URL, methodName string, elapsed int64, succeeded bool) {
endCount0(GetURLStatus(url), elapsed, succeeded)
endCount0(GetMethodStatus(url, methodName), elapsed, succeeded)
}
// private methods
func beginCount0(rpcStatus *RpcStatus) {
func beginCount0(rpcStatus *RPCStatus) {
atomic.AddInt32(&rpcStatus.active, 1)
}
func endCount0(rpcStatus *RpcStatus) {
func endCount0(rpcStatus *RPCStatus, elapsed int64, succeeded bool) {
atomic.AddInt32(&rpcStatus.active, -1)
atomic.AddInt32(&rpcStatus.total, 1)
atomic.AddInt64(&rpcStatus.totalElapsed, elapsed)
if rpcStatus.maxElapsed < elapsed {
atomic.StoreInt64(&rpcStatus.maxElapsed, elapsed)
}
if succeeded {
if rpcStatus.succeededMaxElapsed < elapsed {
atomic.StoreInt64(&rpcStatus.succeededMaxElapsed, elapsed)
}
atomic.StoreInt32(&rpcStatus.successiveRequestFailureCount, 0)
} else {
atomic.StoreInt64(&rpcStatus.lastRequestFailedTimestamp, time.Now().Unix())
atomic.AddInt32(&rpcStatus.successiveRequestFailureCount, 1)
atomic.AddInt32(&rpcStatus.failed, 1)
atomic.AddInt64(&rpcStatus.failedElapsed, elapsed)
if rpcStatus.failedMaxElapsed < elapsed {
atomic.StoreInt64(&rpcStatus.failedMaxElapsed, elapsed)
}
}
}
func CurrentTimeMillis() int64 {
return time.Now().UnixNano() / int64(time.Millisecond)
}
package protocol
import (
"context"
"strconv"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
)
func TestBeginCount(t *testing.T) {
defer destroy()
url, _ := common.NewURL(context.TODO(), "dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
BeginCount(url, "test")
urlStatus := GetURLStatus(url)
methodStatus := GetMethodStatus(url, "test")
methodStatus1 := GetMethodStatus(url, "test1")
assert.Equal(t, int32(1), methodStatus.active)
assert.Equal(t, int32(1), urlStatus.active)
assert.Equal(t, int32(0), methodStatus1.active)
}
func TestEndCount(t *testing.T) {
defer destroy()
url, _ := common.NewURL(context.TODO(), "dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
EndCount(url, "test", 100, true)
urlStatus := GetURLStatus(url)
methodStatus := GetMethodStatus(url, "test")
assert.Equal(t, int32(-1), methodStatus.active)
assert.Equal(t, int32(-1), urlStatus.active)
assert.Equal(t, int32(1), methodStatus.total)
assert.Equal(t, int32(1), urlStatus.total)
}
func TestGetMethodStatus(t *testing.T) {
defer destroy()
url, _ := common.NewURL(context.TODO(), "dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
status := GetMethodStatus(url, "test")
assert.NotNil(t, status)
assert.Equal(t, int32(0), status.total)
}
func TestGetUrlStatus(t *testing.T) {
defer destroy()
url, _ := common.NewURL(context.TODO(), "dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
status := GetURLStatus(url)
assert.NotNil(t, status)
assert.Equal(t, int32(0), status.total)
}
func Test_beginCount0(t *testing.T) {
defer destroy()
url, _ := common.NewURL(context.TODO(), "dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
status := GetURLStatus(url)
beginCount0(status)
assert.Equal(t, int32(1), status.active)
}
func Test_All(t *testing.T) {
defer destroy()
url, _ := common.NewURL(context.TODO(), "dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
request(url, "test", 100, false, true)
urlStatus := GetURLStatus(url)
methodStatus := GetMethodStatus(url, "test")
assert.Equal(t, int32(1), methodStatus.total)
assert.Equal(t, int32(1), urlStatus.total)
assert.Equal(t, int32(0), methodStatus.active)
assert.Equal(t, int32(0), urlStatus.active)
assert.Equal(t, int32(0), methodStatus.failed)
assert.Equal(t, int32(0), urlStatus.failed)
assert.Equal(t, int32(0), methodStatus.successiveRequestFailureCount)
assert.Equal(t, int32(0), urlStatus.successiveRequestFailureCount)
assert.Equal(t, int64(100), methodStatus.totalElapsed)
assert.Equal(t, int64(100), urlStatus.totalElapsed)
request(url, "test", 100, false, false)
request(url, "test", 100, false, false)
request(url, "test", 100, false, false)
request(url, "test", 100, false, false)
request(url, "test", 100, false, false)
assert.Equal(t, int32(6), methodStatus.total)
assert.Equal(t, int32(6), urlStatus.total)
assert.Equal(t, int32(5), methodStatus.failed)
assert.Equal(t, int32(5), urlStatus.failed)
assert.Equal(t, int32(5), methodStatus.successiveRequestFailureCount)
assert.Equal(t, int32(5), urlStatus.successiveRequestFailureCount)
assert.Equal(t, int64(600), methodStatus.totalElapsed)
assert.Equal(t, int64(600), urlStatus.totalElapsed)
assert.Equal(t, int64(500), methodStatus.failedElapsed)
assert.Equal(t, int64(500), urlStatus.failedElapsed)
request(url, "test", 100, false, true)
assert.Equal(t, int32(0), methodStatus.successiveRequestFailureCount)
assert.Equal(t, int32(0), urlStatus.successiveRequestFailureCount)
request(url, "test", 200, false, false)
request(url, "test", 200, false, false)
assert.Equal(t, int32(2), methodStatus.successiveRequestFailureCount)
assert.Equal(t, int32(2), urlStatus.successiveRequestFailureCount)
assert.Equal(t, int64(200), methodStatus.maxElapsed)
assert.Equal(t, int64(200), urlStatus.maxElapsed)
request(url, "test1", 200, false, false)
request(url, "test1", 200, false, false)
request(url, "test1", 200, false, false)
assert.Equal(t, int32(5), urlStatus.successiveRequestFailureCount)
methodStatus1 := GetMethodStatus(url, "test1")
assert.Equal(t, int32(2), methodStatus.successiveRequestFailureCount)
assert.Equal(t, int32(3), methodStatus1.successiveRequestFailureCount)
}
func request(url common.URL, method string, elapsed int64, active, succeeded bool) {
BeginCount(url, method)
if !active {
EndCount(url, method, elapsed, succeeded)
}
}
func TestCurrentTimeMillis(t *testing.T) {
defer destroy()
c := CurrentTimeMillis()
assert.NotNil(t, c)
str := strconv.FormatInt(c, 10)
i, _ := strconv.ParseInt(str, 10, 64)
assert.Equal(t, c, i)
}
func destroy() {
delete1 := func(key interface{}, value interface{}) bool {
methodStatistics.Delete(key)
return true
}
methodStatistics.Range(delete1)
delete2 := func(key interface{}, value interface{}) bool {
serviceStatistic.Delete(key)
return true
}
serviceStatistic.Range(delete2)
}
......@@ -18,6 +18,7 @@
package protocol
import (
"context"
"strings"
"sync"
)
......@@ -356,10 +357,10 @@ func newWrappedInvoker(invoker protocol.Invoker, url *common.URL) *wrappedInvoke
}
}
func (ivk *wrappedInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
func (ivk *wrappedInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
// get right url
ivk.invoker.(*proxy_factory.ProxyInvoker).BaseInvoker = *protocol.NewBaseInvoker(ivk.GetUrl())
return ivk.invoker.Invoke(invocation)
return ivk.invoker.Invoke(ctx, invocation)
}
type providerConfigurationListener struct {
......
......@@ -370,6 +370,7 @@ func (r *zkRegistry) register(c common.URL) error {
return perrors.Errorf("@c{%v} type is not referencer or provider", c)
}
dubboPath = strings.ReplaceAll(dubboPath, "$", "%24")
err = r.registerTempZookeeperNode(dubboPath, encodedURL)
if err != nil {
......
......@@ -290,6 +290,7 @@ func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) {
z.Lock()
a := z.eventRegistry[zkPath]
a = append(a, event)
z.eventRegistry[zkPath] = a
logger.Debugf("zkClient{%s} register event{path:%s, ptr:%p}", z.name, zkPath, event)
z.Unlock()
......
......@@ -19,6 +19,7 @@ package zookeeper
import (
"path"
"strings"
"sync"
"time"
)
......@@ -273,6 +274,7 @@ func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.Da
children []string
)
zkPath = strings.ReplaceAll(zkPath, "$", "%24")
l.pathMapLock.Lock()
_, ok := l.pathMap[zkPath]
l.pathMapLock.Unlock()
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment