Skip to content
Snippets Groups Projects
Commit bac7c65d authored by vito.he's avatar vito.he
Browse files

Mod:merge from develop

parents f169566d 63e0369b
No related branches found
No related tags found
No related merge requests found
Showing
with 179 additions and 66 deletions
......@@ -23,14 +23,10 @@ import (
type MockService struct{}
func (*MockService) Service() string {
func (*MockService) Reference() string {
return "MockService"
}
func (*MockService) Version() string {
return "1.0"
}
func (*MockService) GetUser(ctx context.Context, itf []interface{}, str *struct{}) error {
return nil
}
......
......@@ -63,7 +63,7 @@ func GetProviderConfig() ProviderConfig {
return *providerConfig
}
func providerInit(confProFile string) error {
func ProviderInit(confProFile string) error {
if len(confProFile) == 0 {
return perrors.Errorf("application configure(provider) file name is nil")
}
......
......@@ -38,29 +38,33 @@ import (
type ReferenceConfig struct {
context context.Context
pxy *proxy.Proxy
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Check *bool `yaml:"check" json:"check,omitempty" property:"check"`
Url string `yaml:"url" json:"url,omitempty" property:"url"`
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version"`
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
async bool `yaml:"async" json:"async,omitempty" property:"async"`
id string
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Check *bool `yaml:"check" json:"check,omitempty" property:"check"`
Url string `yaml:"url" json:"url,omitempty" property:"url"`
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version"`
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
async bool `yaml:"async" json:"async,omitempty" property:"async"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
invoker protocol.Invoker
urls []*common.URL
Generic bool `yaml:"generic" json:"generic,omitempty" property:"generic"`
}
func (c *ReferenceConfig) Prefix() string {
return constant.ReferenceConfigPrefix + c.InterfaceName + "."
}
func NewReferenceConfig(ctx context.Context) *ReferenceConfig {
return &ReferenceConfig{context: ctx}
// The only way to get a new ReferenceConfig
func NewReferenceConfig(id string, ctx context.Context) *ReferenceConfig {
return &ReferenceConfig{id: id, context: ctx}
}
func (refconfig *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
......@@ -75,7 +79,7 @@ func (refconfig *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) erro
}
func (refconfig *ReferenceConfig) Refer() {
url := common.NewURLWithOptions(common.WithPath(refconfig.InterfaceName), common.WithProtocol(refconfig.Protocol), common.WithParams(refconfig.getUrlMap()))
url := common.NewURLWithOptions(common.WithPath(refconfig.id), common.WithProtocol(refconfig.Protocol), common.WithParams(refconfig.getUrlMap()))
//1. user specified URL, could be peer-to-peer address, or register center's address.
if refconfig.Url != "" {
......@@ -90,7 +94,7 @@ func (refconfig *ReferenceConfig) Refer() {
refconfig.urls = append(refconfig.urls, &serviceUrl)
} else {
if serviceUrl.Path == "" {
serviceUrl.Path = "/" + refconfig.InterfaceName
serviceUrl.Path = "/" + refconfig.id
}
// merge url need to do
newUrl := common.MergeUrl(serviceUrl, url)
......@@ -107,7 +111,6 @@ func (refconfig *ReferenceConfig) Refer() {
regUrl.SubURL = url
}
}
if len(refconfig.urls) == 1 {
refconfig.invoker = extension.GetProtocol(refconfig.urls[0].Protocol).Refer(*refconfig.urls[0])
} else {
......@@ -143,6 +146,10 @@ func (refconfig *ReferenceConfig) GetRPCService() common.RPCService {
func (refconfig *ReferenceConfig) getUrlMap() url.Values {
urlMap := url.Values{}
//first set user params
for k, v := range refconfig.Params {
urlMap.Set(k, v)
}
urlMap.Set(constant.INTERFACE_KEY, refconfig.InterfaceName)
urlMap.Set(constant.TIMESTAMP_KEY, strconv.FormatInt(time.Now().Unix(), 10))
urlMap.Set(constant.CLUSTER_KEY, refconfig.Cluster)
......@@ -150,6 +157,7 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values {
urlMap.Set(constant.RETRIES_KEY, strconv.FormatInt(refconfig.Retries, 10))
urlMap.Set(constant.GROUP_KEY, refconfig.Group)
urlMap.Set(constant.VERSION_KEY, refconfig.Version)
urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic))
//getty invoke async or sync
urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.async))
......@@ -163,7 +171,11 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values {
urlMap.Set(constant.ENVIRONMENT_KEY, consumerConfig.ApplicationConfig.Environment)
//filter
urlMap.Set(constant.REFERENCE_FILTER_KEY, mergeValue(consumerConfig.Filter, refconfig.Filter, constant.DEFAULT_REFERENCE_FILTERS))
var defaultReferenceFilter = constant.DEFAULT_REFERENCE_FILTERS
if refconfig.Generic {
defaultReferenceFilter = constant.GENERIC_REFERENCE_FILTERS + defaultReferenceFilter
}
urlMap.Set(constant.REFERENCE_FILTER_KEY, mergeValue(consumerConfig.Filter, refconfig.Filter, defaultReferenceFilter))
for _, v := range refconfig.Methods {
urlMap.Set("methods."+v.Name+"."+constant.LOADBALANCE_KEY, v.Loadbalance)
......@@ -173,3 +185,11 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values {
return urlMap
}
func (refconfig *ReferenceConfig) GenericLoad(id string) {
genericService := NewGenericService(refconfig.id)
SetConsumerService(genericService)
refconfig.id = id
refconfig.Refer()
refconfig.Implement(genericService)
return
}
......@@ -80,8 +80,11 @@ func doInit() {
},
References: map[string]*ReferenceConfig{
"MockService": {
Params: map[string]string{
"serviceid": "soa.mock",
},
Registry: "shanghai_reg1,shanghai_reg2,hangzhou_reg1,hangzhou_reg2",
InterfaceName: "MockService",
InterfaceName: "com.MockService",
Protocol: "mock",
Cluster: "failover",
Loadbalance: "random",
......@@ -125,6 +128,7 @@ func Test_Refer(t *testing.T) {
for _, reference := range consumerConfig.References {
reference.Refer()
assert.Equal(t, "soa.mock", reference.Params["serviceid"])
assert.NotNil(t, reference.invoker)
assert.NotNil(t, reference.pxy)
}
......@@ -143,6 +147,7 @@ func Test_ReferP2P(t *testing.T) {
}
consumerConfig = nil
}
func Test_ReferMultiP2P(t *testing.T) {
doInit()
extension.SetProtocol("dubbo", GetProtocol)
......
......@@ -68,13 +68,28 @@ func loadRegistries(targetRegistries string, registries map[string]*RegistryConf
}
if target {
url, err := common.NewURL(
context.TODO(),
constant.REGISTRY_PROTOCOL+"://"+registryConf.Address,
common.WithParams(registryConf.getUrlMap(roleType)),
common.WithUsername(registryConf.Username),
common.WithPassword(registryConf.Password),
var (
url common.URL
err error
)
if addresses := strings.Split(registryConf.Address, ","); len(addresses) > 1 {
url, err = common.NewURL(
context.Background(),
constant.REGISTRY_PROTOCOL+"://"+addresses[0],
common.WithParams(registryConf.getUrlMap(roleType)),
common.WithUsername(registryConf.Username),
common.WithPassword(registryConf.Password),
common.WithLocation(registryConf.Address),
)
} else {
url, err = common.NewURL(
context.Background(),
constant.REGISTRY_PROTOCOL+"://"+registryConf.Address,
common.WithParams(registryConf.getUrlMap(roleType)),
common.WithUsername(registryConf.Username),
common.WithPassword(registryConf.Password),
)
}
if err != nil {
logger.Errorf("The registry id:%s url is invalid ,and will skip the registry, error: %#v", k, err)
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config
import (
"fmt"
"testing"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/stretchr/testify/assert"
)
func Test_loadRegistries(t *testing.T) {
target := "shanghai1"
regs := map[string]*RegistryConfig{
"shanghai1": {
Protocol: "mock",
TimeoutStr: "2s",
Group: "shanghai_idc",
Address: "127.0.0.2:2181,128.0.0.1:2181",
Username: "user1",
Password: "pwd1",
},
}
urls := loadRegistries(target, regs, common.CONSUMER)
fmt.Println(urls[0])
assert.Equal(t, "127.0.0.2:2181,128.0.0.1:2181", urls[0].Location)
}
func Test_loadRegistries1(t *testing.T) {
target := "shanghai1"
regs := map[string]*RegistryConfig{
"shanghai1": {
Protocol: "mock",
TimeoutStr: "2s",
Group: "shanghai_idc",
Address: "127.0.0.2:2181",
Username: "user1",
Password: "pwd1",
},
}
urls := loadRegistries(target, regs, common.CONSUMER)
fmt.Println(urls[0])
assert.Equal(t, "127.0.0.2:2181", urls[0].Location)
}
......@@ -28,12 +28,12 @@ var (
// SetConService is called by init() of implement of RPCService
func SetConsumerService(service common.RPCService) {
conServices[service.Service()] = service
conServices[service.Reference()] = service
}
// SetProService is called by init() of implement of RPCService
func SetProviderService(service common.RPCService) {
proServices[service.Service()] = service
proServices[service.Reference()] = service
}
func GetConsumerService(name string) common.RPCService {
......
......@@ -43,17 +43,19 @@ import (
type ServiceConfig struct {
context context.Context
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` //multi protocol support, split by ','
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version" `
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"`
Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"`
id string
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` //multi protocol support, split by ','
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"`
Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
Group string `yaml:"group" json:"group,omitempty" property:"group"`
Version string `yaml:"version" json:"version,omitempty" property:"version" `
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"`
Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
unexported *atomic.Bool
exported *atomic.Bool
rpcService common.RPCService
......@@ -66,8 +68,12 @@ func (c *ServiceConfig) Prefix() string {
return constant.ServiceConfigPrefix + c.InterfaceName + "."
}
func NewServiceConfig() *ServiceConfig {
// The only way to get a new ServiceConfig
func NewServiceConfig(id string, context context.Context) *ServiceConfig {
return &ServiceConfig{
context: context,
id: id,
unexported: atomic.NewBool(false),
exported: atomic.NewBool(false),
}
......@@ -99,15 +105,12 @@ func (srvconfig *ServiceConfig) Export() error {
logger.Errorf(err.Error())
return err
}
//contextPath := proto.ContextPath
//if contextPath == "" {
// contextPath = providerConfig.Path
//}
url := common.NewURLWithOptions(common.WithPath(srvconfig.InterfaceName),
url := common.NewURLWithOptions(common.WithPath(srvconfig.id),
common.WithProtocol(proto.Name),
common.WithIp(proto.Ip),
common.WithPort(proto.Port),
common.WithParams(urlMap),
common.WithParamsValue(constant.BEAN_NAME_KEY, srvconfig.id),
common.WithMethods(strings.Split(methods, ",")))
if len(regUrls) > 0 {
......@@ -148,6 +151,10 @@ func (srvconfig *ServiceConfig) Implement(s common.RPCService) {
func (srvconfig *ServiceConfig) getUrlMap() url.Values {
urlMap := url.Values{}
//first set user params
for k, v := range srvconfig.Params {
urlMap.Set(k, v)
}
urlMap.Set(constant.INTERFACE_KEY, srvconfig.InterfaceName)
urlMap.Set(constant.TIMESTAMP_KEY, strconv.FormatInt(time.Now().Unix(), 10))
urlMap.Set(constant.CLUSTER_KEY, srvconfig.Cluster)
......
......@@ -74,7 +74,7 @@ func doinit() {
},
Services: map[string]*ServiceConfig{
"MockService": {
InterfaceName: "MockService",
InterfaceName: "com.MockService",
Protocol: "mock",
Registry: "shanghai_reg1,shanghai_reg2,hangzhou_reg1,hangzhou_reg2",
Cluster: "failover",
......
......@@ -36,12 +36,17 @@ references:
registry: "hangzhouzk,shanghaizk"
filter: ""
protocol : "dubbo"
version: "1.0"
group: "as"
interface : "com.ikurento.user.UserProvider"
url: "dubbo://127.0.0.1:20000"
url: "dubbo://127.0.0.1:20000/UserProvider"
cluster: "failover"
methods :
- name: "GetUser"
retries: 3
params:
"serviceid":
"soa.com.ikurento.user.UserProvider"
protocol_conf:
dubbo:
......@@ -49,9 +54,14 @@ protocol_conf:
connection_number: 2
heartbeat_period: "5s"
session_timeout: "20s"
fail_fast_timeout: "5s"
pool_size: 64
pool_ttl: 600
# gr_pool_size is recommended to be set to [cpu core number] * 100
gr_pool_size: 1200
# queue_len is recommended to be set to 64 or 128
queue_len: 64
# queue_number is recommended to be set to gr_pool_size / 20
queue_number: 60
getty_session_param:
compress_encoding: false
tcp_no_delay: true
......@@ -59,7 +69,6 @@ protocol_conf:
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
......
......@@ -9,7 +9,7 @@ references:
filter: ""
protocol : "dubbo"
interface : "com.ikurento.user.UserProvider"
url: "dubbo://127.0.0.1:20000"
url: "dubbo://127.0.0.1:20000/UserProvider"
cluster: "failover"
methods :
- name: "GetUser"
......@@ -21,7 +21,6 @@ protocol_conf:
connection_number: 2
heartbeat_period: "5s"
session_timeout: "20s"
fail_fast_timeout: "5s"
pool_size: 64
pool_ttl: 600
getty_session_param:
......@@ -31,7 +30,6 @@ protocol_conf:
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
......
......@@ -33,6 +33,8 @@ services:
# equivalent to interface of dubbo.xml
interface : "com.ikurento.user.UserProvider"
loadbalance: "random"
version: "1.0"
group: "as"
warmup: "100"
cluster: "failover"
methods:
......@@ -53,8 +55,13 @@ protocols:
protocol_conf:
dubbo:
session_number: 700
fail_fast_timeout: "5s"
session_timeout: "20s"
# gr_pool_size is recommended to be set to [cpu core number] * 10
gr_pool_size: 120
# queue_len is recommended to be set to 64 or 128
queue_len: 64
# queue_number is recommended to be set to gr_pool_size / 20
queue_number: 6
getty_session_param:
compress_encoding: false
tcp_no_delay: true
......@@ -62,7 +69,6 @@ protocol_conf:
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
......
......@@ -108,10 +108,6 @@ type UserProvider struct {
Echo func(ctx context.Context, req interface{}) (interface{}, error) // Echo represent EchoFilter will be used
}
func (u *UserProvider) Service() string {
return "com.ikurento.user.UserProvider"
}
func (u *UserProvider) Version() string {
return ""
func (u *UserProvider) Reference() string {
return "UserProvider"
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment