Skip to content
Snippets Groups Projects
Unverified Commit f48a899e authored by vito.he's avatar vito.he Committed by GitHub
Browse files

Merge pull request #148 from sxllwx/develop

Ftr: etcdv3 for registry 
parents 96bf3ea3 48e3abf6
No related branches found
No related tags found
No related merge requests found
......@@ -5,24 +5,44 @@ require (
github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e // indirect
github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect
github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.13+incompatible
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/dubbogo/getty v1.2.2
github.com/dubbogo/gost v1.1.1
github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect
github.com/go-errors/errors v1.0.1 // indirect
github.com/gogo/protobuf v1.2.1 // indirect
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/golang/mock v1.3.1
github.com/google/btree v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.9.5 // indirect
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 // indirect
github.com/jonboulle/clockwork v0.1.0 // indirect
github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570 // indirect
github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f // indirect
github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect
github.com/magiconair/properties v1.8.1
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.1.0 // indirect
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/stretchr/testify v1.3.0
github.com/tebeka/strftime v0.1.3 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/bbolt v1.3.3 // indirect
go.etcd.io/etcd v3.3.13+incompatible
go.uber.org/atomic v1.4.0
go.uber.org/zap v1.10.0
google.golang.org/grpc v1.22.1
gopkg.in/yaml.v2 v2.2.2
)
This diff is collapsed.
package etcdv3
import (
"context"
"strings"
)
import (
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting"
)
type dataListener struct {
interestedURL []*common.URL
listener remoting.ConfigurationListener
}
func NewRegistryDataListener(listener remoting.ConfigurationListener) *dataListener {
return &dataListener{listener: listener, interestedURL: []*common.URL{}}
}
func (l *dataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url)
}
func (l *dataListener) DataChange(eventType remoting.Event) bool {
url := eventType.Path[strings.Index(eventType.Path, "/providers/")+len("/providers/"):]
serviceURL, err := common.NewURL(context.Background(), url)
if err != nil {
logger.Warnf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err)
return false
}
for _, v := range l.interestedURL {
if serviceURL.URLEqual(*v) {
l.listener.Process(&remoting.ConfigChangeEvent{Key: eventType.Path, Value: serviceURL, ConfigType: eventType.Action})
return true
}
}
return false
}
type configurationListener struct {
registry *etcdV3Registry
events chan *remoting.ConfigChangeEvent
}
func NewConfigurationListener(reg *etcdV3Registry) *configurationListener {
// add a new waiter
reg.wg.Add(1)
return &configurationListener{registry: reg, events: make(chan *remoting.ConfigChangeEvent, 32)}
}
func (l *configurationListener) Process(configType *remoting.ConfigChangeEvent) {
l.events <- configType
}
func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-l.registry.done:
logger.Warnf("listener's etcd client connection is broken, so etcd event listener exit now.")
return nil, perrors.New("listener stopped")
case e := <-l.events:
logger.Infof("got etcd event %#v", e)
if e.ConfigType == remoting.EventTypeDel {
select {
case <-l.registry.done:
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
default:
}
continue
}
return &registry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(common.URL)}, nil
}
}
}
func (l *configurationListener) Close() {
l.registry.wg.Done()
}
package etcdv3
import (
"context"
"testing"
"time"
)
import (
"github.com/dubbogo/getty"
"github.com/stretchr/testify/suite"
"go.etcd.io/etcd/embed"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/remoting"
)
type RegistryTestSuite struct {
suite.Suite
etcd *embed.Etcd
}
// start etcd server
func (suite *RegistryTestSuite) SetupSuite() {
t := suite.T()
cfg := embed.NewConfig()
cfg.Dir = "/tmp/default.etcd"
e, err := embed.StartEtcd(cfg)
if err != nil {
t.Fatal(err)
}
select {
case <-e.Server.ReadyNotify():
t.Log("Server is ready!")
case <-getty.GetTimeWheel().After(60 * time.Second):
e.Server.Stop() // trigger a shutdown
t.Logf("Server took too long to start!")
}
suite.etcd = e
return
}
// stop etcd server
func (suite *RegistryTestSuite) TearDownSuite() {
suite.etcd.Close()
}
func (suite *RegistryTestSuite) TestDataChange() {
t := suite.T()
listener := NewRegistryDataListener(&MockDataListener{})
url, _ := common.NewURL(context.Background(), "jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100")
listener.AddInterestedURL(&url)
if !listener.DataChange(remoting.Event{Path: "/dubbo/com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100"}) {
t.Fatal("data change not ok")
}
}
func TestRegistrySuite(t *testing.T) {
suite.Run(t, &RegistryTestSuite{})
}
type MockDataListener struct{}
func (*MockDataListener) Process(configType *remoting.ConfigChangeEvent) {}
package etcdv3
import (
"fmt"
"net/url"
"os"
"path"
"strconv"
"strings"
"sync"
"time"
)
import (
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/utils"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting/etcdv3"
"github.com/apache/dubbo-go/version"
)
var (
processID = ""
localIP = ""
)
const Name = "etcdv3"
func init() {
processID = fmt.Sprintf("%d", os.Getpid())
localIP, _ = utils.GetLocalIP()
extension.SetRegistry(Name, newETCDV3Registry)
}
type etcdV3Registry struct {
*common.URL
birth int64 // time of file birth, seconds since Epoch; 0 if unknown
cltLock sync.Mutex
client *etcdv3.Client
services map[string]common.URL // service name + protocol -> service config
listenerLock sync.Mutex
listener *etcdv3.EventListener
dataListener *dataListener
configListener *configurationListener
wg sync.WaitGroup // wg+done for etcd client restart
done chan struct{}
}
func (r *etcdV3Registry) Client() *etcdv3.Client {
return r.client
}
func (r *etcdV3Registry) SetClient(client *etcdv3.Client) {
r.client = client
}
func (r *etcdV3Registry) ClientLock() *sync.Mutex {
return &r.cltLock
}
func (r *etcdV3Registry) WaitGroup() *sync.WaitGroup {
return &r.wg
}
func (r *etcdV3Registry) GetDone() chan struct{} {
return r.done
}
func (r *etcdV3Registry) RestartCallBack() bool {
services := []common.URL{}
for _, confIf := range r.services {
services = append(services, confIf)
}
flag := true
for _, confIf := range services {
err := r.Register(confIf)
if err != nil {
logger.Errorf("(etcdV3ProviderRegistry)register(conf{%#v}) = error{%#v}",
confIf, perrors.WithStack(err))
flag = false
break
}
logger.Infof("success to re-register service :%v", confIf.Key())
}
return flag
}
func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
if err != nil {
logger.Errorf("timeout config %v is invalid ,err is %v",
url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error())
return nil, perrors.WithMessagef(err, "new etcd registry(address:%+v)", url.Location)
}
logger.Infof("etcd address is: %v, timeout is: %s", url.Location, timeout.String())
r := &etcdV3Registry{
URL: url,
birth: time.Now().UnixNano(),
done: make(chan struct{}),
services: make(map[string]common.URL),
}
if err := etcdv3.ValidateClient(
r,
etcdv3.WithName(etcdv3.RegistryETCDV3Client),
etcdv3.WithTimeout(timeout),
etcdv3.WithEndpoints(url.Location),
); err != nil {
return nil, err
}
r.wg.Add(1)
go etcdv3.HandleClientRestart(r)
r.listener = etcdv3.NewEventListener(r.client)
r.configListener = NewConfigurationListener(r)
r.dataListener = NewRegistryDataListener(r.configListener)
return r, nil
}
func (r *etcdV3Registry) GetUrl() common.URL {
return *r.URL
}
func (r *etcdV3Registry) IsAvailable() bool {
select {
case <-r.done:
return false
default:
return true
}
}
func (r *etcdV3Registry) Destroy() {
if r.configListener != nil {
r.configListener.Close()
}
r.stop()
}
func (r *etcdV3Registry) stop() {
close(r.done)
// close current client
r.client.Close()
r.cltLock.Lock()
r.client = nil
r.services = nil
r.cltLock.Unlock()
}
func (r *etcdV3Registry) Register(svc common.URL) error {
role, err := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
if err != nil {
return perrors.WithMessage(err, "get registry role")
}
r.cltLock.Lock()
if _, ok := r.services[svc.Key()]; ok {
r.cltLock.Unlock()
return perrors.New(fmt.Sprintf("Path{%s} has been registered", svc.Path))
}
r.cltLock.Unlock()
switch role {
case common.PROVIDER:
logger.Debugf("(provider register )Register(conf{%#v})", svc)
if err := r.registerProvider(svc); err != nil {
return perrors.WithMessage(err, "register provider")
}
case common.CONSUMER:
logger.Debugf("(consumer register )Register(conf{%#v})", svc)
if err := r.registerConsumer(svc); err != nil {
return perrors.WithMessage(err, "register consumer")
}
default:
return perrors.New(fmt.Sprintf("unknown role %d", role))
}
r.cltLock.Lock()
r.services[svc.Key()] = svc
r.cltLock.Unlock()
return nil
}
func (r *etcdV3Registry) createDirIfNotExist(k string) error {
var tmpPath string
for _, str := range strings.Split(k, "/")[1:] {
tmpPath = path.Join(tmpPath, "/", str)
if err := r.client.Create(tmpPath, ""); err != nil {
return perrors.WithMessagef(err, "create path %s in etcd", tmpPath)
}
}
return nil
}
func (r *etcdV3Registry) registerConsumer(svc common.URL) error {
consumersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.CONSUMER])
if err := r.createDirIfNotExist(consumersNode); err != nil {
logger.Errorf("etcd client create path %s: %v", consumersNode, err)
return perrors.WithMessage(err, "etcd create consumer nodes")
}
providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER])
if err := r.createDirIfNotExist(providersNode); err != nil {
return perrors.WithMessage(err, "create provider node")
}
params := url.Values{}
params.Add("protocol", svc.Protocol)
params.Add("category", (common.RoleType(common.CONSUMER)).String())
params.Add("dubbo", "dubbogo-consumer-"+version.Version)
encodedURL := url.QueryEscape(fmt.Sprintf("consumer://%s%s?%s", localIP, svc.Path, params.Encode()))
dubboPath := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.CONSUMER)).String())
if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil {
return perrors.WithMessagef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL)
}
return nil
}
func (r *etcdV3Registry) registerProvider(svc common.URL) error {
if len(svc.Path) == 0 || len(svc.Methods) == 0 {
return perrors.New(fmt.Sprintf("service path %s or service method %s", svc.Path, svc.Methods))
}
var (
urlPath string
encodedURL string
dubboPath string
)
providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER])
if err := r.createDirIfNotExist(providersNode); err != nil {
return perrors.WithMessage(err, "create provider node")
}
params := url.Values{}
for k, v := range svc.Params {
params[k] = v
}
params.Add("pid", processID)
params.Add("ip", localIP)
params.Add("anyhost", "true")
params.Add("category", (common.RoleType(common.PROVIDER)).String())
params.Add("dubbo", "dubbo-provider-golang-"+version.Version)
params.Add("side", (common.RoleType(common.PROVIDER)).Role())
if len(svc.Methods) == 0 {
params.Add("methods", strings.Join(svc.Methods, ","))
}
logger.Debugf("provider url params:%#v", params)
var host string
if len(svc.Ip) == 0 {
host = localIP + ":" + svc.Port
} else {
host = svc.Ip + ":" + svc.Port
}
urlPath = svc.Path
encodedURL = url.QueryEscape(fmt.Sprintf("%s://%s%s?%s", svc.Protocol, host, urlPath, params.Encode()))
dubboPath = fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.PROVIDER)).String())
if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil {
return perrors.WithMessagef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL)
}
return nil
}
func (r *etcdV3Registry) Subscribe(svc common.URL) (registry.Listener, error) {
var (
configListener *configurationListener
)
r.listenerLock.Lock()
configListener = r.configListener
r.listenerLock.Unlock()
if r.listener == nil {
r.cltLock.Lock()
client := r.client
r.cltLock.Unlock()
if client == nil {
return nil, perrors.New("etcd client broken")
}
// new client & listener
listener := etcdv3.NewEventListener(r.client)
r.listenerLock.Lock()
r.listener = listener
r.listenerLock.Unlock()
}
//register the svc to dataListener
r.dataListener.AddInterestedURL(&svc)
go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/providers", svc.Service()), r.dataListener)
return configListener, nil
}
package etcdv3
import (
"context"
"strconv"
"testing"
"time"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
)
func initRegistry(t *testing.T) *etcdV3Registry {
regurl, err := common.NewURL(context.Background(), "registry://127.0.0.1:2379", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
if err != nil {
t.Fatal(err)
}
reg, err := newETCDV3Registry(&regurl)
if err != nil {
t.Fatal(err)
}
out := reg.(*etcdV3Registry)
out.client.CleanKV()
return out
}
func (suite *RegistryTestSuite) TestRegister() {
t := suite.T()
url, _ := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
reg := initRegistry(t)
err := reg.Register(url)
children, _, err := reg.client.GetChildrenKVList("/dubbo/com.ikurento.user.UserProvider/providers")
if err != nil {
t.Fatal(err)
}
assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26category%3Dproviders%26cluster%3Dmock%26dubbo%3Ddubbo-provider-golang-2.6.0%26.*provider", children)
assert.NoError(t, err)
}
func (suite *RegistryTestSuite) TestSubscribe() {
t := suite.T()
regurl, _ := common.NewURL(context.Background(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
url, _ := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
reg := initRegistry(t)
//provider register
err := reg.Register(url)
if err != nil {
t.Fatal(err)
}
//consumer register
regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
reg2 := initRegistry(t)
reg2.Register(url)
listener, err := reg2.Subscribe(url)
if err != nil {
t.Fatal(err)
}
serviceEvent, err := listener.Next()
if err != nil {
t.Fatal(err)
}
assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent.String())
}
func (suite *RegistryTestSuite) TestConsumerDestory() {
t := suite.T()
url, _ := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
reg := initRegistry(t)
_, err := reg.Subscribe(url)
if err != nil {
t.Fatal(err)
}
//listener.Close()
time.Sleep(1e9)
reg.Destroy()
assert.Equal(t, false, reg.IsAvailable())
}
func (suite *RegistryTestSuite) TestProviderDestory() {
t := suite.T()
reg := initRegistry(t)
url, _ := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
reg.Register(url)
//listener.Close()
time.Sleep(1e9)
reg.Destroy()
assert.Equal(t, false, reg.IsAvailable())
}
package etcdv3
import (
"context"
"path"
"sync"
"time"
)
import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
perrors "github.com/pkg/errors"
"google.golang.org/grpc"
)
import (
"github.com/apache/dubbo-go/common/logger"
)
const (
ConnDelay = 3
MaxFailTimes = 15
RegistryETCDV3Client = "etcd registry"
)
var (
ErrNilETCDV3Client = perrors.New("etcd raw client is nil") // full describe the ERR
ErrKVPairNotFound = perrors.New("k/v pair not found")
)
type Options struct {
name string
endpoints []string
client *Client
timeout time.Duration
heartbeat int // heartbeat second
}
type Option func(*Options)
func WithEndpoints(endpoints ...string) Option {
return func(opt *Options) {
opt.endpoints = endpoints
}
}
func WithName(name string) Option {
return func(opt *Options) {
opt.name = name
}
}
func WithTimeout(timeout time.Duration) Option {
return func(opt *Options) {
opt.timeout = timeout
}
}
func WithHeartbeat(heartbeat int) Option {
return func(opt *Options) {
opt.heartbeat = heartbeat
}
}
func ValidateClient(container clientFacade, opts ...Option) error {
options := &Options{
heartbeat: 1, // default heartbeat
}
for _, opt := range opts {
opt(options)
}
lock := container.ClientLock()
lock.Lock()
defer lock.Unlock()
// new Client
if container.Client() == nil {
newClient, err := newClient(options.name, options.endpoints, options.timeout, options.heartbeat)
if err != nil {
logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}",
options.name, options.endpoints, options.timeout, err)
return perrors.WithMessagef(err, "new client (address:%+v)", options.endpoints)
}
container.SetClient(newClient)
}
// Client lose connection with etcd server
if container.Client().rawClient == nil {
newClient, err := newClient(options.name, options.endpoints, options.timeout, options.heartbeat)
if err != nil {
logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}",
options.name, options.endpoints, options.timeout, err)
return perrors.WithMessagef(err, "new client (address:%+v)", options.endpoints)
}
container.SetClient(newClient)
}
return nil
}
type Client struct {
lock sync.RWMutex
// these properties are only set once when they are started.
name string
endpoints []string
timeout time.Duration
heartbeat int
ctx context.Context // if etcd server connection lose, the ctx.Done will be sent msg
cancel context.CancelFunc // cancel the ctx, all watcher will stopped
rawClient *clientv3.Client
exit chan struct{}
Wait sync.WaitGroup
}
func newClient(name string, endpoints []string, timeout time.Duration, heartbeat int) (*Client, error) {
ctx, cancel := context.WithCancel(context.Background())
rawClient, err := clientv3.New(clientv3.Config{
Context: ctx,
Endpoints: endpoints,
DialTimeout: timeout,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
})
if err != nil {
return nil, perrors.WithMessage(err, "new raw client block connect to server")
}
c := &Client{
name: name,
timeout: timeout,
endpoints: endpoints,
heartbeat: heartbeat,
ctx: ctx,
cancel: cancel,
rawClient: rawClient,
exit: make(chan struct{}),
}
if err := c.maintenanceStatus(); err != nil {
return nil, perrors.WithMessage(err, "client maintenance status")
}
return c, nil
}
// NOTICE: need to get the lock before calling this method
func (c *Client) clean() {
// close raw client
c.rawClient.Close()
// cancel ctx for raw client
c.cancel()
// clean raw client
c.rawClient = nil
}
func (c *Client) stop() bool {
select {
case <-c.exit:
return true
default:
close(c.exit)
}
return false
}
func (c *Client) Close() {
if c == nil {
return
}
// stop the client
c.stop()
// wait client maintenance status stop
c.Wait.Wait()
c.lock.Lock()
if c.rawClient != nil {
c.clean()
}
c.lock.Unlock()
logger.Warnf("etcd client{name:%s, endpoints:%s} exit now.", c.name, c.endpoints)
}
func (c *Client) maintenanceStatus() error {
s, err := concurrency.NewSession(c.rawClient, concurrency.WithTTL(c.heartbeat))
if err != nil {
return perrors.WithMessage(err, "new session with server")
}
// must add wg before go maintenance status goroutine
c.Wait.Add(1)
go c.maintenanceStatusLoop(s)
return nil
}
func (c *Client) maintenanceStatusLoop(s *concurrency.Session) {
defer func() {
c.Wait.Done()
logger.Infof("etcd client {endpoints:%v, name:%s} maintenance goroutine game over.", c.endpoints, c.name)
}()
for {
select {
case <-c.Done():
// Client be stopped, will clean the client hold resources
return
case <-s.Done():
logger.Warn("etcd server stopped")
c.lock.Lock()
// when etcd server stopped, cancel ctx, stop all watchers
c.clean()
// when connection lose, stop client, trigger reconnect to etcd
c.stop()
c.lock.Unlock()
return
}
}
}
// if k not exist will put k/v in etcd
// if k is already exist in etcd, return nil
func (c *Client) put(k string, v string, opts ...clientv3.OpOption) error {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return ErrNilETCDV3Client
}
_, err := c.rawClient.Txn(c.ctx).
If(clientv3.Compare(clientv3.Version(k), "<", 1)).
Then(clientv3.OpPut(k, v, opts...)).
Commit()
if err != nil {
return err
}
return nil
}
func (c *Client) delete(k string) error {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return ErrNilETCDV3Client
}
_, err := c.rawClient.Delete(c.ctx, k)
if err != nil {
return err
}
return nil
}
func (c *Client) get(k string) (string, error) {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return "", ErrNilETCDV3Client
}
resp, err := c.rawClient.Get(c.ctx, k)
if err != nil {
return "", err
}
if len(resp.Kvs) == 0 {
return "", ErrKVPairNotFound
}
return string(resp.Kvs[0].Value), nil
}
func (c *Client) CleanKV() error {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return ErrNilETCDV3Client
}
_, err := c.rawClient.Delete(c.ctx, "", clientv3.WithPrefix())
if err != nil {
return err
}
return nil
}
func (c *Client) getChildren(k string) ([]string, []string, error) {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return nil, nil, ErrNilETCDV3Client
}
resp, err := c.rawClient.Get(c.ctx, k, clientv3.WithPrefix())
if err != nil {
return nil, nil, err
}
if len(resp.Kvs) == 0 {
return nil, nil, ErrKVPairNotFound
}
var (
kList []string
vList []string
)
for _, kv := range resp.Kvs {
kList = append(kList, string(kv.Key))
vList = append(vList, string(kv.Value))
}
return kList, vList, nil
}
func (c *Client) watchWithPrefix(prefix string) (clientv3.WatchChan, error) {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return nil, ErrNilETCDV3Client
}
return c.rawClient.Watch(c.ctx, prefix, clientv3.WithPrefix()), nil
}
func (c *Client) watch(k string) (clientv3.WatchChan, error) {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return nil, ErrNilETCDV3Client
}
return c.rawClient.Watch(c.ctx, k), nil
}
func (c *Client) keepAliveKV(k string, v string) error {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return ErrNilETCDV3Client
}
lease, err := c.rawClient.Grant(c.ctx, int64(time.Second.Seconds()))
if err != nil {
return perrors.WithMessage(err, "grant lease")
}
keepAlive, err := c.rawClient.KeepAlive(c.ctx, lease.ID)
if err != nil || keepAlive == nil {
c.rawClient.Revoke(c.ctx, lease.ID)
return perrors.WithMessage(err, "keep alive lease")
}
_, err = c.rawClient.Put(c.ctx, k, v, clientv3.WithLease(lease.ID))
if err != nil {
return perrors.WithMessage(err, "put k/v with lease")
}
return nil
}
func (c *Client) Done() <-chan struct{} {
return c.exit
}
func (c *Client) Valid() bool {
select {
case <-c.exit:
return false
default:
}
c.lock.RLock()
if c.rawClient == nil {
c.lock.RUnlock()
return false
}
c.lock.RUnlock()
return true
}
func (c *Client) Create(k string, v string) error {
err := c.put(k, v)
if err != nil {
return perrors.WithMessagef(err, "put k/v (key: %s value %s)", k, v)
}
return nil
}
func (c *Client) Delete(k string) error {
err := c.delete(k)
if err != nil {
return perrors.WithMessagef(err, "delete k/v (key %s)", k)
}
return nil
}
func (c *Client) RegisterTemp(basePath string, node string) (string, error) {
completeKey := path.Join(basePath, node)
err := c.keepAliveKV(completeKey, "")
if err != nil {
return "", perrors.WithMessagef(err, "keepalive kv (key %s)", completeKey)
}
return completeKey, nil
}
func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) {
kList, vList, err := c.getChildren(k)
if err != nil {
return nil, nil, perrors.WithMessagef(err, "get key children (key %s)", k)
}
return kList, vList, nil
}
func (c *Client) Get(k string) (string, error) {
v, err := c.get(k)
if err != nil {
return "", perrors.WithMessagef(err, "get key value (key %s)", k)
}
return v, nil
}
func (c *Client) Watch(k string) (clientv3.WatchChan, error) {
wc, err := c.watch(k)
if err != nil {
return nil, perrors.WithMessagef(err, "watch prefix (key %s)", k)
}
return wc, nil
}
func (c *Client) WatchWithPrefix(prefix string) (clientv3.WatchChan, error) {
wc, err := c.watchWithPrefix(prefix)
if err != nil {
return nil, perrors.WithMessagef(err, "watch prefix (key %s)", prefix)
}
return wc, nil
}
package etcdv3
import (
"fmt"
"net/url"
"path"
"reflect"
"strings"
"sync"
"testing"
"time"
)
import (
"github.com/coreos/etcd/mvcc/mvccpb"
perrors "github.com/pkg/errors"
"github.com/stretchr/testify/suite"
"go.etcd.io/etcd/embed"
"google.golang.org/grpc/connectivity"
)
// tests dataset
var tests = []struct {
input struct {
k string
v string
}
}{
{input: struct {
k string
v string
}{k: "name", v: "scott.wang"}},
{input: struct {
k string
v string
}{k: "namePrefix", v: "prefix.scott.wang"}},
{input: struct {
k string
v string
}{k: "namePrefix1", v: "prefix1.scott.wang"}},
{input: struct {
k string
v string
}{k: "age", v: "27"}},
}
// test dataset prefix
const prefix = "name"
type ClientTestSuite struct {
suite.Suite
etcdConfig struct {
name string
endpoints []string
timeout time.Duration
heartbeat int
}
etcd *embed.Etcd
client *Client
}
// start etcd server
func (suite *ClientTestSuite) SetupSuite() {
t := suite.T()
DefaultListenPeerURLs := "http://localhost:2382"
DefaultListenClientURLs := "http://localhost:2381"
lpurl, _ := url.Parse(DefaultListenPeerURLs)
lcurl, _ := url.Parse(DefaultListenClientURLs)
cfg := embed.NewConfig()
cfg.LPUrls = []url.URL{*lpurl}
cfg.LCUrls = []url.URL{*lcurl}
cfg.Dir = "/tmp/default.etcd"
e, err := embed.StartEtcd(cfg)
if err != nil {
t.Fatal(err)
}
select {
case <-e.Server.ReadyNotify():
t.Log("Server is ready!")
case <-time.After(60 * time.Second):
e.Server.Stop() // trigger a shutdown
t.Logf("Server took too long to start!")
}
suite.etcd = e
return
}
// stop etcd server
func (suite *ClientTestSuite) TearDownSuite() {
suite.etcd.Close()
}
func (suite *ClientTestSuite) setUpClient() *Client {
c, err := newClient(suite.etcdConfig.name,
suite.etcdConfig.endpoints,
suite.etcdConfig.timeout,
suite.etcdConfig.heartbeat)
if err != nil {
suite.T().Fatal(err)
}
return c
}
// set up a client for suite
func (suite *ClientTestSuite) SetupTest() {
c := suite.setUpClient()
c.CleanKV()
suite.client = c
return
}
func (suite *ClientTestSuite) TestClientClose() {
fmt.Println("called client close")
c := suite.client
t := suite.T()
defer c.Close()
if c.rawClient.ActiveConnection().GetState() != connectivity.Ready {
t.Fatal(suite.client.rawClient.ActiveConnection().GetState())
}
}
func (suite *ClientTestSuite) TestClientValid() {
fmt.Println("called client valid")
c := suite.client
t := suite.T()
if c.Valid() != true {
t.Fatal("client is not valid")
}
c.Close()
if suite.client.Valid() != false {
t.Fatal("client is valid")
}
}
func (suite *ClientTestSuite) TestClientDone() {
c := suite.client
go func() {
time.Sleep(2 * time.Second)
c.Close()
}()
c.Wait.Wait()
}
func (suite *ClientTestSuite) TestClientCreateKV() {
tests := tests
c := suite.client
t := suite.T()
defer suite.client.Close()
for _, tc := range tests {
k := tc.input.k
v := tc.input.v
expect := tc.input.v
if err := c.Create(k, v); err != nil {
t.Fatal(err)
}
value, err := c.Get(k)
if err != nil {
t.Fatal(err)
}
if value != expect {
t.Fatalf("expect %v but get %v", expect, value)
}
}
}
func (suite *ClientTestSuite) TestClientDeleteKV() {
tests := tests
c := suite.client
t := suite.T()
defer c.Close()
for _, tc := range tests {
k := tc.input.k
v := tc.input.v
expect := ErrKVPairNotFound
if err := c.Create(k, v); err != nil {
t.Fatal(err)
}
if err := c.Delete(k); err != nil {
t.Fatal(err)
}
_, err := c.Get(k)
if perrors.Cause(err) == expect {
continue
}
if err != nil {
t.Fatal(err)
}
}
}
func (suite *ClientTestSuite) TestClientGetChildrenKVList() {
tests := tests
c := suite.client
t := suite.T()
var expectKList []string
var expectVList []string
for _, tc := range tests {
k := tc.input.k
v := tc.input.v
if strings.Contains(k, prefix) {
expectKList = append(expectKList, k)
expectVList = append(expectVList, v)
}
if err := c.Create(k, v); err != nil {
t.Fatal(err)
}
}
kList, vList, err := c.GetChildrenKVList(prefix)
if err != nil {
t.Fatal(err)
}
if reflect.DeepEqual(expectKList, kList) && reflect.DeepEqual(expectVList, vList) {
return
}
t.Fatalf("expect keylist %v but got %v expect valueList %v but got %v ", expectKList, kList, expectVList, vList)
}
func (suite *ClientTestSuite) TestClientWatch() {
tests := tests
c := suite.client
t := suite.T()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
wc, err := c.watch(prefix)
if err != nil {
t.Fatal(err)
}
for e := range wc {
for _, event := range e.Events {
t.Logf("type IsCreate %v k %s v %s", event.IsCreate(), event.Kv.Key, event.Kv.Value)
}
}
}()
for _, tc := range tests {
k := tc.input.k
v := tc.input.v
if err := c.Create(k, v); err != nil {
t.Fatal(err)
}
if err := c.delete(k); err != nil {
t.Fatal(err)
}
}
c.Close()
wg.Wait()
}
func (suite *ClientTestSuite) TestClientRegisterTemp() {
c := suite.client
observeC := suite.setUpClient()
t := suite.T()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
completePath := path.Join("scott", "wang")
wc, err := observeC.watch(completePath)
if err != nil {
t.Fatal(err)
}
for e := range wc {
for _, event := range e.Events {
if event.Type == mvccpb.DELETE {
t.Logf("complete key (%s) is delete", completePath)
wg.Done()
observeC.Close()
return
}
t.Logf("type IsCreate %v k %s v %s", event.IsCreate(), event.Kv.Key, event.Kv.Value)
}
}
}()
_, err := c.RegisterTemp("scott", "wang")
if err != nil {
t.Fatal(err)
}
time.Sleep(2 * time.Second)
c.Close()
wg.Wait()
}
func TestClientSuite(t *testing.T) {
suite.Run(t, &ClientTestSuite{
etcdConfig: struct {
name string
endpoints []string
timeout time.Duration
heartbeat int
}{
name: "test",
endpoints: []string{"localhost:2381"},
timeout: time.Second,
heartbeat: 1,
},
})
}
package etcdv3
import (
"sync"
"time"
)
import (
"github.com/dubbogo/getty"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
)
type clientFacade interface {
Client() *Client
SetClient(*Client)
ClientLock() *sync.Mutex
WaitGroup() *sync.WaitGroup //for wait group control, etcd client listener & etcd client container
GetDone() chan struct{} //for etcd client control
RestartCallBack() bool
common.Node
}
func HandleClientRestart(r clientFacade) {
var (
err error
failTimes int
)
defer r.WaitGroup().Done()
LOOP:
for {
select {
case <-r.GetDone():
logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDV3 goroutine exit now...")
break LOOP
// re-register all services
case <-r.Client().Done():
r.ClientLock().Lock()
clientName := RegistryETCDV3Client
timeout, _ := time.ParseDuration(r.GetUrl().GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
endpoint := r.GetUrl().Location
r.Client().Close()
r.SetClient(nil)
r.ClientLock().Unlock()
// try to connect to etcd,
failTimes = 0
for {
select {
case <-r.GetDone():
logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDRegistry goroutine exit now...")
break LOOP
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // avoid connect frequent
}
err = ValidateClient(
r,
WithName(clientName),
WithEndpoints(endpoint),
WithTimeout(timeout),
)
logger.Infof("ETCDV3ProviderRegistry.validateETCDV3Client(etcd Addr{%s}) = error{%#v}",
endpoint, perrors.WithStack(err))
if err == nil {
if r.RestartCallBack() {
break
}
}
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
}
}
}
}
package etcdv3
import (
"sync"
"time"
)
import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/remoting"
)
type EventListener struct {
client *Client
keyMapLock sync.Mutex
keyMap map[string]struct{}
wg sync.WaitGroup
}
func NewEventListener(client *Client) *EventListener {
return &EventListener{
client: client,
keyMap: make(map[string]struct{}),
}
}
// Listen on a spec key
// this method will return true when spec key deleted,
// this method will return false when deep layer connection lose
func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool {
l.wg.Add(1)
defer l.wg.Done()
for {
wc, err := l.client.Watch(key)
if err != nil {
logger.Warnf("WatchExist{key:%s} = error{%v}", key, err)
return false
}
select {
// client stopped
case <-l.client.Done():
logger.Warnf("etcd client stopped")
return false
// client ctx stop
case <-l.client.ctx.Done():
logger.Warnf("etcd client ctx cancel")
return false
// handle etcd events
case e, ok := <-wc:
if !ok {
logger.Warnf("etcd watch-chan closed")
return false
}
if e.Err() != nil {
logger.Errorf("etcd watch ERR {err: %s}", e.Err())
continue
}
for _, event := range e.Events {
if l.handleEvents(event, listener...) {
// if event is delete
return true
}
}
}
}
return false
}
// return true mean the event type is DELETE
// return false mean the event type is CREATE || UPDATE
func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remoting.DataListener) bool {
logger.Infof("got a etcd event {type: %s, key: %s}", event.Type, event.Kv.Key)
switch event.Type {
// the etcdv3 event just include PUT && DELETE
case mvccpb.PUT:
for _, listener := range listeners {
switch event.IsCreate() {
case true:
logger.Infof("etcd get event (key{%s}) = event{EventNodeDataCreated}", event.Kv.Key)
listener.DataChange(remoting.Event{
Path: string(event.Kv.Key),
Action: remoting.EventTypeAdd,
Content: string(event.Kv.Value),
})
case false:
logger.Infof("etcd get event (key{%s}) = event{EventNodeDataChanged}", event.Kv.Key)
listener.DataChange(remoting.Event{
Path: string(event.Kv.Key),
Action: remoting.EvnetTypeUpdate,
Content: string(event.Kv.Value),
})
}
}
return false
case mvccpb.DELETE:
logger.Warnf("etcd get event (key{%s}) = event{EventNodeDeleted}", event.Kv.Key)
return true
default:
return false
}
panic("unreachable")
}
// Listen on a set of key with spec prefix
func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) {
l.wg.Add(1)
defer l.wg.Done()
for {
wc, err := l.client.WatchWithPrefix(prefix)
if err != nil {
logger.Warnf("listenDirEvent(key{%s}) = error{%v}", prefix, err)
}
select {
// client stopped
case <-l.client.Done():
logger.Warnf("etcd client stopped")
return
// client ctx stop
case <-l.client.ctx.Done():
logger.Warnf("etcd client ctx cancel")
return
// etcd event stream
case e, ok := <-wc:
if !ok {
logger.Warnf("etcd watch-chan closed")
return
}
if e.Err() != nil {
logger.Errorf("etcd watch ERR {err: %s}", e.Err())
continue
}
for _, event := range e.Events {
l.handleEvents(event, listener...)
}
}
}
}
func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}
// this func is invoked by etcdv3 ConsumerRegistry::Registe/ etcdv3 ConsumerRegistry::get/etcdv3 ConsumerRegistry::getListener
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent
// |
// --------> ListenServiceNodeEvent
func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) {
l.keyMapLock.Lock()
_, ok := l.keyMap[key]
l.keyMapLock.Unlock()
if ok {
logger.Warnf("etcdv3 key %s has already been listened.", key)
return
}
l.keyMapLock.Lock()
l.keyMap[key] = struct{}{}
l.keyMapLock.Unlock()
keyList, valueList, err := l.client.getChildren(key)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", key, perrors.WithMessage(err, "get children"))
}
logger.Infof("get key children list %s, keys %v values %v", key, keyList, valueList)
for i, k := range keyList {
logger.Infof("got children list key -> %s", k)
listener.DataChange(remoting.Event{
Path: k,
Action: remoting.EventTypeAdd,
Content: valueList[i],
})
}
logger.Infof("listen dubbo provider key{%s} event and wait to get all provider etcdv3 nodes", key)
go func(key string, listener remoting.DataListener) {
l.ListenServiceNodeEventWithPrefix(key, listener)
logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key)
}(key, listener)
logger.Infof("listen dubbo service key{%s}", key)
go func(key string) {
if l.ListenServiceNodeEvent(key) {
listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(etcd key{%s}) goroutine exit now", key)
}(key)
}
func (l *EventListener) Close() {
l.wg.Wait()
}
package etcdv3
import (
"time"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/remoting"
)
var changedData = `
dubbo.consumer.request_timeout=3s
dubbo.consumer.connect_timeout=5s
dubbo.application.organization=ikurento.com
dubbo.application.name=BDTService
dubbo.application.module=dubbogo user-info server
dubbo.application.version=0.0.1
dubbo.application.owner=ZX
dubbo.application.environment=dev
dubbo.registries.hangzhouzk.protocol=zookeeper
dubbo.registries.hangzhouzk.timeout=3s
dubbo.registries.hangzhouzk.address=127.0.0.1:2181
dubbo.registries.shanghaizk.protocol=zookeeper
dubbo.registries.shanghaizk.timeout=3s
dubbo.registries.shanghaizk.address=127.0.0.1:2182
dubbo.service.com.ikurento.user.UserProvider.protocol=dubbo
dubbo.service.com.ikurento.user.UserProvider.interface=com.ikurento.user.UserProvider
dubbo.service.com.ikurento.user.UserProvider.loadbalance=random
dubbo.service.com.ikurento.user.UserProvider.warmup=100
dubbo.service.com.ikurento.user.UserProvider.cluster=failover
`
func (suite *ClientTestSuite) TestListener() {
var tests = []struct {
input struct {
k string
v string
}
}{
{input: struct {
k string
v string
}{k: "/dubbo", v: changedData}},
}
c := suite.client
t := suite.T()
listener := NewEventListener(c)
dataListener := &mockDataListener{client: c, changedData: changedData, rc: make(chan remoting.Event)}
listener.ListenServiceEvent("/dubbo", dataListener)
// NOTICE: direct listen will lose create msg
time.Sleep(time.Second)
for _, tc := range tests {
k := tc.input.k
v := tc.input.v
if err := c.Create(k, v); err != nil {
t.Fatal(err)
}
}
msg := <-dataListener.rc
assert.Equal(t, changedData, msg.Content)
}
type mockDataListener struct {
eventList []remoting.Event
client *Client
changedData string
rc chan remoting.Event
}
func (m *mockDataListener) DataChange(eventType remoting.Event) bool {
m.eventList = append(m.eventList, eventType)
if eventType.Content == m.changedData {
m.rc <- eventType
}
return true
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment