/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package zookeeper import ( "strings" "sync" ) import ( perrors "github.com/pkg/errors" ) import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/config_center" "github.com/apache/dubbo-go/registry" "github.com/apache/dubbo-go/remoting" zk "github.com/apache/dubbo-go/remoting/zookeeper" ) // RegistryDataListener ... type RegistryDataListener struct { interestedURL []*common.URL listener config_center.ConfigurationListener } // NewRegistryDataListener ... func NewRegistryDataListener(listener config_center.ConfigurationListener) *RegistryDataListener { return &RegistryDataListener{listener: listener} } // AddInterestedURL ... func (l *RegistryDataListener) AddInterestedURL(url *common.URL) { l.interestedURL = append(l.interestedURL, url) } // DataChange ... func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool { // Intercept the last bit index := strings.Index(eventType.Path, "/providers/") if index == -1 { logger.Warnf("Listen with no url, event.path={%v}", eventType.Path) return false } url := eventType.Path[index+len("/providers/"):] serviceURL, err := common.NewURL(url) if err != nil { logger.Errorf("Listen NewURL(r{%s}) = error{%v} eventType.Path={%v}", url, err, eventType.Path) return false } for _, v := range l.interestedURL { if serviceURL.URLEqual(*v) { l.listener.Process( &config_center.ConfigChangeEvent{ Key: eventType.Path, Value: serviceURL, ConfigType: eventType.Action, }, ) return true } } return false } // RegistryConfigurationListener ... type RegistryConfigurationListener struct { client *zk.ZookeeperClient registry *zkRegistry events chan *config_center.ConfigChangeEvent isClosed bool closeOnce sync.Once } // NewRegistryConfigurationListener for listening the event of zk. func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener { reg.WaitGroup().Add(1) return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32), isClosed: false} } // Process ... func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) { l.events <- configType } // Next ... func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) { for { select { case <-l.client.Done(): logger.Warnf("listener's zk client connection is broken, so zk event listener exit now.") return nil, perrors.New("listener stopped") case <-l.registry.Done(): logger.Warnf("zk consumer register has quit, so zk event listener exit now.") return nil, perrors.New("listener stopped") case e := <-l.events: logger.Debugf("got zk event %s", e) if e.ConfigType == remoting.EventTypeDel && !l.valid() { logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value) continue } //r.update(e.res) //write to invoker //r.outerEventCh <- e.res return ®istry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(common.URL)}, nil } } } // Close ... func (l *RegistryConfigurationListener) Close() { // ensure that the listener will be closed at most once. l.closeOnce.Do(func() { l.isClosed = true l.registry.WaitGroup().Done() }) } func (l *RegistryConfigurationListener) valid() bool { return l.client.ZkConnValid() }