Skip to content
Snippets Groups Projects
Commit 7b8cecf7 authored by vito.he's avatar vito.he
Browse files

Merge branch 'develop' into metadata_report

parents e204bc23 8394c305
No related branches found
No related tags found
No related merge requests found
Showing
with 561 additions and 321 deletions
......@@ -40,8 +40,9 @@ func NewRestExporter(key string, invoker protocol.Invoker, exporterMap *sync.Map
func (re *RestExporter) Unexport() {
serviceId := re.GetInvoker().GetUrl().GetParam(constant.BEAN_NAME_KEY, "")
interfaceName := re.GetInvoker().GetUrl().GetParam(constant.INTERFACE_KEY, "")
re.BaseExporter.Unexport()
err := common.ServiceMap.UnRegister(REST, serviceId)
err := common.ServiceMap.UnRegister(interfaceName, REST, serviceId)
if err != nil {
logger.Errorf("[RestExporter.Unexport] error: %v", err)
}
......
......@@ -20,6 +20,7 @@ package rest
import (
"context"
"fmt"
"net/http"
)
import (
......@@ -56,7 +57,7 @@ func (ri *RestInvoker) Invoke(ctx context.Context, invocation protocol.Invocatio
body interface{}
pathParams map[string]string
queryParams map[string]string
headers map[string]string
header http.Header
err error
)
if methodConfig == nil {
......@@ -71,24 +72,21 @@ func (ri *RestInvoker) Invoke(ctx context.Context, invocation protocol.Invocatio
result.Err = err
return &result
}
if headers, err = restStringMapTransform(methodConfig.HeadersMap, inv.Arguments()); err != nil {
if header, err = getRestHttpHeader(methodConfig, inv.Arguments()); err != nil {
result.Err = err
return &result
}
if len(inv.Arguments()) > methodConfig.Body && methodConfig.Body >= 0 {
body = inv.Arguments()[methodConfig.Body]
}
req := &client.RestRequest{
req := &client.RestClientRequest{
Location: ri.GetUrl().Location,
Produces: methodConfig.Produces,
Consumes: methodConfig.Consumes,
Method: methodConfig.MethodType,
Path: methodConfig.Path,
PathParams: pathParams,
QueryParams: queryParams,
Body: body,
Headers: headers,
Header: header,
}
result.Err = ri.client.Do(req, inv.Reply())
if result.Err == nil {
......@@ -107,3 +105,17 @@ func restStringMapTransform(paramsMap map[int]string, args []interface{}) (map[s
}
return resMap, nil
}
func getRestHttpHeader(methodConfig *config.RestMethodConfig, args []interface{}) (http.Header, error) {
header := http.Header{}
headersMap := methodConfig.HeadersMap
header.Set("Content-Type", methodConfig.Consumes)
header.Set("Accept", methodConfig.Produces)
for k, v := range headersMap {
if k >= len(args) || k < 0 {
return nil, perrors.Errorf("[Rest Invoke] Index %v is out of bundle", k)
}
header.Set(v, fmt.Sprint(args[k]))
}
return header, nil
}
......@@ -61,7 +61,7 @@ func TestRestInvoker_Invoke(t *testing.T) {
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245")
assert.NoError(t, err)
_, err = common.ServiceMap.Register(url.Protocol, &UserProvider{})
_, err = common.ServiceMap.Register("UserProvider", url.Protocol, &UserProvider{})
assert.NoError(t, err)
con := config.ProviderConfig{}
config.SetProviderConfig(con)
......@@ -206,6 +206,6 @@ func TestRestInvoker_Invoke(t *testing.T) {
assert.Error(t, res.Error(), "test error")
assert.Equal(t, filterNum, 12)
err = common.ServiceMap.UnRegister(url.Protocol, "com.ikurento.user.UserProvider")
err = common.ServiceMap.UnRegister("UserProvider", url.Protocol, "com.ikurento.user.UserProvider")
assert.NoError(t, err)
}
......@@ -75,7 +75,9 @@ func (rp *RestProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
}
rp.SetExporterMap(serviceKey, exporter)
restServer := rp.getServer(url, restServiceConfig.Server)
restServer.Deploy(invoker, restServiceConfig.RestMethodConfigsMap)
for _, methodConfig := range restServiceConfig.RestMethodConfigsMap {
restServer.Deploy(methodConfig, server.GetRouteFunc(invoker, methodConfig))
}
return exporter
}
......
......@@ -80,7 +80,7 @@ func TestRestProtocol_Export(t *testing.T) {
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245")
assert.NoError(t, err)
_, err = common.ServiceMap.Register(url.Protocol, &UserProvider{})
_, err = common.ServiceMap.Register("UserProvider", url.Protocol, &UserProvider{})
assert.NoError(t, err)
con := config.ProviderConfig{}
config.SetProviderConfig(con)
......@@ -128,7 +128,7 @@ func TestRestProtocol_Export(t *testing.T) {
proto.Destroy()
_, ok = proto.(*RestProtocol).serverMap[url.Location]
assert.False(t, ok)
err = common.ServiceMap.UnRegister(url.Protocol, "com.ikurento.user.UserProvider")
err = common.ServiceMap.UnRegister("UserProvider", url.Protocol, "com.ikurento.user.UserProvider")
assert.NoError(t, err)
}
......
......@@ -17,15 +17,306 @@
package server
import (
"context"
"errors"
"net/http"
"reflect"
"strconv"
"strings"
)
import (
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/rest/config"
"github.com/apache/dubbo-go/protocol/invocation"
rest_config "github.com/apache/dubbo-go/protocol/rest/config"
)
const parseParameterErrorStr = "An error occurred while parsing parameters on the server"
// RestServer user can implement this server interface
type RestServer interface {
// Start rest server
Start(url common.URL)
Deploy(invoker protocol.Invoker, restMethodConfig map[string]*config.RestMethodConfig)
UnDeploy(restMethodConfig map[string]*config.RestMethodConfig)
// Deploy a http api
Deploy(restMethodConfig *rest_config.RestMethodConfig, routeFunc func(request RestServerRequest, response RestServerResponse))
// UnDeploy a http api
UnDeploy(restMethodConfig *rest_config.RestMethodConfig)
// Destroy rest server
Destroy()
}
// RestServerRequest interface
type RestServerRequest interface {
// RawRequest get the Ptr of http.Request
RawRequest() *http.Request
// PathParameter get the path parameter by name
PathParameter(name string) string
// PathParameters get the map of the path parameters
PathParameters() map[string]string
// QueryParameter get the query parameter by name
QueryParameter(name string) string
// QueryParameters get the map of query parameters
QueryParameters(name string) []string
// BodyParameter get the body parameter of name
BodyParameter(name string) (string, error)
// HeaderParameter get the header parameter of name
HeaderParameter(name string) string
// ReadEntity checks the Accept header and reads the content into the entityPointer.
ReadEntity(entityPointer interface{}) error
}
// RestServerResponse interface
type RestServerResponse interface {
http.ResponseWriter
// WriteError writes the http status and the error string on the response. err can be nil.
// Return an error if writing was not successful.
WriteError(httpStatus int, err error) (writeErr error)
// WriteEntity marshals the value using the representation denoted by the Accept Header.
WriteEntity(value interface{}) error
}
// GetRouteFunc
// A route function will be invoked by http server
func GetRouteFunc(invoker protocol.Invoker, methodConfig *rest_config.RestMethodConfig) func(req RestServerRequest, resp RestServerResponse) {
return func(req RestServerRequest, resp RestServerResponse) {
var (
err error
args []interface{}
)
svc := common.ServiceMap.GetService(invoker.GetUrl().Protocol, strings.TrimPrefix(invoker.GetUrl().Path, "/"))
// get method
method := svc.Method()[methodConfig.MethodName]
argsTypes := method.ArgsType()
replyType := method.ReplyType()
// two ways to prepare arguments
// if method like this 'func1(req []interface{}, rsp *User) error'
// we don't have arguments type
if (len(argsTypes) == 1 || len(argsTypes) == 2 && replyType == nil) &&
argsTypes[0].String() == "[]interface {}" {
args, err = getArgsInterfaceFromRequest(req, methodConfig)
} else {
args, err = getArgsFromRequest(req, argsTypes, methodConfig)
}
if err != nil {
logger.Errorf("[Go Restful] parsing http parameters error:%v", err)
err = resp.WriteError(http.StatusInternalServerError, errors.New(parseParameterErrorStr))
if err != nil {
logger.Errorf("[Go Restful] WriteErrorString error:%v", err)
}
}
result := invoker.Invoke(context.Background(), invocation.NewRPCInvocation(methodConfig.MethodName, args, make(map[string]string)))
if result.Error() != nil {
err = resp.WriteError(http.StatusInternalServerError, result.Error())
if err != nil {
logger.Errorf("[Go Restful] WriteError error:%v", err)
}
return
}
err = resp.WriteEntity(result.Result())
if err != nil {
logger.Errorf("[Go Restful] WriteEntity error:%v", err)
}
}
}
// getArgsInterfaceFromRequest when service function like GetUser(req []interface{}, rsp *User) error
// use this method to get arguments
func getArgsInterfaceFromRequest(req RestServerRequest, methodConfig *rest_config.RestMethodConfig) ([]interface{}, error) {
argsMap := make(map[int]interface{}, 8)
maxKey := 0
for k, v := range methodConfig.PathParamsMap {
if maxKey < k {
maxKey = k
}
argsMap[k] = req.PathParameter(v)
}
for k, v := range methodConfig.QueryParamsMap {
if maxKey < k {
maxKey = k
}
params := req.QueryParameters(v)
if len(params) == 1 {
argsMap[k] = params[0]
} else {
argsMap[k] = params
}
}
for k, v := range methodConfig.HeadersMap {
if maxKey < k {
maxKey = k
}
argsMap[k] = req.HeaderParameter(v)
}
if methodConfig.Body >= 0 {
if maxKey < methodConfig.Body {
maxKey = methodConfig.Body
}
m := make(map[string]interface{})
// TODO read as a slice
if err := req.ReadEntity(&m); err != nil {
return nil, perrors.Errorf("[Go restful] Read body entity as map[string]interface{} error:%v", err)
}
argsMap[methodConfig.Body] = m
}
args := make([]interface{}, maxKey+1)
for k, v := range argsMap {
if k >= 0 {
args[k] = v
}
}
return args, nil
}
// getArgsFromRequest get arguments from server.RestServerRequest
func getArgsFromRequest(req RestServerRequest, argsTypes []reflect.Type, methodConfig *rest_config.RestMethodConfig) ([]interface{}, error) {
argsLength := len(argsTypes)
args := make([]interface{}, argsLength)
for i, t := range argsTypes {
args[i] = reflect.Zero(t).Interface()
}
if err := assembleArgsFromPathParams(methodConfig, argsLength, argsTypes, req, args); err != nil {
return nil, err
}
if err := assembleArgsFromQueryParams(methodConfig, argsLength, argsTypes, req, args); err != nil {
return nil, err
}
if err := assembleArgsFromBody(methodConfig, argsTypes, req, args); err != nil {
return nil, err
}
if err := assembleArgsFromHeaders(methodConfig, req, argsLength, argsTypes, args); err != nil {
return nil, err
}
return args, nil
}
// assembleArgsFromHeaders assemble arguments from headers
func assembleArgsFromHeaders(methodConfig *rest_config.RestMethodConfig, req RestServerRequest, argsLength int, argsTypes []reflect.Type, args []interface{}) error {
for k, v := range methodConfig.HeadersMap {
param := req.HeaderParameter(v)
if k < 0 || k >= argsLength {
return perrors.Errorf("[Go restful] Header param parse error, the index %v args of method:%v doesn't exist", k, methodConfig.MethodName)
}
t := argsTypes[k]
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
if t.Kind() == reflect.String {
args[k] = param
} else {
return perrors.Errorf("[Go restful] Header param parse error, the index %v args's type isn't string", k)
}
}
return nil
}
// assembleArgsFromBody assemble arguments from body
func assembleArgsFromBody(methodConfig *rest_config.RestMethodConfig, argsTypes []reflect.Type, req RestServerRequest, args []interface{}) error {
if methodConfig.Body >= 0 && methodConfig.Body < len(argsTypes) {
t := argsTypes[methodConfig.Body]
kind := t.Kind()
if kind == reflect.Ptr {
t = t.Elem()
}
var ni interface{}
if t.String() == "[]interface {}" {
ni = make([]map[string]interface{}, 0)
} else if t.String() == "interface {}" {
ni = make(map[string]interface{})
} else {
n := reflect.New(t)
if n.CanInterface() {
ni = n.Interface()
}
}
if err := req.ReadEntity(&ni); err != nil {
return perrors.Errorf("[Go restful] Read body entity error, error is %v", perrors.WithStack(err))
}
args[methodConfig.Body] = ni
}
return nil
}
// assembleArgsFromQueryParams assemble arguments from query params
func assembleArgsFromQueryParams(methodConfig *rest_config.RestMethodConfig, argsLength int, argsTypes []reflect.Type, req RestServerRequest, args []interface{}) error {
var (
err error
param interface{}
i64 int64
)
for k, v := range methodConfig.QueryParamsMap {
if k < 0 || k >= argsLength {
return perrors.Errorf("[Go restful] Query param parse error, the index %v args of method:%v doesn't exist", k, methodConfig.MethodName)
}
t := argsTypes[k]
kind := t.Kind()
if kind == reflect.Ptr {
t = t.Elem()
}
if kind == reflect.Slice {
param = req.QueryParameters(v)
} else if kind == reflect.String {
param = req.QueryParameter(v)
} else if kind == reflect.Int {
param, err = strconv.Atoi(req.QueryParameter(v))
} else if kind == reflect.Int32 {
i64, err = strconv.ParseInt(req.QueryParameter(v), 10, 32)
if err == nil {
param = int32(i64)
}
} else if kind == reflect.Int64 {
param, err = strconv.ParseInt(req.QueryParameter(v), 10, 64)
} else {
return perrors.Errorf("[Go restful] Query param parse error, the index %v args's type isn't int or string or slice", k)
}
if err != nil {
return perrors.Errorf("[Go restful] Query param parse error, error:%v", perrors.WithStack(err))
}
args[k] = param
}
return nil
}
// assembleArgsFromPathParams assemble arguments from path params
func assembleArgsFromPathParams(methodConfig *rest_config.RestMethodConfig, argsLength int, argsTypes []reflect.Type, req RestServerRequest, args []interface{}) error {
var (
err error
param interface{}
i64 int64
)
for k, v := range methodConfig.PathParamsMap {
if k < 0 || k >= argsLength {
return perrors.Errorf("[Go restful] Path param parse error, the index %v args of method:%v doesn't exist", k, methodConfig.MethodName)
}
t := argsTypes[k]
kind := t.Kind()
if kind == reflect.Ptr {
t = t.Elem()
}
if kind == reflect.Int {
param, err = strconv.Atoi(req.PathParameter(v))
} else if kind == reflect.Int32 {
i64, err = strconv.ParseInt(req.PathParameter(v), 10, 32)
if err == nil {
param = int32(i64)
}
} else if kind == reflect.Int64 {
param, err = strconv.ParseInt(req.PathParameter(v), 10, 64)
} else if kind == reflect.String {
param = req.PathParameter(v)
} else {
return perrors.Errorf("[Go restful] Path param parse error, the index %v args's type isn't int or string", k)
}
if err != nil {
return perrors.Errorf("[Go restful] Path param parse error, error is %v", perrors.WithStack(err))
}
args[k] = param
}
return nil
}
......@@ -22,8 +22,6 @@ import (
"fmt"
"net"
"net/http"
"reflect"
"strconv"
"strings"
"time"
)
......@@ -38,27 +36,29 @@ import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/apache/dubbo-go/protocol/rest/config"
"github.com/apache/dubbo-go/protocol/rest/server"
)
func init() {
extension.SetRestServer(constant.DEFAULT_REST_SERVER, GetNewGoRestfulServer)
extension.SetRestServer(constant.DEFAULT_REST_SERVER, NewGoRestfulServer)
}
var filterSlice []restful.FilterFunction
// GoRestfulServer a rest server implement by go-restful
type GoRestfulServer struct {
srv *http.Server
container *restful.Container
}
func NewGoRestfulServer() *GoRestfulServer {
// NewGoRestfulServer a constructor of GoRestfulServer
func NewGoRestfulServer() server.RestServer {
return &GoRestfulServer{}
}
// Start go-restful server
// It will add all go-restful filters
func (grs *GoRestfulServer) Start(url common.URL) {
grs.container = restful.NewContainer()
for _, filter := range filterSlice {
......@@ -80,61 +80,32 @@ func (grs *GoRestfulServer) Start(url common.URL) {
}()
}
func (grs *GoRestfulServer) Deploy(invoker protocol.Invoker, restMethodConfig map[string]*config.RestMethodConfig) {
svc := common.ServiceMap.GetService(invoker.GetUrl().Protocol, strings.TrimPrefix(invoker.GetUrl().Path, "/"))
for methodName, config := range restMethodConfig {
// get method
method := svc.Method()[methodName]
argsTypes := method.ArgsType()
replyType := method.ReplyType()
ws := new(restful.WebService)
ws.Path(config.Path).
Produces(strings.Split(config.Produces, ",")...).
Consumes(strings.Split(config.Consumes, ",")...).
Route(ws.Method(config.MethodType).To(getFunc(methodName, invoker, argsTypes, replyType, config)))
grs.container.Add(ws)
// Publish a http api in go-restful server
// The routeFunc should be invoked when the server receive a request
func (grs *GoRestfulServer) Deploy(restMethodConfig *config.RestMethodConfig, routeFunc func(request server.RestServerRequest, response server.RestServerResponse)) {
ws := &restful.WebService{}
rf := func(req *restful.Request, resp *restful.Response) {
routeFunc(NewGoRestfulRequestAdapter(req), resp)
}
ws.Path(restMethodConfig.Path).
Produces(strings.Split(restMethodConfig.Produces, ",")...).
Consumes(strings.Split(restMethodConfig.Consumes, ",")...).
Route(ws.Method(restMethodConfig.MethodType).To(rf))
grs.container.Add(ws)
}
func getFunc(methodName string, invoker protocol.Invoker, argsTypes []reflect.Type,
replyType reflect.Type, config *config.RestMethodConfig) func(req *restful.Request, resp *restful.Response) {
return func(req *restful.Request, resp *restful.Response) {
var (
err error
args []interface{}
)
if (len(argsTypes) == 1 || len(argsTypes) == 2 && replyType == nil) &&
argsTypes[0].String() == "[]interface {}" {
args = getArgsInterfaceFromRequest(req, config)
} else {
args = getArgsFromRequest(req, argsTypes, config)
}
result := invoker.Invoke(context.Background(), invocation.NewRPCInvocation(methodName, args, make(map[string]string)))
if result.Error() != nil {
err = resp.WriteError(http.StatusInternalServerError, result.Error())
if err != nil {
logger.Errorf("[Go Restful] WriteError error:%v", err)
}
return
}
err = resp.WriteEntity(result.Result())
if err != nil {
logger.Error("[Go Restful] WriteEntity error:%v", err)
}
}
}
func (grs *GoRestfulServer) UnDeploy(restMethodConfig map[string]*config.RestMethodConfig) {
for _, config := range restMethodConfig {
ws := new(restful.WebService)
ws.Path(config.Path)
err := grs.container.Remove(ws)
if err != nil {
logger.Warnf("[Go restful] Remove web service error:%v", err)
}
// Delete a http api in go-restful server
func (grs *GoRestfulServer) UnDeploy(restMethodConfig *config.RestMethodConfig) {
ws := new(restful.WebService)
ws.Path(restMethodConfig.Path)
err := grs.container.Remove(ws)
if err != nil {
logger.Warnf("[Go restful] Remove web service error:%v", err)
}
}
// Destroy the go-restful server
func (grs *GoRestfulServer) Destroy() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
......@@ -144,179 +115,59 @@ func (grs *GoRestfulServer) Destroy() {
logger.Infof("[Go Restful] Server exiting")
}
func getArgsInterfaceFromRequest(req *restful.Request, config *config.RestMethodConfig) []interface{} {
argsMap := make(map[int]interface{}, 8)
maxKey := 0
for k, v := range config.PathParamsMap {
if maxKey < k {
maxKey = k
}
argsMap[k] = req.PathParameter(v)
}
for k, v := range config.QueryParamsMap {
if maxKey < k {
maxKey = k
}
params := req.QueryParameters(v)
if len(params) == 1 {
argsMap[k] = params[0]
} else {
argsMap[k] = params
}
}
for k, v := range config.HeadersMap {
if maxKey < k {
maxKey = k
}
argsMap[k] = req.HeaderParameter(v)
}
if config.Body >= 0 {
if maxKey < config.Body {
maxKey = config.Body
}
m := make(map[string]interface{})
// TODO read as a slice
if err := req.ReadEntity(&m); err != nil {
logger.Warnf("[Go restful] Read body entity as map[string]interface{} error:%v", perrors.WithStack(err))
} else {
argsMap[config.Body] = m
}
}
args := make([]interface{}, maxKey+1)
for k, v := range argsMap {
if k >= 0 {
args[k] = v
}
}
return args
// AddGoRestfulServerFilter let user add the http server of go-restful
// addFilter should before config.Load()
func AddGoRestfulServerFilter(filterFuc restful.FilterFunction) {
filterSlice = append(filterSlice, filterFuc)
}
func getArgsFromRequest(req *restful.Request, argsTypes []reflect.Type, config *config.RestMethodConfig) []interface{} {
argsLength := len(argsTypes)
args := make([]interface{}, argsLength)
for i, t := range argsTypes {
args[i] = reflect.Zero(t).Interface()
}
var (
err error
param interface{}
i64 int64
)
for k, v := range config.PathParamsMap {
if k < 0 || k >= argsLength {
logger.Errorf("[Go restful] Path param parse error, the args:%v doesn't exist", k)
continue
}
t := argsTypes[k]
kind := t.Kind()
if kind == reflect.Ptr {
t = t.Elem()
}
if kind == reflect.Int {
param, err = strconv.Atoi(req.PathParameter(v))
} else if kind == reflect.Int32 {
i64, err = strconv.ParseInt(req.PathParameter(v), 10, 32)
if err == nil {
param = int32(i64)
}
} else if kind == reflect.Int64 {
param, err = strconv.ParseInt(req.PathParameter(v), 10, 64)
} else if kind == reflect.String {
param = req.PathParameter(v)
} else {
logger.Warnf("[Go restful] Path param parse error, the args:%v of type isn't int or string", k)
continue
}
if err != nil {
logger.Errorf("[Go restful] Path param parse error, error is %v", err)
continue
}
args[k] = param
}
for k, v := range config.QueryParamsMap {
if k < 0 || k >= argsLength {
logger.Errorf("[Go restful] Query param parse error, the args:%v doesn't exist", k)
continue
}
t := argsTypes[k]
kind := t.Kind()
if kind == reflect.Ptr {
t = t.Elem()
}
if kind == reflect.Slice {
param = req.QueryParameters(v)
} else if kind == reflect.String {
param = req.QueryParameter(v)
} else if kind == reflect.Int {
param, err = strconv.Atoi(req.QueryParameter(v))
} else if kind == reflect.Int32 {
i64, err = strconv.ParseInt(req.QueryParameter(v), 10, 32)
if err == nil {
param = int32(i64)
}
} else if kind == reflect.Int64 {
param, err = strconv.ParseInt(req.QueryParameter(v), 10, 64)
} else {
logger.Errorf("[Go restful] Query param parse error, the args:%v of type isn't int or string or slice", k)
continue
}
if err != nil {
logger.Errorf("[Go restful] Query param parse error, error is %v", err)
continue
}
args[k] = param
}
// GoRestfulRequestAdapter a adapter struct about RestServerRequest
type GoRestfulRequestAdapter struct {
server.RestServerRequest
request *restful.Request
}
if config.Body >= 0 && config.Body < len(argsTypes) {
t := argsTypes[config.Body]
kind := t.Kind()
if kind == reflect.Ptr {
t = t.Elem()
}
var ni interface{}
if t.String() == "[]interface {}" {
ni = make([]map[string]interface{}, 0)
} else if t.String() == "interface {}" {
ni = make(map[string]interface{})
} else {
n := reflect.New(t)
if n.CanInterface() {
ni = n.Interface()
}
}
if err := req.ReadEntity(&ni); err != nil {
logger.Errorf("[Go restful] Read body entity error:%v", err)
} else {
args[config.Body] = ni
}
}
// NewGoRestfulRequestAdapter a constructor of GoRestfulRequestAdapter
func NewGoRestfulRequestAdapter(request *restful.Request) *GoRestfulRequestAdapter {
return &GoRestfulRequestAdapter{request: request}
}
for k, v := range config.HeadersMap {
param := req.HeaderParameter(v)
if k < 0 || k >= argsLength {
logger.Errorf("[Go restful] Header param parse error, the args:%v doesn't exist", k)
continue
}
t := argsTypes[k]
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
if t.Kind() == reflect.String {
args[k] = param
} else {
logger.Errorf("[Go restful] Header param parse error, the args:%v of type isn't string", k)
}
}
// RawRequest a adapter function of server.RestServerRequest's RawRequest
func (grra *GoRestfulRequestAdapter) RawRequest() *http.Request {
return grra.request.Request
}
return args
// PathParameter a adapter function of server.RestServerRequest's PathParameter
func (grra *GoRestfulRequestAdapter) PathParameter(name string) string {
return grra.request.PathParameter(name)
}
func GetNewGoRestfulServer() server.RestServer {
return NewGoRestfulServer()
// PathParameters a adapter function of server.RestServerRequest's QueryParameter
func (grra *GoRestfulRequestAdapter) PathParameters() map[string]string {
return grra.request.PathParameters()
}
// Let user addFilter
// addFilter should before config.Load()
func AddGoRestfulServerFilter(filterFuc restful.FilterFunction) {
filterSlice = append(filterSlice, filterFuc)
// QueryParameter a adapter function of server.RestServerRequest's QueryParameters
func (grra *GoRestfulRequestAdapter) QueryParameter(name string) string {
return grra.request.QueryParameter(name)
}
// QueryParameters a adapter function of server.RestServerRequest's QueryParameters
func (grra *GoRestfulRequestAdapter) QueryParameters(name string) []string {
return grra.request.QueryParameters(name)
}
// BodyParameter a adapter function of server.RestServerRequest's BodyParameter
func (grra *GoRestfulRequestAdapter) BodyParameter(name string) (string, error) {
return grra.request.BodyParameter(name)
}
// HeaderParameter a adapter function of server.RestServerRequest's HeaderParameter
func (grra *GoRestfulRequestAdapter) HeaderParameter(name string) string {
return grra.request.HeaderParameter(name)
}
// ReadEntity a adapter func of server.RestServerRequest's ReadEntity
func (grra *GoRestfulRequestAdapter) ReadEntity(entityPointer interface{}) error {
return grra.request.ReadEntity(entityPointer)
}
......@@ -121,6 +121,7 @@ func (r *BaseRegistry) Destroy() {
close(r.done)
// wait waitgroup done (wait listeners outside close over)
r.wg.Wait()
//close registry client
r.closeRegisters()
}
......@@ -178,7 +179,10 @@ func (r *BaseRegistry) RestartCallBack() bool {
}
logger.Infof("success to re-register service :%v", confIf.Key())
}
r.facadeBasedRegistry.InitListeners()
if flag {
r.facadeBasedRegistry.InitListeners()
}
return flag
}
......
......@@ -164,9 +164,7 @@ func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error)
//register the svc to dataListener
r.dataListener.AddInterestedURL(svc)
for _, v := range strings.Split(svc.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), ",") {
go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+v, svc.Service()), r.dataListener)
}
go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, svc.Service()), r.dataListener)
return configListener, nil
}
......@@ -21,7 +21,6 @@ import (
"fmt"
"os"
"path"
"strings"
"sync"
"time"
)
......@@ -135,9 +134,7 @@ func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, er
//register the svc to dataListener
r.dataListener.AddInterestedURL(svc)
for _, v := range strings.Split(svc.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), ",") {
go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+v, svc.Service()), r.dataListener)
}
go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, svc.Service()), r.dataListener)
return configListener, nil
}
......
......@@ -45,6 +45,7 @@ import (
var (
regProtocol *registryProtocol
once sync.Once
)
type registryProtocol struct {
......@@ -346,12 +347,12 @@ func setProviderUrl(regURL *common.URL, providerURL *common.URL) {
regURL.SubURL = providerURL
}
// GetProtocol ...
// GetProtocol return the singleton RegistryProtocol
func GetProtocol() protocol.Protocol {
if regProtocol != nil {
return regProtocol
}
return newRegistryProtocol()
once.Do(func() {
regProtocol = newRegistryProtocol()
})
return regProtocol
}
type wrappedInvoker struct {
......
......@@ -35,23 +35,28 @@ import (
zk "github.com/apache/dubbo-go/remoting/zookeeper"
)
// RegistryDataListener ...
// RegistryDataListener contains all URL information subscribed by zookeeper registry
type RegistryDataListener struct {
interestedURL []*common.URL
listener config_center.ConfigurationListener
subscribed map[*common.URL]config_center.ConfigurationListener
mutex sync.Mutex
closed bool
}
// NewRegistryDataListener ...
func NewRegistryDataListener(listener config_center.ConfigurationListener) *RegistryDataListener {
return &RegistryDataListener{listener: listener}
// NewRegistryDataListener constructs a new RegistryDataListener
func NewRegistryDataListener() *RegistryDataListener {
return &RegistryDataListener{
subscribed: make(map[*common.URL]config_center.ConfigurationListener)}
}
// AddInterestedURL ...
func (l *RegistryDataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url)
// SubscribeURL is used to set a watch listener for url
func (l *RegistryDataListener) SubscribeURL(url *common.URL, listener config_center.ConfigurationListener) {
if l.closed {
return
}
l.subscribed[url] = listener
}
// DataChange ...
// DataChange accepts all events sent from the zookeeper server and trigger the corresponding listener for processing
func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
// Intercept the last bit
index := strings.Index(eventType.Path, "/providers/")
......@@ -65,10 +70,14 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
logger.Errorf("Listen NewURL(r{%s}) = error{%v} eventType.Path={%v}", url, err, eventType.Path)
return false
}
for _, v := range l.interestedURL {
if serviceURL.URLEqual(*v) {
l.listener.Process(
l.mutex.Lock()
defer l.mutex.Unlock()
if l.closed {
return false
}
for url, listener := range l.subscribed {
if serviceURL.URLEqual(*url) {
listener.Process(
&config_center.ConfigChangeEvent{
Key: eventType.Path,
Value: serviceURL,
......@@ -81,38 +90,48 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
return false
}
// RegistryConfigurationListener ...
// Close all RegistryConfigurationListener in subscribed
func (l *RegistryDataListener) Close() {
l.mutex.Lock()
defer l.mutex.Unlock()
for _, listener := range l.subscribed {
listener.(*RegistryConfigurationListener).Close()
}
}
// RegistryConfigurationListener represent the processor of zookeeper watcher
type RegistryConfigurationListener struct {
client *zk.ZookeeperClient
registry *zkRegistry
events chan *config_center.ConfigChangeEvent
isClosed bool
close chan struct{}
closeOnce sync.Once
}
// NewRegistryConfigurationListener for listening the event of zk.
func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener {
reg.WaitGroup().Add(1)
return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32), isClosed: false}
return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32), isClosed: false, close: make(chan struct{}, 1)}
}
// Process ...
// Process submit the ConfigChangeEvent to the event chan to notify all observer
func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
}
// Next ...
// Next will observe the registry state and events chan
func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-l.client.Done():
logger.Warnf("listener's zk client connection is broken, so zk event listener exit now.")
return nil, perrors.New("listener stopped")
logger.Warnf("listener's zk client connection (address {%s}) is broken, so zk event listener exit now.", l.client.ZkAddrs)
return nil, perrors.New("zookeeper client stopped")
case <-l.close:
return nil, perrors.New("listener have been closed")
case <-l.registry.Done():
logger.Warnf("zk consumer register has quit, so zk event listener exit now.")
return nil, perrors.New("listener stopped")
logger.Warnf("zk consumer register has quit, so zk event listener exit now. (registry url {%v}", l.registry.BaseRegistry.URL)
return nil, perrors.New("zookeeper registry, (registry url{%v}) stopped")
case e := <-l.events:
logger.Debugf("got zk event %s", e)
if e.ConfigType == remoting.EventTypeDel && !l.valid() {
......@@ -127,15 +146,17 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
}
}
// Close ...
// Close RegistryConfigurationListener only once
func (l *RegistryConfigurationListener) Close() {
// ensure that the listener will be closed at most once.
l.closeOnce.Do(func() {
l.isClosed = true
l.close <- struct{}{}
l.registry.WaitGroup().Done()
})
}
// valid return the true if the client conn isn't nil
func (l *RegistryConfigurationListener) valid() bool {
return l.client.ZkConnValid()
}
......@@ -32,15 +32,15 @@ import (
)
func Test_DataChange(t *testing.T) {
listener := NewRegistryDataListener(&MockDataListener{})
listener := NewRegistryDataListener()
url, _ := common.NewURL("jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-1.3.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100")
listener.AddInterestedURL(&url)
listener.SubscribeURL(&url, &MockConfigurationListener{})
int := listener.DataChange(remoting.Event{Path: "/dubbo/com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-1.3.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100"})
assert.Equal(t, true, int)
}
type MockDataListener struct {
type MockConfigurationListener struct {
}
func (*MockDataListener) Process(configType *config_center.ConfigChangeEvent) {
func (*MockConfigurationListener) Process(configType *config_center.ConfigChangeEvent) {
}
......@@ -20,7 +20,6 @@ package zookeeper
import (
"fmt"
"net/url"
"strings"
"sync"
"time"
)
......@@ -54,12 +53,11 @@ func init() {
type zkRegistry struct {
registry.BaseRegistry
client *zookeeper.ZookeeperClient
listenerLock sync.Mutex
listener *zookeeper.ZkEventListener
dataListener *RegistryDataListener
configListener *RegistryConfigurationListener
cltLock sync.Mutex
client *zookeeper.ZookeeperClient
listenerLock sync.Mutex
listener *zookeeper.ZkEventListener
dataListener *RegistryDataListener
cltLock sync.Mutex
//for provider
zkPath map[string]int // key = protocol://ip:port/interface
}
......@@ -78,13 +76,12 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) {
if err != nil {
return nil, err
}
r.WaitGroup().Add(1) //zk client start successful, then wg +1
go zookeeper.HandleClientRestart(r)
r.listener = zookeeper.NewZkEventListener(r.client)
r.configListener = NewRegistryConfigurationListener(r.client, r)
r.dataListener = NewRegistryDataListener(r.configListener)
r.dataListener = NewRegistryDataListener()
return r, nil
}
......@@ -121,8 +118,27 @@ func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestClust
func (r *zkRegistry) InitListeners() {
r.listener = zookeeper.NewZkEventListener(r.client)
r.configListener = NewRegistryConfigurationListener(r.client, r)
r.dataListener = NewRegistryDataListener(r.configListener)
newDataListener := NewRegistryDataListener()
// should recover if dataListener isn't nil before
if r.dataListener != nil {
// close all old listener
oldDataListener := r.dataListener
oldDataListener.mutex.Lock()
defer oldDataListener.mutex.Unlock()
recoverd := r.dataListener.subscribed
if recoverd != nil && len(recoverd) > 0 {
// recover all subscribed url
for conf, oldListener := range recoverd {
if regConfigListener, ok := oldListener.(*RegistryConfigurationListener); ok {
regConfigListener.Close()
}
newDataListener.SubscribeURL(conf, NewRegistryConfigurationListener(r.client, r))
go r.listener.ListenServiceEvent(conf, fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(conf.Service())), newDataListener)
}
}
}
r.dataListener = newDataListener
}
func (r *zkRegistry) CreatePath(path string) error {
......@@ -155,8 +171,8 @@ func (r *zkRegistry) ZkClientLock() *sync.Mutex {
}
func (r *zkRegistry) CloseListener() {
if r.configListener != nil {
r.configListener.Close()
if r.dataListener != nil {
r.dataListener.Close()
}
}
......@@ -173,32 +189,49 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {
logger.Errorf("zk.Create(root{%s}) = err{%v}", root, perrors.WithStack(err))
return perrors.WithStack(err)
}
// try to register the node
zkPath, err = r.client.RegisterTemp(root, node)
if err != nil {
if err == zk.ErrNodeExists {
logger.Warnf("RegisterTempNode(root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err))
} else {
logger.Errorf("RegisterTempNode(root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err))
logger.Errorf("Register temp node(root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err))
if perrors.Cause(err) == zk.ErrNodeExists {
// should delete the old node
logger.Info("Register temp node failed, try to delete the old and recreate (root{%s}, node{%s}) , ignore!", root, node)
if err = r.client.Delete(zkPath); err == nil {
_, err = r.client.RegisterTemp(root, node)
}
if err != nil {
logger.Errorf("Recreate the temp node failed, (root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err))
}
}
return perrors.WithMessagef(err, "RegisterTempNode(root{%s}, node{%s})", root, node)
}
logger.Debugf("create a zookeeper node:%s", zkPath)
logger.Debugf("Create a zookeeper node:%s", zkPath)
return nil
}
func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListener, error) {
var (
zkListener *RegistryConfigurationListener
)
r.listenerLock.Lock()
if r.configListener.isClosed {
r.listenerLock.Unlock()
return nil, perrors.New("configListener already been closed")
var zkListener *RegistryConfigurationListener
dataListener := r.dataListener
dataListener.mutex.Lock()
defer dataListener.mutex.Unlock()
if r.dataListener.subscribed[conf] != nil {
zkListener, _ := r.dataListener.subscribed[conf].(*RegistryConfigurationListener)
if zkListener != nil {
r.listenerLock.Lock()
defer r.listenerLock.Unlock()
if zkListener.isClosed {
return nil, perrors.New("configListener already been closed")
} else {
return zkListener, nil
}
}
}
zkListener = r.configListener
r.listenerLock.Unlock()
zkListener = NewRegistryConfigurationListener(r.client, r)
if r.listener == nil {
r.cltLock.Lock()
client := r.client
......@@ -216,10 +249,9 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen
}
//Interested register to dataconfig.
r.dataListener.AddInterestedURL(conf)
for _, v := range strings.Split(conf.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), ",") {
go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+v, url.QueryEscape(conf.Service())), r.dataListener)
}
r.dataListener.SubscribeURL(conf, zkListener)
go r.listener.ListenServiceEvent(conf, fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(conf.Service())), r.dataListener)
return zkListener, nil
}
......@@ -118,7 +118,7 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
for _, opt := range opts {
opt(opions)
}
connected := false
err = nil
lock := container.ZkClientLock()
......@@ -143,6 +143,7 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.Location)
}
container.SetZkClient(newClient)
connected = true
}
if container.ZkClient().Conn == nil {
......@@ -150,10 +151,16 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
container.ZkClient().Conn, event, err = zk.Connect(container.ZkClient().ZkAddrs, container.ZkClient().Timeout)
if err == nil {
container.ZkClient().Wait.Add(1)
connected = true
go container.ZkClient().HandleZkEvent(event)
}
}
if connected {
logger.Info("Connect to zookeeper successfully, name{%s}, zk address{%v}", opions.zkName, url.Location)
container.WaitGroup().Add(1) //zk client start successful, then registry wg +1
}
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.PrimitiveURL)
}
......@@ -386,6 +393,7 @@ func (z *ZookeeperClient) Close() {
z.Conn = nil
z.Unlock()
if conn != nil {
logger.Warnf("zkClient Conn{name:%s, zk addr:%s} exit now.", z.name, conn.SessionID())
conn.Close()
}
......@@ -470,7 +478,7 @@ func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, er
//if err != nil && err != zk.ErrNodeExists {
if err != nil {
logger.Warnf("conn.Create(\"%s\", zk.FlagEphemeral) = error(%v)\n", zkPath, perrors.WithStack(err))
return "", perrors.WithStack(err)
return zkPath, perrors.WithStack(err)
}
logger.Debugf("zkClient{%s} create a temp zookeeper node:%s\n", z.name, tmpPath)
......
......@@ -48,11 +48,11 @@ func HandleClientRestart(r zkClientFacade) {
failTimes int
)
defer r.WaitGroup().Done()
LOOP:
for {
select {
case <-r.Done():
r.WaitGroup().Done() // dec the wg when registry is closed
logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
// re-register all services
......@@ -63,12 +63,14 @@ LOOP:
zkAddress := r.ZkClient().ZkAddrs
r.SetZkClient(nil)
r.ZkClientLock().Unlock()
r.WaitGroup().Done() // dec the wg when zk client is closed
// Connect zk until success.
failTimes = 0
for {
select {
case <-r.Done():
r.WaitGroup().Done() // dec the wg when registry is closed
logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // Prevent crazy reconnection zk.
......
......@@ -38,6 +38,16 @@ type mockFacade struct {
done chan struct{}
}
func newMockFacade(client *ZookeeperClient, url *common.URL) zkClientFacade {
mock := &mockFacade{
client: client,
URL: url,
}
mock.wg.Add(1)
return mock
}
func (r *mockFacade) ZkClient() *ZookeeperClient {
return r.client
}
......@@ -80,7 +90,7 @@ func Test_Facade(t *testing.T) {
assert.NoError(t, err)
defer ts.Stop()
url, _ := common.NewURL("mock://127.0.0.1")
mock := &mockFacade{client: z, URL: &url}
mock := newMockFacade(z, &url)
go HandleClientRestart(mock)
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
......
......@@ -18,6 +18,7 @@
package zookeeper
import (
"github.com/apache/dubbo-go/common"
"path"
"strings"
"sync"
......@@ -173,7 +174,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
}
}
func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataListener) {
func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listener remoting.DataListener) {
defer l.wg.Done()
var (
......@@ -224,7 +225,16 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi
}
failTimes = 0
for _, c := range children {
// listen l service node
// Only need to compare Path when subscribing to provider
if strings.LastIndex(zkPath, constant.PROVIDER_CATEGORY) != -1 {
provider, _ := common.NewURL(c)
if provider.Path != conf.Path {
continue
}
}
//listen l service node
dubboPath := path.Join(zkPath, c)
//Save the path to avoid listen repeatedly
......@@ -232,7 +242,7 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi
_, ok := l.pathMap[dubboPath]
l.pathMapLock.Unlock()
if ok {
logger.Warnf("@zkPath %s has already been listened.", zkPath)
logger.Warnf("@zkPath %s has already been listened.", dubboPath)
continue
}
......@@ -263,7 +273,7 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi
strings.LastIndex(zkPath, constant.CONSUMER_CATEGORY) == -1 {
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(zkPath, listener)
l.listenDirEvent(conf, zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(dubboPath, listener)
}
......@@ -291,11 +301,11 @@ func timeSecondDuration(sec int) time.Duration {
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent
// |
// --------> ListenServiceNodeEvent
func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) {
func (l *ZkEventListener) ListenServiceEvent(conf *common.URL, zkPath string, listener remoting.DataListener) {
logger.Infof("listen dubbo path{%s}", zkPath)
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(zkPath, listener)
l.listenDirEvent(conf, zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(zkPath, listener)
}
......
......@@ -97,7 +97,7 @@ func TestListener(t *testing.T) {
go client.HandleZkEvent(event)
listener := NewZkEventListener(client)
dataListener := &mockDataListener{client: client, changedData: changedData, wait: &wait}
listener.ListenServiceEvent("/dubbo", dataListener)
listener.ListenServiceEvent(nil, "/dubbo", dataListener)
time.Sleep(1 * time.Second)
_, err := client.Conn.Set("/dubbo/dubbo.properties", []byte(changedData), 1)
assert.NoError(t, err)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment