Skip to content
Snippets Groups Projects
Commit c506f91f authored by 382673304@qq.com's avatar 382673304@qq.com
Browse files

feat: add grpc max message size config

parent b81274af
No related branches found
No related tags found
No related merge requests found
...@@ -25,6 +25,7 @@ const ( ...@@ -25,6 +25,7 @@ const (
GROUP_KEY = "group" GROUP_KEY = "group"
VERSION_KEY = "version" VERSION_KEY = "version"
INTERFACE_KEY = "interface" INTERFACE_KEY = "interface"
GRPC_MESSAGE_SIZE_KEY = "message_size"
PATH_KEY = "path" PATH_KEY = "path"
SERVICE_KEY = "service" SERVICE_KEY = "service"
METHODS_KEY = "methods" METHODS_KEY = "methods"
......
...@@ -73,6 +73,7 @@ type ServiceConfig struct { ...@@ -73,6 +73,7 @@ type ServiceConfig struct {
Auth string `yaml:"auth" json:"auth,omitempty" property:"auth"` Auth string `yaml:"auth" json:"auth,omitempty" property:"auth"`
ParamSign string `yaml:"param.sign" json:"param.sign,omitempty" property:"param.sign"` ParamSign string `yaml:"param.sign" json:"param.sign,omitempty" property:"param.sign"`
Tag string `yaml:"tag" json:"tag,omitempty" property:"tag"` Tag string `yaml:"tag" json:"tag,omitempty" property:"tag"`
GrpcMaxMessageSize int `default:"4" yaml:"max_message_size" json:"max_message_size,omitempty"`
Protocols map[string]*ProtocolConfig Protocols map[string]*ProtocolConfig
unexported *atomic.Bool unexported *atomic.Bool
...@@ -271,6 +272,7 @@ func (c *ServiceConfig) getUrlMap() url.Values { ...@@ -271,6 +272,7 @@ func (c *ServiceConfig) getUrlMap() url.Values {
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)) urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))
urlMap.Set(constant.RELEASE_KEY, "dubbo-golang-"+constant.Version) urlMap.Set(constant.RELEASE_KEY, "dubbo-golang-"+constant.Version)
urlMap.Set(constant.SIDE_KEY, (common.RoleType(common.PROVIDER)).Role()) urlMap.Set(constant.SIDE_KEY, (common.RoleType(common.PROVIDER)).Role())
urlMap.Set(constant.GRPC_MESSAGE_SIZE_KEY, strconv.Itoa(c.GrpcMaxMessageSize))
// todo: move // todo: move
urlMap.Set(constant.SERIALIZATION_KEY, c.Serialization) urlMap.Set(constant.SERIALIZATION_KEY, c.Serialization)
// application info // application info
......
...@@ -19,6 +19,7 @@ package grpc ...@@ -19,6 +19,7 @@ package grpc
import ( import (
"reflect" "reflect"
"strconv"
) )
import ( import (
...@@ -93,9 +94,13 @@ func NewClient(url common.URL) *Client { ...@@ -93,9 +94,13 @@ func NewClient(url common.URL) *Client {
// if global trace instance was set , it means trace function enabled. If not , will return Nooptracer // if global trace instance was set , it means trace function enabled. If not , will return Nooptracer
tracer := opentracing.GlobalTracer() tracer := opentracing.GlobalTracer()
dailOpts := make([]grpc.DialOption, 0, 4) dailOpts := make([]grpc.DialOption, 0, 4)
maxMessageSize, _ := strconv.Atoi(url.GetParam(constant.GRPC_MESSAGE_SIZE_KEY, "4"))
dailOpts = append(dailOpts, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( dailOpts = append(dailOpts, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer, otgrpc.LogPayloads())), otgrpc.OpenTracingClientInterceptor(tracer, otgrpc.LogPayloads())),
grpc.WithDefaultCallOptions(grpc.CallContentSubtype(clientConf.ContentSubType))) grpc.WithDefaultCallOptions(
grpc.CallContentSubtype(clientConf.ContentSubType),
grpc.MaxCallRecvMsgSize(1024*1024*maxMessageSize),
grpc.MaxCallSendMsgSize(1024*1024*maxMessageSize)))
conn, err := grpc.Dial(url.Location, dailOpts...) conn, err := grpc.Dial(url.Location, dailOpts...)
if err != nil { if err != nil {
panic(err) panic(err)
......
...@@ -18,6 +18,8 @@ ...@@ -18,6 +18,8 @@
package grpc package grpc
import ( import (
"github.com/apache/dubbo-go/common/constant"
"strconv"
"sync" "sync"
) )
...@@ -76,7 +78,9 @@ func (gp *GrpcProtocol) openServer(url common.URL) { ...@@ -76,7 +78,9 @@ func (gp *GrpcProtocol) openServer(url common.URL) {
gp.serverLock.Lock() gp.serverLock.Lock()
_, ok = gp.serverMap[url.Location] _, ok = gp.serverMap[url.Location]
if !ok { if !ok {
grpcMessageSize, _ := strconv.Atoi(url.GetParam(constant.GRPC_MESSAGE_SIZE_KEY, "4"))
srv := NewServer() srv := NewServer()
srv.SetBufferSize(grpcMessageSize)
gp.serverMap[url.Location] = srv gp.serverMap[url.Location] = srv
srv.Start(url) srv.Start(url)
} }
......
...@@ -40,6 +40,7 @@ import ( ...@@ -40,6 +40,7 @@ import (
// Server is a gRPC server // Server is a gRPC server
type Server struct { type Server struct {
grpcServer *grpc.Server grpcServer *grpc.Server
bufferSize int
} }
// NewServer creates a new server // NewServer creates a new server
...@@ -57,6 +58,10 @@ type DubboGrpcService interface { ...@@ -57,6 +58,10 @@ type DubboGrpcService interface {
ServiceDesc() *grpc.ServiceDesc ServiceDesc() *grpc.ServiceDesc
} }
func (s *Server) SetBufferSize(n int) {
s.bufferSize = n
}
// Start gRPC server with @url // Start gRPC server with @url
func (s *Server) Start(url common.URL) { func (s *Server) Start(url common.URL) {
var ( var (
...@@ -72,7 +77,11 @@ func (s *Server) Start(url common.URL) { ...@@ -72,7 +77,11 @@ func (s *Server) Start(url common.URL) {
// if global trace instance was set, then server tracer instance can be get. If not , will return Nooptracer // if global trace instance was set, then server tracer instance can be get. If not , will return Nooptracer
tracer := opentracing.GlobalTracer() tracer := opentracing.GlobalTracer()
server := grpc.NewServer( server := grpc.NewServer(
grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer))) grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)),
grpc.MaxRecvMsgSize(1024*1024*s.bufferSize),
grpc.MaxSendMsgSize(1024*1024*s.bufferSize))
fmt.Println("-------------------")
fmt.Println("size = ", s.bufferSize)
key := url.GetParam(constant.BEAN_NAME_KEY, "") key := url.GetParam(constant.BEAN_NAME_KEY, "")
service := config.GetProviderService(key) service := config.GetProviderService(key)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment