Skip to content
Snippets Groups Projects
Commit 9a26d03f authored by fangyincheng's avatar fangyincheng
Browse files

resolve conflicts

parents ff4f2f74 12680fda
No related branches found
No related tags found
No related merge requests found
Showing with 567 additions and 352 deletions
......@@ -16,6 +16,8 @@ Apache License, Version 2.0
## Release note ##
[v1.5.0 - July 24, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.5.0)
[v1.4.0 - Mar 17, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.4.0)
[v1.3.0 - Mar 1, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.3.0)
......@@ -49,6 +51,7 @@ Finished List:
- Codec
* JsonRPC V2
* Hessian V2
* [json for grpc](https://github.com/apache/dubbo-go/pull/582)
- Protocol
* Dubbo
......@@ -110,6 +113,17 @@ Finished List:
* [For dubbo](https://github.com/apache/dubbo-go/pull/344)
* [For grpc](https://github.com/apache/dubbo-go/pull/397)
- Metadata Center
* [Nacos](https://github.com/apache/dubbo-go/pull/522)
* [Zookeeper](https://github.com/apache/dubbo-go/pull/633)
* [Etcd](https://github.com/apache/dubbo-go/blob/9a5990d9a9c3d5e6633c0d7d926c156416bcb931/metadata/report/etcd/report.go)
* [Consul](https://github.com/apache/dubbo-go/pull/633)
- Service discovery
* [Nacos](https://github.com/apache/dubbo-go/blob/9a5990d9a9c3d5e6633c0d7d926c156416bcb931/registry/nacos/service_discovery.go)
* [Zookeeper](https://github.com/apache/dubbo-go/blob/9a5990d9a9c3d5e6633c0d7d926c156416bcb931/registry/zookeeper/service_discovery.go)
* [Etcd](https://github.com/apache/dubbo-go/blob/9a5990d9a9c3d5e6633c0d7d926c156416bcb931/registry/etcdv3/service_discovery.go)
- Others:
* start check
* connecting certain provider
......@@ -118,11 +132,6 @@ Finished List:
* multi-versions
* service group
Working List:
- Metadata Center (dubbo v2.7.x)
- Service Discovery (dubbo v2.7.x)
You can know more about dubbo-go by its [roadmap](https://github.com/apache/dubbo-go/wiki/Roadmap).
![feature](./doc/pic/arch/dubbo-go-arch.png)
......
......@@ -15,6 +15,8 @@ Apache License, Version 2.0
## 发布日志 ##
[v1.5.0 - 2020年7月24日](https://github.com/apache/dubbo-go/releases/tag/v1.5.0)
[v1.4.0 - 2020年3月17日](https://github.com/apache/dubbo-go/releases/tag/v1.4.0)
[v1.3.0 - 2020年3月1日](https://github.com/apache/dubbo-go/releases/tag/v1.3.0)
......@@ -48,6 +50,7 @@ Apache License, Version 2.0
- 序列化协议
* JsonRPC V2
* Hessian V2
* [json for grpc](https://github.com/apache/dubbo-go/pull/582)
- 协议
* Dubbo
......@@ -108,6 +111,16 @@ Apache License, Version 2.0
* [For dubbo](https://github.com/apache/dubbo-go/pull/344)
* [For grpc](https://github.com/apache/dubbo-go/pull/397)
- 元数据中心
* [Nacos](https://github.com/apache/dubbo-go/pull/522)
* [Zookeeper](https://github.com/apache/dubbo-go/pull/633)
* [Etcd](https://github.com/apache/dubbo-go/blob/9a5990d9a9c3d5e6633c0d7d926c156416bcb931/metadata/report/etcd/report.go)
* [Consul](https://github.com/apache/dubbo-go/pull/633)
- 服务发现
* [Nacos](https://github.com/apache/dubbo-go/blob/9a5990d9a9c3d5e6633c0d7d926c156416bcb931/registry/nacos/service_discovery.go)
* [Zookeeper](https://github.com/apache/dubbo-go/blob/9a5990d9a9c3d5e6633c0d7d926c156416bcb931/registry/zookeeper/service_discovery.go)
* [Etcd](https://github.com/apache/dubbo-go/blob/9a5990d9a9c3d5e6633c0d7d926c156416bcb931/registry/etcdv3/service_discovery.go)
- 其他功能支持:
* 启动时检查
......@@ -117,11 +130,6 @@ Apache License, Version 2.0
* 多服务版本
* 服务分组
开发中列表:
- 元数据中心 (dubbo v2.7.x)
- 服务发现 (dubbo v2.7.x)
你可以通过访问 [roadmap](https://github.com/apache/dubbo-go/wiki/Roadmap) 知道更多关于 dubbo-go 的信息。
![feature](./doc/pic/arch/dubbo-go-arch.png)
......@@ -172,7 +180,7 @@ go test ./... -coverprofile=coverage.txt -covermode=atomic
## [User List](https://github.com/apache/dubbo-go/issues/2)
若你正在使用 [apache/dubbo-go](github.com/apache/dubbo-go) 且认为其有用或者对其做改进,请列贵司信息于 [用户列表](https://github.com/apache/dubbo-go/issues/2),以便我们知晓
若你正在使用 [apache/dubbo-go](github.com/apache/dubbo-go) 且认为其有用或者对其做改进,请列贵司信息于 [用户列表](https://github.com/apache/dubbo-go/issues/2),以便我们知晓。
<div>
<table>
......
......@@ -100,7 +100,6 @@ func InitLog(logConfFile string) error {
func InitLogger(conf *zap.Config) {
var zapLoggerConfig zap.Config
if conf == nil {
zapLoggerConfig = zap.NewDevelopmentConfig()
zapLoggerEncoderConfig := zapcore.EncoderConfig{
TimeKey: "time",
LevelKey: "level",
......@@ -113,12 +112,18 @@ func InitLogger(conf *zap.Config) {
EncodeDuration: zapcore.SecondsDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
}
zapLoggerConfig.EncoderConfig = zapLoggerEncoderConfig
zapLoggerConfig = zap.Config{
Level: zap.NewAtomicLevelAt(zap.DebugLevel),
Development: false,
Encoding: "console",
EncoderConfig: zapLoggerEncoderConfig,
OutputPaths: []string{"stderr"},
ErrorOutputPaths: []string{"stderr"},
}
} else {
zapLoggerConfig = *conf
}
zapLogger, _ := zapLoggerConfig.Build(zap.AddCallerSkip(1))
//logger = zapLogger.Sugar()
logger = &DubboLogger{Logger: zapLogger.Sugar(), dynamicLevel: zapLoggerConfig.Level}
// set getty log
......
module github.com/apache/dubbo-go
require (
cloud.google.com/go v0.39.0 // indirect
github.com/Microsoft/go-winio v0.4.13 // indirect
github.com/NYTimes/gziphandler v1.1.1 // indirect
github.com/Workiva/go-datastructures v1.0.50
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
github.com/apache/dubbo-go-hessian2 v1.6.1
......@@ -8,52 +11,58 @@ require (
github.com/coreos/etcd v3.3.13+incompatible
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/creasty/defaults v1.3.0
github.com/docker/go-connections v0.4.0 // indirect
github.com/dubbogo/getty v1.3.7
github.com/dubbogo/go-zookeeper v1.0.1
github.com/dubbogo/gost v1.9.0
github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect
github.com/emicklei/go-restful/v3 v3.0.0
github.com/frankban/quicktest v1.4.1 // indirect
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 // indirect
github.com/go-co-op/gocron v0.1.1
github.com/go-resty/resty/v2 v2.1.0
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/golang/mock v1.3.1
github.com/golang/protobuf v1.3.2
github.com/google/btree v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/google/go-cmp v0.3.1 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.9.5 // indirect
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/hashicorp/consul v1.5.3
github.com/hashicorp/consul/api v1.1.0
github.com/hashicorp/vault v0.10.3
github.com/hashicorp/consul v1.8.0
github.com/hashicorp/consul/api v1.5.0
github.com/hashicorp/go-raftchunking v0.6.3-0.20191002164813-7e9e8525653a // indirect
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/hashicorp/vault/api v1.0.5-0.20191108163347-bdd38fca2cff // indirect
github.com/hashicorp/vault/sdk v0.1.14-0.20191112033314-390e96e22eb2
github.com/jinzhu/copier v0.0.0-20190625015134-976e0346caa8
github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8 // indirect
github.com/juju/testing v0.0.0-20191001232224-ce9dec17d28b // indirect
github.com/magiconair/properties v1.8.1
github.com/matttproud/golang_protobuf_extensions v1.0.1
github.com/mitchellh/mapstructure v1.1.2
github.com/mitchellh/hashstructure v1.0.0 // indirect
github.com/mitchellh/mapstructure v1.2.3
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd
github.com/nacos-group/nacos-sdk-go v0.3.3-0.20200617023039-50c7537d6a5f
github.com/opentracing/opentracing-go v1.1.0
github.com/pierrec/lz4 v2.2.6+incompatible // indirect
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.1.0
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/shirou/gopsutil v2.19.9+incompatible // indirect
github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 // indirect
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.5.1
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
github.com/zouyx/agollo v0.0.0-20191114083447-dde9fc9f35b8
go.etcd.io/bbolt v1.3.4 // indirect
go.uber.org/atomic v1.6.0
go.uber.org/zap v1.15.0
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect
google.golang.org/grpc v1.22.1
gopkg.in/yaml.v2 v2.2.2
k8s.io/api v0.0.0-20190325185214-7544f9db76f6
k8s.io/apimachinery v0.0.0-20190223001710-c182ff3b9841
k8s.io/client-go v8.0.0+incompatible
google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64 // indirect
google.golang.org/grpc v1.23.0
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce // indirect
gopkg.in/yaml.v2 v2.2.8
k8s.io/api v0.16.9
k8s.io/apimachinery v0.16.9
k8s.io/client-go v0.16.9
k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a // indirect
)
......
This diff is collapsed.
......@@ -143,7 +143,7 @@ func (suite *consulMetadataReportTestSuite) testGetServiceDefinition() {
func test1(t *testing.T) {
consulAgent := consul.NewConsulAgent(t, 8500)
defer consulAgent.Close()
defer consulAgent.Shutdown()
url := newProviderRegistryUrl("localhost", 8500)
mf := extension.GetMetadataReportFactory("consul")
......
......@@ -48,8 +48,8 @@ var filterSlice []restful.FilterFunction
// GoRestfulServer a rest server implement by go-restful
type GoRestfulServer struct {
srv *http.Server
container *restful.Container
srv *http.Server
ws *restful.WebService
}
// NewGoRestfulServer a constructor of GoRestfulServer
......@@ -60,13 +60,17 @@ func NewGoRestfulServer() server.RestServer {
// Start go-restful server
// It will add all go-restful filters
func (grs *GoRestfulServer) Start(url common.URL) {
grs.container = restful.NewContainer()
container := restful.NewContainer()
for _, filter := range filterSlice {
grs.container.Filter(filter)
container.Filter(filter)
}
grs.srv = &http.Server{
Handler: grs.container,
Handler: container,
}
grs.ws = &restful.WebService{}
grs.ws.Path("/")
grs.ws.SetDynamicRoutes(true)
container.Add(grs.ws)
ln, err := net.Listen("tcp", url.Location)
if err != nil {
panic(perrors.New(fmt.Sprintf("Restful Server start error:%v", err)))
......@@ -83,23 +87,21 @@ func (grs *GoRestfulServer) Start(url common.URL) {
// Publish a http api in go-restful server
// The routeFunc should be invoked when the server receive a request
func (grs *GoRestfulServer) Deploy(restMethodConfig *config.RestMethodConfig, routeFunc func(request server.RestServerRequest, response server.RestServerResponse)) {
ws := &restful.WebService{}
rf := func(req *restful.Request, resp *restful.Response) {
routeFunc(NewGoRestfulRequestAdapter(req), resp)
}
ws.Path(restMethodConfig.Path).
grs.ws.Route(grs.ws.Method(restMethodConfig.MethodType).
Produces(strings.Split(restMethodConfig.Produces, ",")...).
Consumes(strings.Split(restMethodConfig.Consumes, ",")...).
Route(ws.Method(restMethodConfig.MethodType).To(rf))
grs.container.Add(ws)
Path(restMethodConfig.Path).To(rf))
}
// Delete a http api in go-restful server
func (grs *GoRestfulServer) UnDeploy(restMethodConfig *config.RestMethodConfig) {
ws := new(restful.WebService)
ws.Path(restMethodConfig.Path)
err := grs.container.Remove(ws)
err := grs.ws.RemoveRoute(restMethodConfig.Path, restMethodConfig.MethodType)
if err != nil {
logger.Warnf("[Go restful] Remove web service error:%v", err)
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package server_impl
import (
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol/rest/config"
"github.com/apache/dubbo-go/protocol/rest/server"
)
func TestGoRestfulServerDeploySameUrl(t *testing.T) {
grs := NewGoRestfulServer()
url, err := common.NewURL("http://127.0.0.1:43121")
assert.NoError(t, err)
grs.Start(url)
rmc := &config.RestMethodConfig{
Produces: "*/*",
Consumes: "*/*",
MethodType: "POST",
Path: "/test",
}
f := func(request server.RestServerRequest, response server.RestServerResponse) {}
grs.Deploy(rmc, f)
rmc1 := &config.RestMethodConfig{
Produces: "*/*",
Consumes: "*/*",
MethodType: "GET",
Path: "/test",
}
grs.Deploy(rmc1, f)
grs.UnDeploy(rmc)
grs.UnDeploy(rmc1)
grs.Destroy()
}
......@@ -197,7 +197,7 @@ func (l *consulListener) Next() (*registry.ServiceEvent, error) {
}
}
// Close closes this listener
// Close the listener.
func (l *consulListener) Close() {
close(l.done)
l.plan.Stop()
......
......@@ -148,7 +148,7 @@ func (suite *consulRegistryTestSuite) close() {
// register -> subscribe -> unregister
func test1(t *testing.T) {
consulAgent := consul.NewConsulAgent(t, registryPort)
defer consulAgent.Close()
defer consulAgent.Shutdown()
server := newServer(providerHost, providerPort)
defer server.close()
......@@ -165,10 +165,10 @@ func test1(t *testing.T) {
suite.testListener(remoting.EventTypeDel)
}
// subscribe -> register
// subscribe -> register -> unregister
func test2(t *testing.T) {
consulAgent := consul.NewConsulAgent(t, registryPort)
defer consulAgent.Close()
defer consulAgent.Shutdown()
server := newServer(providerHost, providerPort)
defer server.close()
......@@ -181,6 +181,8 @@ func test2(t *testing.T) {
suite.testNewProviderRegistry()
suite.testRegister()
suite.testListener(remoting.EventTypeAdd)
suite.testUnregister()
suite.testListener(remoting.EventTypeDel)
}
func TestConsulRegistry(t *testing.T) {
......
......@@ -26,7 +26,7 @@ import (
import (
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/page"
"github.com/hashicorp/vault/helper/jsonutil"
"github.com/hashicorp/vault/sdk/helper/jsonutil"
perrors "github.com/pkg/errors"
)
......
......@@ -103,25 +103,26 @@ func TestNacosServiceDiscovery_CRUD(t *testing.T) {
}
// clean data
serviceDiscovry, _ := extension.GetServiceDiscovery(constant.NACOS_KEY, testName)
serviceDiscovery, err := extension.GetServiceDiscovery(constant.NACOS_KEY, testName)
assert.Nil(t, err)
// clean data for local test
serviceDiscovry.Unregister(&registry.DefaultServiceInstance{
err = serviceDiscovery.Unregister(&registry.DefaultServiceInstance{
Id: id,
ServiceName: serviceName,
Host: host,
Port: port,
})
assert.Nil(t, err)
err := serviceDiscovry.Register(instance)
err = serviceDiscovery.Register(instance)
assert.Nil(t, err)
//sometimes nacos may be failed to push update of instance,
//so it need 10s to pull, we sleep 10 second to make sure instance has been update
time.Sleep(11 * time.Second)
page := serviceDiscovry.GetHealthyInstancesByPage(serviceName, 0, 10, true)
page := serviceDiscovery.GetHealthyInstancesByPage(serviceName, 0, 10, true)
assert.NotNil(t, page)
assert.Equal(t, 0, page.GetOffset())
assert.Equal(t, 10, page.GetPageSize())
assert.Equal(t, 1, page.GetDataSize())
......@@ -135,12 +136,15 @@ func TestNacosServiceDiscovery_CRUD(t *testing.T) {
assert.Equal(t, 0, len(instance.GetMetadata()))
instance.Metadata["a"] = "b"
err = serviceDiscovry.Update(instance)
err = serviceDiscovery.Update(instance)
assert.Nil(t, err)
pageMap := serviceDiscovry.GetRequestInstances([]string{serviceName}, 0, 1)
//sometimes nacos may be failed to push update of instance,
//so it need 10s to pull, we sleep 10 second to make sure instance has been update
time.Sleep(11 * time.Second)
pageMap := serviceDiscovery.GetRequestInstances([]string{serviceName}, 0, 1)
assert.Equal(t, 1, len(pageMap))
page = pageMap[serviceName]
assert.NotNil(t, page)
assert.Equal(t, 1, len(page.GetData()))
......@@ -150,11 +154,11 @@ func TestNacosServiceDiscovery_CRUD(t *testing.T) {
assert.Equal(t, "b", v)
// test dispatcher event
err = serviceDiscovry.DispatchEventByServiceName(serviceName)
err = serviceDiscovery.DispatchEventByServiceName(serviceName)
assert.Nil(t, err)
// test AddListener
err = serviceDiscovry.AddListener(&registry.ServiceInstancesChangedListener{})
err = serviceDiscovery.AddListener(&registry.ServiceInstancesChangedListener{})
assert.Nil(t, err)
}
......
......@@ -18,8 +18,6 @@
package consul
import (
"io/ioutil"
"os"
"strconv"
"testing"
)
......@@ -30,35 +28,11 @@ import (
// Consul agent, used for test, simulates
// an embedded consul server.
type ConsulAgent struct {
dataDir string
testAgent *agent.TestAgent
}
func NewConsulAgent(t *testing.T, port int) *ConsulAgent {
dataDir, _ := ioutil.TempDir("./", "agent")
func NewConsulAgent(t *testing.T, port int) *agent.TestAgent {
hcl := `
ports {
http = ` + strconv.Itoa(port) + `
}
data_dir = "` + dataDir + `"
`
testAgent := &agent.TestAgent{Name: t.Name(), DataDir: dataDir, HCL: hcl}
testAgent.Start(t)
consulAgent := &ConsulAgent{
dataDir: dataDir,
testAgent: testAgent,
}
return consulAgent
}
func (consulAgent *ConsulAgent) Close() error {
var err error
err = consulAgent.testAgent.Shutdown()
if err != nil {
return err
}
return os.RemoveAll(consulAgent.dataDir)
return agent.NewTestAgent(t, hcl)
}
......@@ -27,6 +27,6 @@ import (
func TestNewConsulAgent(t *testing.T) {
consulAgent := NewConsulAgent(t, 8500)
err := consulAgent.Close()
err := consulAgent.Shutdown()
assert.NoError(t, err)
}
......@@ -35,7 +35,7 @@ import (
)
const (
// ConnDelay connection dalay
// ConnDelay connection delay
ConnDelay = 3
// MaxFailTimes max failure times
MaxFailTimes = 15
......@@ -93,7 +93,6 @@ func WithHeartbeat(heartbeat int) Option {
// ValidateClient validates client and sets options
func ValidateClient(container clientFacade, opts ...Option) error {
options := &Options{
heartbeat: 1, // default heartbeat
}
......@@ -118,7 +117,6 @@ func ValidateClient(container clientFacade, opts ...Option) error {
// Client lose connection with etcd server
if container.Client().rawClient == nil {
newClient, err := NewClient(options.name, options.endpoints, options.timeout, options.heartbeat)
if err != nil {
logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}",
......@@ -136,7 +134,6 @@ func NewServiceDiscoveryClient(opts ...Option) *Client {
options := &Options{
heartbeat: 1, // default heartbeat
}
for _, opt := range opts {
opt(options)
}
......@@ -145,9 +142,7 @@ func NewServiceDiscoveryClient(opts ...Option) *Client {
if err != nil {
logger.Errorf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}",
options.name, options.endpoints, options.timeout, err)
return nil
}
return newClient
}
......@@ -171,7 +166,6 @@ type Client struct {
// nolint
func NewClient(name string, endpoints []string, timeout time.Duration, heartbeat int) (*Client, error) {
ctx, cancel := context.WithCancel(context.Background())
rawClient, err := clientv3.New(clientv3.Config{
Context: ctx,
......@@ -184,7 +178,6 @@ func NewClient(name string, endpoints []string, timeout time.Duration, heartbeat
}
c := &Client{
name: name,
timeout: timeout,
endpoints: endpoints,
......@@ -205,7 +198,6 @@ func NewClient(name string, endpoints []string, timeout time.Duration, heartbeat
// NOTICE: need to get the lock before calling this method
func (c *Client) clean() {
// close raw client
c.rawClient.Close()
......@@ -217,7 +209,6 @@ func (c *Client) clean() {
}
func (c *Client) stop() bool {
select {
case <-c.exit:
return true
......@@ -229,7 +220,6 @@ func (c *Client) stop() bool {
// nolint
func (c *Client) Close() {
if c == nil {
return
}
......@@ -241,15 +231,14 @@ func (c *Client) Close() {
c.Wait.Wait()
c.lock.Lock()
defer c.lock.Unlock()
if c.rawClient != nil {
c.clean()
}
c.lock.Unlock()
logger.Warnf("etcd client{name:%s, endpoints:%s} exit now.", c.name, c.endpoints)
}
func (c *Client) maintenanceStatus() error {
s, err := concurrency.NewSession(c.rawClient, concurrency.WithTTL(c.heartbeat))
if err != nil {
return perrors.WithMessage(err, "new session with server")
......@@ -262,7 +251,6 @@ func (c *Client) maintenanceStatus() error {
}
func (c *Client) maintenanceStatusLoop(s *concurrency.Session) {
defer func() {
c.Wait.Done()
logger.Infof("etcd client {endpoints:%v, name:%s} maintenance goroutine game over.", c.endpoints, c.name)
......@@ -288,7 +276,6 @@ func (c *Client) maintenanceStatusLoop(s *concurrency.Session) {
// if k not exist will put k/v in etcd, otherwise return nil
func (c *Client) put(k string, v string, opts ...clientv3.OpOption) error {
c.lock.RLock()
defer c.lock.RUnlock()
......@@ -300,17 +287,12 @@ func (c *Client) put(k string, v string, opts ...clientv3.OpOption) error {
If(clientv3.Compare(clientv3.Version(k), "<", 1)).
Then(clientv3.OpPut(k, v, opts...)).
Commit()
if err != nil {
return err
}
return nil
return err
}
// if k not exist will put k/v in etcd
// if k is already exist in etcd, replace it
func (c *Client) update(k string, v string, opts ...clientv3.OpOption) error {
c.lock.RLock()
defer c.lock.RUnlock()
......@@ -322,15 +304,10 @@ func (c *Client) update(k string, v string, opts ...clientv3.OpOption) error {
If(clientv3.Compare(clientv3.Version(k), "!=", -1)).
Then(clientv3.OpPut(k, v, opts...)).
Commit()
if err != nil {
return err
}
return nil
return err
}
func (c *Client) delete(k string) error {
c.lock.RLock()
defer c.lock.RUnlock()
......@@ -339,15 +316,10 @@ func (c *Client) delete(k string) error {
}
_, err := c.rawClient.Delete(c.ctx, k)
if err != nil {
return err
}
return nil
return err
}
func (c *Client) get(k string) (string, error) {
c.lock.RLock()
defer c.lock.RUnlock()
......@@ -369,7 +341,6 @@ func (c *Client) get(k string) (string, error) {
// nolint
func (c *Client) CleanKV() error {
c.lock.RLock()
defer c.lock.RUnlock()
......@@ -378,15 +349,10 @@ func (c *Client) CleanKV() error {
}
_, err := c.rawClient.Delete(c.ctx, "", clientv3.WithPrefix())
if err != nil {
return err
}
return nil
return err
}
func (c *Client) getChildren(k string) ([]string, []string, error) {
c.lock.RLock()
defer c.lock.RUnlock()
......@@ -403,21 +369,16 @@ func (c *Client) getChildren(k string) ([]string, []string, error) {
return nil, nil, ErrKVPairNotFound
}
var (
kList []string
vList []string
)
kList := make([]string, 0, len(resp.Kvs))
vList := make([]string, 0, len(resp.Kvs))
for _, kv := range resp.Kvs {
kList = append(kList, string(kv.Key))
vList = append(vList, string(kv.Value))
}
return kList, vList, nil
}
func (c *Client) watchWithPrefix(prefix string) (clientv3.WatchChan, error) {
c.lock.RLock()
defer c.lock.RUnlock()
......@@ -429,7 +390,6 @@ func (c *Client) watchWithPrefix(prefix string) (clientv3.WatchChan, error) {
}
func (c *Client) watch(k string) (clientv3.WatchChan, error) {
c.lock.RLock()
defer c.lock.RUnlock()
......@@ -441,7 +401,6 @@ func (c *Client) watch(k string) (clientv3.WatchChan, error) {
}
func (c *Client) keepAliveKV(k string, v string) error {
c.lock.RLock()
defer c.lock.RUnlock()
......@@ -457,14 +416,15 @@ func (c *Client) keepAliveKV(k string, v string) error {
keepAlive, err := c.rawClient.KeepAlive(c.ctx, lease.ID)
if err != nil || keepAlive == nil {
c.rawClient.Revoke(c.ctx, lease.ID)
return perrors.WithMessage(err, "keep alive lease")
if err != nil {
return perrors.WithMessage(err, "keep alive lease")
} else {
return perrors.New("keep alive lease")
}
}
_, err = c.rawClient.Put(c.ctx, k, v, clientv3.WithLease(lease.ID))
if err != nil {
return perrors.WithMessage(err, "put k/v with lease")
}
return nil
return perrors.WithMessage(err, "put k/v with lease")
}
// nolint
......@@ -481,92 +441,54 @@ func (c *Client) Valid() bool {
}
c.lock.RLock()
if c.rawClient == nil {
c.lock.RUnlock()
return false
}
c.lock.RUnlock()
return true
defer c.lock.RUnlock()
return c.rawClient != nil
}
// nolint
func (c *Client) Create(k string, v string) error {
err := c.put(k, v)
if err != nil {
return perrors.WithMessagef(err, "put k/v (key: %s value %s)", k, v)
}
return nil
return perrors.WithMessagef(err, "put k/v (key: %s value %s)", k, v)
}
// Update key value ...
func (c *Client) Update(k, v string) error {
err := c.update(k, v)
if err != nil {
return perrors.WithMessagef(err, "Update k/v (key: %s value %s)", k, v)
}
return nil
return perrors.WithMessagef(err, "Update k/v (key: %s value %s)", k, v)
}
// nolint
func (c *Client) Delete(k string) error {
err := c.delete(k)
if err != nil {
return perrors.WithMessagef(err, "delete k/v (key %s)", k)
}
return nil
return perrors.WithMessagef(err, "delete k/v (key %s)", k)
}
// RegisterTemp registers a temporary node
func (c *Client) RegisterTemp(k, v string) error {
err := c.keepAliveKV(k, v)
if err != nil {
return perrors.WithMessagef(err, "keepalive kv (key %s)", k)
}
return nil
return perrors.WithMessagef(err, "keepalive kv (key %s)", k)
}
// GetChildrenKVList gets children kv list by @k
func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) {
kList, vList, err := c.getChildren(k)
if err != nil {
return nil, nil, perrors.WithMessagef(err, "get key children (key %s)", k)
}
return kList, vList, nil
return kList, vList, perrors.WithMessagef(err, "get key children (key %s)", k)
}
// Get gets value by @k
func (c *Client) Get(k string) (string, error) {
v, err := c.get(k)
if err != nil {
return "", perrors.WithMessagef(err, "get key value (key %s)", k)
}
return v, nil
return v, perrors.WithMessagef(err, "get key value (key %s)", k)
}
// Watch watches on spec key
func (c *Client) Watch(k string) (clientv3.WatchChan, error) {
wc, err := c.watch(k)
if err != nil {
return nil, perrors.WithMessagef(err, "watch prefix (key %s)", k)
}
return wc, nil
return wc, perrors.WithMessagef(err, "watch prefix (key %s)", k)
}
// WatchWithPrefix watches on spec prefix
func (c *Client) WatchWithPrefix(prefix string) (clientv3.WatchChan, error) {
wc, err := c.watchWithPrefix(prefix)
if err != nil {
return nil, perrors.WithMessagef(err, "watch prefix (key %s)", prefix)
}
return wc, nil
return wc, perrors.WithMessagef(err, "watch prefix (key %s)", prefix)
}
......@@ -363,14 +363,9 @@ func (z *ZookeeperClient) ZkConnValid() bool {
default:
}
valid := true
z.RLock()
if z.Conn == nil {
valid = false
}
z.RUnlock()
return valid
defer z.RUnlock()
return z.Conn != nil
}
// nolint
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment