diff --git a/protocol/grpc/client.go b/protocol/grpc/client.go index 6026f0991b926fd38de8aef3774e46b001820edd..92f42c3952f0b7233a93f9b0bbbbcbd1b5dfdc0e 100644 --- a/protocol/grpc/client.go +++ b/protocol/grpc/client.go @@ -25,14 +25,53 @@ import ( "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" "github.com/opentracing/opentracing-go" "google.golang.org/grpc" + "gopkg.in/yaml.v2" ) import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/config" ) +var ( + clientConf *ClientConfig +) + +func init() { + // load clientconfig from consumer_config + // default use grpc + consumerConfig := config.GetConsumerConfig() + if consumerConfig.ApplicationConfig == nil { + return + } + protocolConf := config.GetConsumerConfig().ProtocolConf + defaultClientConfig := GetDefaultClientConfig() + if protocolConf == nil { + logger.Info("protocol_conf default use dubbo config") + } else { + grpcConf := protocolConf.(map[interface{}]interface{})[GRPC] + if grpcConf == nil { + logger.Warnf("grpcConf is nil") + return + } + grpcConfByte, err := yaml.Marshal(grpcConf) + if err != nil { + panic(err) + } + err = yaml.Unmarshal(grpcConfByte, &defaultClientConfig) + if err != nil { + panic(err) + } + } + clientConf = &defaultClientConfig + if err := clientConf.Validate(); err != nil { + logger.Warnf("[CheckValidity] error: %v", err) + return + } +} + // Client ... type Client struct { *grpc.ClientConn @@ -43,9 +82,11 @@ type Client struct { func NewClient(url common.URL) *Client { // if global trace instance was set , it means trace function enabled. If not , will return Nooptracer tracer := opentracing.GlobalTracer() - conn, err := grpc.Dial(url.Location, grpc.WithInsecure(), grpc.WithBlock(), - grpc.WithUnaryInterceptor( - otgrpc.OpenTracingClientInterceptor(tracer, otgrpc.LogPayloads()))) + dailOpts := make([]grpc.DialOption, 0) + dailOpts = append(dailOpts, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( + otgrpc.OpenTracingClientInterceptor(tracer, otgrpc.LogPayloads())), + grpc.WithDefaultCallOptions(grpc.CallContentSubtype(clientConf.ContentType))) + conn, err := grpc.Dial(url.Location, dailOpts...) if err != nil { panic(err) } diff --git a/protocol/grpc/codec.go b/protocol/grpc/codec.go new file mode 100644 index 0000000000000000000000000000000000000000..a2f1882c75511ca7755a9f2078a2cc52222f09fc --- /dev/null +++ b/protocol/grpc/codec.go @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package grpc + + +import ( + "bytes" + "encoding/json" +) + +import ( + "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" + "google.golang.org/grpc/encoding" +) + +func init() { + encoding.RegisterCodec(JSON{ + Marshaler: jsonpb.Marshaler{ + EmitDefaults: true, + OrigName: true, + }, + }) +} + +type JSON struct { + jsonpb.Marshaler + jsonpb.Unmarshaler +} + +func (_ JSON) Name() string { + return "json" +} + +func (j JSON) Marshal(v interface{}) (out []byte, err error) { + if pm, ok := v.(proto.Message); ok { + b := new(bytes.Buffer) + err := j.Marshaler.Marshal(b, pm) + if err != nil { + return nil, err + } + return b.Bytes(), nil + } + return json.Marshal(v) +} + +func (j JSON) Unmarshal(data []byte, v interface{}) (err error) { + if pm, ok := v.(proto.Message); ok { + b := bytes.NewBuffer(data) + return j.Unmarshaler.Unmarshal(b, pm) + } + return json.Unmarshal(data, v) +} diff --git a/protocol/grpc/config.go b/protocol/grpc/config.go new file mode 100644 index 0000000000000000000000000000000000000000..a37750ce327f48e91b1ca031b967fc0a1f70ffe9 --- /dev/null +++ b/protocol/grpc/config.go @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package grpc + +type ( + // ServerConfig + ServerConfig struct { + } + + // ClientConfig + ClientConfig struct { + // content type, more information refer by https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests + ContentType string `default:"application/grpc+proto" yaml:"content_type" json:"content_type,omitempty"` + } +) + +// GetDefaultClientConfig ... +func GetDefaultClientConfig() ClientConfig { + return ClientConfig{ + ContentType: "application/grpc+proto", + } +} + +// GetDefaultServerConfig ... +func GetDefaultServerConfig() ServerConfig { + return ServerConfig{ + } +} + +func (c *ClientConfig) Validate() error { + return nil +} + +func (c *ServerConfig) Validate() error { + return nil +}