/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package delegate import ( "encoding/json" "sync" "time" ) import ( "github.com/go-co-op/gocron" "go.uber.org/atomic" ) import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/config/instance" "github.com/apache/dubbo-go/metadata/definition" "github.com/apache/dubbo-go/metadata/identifier" ) const ( // defaultMetadataReportRetryTimes is defined for max times to retry defaultMetadataReportRetryTimes int64 = 100 // defaultMetadataReportRetryPeriod is defined for cycle interval to retry, the unit is second defaultMetadataReportRetryPeriod int64 = 3 // defaultMetadataReportRetryPeriod is defined for cycle report or not defaultMetadataReportCycleReport bool = true ) // metadataReportRetry is a scheduler for retrying task type metadataReportRetry struct { retryPeriod int64 retryLimit int64 scheduler *gocron.Scheduler job *gocron.Job retryCounter *atomic.Int64 // if no failed report, wait how many times to run retry task. retryTimesIfNonFail int64 } // newMetadataReportRetry will create a scheduler for retry task func newMetadataReportRetry(retryPeriod int64, retryLimit int64, retryFunc func() bool) (*metadataReportRetry, error) { s1 := gocron.NewScheduler(time.UTC) mrr := &metadataReportRetry{ retryPeriod: retryPeriod, retryLimit: retryLimit, scheduler: s1, retryCounter: atomic.NewInt64(0), retryTimesIfNonFail: 600, } newJob, err := mrr.scheduler.Every(uint64(mrr.retryPeriod)).Seconds().Do( func() { mrr.retryCounter.Inc() logger.Infof("start to retry task for metadata report. retry times: %v", mrr.retryCounter.Load()) if mrr.retryCounter.Load() > mrr.retryLimit { mrr.scheduler.Clear() } else if retryFunc() && mrr.retryCounter.Load() > mrr.retryTimesIfNonFail { mrr.scheduler.Clear() // may not interrupt the running job } }) mrr.job = newJob return mrr, err } // startRetryTask will make scheduler with retry task run func (mrr *metadataReportRetry) startRetryTask() { mrr.scheduler.StartAt(time.Now().Add(500 * time.Millisecond)) mrr.scheduler.Start() } // MetadataReport is a absolute delegate for MetadataReport type MetadataReport struct { reportUrl common.URL syncReport bool metadataReportRetry *metadataReportRetry failedReports map[*identifier.MetadataIdentifier]interface{} failedReportsLock sync.RWMutex // allMetadataReports store all the metdadata reports records in memory allMetadataReports map[*identifier.MetadataIdentifier]interface{} allMetadataReportsLock sync.RWMutex } // NewMetadataReport will create a MetadataReport with initiation func NewMetadataReport() (*MetadataReport, error) { url := instance.GetMetadataReportUrl() bmr := &MetadataReport{ reportUrl: url, syncReport: url.GetParamBool(constant.SYNC_REPORT_KEY, false), failedReports: make(map[*identifier.MetadataIdentifier]interface{}, 4), allMetadataReports: make(map[*identifier.MetadataIdentifier]interface{}, 4), } mrr, err := newMetadataReportRetry( url.GetParamInt(constant.RETRY_PERIOD_KEY, defaultMetadataReportRetryPeriod), url.GetParamInt(constant.RETRY_TIMES_KEY, defaultMetadataReportRetryTimes), bmr.retry, ) if err != nil { return nil, err } bmr.metadataReportRetry = mrr if url.GetParamBool(constant.CYCLE_REPORT_KEY, defaultMetadataReportCycleReport) { scheduler := gocron.NewScheduler(time.UTC) _, err := scheduler.Every(1).Day().Do( func() { logger.Info("start to publish all metadata.") bmr.allMetadataReportsLock.RLock() bmr.doHandlerMetadataCollection(bmr.allMetadataReports) bmr.allMetadataReportsLock.RUnlock() }) if err != nil { return nil, err } scheduler.StartAt(time.Now().Add(500 * time.Millisecond)) scheduler.Start() } return bmr, nil } // retry will do metadata failed reports collection by call metadata report sdk func (bmr *MetadataReport) retry() bool { bmr.failedReportsLock.RLock() defer bmr.failedReportsLock.RUnlock() return bmr.doHandlerMetadataCollection(bmr.failedReports) } // StoreProviderMetadata will delegate to call remote metadata's sdk to store provider service definition func (bmr *MetadataReport) StoreProviderMetadata(identifier *identifier.MetadataIdentifier, definer definition.ServiceDefiner) { if bmr.syncReport { bmr.storeMetadataTask(common.PROVIDER, identifier, definer) } go bmr.storeMetadataTask(common.PROVIDER, identifier, definer) } // storeMetadataTask will delegate to call remote metadata's sdk to store func (bmr *MetadataReport) storeMetadataTask(role int, identifier *identifier.MetadataIdentifier, definer interface{}) { logger.Infof("store provider metadata. Identifier :%v ; definition: %v .", identifier, definer) bmr.allMetadataReportsLock.Lock() bmr.allMetadataReports[identifier] = definer bmr.allMetadataReportsLock.Unlock() bmr.failedReportsLock.Lock() delete(bmr.failedReports, identifier) bmr.failedReportsLock.Unlock() // data is store the json marshaled definition var ( data []byte err error ) defer func() { if r := recover(); r != nil { bmr.failedReportsLock.Lock() bmr.failedReports[identifier] = definer bmr.failedReportsLock.Unlock() bmr.metadataReportRetry.startRetryTask() logger.Errorf("Failed to put provider metadata %v in %v, cause: %v", identifier, string(data), r) } }() data, err = json.Marshal(definer) if err != nil { logger.Errorf("storeProviderMetadataTask error in stage json.Marshal, msg is %v", err) panic(err) } report := instance.GetMetadataReportInstance() if role == common.PROVIDER { err = report.StoreProviderMetadata(identifier, string(data)) } else if role == common.CONSUMER { err = report.StoreConsumerMetadata(identifier, string(data)) } if err != nil { logger.Errorf("storeProviderMetadataTask error in stage call metadata report to StoreProviderMetadata, msg is %v", err) panic(err) } } // StoreConsumerMetadata will delegate to call remote metadata's sdk to store consumer side service definition func (bmr *MetadataReport) StoreConsumerMetadata(identifier *identifier.MetadataIdentifier, definer map[string]string) { if bmr.syncReport { bmr.storeMetadataTask(common.CONSUMER, identifier, definer) } go bmr.storeMetadataTask(common.CONSUMER, identifier, definer) } // SaveServiceMetadata will delegate to call remote metadata's sdk to save service metadata func (bmr *MetadataReport) SaveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier, url common.URL) error { report := instance.GetMetadataReportInstance() if bmr.syncReport { return report.SaveServiceMetadata(identifier, url) } go report.SaveServiceMetadata(identifier, url) return nil } // RemoveServiceMetadata will delegate to call remote metadata's sdk to remove service metadata func (bmr *MetadataReport) RemoveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier) error { report := instance.GetMetadataReportInstance() if bmr.syncReport { return report.RemoveServiceMetadata(identifier) } go report.RemoveServiceMetadata(identifier) return nil } // GetExportedURLs will delegate to call remote metadata's sdk to get exported urls func (bmr *MetadataReport) GetExportedURLs(identifier *identifier.ServiceMetadataIdentifier) []string { report := instance.GetMetadataReportInstance() return report.GetExportedURLs(identifier) } // SaveSubscribedData will delegate to call remote metadata's sdk to save subscribed data func (bmr *MetadataReport) SaveSubscribedData(identifier *identifier.SubscriberMetadataIdentifier, urls []common.URL) error { report := instance.GetMetadataReportInstance() if bmr.syncReport { return report.SaveSubscribedData(identifier, urls) } go report.SaveSubscribedData(identifier, urls) return nil } // GetSubscribedURLs will delegate to call remote metadata's sdk to get subscribed urls func (MetadataReport) GetSubscribedURLs(identifier *identifier.SubscriberMetadataIdentifier) []string { report := instance.GetMetadataReportInstance() return report.GetSubscribedURLs(identifier) } // GetServiceDefinition will delegate to call remote metadata's sdk to get service definitions func (MetadataReport) GetServiceDefinition(identifier *identifier.MetadataIdentifier) string { report := instance.GetMetadataReportInstance() return report.GetServiceDefinition(identifier) } // doHandlerMetadataCollection will store metadata to metadata support with given metadataMap func (bmr *MetadataReport) doHandlerMetadataCollection(metadataMap map[*identifier.MetadataIdentifier]interface{}) bool { if len(metadataMap) == 0 { return true } for e := range metadataMap { if common.RoleType(common.PROVIDER).Role() == e.Side { bmr.StoreProviderMetadata(e, metadataMap[e].(*definition.FullServiceDefinition)) } else if common.RoleType(common.CONSUMER).Role() == e.Side { bmr.StoreConsumerMetadata(e, metadataMap[e].(map[string]string)) } } return false }