/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package dubbo import ( "bytes" "reflect" ) import ( "github.com/apache/dubbo-go-hessian2" "github.com/dubbogo/getty" perrors "github.com/pkg/errors" ) import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" ) //////////////////////////////////////////// // RpcClientPackageHandler //////////////////////////////////////////// type RpcClientPackageHandler struct { client *Client } func NewRpcClientPackageHandler(client *Client) *RpcClientPackageHandler { return &RpcClientPackageHandler{client: client} } func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { pkg := &DubboPackage{} buf := bytes.NewBuffer(data) err := pkg.Unmarshal(buf, p.client) if err != nil { originErr := perrors.Cause(err) if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough { return nil, 0, nil } logger.Errorf("pkg.Unmarshal(ss:%+v, len(@data):%d) = error:%+v", ss, len(data), err) return nil, 0, perrors.WithStack(err) } pkg.Err = pkg.Body.(*hessian.Response).Exception pkg.Body = pkg.Body.(*hessian.Response).RspObj return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil } func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) error { req, ok := pkg.(*DubboPackage) if !ok { logger.Errorf("illegal pkg:%+v\n", pkg) return perrors.New("invalid rpc request") } buf, err := req.Marshal() if err != nil { logger.Warnf("binary.Write(req{%#v}) = err{%#v}", req, perrors.WithStack(err)) return perrors.WithStack(err) } return perrors.WithStack(ss.WriteBytes(buf.Bytes())) } //////////////////////////////////////////// // RpcServerPackageHandler //////////////////////////////////////////// var ( rpcServerPkgHandler = &RpcServerPackageHandler{} ) type RpcServerPackageHandler struct{} func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { pkg := &DubboPackage{ Body: make([]interface{}, 7), } buf := bytes.NewBuffer(data) err := pkg.Unmarshal(buf) if err != nil { originErr := perrors.Cause(err) if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough { return nil, 0, nil } logger.Errorf("pkg.Unmarshal(ss:%+v, len(@data):%d) = error:%+v", ss, len(data), err) return nil, 0, perrors.WithStack(err) } if pkg.Header.Type&hessian.PackageHeartbeat == 0x00 { // convert params of request req := pkg.Body.([]interface{}) // length of body should be 7 if len(req) > 0 { var dubboVersion, argsTypes string var args []interface{} var attachments map[interface{}]interface{} if req[0] != nil { dubboVersion = req[0].(string) } if req[1] != nil { pkg.Service.Path = req[1].(string) } if req[2] != nil { pkg.Service.Version = req[2].(string) } if req[3] != nil { pkg.Service.Method = req[3].(string) } if req[4] != nil { argsTypes = req[4].(string) } if req[5] != nil { args = req[5].([]interface{}) } if req[6] != nil { attachments = req[6].(map[interface{}]interface{}) } pkg.Service.Interface = attachments[constant.INTERFACE_KEY].(string) if pkg.Service.Path == "" && attachments[constant.PATH_KEY] != nil { pkg.Service.Path = attachments[constant.PATH_KEY].(string) } if attachments[constant.GROUP_KEY] != nil { pkg.Service.Group = attachments[constant.GROUP_KEY].(string) } pkg.Body = map[string]interface{}{ "dubboVersion": dubboVersion, "argsTypes": argsTypes, "args": args, "service": common.ServiceMap.GetService(DUBBO, pkg.Service.Path), // path as a key "attachments": attachments, } } } return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil } func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) error { res, ok := pkg.(*DubboPackage) if !ok { logger.Errorf("illegal pkg:%+v\n, it is %+v", pkg, reflect.TypeOf(pkg)) return perrors.New("invalid rpc response") } buf, err := res.Marshal() if err != nil { logger.Warnf("binary.Write(res{%#v}) = err{%#v}", res, perrors.WithStack(err)) return perrors.WithStack(err) } return perrors.WithStack(ss.WriteBytes(buf.Bytes())) }