Skip to content
Snippets Groups Projects
Commit 1fa11bdf authored by flycash's avatar flycash
Browse files

Merge develop

parents a17c04f4 f1d03e95
No related branches found
No related tags found
No related merge requests found
Showing
with 391 additions and 219 deletions
......@@ -16,7 +16,7 @@ Apache License, Version 2.0
## Release note ##
[v1.4.0-rc1 - Mar 12, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.4.0-rc1)
[v1.4.0 - Mar 17, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.4.0)
[v1.3.0 - Mar 1, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.3.0)
......
......@@ -15,7 +15,7 @@ Apache License, Version 2.0
## 发布日志 ##
[v1.4.0-rc1 - 2020年3月12](https://github.com/apache/dubbo-go/releases/tag/v1.4.0-rc1)
[v1.4.0 - 2020年3月17](https://github.com/apache/dubbo-go/releases/tag/v1.4.0)
[v1.3.0 - 2020年3月1日](https://github.com/apache/dubbo-go/releases/tag/v1.3.0)
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package extension
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/registry"
)
type registryDirectory func(url *common.URL, registry registry.Registry) (cluster.Directory, error)
var defaultRegistry registryDirectory
// SetDefaultRegistryDirectory ...
func SetDefaultRegistryDirectory(v registryDirectory) {
defaultRegistry = v
}
// GetDefaultRegistryDirectory ...
func GetDefaultRegistryDirectory(config *common.URL, registry registry.Registry) (cluster.Directory, error) {
if defaultRegistry == nil {
panic("registry directory is not existing, make sure you have import the package.")
}
return defaultRegistry(config, registry)
}
......@@ -44,7 +44,7 @@ var (
typError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()).Type()
)
// NewProxy ...
// NewProxy create service proxy.
func NewProxy(invoke protocol.Invoker, callBack interface{}, attachments map[string]string) *Proxy {
return &Proxy{
invoke: invoke,
......@@ -59,7 +59,6 @@ func NewProxy(invoke protocol.Invoker, callBack interface{}, attachments map[str
// type XxxProvider struct {
// Yyy func(ctx context.Context, args []interface{}, rsp *Zzz) error
// }
func (p *Proxy) Implement(v common.RPCService) {
// check parameters, incoming interface must be a elem's pointer.
......@@ -202,12 +201,12 @@ func (p *Proxy) Implement(v common.RPCService) {
}
// Get ...
// Get get rpc service instance.
func (p *Proxy) Get() common.RPCService {
return p.rpc
}
// GetCallback ...
// GetCallback get callback.
func (p *Proxy) GetCallback() interface{} {
return p.callBack
}
......@@ -22,7 +22,7 @@ import (
"github.com/apache/dubbo-go/protocol"
)
// ProxyFactory ...
// ProxyFactory interface.
type ProxyFactory interface {
GetProxy(invoker protocol.Invoker, url *common.URL) *Proxy
GetAsyncProxy(invoker protocol.Invoker, callBack interface{}, url *common.URL) *Proxy
......
......@@ -59,7 +59,6 @@ type AsyncCallback func(response CallbackResponse)
// return map[string][string]{}
// }
const (
// METHOD_MAPPER ...
METHOD_MAPPER = "MethodMapper"
)
......@@ -68,7 +67,7 @@ var (
// because Typeof takes an empty interface value. This is annoying.
typeOfError = reflect.TypeOf((*error)(nil)).Elem()
// ServiceMap ...
// ServiceMap store description of service.
// todo: lowerecas?
ServiceMap = &serviceMap{
serviceMap: make(map[string]map[string]*Service),
......@@ -80,7 +79,7 @@ var (
// info of method
//////////////////////////
// MethodType ...
// MethodType is description of service method.
type MethodType struct {
method reflect.Method
ctxType reflect.Type // request context
......@@ -88,27 +87,27 @@ type MethodType struct {
replyType reflect.Type // return value, otherwise it is nil
}
// Method ...
// Method get @m.method.
func (m *MethodType) Method() reflect.Method {
return m.method
}
// CtxType ...
// CtxType get @m.ctxType.
func (m *MethodType) CtxType() reflect.Type {
return m.ctxType
}
// ArgsType ...
// ArgsType get @m.argsType.
func (m *MethodType) ArgsType() []reflect.Type {
return m.argsType
}
// ReplyType ...
// ReplyType get @m.replyType.
func (m *MethodType) ReplyType() reflect.Type {
return m.replyType
}
// SuiteContext ...
// SuiteContext tranfer @ctx to reflect.Value type or get it from @m.ctxType.
func (m *MethodType) SuiteContext(ctx context.Context) reflect.Value {
if contextv := reflect.ValueOf(ctx); contextv.IsValid() {
return contextv
......@@ -120,7 +119,7 @@ func (m *MethodType) SuiteContext(ctx context.Context) reflect.Value {
// info of service interface
//////////////////////////
// Service ...
// Service is description of service
type Service struct {
name string
rcvr reflect.Value
......@@ -128,7 +127,7 @@ type Service struct {
methods map[string]*MethodType
}
// Method ...
// Method get @s.methods.
func (s *Service) Method() map[string]*MethodType {
return s.methods
}
......@@ -138,12 +137,12 @@ func (s *Service) Name() string {
return s.name
}
// RcvrType ...
// RcvrType get @s.rcvrType.
func (s *Service) RcvrType() reflect.Type {
return s.rcvrType
}
// Rcvr ...
// Rcvr get @s.rcvr.
func (s *Service) Rcvr() reflect.Value {
return s.rcvr
}
......
......@@ -195,7 +195,6 @@ func NewURLWithOptions(opts ...option) *URL {
// NewURL will create a new url
// the urlString should not be empty
func NewURL(urlString string, opts ...option) (URL, error) {
var (
err error
rawUrlString string
......@@ -249,7 +248,7 @@ func NewURL(urlString string, opts ...option) (URL, error) {
return s, nil
}
// URLEqual ...
// URLEqual judge @url and @c is equal or not.
func (c URL) URLEqual(url URL) bool {
c.Ip = ""
c.Port = ""
......@@ -265,17 +264,19 @@ func (c URL) URLEqual(url URL) bool {
} else if urlGroup == constant.ANY_VALUE {
urlKey = strings.Replace(urlKey, "group=*", "group="+cGroup, 1)
}
// 1. protocol, username, password, ip, port, service name, group, version should be equal
if cKey != urlKey {
return false
}
// 2. if url contains enabled key, should be true, or *
if url.GetParam(constant.ENABLED_KEY, "true") != "true" && url.GetParam(constant.ENABLED_KEY, "") != constant.ANY_VALUE {
return false
}
//TODO :may need add interface key any value condition
if !isMatchCategory(url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), c.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY)) {
return false
}
return true
return isMatchCategory(url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), c.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY))
}
func isMatchCategory(category1 string, category2 string) bool {
......@@ -313,10 +314,9 @@ func (c URL) Key() string {
"%s://%s:%s@%s:%s/?interface=%s&group=%s&version=%s",
c.Protocol, c.Username, c.Password, c.Ip, c.Port, c.Service(), c.GetParam(constant.GROUP_KEY, ""), c.GetParam(constant.VERSION_KEY, ""))
return buildString
//return c.ServiceKey()
}
// ServiceKey ...
// ServiceKey get a unique key of a service.
func (c URL) ServiceKey() string {
intf := c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/"))
if intf == "" {
......@@ -409,12 +409,12 @@ func (c *URL) RangeParams(f func(key, value string) bool) {
// GetParam ...
func (c URL) GetParam(s string, d string) string {
var r string
c.paramsLock.RLock()
if r = c.params.Get(s); len(r) == 0 {
defer c.paramsLock.RUnlock()
r := c.params.Get(s)
if len(r) == 0 {
r = d
}
c.paramsLock.RUnlock()
return r
}
......@@ -454,10 +454,8 @@ func (c URL) GetRawParam(key string) string {
// GetParamBool ...
func (c URL) GetParamBool(s string, d bool) bool {
var r bool
var err error
if r, err = strconv.ParseBool(c.GetParam(s, "")); err != nil {
r, err := strconv.ParseBool(c.GetParam(s, ""))
if err != nil {
return d
}
return r
......@@ -465,10 +463,8 @@ func (c URL) GetParamBool(s string, d bool) bool {
// GetParamInt ...
func (c URL) GetParamInt(s string, d int64) int64 {
var r int
var err error
if r, err = strconv.Atoi(c.GetParam(s, "")); r == 0 || err != nil {
r, err := strconv.Atoi(c.GetParam(s, ""))
if r == 0 || err != nil {
return d
}
return int64(r)
......@@ -476,11 +472,8 @@ func (c URL) GetParamInt(s string, d int64) int64 {
// GetMethodParamInt ...
func (c URL) GetMethodParamInt(method string, key string, d int64) int64 {
var r int
var err error
c.paramsLock.RLock()
defer c.paramsLock.RUnlock()
if r, err = strconv.Atoi(c.GetParam("methods."+method+"."+key, "")); r == 0 || err != nil {
r, err := strconv.Atoi(c.GetParam("methods."+method+"."+key, ""))
if r == 0 || err != nil {
return d
}
return int64(r)
......@@ -492,14 +485,13 @@ func (c URL) GetMethodParamInt64(method string, key string, d int64) int64 {
if r == math.MinInt64 {
return c.GetParamInt(key, d)
}
return r
}
// GetMethodParam ...
func (c URL) GetMethodParam(method string, key string, d string) string {
var r string
if r = c.GetParam("methods."+method+"."+key, ""); r == "" {
r := c.GetParam("methods."+method+"."+key, "")
if r == "" {
r = d
}
return r
......@@ -530,7 +522,6 @@ func (c *URL) SetParams(m url.Values) {
// ToMap transfer URL to Map
func (c URL) ToMap() map[string]string {
paramsMap := make(map[string]string)
c.RangeParams(func(key, value string) bool {
......@@ -615,8 +606,30 @@ func (c *URL) Clone() *URL {
return newUrl
}
// Copy url based on the reserved parameters' keys.
func (c *URL) CloneWithParams(reserveParams []string) *URL {
params := url.Values{}
for _, reserveParam := range reserveParams {
v := c.GetParam(reserveParam, "")
if len(v) != 0 {
params.Set(reserveParam, v)
}
}
return NewURLWithOptions(
WithProtocol(c.Protocol),
WithUsername(c.Username),
WithPassword(c.Password),
WithIp(c.Ip),
WithPort(c.Port),
WithPath(c.Path),
WithMethods(c.Methods),
WithParams(params),
)
}
func mergeNormalParam(mergedUrl *URL, referenceUrl *URL, paramKeys []string) []func(method string) {
var methodConfigMergeFcn = []func(method string){}
methodConfigMergeFcn := make([]func(method string), 0, len(paramKeys))
for _, paramKey := range paramKeys {
if v := referenceUrl.GetParam(paramKey, ""); len(v) > 0 {
mergedUrl.SetParam(paramKey, v)
......
......@@ -82,140 +82,145 @@ func checkApplicationName(config *ApplicationConfig) {
}
}
// Load Dubbo Init
func Load() {
// init router
if confRouterFile != "" {
if errPro := RouterInit(confRouterFile); errPro != nil {
log.Printf("[routerConfig init] %#v", errPro)
}
}
var eventDispatcherType string
if consumerConfig != nil {
eventDispatcherType = consumerConfig.eventDispatcherType
}
// notice consumerConfig.eventDispatcherType will be replaced
if providerConfig != nil {
eventDispatcherType = providerConfig.eventDispatcherType
}
// init EventDispatcher should before everything
extension.SetAndInitGlobalDispatcher(eventDispatcherType)
// reference config
func loadConsumerConfig() {
if consumerConfig == nil {
logger.Warnf("consumerConfig is nil!")
} else {
// init other consumer config
conConfigType := consumerConfig.ConfigType
for key, value := range extension.GetDefaultConfigReader() {
if conConfigType == nil {
if v, ok := conConfigType[key]; ok {
value = v
}
}
if err := extension.GetConfigReaders(value).ReadConsumerConfig(consumerConfig.fileStream); err != nil {
logger.Errorf("ReadConsumerConfig error: %#v for %s", perrors.WithStack(err), value)
return
}
// init other consumer config
conConfigType := consumerConfig.ConfigType
for key, value := range extension.GetDefaultConfigReader() {
if conConfigType == nil {
if v, ok := conConfigType[key]; ok {
value = v
}
}
if err := extension.GetConfigReaders(value).ReadConsumerConfig(consumerConfig.fileStream); err != nil {
logger.Errorf("ReadConsumerConfig error: %#v for %s", perrors.WithStack(err), value)
}
}
metricConfig = consumerConfig.MetricConfig
applicationConfig = consumerConfig.ApplicationConfig
metricConfig = consumerConfig.MetricConfig
applicationConfig = consumerConfig.ApplicationConfig
extension.SetAndInitGlobalDispatcher(consumerConfig.eventDispatcherType)
checkApplicationName(consumerConfig.ApplicationConfig)
if err := configCenterRefreshConsumer(); err != nil {
logger.Errorf("[consumer config center refresh] %#v", err)
checkApplicationName(consumerConfig.ApplicationConfig)
if err := configCenterRefreshConsumer(); err != nil {
logger.Errorf("[consumer config center refresh] %#v", err)
}
checkRegistries(consumerConfig.Registries, consumerConfig.Registry)
for key, ref := range consumerConfig.References {
if ref.Generic {
genericService := NewGenericService(key)
SetConsumerService(genericService)
}
checkRegistries(consumerConfig.Registries, consumerConfig.Registry)
for key, ref := range consumerConfig.References {
if ref.Generic {
genericService := NewGenericService(key)
SetConsumerService(genericService)
}
rpcService := GetConsumerService(key)
if rpcService == nil {
logger.Warnf("%s does not exist!", key)
continue
}
ref.id = key
ref.Refer(rpcService)
ref.Implement(rpcService)
rpcService := GetConsumerService(key)
if rpcService == nil {
logger.Warnf("%s does not exist!", key)
continue
}
ref.id = key
ref.Refer(rpcService)
ref.Implement(rpcService)
}
//wait for invoker is available, if wait over default 3s, then panic
var count int
checkok := true
for {
for _, refconfig := range consumerConfig.References {
if (refconfig.Check != nil && *refconfig.Check) ||
(refconfig.Check == nil && consumerConfig.Check != nil && *consumerConfig.Check) ||
(refconfig.Check == nil && consumerConfig.Check == nil) { //default to true
if refconfig.invoker != nil &&
!refconfig.invoker.IsAvailable() {
checkok = false
count++
if count > maxWait {
errMsg := fmt.Sprintf("Failed to check the status of the service %v . No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version)
logger.Error(errMsg)
panic(errMsg)
}
time.Sleep(time.Second * 1)
break
}
if refconfig.invoker == nil {
logger.Warnf("The interface %s invoker not exist , may you should check your interface config.", refconfig.InterfaceName)
//wait for invoker is available, if wait over default 3s, then panic
var count int
checkok := true
for {
for _, refconfig := range consumerConfig.References {
if (refconfig.Check != nil && *refconfig.Check) ||
(refconfig.Check == nil && consumerConfig.Check != nil && *consumerConfig.Check) ||
(refconfig.Check == nil && consumerConfig.Check == nil) { //default to true
if refconfig.invoker != nil &&
!refconfig.invoker.IsAvailable() {
checkok = false
count++
if count > maxWait {
errMsg := fmt.Sprintf("Failed to check the status of the service %v . No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version)
logger.Error(errMsg)
panic(errMsg)
}
time.Sleep(time.Second * 1)
break
}
if refconfig.invoker == nil {
logger.Warnf("The interface %s invoker not exist , may you should check your interface config.", refconfig.InterfaceName)
}
}
if checkok {
break
}
checkok = true
}
if checkok {
break
}
checkok = true
}
}
// service config
func loadProviderConfig() {
if providerConfig == nil {
logger.Warnf("providerConfig is nil!")
} else {
// init other provider config
proConfigType := providerConfig.ConfigType
for key, value := range extension.GetDefaultConfigReader() {
if proConfigType != nil {
if v, ok := proConfigType[key]; ok {
value = v
}
}
if err := extension.GetConfigReaders(value).ReadProviderConfig(providerConfig.fileStream); err != nil {
logger.Errorf("ReadProviderConfig error: %#v for %s", perrors.WithStack(err), value)
return
}
// init other provider config
proConfigType := providerConfig.ConfigType
for key, value := range extension.GetDefaultConfigReader() {
if proConfigType != nil {
if v, ok := proConfigType[key]; ok {
value = v
}
}
if err := extension.GetConfigReaders(value).ReadProviderConfig(providerConfig.fileStream); err != nil {
logger.Errorf("ReadProviderConfig error: %#v for %s", perrors.WithStack(err), value)
}
}
// so, you should know that the consumer's config will be override
metricConfig = providerConfig.MetricConfig
applicationConfig = providerConfig.ApplicationConfig
// so, you should know that the consumer's config will be override
metricConfig = providerConfig.MetricConfig
applicationConfig = providerConfig.ApplicationConfig
extension.SetAndInitGlobalDispatcher(providerConfig.eventDispatcherType)
checkApplicationName(providerConfig.ApplicationConfig)
if err := configCenterRefreshProvider(); err != nil {
logger.Errorf("[provider config center refresh] %#v", err)
checkApplicationName(providerConfig.ApplicationConfig)
if err := configCenterRefreshProvider(); err != nil {
logger.Errorf("[provider config center refresh] %#v", err)
}
checkRegistries(providerConfig.Registries, providerConfig.Registry)
for key, svs := range providerConfig.Services {
rpcService := GetProviderService(key)
if rpcService == nil {
logger.Warnf("%s does not exist!", key)
continue
}
checkRegistries(providerConfig.Registries, providerConfig.Registry)
for key, svs := range providerConfig.Services {
rpcService := GetProviderService(key)
if rpcService == nil {
logger.Warnf("%s does not exist!", key)
continue
}
svs.id = key
svs.Implement(rpcService)
svs.Protocols = providerConfig.Protocols
if err := svs.Export(); err != nil {
panic(fmt.Sprintf("service %s export failed! err: %#v", key, err))
}
svs.id = key
svs.Implement(rpcService)
svs.Protocols = providerConfig.Protocols
if err := svs.Export(); err != nil {
panic(fmt.Sprintf("service %s export failed! err: %#v", key, err))
}
}
}
func initRouter() {
if confRouterFile != "" {
if err := RouterInit(confRouterFile); err != nil {
log.Printf("[routerConfig init] %#v", err)
}
}
}
// Load Dubbo Init
func Load() {
// init router
initRouter()
// reference config
loadConsumerConfig()
// service config
loadProviderConfig()
// init the shutdown callback
GracefulShutdownInit()
}
......
......@@ -36,14 +36,15 @@ import (
// RegistryConfig ...
type RegistryConfig struct {
Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
//I changed "type" to "protocol" ,the same as "protocol" field in java class RegistryConfig
// I changed "type" to "protocol" ,the same as "protocol" field in java class RegistryConfig
TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty" property:"timeout"` // unit: second
Group string `yaml:"group" json:"group,omitempty" property:"group"`
//for registry
Address string `yaml:"address" json:"address,omitempty" property:"address"`
Username string `yaml:"username" json:"username,omitempty" property:"username"`
Password string `yaml:"password" json:"password,omitempty" property:"password"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
// for registry
Address string `yaml:"address" json:"address,omitempty" property:"address"`
Username string `yaml:"username" json:"username,omitempty" property:"username"`
Password string `yaml:"password" json:"password,omitempty" property:"password"`
Simplified bool `yaml:"simplified" json:"simplified,omitempty" property:"simplified"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
}
// UnmarshalYAML ...
......@@ -70,9 +71,11 @@ func loadRegistries(targetRegistries string, registries map[string]*RegistryConf
for k, registryConf := range registries {
target := false
// if user not config targetRegistries,default load all
// Notice:in func "func Split(s, sep string) []string" comment : if s does not contain sep and sep is not empty, SplitAfter returns a slice of length 1 whose only element is s.
// So we have to add the condition when targetRegistries string is not set (it will be "" when not set)
// if user not config targetRegistries, default load all
// Notice: in func "func Split(s, sep string) []string" comment:
// if s does not contain sep and sep is not empty, SplitAfter returns
// a slice of length 1 whose only element is s. So we have to add the
// condition when targetRegistries string is not set (it will be "" when not set)
if len(trSlice) == 0 || (len(trSlice) == 1 && trSlice[0] == "") {
target = true
} else {
......@@ -86,29 +89,24 @@ func loadRegistries(targetRegistries string, registries map[string]*RegistryConf
}
if target {
var (
url common.URL
err error
)
addresses := strings.Split(registryConf.Address, ",")
address := addresses[0]
address = translateRegistryConf(address, registryConf)
url, err = common.NewURL(constant.REGISTRY_PROTOCOL+"://"+address,
url, err := common.NewURL(constant.REGISTRY_PROTOCOL+"://"+address,
common.WithParams(registryConf.getUrlMap(roleType)),
common.WithParamsValue("simplified", strconv.FormatBool(registryConf.Simplified)),
common.WithUsername(registryConf.Username),
common.WithPassword(registryConf.Password),
common.WithLocation(registryConf.Address),
)
if err != nil {
logger.Errorf("The registry id:%s url is invalid , error: %#v", k, err)
logger.Errorf("The registry id: %s url is invalid, error: %#v", k, err)
panic(err)
} else {
urls = append(urls, &url)
}
}
}
return urls
......@@ -123,7 +121,6 @@ func (c *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values {
for k, v := range c.Params {
urlMap.Set(k, v)
}
return urlMap
}
......@@ -131,7 +128,7 @@ func translateRegistryConf(address string, registryConf *RegistryConfig) string
if strings.Contains(address, "://") {
translatedUrl, err := url.Parse(address)
if err != nil {
logger.Errorf("The registry url is invalid , error: %#v", err)
logger.Errorf("The registry url is invalid, error: %#v", err)
panic(err)
}
address = translatedUrl.Host
......
......@@ -18,6 +18,7 @@
package config
import (
"container/list"
"context"
"fmt"
"net/url"
......@@ -29,6 +30,7 @@ import (
import (
"github.com/creasty/defaults"
gxnet "github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
)
......@@ -121,6 +123,24 @@ func (c *ServiceConfig) IsExport() bool {
return c.exported.Load()
}
// Get Random Port
func getRandomPort(protocolConfigs []*ProtocolConfig) *list.List {
ports := list.New()
for _, proto := range protocolConfigs {
if len(proto.Port) > 0 {
continue
}
tcp, err := gxnet.ListenOnTCPRandomPort(proto.Ip)
if err != nil {
panic(perrors.New(fmt.Sprintf("Get tcp port error,err is {%v}", err)))
}
defer tcp.Close()
ports.PushBack(strings.Split(tcp.Addr().String(), ":")[1])
}
return ports
}
// Export ...
func (c *ServiceConfig) Export() error {
// TODO: config center start here
......@@ -143,6 +163,9 @@ func (c *ServiceConfig) Export() error {
logger.Warnf("The service %v's '%v' protocols don't has right protocolConfigs ", c.InterfaceName, c.Protocol)
return nil
}
ports := getRandomPort(protocolConfigs)
nextPort := ports.Front()
for _, proto := range protocolConfigs {
// registry the service reflect
methods, err := common.ServiceMap.Register(c.InterfaceName, proto.Name, c.rpcService)
......@@ -151,11 +174,18 @@ func (c *ServiceConfig) Export() error {
logger.Errorf(err.Error())
return err
}
port := proto.Port
if len(proto.Port) == 0 {
port = nextPort.Value.(string)
nextPort = nextPort.Next()
}
ivkURL := common.NewURLWithOptions(
common.WithPath(c.id),
common.WithProtocol(proto.Name),
common.WithIp(proto.Ip),
common.WithPort(proto.Port),
common.WithPort(port),
common.WithParams(urlMap),
common.WithParamsValue(constant.BEAN_NAME_KEY, c.id),
common.WithMethods(strings.Split(methods, ",")),
......
......@@ -22,6 +22,8 @@ import (
)
import (
gxnet "github.com/dubbogo/gost/net"
"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
)
......@@ -145,3 +147,35 @@ func Test_Export(t *testing.T) {
}
providerConfig = nil
}
func Test_getRandomPort(t *testing.T) {
protocolConfigs := make([]*ProtocolConfig, 0, 3)
ip, err := gxnet.GetLocalIP()
protocolConfigs = append(protocolConfigs, &ProtocolConfig{
Ip: ip,
})
protocolConfigs = append(protocolConfigs, &ProtocolConfig{
Ip: ip,
})
protocolConfigs = append(protocolConfigs, &ProtocolConfig{
Ip: ip,
})
assert.NoError(t, err)
ports := getRandomPort(protocolConfigs)
assert.Equal(t, ports.Len(), len(protocolConfigs))
front := ports.Front()
for {
if front == nil {
break
}
t.Logf("port:%v", front.Value)
front = front.Next()
}
protocolConfigs = make([]*ProtocolConfig, 0, 3)
ports = getRandomPort(protocolConfigs)
assert.Equal(t, ports.Len(), len(protocolConfigs))
}
......@@ -71,14 +71,18 @@ func init() {
*
* the value of "accesslog" can be "true" or "default" too.
* If the value is one of them, the access log will be record in log file which defined in log.yml
* AccessLogFilter is designed to be singleton
*/
type AccessLogFilter struct {
logChan chan AccessLogData
}
// Invoke ...
// Invoke will check whether user wants to use this filter.
// If we find the value of key constant.ACCESS_LOG_KEY, we will log the invocation info
func (ef *AccessLogFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
accessLog := invoker.GetUrl().GetParam(constant.ACCESS_LOG_KEY, "")
// the user do not
if len(accessLog) > 0 {
accessLogData := AccessLogData{data: ef.buildAccessLogData(invoker, invocation), accessLog: accessLog}
ef.logIntoChannel(accessLogData)
......@@ -86,7 +90,7 @@ func (ef *AccessLogFilter) Invoke(ctx context.Context, invoker protocol.Invoker,
return invoker.Invoke(ctx, invocation)
}
// it won't block the invocation
// logIntoChannel won't block the invocation
func (ef *AccessLogFilter) logIntoChannel(accessLogData AccessLogData) {
select {
case ef.logChan <- accessLogData:
......@@ -97,6 +101,7 @@ func (ef *AccessLogFilter) logIntoChannel(accessLogData AccessLogData) {
}
}
// buildAccessLogData builds the access log data
func (ef *AccessLogFilter) buildAccessLogData(_ protocol.Invoker, invocation protocol.Invocation) map[string]string {
dataMap := make(map[string]string, 16)
attachments := invocation.Attachments()
......@@ -130,11 +135,12 @@ func (ef *AccessLogFilter) buildAccessLogData(_ protocol.Invoker, invocation pro
return dataMap
}
// OnResponse ...
// OnResponse do nothing
func (ef *AccessLogFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
return result
}
// writeLogToFile actually write the logs into file
func (ef *AccessLogFilter) writeLogToFile(data AccessLogData) {
accessLog := data.accessLog
if isDefault(accessLog) {
......@@ -156,6 +162,12 @@ func (ef *AccessLogFilter) writeLogToFile(data AccessLogData) {
}
}
// openLogFile will open the log file with append mode.
// If the file is not found, it will create the file.
// Actually, the accessLog is the filename
// You may find out that, once we want to write access log into log file,
// we open the file again and again.
// It needs to be optimized.
func (ef *AccessLogFilter) openLogFile(accessLog string) (*os.File, error) {
logFile, err := os.OpenFile(accessLog, os.O_CREATE|os.O_APPEND|os.O_RDWR, LogFileMode)
if err != nil {
......@@ -169,6 +181,12 @@ func (ef *AccessLogFilter) openLogFile(accessLog string) (*os.File, error) {
return nil, err
}
last := fileInfo.ModTime().Format(FileDateFormat)
// this is confused.
// for example, if the last = '2020-03-04'
// and today is '2020-03-05'
// we will create one new file to log access data
// By this way, we can split the access log based on days.
if now != last {
err = os.Rename(fileInfo.Name(), fileInfo.Name()+"."+now)
if err != nil {
......@@ -180,11 +198,12 @@ func (ef *AccessLogFilter) openLogFile(accessLog string) (*os.File, error) {
return logFile, err
}
// isDefault check whether accessLog == true or accessLog == default
func isDefault(accessLog string) bool {
return strings.EqualFold("true", accessLog) || strings.EqualFold("default", accessLog)
}
// GetAccessLogFilter ...
// GetAccessLogFilter return the instance of AccessLogFilter
func GetAccessLogFilter() filter.Filter {
accessLogFilter := &AccessLogFilter{logChan: make(chan AccessLogData, LogMaxBuffer)}
go func() {
......@@ -195,12 +214,13 @@ func GetAccessLogFilter() filter.Filter {
return accessLogFilter
}
// AccessLogData ...
// AccessLogData defines the data that will be log into file
type AccessLogData struct {
accessLog string
data map[string]string
}
// toLogMessage convert the AccessLogData to String
func (ef *AccessLogData) toLogMessage() string {
builder := strings.Builder{}
builder.WriteString("[")
......
......@@ -115,7 +115,12 @@ type MethodServiceTpsLimiterImpl struct {
tpsState *concurrent.Map
}
// IsAllowable ...
// IsAllowable based on method-level and service-level.
// The method-level has high priority which means that if there is any rate limit configuration for the method,
// the service-level rate limit strategy will be ignored.
// The key point is how to keep thread-safe
// This implementation use concurrent map + loadOrStore to make implementation thread-safe
// You can image that even multiple threads create limiter, but only one could store the limiter into tpsState
func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url common.URL, invocation protocol.Invocation) bool {
methodConfigPrefix := "methods." + invocation.MethodName() + "."
......@@ -123,23 +128,30 @@ func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url common.URL, invocatio
methodLimitRateConfig := url.GetParam(methodConfigPrefix+constant.TPS_LIMIT_RATE_KEY, "")
methodIntervalConfig := url.GetParam(methodConfigPrefix+constant.TPS_LIMIT_INTERVAL_KEY, "")
// service-level tps limit
limitTarget := url.ServiceKey()
// method-level tps limit
if len(methodIntervalConfig) > 0 || len(methodLimitRateConfig) > 0 {
// it means that if the method-level rate limit exist, we will use method-level rate limit strategy
limitTarget = limitTarget + "#" + invocation.MethodName()
}
// looking up the limiter from 'cache'
limitState, found := limiter.tpsState.Load(limitTarget)
if found {
// the limiter has been cached, we return its result
return limitState.(filter.TpsLimitStrategy).IsAllowable()
}
// we could not find the limiter, and try to create one.
limitRate := getLimitConfig(methodLimitRateConfig, url, invocation,
constant.TPS_LIMIT_RATE_KEY,
constant.DEFAULT_TPS_LIMIT_RATE)
if limitRate < 0 {
// the limitTarget is not necessary to be limited.
return true
}
......@@ -150,13 +162,20 @@ func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url common.URL, invocatio
panic(fmt.Sprintf("The interval must be positive, please check your configuration! url: %s", url.String()))
}
// find the strategy config and then create one
limitStrategyConfig := url.GetParam(methodConfigPrefix+constant.TPS_LIMIT_STRATEGY_KEY,
url.GetParam(constant.TPS_LIMIT_STRATEGY_KEY, constant.DEFAULT_KEY))
limitStateCreator := extension.GetTpsLimitStrategyCreator(limitStrategyConfig)
// we using loadOrStore to ensure thread-safe
limitState, _ = limiter.tpsState.LoadOrStore(limitTarget, limitStateCreator.Create(int(limitRate), int(limitInterval)))
return limitState.(filter.TpsLimitStrategy).IsAllowable()
}
// getLimitConfig will try to fetch the configuration from url.
// If we can convert the methodLevelConfig to int64, return;
// Or, we will try to look up server-level configuration and then convert it to int64
func getLimitConfig(methodLevelConfig string,
url common.URL,
invocation protocol.Invocation,
......@@ -172,6 +191,8 @@ func getLimitConfig(methodLevelConfig string,
return result
}
// actually there is no method-level configuration, so we use the service-level configuration
result, err := strconv.ParseInt(url.GetParam(configKey, defaultVal), 0, 0)
if err != nil {
......@@ -183,7 +204,7 @@ func getLimitConfig(methodLevelConfig string,
var methodServiceTpsLimiterInstance *MethodServiceTpsLimiterImpl
var methodServiceTpsLimiterOnce sync.Once
// GetMethodServiceTpsLimiter ...
// GetMethodServiceTpsLimiter will return an MethodServiceTpsLimiterImpl instance.
func GetMethodServiceTpsLimiter() filter.TpsLimiter {
methodServiceTpsLimiterOnce.Do(func() {
methodServiceTpsLimiterInstance = &MethodServiceTpsLimiterImpl{
......
......@@ -36,6 +36,7 @@ const (
)
func init() {
// this implementation is the the default implementation of RejectedExecutionHandler
extension.SetRejectedExecutionHandler(HandlerName, GetOnlyLogRejectedExecutionHandler)
extension.SetRejectedExecutionHandler(constant.DEFAULT_KEY, GetOnlyLogRejectedExecutionHandler)
}
......@@ -56,11 +57,12 @@ var onlyLogHandlerOnce sync.Once
* tps.limit.rejected.handler: "default" or "log"
* methods:
* - name: "GetUser"
* OnlyLogRejectedExecutionHandler is designed to be singleton
*/
type OnlyLogRejectedExecutionHandler struct {
}
// RejectedExecution ...
// RejectedExecution will do nothing, it only log the invocation.
func (handler *OnlyLogRejectedExecutionHandler) RejectedExecution(url common.URL,
_ protocol.Invocation) protocol.Result {
......@@ -68,7 +70,7 @@ func (handler *OnlyLogRejectedExecutionHandler) RejectedExecution(url common.URL
return &protocol.RPCResult{}
}
// GetOnlyLogRejectedExecutionHandler ...
// GetOnlyLogRejectedExecutionHandler will return the instance of OnlyLogRejectedExecutionHandler
func GetOnlyLogRejectedExecutionHandler() filter.RejectedExecutionHandler {
onlyLogHandlerOnce.Do(func() {
onlyLogHandlerInstance = &OnlyLogRejectedExecutionHandler{}
......
......@@ -31,5 +31,7 @@ import (
* In such situation, implement this interface and register it by invoking extension.SetRejectedExecutionHandler.
*/
type RejectedExecutionHandler interface {
// RejectedExecution will be called if the invocation was rejected by some component.
RejectedExecution(url common.URL, invocation protocol.Invocation) protocol.Result
}
......@@ -33,10 +33,16 @@ package filter
* tps.limit.strategy: "name of implementation" # method-level
*/
type TpsLimitStrategy interface {
// IsAllowable will return true if this invocation is not over limitation
IsAllowable() bool
}
// TpsLimitStrategyCreator ...
// TpsLimitStrategyCreator, the creator abstraction for TpsLimitStrategy
type TpsLimitStrategyCreator interface {
Create(rate int, interval int) TpsLimitStrategy
// Create will create an instance of TpsLimitStrategy
// It will be a little hard to understand this method.
// The unit of interval is ms
// for example, if the limit = 100, interval = 1000
// which means that the tps limitation is 100 times per 1000ms (100/1000ms)
Create(limit int, interval int) TpsLimitStrategy
}
......@@ -34,5 +34,6 @@ import (
* tps.limiter: "the name of limiter",
*/
type TpsLimiter interface {
// IsAllowable will check whether this invocation should be enabled for further process
IsAllowable(common.URL, protocol.Invocation) bool
}
module github.com/apache/dubbo-go
require (
github.com/Workiva/go-datastructures v1.0.52
github.com/Workiva/go-datastructures v1.0.50
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
github.com/apache/dubbo-go-hessian2 v1.4.0
github.com/apache/dubbo-go-hessian2 v1.5.0
github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.13+incompatible
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/creasty/defaults v1.3.0
github.com/dubbogo/getty v1.3.3
github.com/dubbogo/getty v1.3.5
github.com/dubbogo/go-zookeeper v1.0.0
github.com/dubbogo/gost v1.8.0
github.com/dubbogo/gost v1.9.0
github.com/emicklei/go-restful/v3 v3.0.0
github.com/go-resty/resty/v2 v2.1.0
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
......@@ -26,6 +26,8 @@ require (
github.com/hashicorp/consul v1.5.3
github.com/hashicorp/consul/api v1.1.0
github.com/jinzhu/copier v0.0.0-20190625015134-976e0346caa8
github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8 // indirect
github.com/juju/testing v0.0.0-20191001232224-ce9dec17d28b // indirect
github.com/magiconair/properties v1.8.1
github.com/mitchellh/mapstructure v1.1.2
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd
......
......@@ -27,8 +27,8 @@ github.com/SermoDigital/jose v0.0.0-20180104203859-803625baeddc h1:LkkwnbY+S8Wmw
github.com/SermoDigital/jose v0.0.0-20180104203859-803625baeddc/go.mod h1:ARgCUhI1MHQH+ONky/PAtmVHQrP5JlGY0F3poXOp/fA=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 h1:fLjPD/aNc3UIOA6tDi6QXUemppXK3P9BI7mr2hd6gx8=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/Workiva/go-datastructures v1.0.52 h1:PLSK6pwn8mYdaoaCZEMsXBpBotr4HHn9abU0yMQt0NI=
github.com/Workiva/go-datastructures v1.0.52/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA=
github.com/Workiva/go-datastructures v1.0.50 h1:slDmfW6KCHcC7U+LP3DDBbm4fqTwZGn1beOFPfGaLvo=
github.com/Workiva/go-datastructures v1.0.50/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA=
github.com/abdullin/seq v0.0.0-20160510034733-d5467c17e7af h1:DBNMBMuMiWYu0b+8KMJuWmfCkcxl09JwdlqwDZZ6U14=
github.com/abdullin/seq v0.0.0-20160510034733-d5467c17e7af/go.mod h1:5Jv4cbFiHJMsVxt52+i0Ha45fjshj6wxYr1r19tB9bw=
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 h1:rFw4nCn9iMW+Vajsk51NtYIcwSTkXr+JGrMd36kTDJw=
......@@ -40,8 +40,8 @@ github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e/go.mod
github.com/aliyun/alibaba-cloud-sdk-go v1.61.18 h1:zOVTBdCKFd9JbCKz9/nt+FovbjPFmb7mUnp8nH9fQBA=
github.com/aliyun/alibaba-cloud-sdk-go v1.61.18/go.mod h1:v8ESoHo4SyHmuB4b1tJqDHxfTGEciD+yhvOU/5s1Rfk=
github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
github.com/apache/dubbo-go-hessian2 v1.4.0 h1:Cb9FQVTy3G93dnDr7P93U8DeKFYpDTJjQp44JG5TafA=
github.com/apache/dubbo-go-hessian2 v1.4.0/go.mod h1:VwEnsOMidkM1usya2uPfGpSLO9XUF//WQcWn3y+jFz8=
github.com/apache/dubbo-go-hessian2 v1.5.0 h1:fzulDG5G7nX0ccgKdiN9XipJ7tZ4WXKgmk4stdlDS6s=
github.com/apache/dubbo-go-hessian2 v1.5.0/go.mod h1:VwEnsOMidkM1usya2uPfGpSLO9XUF//WQcWn3y+jFz8=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e h1:QEF07wC0T1rKkctt1RINW/+RMTVmiwxETico2l3gxJA=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
......@@ -108,14 +108,13 @@ github.com/docker/go-connections v0.3.0 h1:3lOnM9cSzgGwx8VfK/NGOW5fLQ0GjIlCkaktF
github.com/docker/go-connections v0.3.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
github.com/docker/go-units v0.3.3 h1:Xk8S3Xj5sLGlG5g67hJmYMmUgXv5N4PhkjJHHqrwnTk=
github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dubbogo/getty v1.3.3 h1:8m4zZBqFHO+NmhH7rMPlFuuYRVjcPD7cUhumevqMZZs=
github.com/dubbogo/getty v1.3.3/go.mod h1:U92BDyJ6sW9Jpohr2Vlz8w2uUbIbNZ3d+6rJvFTSPp0=
github.com/dubbogo/getty v1.3.5 h1:xJxdDj9jm7wlrRSsVZSk2TDNxJbbac5GpxV0QpjO+Tw=
github.com/dubbogo/getty v1.3.5/go.mod h1:T55vN8Q6tZjf2AQZiGmkujneD3LfqYbv2b3QjacwYOY=
github.com/dubbogo/go-zookeeper v1.0.0 h1:RsYdlGwhDW+iKXM3eIIcvt34P2swLdmQfuIJxsHlGoM=
github.com/dubbogo/go-zookeeper v1.0.0/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
github.com/dubbogo/gost v1.5.1/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.5.2/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.8.0 h1:9ACbQe5OwMjqtinQcNJC5xp16kky27OsfSGw5L9A6vw=
github.com/dubbogo/gost v1.8.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.9.0 h1:UT+dWwvLyJiDotxJERO75jB3Yxgsdy10KztR5ycxRAk=
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/duosecurity/duo_api_golang v0.0.0-20190308151101-6c680f768e74 h1:2MIhn2R6oXQbgW5yHfS+d6YqyMfXiu2L55rFZC4UD/M=
github.com/duosecurity/duo_api_golang v0.0.0-20190308151101-6c680f768e74/go.mod h1:UqXY1lYT/ERa4OEAywUqdok1T4RCRdArkhic1Opuavo=
github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0 h1:ZoRgc53qJCfSLimXqJDrmBhnt5GChDsExMCK7t48o0Y=
......@@ -325,6 +324,12 @@ github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/u
github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/juju/errors v0.0.0-20190930114154-d42613fe1ab9 h1:hJix6idebFclqlfZCHE7EUX7uqLCyb70nHNHH1XKGBg=
github.com/juju/errors v0.0.0-20190930114154-d42613fe1ab9/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q=
github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8 h1:UUHMLvzt/31azWTN/ifGWef4WUqvXk0iRqdhdy/2uzI=
github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8/go.mod h1:vgyd7OREkbtVEN/8IXZe5Ooef3LQePvuBm9UWj6ZL8U=
github.com/juju/testing v0.0.0-20191001232224-ce9dec17d28b h1:Rrp0ByJXEjhREMPGTt3aWYjoIsUGCbt21ekbeJcTWv0=
github.com/juju/testing v0.0.0-20191001232224-ce9dec17d28b/go.mod h1:63prj8cnj0tU0S9OHjGJn+b1h0ZghCndfnbQolrYTwA=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/keybase/go-crypto v0.0.0-20180614160407-5114a9a81e1b h1:VE6r2OwP5gj+Z9aCkSKl3MlmnZbfMAjhvR5T7abKHEo=
github.com/keybase/go-crypto v0.0.0-20180614160407-5114a9a81e1b/go.mod h1:ghbZscTyKdM07+Fw3KSi0hcJm+AlEUWj8QLlPtijN/M=
......@@ -386,8 +391,6 @@ github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8m
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nacos-group/nacos-sdk-go v0.0.0-20191128082542-fe1b325b125c h1:WoCa3AvgQMVKNs+RIFlWPRgY9QVJwUxJDrGxHs0fcRo=
github.com/nacos-group/nacos-sdk-go v0.0.0-20191128082542-fe1b325b125c/go.mod h1:CEkSvEpoveoYjA81m4HNeYQ0sge0LFGKSEqO3JKHllo=
github.com/nacos-group/nacos-sdk-go v0.3.0 h1:2v2QmihtyX6ZUXAN+ya+5h2pedn7R5M+WJwSJPFsuMY=
github.com/nacos-group/nacos-sdk-go v0.3.0/go.mod h1:ESKb6yF0gxSc8GuS+0jaMBe+n8rJ5/k4ya6LyFG2xi8=
github.com/nacos-group/nacos-sdk-go v0.3.1 h1:MI7bNDAN5m9UFcRRUTSPfJi4dCQo+TYG85qVB1rCHeg=
github.com/nacos-group/nacos-sdk-go v0.3.1/go.mod h1:ESKb6yF0gxSc8GuS+0jaMBe+n8rJ5/k4ya6LyFG2xi8=
github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2 h1:BQ1HW7hr4IVovMwWg0E0PYcyW8CzqDcVmaew9cujU4s=
......
......@@ -88,7 +88,7 @@ func init() {
rand.Seed(time.Now().UnixNano())
}
// SetClientConf ...
// SetClientConf set dubbo client config.
func SetClientConf(c ClientConfig) {
clientConf = &c
err := clientConf.CheckValidity()
......@@ -99,7 +99,7 @@ func SetClientConf(c ClientConfig) {
setClientGrpool()
}
// GetClientConf ...
// GetClientConf get dubbo client config.
func GetClientConf() ClientConfig {
return *clientConf
}
......@@ -129,7 +129,7 @@ type AsyncCallbackResponse struct {
Reply interface{}
}
// Client ...
// Client is dubbo protocol client.
type Client struct {
opts Options
conf ClientConfig
......@@ -139,7 +139,7 @@ type Client struct {
pendingResponses *sync.Map
}
// NewClient ...
// NewClient create a new Client.
func NewClient(opt Options) *Client {
switch {
......@@ -167,7 +167,7 @@ func NewClient(opt Options) *Client {
return c
}
// Request ...
// Request is dubbo protocol request.
type Request struct {
addr string
svcUrl common.URL
......@@ -176,7 +176,7 @@ type Request struct {
atta map[string]string
}
// NewRequest ...
// NewRequest create a new Request.
func NewRequest(addr string, svcUrl common.URL, method string, args interface{}, atta map[string]string) *Request {
return &Request{
addr: addr,
......@@ -187,13 +187,13 @@ func NewRequest(addr string, svcUrl common.URL, method string, args interface{},
}
}
// Response ...
// Response is dubbo protocol response.
type Response struct {
reply interface{}
atta map[string]string
}
// NewResponse ...
// NewResponse create a new Response.
func NewResponse(reply interface{}, atta map[string]string) *Response {
return &Response{
reply: reply,
......@@ -201,15 +201,14 @@ func NewResponse(reply interface{}, atta map[string]string) *Response {
}
}
// CallOneway call one way
// CallOneway call by one way
func (c *Client) CallOneway(request *Request) error {
return perrors.WithStack(c.call(CT_OneWay, request, NewResponse(nil, nil), nil))
}
// Call if @response is nil, the transport layer will get the response without notify the invoker.
// Call call remoting by two way or one way, if @response.reply is nil, the way of call is one way.
func (c *Client) Call(request *Request, response *Response) error {
ct := CT_TwoWay
if response.reply == nil {
ct = CT_OneWay
......@@ -218,14 +217,12 @@ func (c *Client) Call(request *Request, response *Response) error {
return perrors.WithStack(c.call(ct, request, response, nil))
}
// AsyncCall ...
// AsyncCall call remoting by async with callback.
func (c *Client) AsyncCall(request *Request, callback common.AsyncCallback, response *Response) error {
return perrors.WithStack(c.call(CT_TwoWay, request, response, callback))
}
func (c *Client) call(ct CallType, request *Request, response *Response, callback common.AsyncCallback) error {
p := &DubboPackage{}
p.Service.Path = strings.TrimPrefix(request.svcUrl.Path, "/")
p.Service.Interface = request.svcUrl.GetParam(constant.INTERFACE_KEY, "")
......@@ -293,7 +290,7 @@ func (c *Client) call(ct CallType, request *Request, response *Response, callbac
return perrors.WithStack(err)
}
// Close ...
// Close close the client pool.
func (c *Client) Close() {
if c.pool != nil {
c.pool.close()
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment