Skip to content
Snippets Groups Projects
Commit f6c3d442 authored by fangyincheng's avatar fangyincheng Committed by GitHub
Browse files

Merge pull request #602 from zouyx/feature/addProtocolComment

Mod: update the comments in protocol directory 
parents 4e2006c6 b41ddc5d
No related branches found
No related tags found
No related merge requests found
Showing
with 126 additions and 114 deletions
......@@ -111,7 +111,7 @@ func setClientGrpool() {
}
}
// Options ...
// Options is option for create dubbo client
type Options struct {
// connect timeout
ConnectTimeout time.Duration
......
......@@ -65,6 +65,7 @@ type DubboPackage struct {
Err error
}
// String prints dubbo package detail include header、path、body etc.
func (p DubboPackage) String() string {
return fmt.Sprintf("DubboPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body)
}
......
......@@ -27,7 +27,7 @@ import (
)
type (
// GettySessionParam ...
// GettySessionParam is session configuration for getty.
GettySessionParam struct {
CompressEncoding bool `default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"`
TcpNoDelay bool `default:"true" yaml:"tcp_no_delay" json:"tcp_no_delay,omitempty"`
......@@ -47,8 +47,7 @@ type (
SessionName string `default:"rpc" yaml:"session_name" json:"session_name,omitempty"`
}
// ServerConfig
//Config holds supported types by the multiconfig package
// ServerConfig holds supported types by the multiconfig package
ServerConfig struct {
// session
SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
......@@ -64,8 +63,7 @@ type (
GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"`
}
// ClientConfig
//Config holds supported types by the multiconfig package
// ClientConfig holds supported types by the multiconfig package
ClientConfig struct {
ReconnectInterval int `default:"0" yaml:"reconnect_interval" json:"reconnect_interval,omitempty"`
......@@ -94,7 +92,7 @@ type (
}
)
// GetDefaultClientConfig ...
// GetDefaultClientConfig gets client default configuration.
func GetDefaultClientConfig() ClientConfig {
return ClientConfig{
ReconnectInterval: 0,
......@@ -122,7 +120,7 @@ func GetDefaultClientConfig() ClientConfig {
}}
}
// GetDefaultServerConfig ...
// GetDefaultServerConfig gets server default configuration.
func GetDefaultServerConfig() ServerConfig {
return ServerConfig{
SessionTimeout: "180s",
......
......@@ -33,7 +33,7 @@ import (
// dubbo protocol constant
const (
// DUBBO ...
// DUBBO is dubbo protocol name
DUBBO = "dubbo"
)
......
......@@ -41,10 +41,9 @@ import (
"github.com/apache/dubbo-go/protocol/invocation"
)
// todo: WritePkg_Timeout will entry *.yml
// todo: writePkg_Timeout will entry *.yml
const (
// WritePkg_Timeout ...
WritePkg_Timeout = 5 * time.Second
writePkg_Timeout = 5 * time.Second
)
var (
......@@ -56,10 +55,12 @@ type rpcSession struct {
reqNum int32
}
// AddReqNum adds total request number safely
func (s *rpcSession) AddReqNum(num int32) {
atomic.AddInt32(&s.reqNum, num)
}
// GetReqNum gets total request number safely
func (s *rpcSession) GetReqNum() int32 {
return atomic.LoadInt32(&s.reqNum)
}
......@@ -68,35 +69,35 @@ func (s *rpcSession) GetReqNum() int32 {
// RpcClientHandler
// //////////////////////////////////////////
// RpcClientHandler ...
// RpcClientHandler is handler of RPC Client
type RpcClientHandler struct {
conn *gettyRPCClient
}
// NewRpcClientHandler ...
// NewRpcClientHandler creates RpcClientHandler with @gettyRPCClient
func NewRpcClientHandler(client *gettyRPCClient) *RpcClientHandler {
return &RpcClientHandler{conn: client}
}
// OnOpen ...
// OnOpen notified when RPC client session opened
func (h *RpcClientHandler) OnOpen(session getty.Session) error {
h.conn.addSession(session)
return nil
}
// OnError ...
// OnError notified when RPC client session got any error
func (h *RpcClientHandler) OnError(session getty.Session, err error) {
logger.Warnf("session{%s} got error{%v}, will be closed.", session.Stat(), err)
h.conn.removeSession(session)
}
// OnClose ...
// OnOpen notified when RPC client session closed
func (h *RpcClientHandler) OnClose(session getty.Session) {
logger.Infof("session{%s} is closing......", session.Stat())
h.conn.removeSession(session)
}
// OnMessage ...
// OnMessage notified when RPC client session got any message in connection
func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
p, ok := pkg.(*DubboPackage)
if !ok {
......@@ -141,7 +142,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
}
}
// OnCron ...
// OnCron notified when RPC client session got any message in cron job
func (h *RpcClientHandler) OnCron(session getty.Session) {
clientRpcSession, err := h.conn.getClientRpcSession(session)
if err != nil {
......@@ -163,7 +164,7 @@ func (h *RpcClientHandler) OnCron(session getty.Session) {
// RpcServerHandler
// //////////////////////////////////////////
// RpcServerHandler ...
// RpcServerHandler is handler of RPC Server
type RpcServerHandler struct {
maxSessionNum int
sessionTimeout time.Duration
......@@ -171,7 +172,7 @@ type RpcServerHandler struct {
rwlock sync.RWMutex
}
// NewRpcServerHandler ...
// NewRpcServerHandler creates RpcServerHandler with @maxSessionNum and @sessionTimeout
func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration) *RpcServerHandler {
return &RpcServerHandler{
maxSessionNum: maxSessionNum,
......@@ -180,7 +181,7 @@ func NewRpcServerHandler(maxSessionNum int, sessionTimeout time.Duration) *RpcSe
}
}
// OnOpen ...
// OnOpen notified when RPC server session opened
func (h *RpcServerHandler) OnOpen(session getty.Session) error {
var err error
h.rwlock.RLock()
......@@ -199,7 +200,7 @@ func (h *RpcServerHandler) OnOpen(session getty.Session) error {
return nil
}
// OnError ...
// OnError notified when RPC server session got any error
func (h *RpcServerHandler) OnError(session getty.Session, err error) {
logger.Warnf("session{%s} got error{%v}, will be closed.", session.Stat(), err)
h.rwlock.Lock()
......@@ -207,7 +208,7 @@ func (h *RpcServerHandler) OnError(session getty.Session, err error) {
h.rwlock.Unlock()
}
// OnClose ...
// OnOpen notified when RPC server session closed
func (h *RpcServerHandler) OnClose(session getty.Session) {
logger.Infof("session{%s} is closing......", session.Stat())
h.rwlock.Lock()
......@@ -215,7 +216,7 @@ func (h *RpcServerHandler) OnClose(session getty.Session) {
h.rwlock.Unlock()
}
// OnMessage ...
// OnMessage notified when RPC server session got any message in connection
func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
h.rwlock.Lock()
if _, ok := h.sessionMap[session]; ok {
......@@ -306,7 +307,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
reply(session, p, hessian.PackageResponse)
}
// OnCron ...
// OnCron notified when RPC server session got any message in cron job
func (h *RpcServerHandler) OnCron(session getty.Session) {
var (
flag bool
......@@ -363,7 +364,7 @@ func reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) {
resp.Body = nil
}
if err := session.WritePkg(resp, WritePkg_Timeout); err != nil {
if err := session.WritePkg(resp, writePkg_Timeout); err != nil {
logger.Errorf("WritePkg error: %#v, %#v", perrors.WithStack(err), req.Header)
}
}
......@@ -82,13 +82,13 @@ func init() {
}
// Client return grpc connection and warp service stub
// Client is gRPC client include client connection and invoker
type Client struct {
*grpc.ClientConn
invoker reflect.Value
}
// NewClient ...
// NewClient creates a new gRPC client.
func NewClient(url common.URL) *Client {
// if global trace instance was set , it means trace function enabled. If not , will return Nooptracer
tracer := opentracing.GlobalTracer()
......
......@@ -33,14 +33,14 @@ type GrpcExporter struct {
*protocol.BaseExporter
}
// NewGrpcExporter ...
// NewGrpcExporter creates a new gRPC exporter
func NewGrpcExporter(key string, invoker protocol.Invoker, exporterMap *sync.Map) *GrpcExporter {
return &GrpcExporter{
BaseExporter: protocol.NewBaseExporter(key, invoker, exporterMap),
}
}
// Unexport ...
// Unexport and unregister gRPC service from registry and memory.
func (gg *GrpcExporter) Unexport() {
serviceId := gg.GetInvoker().GetUrl().GetParam(constant.BEAN_NAME_KEY, "")
interfaceName := gg.GetInvoker().GetUrl().GetParam(constant.INTERFACE_KEY, "")
......
......@@ -35,8 +35,7 @@ import (
"github.com/apache/dubbo-go/protocol"
)
// ErrNoReply ...
var ErrNoReply = errors.New("request need @response")
var errNoReply = errors.New("request need @response")
// GrpcInvoker ...
type GrpcInvoker struct {
......@@ -60,7 +59,7 @@ func (gi *GrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocatio
)
if invocation.Reply() == nil {
result.Err = ErrNoReply
result.Err = errNoReply
}
in := []reflect.Value{}
......
......@@ -39,14 +39,14 @@ func init() {
var grpcProtocol *GrpcProtocol
// GrpcProtocol ...
// GrpcProtocol is gRPC protocol
type GrpcProtocol struct {
protocol.BaseProtocol
serverMap map[string]*Server
serverLock sync.Mutex
}
// NewGRPCProtocol ...
// NewGRPCProtocol creates new gRPC protocol
func NewGRPCProtocol() *GrpcProtocol {
return &GrpcProtocol{
BaseProtocol: protocol.NewBaseProtocol(),
......@@ -54,7 +54,7 @@ func NewGRPCProtocol() *GrpcProtocol {
}
}
// Export ...
// Export gRPC service for remote invocation
func (gp *GrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
url := invoker.GetUrl()
serviceKey := url.ServiceKey()
......@@ -84,7 +84,7 @@ func (gp *GrpcProtocol) openServer(url common.URL) {
}
}
// Refer ...
// Refer a remote gRPC service
func (gp *GrpcProtocol) Refer(url common.URL) protocol.Invoker {
invoker := NewGrpcInvoker(url, NewClient(url))
gp.SetInvokers(invoker)
......@@ -92,7 +92,7 @@ func (gp *GrpcProtocol) Refer(url common.URL) protocol.Invoker {
return invoker
}
// Destroy ...
// Destroy will destroy gRPC all invoker and exporter, so it only is called once.
func (gp *GrpcProtocol) Destroy() {
logger.Infof("GrpcProtocol destroy.")
......@@ -104,7 +104,7 @@ func (gp *GrpcProtocol) Destroy() {
}
}
// GetProtocol ...
// GetProtocol gets gRPC protocol , will create if null.
func GetProtocol() protocol.Protocol {
if grpcProtocol == nil {
grpcProtocol = NewGRPCProtocol()
......
......@@ -42,7 +42,7 @@ func (s *server) SayHello(ctx context.Context, in *HelloRequest) (*HelloReply, e
return &HelloReply{Message: "Hello " + in.GetName()}, nil
}
// InitGrpcServer ...
// InitGrpcServer creates global gRPC server.
func InitGrpcServer() {
port := ":30000"
......@@ -57,7 +57,7 @@ func InitGrpcServer() {
}
}
// ShutdownGrpcServer ...
// ShutdownGrpcServer shuts down gRPC server gracefully
func ShutdownGrpcServer() {
if s == nil {
return
......
......@@ -37,24 +37,27 @@ import (
"github.com/apache/dubbo-go/protocol"
)
// Server ...
// Server is a gRPC server
type Server struct {
grpcServer *grpc.Server
}
// NewServer ...
// NewServer creates a new server
func NewServer() *Server {
return &Server{}
}
// DubboGrpcService ...
// DubboGrpcService is gRPC service
type DubboGrpcService interface {
// SetProxyImpl sets proxy.
SetProxyImpl(impl protocol.Invoker)
// GetProxyImpl gets proxy.
GetProxyImpl() protocol.Invoker
// ServiceDesc gets an RPC service's specification.
ServiceDesc() *grpc.ServiceDesc
}
// Start ...
// Start gRPC server with @url
func (s *Server) Start(url common.URL) {
var (
addr string
......@@ -106,7 +109,7 @@ func (s *Server) Start(url common.URL) {
}()
}
// Stop ...
// Stop gRPC server
func (s *Server) Stop() {
s.grpcServer.Stop()
}
......@@ -21,17 +21,26 @@ import (
"reflect"
)
// Invocation ...
// Invocation is a invocation for each remote method.
type Invocation interface {
// MethodName gets invocation method name.
MethodName() string
// ParameterTypes gets invocation parameter types.
ParameterTypes() []reflect.Type
// ParameterValues gets invocation parameter values.
ParameterValues() []reflect.Value
// Arguments gets arguments.
Arguments() []interface{}
// Reply gets response of request
Reply() interface{}
// Attachments gets all attachments
Attachments() map[string]string
// AttachmentsByKey gets attachment by key , if nil then return default value
AttachmentsByKey(string, string) string
// Refer to dubbo 2.7.6. It is different from attachment. It is used in internal process.
// Attributes refers to dubbo 2.7.6. It is different from attachment. It is used in internal process.
Attributes() map[string]interface{}
// AttributeByKey gets attribute by key , if nil then return default value
AttributeByKey(string, interface{}) interface{}
// Invoker gets the invoker in current context.
Invoker() Invoker
}
......@@ -31,7 +31,7 @@ import (
// ///////////////////////////
// todo: is it necessary to separate fields of consumer(provider) from RPCInvocation
// RPCInvocation ...
// nolint
type RPCInvocation struct {
methodName string
parameterTypes []reflect.Type
......@@ -46,7 +46,7 @@ type RPCInvocation struct {
lock sync.RWMutex
}
// NewRPCInvocation ...
// NewRPCInvocation creates a RPC invocation.
func NewRPCInvocation(methodName string, arguments []interface{}, attachments map[string]string) *RPCInvocation {
return &RPCInvocation{
methodName: methodName,
......@@ -56,7 +56,7 @@ func NewRPCInvocation(methodName string, arguments []interface{}, attachments ma
}
}
// NewRPCInvocationWithOptions ...
// NewRPCInvocationWithOptions creates a RPC invocation with @opts.
func NewRPCInvocationWithOptions(opts ...option) *RPCInvocation {
invo := &RPCInvocation{}
for _, opt := range opts {
......@@ -68,42 +68,42 @@ func NewRPCInvocationWithOptions(opts ...option) *RPCInvocation {
return invo
}
// MethodName ...
// MethodName gets RPC invocation method name.
func (r *RPCInvocation) MethodName() string {
return r.methodName
}
// ParameterTypes ...
// ParameterTypes gets RPC invocation parameter types.
func (r *RPCInvocation) ParameterTypes() []reflect.Type {
return r.parameterTypes
}
// ParameterValues ...
// ParameterValues gets RPC invocation parameter values.
func (r *RPCInvocation) ParameterValues() []reflect.Value {
return r.parameterValues
}
// Arguments ...
// Arguments gets RPC arguments.
func (r *RPCInvocation) Arguments() []interface{} {
return r.arguments
}
// Reply ...
// Reply gets response of RPC request.
func (r *RPCInvocation) Reply() interface{} {
return r.reply
}
// SetReply ...
// SetReply sets response of RPC request.
func (r *RPCInvocation) SetReply(reply interface{}) {
r.reply = reply
}
// Attachments ...
// Attachments gets all attachments of RPC.
func (r *RPCInvocation) Attachments() map[string]string {
return r.attachments
}
// AttachmentsByKey ...
// AttachmentsByKey gets RPC attachment by key , if nil then return default value.
func (r *RPCInvocation) AttachmentsByKey(key string, defaultValue string) string {
r.lock.RLock()
defer r.lock.RUnlock()
......@@ -117,12 +117,12 @@ func (r *RPCInvocation) AttachmentsByKey(key string, defaultValue string) string
return defaultValue
}
// get attributes
// Attributes gets all attributes of RPC.
func (r *RPCInvocation) Attributes() map[string]interface{} {
return r.attributes
}
// get attribute by key. If it is not exist, it will return default value
// AttributeByKey gets attribute by @key. If it is not exist, it will return default value.
func (r *RPCInvocation) AttributeByKey(key string, defaultValue interface{}) interface{} {
r.lock.RLock()
defer r.lock.RUnlock()
......@@ -133,7 +133,7 @@ func (r *RPCInvocation) AttributeByKey(key string, defaultValue interface{}) int
return defaultValue
}
// SetAttachments ...
// SetAttachments sets attribute by @key and @value.
func (r *RPCInvocation) SetAttachments(key string, value string) {
r.lock.Lock()
defer r.lock.Unlock()
......@@ -143,29 +143,24 @@ func (r *RPCInvocation) SetAttachments(key string, value string) {
r.attachments[key] = value
}
// SetAttribute. If Attributes is nil, it will be inited.
// SetAttribute sets attribute by @key and @value.
func (r *RPCInvocation) SetAttribute(key string, value interface{}) {
r.lock.Lock()
defer r.lock.Unlock()
r.attributes[key] = value
}
// Invoker ...
// Invoker gets the invoker in current context.
func (r *RPCInvocation) Invoker() protocol.Invoker {
return r.invoker
}
// SetInvoker ...
func (r *RPCInvocation) SetInvoker() protocol.Invoker {
return r.invoker
}
// CallBack ...
// CallBack sets RPC callback method.
func (r *RPCInvocation) CallBack() interface{} {
return r.callBack
}
// SetCallBack ...
// SetCallBack sets RPC callback method.
func (r *RPCInvocation) SetCallBack(c interface{}) {
r.callBack = c
}
......@@ -176,56 +171,56 @@ func (r *RPCInvocation) SetCallBack(c interface{}) {
type option func(invo *RPCInvocation)
// WithMethodName ...
// WithMethodName creates option with @methodName.
func WithMethodName(methodName string) option {
return func(invo *RPCInvocation) {
invo.methodName = methodName
}
}
// WithParameterTypes ...
// WithParameterTypes creates option with @parameterTypes.
func WithParameterTypes(parameterTypes []reflect.Type) option {
return func(invo *RPCInvocation) {
invo.parameterTypes = parameterTypes
}
}
// WithParameterValues ...
// WithParameterValues creates option with @parameterValues
func WithParameterValues(parameterValues []reflect.Value) option {
return func(invo *RPCInvocation) {
invo.parameterValues = parameterValues
}
}
// WithArguments ...
// WithArguments creates option with @arguments function.
func WithArguments(arguments []interface{}) option {
return func(invo *RPCInvocation) {
invo.arguments = arguments
}
}
// WithReply ...
// WithReply creates option with @reply function.
func WithReply(reply interface{}) option {
return func(invo *RPCInvocation) {
invo.reply = reply
}
}
// WithCallBack ...
// WithCallBack creates option with @callback function.
func WithCallBack(callBack interface{}) option {
return func(invo *RPCInvocation) {
invo.callBack = callBack
}
}
// WithAttachments ...
// WithAttachments creates option with @attachments.
func WithAttachments(attachments map[string]string) option {
return func(invo *RPCInvocation) {
invo.attachments = attachments
}
}
// WithInvoker ...
// WithInvoker creates option with @invoker.
func WithInvoker(invoker protocol.Invoker) option {
return func(invo *RPCInvocation) {
invo.invoker = invoker
......
......@@ -31,6 +31,7 @@ import (
// Extension - Invoker
type Invoker interface {
common.Node
// Invoke the invocation and return result.
Invoke(context.Context, Invocation) Result
}
......@@ -38,14 +39,14 @@ type Invoker interface {
// base invoker
/////////////////////////////
// BaseInvoker ...
// BaseInvoker provides default invoker implement
type BaseInvoker struct {
url common.URL
available bool
destroyed bool
}
// NewBaseInvoker ...
// NewBaseInvoker creates a new BaseInvoker
func NewBaseInvoker(url common.URL) *BaseInvoker {
return &BaseInvoker{
url: url,
......@@ -54,27 +55,27 @@ func NewBaseInvoker(url common.URL) *BaseInvoker {
}
}
// GetUrl ...
// GetUrl gets base invoker URL
func (bi *BaseInvoker) GetUrl() common.URL {
return bi.url
}
// IsAvailable ...
// IsAvailable gets available flag
func (bi *BaseInvoker) IsAvailable() bool {
return bi.available
}
// IsDestroyed ...
// IsDestroyed gets destroyed flag
func (bi *BaseInvoker) IsDestroyed() bool {
return bi.destroyed
}
// Invoke ...
// Invoke provides default invoker implement
func (bi *BaseInvoker) Invoke(context context.Context, invocation Invocation) Result {
return &RPCResult{}
}
// Destroy ...
// Destroy changes available and destroyed flag
func (bi *BaseInvoker) Destroy() {
logger.Infof("Destroy invoker: %s", bi.GetUrl().String())
bi.destroyed = true
......
......@@ -63,7 +63,7 @@ type Request struct {
// HTTP Client
// ////////////////////////////////////////////
// HTTPOptions ...
// HTTPOptions is a HTTP option include HandshakeTimeout and HTTPTimeout.
type HTTPOptions struct {
HandshakeTimeout time.Duration
HTTPTimeout time.Duration
......@@ -74,13 +74,13 @@ var defaultHTTPOptions = HTTPOptions{
HTTPTimeout: 3 * time.Second,
}
// HTTPClient ...
// HTTPClient is a HTTP client ,include ID and options.
type HTTPClient struct {
ID int64
options HTTPOptions
}
// NewHTTPClient ...
// NewHTTPClient creates a new HTTP client with HTTPOptions.
func NewHTTPClient(opt *HTTPOptions) *HTTPClient {
if opt == nil {
opt = &defaultHTTPOptions
......@@ -100,7 +100,7 @@ func NewHTTPClient(opt *HTTPOptions) *HTTPClient {
}
}
// NewRequest ...
// NewRequest creates a new HTTP request with @service ,@method and @arguments.
func (c *HTTPClient) NewRequest(service common.URL, method string, args interface{}) *Request {
return &Request{
......@@ -114,7 +114,7 @@ func (c *HTTPClient) NewRequest(service common.URL, method string, args interfac
}
}
// Call ...
// Call makes a HTTP call with @ctx , @service ,@req and @rsp
func (c *HTTPClient) Call(ctx context.Context, service common.URL, req *Request, rsp interface{}) error {
// header
httpHeader := http.Header{}
......
......@@ -37,7 +37,7 @@ const (
VERSION = "2.0"
)
// CodecData ...
// CodecData is codec data for json RPC.
type CodecData struct {
ID int64
Method string
......@@ -64,6 +64,7 @@ type Error struct {
Data interface{} `json:"data,omitempty"`
}
// Error decodes response error for a string.
func (e *Error) Error() string {
buf, err := json.Marshal(e)
if err != nil {
......@@ -114,6 +115,7 @@ func newJsonClientCodec() *jsonClientCodec {
}
}
// Write codec data as byte.
func (c *jsonClientCodec) Write(d *CodecData) ([]byte, error) {
// If return error: it will be returned as is for this call.
// Allow param to be only Array, Slice, Map or Struct.
......@@ -170,6 +172,7 @@ func (c *jsonClientCodec) Write(d *CodecData) ([]byte, error) {
return buf.Bytes(), nil
}
// Read bytes as structured data
func (c *jsonClientCodec) Read(streamBytes []byte, x interface{}) error {
c.rsp.reset()
......@@ -223,6 +226,7 @@ func (r *serverRequest) reset() {
}
}
// UnmarshalJSON unmarshals JSON for server request.
func (r *serverRequest) UnmarshalJSON(raw []byte) error {
r.reset()
......@@ -281,7 +285,7 @@ type serverResponse struct {
Error interface{} `json:"error,omitempty"`
}
// ServerCodec ...
// ServerCodec is codec data for request server.
type ServerCodec struct {
req serverRequest
}
......@@ -296,7 +300,7 @@ func newServerCodec() *ServerCodec {
return &ServerCodec{}
}
// ReadHeader ...
// ReadHeader reads header and unmarshal to server codec
func (c *ServerCodec) ReadHeader(header map[string]string, body []byte) error {
if header["HttpMethod"] != "POST" {
return &Error{Code: -32601, Message: "Method not found"}
......@@ -328,7 +332,7 @@ func (c *ServerCodec) ReadHeader(header map[string]string, body []byte) error {
return nil
}
// ReadBody ...
// ReadBody reads @x as request body.
func (c *ServerCodec) ReadBody(x interface{}) error {
// If x!=nil and return error e:
// - Write() will be called with e.Error() in r.Error
......@@ -339,7 +343,7 @@ func (c *ServerCodec) ReadBody(x interface{}) error {
return nil
}
// 在这里把请求参数json 字符串转换成了相应的struct
// the request parameter JSON string is converted to the corresponding struct
params := []byte(*c.req.Params)
if err := json.Unmarshal(*c.req.Params, x); err != nil {
// Note: if c.request.Params is nil it's not an error, it's an optional member.
......@@ -362,7 +366,7 @@ func (c *ServerCodec) ReadBody(x interface{}) error {
return nil
}
// NewError ...
// NewError creates a error with @code and @message
func NewError(code int, message string) *Error {
return &Error{Code: code, Message: message}
}
......@@ -380,6 +384,7 @@ func newError(message string) *Error {
}
}
// Write responses as byte
func (c *ServerCodec) Write(errMsg string, x interface{}) ([]byte, error) {
// If return error: nothing happens.
// In r.Error will be "" or .Error() of error returned by:
......
......@@ -28,19 +28,19 @@ import (
"github.com/apache/dubbo-go/protocol"
)
// JsonrpcExporter ...
// JsonrpcExporter is JSON RPC exporter and extends from base invoker.
type JsonrpcExporter struct {
protocol.BaseExporter
}
// NewJsonrpcExporter ...
// NewJsonrpcExporter creates JSON RPC exporter with @key, @invoker and @exporterMap
func NewJsonrpcExporter(key string, invoker protocol.Invoker, exporterMap *sync.Map) *JsonrpcExporter {
return &JsonrpcExporter{
BaseExporter: *protocol.NewBaseExporter(key, invoker, exporterMap),
}
}
// Unexport ...
// Unexport exported JSON RPC service.
func (je *JsonrpcExporter) Unexport() {
serviceId := je.GetInvoker().GetUrl().GetParam(constant.BEAN_NAME_KEY, "")
interfaceName := je.GetInvoker().GetUrl().GetParam(constant.INTERFACE_KEY, "")
......
......@@ -29,13 +29,13 @@ import (
invocation_impl "github.com/apache/dubbo-go/protocol/invocation"
)
// JsonrpcInvoker ...
// JsonrpcInvoker is JSON RPC invoker
type JsonrpcInvoker struct {
protocol.BaseInvoker
client *HTTPClient
}
// NewJsonrpcInvoker ...
// NewJsonrpcInvoker creates JSON RPC invoker with @url and @client
func NewJsonrpcInvoker(url common.URL, client *HTTPClient) *JsonrpcInvoker {
return &JsonrpcInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
......@@ -43,7 +43,7 @@ func NewJsonrpcInvoker(url common.URL, client *HTTPClient) *JsonrpcInvoker {
}
}
// Invoke ...
// Invoke the JSON RPC invocation and return result.
func (ji *JsonrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
var (
......
......@@ -44,14 +44,14 @@ func init() {
var jsonrpcProtocol *JsonrpcProtocol
// JsonrpcProtocol ...
// JsonrpcProtocol is JSON RPC protocol.
type JsonrpcProtocol struct {
protocol.BaseProtocol
serverMap map[string]*Server
serverLock sync.Mutex
}
// NewJsonrpcProtocol ...
// NewJsonrpcProtocol creates JSON RPC protocol
func NewJsonrpcProtocol() *JsonrpcProtocol {
return &JsonrpcProtocol{
BaseProtocol: protocol.NewBaseProtocol(),
......@@ -59,7 +59,7 @@ func NewJsonrpcProtocol() *JsonrpcProtocol {
}
}
// Export ...
// Export JSON RPC service for remote invocation
func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
url := invoker.GetUrl()
serviceKey := strings.TrimPrefix(url.Path, "/")
......@@ -74,7 +74,7 @@ func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
return exporter
}
// Refer ...
// Refer a remote JSON PRC service from registry
func (jp *JsonrpcProtocol) Refer(url common.URL) protocol.Invoker {
//default requestTimeout
var requestTimeout = config.GetConsumerConfig().RequestTimeout
......@@ -93,7 +93,7 @@ func (jp *JsonrpcProtocol) Refer(url common.URL) protocol.Invoker {
return invoker
}
// Destroy ...
// Destroy will destroy all invoker and exporter, so it only is called once.
func (jp *JsonrpcProtocol) Destroy() {
logger.Infof("jsonrpcProtocol destroy.")
......@@ -125,7 +125,7 @@ func (jp *JsonrpcProtocol) openServer(url common.URL) {
}
}
// GetProtocol ...
// GetProtocol gets JSON RPC protocol.
func GetProtocol() protocol.Protocol {
if jsonrpcProtocol == nil {
jsonrpcProtocol = NewJsonrpcProtocol()
......
......@@ -59,7 +59,7 @@ const (
PathPrefix = byte('/')
)
// Server ...
// Server is JSON RPC server wrapper
type Server struct {
done chan struct{}
once sync.Once
......@@ -69,7 +69,7 @@ type Server struct {
timeout time.Duration
}
// NewServer ...
// NewServer creates new JSON RPC server.
func NewServer() *Server {
return &Server{
done: make(chan struct{}),
......@@ -228,7 +228,7 @@ func accept(listener net.Listener, fn func(net.Conn)) error {
}
}
// Start ...
// Start JSON RPC server then ready for accept request.
func (s *Server) Start(url common.URL) {
listener, err := net.Listen("tcp", url.Location)
if err != nil {
......@@ -255,7 +255,7 @@ func (s *Server) Start(url common.URL) {
}()
}
// Stop ...
// Stop JSON RPC server, just can be call once.
func (s *Server) Stop() {
s.once.Do(func() {
close(s.done)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment