Skip to content
Snippets Groups Projects
Commit bf3cb2db authored by 高辛格's avatar 高辛格
Browse files

update consul

parent 70748131
No related branches found
No related tags found
No related merge requests found
......@@ -199,10 +199,6 @@ func NewURL(ctx context.Context, urlString string, opts ...option) (URL, error)
return s, nil
}
func NewURLFromString(url string) URL {
}
func (c URL) URLEqual(url URL) bool {
c.Ip = ""
c.Port = ""
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package consul
import (
"sync"
)
import (
perrors "github.com/pkg/errors"
consul "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/watch"
)
......@@ -8,18 +30,52 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting"
)
// Consul listener wraps the consul watcher, to
// listen the service information change in consul
// registry.
type consulListener struct {
plan *watch.Plan
addrCh chan *consul.ServiceEntry
}
// Consumer url.
common.URL
func newConsulListener(url common.URL) (registry.Listener, error) {
var err error
// Consul watcher.
plan *watch.Plan
addrCh := make(chan *consul.ServiceEntry, 1)
// Most recent service urls return by
// watcher.
urls []common.URL
// All service information changes will
// be wrapped into ServiceEvent, and be
// sent into eventCh. Then listener's
// Next method will get event from eventCh,
// and return to upstream.
eventCh chan *registry.ServiceEvent
// All errors, happening in the listening
// period, will be caught and send into
// errCh. Then listener's Next method will
// get error from errCh, and return to
// upstream.
errCh chan error
// Running field represents whether consul
// listener has been closed. When closing
// listener, this field will be set to
// false, and will notify consul watcher
// to close.
running bool
// After listener notifies consul watcher
// to close, listener will call wg.wait to
// make sure that consul watcher is closed
// before the listener closes.
wg sync.WaitGroup
}
func newConsulListener(url common.URL) (*consulListener, error) {
params := make(map[string]interface{})
params["type"] = "service"
params["service"] = url.Service()
......@@ -27,32 +83,109 @@ func newConsulListener(url common.URL) (registry.Listener, error) {
if err != nil {
return nil, err
}
plan.Handler = func(idx uint64, raw interface{}) {
addrs, _ := raw.([]*consul.ServiceEntry)
for _, addr := range addrs {
addrCh <- addr
listener := &consulListener{
URL: url,
plan: plan,
urls: make([]common.URL, 0),
eventCh: make(chan *registry.ServiceEvent, 1),
errCh: make(chan error, 1),
running: true,
}
// Set handler to consul watcher, and
// make watcher begin to watch service
// information change.
listener.plan.Handler = listener.handler
listener.wg.Add(1)
go listener.run()
return listener, nil
}
// Wrap the consul watcher run api. There are three
// conditions that will finish the run:
// - set running to false
// - call plan.Stop
// - close eventCh and errCh
// If run meets first two conditions, it will close
// gracefully. However, if run meets the last condition,
// run will close with panic, so use recover to cover
// this case.
func (l *consulListener) run() {
defer func() {
recover()
}()
if l.running {
err := l.plan.Run(l.URL.Location)
if err != nil {
l.errCh <- err
}
}
}
func (l *consulListener) handler(idx uint64, raw interface{}) {
var (
service *consul.ServiceEntry
event *registry.ServiceEvent
url common.URL
ok bool
err error
)
services, ok := raw.([]*consul.ServiceEntry)
if !ok {
err = perrors.New("handler get non ServiceEntry type parameter")
l.errCh <- err
return
}
newUrls := make([]common.URL, 0)
events := make([]*registry.ServiceEvent, 0)
go func() {
err := plan.Run(url.Location)
for _, service = range services {
url, err = retrieveURL(service)
if err != nil {
l.errCh <- err
return
}
newUrls = append(newUrls, url)
}
for url = range l.urls {
ok = in(url, newUrls)
if !ok {
event := &registry.ServiceEvent{Action: remoting.EventTypeDel, Service: url}
events = append(events, event)
}
plan.Stop()
}()
}
listener := &consulListener{
plan: plan,
addrCh: addrCh,
for url = range newUrls {
ok = in(url, l.urls)
if !ok {
event := &registry.ServiceEvent{Action: remoting.EventTypeAdd, Service: url}
events = append(events, event)
}
}
l.urls = newUrls
for _, event = range events {
l.eventCh <- event
}
return listener, nil
}
func (l *consulListener) Next() (*registry.ServiceEvent, error) {
return nil, nil
select {
case event := <- l.eventCh:
return event, nil
case err := <- l.errCh:
return nil, err
}
}
func (l *consulListener) Close() {
}
\ No newline at end of file
l.running = false
l.plan.Stop()
close(l.eventCh)
close(l.errCh)
l.wg.Wait()
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package consul
import (
......@@ -13,16 +30,28 @@ import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
)
func init() {
extension.SetRegistry("consul", newConsulRegistry)
}
// Consul registry wraps the consul client, to
// register and subscribe service.
type consulRegistry struct {
// Registry url.
*common.URL
// Consul client.
client *consul.Client
// Done field represents whether
// consul registry is closed.
done chan struct{}
}
func newConsulRegistry(url *common.URL) (registry.Registry, error) {
var err error
config := &consul.Config{Address: url.Location}
client, err := consul.NewClient(config)
if err != nil {
......@@ -32,6 +61,7 @@ func newConsulRegistry(url *common.URL) (registry.Registry, error) {
r := &consulRegistry{
URL: url,
client: client,
done: make(chan struct{}, 1),
}
return r, nil
......@@ -51,8 +81,6 @@ func (r *consulRegistry) Register(url common.URL) error {
}
func (r *consulRegistry) register(url common.URL) error {
var err error
service, err := buildService(url)
if err != nil {
return err
......@@ -77,8 +105,6 @@ func (r *consulRegistry) Subscribe(url common.URL) (registry.Listener, error) {
}
func (r *consulRegistry) subscribe(url common.URL) (registry.Listener, error) {
var err error
listener, err := newConsulListener(url)
return listener, err
}
......@@ -88,9 +114,14 @@ func (r *consulRegistry) GetUrl() common.URL {
}
func (r *consulRegistry) IsAvailable() bool {
select {
case <- r.done:
return false
default:
return true
}
}
func (r *consulRegistry) Destroy() {
close(r.done)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package consul
import (
"context"
"strconv"
"crypto/md5"
)
import (
perrors "github.com/pkg/errors"
consul "github.com/hashicorp/consul/api"
)
......@@ -51,7 +70,23 @@ func buildService(url common.URL) (*consul.AgentServiceRegistration, error) {
return service, nil
}
func retrieveURL(service *consul.ServiceEntry) common.URL {
url := service.Service.Meta["url"]
return common.NewURLFromString(url)
func retrieveURL(service *consul.ServiceEntry) (common.URL, error) {
url, ok := service.Service.Meta["url"]
if !ok {
return common.URL{}, perrors.New("retrieve url fails with no url key in service meta")
}
url1, err := common.NewURL(context.Background(), url)
if err != nil {
return common.URL{}, perrors.WithStack(err)
}
return url1, nil
}
func in(url common.URL, urls []common.URL) bool {
for _, url1 := range urls {
if url.URLEqual(url1) {
return true
}
}
return false
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment