diff --git a/common/observer/event.go b/common/observer/event.go index 8c3362feeee3d62315eb734460f486dcdbfe2f36..d78179043e8a2059e1d5fd15878fe32d7596321e 100644 --- a/common/observer/event.go +++ b/common/observer/event.go @@ -58,7 +58,7 @@ func (b *BaseEvent) String() string { return fmt.Sprintf("BaseEvent[source = %#v]", b.Source) } -func newBaseEvent(source interface{}) *BaseEvent { +func NewBaseEvent(source interface{}) *BaseEvent { return &BaseEvent{ Source: source, Timestamp: time.Now(), diff --git a/registry/common/event_publishing_service_deiscovery_test.go b/registry/common/event_publishing_service_deiscovery_test.go new file mode 100644 index 0000000000000000000000000000000000000000..1e08335e04232c9b5944e34133e4e979bee9ee74 --- /dev/null +++ b/registry/common/event_publishing_service_deiscovery_test.go @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package common + +import ( + "reflect" + "testing" +) + +import ( + gxset "github.com/dubbogo/gost/container/set" + gxpage "github.com/dubbogo/gost/page" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" +) + +import ( + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/observer" + dispatcher2 "github.com/apache/dubbo-go/common/observer/dispatcher" + "github.com/apache/dubbo-go/registry" +) + +func TestEventPublishingServiceDiscovery_DispatchEvent(t *testing.T) { + dc := NewEventPublishingServiceDiscovery(&ServiceDiscoveryA{}) + tsd := &TestServiceDiscoveryDestroyingEventListener{} + tsd.SetT(t) + tsi := &TestServiceInstancePreRegisteredEventListener{} + tsi.SetT(t) + extension.AddEventListener(tsd) + extension.AddEventListener(tsi) + extension.SetEventDispatcher("direct", dispatcher2.NewDirectEventDispatcher) + extension.SetAndInitGlobalDispatcher("direct") + err := dc.Destroy() + assert.Nil(t, err) + si := ®istry.DefaultServiceInstance{Id: "testServiceInstance"} + err = dc.Register(si) + assert.Nil(t, err) + +} + +type TestServiceDiscoveryDestroyingEventListener struct { + suite.Suite + observer.BaseListenable +} + +func (tel *TestServiceDiscoveryDestroyingEventListener) OnEvent(e observer.Event) error { + e1, ok := e.(*ServiceDiscoveryDestroyingEvent) + assert.Equal(tel.T(), ok, true) + assert.Equal(tel.T(), "testServiceDiscovery", e1.GetOriginal().String()) + assert.Equal(tel.T(), "testServiceDiscovery", e1.GetServiceDiscovery().String()) + return nil +} + +func (tel *TestServiceDiscoveryDestroyingEventListener) GetPriority() int { + return -1 +} + +func (tel *TestServiceDiscoveryDestroyingEventListener) GetEventType() reflect.Type { + return reflect.TypeOf(ServiceDiscoveryDestroyingEvent{}) +} + +type TestServiceInstancePreRegisteredEventListener struct { + suite.Suite + observer.BaseListenable +} + +func (tel *TestServiceInstancePreRegisteredEventListener) OnEvent(e observer.Event) error { + e1, ok := e.(*ServiceInstancePreRegisteredEvent) + assert.Equal(tel.T(), ok, true) + assert.Equal(tel.T(), "testServiceInstance", e1.getServiceInstance().GetId()) + return nil +} + +func (tel *TestServiceInstancePreRegisteredEventListener) GetPriority() int { + return -1 +} + +func (tel *TestServiceInstancePreRegisteredEventListener) GetEventType() reflect.Type { + return reflect.TypeOf(ServiceInstancePreRegisteredEvent{}) +} + +type ServiceDiscoveryA struct { +} + +// String return mockServiceDiscovery +func (msd *ServiceDiscoveryA) String() string { + return "testServiceDiscovery" +} + +// Destroy do nothing +func (msd *ServiceDiscoveryA) Destroy() error { + return nil +} + +func (msd *ServiceDiscoveryA) Register(instance registry.ServiceInstance) error { + return nil +} + +func (msd *ServiceDiscoveryA) Update(instance registry.ServiceInstance) error { + return nil +} + +func (msd *ServiceDiscoveryA) Unregister(instance registry.ServiceInstance) error { + return nil +} + +func (msd *ServiceDiscoveryA) GetDefaultPageSize() int { + return 1 +} + +func (msd *ServiceDiscoveryA) GetServices() *gxset.HashSet { + return nil +} + +func (msd *ServiceDiscoveryA) GetInstances(serviceName string) []registry.ServiceInstance { + return nil +} + +func (msd *ServiceDiscoveryA) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager { + return nil +} + +func (msd *ServiceDiscoveryA) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager { + return nil +} + +func (msd *ServiceDiscoveryA) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager { + return nil +} + +func (msd *ServiceDiscoveryA) AddListener(listener *registry.ServiceInstancesChangedListener) error { + return nil +} + +func (msd *ServiceDiscoveryA) DispatchEventByServiceName(serviceName string) error { + return nil +} + +func (msd *ServiceDiscoveryA) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error { + return nil +} + +func (msd *ServiceDiscoveryA) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { + return nil +} diff --git a/registry/common/event_publishing_service_discovery.go b/registry/common/event_publishing_service_discovery.go new file mode 100644 index 0000000000000000000000000000000000000000..f61dd84690f4878eaaf7fb29890d6dab2210ef8f --- /dev/null +++ b/registry/common/event_publishing_service_discovery.go @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package common + +import ( + gxset "github.com/dubbogo/gost/container/set" + gxpage "github.com/dubbogo/gost/page" +) + +import ( + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/observer" + "github.com/apache/dubbo-go/registry" +) + +// EventPublishingServiceDiscovery will enhance Service Discovery +// Publish some event about service discovery +type EventPublishingServiceDiscovery struct { + serviceDiscovery registry.ServiceDiscovery +} + +// NewEventPublishingServiceDiscovery is a constructor +func NewEventPublishingServiceDiscovery(serviceDiscovery registry.ServiceDiscovery) *EventPublishingServiceDiscovery { + return &EventPublishingServiceDiscovery{ + serviceDiscovery: serviceDiscovery, + } +} + +// String +func (epsd *EventPublishingServiceDiscovery) String() string { + return epsd.serviceDiscovery.String() +} + +// Destroy delegate function +func (epsd *EventPublishingServiceDiscovery) Destroy() error { + f := func() error { + return epsd.serviceDiscovery.Destroy() + } + return epsd.executeWithEvents(NewServiceDiscoveryDestroyingEvent(epsd, epsd.serviceDiscovery), + f, NewServiceDiscoveryDestroyedEvent(epsd, epsd.serviceDiscovery)) +} + +// Register delegate function +func (epsd *EventPublishingServiceDiscovery) Register(instance registry.ServiceInstance) error { + f := func() error { + return epsd.serviceDiscovery.Register(instance) + } + return epsd.executeWithEvents(NewServiceInstancePreRegisteredEvent(epsd.serviceDiscovery, instance), + f, NewServiceInstanceRegisteredEvent(epsd.serviceDiscovery, instance)) + +} + +// Update delegate function +func (epsd *EventPublishingServiceDiscovery) Update(instance registry.ServiceInstance) error { + f := func() error { + return epsd.serviceDiscovery.Update(instance) + } + return epsd.executeWithEvents(nil, f, nil) +} + +// Unregister delegate function +func (epsd *EventPublishingServiceDiscovery) Unregister(instance registry.ServiceInstance) error { + f := func() error { + return epsd.serviceDiscovery.Unregister(instance) + } + return epsd.executeWithEvents(NewServiceInstancePreUnregisteredEvent(epsd.serviceDiscovery, instance), + f, NewServiceInstanceUnregisteredEvent(epsd.serviceDiscovery, instance)) +} + +func (epsd *EventPublishingServiceDiscovery) GetDefaultPageSize() int { + return epsd.serviceDiscovery.GetDefaultPageSize() +} + +func (epsd *EventPublishingServiceDiscovery) GetServices() *gxset.HashSet { + return epsd.serviceDiscovery.GetServices() +} + +func (epsd *EventPublishingServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { + return epsd.serviceDiscovery.GetInstances(serviceName) +} + +func (epsd *EventPublishingServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager { + return epsd.serviceDiscovery.GetInstancesByPage(serviceName, offset, pageSize) +} + +func (epsd *EventPublishingServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager { + return epsd.serviceDiscovery.GetHealthyInstancesByPage(serviceName, offset, pageSize, healthy) +} + +func (epsd *EventPublishingServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager { + return epsd.serviceDiscovery.GetRequestInstances(serviceNames, offset, requestedSize) +} + +// AddListener add event listener +func (epsd *EventPublishingServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { + extension.GetGlobalDispatcher().AddEventListener(listener) + return epsd.serviceDiscovery.AddListener(listener) +} + +func (epsd *EventPublishingServiceDiscovery) DispatchEventByServiceName(serviceName string) error { + return epsd.DispatchEventByServiceName(serviceName) +} + +func (epsd *EventPublishingServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error { + return epsd.serviceDiscovery.DispatchEventForInstances(serviceName, instances) +} + +func (epsd *EventPublishingServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { + return epsd.serviceDiscovery.DispatchEvent(event) +} + +// executeWithEvents dispatch before event and after event if return error will dispatch exception event +func (epsd *EventPublishingServiceDiscovery) executeWithEvents(beforeEvent observer.Event, f func() error, afterEvent observer.Event) error { + globalDispatcher := extension.GetGlobalDispatcher() + if beforeEvent != nil { + globalDispatcher.Dispatch(beforeEvent) + } + if err := f(); err != nil { + globalDispatcher.Dispatch(NewServiceDiscoveryExceptionEvent(epsd, epsd.serviceDiscovery, err)) + return err + } + if afterEvent != nil { + globalDispatcher.Dispatch(afterEvent) + } + return nil +} diff --git a/registry/common/service_discovery_event.go b/registry/common/service_discovery_event.go new file mode 100644 index 0000000000000000000000000000000000000000..a60ca56a39016738daf6e992b286f2b04c2fdec8 --- /dev/null +++ b/registry/common/service_discovery_event.go @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package common + +import ( + "github.com/apache/dubbo-go/common/observer" + "github.com/apache/dubbo-go/registry" +) + +type ServiceDiscoveryEvent struct { + observer.BaseEvent + original registry.ServiceDiscovery +} + +func NewServiceDiscoveryEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryEvent { + return &ServiceDiscoveryEvent{ + BaseEvent: *observer.NewBaseEvent(discovery), + original: original, + } +} + +func (sde *ServiceDiscoveryEvent) GetServiceDiscovery() registry.ServiceDiscovery { + return sde.GetSource().(registry.ServiceDiscovery) +} + +func (sde *ServiceDiscoveryEvent) GetOriginal() registry.ServiceDiscovery { + return sde.original +} + +// ServiceDiscoveryDestroyingEvent +// this event will be dispatched before service discovery be destroyed +type ServiceDiscoveryDestroyingEvent struct { + ServiceDiscoveryEvent +} + +// ServiceDiscoveryExceptionEvent +// this event will be dispatched when the error occur in service discovery +type ServiceDiscoveryExceptionEvent struct { + ServiceDiscoveryEvent + err error +} + +// ServiceDiscoveryInitializedEvent +// this event will be dispatched after service discovery initialize +type ServiceDiscoveryInitializedEvent struct { + ServiceDiscoveryEvent +} + +// ServiceDiscoveryInitializingEvent +// this event will be dispatched before service discovery initialize +type ServiceDiscoveryInitializingEvent struct { + ServiceDiscoveryEvent +} + +// ServiceDiscoveryDestroyedEvent +// this event will be dispatched after service discovery be destroyed +type ServiceDiscoveryDestroyedEvent struct { + ServiceDiscoveryEvent +} + +// NewServiceDiscoveryDestroyingEvent create a ServiceDiscoveryDestroyingEvent +func NewServiceDiscoveryDestroyingEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryDestroyingEvent { + return &ServiceDiscoveryDestroyingEvent{*NewServiceDiscoveryEvent(discovery, original)} +} + +// NewServiceDiscoveryExceptionEvent create a ServiceDiscoveryExceptionEvent +func NewServiceDiscoveryExceptionEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery, err error) *ServiceDiscoveryExceptionEvent { + return &ServiceDiscoveryExceptionEvent{*NewServiceDiscoveryEvent(discovery, original), err} +} + +// NewServiceDiscoveryInitializedEvent create a ServiceDiscoveryInitializedEvent +func NewServiceDiscoveryInitializedEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryInitializedEvent { + return &ServiceDiscoveryInitializedEvent{*NewServiceDiscoveryEvent(discovery, original)} +} + +// NewServiceDiscoveryInitializingEvent create a ServiceDiscoveryInitializingEvent +func NewServiceDiscoveryInitializingEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryInitializingEvent { + return &ServiceDiscoveryInitializingEvent{*NewServiceDiscoveryEvent(discovery, original)} +} + +// NewServiceDiscoveryDestroyedEvent create a ServiceDiscoveryDestroyedEvent +func NewServiceDiscoveryDestroyedEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryDestroyedEvent { + return &ServiceDiscoveryDestroyedEvent{*NewServiceDiscoveryEvent(discovery, original)} +} diff --git a/registry/common/service_instance_event.go b/registry/common/service_instance_event.go new file mode 100644 index 0000000000000000000000000000000000000000..f70e7ee0ff12bf41c761f6bf7c239228df046980 --- /dev/null +++ b/registry/common/service_instance_event.go @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package common + +import ( + "github.com/apache/dubbo-go/common/observer" + "github.com/apache/dubbo-go/registry" +) + +type ServiceInstanceEvent struct { + observer.BaseEvent + serviceInstance registry.ServiceInstance +} + +// NewServiceInstanceEvent create a ServiceInstanceEvent +func NewServiceInstanceEvent(source interface{}, instance registry.ServiceInstance) *ServiceInstanceEvent { + return &ServiceInstanceEvent{ + BaseEvent: *observer.NewBaseEvent(source), + serviceInstance: instance, + } +} + +func (sie *ServiceInstanceEvent) getServiceInstance() registry.ServiceInstance { + return sie.serviceInstance +} + +// ServiceInstancePreRegisteredEvent +// this event will be dispatched before service instance be registered +type ServiceInstancePreRegisteredEvent struct { + ServiceInstanceEvent +} + +// ServiceInstancePreUnregisteredEvent +// this event will be dispatched before service instance be unregistered +type ServiceInstancePreUnregisteredEvent struct { + ServiceInstanceEvent +} + +// ServiceInstanceRegisteredEvent +// this event will be dispatched after service instance be registered +type ServiceInstanceRegisteredEvent struct { + ServiceInstanceEvent +} + +// ServiceInstanceRegisteredEvent +// this event will be dispatched after service instance be unregistered +type ServiceInstanceUnregisteredEvent struct { + ServiceInstanceEvent +} + +// NewServiceInstancePreRegisteredEvent create a ServiceInstancePreRegisteredEvent +func NewServiceInstancePreRegisteredEvent(source interface{}, instance registry.ServiceInstance) *ServiceInstancePreRegisteredEvent { + return &ServiceInstancePreRegisteredEvent{*NewServiceInstanceEvent(source, instance)} +} + +// NewServiceInstancePreUnregisteredEvent create a ServiceInstancePreUnregisteredEvent +func NewServiceInstancePreUnregisteredEvent(source interface{}, instance registry.ServiceInstance) *ServiceInstancePreUnregisteredEvent { + return &ServiceInstancePreUnregisteredEvent{*NewServiceInstanceEvent(source, instance)} +} + +// NewServiceInstanceRegisteredEvent create a ServiceInstanceRegisteredEvent +func NewServiceInstanceRegisteredEvent(source interface{}, instance registry.ServiceInstance) *ServiceInstanceRegisteredEvent { + return &ServiceInstanceRegisteredEvent{*NewServiceInstanceEvent(source, instance)} +} + +// NewServiceInstanceUnregisteredEvent create a ServiceInstanceUnregisteredEvent +func NewServiceInstanceUnregisteredEvent(source interface{}, instance registry.ServiceInstance) *ServiceInstanceUnregisteredEvent { + return &ServiceInstanceUnregisteredEvent{*NewServiceInstanceEvent(source, instance)} +}