Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package delegate
import (
"encoding/json"
"sync"
"time"
)
import (
"github.com/go-co-op/gocron"
"go.uber.org/atomic"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config/instance"
"github.com/apache/dubbo-go/metadata/definition"
"github.com/apache/dubbo-go/metadata/identifier"
)
const (
// defaultMetadataReportRetryTimes is the default retry limit used when the
// metadata report URL does not carry constant.RETRY_TIMES_KEY
defaultMetadataReportRetryTimes int64 = 100
// defaultMetadataReportRetryPeriod is the default interval between retry runs,
// in seconds, used when the URL does not carry constant.RETRY_PERIOD_KEY
defaultMetadataReportRetryPeriod int64 = 3
// defaultMetadataReportCycleReport is the default for constant.CYCLE_REPORT_KEY,
// i.e. whether the once-a-day job that republishes all metadata is scheduled
defaultMetadataReportCycleReport bool = true
)
// metadataReportRetry bundles a gocron scheduler with the counters that decide
// when the periodic retry job should stop.
type metadataReportRetry struct {
// retryPeriod is the interval between retry runs, in seconds.
retryPeriod int64
// retryLimit is the run count above which the scheduler is cleared.
retryLimit int64
// scheduler is the gocron scheduler the retry job is registered on.
scheduler *gocron.Scheduler
// job is the handle returned when the retry job was registered.
job *gocron.Job
// retryCounter counts how many times the retry job has run.
retryCounter *atomic.Int64
// retryTimesIfNonFail is the run count above which the scheduler is cleared
// once the retry function reports that there is nothing left to retry.
retryTimesIfNonFail int64
}
// newMetadataReportRetry builds a metadataReportRetry whose scheduler is
// configured to run retryFunc every retryPeriod seconds. The scheduler is only
// configured here; startRetryTask starts it.
//
// Each run increments the retry counter. The scheduler is cleared when the
// counter exceeds retryLimit (retryFunc is not called in that run), or when
// retryFunc returns true and the counter exceeds retryTimesIfNonFail.
//
// If the job cannot be registered, nil and the registration error are returned.
func newMetadataReportRetry(retryPeriod int64, retryLimit int64, retryFunc func() bool) (*metadataReportRetry, error) {
	mrr := &metadataReportRetry{
		retryPeriod:         retryPeriod,
		retryLimit:          retryLimit,
		scheduler:           gocron.NewScheduler(time.UTC),
		retryCounter:        atomic.NewInt64(0),
		retryTimesIfNonFail: 600,
	}

	job, err := mrr.scheduler.Every(uint64(mrr.retryPeriod)).Seconds().Do(
		func() {
			// Use the value returned by Inc for the whole run so that the log
			// line and both comparisons see the same count even if another
			// run bumps the counter concurrently.
			times := mrr.retryCounter.Inc()
			logger.Infof("start to retry task for metadata report. retry times: %v", times)
			if times > mrr.retryLimit {
				mrr.scheduler.Clear()
				return
			}
			if retryFunc() && times > mrr.retryTimesIfNonFail {
				mrr.scheduler.Clear() // may not interrupt the running job
			}
		})
	if err != nil {
		// Do not hand back a half-initialized value together with an error.
		return nil, err
	}
	mrr.job = job
	return mrr, nil
}
// startRetryTask hands the scheduler a start time 500ms from now and then
// starts it, so the registered retry job begins running.
func (mrr *metadataReportRetry) startRetryTask() {
	firstRun := time.Now().Add(500 * time.Millisecond)
	mrr.scheduler.StartAt(firstRun)
	mrr.scheduler.Start()
}
// MetadataReport is a delegate in front of the metadata report instance
// returned by instance.GetMetadataReportInstance. It remembers every definition
// it was asked to store and tracks the ones whose store attempt failed so that
// they can be retried.
type MetadataReport struct {
// reportUrl is the URL returned by instance.GetMetadataReportUrl.
reportUrl common.URL
// syncReport is read from constant.SYNC_REPORT_KEY (default false); the
// store/save/remove methods consult it when deciding whether to call the
// report inline or from a goroutine.
syncReport bool
// metadataReportRetry owns the retry scheduler; it is started when a store
// attempt fails.
metadataReportRetry *metadataReportRetry
// failedReports holds the definitions whose last store attempt failed,
// keyed by identifier pointer.
failedReports map[*identifier.MetadataIdentifier]interface{}
// failedReportsLock guards failedReports.
failedReportsLock sync.RWMutex
// allMetadataReports stores all the metadata report records in memory
allMetadataReports map[*identifier.MetadataIdentifier]interface{}
// allMetadataReportsLock guards allMetadataReports.
allMetadataReportsLock sync.RWMutex
}
// NewMetadataReport creates a MetadataReport configured from the metadata
// report URL returned by instance.GetMetadataReportUrl.
//
// It wires up the retry scheduler (period and limit come from
// constant.RETRY_PERIOD_KEY and constant.RETRY_TIMES_KEY) and, unless
// constant.CYCLE_REPORT_KEY disables it, registers and starts a once-a-day job
// that republishes every stored metadata record.
//
// An error is returned if either job cannot be registered.
func NewMetadataReport() (*MetadataReport, error) {
	url := instance.GetMetadataReportUrl()
	bmr := &MetadataReport{
		reportUrl:          url,
		syncReport:         url.GetParamBool(constant.SYNC_REPORT_KEY, false),
		failedReports:      make(map[*identifier.MetadataIdentifier]interface{}, 4),
		allMetadataReports: make(map[*identifier.MetadataIdentifier]interface{}, 4),
	}

	mrr, err := newMetadataReportRetry(
		url.GetParamInt(constant.RETRY_PERIOD_KEY, defaultMetadataReportRetryPeriod),
		url.GetParamInt(constant.RETRY_TIMES_KEY, defaultMetadataReportRetryTimes),
		bmr.retry,
	)
	if err != nil {
		return nil, err
	}
	bmr.metadataReportRetry = mrr

	if url.GetParamBool(constant.CYCLE_REPORT_KEY, defaultMetadataReportCycleReport) {
		scheduler := gocron.NewScheduler(time.UTC)
		_, err := scheduler.Every(1).Day().Do(
			func() {
				logger.Info("start to publish all metadata.")
				// Replay a snapshot instead of the live map. Replaying ends up
				// in storeMetadataTask, which takes the write lock on
				// allMetadataReportsLock; holding the read lock across that
				// call would block forever when reporting synchronously.
				bmr.allMetadataReportsLock.RLock()
				snapshot := make(map[*identifier.MetadataIdentifier]interface{}, len(bmr.allMetadataReports))
				for id, definer := range bmr.allMetadataReports {
					snapshot[id] = definer
				}
				bmr.allMetadataReportsLock.RUnlock()

				bmr.doHandlerMetadataCollection(snapshot)
			})
		if err != nil {
			return nil, err
		}
		scheduler.StartAt(time.Now().Add(500 * time.Millisecond))
		scheduler.Start()
	}
	return bmr, nil
}
// retry re-submits every entry currently recorded in failedReports and returns
// the result of doHandlerMetadataCollection: true when there was nothing to
// retry, false otherwise.
func (bmr *MetadataReport) retry() bool {
	// Copy the pending entries and release the read lock before replaying.
	// Replaying reaches storeMetadataTask, which needs the write lock on
	// failedReportsLock, so the read lock must not be held (or leaked) here.
	bmr.failedReportsLock.RLock()
	pending := make(map[*identifier.MetadataIdentifier]interface{}, len(bmr.failedReports))
	for id, definer := range bmr.failedReports {
		pending[id] = definer
	}
	bmr.failedReportsLock.RUnlock()

	return bmr.doHandlerMetadataCollection(pending)
}
// StoreProviderMetadata stores a provider-side service definition through
// storeMetadataTask. When syncReport is set the store runs inline and the call
// returns once it has finished; otherwise it runs in a new goroutine.
func (bmr *MetadataReport) StoreProviderMetadata(identifier *identifier.MetadataIdentifier, definer definition.ServiceDefiner) {
	if bmr.syncReport {
		bmr.storeMetadataTask(common.PROVIDER, identifier, definer)
		// Return here so the definition is not stored a second time in the
		// background.
		return
	}
	go bmr.storeMetadataTask(common.PROVIDER, identifier, definer)
}
// storeMetadataTask records definer under identifier in allMetadataReports,
// clears any earlier failure for that identifier, marshals definer to JSON and
// hands it to the metadata report instance for the given role
// (common.PROVIDER or common.CONSUMER; any other role stores nothing).
//
// If marshaling or storing fails, or anything along the way panics, the entry
// is put into failedReports, the retry task is started and the failure is
// logged. Nothing is returned to the caller.
func (bmr *MetadataReport) storeMetadataTask(role int, identifier *identifier.MetadataIdentifier, definer interface{}) {
	logger.Infof("store provider metadata. Identifier :%v ; definition: %v .", identifier, definer)
	bmr.allMetadataReportsLock.Lock()
	bmr.allMetadataReports[identifier] = definer
	bmr.allMetadataReportsLock.Unlock()

	bmr.failedReportsLock.Lock()
	delete(bmr.failedReports, identifier)
	bmr.failedReportsLock.Unlock()

	// data holds the JSON-marshaled definition; it is still empty if the
	// failure happens before marshaling succeeds.
	var data []byte

	// markFailed queues the entry for retry and logs why the store failed.
	markFailed := func(cause interface{}) {
		bmr.failedReportsLock.Lock()
		bmr.failedReports[identifier] = definer
		bmr.failedReportsLock.Unlock()
		bmr.metadataReportRetry.startRetryTask()
		logger.Errorf("Failed to put provider metadata %v in %v, cause: %v", identifier, string(data), cause)
	}

	// Ordinary errors are handled explicitly below; recover only catches
	// panics raised by the calls made from this function.
	defer func() {
		if r := recover(); r != nil {
			markFailed(r)
		}
	}()

	var err error
	data, err = json.Marshal(definer)
	if err != nil {
		logger.Errorf("storeProviderMetadataTask error in stage json.Marshal, msg is %v", err)
		markFailed(err)
		return
	}

	report := instance.GetMetadataReportInstance()
	switch role {
	case common.PROVIDER:
		err = report.StoreProviderMetadata(identifier, string(data))
	case common.CONSUMER:
		err = report.StoreConsumerMetadata(identifier, string(data))
	}
	if err != nil {
		logger.Errorf("storeProviderMetadataTask error in stage call metadata report to StoreProviderMetadata, msg is %v", err)
		markFailed(err)
	}
}
// StoreConsumerMetadata stores a consumer-side service definition through
// storeMetadataTask. When syncReport is set the store runs inline and the call
// returns once it has finished; otherwise it runs in a new goroutine.
func (bmr *MetadataReport) StoreConsumerMetadata(identifier *identifier.MetadataIdentifier, definer map[string]string) {
	if bmr.syncReport {
		bmr.storeMetadataTask(common.CONSUMER, identifier, definer)
		// Return here so the definition is not stored a second time in the
		// background.
		return
	}
	go bmr.storeMetadataTask(common.CONSUMER, identifier, definer)
}
// SaveServiceMetadata delegates to the metadata report instance to save the
// service metadata for identifier.
//
// With syncReport set, the call is made inline and its error is returned.
// Otherwise the call is made in a new goroutine, nil is returned immediately
// and a failure of the background call is logged.
func (bmr *MetadataReport) SaveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier, url common.URL) error {
	report := instance.GetMetadataReportInstance()
	if bmr.syncReport {
		return report.SaveServiceMetadata(identifier, url)
	}
	go func() {
		// The caller has already returned, so logging is the only way to
		// surface a failure.
		if err := report.SaveServiceMetadata(identifier, url); err != nil {
			logger.Errorf("async SaveServiceMetadata failed, identifier: %v, cause: %v", identifier, err)
		}
	}()
	return nil
}
// RemoveServiceMetadata delegates to the metadata report instance to remove
// the service metadata for identifier.
//
// With syncReport set, the call is made inline and its error is returned.
// Otherwise the call is made in a new goroutine, nil is returned immediately
// and a failure of the background call is logged.
func (bmr *MetadataReport) RemoveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier) error {
	report := instance.GetMetadataReportInstance()
	if bmr.syncReport {
		return report.RemoveServiceMetadata(identifier)
	}
	go func() {
		// The caller has already returned, so logging is the only way to
		// surface a failure.
		if err := report.RemoveServiceMetadata(identifier); err != nil {
			logger.Errorf("async RemoveServiceMetadata failed, identifier: %v, cause: %v", identifier, err)
		}
	}()
	return nil
}
// GetExportedURLs asks the metadata report instance for the exported URLs
// registered under identifier and returns them as-is.
func (bmr *MetadataReport) GetExportedURLs(identifier *identifier.ServiceMetadataIdentifier) []string {
	return instance.GetMetadataReportInstance().GetExportedURLs(identifier)
}
// SaveSubscribedData delegates to the metadata report instance to save the
// subscribed URLs for identifier.
//
// With syncReport set, the call is made inline and its error is returned.
// Otherwise the call is made in a new goroutine, nil is returned immediately
// and a failure of the background call is logged.
func (bmr *MetadataReport) SaveSubscribedData(identifier *identifier.SubscriberMetadataIdentifier, urls []common.URL) error {
	report := instance.GetMetadataReportInstance()
	if bmr.syncReport {
		return report.SaveSubscribedData(identifier, urls)
	}
	go func() {
		// The caller has already returned, so logging is the only way to
		// surface a failure.
		if err := report.SaveSubscribedData(identifier, urls); err != nil {
			logger.Errorf("async SaveSubscribedData failed, identifier: %v, cause: %v", identifier, err)
		}
	}()
	return nil
}
// GetSubscribedURLs asks the metadata report instance for the subscribed URLs
// registered under identifier and returns them as-is.
//
// A pointer receiver is used, like on the other methods, because
// MetadataReport contains sync.RWMutex fields that must not be copied.
func (bmr *MetadataReport) GetSubscribedURLs(identifier *identifier.SubscriberMetadataIdentifier) []string {
	report := instance.GetMetadataReportInstance()
	return report.GetSubscribedURLs(identifier)
}
// GetServiceDefinition asks the metadata report instance for the service
// definition registered under identifier and returns it as-is.
//
// A pointer receiver is used, like on the other methods, because
// MetadataReport contains sync.RWMutex fields that must not be copied.
func (bmr *MetadataReport) GetServiceDefinition(identifier *identifier.MetadataIdentifier) string {
	report := instance.GetMetadataReportInstance()
	return report.GetServiceDefinition(identifier)
}
// doHandlerMetadataCollection re-submits every entry of metadataMap through
// StoreProviderMetadata or StoreConsumerMetadata, chosen by the identifier's
// Side. Entries whose Side is neither the provider nor the consumer role are
// ignored; entries whose stored value has an unexpected type are logged and
// skipped.
//
// It returns true when metadataMap is empty (nothing to do) and false
// otherwise.
func (bmr *MetadataReport) doHandlerMetadataCollection(metadataMap map[*identifier.MetadataIdentifier]interface{}) bool {
	if len(metadataMap) == 0 {
		return true
	}

	// The role names do not depend on the entry, so compute them once.
	providerSide := common.RoleType(common.PROVIDER).Role()
	consumerSide := common.RoleType(common.CONSUMER).Role()

	for id, stored := range metadataMap {
		switch id.Side {
		case providerSide:
			// StoreProviderMetadata accepts any definition.ServiceDefiner, so
			// assert to that interface rather than one concrete type.
			definer, ok := stored.(definition.ServiceDefiner)
			if !ok {
				logger.Errorf("skip provider metadata %v: unexpected definition type %T", id, stored)
				continue
			}
			bmr.StoreProviderMetadata(id, definer)
		case consumerSide:
			definer, ok := stored.(map[string]string)
			if !ok {
				logger.Errorf("skip consumer metadata %v: unexpected definition type %T", id, stored)
				continue
			}
			bmr.StoreConsumerMetadata(id, definer)
		}
	}
	return false
}