Skip to content
Snippets Groups Projects
Commit e2da137b authored by flycash's avatar flycash Committed by lzp0412
Browse files

Add customizer

parent ed6b59bb
No related branches found
No related tags found
No related merge requests found
Showing
with 337 additions and 84 deletions
......@@ -81,4 +81,4 @@ const (
const (
SERVICE_DISCOVERY_DEFAULT_GROUP = "DEFAULT_GROUP"
)
\ No newline at end of file
)
......@@ -22,6 +22,7 @@ const (
)
const (
PORT_KEY = "port"
GROUP_KEY = "group"
VERSION_KEY = "version"
INTERFACE_KEY = "interface"
......@@ -262,16 +263,17 @@ const (
// service discovery
const (
SUBSCRIBED_SERVICE_NAMES_KEY = "subscribed-services"
PROVIDER_BY = "provided-by"
EXPORTED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.exported-services.revision"
SERVICE_INSTANCE_SELECTOR = "service-instance-selector"
METADATA_STORAGE_TYPE_PROPERTY_NAME = "dubbo.metadata.storage-type"
DEFAULT_METADATA_STORAGE_TYPE = "local"
SERVICE_INSTANCE_ENDPOINTS = "dubbo.endpoints"
METADATA_SERVICE_PREFIX = "dubbo.metadata-service."
METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "url-params"
METADATA_SERVICE_URLS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "urls"
SUBSCRIBED_SERVICE_NAMES_KEY = "subscribed-services"
PROVIDER_BY = "provided-by"
EXPORTED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.exported-services.revision"
SUBSCRIBED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.subscribed-services.revision"
SERVICE_INSTANCE_SELECTOR = "service-instance-selector"
METADATA_STORAGE_TYPE_PROPERTY_NAME = "dubbo.metadata.storage-type"
DEFAULT_METADATA_STORAGE_TYPE = "local"
SERVICE_INSTANCE_ENDPOINTS = "dubbo.endpoints"
METADATA_SERVICE_PREFIX = "dubbo.metadata-service."
METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "url-params"
METADATA_SERVICE_URLS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "urls"
// SERVICE_DISCOVERY_KEY indicate which service discovery instance will be used
SERVICE_DISCOVERY_KEY = "service_discovery"
......
......@@ -29,7 +29,7 @@ var (
// AddCustomizers will put the customizer into slices and then sort them;
// this method will be invoked several time, so we sort them here.
func AddCustomizers(cus registry.ServiceInstanceCustomizer) {
func AddCustomizers(cus registry.ServiceInstanceCustomizer) {
customizers = append(customizers, cus)
sort.Stable(customizerSlice(customizers))
}
......
......@@ -32,7 +32,6 @@ type EventListener interface {
// OnEvent handle this event
OnEvent(e Event) error
// GetEventType listen which event type
// return nil if the implementation want to listen any event
GetEventType() reflect.Type
}
......
......@@ -650,3 +650,17 @@ func mergeNormalParam(mergedUrl *URL, referenceUrl *URL, paramKeys []string) []f
}
return methodConfigMergeFcn
}
type URLSlice []URL
func (s URLSlice) Len() int {
return len(s)
}
func (s URLSlice) Less(i, j int) bool {
return s[i].String() < s[j].String()
}
func (s URLSlice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
......@@ -17,6 +17,7 @@
package inmemory
import (
"sort"
"sync"
)
......@@ -89,7 +90,7 @@ func (mts *MetadataService) addURL(targetMap *sync.Map, url *common.URL) bool {
mts.lock.RUnlock()
}
mts.lock.Lock()
//double chk
// double chk
wantedUrl := urlSet.(*skip.SkipList).Get(Comparator(*url))
if len(wantedUrl) > 0 && wantedUrl[0] != nil {
mts.lock.Unlock()
......@@ -115,35 +116,38 @@ func (mts *MetadataService) removeURL(targetMap *sync.Map, url *common.URL) {
}
// getAllService can return all the exportedUrlString except for metadataService
func (mts *MetadataService) getAllService(services *sync.Map) *skip.SkipList {
skipList := skip.New(uint64(0))
func (mts *MetadataService) getAllService(services *sync.Map) []common.URL {
// using skip list to dedup and sorting
res := make([]common.URL, 0)
services.Range(func(key, value interface{}) bool {
urls := value.(*skip.SkipList)
for i := uint64(0); i < urls.Len(); i++ {
url := common.URL(urls.ByPosition(i).(Comparator))
if url.GetParam(constant.INTERFACE_KEY, url.Path) != constant.SIMPLE_METADATA_SERVICE_NAME {
skipList.Insert(Comparator(url))
res = append(res, url)
}
}
return true
})
return skipList
sort.Sort(common.URLSlice(res))
return res
}
// getSpecifiedService can return specified service url by serviceKey
func (mts *MetadataService) getSpecifiedService(services *sync.Map, serviceKey string, protocol string) *skip.SkipList {
skipList := skip.New(uint64(0))
func (mts *MetadataService) getSpecifiedService(services *sync.Map, serviceKey string, protocol string) []common.URL {
res := make([]common.URL, 0)
serviceList, loaded := services.Load(serviceKey)
if loaded {
urls := serviceList.(*skip.SkipList)
for i := uint64(0); i < urls.Len(); i++ {
url := common.URL(urls.ByPosition(i).(Comparator))
if len(protocol) == 0 || url.Protocol == protocol || url.GetParam(constant.PROTOCOL_KEY, "") == protocol {
skipList.Insert(Comparator(url))
res = append(res, url)
}
}
sort.Stable(common.URLSlice(res))
}
return skipList
return res
}
// ExportURL can store the in memory
......@@ -173,16 +177,16 @@ func (mts *MetadataService) PublishServiceDefinition(url common.URL) error {
interfaceName := url.GetParam(constant.INTERFACE_KEY, "")
isGeneric := url.GetParamBool(constant.GENERIC_KEY, false)
if len(interfaceName) > 0 && !isGeneric {
//judge is consumer or provider
//side := url.GetParam(constant.SIDE_KEY, "")
//var service event.RPCService
// judge is consumer or provider
// side := url.GetParam(constant.SIDE_KEY, "")
// var service event.RPCService
service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service()))
//if side == event.RoleType(event.CONSUMER).Role() {
// if side == event.RoleType(event.CONSUMER).Role() {
// //TODO:generate the service definition and store it
//
//} else if side == event.RoleType(event.PROVIDER).Role() {
// } else if side == event.RoleType(event.PROVIDER).Role() {
// //TODO:generate the service definition and store it
//}
// }
sd := definition.BuildServiceDefinition(*service, url)
data, err := sd.ToBytes()
if err != nil {
......@@ -196,7 +200,7 @@ func (mts *MetadataService) PublishServiceDefinition(url common.URL) error {
}
// GetExportedURLs get all exported urls
func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) (*skip.SkipList, error) {
func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]common.URL, error) {
if serviceInterface == constant.ANY_VALUE {
return mts.getAllService(mts.exportedServiceURLs), nil
} else {
......@@ -206,7 +210,7 @@ func (mts *MetadataService) GetExportedURLs(serviceInterface string, group strin
}
// GetSubscribedURLs get all subscribedUrl
func (mts *MetadataService) GetSubscribedURLs() (*skip.SkipList, error) {
func (mts *MetadataService) GetSubscribedURLs() ([]common.URL, error) {
return mts.getAllService(mts.subscribedServiceURLs), nil
}
......
......@@ -20,7 +20,6 @@ package remote
import (
"sync"
"github.com/Workiva/go-datastructures/slice/skip"
"go.uber.org/atomic"
)
......@@ -121,12 +120,12 @@ func (mts *MetadataService) PublishServiceDefinition(url common.URL) error {
}
// GetExportedURLs will be implemented by in memory service
func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) (*skip.SkipList, error) {
func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]common.URL, error) {
return mts.inMemoryMetadataService.GetExportedURLs(serviceInterface, group, version, protocol)
}
// GetSubscribedURLs will be implemented by in memory service
func (mts *MetadataService) GetSubscribedURLs() (*skip.SkipList, error) {
func (mts *MetadataService) GetSubscribedURLs() ([]common.URL, error) {
return mts.inMemoryMetadataService.GetSubscribedURLs()
}
......@@ -150,16 +149,11 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err)
result = false
}
iterator := urls.Iter(inmemory.Comparator{})
logger.Infof("urls length = %v", urls.Len())
for {
if !iterator.Next() {
break
}
url := iterator.Value().(inmemory.Comparator)
id := identifier.NewServiceMetadataIdentifier(common.URL(url))
logger.Infof("urls length = %v", len(urls))
for _, u := range urls {
id := identifier.NewServiceMetadataIdentifier(u)
id.Revision = mts.exportedRevision.Load()
if err := mts.delegateReport.SaveServiceMetadata(id, common.URL(url)); err != nil {
if err := mts.delegateReport.SaveServiceMetadata(id, u); err != nil {
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err)
result = false
}
......@@ -173,14 +167,14 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v+", err)
result = false
}
if urls != nil && urls.Len() > 0 {
if urls != nil && len(urls) > 0 {
id := &identifier.SubscriberMetadataIdentifier{
MetadataIdentifier: identifier.MetadataIdentifier{
Application: config.GetApplicationConfig().Name,
},
Revision: subscribedRevision,
}
if err := mts.delegateReport.SaveSubscribedData(id, convertUrls(urls)); err != nil {
if err := mts.delegateReport.SaveSubscribedData(id, urls); err != nil {
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err)
result = false
}
......@@ -193,20 +187,3 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR
func (MetadataService) Version() string {
return version
}
// convertUrls will convert the skip list to slice
func convertUrls(list *skip.SkipList) []common.URL {
urls := make([]common.URL, list.Len())
iterator := list.Iter(inmemory.Comparator{})
for {
if iterator.Value() == nil {
break
}
url := iterator.Value().(inmemory.Comparator)
urls = append(urls, common.URL(url))
if !iterator.Next() {
break
}
}
return urls
}
......@@ -17,10 +17,6 @@
package service
import (
"github.com/Workiva/go-datastructures/slice/skip"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
......@@ -44,9 +40,11 @@ type MetadataService interface {
// PublishServiceDefinition will generate the target url's code info
PublishServiceDefinition(url common.URL) error
// GetExportedURLs will get the target exported url in metadata
GetExportedURLs(serviceInterface string, group string, version string, protocol string) (*skip.SkipList, error)
// the url should be unique
GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]common.URL, error)
// GetExportedURLs will get the target subscribed url in metadata
GetSubscribedURLs() (*skip.SkipList, error)
// the url should be unique
GetSubscribedURLs() ([]common.URL, error)
// GetServiceDefinition will get the target service info store in metadata
GetServiceDefinition(interfaceName string, group string, version string) (string, error)
// GetServiceDefinition will get the target service info store in metadata by service key
......
......@@ -30,7 +30,6 @@ func init() {
}
type logEventListener struct {
}
func (l *logEventListener) GetPriority() int {
......@@ -43,6 +42,5 @@ func (l *logEventListener) OnEvent(e observer.Event) error {
}
func (l *logEventListener) GetEventType() reflect.Type {
return nil
return reflect.TypeOf(&observer.BaseEvent{})
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package event
import (
"encoding/json"
gxset "github.com/dubbogo/gost/container/set"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/metadata/service/remote"
"github.com/apache/dubbo-go/registry"
)
func init() {
exceptKeys := gxset.NewSet(
// remove APPLICATION_KEY because service name must be present
constant.APPLICATION_KEY,
// remove GROUP_KEY, always uses service name.
constant.GROUP_KEY,
// remove TIMESTAMP_KEY because it's nonsense
constant.TIMESTAMP_KEY)
extension.AddCustomizers(&metadataServiceURLParamsMetadataCustomizer{exceptKeys: exceptKeys})
}
type metadataServiceURLParamsMetadataCustomizer struct {
exceptKeys *gxset.HashSet
}
// GetPriority will return 0 so that it will be invoked in front of user defining Customizer
func (m *metadataServiceURLParamsMetadataCustomizer) GetPriority() int {
return 0
}
func (m *metadataServiceURLParamsMetadataCustomizer) Customize(instance registry.ServiceInstance) {
ms, err := remote.NewMetadataService()
if err != nil {
logger.Errorf("could not find the metadata service", err)
return
}
serviceName := constant.METADATA_SERVICE_NAME
version := ms.Version()
group := instance.GetServiceName()
urls, err := ms.GetExportedURLs(serviceName, group, version, constant.ANY_VALUE)
if err != nil || len(urls) == 0 {
logger.Errorf("could not find the exported urls", err)
return
}
ps := m.convertToParams(urls)
str, err := json.Marshal(ps)
if err != nil {
logger.Errorf("could not transfer the map to json", err)
return
}
instance.GetMetadata()[constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME] = string(str)
}
func (m *metadataServiceURLParamsMetadataCustomizer) convertToParams(urls []common.URL) map[string]map[string]string {
// usually there will be only one protocol
res := make(map[string]map[string]string, 1)
// those keys are useless
for _, u := range urls {
p := make(map[string]string, len(u.GetParams()))
for k, v := range u.GetParams() {
// we will ignore that
if m.exceptKeys.Contains(k) || len(v) == 0 {
continue
}
p[k] = v[0]
}
p[constant.PORT_KEY] = u.Port
res[u.Protocol] = p
}
return res
}
......@@ -18,6 +18,9 @@
package event
import (
"encoding/json"
"strconv"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/metadata/service/remote"
......@@ -44,9 +47,49 @@ func (p *ProtocolPortsMetadataCustomizer) Customize(instance registry.ServiceIns
// 4 is enough...
protocolMap := make(map[string]int, 4)
list, err := metadataService.GetExportedURLs(constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE,constant.ANY_VALUE)
if err != nil {
logger.Errorf("Could", err)
list, err := metadataService.GetExportedURLs(constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE)
if err != nil || list == nil {
logger.Errorf("Could not find exported urls", err)
return
}
for _, u := range list {
if len(u.Protocol) == 0 {
continue
}
port, err := strconv.Atoi(u.Port)
if err != nil {
logger.Errorf("Could not customize the metadata of port. ", err)
}
protocolMap[u.Protocol] = port
}
instance.GetMetadata()[constant.SERVICE_INSTANCE_ENDPOINTS] = endpointsStr(protocolMap)
}
func endpointsStr(protocolMap map[string]int) string {
if len(protocolMap) == 0 {
return ""
}
endpoints := make([]endpoint, 0, len(protocolMap))
for k, v := range protocolMap {
endpoints = append(endpoints, endpoint{
port: v,
protocol: k,
})
}
str, err := json.Marshal(endpoints)
if err != nil {
logger.Errorf("could not convert the endpoints to json", err)
return ""
}
return string(str)
}
type endpoint struct {
port int
protocol string
}
......@@ -30,6 +30,7 @@ func init() {
nameMapping: extension.GetGlobalServiceNameMapping(),
})
}
type serviceNameMappingListener struct {
nameMapping mapping.ServiceNameMapping
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package event
import (
"fmt"
"hash/crc32"
"sort"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/metadata/service/remote"
"github.com/apache/dubbo-go/registry"
)
const defaultRevision = "N/A"
func init() {
extension.AddCustomizers(&exportedServicesRevisionMetadataCustomizer{})
extension.AddCustomizers(&subscribedServicesRevisionMetadataCustomizer{})
}
type exportedServicesRevisionMetadataCustomizer struct {
}
// GetPriority will return 1 so that it will be invoked in front of user defining Customizer
func (e *exportedServicesRevisionMetadataCustomizer) GetPriority() int {
return 1
}
func (e *exportedServicesRevisionMetadataCustomizer) Customize(instance registry.ServiceInstance) {
ms, err := remote.NewMetadataService()
if err != nil {
logger.Errorf("could not get metadata service", err)
return
}
urls, err := ms.GetExportedURLs(constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE)
if err != nil {
logger.Errorf("could not find the exported url", err)
}
revision := resolveRevision(urls)
if len(revision) == 0 {
revision = defaultRevision
}
instance.GetMetadata()[constant.EXPORTED_SERVICES_REVISION_PROPERTY_NAME] = revision
}
type subscribedServicesRevisionMetadataCustomizer struct {
}
// GetPriority will return 2 so that it will be invoked in front of user defining Customizer
func (e *subscribedServicesRevisionMetadataCustomizer) GetPriority() int {
return 2
}
func (e *subscribedServicesRevisionMetadataCustomizer) Customize(instance registry.ServiceInstance) {
ms, err := remote.NewMetadataService()
if err != nil {
logger.Errorf("could not get metadata service", err)
return
}
urls, err := ms.GetSubscribedURLs()
if err != nil {
logger.Errorf("could not find the subscribed url", err)
}
revision := resolveRevision(urls)
if len(revision) == 0 {
revision = defaultRevision
}
instance.GetMetadata()[constant.SUBSCRIBED_SERVICES_REVISION_PROPERTY_NAME] = revision
}
// resolveRevision is different from Dubbo because golang doesn't support overload
// so that we could use interface + method name as identifier and ignore the method params
// per my understanding, it's enough because Dubbo actually ignore the url params.
// please refer org.apache.dubbo.common.URL#toParameterString(java.lang.String...)
func resolveRevision(urls []common.URL) string {
if len(urls) == 0 {
return ""
}
candidates := make([]string, 0, len(urls))
for _, u := range urls {
sk := u.GetParam(constant.INTERFACE_KEY, "")
for _, m := range u.Methods {
// methods are part of candidates
candidates = append(candidates, sk+constant.KEY_SEPARATOR+m)
}
// append url params if we need it
}
sort.Sort(sort.StringSlice(candidates))
// it's nearly impossible to be overflow
res := uint64(0)
for _, c := range candidates {
res += uint64(crc32.ChecksumIEEE([]byte(c)))
}
return fmt.Sprint(res)
}
......@@ -53,4 +53,4 @@ func (lstn *ServiceInstancesChangedListener) GetPriority() int {
// get event type
func (lstn *ServiceInstancesChangedListener) GetEventType() reflect.Type {
return reflect.TypeOf(&ServiceInstancesChangedEvent{})
}
\ No newline at end of file
}
......@@ -87,11 +87,17 @@ func (d *DefaultServiceInstance) IsHealthy() bool {
return d.Healthy
}
// GetMetadata will return the metadata
// GetMetadata will return the metadata, it will never return nil
func (d *DefaultServiceInstance) GetMetadata() map[string]string {
if d.Metadata == nil {
d.Metadata = make(map[string]string, 0)
}
return d.Metadata
}
// ServiceInstanceCustomizer is an extension point which allow user using custom logic to modify instance
// Be careful of priority. Usually you should use number between [100, 9000]
// other number will be thought as system reserve number
type ServiceInstanceCustomizer interface {
gxsort.Prioritizer
......
......@@ -399,19 +399,12 @@ func (s *serviceDiscoveryRegistry) getExportedUrlsByInst(serviceInstance registr
if metadataService == nil {
return urls
}
result, err := metadataService.GetExportedURLs("*", "", "", "")
result, err := metadataService.GetExportedURLs(constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE)
if err != nil {
logger.Errorf("get exported urls catch error:%s,instance:%+v", err.Error(), serviceInstance)
return urls
}
if result == nil {
logger.Errorf("get empty exported urls,instance:%+v", serviceInstance)
return urls
}
for i := uint64(0); i < result.Len(); i++ {
urls = append(urls, common.URL(result.ByPosition(i).(comparator)))
}
return urls
return result
}
func (s *serviceDiscoveryRegistry) prepareServiceRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment