Skip to content
Snippets Groups Projects
Commit 2693201f authored by zhangshen023's avatar zhangshen023
Browse files

remove hc-log

code clean
parent f3c63b55
No related branches found
No related tags found
No related merge requests found
......@@ -31,7 +31,6 @@ require (
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/hashicorp/consul v1.8.0
github.com/hashicorp/consul/api v1.5.0
github.com/hashicorp/go-hclog v0.12.0
github.com/hashicorp/go-raftchunking v0.6.3-0.20191002164813-7e9e8525653a // indirect
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/hashicorp/vault/api v1.0.5-0.20191108163347-bdd38fca2cff // indirect
......
......@@ -30,7 +30,6 @@ import (
"github.com/dubbogo/gost/page"
consul "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
"github.com/hashicorp/go-hclog"
perrors "github.com/pkg/errors"
)
......@@ -75,7 +74,6 @@ func init() {
// newConsulServiceDiscovery will create new service discovery instance
// use double-check pattern to reduce race condition
func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
instance, ok := instanceMap[name]
if ok {
return instance, nil
......@@ -135,6 +133,7 @@ type consulServiceDiscovery struct {
tags []string
address string
ttl map[string]chan struct{}
*consul.Config
}
func (csd *consulServiceDiscovery) Initialize(registryURL common.URL) error {
......@@ -143,19 +142,20 @@ func (csd *consulServiceDiscovery) Initialize(registryURL common.URL) error {
csd.tag = registryURL.GetParam(QUERY_TAG, "")
csd.tags = strings.Split(registryURL.GetParam("tags", ""), ",")
aclToken := registryURL.GetParam(ACL_TOKEN, "")
config := &consul.Config{Address: csd.address, Token: aclToken}
client, err := consul.NewClient(config)
csd.Config = &consul.Config{Address: csd.address, Token: aclToken}
client, err := consul.NewClient(csd.Config)
if err != nil {
return perrors.WithMessage(err, "create consul client failed.")
}
csd.consulClient = client
return nil
}
func (csd consulServiceDiscovery) String() string {
func (csd *consulServiceDiscovery) String() string {
return csd.descriptor
}
func (csd consulServiceDiscovery) Destroy() error {
func (csd *consulServiceDiscovery) Destroy() error {
csd.consulClient = nil
for _, t := range csd.ttl {
close(t)
......@@ -171,13 +171,10 @@ func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) e
return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName())
}
csd.registerTtl(instance)
return nil
return csd.registerTtl(instance)
}
func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance) error {
func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance) error {
checkID := buildID(instance)
stopChan := make(chan struct{})
......@@ -206,7 +203,7 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance
return nil
}
func (csd consulServiceDiscovery) Update(instance registry.ServiceInstance) error {
func (csd *consulServiceDiscovery) Update(instance registry.ServiceInstance) error {
ins, _ := csd.buildRegisterInstance(instance)
err := csd.consulClient.Agent().ServiceDeregister(buildID(instance))
if err != nil {
......@@ -215,7 +212,7 @@ func (csd consulServiceDiscovery) Update(instance registry.ServiceInstance) erro
return csd.consulClient.Agent().ServiceRegister(ins)
}
func (csd consulServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
func (csd *consulServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
err := csd.consulClient.Agent().ServiceDeregister(buildID(instance))
if err != nil {
logger.Errorf("unregister service instance %s,error: %v", instance.GetId(), err)
......@@ -229,11 +226,11 @@ func (csd consulServiceDiscovery) Unregister(instance registry.ServiceInstance)
return nil
}
func (csd consulServiceDiscovery) GetDefaultPageSize() int {
func (csd *consulServiceDiscovery) GetDefaultPageSize() int {
return csd.PageSize
}
func (csd consulServiceDiscovery) GetServices() *gxset.HashSet {
func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet {
var res = gxset.NewSet()
services, _, err := csd.consulClient.Catalog().Services(nil)
......@@ -249,7 +246,7 @@ func (csd consulServiceDiscovery) GetServices() *gxset.HashSet {
}
func (csd consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
waitTime := csd.serviceUrl.GetParamInt(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000
instances, _, err := csd.consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{
WaitTime: time.Duration(waitTime),
......@@ -288,7 +285,7 @@ func (csd consulServiceDiscovery) GetInstances(serviceName string) []registry.Se
return res
}
func (csd consulServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager {
func (csd *consulServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager {
all := csd.GetInstances(serviceName)
res := make([]interface{}, 0, pageSize)
for i := offset; i < len(all) && i < offset+pageSize; i++ {
......@@ -297,7 +294,7 @@ func (csd consulServiceDiscovery) GetInstancesByPage(serviceName string, offset
return gxpage.New(offset, pageSize, res, len(all))
}
func (csd consulServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager {
func (csd *consulServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager {
all := csd.GetInstances(serviceName)
res := make([]interface{}, 0, pageSize)
// could not use res = all[a:b] here because the res should be []interface{}, not []ServiceInstance
......@@ -316,7 +313,7 @@ func (csd consulServiceDiscovery) GetHealthyInstancesByPage(serviceName string,
return gxpage.New(offset, pageSize, res, len(all))
}
func (csd consulServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager {
func (csd *consulServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager {
res := make(map[string]gxpage.Pager, len(serviceNames))
for _, name := range serviceNames {
res[name] = csd.GetInstancesByPage(name, offset, requestedSize)
......@@ -324,7 +321,7 @@ func (csd consulServiceDiscovery) GetRequestInstances(serviceNames []string, off
return res
}
func (csd consulServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error {
func (csd *consulServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error {
params := make(map[string]interface{}, 8)
params["type"] = "service"
......@@ -336,10 +333,6 @@ func (csd consulServiceDiscovery) AddListener(listener *registry.ServiceInstance
return err
}
hcLogger := hclog.New(&hclog.LoggerOptions{
Name: "watch",
Output: plan.LogOutput,
})
plan.Handler = func(idx uint64, raw interface{}) {
services, ok := raw.([]*consul.ServiceEntry)
if !ok {
......@@ -377,7 +370,7 @@ func (csd consulServiceDiscovery) AddListener(listener *registry.ServiceInstance
}
}
go func() {
err = plan.RunWithClientAndHclog(csd.consulClient, hcLogger)
err = plan.RunWithConfig(csd.Config.Address, csd.Config)
if err != nil {
logger.Error("consul plan run failure!error:%v", err)
}
......@@ -385,20 +378,20 @@ func (csd consulServiceDiscovery) AddListener(listener *registry.ServiceInstance
return nil
}
func (csd consulServiceDiscovery) DispatchEventByServiceName(serviceName string) error {
func (csd *consulServiceDiscovery) DispatchEventByServiceName(serviceName string) error {
return csd.DispatchEventForInstances(serviceName, csd.GetInstances(serviceName))
}
func (csd consulServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error {
func (csd *consulServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error {
return csd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances))
}
func (csd consulServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error {
func (csd *consulServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error {
extension.GetGlobalDispatcher().Dispatch(event)
return nil
}
func (csd consulServiceDiscovery) buildRegisterInstance(instance registry.ServiceInstance) (*consul.AgentServiceRegistration, error) {
func (csd *consulServiceDiscovery) buildRegisterInstance(instance registry.ServiceInstance) (*consul.AgentServiceRegistration, error) {
metadata := instance.GetMetadata()
if metadata == nil {
metadata = make(map[string]string, 1)
......@@ -418,7 +411,7 @@ func (csd consulServiceDiscovery) buildRegisterInstance(instance registry.Servic
}, nil
}
func (csd consulServiceDiscovery) buildCheck(instance registry.ServiceInstance) consul.AgentServiceCheck {
func (csd *consulServiceDiscovery) buildCheck(instance registry.ServiceInstance) consul.AgentServiceCheck {
deregister, ok := instance.GetMetadata()[DEREGISTER_AFTER]
if !ok || deregister == "" {
......@@ -434,7 +427,5 @@ func (csd consulServiceDiscovery) buildCheck(instance registry.ServiceInstance)
func buildID(instance registry.ServiceInstance) string {
id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d", instance.GetId(), instance.GetServiceName(), instance.GetHost(), instance.GetPort())
//Md5Inst := md5.New()
//Md5Inst.Write([]byte(id))
return id
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment