Skip to content
Snippets Groups Projects
Commit 96a1206c authored by fangyincheng's avatar fangyincheng
Browse files

Merge branch 'develop-2.6.0' of https://github.com/dubbogo/dubbo-go into develop-2.6.0

parents d68c44e6 124bc37c
No related branches found
No related tags found
No related merge requests found
......@@ -8,4 +8,5 @@ const (
const (
DEFAULT_LOADBALANCE = "random"
DEFAULT_RETRIES = 2
DEFAULT_PROTOCOL = "dubbo"
)
......@@ -31,3 +31,12 @@ const (
const (
DUBBOGO_CTX_KEY = "dubbogo-ctx"
)
const (
REGISTRY_KEY = "registry"
REGISTRY_PROTOCOL = "registry"
)
const (
EXPORT_KEY = "export"
)
......@@ -80,6 +80,8 @@ func NewURL(ctx context.Context, urlString string) (*URL, error) {
s.PrimitiveURL = urlString
s.Protocol = serviceUrl.Scheme
s.Username = serviceUrl.User.Username()
s.Password, _ = serviceUrl.User.Password()
s.Location = serviceUrl.Host
s.Path = serviceUrl.Path
if strings.Contains(s.Location, ":") {
......@@ -90,6 +92,7 @@ func NewURL(ctx context.Context, urlString string) (*URL, error) {
}
s.Group = s.Params.Get("group")
s.Version = s.Params.Get("version")
timeoutStr := s.Params.Get("timeout")
if len(timeoutStr) == 0 {
timeoutStr = s.Params.Get("default.timeout")
......@@ -124,9 +127,13 @@ func (c URL) String() string {
}
func (c *URL) ToFullString() string {
return fmt.Sprintf(
"%s://%s:%s@%s:%s/%s?%s&%s&%s",
c.Protocol, c.Password, c.Username, c.Ip, c.Port, c.Path, c.Version, c.Group, c.Params)
buildString := fmt.Sprintf(
"%s://%s:%s@%s:%s/%s?verison=%s&group=%s",
c.Protocol, c.Username, c.Password, c.Ip, c.Port, c.Path, c.Version, c.Group)
for k, v := range c.Params {
buildString += "&" + k + "=" + v[0]
}
return buildString
}
func (c *URL) Context() context.Context {
......
package protocol
import (
"github.com/dubbo/dubbo-go/protocol/protocolwrapper"
"sync"
)
import (
log "github.com/AlexStocks/log4go"
jerrors "github.com/juju/errors"
)
import (
......@@ -21,8 +23,7 @@ const RegistryConnDelay = 3
type RegistryProtocol struct {
// Registry Map<RegistryAddress, Registry>
registies map[string]registry.Registry
registiesMutex sync.Mutex
registies sync.Map
}
func init() {
......@@ -31,29 +32,27 @@ func init() {
func NewRegistryProtocol() protocol.Protocol {
return &RegistryProtocol{
registies: make(map[string]registry.Registry),
registies: sync.Map{},
}
}
func getRegistry(regUrl *config.RegistryURL) registry.Registry {
reg, err := extension.GetRegistryExtension(regUrl.Protocol, regUrl)
if err != nil {
log.Error("Registry can not connect success, program is going to panic.Error message is %s", err.Error())
panic(err.Error())
}
return reg
}
func (protocol *RegistryProtocol) Refer(url config.IURL) protocol.Invoker {
var regUrl = url.(*config.RegistryURL)
var serviceUrl = regUrl.URL
protocol.registiesMutex.Lock()
defer protocol.registiesMutex.Unlock()
var reg registry.Registry
var ok bool
if reg, ok = protocol.registies[url.Key()]; !ok {
var err error
reg, err = extension.GetRegistryExtension(regUrl.Protocol, regUrl)
if err != nil {
log.Error("Registry can not connect success, program is going to panic.Error message is %s", err.Error())
panic(err.Error())
} else {
protocol.registies[url.Key()] = reg
}
}
regI, _ := protocol.registies.LoadOrStore(url.Key(),
getRegistry(regUrl))
reg = regI.(registry.Registry)
//new registry directory for store service url from registry
directory := directory2.NewRegistryDirectory(regUrl, reg)
go directory.Subscribe(serviceUrl)
......@@ -63,9 +62,73 @@ func (protocol *RegistryProtocol) Refer(url config.IURL) protocol.Invoker {
return cluster.Join(directory)
}
func (*RegistryProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
return nil
func (protocol *RegistryProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
registryUrl := protocol.getRegistryUrl(invoker)
providerUrl := protocol.getProviderUrl(invoker)
regI, _ := protocol.registies.LoadOrStore(providerUrl.Key(),
getRegistry(&registryUrl))
reg := regI.(registry.Registry)
err := reg.Register(providerUrl)
if err != nil {
log.Error("provider service %v register registry %v error, error message is %v", providerUrl.ToFullString(), registryUrl.String(), err.Error())
}
wrappedInvoker := newWrappedInvoker(invoker, providerUrl)
return extension.GetProtocolExtension(protocolwrapper.FILTER).Export(wrappedInvoker)
}
func (*RegistryProtocol) Destroy() {
}
func (*RegistryProtocol) getRegistryUrl(invoker protocol.Invoker) config.RegistryURL {
//here add * for return a new url
url := *invoker.GetUrl().(*config.RegistryURL)
//if the protocol == registry ,set protocol the registry value in url.params
if url.Protocol == constant.REGISTRY_PROTOCOL {
protocol := url.GetParam(constant.REGISTRY_KEY, constant.DEFAULT_PROTOCOL)
url.Protocol = protocol
}
return url
}
func (*RegistryProtocol) getProviderUrl(invoker protocol.Invoker) config.URL {
url := invoker.GetUrl().(*config.RegistryURL)
var export string
if export = url.GetParam(constant.EXPORT_KEY, ""); export == "" {
err := jerrors.Errorf("The registry export url is null! registry: %v", url.String())
log.Error(err.Error())
panic(err)
}
newUrl, err := config.NewURL(url.Context(), export)
if err != nil {
err := jerrors.Errorf("The registry export url is invalid! registry: %v ,error messsage:%v ", url.String(), err.Error())
log.Error(err.Error())
panic(err)
}
return *newUrl
}
type wrappedInvoker struct {
invoker protocol.Invoker
url config.URL
protocol.BaseInvoker
}
func newWrappedInvoker(invoker protocol.Invoker, url config.URL) *wrappedInvoker {
return &wrappedInvoker{
invoker: invoker,
url: url,
BaseInvoker: protocol.NewBaseInvoker(nil),
}
}
func (ivk *wrappedInvoker) GetUrl() config.IURL {
return &ivk.url
}
func (ivk *wrappedInvoker) getInvoker() protocol.Invoker {
return ivk.invoker
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment