Skip to content
Snippets Groups Projects
service_config.go 12.9 KiB
Newer Older
AlexStocks's avatar
AlexStocks committed
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
fangyincheng's avatar
fangyincheng committed

vito.he's avatar
vito.he committed
package config
import (
	"container/list"
	"context"
	"fmt"
	"net/url"
	"strconv"
	"strings"
	"sync"
	"time"
)

import (
	"github.com/creasty/defaults"
	gxnet "github.com/dubbogo/gost/net"
	perrors "github.com/pkg/errors"
	"go.uber.org/atomic"
)

import (
	"github.com/apache/dubbo-go/common"
	"github.com/apache/dubbo-go/common/constant"
	"github.com/apache/dubbo-go/common/extension"
	"github.com/apache/dubbo-go/common/logger"
	"github.com/apache/dubbo-go/protocol"
	"github.com/apache/dubbo-go/protocol/protocolwrapper"
)

// ServiceConfig is the configuration of the service provider
vito.he's avatar
vito.he committed
type ServiceConfig struct {
	context                     context.Context
	id                          string
	Filter                      string            `yaml:"filter" json:"filter,omitempty" property:"filter"`
	Protocol                    string            `default:"dubbo"  required:"true"  yaml:"protocol"  json:"protocol,omitempty" property:"protocol"` // multi protocol support, split by ','
	InterfaceName               string            `required:"true"  yaml:"interface"  json:"interface,omitempty" property:"interface"`
	Registry                    string            `yaml:"registry"  json:"registry,omitempty"  property:"registry"`
	Cluster                     string            `default:"failover" yaml:"cluster"  json:"cluster,omitempty" property:"cluster"`
	Loadbalance                 string            `default:"random" yaml:"loadbalance"  json:"loadbalance,omitempty"  property:"loadbalance"`
	Group                       string            `yaml:"group"  json:"group,omitempty" property:"group"`
	Version                     string            `yaml:"version"  json:"version,omitempty" property:"version" `
	Methods                     []*MethodConfig   `yaml:"methods"  json:"methods,omitempty" property:"methods"`
	Warmup                      string            `yaml:"warmup"  json:"warmup,omitempty"  property:"warmup"`
	Retries                     string            `yaml:"retries"  json:"retries,omitempty" property:"retries"`
fangyincheng's avatar
fangyincheng committed
	Serialization               string            `yaml:"serialization" json:"serialization" property:"serialization"`
	Params                      map[string]string `yaml:"params"  json:"params,omitempty" property:"params"`
	Token                       string            `yaml:"token" json:"token,omitempty" property:"token"`
	AccessLog                   string            `yaml:"accesslog" json:"accesslog,omitempty" property:"accesslog"`
	TpsLimiter                  string            `yaml:"tps.limiter" json:"tps.limiter,omitempty" property:"tps.limiter"`
	TpsLimitInterval            string            `yaml:"tps.limit.interval" json:"tps.limit.interval,omitempty" property:"tps.limit.interval"`
	TpsLimitRate                string            `yaml:"tps.limit.rate" json:"tps.limit.rate,omitempty" property:"tps.limit.rate"`
	TpsLimitStrategy            string            `yaml:"tps.limit.strategy" json:"tps.limit.strategy,omitempty" property:"tps.limit.strategy"`
	TpsLimitRejectedHandler     string            `yaml:"tps.limit.rejected.handler" json:"tps.limit.rejected.handler,omitempty" property:"tps.limit.rejected.handler"`
	ExecuteLimit                string            `yaml:"execute.limit" json:"execute.limit,omitempty" property:"execute.limit"`
	ExecuteLimitRejectedHandler string            `yaml:"execute.limit.rejected.handler" json:"execute.limit.rejected.handler,omitempty" property:"execute.limit.rejected.handler"`
Ooo0oO0o0oO's avatar
Ooo0oO0o0oO committed
	Auth                        string            `yaml:"auth" json:"auth,omitempty" property:"auth"`
	ParamSign                   string            `yaml:"param.sign" json:"param.sign,omitempty" property:"param.sign"`
	Tag                         string            `yaml:"tag" json:"tag,omitempty" property:"tag"`
	GrpcMaxMessageSize          int               `default:"4" yaml:"max_message_size" json:"max_message_size,omitempty"`
vito.he's avatar
vito.he committed
	Protocols     map[string]*ProtocolConfig
	unexported    *atomic.Bool
	exported      *atomic.Bool
	rpcService    common.RPCService
	cacheMutex    sync.Mutex
vito.he's avatar
vito.he committed
	cacheProtocol protocol.Protocol

vito.he's avatar
vito.he committed
	exportersLock sync.Mutex
vito.he's avatar
vito.he committed
	exporters     []protocol.Exporter
vito.he's avatar
vito.he committed
}

flycash's avatar
flycash committed
// Prefix builds the configuration key prefix of this service:
// constant.ServiceConfigPrefix followed by the interface name and a trailing
// dot (i.e. dubbo.service.${interface}.).
func (c *ServiceConfig) Prefix() string {
	withInterface := constant.ServiceConfigPrefix + c.InterfaceName
	return withInterface + "."
}

// UnmarshalYAML unmarshals the ServiceConfig by @unmarshal function
func (c *ServiceConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
	if err := defaults.Set(c); err != nil {
		return err
	}
	type plain ServiceConfig
	if err := unmarshal((*plain)(c)); err != nil {
		return err
	}
vito.he's avatar
vito.he committed
	c.exported = atomic.NewBool(false)
	c.unexported = atomic.NewBool(false)
// NewServiceConfig The only way to get a new ServiceConfig
func NewServiceConfig(id string, context context.Context) *ServiceConfig {
	return &ServiceConfig{
		unexported: atomic.NewBool(false),
		exported:   atomic.NewBool(false),
	}
vito.he's avatar
vito.he committed
}
vito.he's avatar
vito.he committed
// InitExported replaces the exported flag with a fresh atomic bool holding
// false.
func (c *ServiceConfig) InitExported() {
	notExported := atomic.NewBool(false)
	c.exported = notExported
}

vito.he's avatar
vito.he committed
// IsExport reports whether the service config is currently exported.
//
// A config whose exported flag has not been initialized yet is reported as
// not exported instead of panicking on a nil pointer.
func (c *ServiceConfig) IsExport() bool {
	if c.exported == nil {
		return false
	}
	return c.exported.Load()
}

邹毅贤's avatar
邹毅贤 committed
// Get Random Port
func getRandomPort(protocolConfigs []*ProtocolConfig) *list.List {
邹毅贤's avatar
邹毅贤 committed
	ports := list.New()
邹毅贤's avatar
邹毅贤 committed
	for _, proto := range protocolConfigs {
邹毅贤's avatar
邹毅贤 committed
		if len(proto.Port) > 0 {
邹毅贤's avatar
邹毅贤 committed
			continue
		}

		tcp, err := gxnet.ListenOnTCPRandomPort(proto.Ip)
		if err != nil {
xg.gao's avatar
xg.gao committed
			panic(perrors.New(fmt.Sprintf("Get tcp port error, err is {%v}", err)))
邹毅贤's avatar
邹毅贤 committed
		}
邹毅贤's avatar
邹毅贤 committed
		defer tcp.Close()
邹毅贤's avatar
邹毅贤 committed
		ports.PushBack(strings.Split(tcp.Addr().String(), ":")[1])
邹毅贤's avatar
邹毅贤 committed
	}
邹毅贤's avatar
邹毅贤 committed
	return ports
邹毅贤's avatar
邹毅贤 committed
}

flycash's avatar
flycash committed
// Export exports the service
AlexStocks's avatar
AlexStocks committed
func (c *ServiceConfig) Export() error {
	// TODO: config center start here
xg.gao's avatar
xg.gao committed
	// TODO: delay export
AlexStocks's avatar
AlexStocks committed
	if c.unexported != nil && c.unexported.Load() {
xg.gao's avatar
xg.gao committed
		err := perrors.Errorf("The service %v has already unexported!", c.InterfaceName)
fangyincheng's avatar
fangyincheng committed
		logger.Errorf(err.Error())
AlexStocks's avatar
AlexStocks committed
	if c.unexported != nil && c.exported.Load() {
xg.gao's avatar
xg.gao committed
		logger.Warnf("The service %v has already exported!", c.InterfaceName)
AlexStocks's avatar
AlexStocks committed
	regUrls := loadRegistries(c.Registry, providerConfig.Registries, common.PROVIDER)
	urlMap := c.getUrlMap()
vito.he's avatar
vito.he committed
	protocolConfigs := loadProtocol(c.Protocol, c.Protocols)
邹毅贤's avatar
邹毅贤 committed
	if len(protocolConfigs) == 0 {
xg.gao's avatar
xg.gao committed
		logger.Warnf("The service %v's '%v' protocols don't has right protocolConfigs", c.InterfaceName, c.Protocol)
邹毅贤's avatar
邹毅贤 committed
	ports := getRandomPort(protocolConfigs)
	nextPort := ports.Front()
xg.gao's avatar
xg.gao committed
	proxyFactory := extension.GetProxyFactory(providerConfig.ProxyFactory)
	for _, proto := range protocolConfigs {
		// registry the service reflect
		methods, err := common.ServiceMap.Register(c.InterfaceName, proto.Name, c.Group, c.Version, c.rpcService)
		if err != nil {
xg.gao's avatar
xg.gao committed
			formatErr := perrors.Errorf("The service %v export the protocol %v error! Error message is %v.", c.InterfaceName, proto.Name, err.Error())
			logger.Errorf(formatErr.Error())
			return formatErr
邹毅贤's avatar
邹毅贤 committed

		port := proto.Port
邹毅贤's avatar
邹毅贤 committed
		if len(proto.Port) == 0 {
邹毅贤's avatar
邹毅贤 committed
			port = nextPort.Value.(string)
			nextPort = nextPort.Next()
		}
		ivkURL := common.NewURLWithOptions(
			common.WithPath(c.InterfaceName),
vito.he's avatar
vito.he committed
			common.WithProtocol(proto.Name),
			common.WithIp(proto.Ip),
邹毅贤's avatar
邹毅贤 committed
			common.WithPort(port),
vito.he's avatar
vito.he committed
			common.WithParams(urlMap),
AlexStocks's avatar
AlexStocks committed
			common.WithParamsValue(constant.BEAN_NAME_KEY, c.id),
aliiohs's avatar
aliiohs committed
			common.WithParamsValue(constant.SSL_ENABLED_KEY, strconv.FormatBool(GetSslEnabled())),
xujianhai666's avatar
xujianhai666 committed
			common.WithMethods(strings.Split(methods, ",")),
AlexStocks's avatar
AlexStocks committed
			common.WithToken(c.Token),
xujianhai666's avatar
xujianhai666 committed
		)
		if len(c.Tag) > 0 {
			ivkURL.AddParam(constant.Tagkey, c.Tag)
		}
vito.he's avatar
vito.he committed

vito.he's avatar
vito.he committed
		if len(regUrls) > 0 {
xg.gao's avatar
xg.gao committed
			c.cacheMutex.Lock()
			if c.cacheProtocol == nil {
				logger.Infof(fmt.Sprintf("First load the registry protocol, url is {%v}!", ivkURL))
				c.cacheProtocol = extension.GetProtocol("registry")
			}
			c.cacheMutex.Unlock()

vito.he's avatar
vito.he committed
			for _, regUrl := range regUrls {
				regUrl.SubURL = ivkURL
haohongfan's avatar
haohongfan committed
				invoker := proxyFactory.GetInvoker(regUrl)
xg.gao's avatar
xg.gao committed
				exporter := c.cacheProtocol.Export(invoker)
vito.he's avatar
vito.he committed
				if exporter == nil {
xg.gao's avatar
xg.gao committed
					return perrors.New(fmt.Sprintf("Registry protocol new exporter error, registry is {%v}, url is {%v}", regUrl, ivkURL))
vito.he's avatar
vito.he committed
				}
xg.gao's avatar
xg.gao committed
				c.exporters = append(c.exporters, exporter)
vito.he's avatar
vito.he committed
			}
vito.he's avatar
vito.he committed
		} else {
haohongfan's avatar
haohongfan committed
			invoker := proxyFactory.GetInvoker(ivkURL)
xg.gao's avatar
xg.gao committed
			exporter := extension.GetProtocol(protocolwrapper.FILTER).Export(invoker)
fangyincheng's avatar
fangyincheng committed
			if exporter == nil {
xg.gao's avatar
xg.gao committed
				return perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error, url is {%v}", ivkURL))
xg.gao's avatar
xg.gao committed
			c.exporters = append(c.exporters, exporter)
vito.he's avatar
vito.he committed
	c.exported.Store(true)
vito.he's avatar
vito.he committed
// Unexport will call unexport of all exporters service config exported
func (c *ServiceConfig) Unexport() {
	if !c.exported.Load() {
		return
	}
	if c.unexported.Load() {
		return
	}
vito.he's avatar
vito.he committed

	func() {
		c.exportersLock.Lock()
		defer c.exportersLock.Unlock()
		for _, exporter := range c.exporters {
			exporter.Unexport()
		}
		c.exporters = nil
	}()

vito.he's avatar
vito.he committed
	c.exported.Store(false)
vito.he's avatar
vito.he committed
	c.unexported.Store(true)
}

flycash's avatar
flycash committed
// Implement only store the @s and return
AlexStocks's avatar
AlexStocks committed
func (c *ServiceConfig) Implement(s common.RPCService) {
	c.rpcService = s
AlexStocks's avatar
AlexStocks committed
func (c *ServiceConfig) getUrlMap() url.Values {
	urlMap := url.Values{}
	// first set user params
AlexStocks's avatar
AlexStocks committed
	for k, v := range c.Params {
AlexStocks's avatar
AlexStocks committed
	urlMap.Set(constant.INTERFACE_KEY, c.InterfaceName)
	urlMap.Set(constant.TIMESTAMP_KEY, strconv.FormatInt(time.Now().Unix(), 10))
AlexStocks's avatar
AlexStocks committed
	urlMap.Set(constant.CLUSTER_KEY, c.Cluster)
	urlMap.Set(constant.LOADBALANCE_KEY, c.Loadbalance)
	urlMap.Set(constant.WARMUP_KEY, c.Warmup)
	urlMap.Set(constant.RETRIES_KEY, c.Retries)
	urlMap.Set(constant.GROUP_KEY, c.Group)
	urlMap.Set(constant.VERSION_KEY, c.Version)
lzp0412's avatar
lzp0412 committed
	urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))
vito.he's avatar
vito.he committed
	urlMap.Set(constant.RELEASE_KEY, "dubbo-golang-"+constant.Version)
vito.he's avatar
vito.he committed
	urlMap.Set(constant.SIDE_KEY, (common.RoleType(common.PROVIDER)).Role())
	urlMap.Set(constant.MESSAGE_SIZE_KEY, strconv.Itoa(c.GrpcMaxMessageSize))
fangyincheng's avatar
fangyincheng committed
	// todo: move
	urlMap.Set(constant.SERIALIZATION_KEY, c.Serialization)
	urlMap.Set(constant.APPLICATION_KEY, providerConfig.ApplicationConfig.Name)
	urlMap.Set(constant.ORGANIZATION_KEY, providerConfig.ApplicationConfig.Organization)
	urlMap.Set(constant.NAME_KEY, providerConfig.ApplicationConfig.Name)
	urlMap.Set(constant.MODULE_KEY, providerConfig.ApplicationConfig.Module)
	urlMap.Set(constant.APP_VERSION_KEY, providerConfig.ApplicationConfig.Version)
	urlMap.Set(constant.OWNER_KEY, providerConfig.ApplicationConfig.Owner)
	urlMap.Set(constant.ENVIRONMENT_KEY, providerConfig.ApplicationConfig.Environment)

AlexStocks's avatar
AlexStocks committed
	urlMap.Set(constant.SERVICE_FILTER_KEY, mergeValue(providerConfig.Filter, c.Filter, constant.DEFAULT_SERVICE_FILTERS))
	// filter special config
AlexStocks's avatar
AlexStocks committed
	urlMap.Set(constant.ACCESS_LOG_KEY, c.AccessLog)
AlexStocks's avatar
AlexStocks committed
	urlMap.Set(constant.TPS_LIMIT_STRATEGY_KEY, c.TpsLimitStrategy)
	urlMap.Set(constant.TPS_LIMIT_INTERVAL_KEY, c.TpsLimitInterval)
	urlMap.Set(constant.TPS_LIMIT_RATE_KEY, c.TpsLimitRate)
	urlMap.Set(constant.TPS_LIMITER_KEY, c.TpsLimiter)
	urlMap.Set(constant.TPS_REJECTED_EXECUTION_HANDLER_KEY, c.TpsLimitRejectedHandler)
	// execute limit filter
AlexStocks's avatar
AlexStocks committed
	urlMap.Set(constant.EXECUTE_LIMIT_KEY, c.ExecuteLimit)
	urlMap.Set(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, c.ExecuteLimitRejectedHandler)
Ooo0oO0o0oO's avatar
Ooo0oO0o0oO committed
	// auth filter
	urlMap.Set(constant.SERVICE_AUTH_KEY, c.Auth)
	urlMap.Set(constant.PARAMTER_SIGNATURE_ENABLE_KEY, c.ParamSign)

AlexStocks's avatar
AlexStocks committed
	for _, v := range c.Methods {
		prefix := "methods." + v.Name + "."
lihaowei's avatar
lihaowei committed
		urlMap.Set(prefix+constant.LOADBALANCE_KEY, v.LoadBalance)
		urlMap.Set(prefix+constant.RETRIES_KEY, v.Retries)
		urlMap.Set(prefix+constant.WEIGHT_KEY, strconv.FormatInt(v.Weight, 10))

Ming Deng's avatar
Ming Deng committed
		urlMap.Set(prefix+constant.TPS_LIMIT_STRATEGY_KEY, v.TpsLimitStrategy)
		urlMap.Set(prefix+constant.TPS_LIMIT_INTERVAL_KEY, v.TpsLimitInterval)
		urlMap.Set(prefix+constant.TPS_LIMIT_RATE_KEY, v.TpsLimitRate)

		urlMap.Set(constant.EXECUTE_LIMIT_KEY, v.ExecuteLimit)
		urlMap.Set(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, v.ExecuteLimitRejectedHandler)
vito.he's avatar
vito.he committed

// GetExportedUrls will return the url in service config's exporter
func (c *ServiceConfig) GetExportedUrls() []*common.URL {
	if c.exported.Load() {
		var urls []*common.URL
		for _, exporter := range c.exporters {
haohongfan's avatar
haohongfan committed
			urls = append(urls, exporter.GetInvoker().GetUrl())
vito.he's avatar
vito.he committed
		}
		return urls
	}
	return nil
}