Skip to content
Snippets Groups Projects
Commit 22202f87 authored by fangyincheng's avatar fangyincheng
Browse files

Merge branch 'develop-2.6.0' of https://github.com/dubbogo/dubbo-go into develop-2.6.0

parents 45ff270a ebedbf2b
No related branches found
No related tags found
No related merge requests found
......@@ -68,11 +68,32 @@ func WithParams(params url.Values) option {
}
}
func NewURLWithOptions(service string, protocol string, ip string, port string, path string, opts ...option) *URL {
func WithProtocol(proto string) option {
return func(url *URL) {
url.Protocol = proto
}
}
func WithIp(ip string) option {
return func(url *URL) {
url.Ip = ip
}
}
func WithPort(port string) option {
return func(url *URL) {
url.Port = port
}
}
func WithPath(path string) option {
return func(url *URL) {
url.Path = path
}
}
func NewURLWithOptions(service string, opts ...option) *URL {
url := &URL{
baseUrl: baseUrl{Protocol: protocol, Ip: ip, Port: port},
Service: service,
Path: path,
}
for _, opt := range opts {
opt(url)
......
......@@ -2,36 +2,39 @@ package support
import (
"context"
"github.com/dubbo/dubbo-go/common/constant"
"github.com/dubbo/dubbo-go/config"
"net/url"
"strconv"
"time"
)
import (
"github.com/dubbo/dubbo-go/common/extension"
"github.com/dubbo/dubbo-go/common/proxy"
"github.com/dubbo/dubbo-go/config"
"github.com/dubbo/dubbo-go/protocol"
)
type ReferenceConfig struct {
context context.Context
pxy *proxy.Proxy
Interface string `required:"true" yaml:"interface" json:"interface,omitempty"`
Registries []ConfigRegistry `required:"true" yaml:"registries" json:"registries,omitempty"`
Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty"`
Methods []method `yaml:"methods" json:"methods,omitempty"`
URLs []config.URL `yaml:"-"`
Async bool `yaml:"async" json:"async,omitempty"`
invoker protocol.Invoker
context context.Context
pxy *proxy.Proxy
Interface string `required:"true" yaml:"interface" json:"interface,omitempty"`
Registries []ConfigRegistry `required:"true" yaml:"registries" json:"registries,omitempty"`
Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty"`
Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty"`
retries int64 `yaml:"retries" json:"retries,omitempty"`
Methods []struct {
name string `yaml:"name" json:"name,omitempty"`
retries int64 `yaml:"retries" json:"retries,omitempty"`
loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty"`
} `yaml:"methods" json:"methods,omitempty"`
Async bool `yaml:"async" json:"async,omitempty"`
invoker protocol.Invoker
}
type ConfigRegistry struct {
string
}
type method struct {
name string `yaml:"name" json:"name,omitempty"`
retries int64 `yaml:"retries" json:"retries,omitempty"`
loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty"`
}
func NewReferenceConfig(ctx context.Context) *ReferenceConfig {
return &ReferenceConfig{context: ctx}
}
......@@ -40,10 +43,16 @@ func (refconfig *ReferenceConfig) Refer() {
//首先是user specified URL, could be peer-to-peer address, or register center's address.
//其次是assemble URL from register center's configuration模式
urls := loadRegistries(refconfig.Registries, consumerConfig.Registries)
regUrls := loadRegistries(refconfig.Registries, consumerConfig.Registries)
url := config.NewURLWithOptions(refconfig.Interface, config.WithParams(refconfig.getUrlMap()))
if len(urls) == 1 {
refconfig.invoker = extension.GetProtocolExtension("registry").Refer(urls[0])
//set url to regUrls
for _, regUrl := range regUrls {
regUrl.URL = *url
}
if len(regUrls) == 1 {
refconfig.invoker = extension.GetProtocolExtension("registry").Refer(regUrls[0])
} else {
//TODO:multi registries ,just wrap multi registry as registry cluster invoker including cluster invoker
......@@ -57,3 +66,22 @@ func (refconfig *ReferenceConfig) Refer() {
func (refconfig *ReferenceConfig) Implement(v config.RPCService) error {
return refconfig.pxy.Implement(v)
}
func (refconfig *ReferenceConfig) getUrlMap() url.Values {
urlMap := url.Values{}
urlMap.Set(constant.TIMESTAMP_KEY, strconv.FormatInt(time.Now().Unix(), 10))
urlMap.Set(constant.CLUSTER_KEY, refconfig.Cluster)
urlMap.Set(constant.LOADBALANCE_KEY, refconfig.Loadbalance)
urlMap.Set(constant.RETRIES_KEY, strconv.FormatInt(refconfig.retries, 10))
//getty invoke async or sync
urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.Async))
for _, v := range refconfig.Methods {
urlMap.Set("methods."+v.name+"."+constant.LOADBALANCE_KEY, v.loadbalance)
urlMap.Set("methods."+v.name+"."+constant.RETRIES_KEY, strconv.FormatInt(v.retries, 10))
}
return urlMap
}
......@@ -25,13 +25,16 @@ type ServiceConfig struct {
Registries []ConfigRegistry `required:"true" yaml:"registries" json:"registries,omitempty"`
Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty"`
Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty"`
Methods []method `yaml:"methods" json:"methods,omitempty"`
warmup string `yaml:"warmup" json:"warmup,omitempty"`
retries int64 `yaml:"retries" json:"retries,omitempty"`
unexported *atomic.Bool
exported *atomic.Bool
URLs []config.URL
Methods []struct {
name string `yaml:"name" json:"name,omitempty"`
retries int64 `yaml:"retries" json:"retries,omitempty"`
loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty"`
weight int64 `yaml:"weight" json:"weight,omitempty"`
} `yaml:"methods" json:"methods,omitempty"`
warmup string `yaml:"warmup" json:"warmup,omitempty"`
retries int64 `yaml:"retries" json:"retries,omitempty"`
unexported *atomic.Bool
exported *atomic.Bool
rpcService config.RPCService
exporters []protocol.Exporter
}
......@@ -59,7 +62,7 @@ func (srvconfig *ServiceConfig) Export() error {
}
regUrls := loadRegistries(srvconfig.Registries, providerConfig.Registries)
urlMap := getUrlMap(srvconfig)
urlMap := srvconfig.getUrlMap()
for _, proto := range loadProtocol(srvconfig.Protocol, providerConfig.Protocols) {
//registry the service reflect
......@@ -73,7 +76,13 @@ func (srvconfig *ServiceConfig) Export() error {
if contextPath == "" {
contextPath = providerConfig.Path
}
url := config.NewURLWithOptions(srvconfig.Interface, proto.name, proto.ip, proto.port, contextPath, config.WithParams(urlMap))
url := config.NewURLWithOptions(srvconfig.Interface,
config.WithProtocol(proto.name),
config.WithIp(proto.ip),
config.WithPort(proto.port),
config.WithPath(contextPath),
config.WithParams(urlMap))
for _, regUrl := range regUrls {
regUrl.URL = *url
invoker := protocol.NewBaseInvoker(regUrl)
......@@ -89,18 +98,19 @@ func (srvconfig *ServiceConfig) Implement(s config.RPCService) {
srvconfig.rpcService = s
}
func getUrlMap(config *ServiceConfig) url.Values {
func (srvconfig *ServiceConfig) getUrlMap() url.Values {
urlMap := url.Values{}
urlMap.Set(constant.TIMESTAMP_KEY, strconv.FormatInt(time.Now().Unix(), 10))
urlMap.Set(constant.CLUSTER_KEY, config.Cluster)
urlMap.Set(constant.LOADBALANCE_KEY, config.Loadbalance)
urlMap.Set(constant.WARMUP_KEY, config.warmup)
urlMap.Set(constant.RETRIES_KEY, strconv.FormatInt(config.retries, 10))
urlMap.Set(constant.CLUSTER_KEY, srvconfig.Cluster)
urlMap.Set(constant.LOADBALANCE_KEY, srvconfig.Loadbalance)
urlMap.Set(constant.WARMUP_KEY, srvconfig.warmup)
urlMap.Set(constant.RETRIES_KEY, strconv.FormatInt(srvconfig.retries, 10))
for _, v := range config.Methods {
for _, v := range srvconfig.Methods {
urlMap.Set("methods."+v.name+"."+constant.LOADBALANCE_KEY, v.loadbalance)
urlMap.Set("methods."+v.name+"."+constant.RETRIES_KEY, strconv.FormatInt(v.retries, 10))
urlMap.Set("methods."+v.name+"."+constant.WEIGHT_KEY, strconv.FormatInt(v.weight, 10))
}
return urlMap
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment