diff --git a/common/url.go b/common/url.go index c594df235134d71d98b83c11b2f6eb9312283aae..36454f91b95d3f9b2babfd06f44a065a20f5a0c0 100644 --- a/common/url.go +++ b/common/url.go @@ -108,16 +108,19 @@ func WithParams(params url.Values) option { url.Params = params } } + func WithParamsValue(key, val string) option { return func(url *URL) { url.Params.Set(key, val) } } + func WithProtocol(proto string) option { return func(url *URL) { url.Protocol = proto } } + func WithIp(ip string) option { return func(url *URL) { url.Ip = ip @@ -141,6 +144,7 @@ func WithLocation(location string) option { url.Location = location } } + func NewURLWithOptions(opts ...option) *URL { url := &URL{} for _, opt := range opts { diff --git a/config/config_loader.go b/config/config_loader.go index e1301541556bcc77d4a027e5fe82d9e75f8645e4..1164d5ffc62935ea9d1b3738c9ab3ddf282d4ad0 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -37,8 +37,8 @@ var ( maxWait = 3 ) -// loaded comsumer & provider config from xxx.yml, and log config from xxx.xml -// Namely: dubbo.comsumer.xml & dubbo.provider.xml in java dubbo +// loaded consumer & provider config from xxx.yml, and log config from xxx.xml +// Namely: dubbo.consumer.xml & dubbo.provider.xml in java dubbo func init() { var ( confConFile, confProFile string @@ -67,9 +67,8 @@ func Load() { } for key, ref := range consumerConfig.References { rpcService := GetConsumerService(key) - if rpcService == nil { - logger.Warnf("%s is not exsist!", key) + logger.Warnf("%s does not exist!", key) continue } ref.id = key @@ -96,7 +95,7 @@ func Load() { break } if refconfig.invoker == nil { - logger.Warnf("The interface %s invoker not exsist , may you should check your interface config.", refconfig.InterfaceName) + logger.Warnf("The interface %s invoker not exist , may you should check your interface config.", refconfig.InterfaceName) } } } @@ -116,9 +115,8 @@ func Load() { } for key, svs := range providerConfig.Services { rpcService := GetProviderService(key) - fmt.Println(key) if rpcService == nil { - logger.Warnf("%s is not exsist!", key) + logger.Warnf("%s does not exist!", key) continue } svs.id = key diff --git a/registry/consul/consul_test.go b/registry/consul/consul_test.go new file mode 100644 index 0000000000000000000000000000000000000000..69ad1176558e2d644c789770310b6d671fc730bb --- /dev/null +++ b/registry/consul/consul_test.go @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package consul + +import ( + "fmt" + "net" + "net/url" + "strconv" + "sync" + "testing" +) + +import ( + "github.com/hashicorp/consul/sdk/testutil" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" +) + +var ( + registryHost = "localhost" + registryPort = 8500 + providerHost = "localhost" + providerPort = 8000 + consumerHost = "localhost" + consumerPort = 8001 + service = "HelloWorld" + protocol = "tcp" +) + +func newProviderRegistryUrl(host string, port int) *common.URL { + url1 := common.NewURLWithOptions( + common.WithIp(host), + common.WithPort(strconv.Itoa(port)), + common.WithParams(url.Values{}), + common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)), + ) + return url1 +} + +func newConsumerRegistryUrl(host string, port int) *common.URL { + url1 := common.NewURLWithOptions( + common.WithIp(host), + common.WithPort(strconv.Itoa(port)), + common.WithParams(url.Values{}), + common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)), + ) + return url1 +} + +func newProviderUrl(host string, port int, service string, protocol string) common.URL { + url1 := common.NewURLWithOptions( + common.WithIp(host), + common.WithPort(strconv.Itoa(port)), + common.WithPath(service), + common.WithProtocol(protocol), + ) + return *url1 +} + +func newConsumerUrl(host string, port int, service string, protocol string) common.URL { + url1 := common.NewURLWithOptions( + common.WithIp(host), + common.WithPort(strconv.Itoa(port)), + common.WithPath(service), + common.WithProtocol(protocol), + ) + return *url1 +} + +type Server struct { + listener net.Listener + wg sync.WaitGroup + done chan struct{} +} + +func newServer(host string, port int) *Server { + addr := fmt.Sprintf("%s:%d", host, port) + tcpAddr, _ := net.ResolveTCPAddr("tcp", addr) + listener, _ := net.ListenTCP("tcp", tcpAddr) + + server := &Server{ + listener: listener, + done: make(chan struct{}, 1), + } + + server.wg.Add(1) + go server.serve() + return server +} + +func (server *Server) serve() { + defer server.wg.Done() + for { + select { + case <-server.done: + return + default: + conn, err := server.listener.Accept() + if err != nil { + continue + } + conn.Write([]byte("Hello World")) + conn.Close() + } + } +} + +func (server *Server) close() { + close(server.done) + server.listener.Close() + server.wg.Wait() +} + +func TestSomething(t *testing.T) { + providerRegistryUrl := newProviderRegistryUrl(registryHost, registryPort) + consumerRegistryUrl := newConsumerRegistryUrl(registryHost, registryPort) + providerUrl := newProviderUrl(providerHost, providerPort, service, protocol) + consumerUrl := newConsumerUrl(consumerHost, consumerPort, service, protocol) + + cb := func(c *testutil.TestServerConfig) { c.Ports.HTTP = registryPort } + consulServer, _ := testutil.NewTestServerConfig(cb) + defer consulServer.Stop() + providerRegistry, err := newConsulRegistry(providerRegistryUrl) + assert.NoError(t, err) + consumerRegistry, err := newConsulRegistry(consumerRegistryUrl) + assert.NoError(t, err) + + server := newServer(providerHost, providerPort) + defer server.close() + err = providerRegistry.Register(providerUrl) + assert.NoError(t, err) + + listener, err := consumerRegistry.Subscribe(consumerUrl) + assert.NoError(t, err) + event, err := listener.Next() + assert.NoError(t, err) + assert.True(t, providerUrl.URLEqual(event.Service)) +}