Skip to content
Snippets Groups Projects
Commit a8645f08 authored by AlexStocks's avatar AlexStocks
Browse files

Merge branch 'develop'

parents fd213ebf 16b0c88c
No related branches found
No related tags found
No related merge requests found
Showing
with 73 additions and 51 deletions
......@@ -18,7 +18,6 @@
package nacos
import (
"path/filepath"
"strconv"
"strings"
"sync"
......@@ -38,9 +37,6 @@ import (
"github.com/apache/dubbo-go/common/logger"
)
// Nacos Log dir, it can be override when creating client by config_center.log_dir
var logDir = filepath.Join("logs", "nacos", "log")
// NacosClient Nacos client
type NacosClient struct {
name string
......@@ -69,7 +65,7 @@ type option func(*options)
type options struct {
nacosName string
client *NacosClient
//client *NacosClient
}
// WithNacosName Set nacos name
......
......@@ -40,7 +40,7 @@ import (
// run mock config server
func runMockConfigServer(configHandler func(http.ResponseWriter, *http.Request),
configListenHandler func(http.ResponseWriter, *http.Request)) *httptest.Server {
uriHandlerMap := make(map[string]func(http.ResponseWriter, *http.Request), 0)
uriHandlerMap := make(map[string]func(http.ResponseWriter, *http.Request))
uriHandlerMap["/nacos/v1/cs/configs"] = configHandler
uriHandlerMap["/nacos/v1/cs/configs/listener"] = configListenHandler
......@@ -85,6 +85,8 @@ func TestGetConfig(t *testing.T) {
nacos, err := initNacosData(t)
assert.NoError(t, err)
configs, err := nacos.GetProperties("dubbo.properties", config_center.WithGroup("dubbo"))
assert.Empty(t, configs)
assert.NoError(t, err)
_, err = nacos.Parser().Parse(configs)
assert.NoError(t, err)
}
......@@ -100,7 +102,8 @@ func TestNacosDynamicConfiguration_GetConfigKeysByGroup(t *testing.T) {
}
`
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(data))
_, err := w.Write([]byte(data))
assert.Nil(t, err)
}))
nacosURL := strings.ReplaceAll(ts.URL, "http", "registry")
......
......@@ -38,7 +38,6 @@ func callback(listener config_center.ConfigurationListener, _, _, dataId, data s
func (n *nacosDynamicConfiguration) addListener(key string, listener config_center.ConfigurationListener) {
_, loaded := n.keyListeners.Load(key)
if !loaded {
_, cancel := context.WithCancel(context.Background())
err := (*n.client.Client()).ListenConfig(vo.ConfigParam{
DataId: key,
Group: "dubbo",
......@@ -50,13 +49,15 @@ func (n *nacosDynamicConfiguration) addListener(key string, listener config_cent
logger.Errorf("nacos : listen config fail, error:%v ", err)
return
}
_, cancel := context.WithCancel(context.Background())
newListener := make(map[config_center.ConfigurationListener]context.CancelFunc)
newListener[listener] = cancel
n.keyListeners.Store(key, newListener)
} else {
// TODO check goroutine alive, but this version of go_nacos_sdk is not support.
logger.Infof("profile:%s. this profile is already listening", key)
return
}
// TODO check goroutine alive, but this version of go_nacos_sdk is not support.
logger.Infof("profile:%s. this profile is already listening", key)
}
func (n *nacosDynamicConfiguration) removeListener(key string, listener config_center.ConfigurationListener) {
......
......@@ -52,7 +52,7 @@ type zookeeperDynamicConfiguration struct {
done chan struct{}
client *zookeeper.ZookeeperClient
listenerLock sync.Mutex
//listenerLock sync.Mutex
listener *zookeeper.ZkEventListener
cacheListener *CacheListener
parser parser.ConfigurationParser
......
......@@ -99,7 +99,10 @@ func initZkData(group string, t *testing.T) (*zk.TestCluster, *zookeeperDynamicC
func TestGetConfig(t *testing.T) {
ts, reg := initZkData("dubbo", t)
defer ts.Stop()
defer func() {
err := ts.Stop()
assert.NoError(t, err)
}()
configs, err := reg.GetProperties(dubboPropertyFileName, config_center.WithGroup("dubbo"))
assert.NoError(t, err)
m, err := reg.Parser().Parse(configs)
......@@ -107,15 +110,21 @@ func TestGetConfig(t *testing.T) {
assert.Equal(t, "5s", m["dubbo.consumer.request_timeout"])
configs, err = reg.GetProperties(dubboPropertyFileName)
assert.Error(t, err)
assert.Equal(t, "", configs)
configs, err = reg.GetInternalProperty(dubboPropertyFileName)
assert.Error(t, err)
assert.Equal(t, "", configs)
configs, err = reg.GetRule(dubboPropertyFileName)
assert.Error(t, err)
assert.Equal(t, "", configs)
}
func TestAddListener(t *testing.T) {
ts, reg := initZkData("", t)
defer ts.Stop()
defer func() {
err := ts.Stop()
assert.NoError(t, err)
}()
listener := &mockDataListener{}
reg.AddListener(dubboPropertyFileName, listener)
listener.wg.Add(1)
......@@ -148,7 +157,10 @@ func TestAddListener(t *testing.T) {
func TestRemoveListener(t *testing.T) {
ts, reg := initZkData("", t)
defer ts.Stop()
defer func() {
err := ts.Stop()
assert.NoError(t, err)
}()
listener := &mockDataListener{}
reg.AddListener(dubboPropertyFileName, listener)
listener.wg.Add(1)
......@@ -186,7 +198,10 @@ func TestZookeeperDynamicConfigurationPublishConfig(t *testing.T) {
customGroup := "Custom Group"
key := "myKey"
ts, zk := initZkData(config_center.DEFAULT_GROUP, t)
defer ts.Stop()
defer func() {
err := ts.Stop()
assert.NoError(t, err)
}()
err := zk.PublishConfig(key, customGroup, value)
assert.Nil(t, err)
result, err := zk.GetInternalProperty("myKey", config_center.WithGroup(customGroup))
......
......@@ -26,7 +26,7 @@ import (
type AccessKeyPair struct {
AccessKey string `yaml:"accessKey" json:"accessKey,omitempty" property:"accessKey"`
SecretKey string `yaml:"secretKey" json:"secretKey,omitempty" property:"secretKey"`
ConsumerSide string `yaml:"consumerSide" json:"ConsumerSide,consumerSide" property:"consumerSide"`
ConsumerSide string `yaml:"consumerSide" json:"consumerSide,omitempty" property:"consumerSide"`
ProviderSide string `yaml:"providerSide" json:"providerSide,omitempty" property:"providerSide"`
Creator string `yaml:"creator" json:"creator,omitempty" property:"creator"`
Options string `yaml:"options" json:"options,omitempty" property:"options"`
......
......@@ -77,6 +77,6 @@ func TestAccessLogFilterInvokeDefaultConfig(t *testing.T) {
func TestAccessLogFilterOnResponse(t *testing.T) {
result := &protocol.RPCResult{}
accessLogFilter := GetAccessLogFilter()
response := accessLogFilter.OnResponse(nil, result, nil, nil)
response := accessLogFilter.OnResponse(context.TODO(), result, nil, nil)
assert.Equal(t, result, response)
}
......@@ -37,7 +37,7 @@ import (
)
func TestActiveFilterInvoke(t *testing.T) {
invoc := invocation.NewRPCInvocation("test", []interface{}{"OK"}, make(map[string]interface{}, 0))
invoc := invocation.NewRPCInvocation("test", []interface{}{"OK"}, make(map[string]interface{}))
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
filter := ActiveFilter{}
ctrl := gomock.NewController(t)
......@@ -65,7 +65,7 @@ func TestActiveFilterOnResponse(t *testing.T) {
result := &protocol.RPCResult{
Err: errors.New("test"),
}
filter.OnResponse(nil, result, invoker, invoc)
filter.OnResponse(context.TODO(), result, invoker, invoc)
methodStatus := protocol.GetMethodStatus(url, "test")
urlStatus := protocol.GetURLStatus(url)
......
......@@ -69,7 +69,7 @@ func getSignature(url *common.URL, invocation protocol.Invocation, secrectKey st
requestString := fmt.Sprintf(constant.SIGNATURE_STRING_FORMAT,
url.ColonSeparatedKey(), invocation.MethodName(), secrectKey, currentTime)
var signature string
if parameterEncrypt := url.GetParamBool(constant.PARAMTER_SIGNATURE_ENABLE_KEY, false); parameterEncrypt {
if parameterEncrypt := url.GetParamBool(constant.PARAMETER_SIGNATURE_ENABLE_KEY, false); parameterEncrypt {
var err error
if signature, err = SignWithParams(invocation.Arguments(), requestString, secrectKey); err != nil {
// TODO
......
......@@ -39,7 +39,7 @@ func TestDefaultAuthenticator_Authenticate(t *testing.T) {
secret := "dubbo-sk"
access := "dubbo-ak"
testurl, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=gg&version=2.6.0")
testurl.SetParam(constant.PARAMTER_SIGNATURE_ENABLE_KEY, "true")
testurl.SetParam(constant.PARAMETER_SIGNATURE_ENABLE_KEY, "true")
testurl.SetParam(constant.ACCESS_KEY_ID_KEY, access)
testurl.SetParam(constant.SECRET_ACCESS_KEY_KEY, secret)
parmas := []interface{}{"OK", struct {
......@@ -77,7 +77,7 @@ func TestDefaultAuthenticator_Sign(t *testing.T) {
testurl, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?application=test&interface=com.ikurento.user.UserProvider&group=gg&version=2.6.0")
testurl.SetParam(constant.ACCESS_KEY_ID_KEY, "akey")
testurl.SetParam(constant.SECRET_ACCESS_KEY_KEY, "skey")
testurl.SetParam(constant.PARAMTER_SIGNATURE_ENABLE_KEY, "false")
testurl.SetParam(constant.PARAMETER_SIGNATURE_ENABLE_KEY, "false")
inv := invocation.NewRPCInvocation("test", []interface{}{"OK"}, nil)
_ = authenticator.Sign(inv, testurl)
assert.NotEqual(t, inv.AttachmentsByKey(constant.REQUEST_SIGNATURE_KEY, ""), "")
......@@ -113,12 +113,13 @@ func Test_getAccessKeyPairFailed(t *testing.T) {
common.WithParamsValue(constant.SECRET_ACCESS_KEY_KEY, "skey"),
common.WithParamsValue(constant.ACCESS_KEY_ID_KEY, "akey"), common.WithParamsValue(constant.ACCESS_KEY_STORAGE_KEY, "dubbo"))
_, e = getAccessKeyPair(invcation, testurl)
assert.NoError(t, e)
}
func Test_getSignatureWithinParams(t *testing.T) {
testurl, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=gg&version=2.6.0")
testurl.SetParam(constant.PARAMTER_SIGNATURE_ENABLE_KEY, "true")
testurl.SetParam(constant.PARAMETER_SIGNATURE_ENABLE_KEY, "true")
inv := invocation.NewRPCInvocation("test", []interface{}{"OK"}, map[string]interface{}{
"": "",
})
......@@ -134,7 +135,7 @@ func Test_getSignatureWithinParams(t *testing.T) {
func Test_getSignature(t *testing.T) {
testurl, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=gg&version=2.6.0")
testurl.SetParam(constant.PARAMTER_SIGNATURE_ENABLE_KEY, "false")
testurl.SetParam(constant.PARAMETER_SIGNATURE_ENABLE_KEY, "false")
inv := invocation.NewRPCInvocation("test", []interface{}{"OK"}, nil)
secret := "dubbo"
current := strconv.Itoa(int(time.Now().Unix() * 1000))
......
......@@ -26,6 +26,10 @@ import (
"strings"
)
import (
"github.com/apache/dubbo-go/common/logger"
)
// Sign gets a signature string with given bytes
func Sign(metadata, key string) string {
return doSign([]byte(metadata), key)
......@@ -33,7 +37,7 @@ func Sign(metadata, key string) string {
// SignWithParams returns a signature with giving params and metadata.
func SignWithParams(params []interface{}, metadata, key string) (string, error) {
if params == nil || len(params) == 0 {
if len(params) == 0 {
return Sign(metadata, key), nil
}
......@@ -56,7 +60,9 @@ func toBytes(data []interface{}) ([]byte, error) {
func doSign(bytes []byte, key string) string {
mac := hmac.New(sha256.New, []byte(key))
mac.Write(bytes)
if _, err := mac.Write(bytes); err != nil {
logger.Error(err)
}
signature := mac.Sum(nil)
return base64.URLEncoding.EncodeToString(signature)
}
......
......@@ -84,7 +84,7 @@ func (ef *ExecuteLimitFilter) Invoke(ctx context.Context, invoker protocol.Invok
methodConfigPrefix := "methods." + invocation.MethodName() + "."
ivkURL := invoker.GetUrl()
limitTarget := ivkURL.ServiceKey()
limitRateConfig := constant.DEFAULT_EXECUTE_LIMIT
var limitRateConfig string
methodLevelConfig := ivkURL.GetParam(methodConfigPrefix+constant.EXECUTE_LIMIT_KEY, "")
if len(methodLevelConfig) > 0 {
......
......@@ -36,7 +36,7 @@ import (
func TestExecuteLimitFilterInvokeIgnored(t *testing.T) {
methodName := "hello"
invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]interface{}, 0))
invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]interface{}))
invokeUrl := common.NewURLWithOptions(
common.WithParams(url.Values{}),
......@@ -51,7 +51,7 @@ func TestExecuteLimitFilterInvokeIgnored(t *testing.T) {
func TestExecuteLimitFilterInvokeConfigureError(t *testing.T) {
methodName := "hello1"
invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]interface{}, 0))
invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]interface{}))
invokeUrl := common.NewURLWithOptions(
common.WithParams(url.Values{}),
......@@ -68,7 +68,7 @@ func TestExecuteLimitFilterInvokeConfigureError(t *testing.T) {
func TestExecuteLimitFilterInvoke(t *testing.T) {
methodName := "hello1"
invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]interface{}, 0))
invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]interface{}))
invokeUrl := common.NewURLWithOptions(
common.WithParams(url.Values{}),
......
......@@ -125,7 +125,7 @@ func TestGenericServiceFilterResponseTestStruct(t *testing.T) {
filter := GetGenericServiceFilter()
methodName := "$invoke"
rpcInvocation := invocation.NewRPCInvocation(methodName, aurguments, nil)
r := filter.OnResponse(nil, result, nil, rpcInvocation)
r := filter.OnResponse(context.TODO(), result, nil, rpcInvocation)
assert.NotNil(t, r.Result())
assert.Equal(t, reflect.ValueOf(r.Result()).Kind(), reflect.Map)
}
......@@ -143,7 +143,7 @@ func TestGenericServiceFilterResponseString(t *testing.T) {
filter := GetGenericServiceFilter()
methodName := "$invoke"
rpcInvocation := invocation.NewRPCInvocation(methodName, aurguments, nil)
r := filter.OnResponse(nil, result, nil, rpcInvocation)
r := filter.OnResponse(context.TODO(), result, nil, rpcInvocation)
assert.NotNil(t, r.Result())
assert.Equal(t, reflect.ValueOf(r.Result()).Kind(), reflect.String)
}
......@@ -39,10 +39,8 @@ import (
)
func TestGenericFilterInvoke(t *testing.T) {
invoc := invocation.NewRPCInvocation("GetUser", []interface{}{"OK"}, make(map[string]interface{}, 0))
invokeUrl := common.NewURLWithOptions(
common.WithParams(url.Values{}))
invoc := invocation.NewRPCInvocation("GetUser", []interface{}{"OK"}, make(map[string]interface{}))
invokeUrl := common.NewURLWithOptions(common.WithParams(url.Values{}))
shutdownFilter := extension.GetFilter(constant.PROVIDER_SHUTDOWN_FILTER).(*gracefulShutdownFilter)
......@@ -65,7 +63,8 @@ func TestGenericFilterInvoke(t *testing.T) {
shutdownFilter.shutdownConfig = providerConfig.ShutdownConfig
assert.True(t, shutdownFilter.rejectNewRequest())
result = shutdownFilter.OnResponse(nil, nil, protocol.NewBaseInvoker(invokeUrl), invoc)
result = shutdownFilter.OnResponse(context.Background(), nil, protocol.NewBaseInvoker(invokeUrl), invoc)
assert.Nil(t, result)
rejectHandler := &common2.OnlyLogRejectedExecutionHandler{}
extension.SetRejectedExecutionHandler("mock", func() filter.RejectedExecutionHandler {
......
......@@ -23,6 +23,7 @@ import (
)
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/filter"
......@@ -30,12 +31,12 @@ import (
)
const (
SEATA = "seata"
SEATA_XID = "SEATA_XID"
SEATA = constant.DubboCtxKey("seata")
SEATA_XID = constant.DubboCtxKey("SEATA_XID")
)
func init() {
extension.SetFilter(SEATA, getSeataFilter)
extension.SetFilter(string(SEATA), getSeataFilter)
}
// SeataFilter when use seata-golang, use this filter to transfer xid
......@@ -45,7 +46,7 @@ type SeataFilter struct{}
// Invoke Get Xid by attachment key `SEATA_XID`
func (sf *SeataFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
logger.Infof("invoking seata filter.")
xid := invocation.AttachmentsByKey(SEATA_XID, "")
xid := invocation.AttachmentsByKey(string(SEATA_XID), "")
if strings.TrimSpace(xid) != "" {
logger.Debugf("Method: %v,Xid: %v", invocation.MethodName(), xid)
return invoker.Invoke(context.WithValue(ctx, SEATA_XID, xid), invocation)
......
......@@ -50,7 +50,7 @@ func TestSeataFilter_Invoke(t *testing.T) {
filter := getSeataFilter()
result := filter.Invoke(context.Background(), &testMockSeataInvoker{}, invocation.NewRPCInvocation("$echo",
[]interface{}{"OK"}, map[string]interface{}{
SEATA_XID: "10.30.21.227:8091:2000047792",
string(SEATA_XID): "10.30.21.227:8091:2000047792",
}))
assert.Equal(t, "10.30.21.227:8091:2000047792", result.Result())
}
......@@ -205,8 +205,8 @@ const (
DefaultProviderPrefix = "dubbo:provider:"
DefaultConsumerPrefix = "dubbo:consumer:"
MethodEntryKey = "$$sentinelMethodEntry"
InterfaceEntryKey = "$$sentinelInterfaceEntry"
MethodEntryKey = constant.DubboCtxKey("$$sentinelMethodEntry")
InterfaceEntryKey = constant.DubboCtxKey("$$sentinelInterfaceEntry")
)
func getResourceName(invoker protocol.Invoker, invocation protocol.Invocation, prefix string) (interfaceResourceName, methodResourceName string) {
......
......@@ -50,11 +50,11 @@ func TestSentinelFilter_QPS(t *testing.T) {
_, err = flow.LoadRules([]*flow.Rule{
{
Resource: interfaceResourceName,
Resource: interfaceResourceName,
//MetricType: flow.QPS,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
Threshold: 100,
Threshold: 100,
RelationStrategy: flow.CurrentResource,
},
})
......
......@@ -40,7 +40,7 @@ func TestTokenFilterInvoke(t *testing.T) {
url := common.NewURLWithOptions(
common.WithParams(url.Values{}),
common.WithParamsValue(constant.TOKEN_KEY, "ori_key"))
attch := make(map[string]interface{}, 0)
attch := make(map[string]interface{})
attch[constant.TOKEN_KEY] = "ori_key"
result := filter.Invoke(context.Background(),
protocol.NewBaseInvoker(url),
......@@ -54,7 +54,7 @@ func TestTokenFilterInvokeEmptyToken(t *testing.T) {
filter := GetTokenFilter()
testUrl := common.URL{}
attch := make(map[string]interface{}, 0)
attch := make(map[string]interface{})
attch[constant.TOKEN_KEY] = "ori_key"
result := filter.Invoke(context.Background(), protocol.NewBaseInvoker(&testUrl), invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch))
assert.Nil(t, result.Error())
......@@ -67,7 +67,7 @@ func TestTokenFilterInvokeEmptyAttach(t *testing.T) {
testUrl := common.NewURLWithOptions(
common.WithParams(url.Values{}),
common.WithParamsValue(constant.TOKEN_KEY, "ori_key"))
attch := make(map[string]interface{}, 0)
attch := make(map[string]interface{})
result := filter.Invoke(context.Background(), protocol.NewBaseInvoker(testUrl), invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch))
assert.NotNil(t, result.Error())
}
......@@ -78,7 +78,7 @@ func TestTokenFilterInvokeNotEqual(t *testing.T) {
testUrl := common.NewURLWithOptions(
common.WithParams(url.Values{}),
common.WithParamsValue(constant.TOKEN_KEY, "ori_key"))
attch := make(map[string]interface{}, 0)
attch := make(map[string]interface{})
attch[constant.TOKEN_KEY] = "err_key"
result := filter.Invoke(context.Background(),
protocol.NewBaseInvoker(testUrl), invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch))
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment