Skip to content
Snippets Groups Projects
reference_config.go 9.35 KiB
Newer Older
AlexStocks's avatar
AlexStocks committed
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
fangyincheng's avatar
fangyincheng committed

vito.he's avatar
vito.he committed
package config
vito.he's avatar
vito.he committed

vito.he's avatar
vito.he committed
import (
	"context"
	"fmt"
	"net/url"
	"strconv"
	"time"
)

import (
	"github.com/creasty/defaults"
	gxstrings "github.com/dubbogo/gost/strings"
)

import (
	"github.com/apache/dubbo-go/cluster/directory"
	"github.com/apache/dubbo-go/common"
	"github.com/apache/dubbo-go/common/constant"
	"github.com/apache/dubbo-go/common/extension"
	"github.com/apache/dubbo-go/common/proxy"
	"github.com/apache/dubbo-go/protocol"
)

// ReferenceConfig is the configuration of service consumer
vito.he's avatar
vito.he committed
type ReferenceConfig struct {
	context        context.Context
	pxy            *proxy.Proxy
	id             string
	InterfaceName  string            `required:"true"  yaml:"interface"  json:"interface,omitempty" property:"interface"`
	Check          *bool             `yaml:"check"  json:"check,omitempty" property:"check"`
	Url            string            `yaml:"url"  json:"url,omitempty" property:"url"`
	Filter         string            `yaml:"filter" json:"filter,omitempty" property:"filter"`
	Protocol       string            `default:"dubbo"  yaml:"protocol"  json:"protocol,omitempty" property:"protocol"`
	Registry       string            `yaml:"registry"  json:"registry,omitempty"  property:"registry"`
	Cluster        string            `yaml:"cluster"  json:"cluster,omitempty" property:"cluster"`
	Loadbalance    string            `yaml:"loadbalance"  json:"loadbalance,omitempty" property:"loadbalance"`
	Retries        string            `yaml:"retries"  json:"retries,omitempty" property:"retries"`
	Group          string            `yaml:"group"  json:"group,omitempty" property:"group"`
	Version        string            `yaml:"version"  json:"version,omitempty" property:"version"`
flycash's avatar
flycash committed
	ProvideBy      string            `yaml:"provide_by"  json:"provide_by,omitempty" property:"provide_by"`
	Methods        []*MethodConfig   `yaml:"methods"  json:"methods,omitempty" property:"methods"`
pantianying's avatar
pantianying committed
	Async          bool              `yaml:"async"  json:"async,omitempty" property:"async"`
	Params         map[string]string `yaml:"params"  json:"params,omitempty" property:"params"`
	invoker        protocol.Invoker
	urls           []*common.URL
	Generic        bool   `yaml:"generic"  json:"generic,omitempty" property:"generic"`
pantianying's avatar
pantianying committed
	Sticky         bool   `yaml:"sticky"   json:"sticky,omitempty" property:"sticky"`
pantianying's avatar
pantianying committed
	RequestTimeout string `yaml:"timeout"  json:"timeout,omitempty" property:"timeout"`
	ForceTag       bool   `yaml:"force.tag"  json:"force.tag,omitempty" property:"force.tag"`
func (c *ReferenceConfig) Prefix() string {
	return constant.ReferenceConfigPrefix + c.InterfaceName + "."
vito.he's avatar
vito.he committed
}
fangyincheng's avatar
fangyincheng committed

// NewReferenceConfig The only way to get a new ReferenceConfig
func NewReferenceConfig(id string, ctx context.Context) *ReferenceConfig {
	return &ReferenceConfig{id: id, context: ctx}
vito.he's avatar
vito.he committed
}
// UnmarshalYAML unmarshals the ReferenceConfig by @unmarshal function
AlexStocks's avatar
AlexStocks committed
func (c *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
	type rf ReferenceConfig
	raw := rf{} // Put your defaults here
	if err := unmarshal(&raw); err != nil {
		return err
	}
AlexStocks's avatar
AlexStocks committed
	*c = ReferenceConfig(raw)
	if err := defaults.Set(c); err != nil {
		return err
	}

	return nil
}
func (c *ReferenceConfig) Refer(_ interface{}) {
	cfgURL := common.NewURLWithOptions(
		common.WithPath(c.InterfaceName),
AlexStocks's avatar
AlexStocks committed
		common.WithProtocol(c.Protocol),
		common.WithParams(c.getUrlMap()),
		common.WithParamsValue(constant.BEAN_NAME_KEY, c.id),
	if c.ForceTag {
		cfgURL.AddParam(constant.ForceUseTag, "true")
	}
AlexStocks's avatar
AlexStocks committed
	if c.Url != "" {
xg.gao's avatar
xg.gao committed
		// 1. user specified URL, could be peer-to-peer address, or register center's address.
AlexStocks's avatar
AlexStocks committed
		urlStrings := gxstrings.RegSplit(c.Url, "\\s*[;]+\\s*")
		for _, urlStr := range urlStrings {
			serviceUrl, err := common.NewURL(urlStr)
			if err != nil {
				panic(fmt.Sprintf("user specified URL %v refer error, error message is %v ", urlStr, err.Error()))
			}
			if serviceUrl.Protocol == constant.REGISTRY_PROTOCOL {
				serviceUrl.SubURL = cfgURL
haohongfan's avatar
haohongfan committed
				c.urls = append(c.urls, serviceUrl)
			} else {
				if serviceUrl.Path == "" {
					serviceUrl.Path = "/" + c.InterfaceName
				}
				// merge url need to do
haohongfan's avatar
haohongfan committed
				newUrl := common.MergeUrl(serviceUrl, cfgURL)
AlexStocks's avatar
AlexStocks committed
				c.urls = append(c.urls, newUrl)
xg.gao's avatar
xg.gao committed
		// 2. assemble SubURL from register center's configuration mode
AlexStocks's avatar
AlexStocks committed
		c.urls = loadRegistries(c.Registry, consumerConfig.Registries, common.CONSUMER)
xg.gao's avatar
xg.gao committed
		// set url to regUrls
AlexStocks's avatar
AlexStocks committed
		for _, regUrl := range c.urls {
			regUrl.SubURL = cfgURL
xg.gao's avatar
xg.gao committed

AlexStocks's avatar
AlexStocks committed
	if len(c.urls) == 1 {
haohongfan's avatar
haohongfan committed
		c.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(c.urls[0])
vito.he's avatar
vito.he committed
	} else {
xg.gao's avatar
xg.gao committed
		invokers := make([]protocol.Invoker, 0, len(c.urls))
		var regUrl *common.URL
AlexStocks's avatar
AlexStocks committed
		for _, u := range c.urls {
haohongfan's avatar
haohongfan committed
			invokers = append(invokers, extension.GetProtocol(u.Protocol).Refer(u))
			if u.Protocol == constant.REGISTRY_PROTOCOL {
				regUrl = u
			}
		}
flycash's avatar
flycash committed

		// TODO(decouple from directory, config should not depend on directory module)
tiecheng's avatar
tiecheng committed
		var hitClu string
		if regUrl != nil {
			// for multi-subscription scenario, use 'zone-aware' policy by default
tiecheng's avatar
tiecheng committed
			hitClu = constant.ZONEAWARE_CLUSTER_NAME
			// not a registry url, must be direct invoke.
tiecheng's avatar
tiecheng committed
			hitClu = constant.FAILOVER_CLUSTER_NAME
			if len(invokers) > 0 {
				u := invokers[0].GetUrl()
				if nil != &u {
tiecheng's avatar
tiecheng committed
					hitClu = u.GetParam(constant.CLUSTER_KEY, constant.ZONEAWARE_CLUSTER_NAME)
tiecheng's avatar
tiecheng committed

		cluster := extension.GetCluster(hitClu)
		// If 'zone-aware' policy select, the invoker wrap sequence would be:
		// ZoneAwareClusterInvoker(StaticDirectory) ->
		// FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
		c.invoker = cluster.Join(directory.NewStaticDirectory(invokers))
xg.gao's avatar
xg.gao committed
	// create proxy
AlexStocks's avatar
AlexStocks committed
	if c.Async {
		callback := GetCallback(c.id)
		c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetAsyncProxy(c.invoker, callback, cfgURL)
pantianying's avatar
pantianying committed
	} else {
		c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(c.invoker, cfgURL)
pantianying's avatar
pantianying committed
	}
// Implement
// @v is service provider implemented RPCService
AlexStocks's avatar
AlexStocks committed
func (c *ReferenceConfig) Implement(v common.RPCService) {
	c.pxy.Implement(v)
// GetRPCService gets RPCService from proxy
AlexStocks's avatar
AlexStocks committed
func (c *ReferenceConfig) GetRPCService() common.RPCService {
	return c.pxy.Get()
AlexStocks's avatar
AlexStocks committed
func (c *ReferenceConfig) getUrlMap() url.Values {
vito.he's avatar
vito.he committed
	urlMap := url.Values{}
	//first set user params
AlexStocks's avatar
AlexStocks committed
	for k, v := range c.Params {
AlexStocks's avatar
AlexStocks committed
	urlMap.Set(constant.INTERFACE_KEY, c.InterfaceName)
vito.he's avatar
vito.he committed
	urlMap.Set(constant.TIMESTAMP_KEY, strconv.FormatInt(time.Now().Unix(), 10))
AlexStocks's avatar
AlexStocks committed
	urlMap.Set(constant.CLUSTER_KEY, c.Cluster)
	urlMap.Set(constant.LOADBALANCE_KEY, c.Loadbalance)
	urlMap.Set(constant.RETRIES_KEY, c.Retries)
	urlMap.Set(constant.GROUP_KEY, c.Group)
	urlMap.Set(constant.VERSION_KEY, c.Version)
	urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(c.Generic))
lzp0412's avatar
lzp0412 committed
	urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
flycash's avatar
flycash committed
	urlMap.Set(constant.PROVIDER_BY, c.ProvideBy)
vito.he's avatar
vito.he committed
	urlMap.Set(constant.RELEASE_KEY, "dubbo-golang-"+constant.Version)
vito.he's avatar
vito.he committed
	urlMap.Set(constant.SIDE_KEY, (common.RoleType(common.CONSUMER)).Role())

AlexStocks's avatar
AlexStocks committed
	if len(c.RequestTimeout) != 0 {
		urlMap.Set(constant.TIMEOUT_KEY, c.RequestTimeout)
pantianying's avatar
pantianying committed
	}
vito.he's avatar
vito.he committed
	//getty invoke async or sync
AlexStocks's avatar
AlexStocks committed
	urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(c.Async))
	urlMap.Set(constant.STICKY_KEY, strconv.FormatBool(c.Sticky))
	//application info
	urlMap.Set(constant.APPLICATION_KEY, consumerConfig.ApplicationConfig.Name)
	urlMap.Set(constant.ORGANIZATION_KEY, consumerConfig.ApplicationConfig.Organization)
	urlMap.Set(constant.NAME_KEY, consumerConfig.ApplicationConfig.Name)
	urlMap.Set(constant.MODULE_KEY, consumerConfig.ApplicationConfig.Module)
	urlMap.Set(constant.APP_VERSION_KEY, consumerConfig.ApplicationConfig.Version)
	urlMap.Set(constant.OWNER_KEY, consumerConfig.ApplicationConfig.Owner)
	urlMap.Set(constant.ENVIRONMENT_KEY, consumerConfig.ApplicationConfig.Environment)

fangyincheng's avatar
fangyincheng committed
	//filter
pantianying's avatar
pantianying committed
	var defaultReferenceFilter = constant.DEFAULT_REFERENCE_FILTERS
AlexStocks's avatar
AlexStocks committed
	if c.Generic {
		defaultReferenceFilter = constant.GENERIC_REFERENCE_FILTERS + "," + defaultReferenceFilter
pantianying's avatar
pantianying committed
	}
AlexStocks's avatar
AlexStocks committed
	urlMap.Set(constant.REFERENCE_FILTER_KEY, mergeValue(consumerConfig.Filter, c.Filter, defaultReferenceFilter))
AlexStocks's avatar
AlexStocks committed
	for _, v := range c.Methods {
lihaowei's avatar
lihaowei committed
		urlMap.Set("methods."+v.Name+"."+constant.LOADBALANCE_KEY, v.LoadBalance)
		urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, v.Retries)
Ooo0oO0o0oO's avatar
Ooo0oO0o0oO committed
		urlMap.Set("methods."+v.Name+"."+constant.STICKY_KEY, strconv.FormatBool(v.Sticky))
		if len(v.RequestTimeout) != 0 {
			urlMap.Set("methods."+v.Name+"."+constant.TIMEOUT_KEY, v.RequestTimeout)
		}
vito.he's avatar
vito.he committed
	}

	return urlMap
}
AlexStocks's avatar
AlexStocks committed
func (c *ReferenceConfig) GenericLoad(id string) {
	genericService := NewGenericService(c.id)
pantianying's avatar
pantianying committed
	SetConsumerService(genericService)
AlexStocks's avatar
AlexStocks committed
	c.id = id
	c.Refer(genericService)
	c.Implement(genericService)
pantianying's avatar
pantianying committed
}