diff --git a/pkg/container/batch/batch.go b/pkg/container/batch/batch.go index 950bdfc9b0b610d8f0e7d33c566df2d0edd375fb..8af2518b21f614d32fe4042e049475c3dcf23f81 100644 --- a/pkg/container/batch/batch.go +++ b/pkg/container/batch/batch.go @@ -46,7 +46,7 @@ func (bat *Batch) GetVector(name string, proc *process.Process) (*vector.Vector, proc.Mp.Free(data) data = buf[:mempool.CountSize+n] } - if err := bat.Vecs[i].Read(data, proc); err != nil { + if err := bat.Vecs[i].Read(data); err != nil { proc.Mp.Free(data) return nil, err } diff --git a/pkg/container/vector/bytes.go b/pkg/container/types/bytes.go similarity index 77% rename from pkg/container/vector/bytes.go rename to pkg/container/types/bytes.go index 0aa5c107f12a6bf886fc7a48badad9b93b737be3..c3c7a95e08fee8775fd75679e70ed5c9db2c2e25 100644 --- a/pkg/container/vector/bytes.go +++ b/pkg/container/types/bytes.go @@ -1,8 +1,7 @@ -package vector +package types import ( "bytes" - "unsafe" ) func (a *Bytes) Reset() { @@ -15,6 +14,14 @@ func (a *Bytes) Set(aidx int64, b *Bytes, bidx int64) int { return -1 } +func (a *Bytes) Window(start, end int) *Bytes { + return &Bytes{ + Data: a.Data, + Os: a.Os[start:end], + Ns: a.Ns[start:end], + } +} + func (a *Bytes) Append(vs [][]byte) error { o := uint32(len(a.Data)) for _, v := range vs { @@ -26,14 +33,6 @@ func (a *Bytes) Append(vs [][]byte) error { return nil } -func (a *Bytes) Window(start, end int) *Bytes { - return &Bytes{ - Data: a.Data, - Os: a.Os[start:end], - Ns: a.Ns[start:end], - } -} - func (a *Bytes) String() string { var buf bytes.Buffer @@ -48,14 +47,3 @@ func (a *Bytes) String() string { buf.WriteByte(']') return buf.String() } - -func (a *Bytes) ToStrings() []string { - var tm []byte - - rs := make([]string, len(a.Os)) - for i, o := range a.Os { - tm = a.Data[o : o+a.Ns[i]] - rs[i] = *(*string)(unsafe.Pointer(&tm)) - } - return rs -} diff --git a/pkg/container/types/date.go b/pkg/container/types/date.go new file mode 100644 index 0000000000000000000000000000000000000000..ca039feae1f1491583e7d472e1950da71f3e582e --- /dev/null +++ b/pkg/container/types/date.go @@ -0,0 +1,5 @@ +package types + +func (a Date) String() string { + return "" +} diff --git a/pkg/container/types/datetime.go b/pkg/container/types/datetime.go new file mode 100644 index 0000000000000000000000000000000000000000..a3ea7b0d3bb6583f82b123e54166b1d480105f3f --- /dev/null +++ b/pkg/container/types/datetime.go @@ -0,0 +1,5 @@ +package types + +func (a Datetime) String() string { + return "" +} diff --git a/pkg/container/types/decimal.go b/pkg/container/types/decimal.go new file mode 100644 index 0000000000000000000000000000000000000000..187236511a862960c62054f3e900825e8d8bd736 --- /dev/null +++ b/pkg/container/types/decimal.go @@ -0,0 +1,5 @@ +package types + +func (a Decimal) String() string { + return "" +} diff --git a/pkg/container/types/types.go b/pkg/container/types/types.go index d7a17020ce932036fe7dd9c63d50f84f53651d6e..1326cc7a69c6a26dd4f751da3e59c643f09261ba 100644 --- a/pkg/container/types/types.go +++ b/pkg/container/types/types.go @@ -3,18 +3,16 @@ package types import "fmt" const ( - // system family + // any family T_any = 0 // numeric/integer family T_int8 = 1 T_int16 = 2 - T_int24 = 3 - T_int32 = 4 + T_int32 = 3 T_int64 = 5 T_uint8 = 6 T_uint16 = 7 - T_uint24 = 8 T_uint32 = 9 T_uint64 = 10 @@ -25,33 +23,16 @@ const ( T_float32 = 12 T_float64 = 13 - // numeric/bit family - T_bit = 14 - // date family - T_date = 15 - T_time = 16 - T_year = 17 - T_datetime = 18 - T_timestamp = 19 + T_date = 15 // 3 byte + T_datetime = 18 // 8 byte // string family - T_char = 20 - T_varchar = 21 - T_binary = 22 - T_varbinary = 23 - - // string/text family - T_tinytext = 24 - T_mediumtext = 25 - T_text = 26 - T_longtext = 27 - - // string/blob family - T_tinyblob = 28 - T_mediumblob = 29 - T_blob = 30 - T_longblob = 31 + T_char = 20 + T_varchar = 21 + + // json family + T_json = 32 // system family T_sel = 200 //selection @@ -61,53 +42,52 @@ const ( type T uint8 type Type struct { - Typ T + Oid T + Size int32 // e.g. int8.Size = 1, int16.Size = 2, char.Size = 24(SliceHeader size) Width int32 Precision int32 } +type Bytes struct { + Data []byte + Os []uint32 + Ns []uint32 +} + +type Date struct { +} + +type Datetime struct { +} + +type Decimal struct { +} + var Types map[string]T = map[string]T{ - "tinyint": T_int8, - "smallint": T_int16, - "mediumint": T_int24, - "int": T_int32, - "integer": T_int32, - "bigint": T_int64, - - "tinyint unsigned": T_int8, - "smallint unsigned": T_int16, - "mediumint unsigned": T_int24, - "int unsigned": T_int32, - "integer unsigned": T_int32, - "bigint unsigned": T_int64, + "tinyint": T_int8, + "smallint": T_int16, + "int": T_int32, + "integer": T_int32, + "bigint": T_int64, + + "tinyint unsigned": T_int8, + "smallint unsigned": T_int16, + "int unsigned": T_int32, + "integer unsigned": T_int32, + "bigint unsigned": T_int64, "decimal": T_decimal, "float": T_float32, "double": T_float64, - "bit": T_bit, - - "date": T_date, - "time": T_time, - "year": T_year, - "datetime": T_datetime, - "timestamp": T_timestamp, - - "char": T_char, - "varchar": T_varchar, - "binary": T_binary, - "varbinary": T_varbinary, + "date": T_date, + "datetime": T_datetime, - "tinytext": T_tinytext, - "mediumtext": T_mediumtext, - "text": T_text, - "longtext": T_longtext, + "char": T_char, + "varchar": T_varchar, - "tinyblob": T_tinyblob, - "mediumblob": T_mediumblob, - "blob": T_blob, - "longblob": T_longblob, + "json": T_json, } func (t T) String() string { @@ -116,8 +96,6 @@ func (t T) String() string { return "TINYINT" case T_int16: return "SMALLINT" - case T_int24: - return "MEDIUMINT" case T_int32: return "INT" case T_int64: @@ -126,8 +104,6 @@ func (t T) String() string { return "TINYINT UNSIGNED" case T_uint16: return "SMALLINT UNSIGNED" - case T_uint24: - return "MEDIUMINT UNSIGNED" case T_uint32: return "INT UNSIGNED" case T_uint64: @@ -138,42 +114,16 @@ func (t T) String() string { return "FLOAT" case T_float64: return "DOUBLE" - case T_bit: - return "BIT" case T_date: return "DATE" - case T_time: - return "TIME" - case T_year: - return "YEAR" case T_datetime: return "DATETIME" - case T_timestamp: - return "TIMESTAMP" case T_char: return "CHAR" case T_varchar: return "VARCHAR" - case T_binary: - return "BINARY" - case T_varbinary: - return "VARBINARY" - case T_tinytext: - return "TINYTEXT" - case T_mediumtext: - return "MEDIUMTEXT" - case T_text: - return "TEXT" - case T_longtext: - return "LONGTEXT" - case T_tinyblob: - return "TINYBLOB" - case T_mediumblob: - return "MEDIUMBLOB" - case T_blob: - return "BLOB" - case T_longblob: - return "LONGBLOB" + case T_json: + return "JSON" case T_sel: return "SEL" case T_tuple: diff --git a/pkg/container/vector/types.go b/pkg/container/vector/types.go index e498d98ca46cd99f33a1fb35398b4d1fd1cf6a25..506e8df7f2742b883c6f6f9e067dd4afd2ad450a 100644 --- a/pkg/container/vector/types.go +++ b/pkg/container/vector/types.go @@ -9,27 +9,16 @@ import ( type Vector interface { Reset() - Type() types.T - Bools() []bool - Ints() []int64 - Sels() []int64 - Floats() []float64 - Bytes() Bytes - Tuple() [][]interface{} - Col() interface{} SetCol(interface{}) - Nulls() nulls.Nulls - SetNulls(nulls.Nulls) + Length() int Window(int, int) Vector - Length() int - - Append(interface{}, nulls.Nulls) + Append(interface{}) - Filter([]int64) Vector + Shuffle([]int64) Vector Read([]byte) error Show() ([]byte, error) @@ -40,13 +29,7 @@ type Vector interface { type Vector struct { Data []byte // raw data - Typ types.T + Typ types.Type Col interface{} Nsp *nulls.Nulls } - -type Bytes struct { - Data []byte - Os []uint32 - Ns []uint32 -} diff --git a/pkg/container/vector/vector.go b/pkg/container/vector/vector.go index 0f37e41057bf4a0865236ca064cb3a12b8417c1d..f56f246cd6715421281ae1459dd6269fc3917b3f 100644 --- a/pkg/container/vector/vector.go +++ b/pkg/container/vector/vector.go @@ -6,83 +6,128 @@ import ( "matrixbase/pkg/container/nulls" "matrixbase/pkg/container/types" "matrixbase/pkg/encoding" + "matrixbase/pkg/vectorize/shuffle" "matrixbase/pkg/vm/mempool" "matrixbase/pkg/vm/process" + "reflect" + "strconv" + "unsafe" ) -var ConstTrue, ConstFalse *Vector - -func init() { - ConstTrue = &Vector{Typ: types.T_bool, Col: []bool{true}, Nsp: &nulls.Nulls{}} - ConstFalse = &Vector{Typ: types.T_bool, Col: []bool{false}, Nsp: &nulls.Nulls{}} -} - -func New(typ types.T) *Vector { - switch typ { - case types.T_int: +func New(typ types.Type) *Vector { + switch typ.Oid { + case types.T_int8: + return &Vector{ + Typ: typ, + Col: []int8{}, + Nsp: &nulls.Nulls{}, + } + case types.T_int16: + return &Vector{ + Typ: typ, + Col: []int32{}, + Nsp: &nulls.Nulls{}, + } + case types.T_int32: return &Vector{ Typ: typ, Col: []int64{}, Nsp: &nulls.Nulls{}, } - case types.T_sel: + case types.T_int64: return &Vector{ Typ: typ, Col: []int64{}, Nsp: &nulls.Nulls{}, } - case types.T_bool: + case types.T_uint8: return &Vector{ Typ: typ, - Col: []bool{}, + Col: []uint8{}, Nsp: &nulls.Nulls{}, } - case types.T_float: + case types.T_uint16: + return &Vector{ + Typ: typ, + Col: []uint16{}, + Nsp: &nulls.Nulls{}, + } + case types.T_uint32: + return &Vector{ + Typ: typ, + Col: []uint32{}, + Nsp: &nulls.Nulls{}, + } + case types.T_uint64: + return &Vector{ + Typ: typ, + Col: []uint64{}, + Nsp: &nulls.Nulls{}, + } + case types.T_decimal: + return &Vector{ + Typ: typ, + Col: []types.Decimal{}, + Nsp: &nulls.Nulls{}, + } + case types.T_float32: + return &Vector{ + Typ: typ, + Col: []float32{}, + Nsp: &nulls.Nulls{}, + } + case types.T_float64: return &Vector{ Typ: typ, Col: []float64{}, Nsp: &nulls.Nulls{}, } + case types.T_date: + return &Vector{ + Typ: typ, + Col: []types.Date{}, + Nsp: &nulls.Nulls{}, + } + case types.T_datetime: + return &Vector{ + Typ: typ, + Col: []types.Datetime{}, + Nsp: &nulls.Nulls{}, + } + case types.T_sel: + return &Vector{ + Typ: typ, + Col: []int64{}, + Nsp: &nulls.Nulls{}, + } case types.T_tuple: return &Vector{ Typ: typ, Nsp: &nulls.Nulls{}, Col: [][]interface{}{}, } - case types.T_bytes, types.T_json: + case types.T_char, types.T_varchar, types.T_json: return &Vector{ Typ: typ, - Col: &Bytes{}, + Col: &types.Bytes{}, Nsp: &nulls.Nulls{}, } - default: - panic(fmt.Errorf("unsupport type %s", typ)) } + return nil } func (v *Vector) Reset() { - switch v.Typ { - case types.T_int: - v.Col = v.Col.([]int64)[:0] - case types.T_bool: - v.Col = v.Col.([]bool)[:0] - case types.T_float: - v.Col = v.Col.([]float64)[:0] - case types.T_bytes, types.T_json: - v.Col.(*Bytes).Reset() - case types.T_tuple: - v.Col = v.Col.([][]interface{})[:0] + switch v.Typ.Oid { + case types.T_char, types.T_varchar, types.T_json: + v.Col.(*types.Bytes).Reset() default: - panic(fmt.Errorf("unsupport type %s", v.Typ)) + *(*int)(unsafe.Pointer(uintptr(unsafe.Pointer(&v.Col)) + uintptr(strconv.IntSize>>3))) = 0 } } func (v *Vector) Free(p *process.Process) { if v.Data != nil { - p.Mp.Free(v.Data) - if encoding.DecodeUint64(v.Data[:mempool.CountSize]) == 0 { - p.Free(int64(cap(v.Data))) - } + p.Free(v.Data) } } @@ -90,179 +135,204 @@ func (v *Vector) SetCol(col interface{}) { v.Col = col } -func (v *Vector) Bools() []bool { - if v.Col == nil { - return []bool{} - } - return v.Col.([]bool) -} - -func (v *Vector) Ints() []int64 { - if v.Col == nil { - return []int64{} - } - return v.Col.([]int64) -} - -func (v *Vector) Sels() []int64 { - if v.Col == nil { - return []int64{} - } - return v.Col.([]int64) -} - -func (v *Vector) Floats() []float64 { - if v.Col == nil { - return []float64{} - } - return v.Col.([]float64) -} - -func (v *Vector) Bytes() *Bytes { - return v.Col.(*Bytes) -} - -func (v *Vector) Tuples() [][]interface{} { - if v.Col == nil { - return [][]interface{}{} - } - return v.Col.([][]interface{}) -} - func (v *Vector) Length() int { - switch v.Typ { - case types.T_int: - return len(v.Col.([]int64)) - case types.T_sel: - return len(v.Col.([]int64)) - case types.T_bool: - return len(v.Col.([]bool)) - case types.T_float: - return len(v.Col.([]float64)) - case types.T_bytes, types.T_json: - return len(v.Col.(*Bytes).Os) - case types.T_tuple: - return len(v.Col.([][]interface{})) + switch v.Typ.Oid { + case types.T_char, types.T_varchar, types.T_json: + return len(v.Col.(*types.Bytes).Os) default: - panic(fmt.Errorf("unsupport type %s", v.Typ)) + hp := *(*reflect.SliceHeader)(unsafe.Pointer(&v.Col)) + return hp.Len } } func (v *Vector) Window(start, end int) *Vector { - switch v.Typ { - case types.T_int: - return &Vector{ - Typ: v.Typ, - Col: v.Col.([]int64)[start:end], - Nsp: v.Nsp.Range(uint64(start), uint64(end)), - } - case types.T_bool: - return &Vector{ - Typ: v.Typ, - Col: v.Col.([]bool)[start:end], - Nsp: v.Nsp.Range(uint64(start), uint64(end)), - } - case types.T_float: - return &Vector{ - Typ: v.Typ, - Col: v.Col.([]float64)[start:end], - Nsp: v.Nsp.Range(uint64(start), uint64(end)), - } - case types.T_bytes, types.T_json: + switch v.Typ.Oid { + case types.T_char, types.T_varchar, types.T_json: return &Vector{ Typ: v.Typ, - Col: v.Col.(*Bytes).Window(start, end), + Col: v.Col.(*types.Bytes).Window(start, end), Nsp: v.Nsp.Range(uint64(start), uint64(end)), } - case types.T_tuple: + default: + col := v.Col + ptr := unsafe.Pointer(&col) + data := *(*uintptr)(unsafe.Pointer(uintptr(ptr))) + *(*uintptr)(unsafe.Pointer(uintptr(ptr))) = data + uintptr(v.Typ.Size)*uintptr(start) + *(*int)(unsafe.Pointer(uintptr(ptr) + uintptr(strconv.IntSize>>3))) = end - start + 1 return &Vector{ Typ: v.Typ, - Col: v.Col.([][]interface{})[start:end], + Col: col, Nsp: v.Nsp.Range(uint64(start), uint64(end)), } - default: - panic(fmt.Errorf("unsupport type %s", v.Typ)) } } func (v *Vector) Append(arg interface{}) error { - switch v.Typ { - case types.T_int: - col := v.Col.([]int64) - col = append(col, arg.([]int64)...) - v.Col = col - case types.T_bool: - col := v.Col.([]bool) - col = append(col, arg.([]bool)...) - v.Col = col - case types.T_float: - col := v.Col.([]float64) - col = append(col, arg.([]float64)...) - v.Col = col - case types.T_bytes, types.T_json: - return v.Col.(*Bytes).Append(arg.([][]byte)) + switch v.Typ.Oid { + case types.T_int8: + v.Col = append(v.Col.([]int8), arg.([]int8)...) + case types.T_int16: + v.Col = append(v.Col.([]int16), arg.([]int16)...) + case types.T_int32: + v.Col = append(v.Col.([]int32), arg.([]int32)...) + case types.T_int64: + v.Col = append(v.Col.([]int64), arg.([]int64)...) + case types.T_uint8: + v.Col = append(v.Col.([]uint8), arg.([]uint8)...) + case types.T_uint16: + v.Col = append(v.Col.([]uint16), arg.([]uint16)...) + case types.T_uint32: + v.Col = append(v.Col.([]uint32), arg.([]uint32)...) + case types.T_uint64: + v.Col = append(v.Col.([]uint64), arg.([]uint64)...) + case types.T_decimal: + v.Col = append(v.Col.([]types.Decimal), arg.([]types.Decimal)...) + case types.T_float32: + v.Col = append(v.Col.([]float32), arg.([]float32)...) + case types.T_float64: + v.Col = append(v.Col.([]float64), arg.([]float64)...) + case types.T_date: + v.Col = append(v.Col.([]types.Date), arg.([]types.Date)...) + case types.T_datetime: + v.Col = append(v.Col.([]types.Datetime), arg.([]types.Datetime)...) + case types.T_sel: + v.Col = append(v.Col.([]int64), arg.([]int64)...) case types.T_tuple: - col := v.Col.([][]interface{}) - col = append(col, arg.([][]interface{})...) - v.Col = col - default: - return fmt.Errorf("unsupport type %s", v.Typ) + v.Col = append(v.Col.([][]interface{}), arg.([][]interface{})...) + case types.T_char, types.T_varchar, types.T_json: + return v.Col.(*types.Bytes).Append(arg.([][]byte)) } return nil } -func (v *Vector) Filter(sels []int64) *Vector { - switch v.Typ { - case types.T_int: +func (v *Vector) Shuffle(sels []int64) *Vector { + switch v.Typ.Oid { + case types.T_int8: + vs := v.Col.([]int8) + shuffle.I8Shuffle(vs, sels) + v.Col = vs + v.Nsp = v.Nsp.Filter(sels) + case types.T_int16: + vs := v.Col.([]int16) + shuffle.I16Shuffle(vs, sels) + v.Col = vs + v.Nsp = v.Nsp.Filter(sels) + case types.T_int32: + vs := v.Col.([]int32) + shuffle.I32Shuffle(vs, sels) + v.Col = vs + v.Nsp = v.Nsp.Filter(sels) + case types.T_int64: vs := v.Col.([]int64) - for i, sel := range sels { - vs[i] = vs[sel] - } - v.Col = vs[:len(sels)] + shuffle.I64Shuffle(vs, sels) + v.Col = vs v.Nsp = v.Nsp.Filter(sels) - case types.T_bool: - vs := v.Col.([]bool) - for i, sel := range sels { - vs[i] = vs[sel] - } - v.Col = vs[:len(sels)] + case types.T_uint8: + vs := v.Col.([]uint8) + shuffle.Ui8Shuffle(vs, sels) + v.Col = vs + v.Nsp = v.Nsp.Filter(sels) + case types.T_uint16: + vs := v.Col.([]uint16) + shuffle.Ui16Shuffle(vs, sels) + v.Col = vs v.Nsp = v.Nsp.Filter(sels) - case types.T_float: + case types.T_uint32: + vs := v.Col.([]uint32) + shuffle.Ui32Shuffle(vs, sels) + v.Col = vs + v.Nsp = v.Nsp.Filter(sels) + case types.T_uint64: + vs := v.Col.([]uint64) + shuffle.Ui64Shuffle(vs, sels) + v.Col = vs + v.Nsp = v.Nsp.Filter(sels) + case types.T_decimal: + vs := v.Col.([]types.Decimal) + shuffle.DecimalShuffle(vs, sels) + v.Col = vs + v.Nsp = v.Nsp.Filter(sels) + case types.T_float32: + vs := v.Col.([]float32) + shuffle.Float32Shuffle(vs, sels) + v.Col = vs + v.Nsp = v.Nsp.Filter(sels) + case types.T_float64: vs := v.Col.([]float64) - for i, sel := range sels { - vs[i] = vs[sel] - } - v.Col = vs[:len(sels)] + shuffle.Float64Shuffle(vs, sels) + v.Col = vs v.Nsp = v.Nsp.Filter(sels) - case types.T_bytes, types.T_json: - col := v.Col.(*Bytes) - os, ns := col.Os, col.Ns - for i, sel := range sels { - os[i] = os[sel] - ns[i] = ns[sel] - } - col.Os = os[:len(sels)] - col.Ns = ns[:len(sels)] + case types.T_date: + vs := v.Col.([]types.Date) + shuffle.DateShuffle(vs, sels) + v.Col = vs + v.Nsp = v.Nsp.Filter(sels) + case types.T_datetime: + vs := v.Col.([]types.Datetime) + shuffle.DatetimeShuffle(vs, sels) + v.Col = vs + v.Nsp = v.Nsp.Filter(sels) + case types.T_sel: + vs := v.Col.([]int64) + shuffle.I64Shuffle(vs, sels) + v.Col = vs v.Nsp = v.Nsp.Filter(sels) case types.T_tuple: vs := v.Col.([][]interface{}) - for i, sel := range sels { - vs[i] = vs[sel] - } - v.Col = vs[:len(sels)] + shuffle.TupleShuffle(vs, sels) + v.Col = vs + v.Nsp = v.Nsp.Filter(sels) + case types.T_char, types.T_varchar, types.T_json: + vs := v.Col.(*types.Bytes) + shuffle.SShuffle(vs, sels) + v.Col = vs v.Nsp = v.Nsp.Filter(sels) - default: - panic(fmt.Errorf("unsupport type %s", v.Typ)) } - return v + return nil } func (v *Vector) Show() ([]byte, error) { var buf bytes.Buffer - switch v.Typ { - case types.T_int: - buf.WriteByte(byte(v.Typ)) + switch v.Typ.Oid { + case types.T_int8: + buf.Write(encoding.EncodeType(v.Typ)) + nb, err := v.Nsp.Show() + if err != nil { + return nil, err + } + buf.Write(encoding.EncodeUint32(uint32(len(nb)))) + if len(nb) > 0 { + buf.Write(nb) + } + buf.Write(encoding.EncodeInt8Slice(v.Col.([]int8))) + return buf.Bytes(), nil + case types.T_int16: + buf.Write(encoding.EncodeType(v.Typ)) + nb, err := v.Nsp.Show() + if err != nil { + return nil, err + } + buf.Write(encoding.EncodeUint32(uint32(len(nb)))) + if len(nb) > 0 { + buf.Write(nb) + } + buf.Write(encoding.EncodeInt16Slice(v.Col.([]int16))) + return buf.Bytes(), nil + case types.T_int32: + buf.Write(encoding.EncodeType(v.Typ)) + nb, err := v.Nsp.Show() + if err != nil { + return nil, err + } + buf.Write(encoding.EncodeUint32(uint32(len(nb)))) + if len(nb) > 0 { + buf.Write(nb) + } + buf.Write(encoding.EncodeInt32Slice(v.Col.([]int32))) + return buf.Bytes(), nil + case types.T_int64: + buf.Write(encoding.EncodeType(v.Typ)) nb, err := v.Nsp.Show() if err != nil { return nil, err @@ -273,8 +343,56 @@ func (v *Vector) Show() ([]byte, error) { } buf.Write(encoding.EncodeInt64Slice(v.Col.([]int64))) return buf.Bytes(), nil - case types.T_bool: - buf.WriteByte(byte(v.Typ)) + case types.T_uint8: + buf.Write(encoding.EncodeType(v.Typ)) + nb, err := v.Nsp.Show() + if err != nil { + return nil, err + } + buf.Write(encoding.EncodeUint32(uint32(len(nb)))) + if len(nb) > 0 { + buf.Write(nb) + } + buf.Write(encoding.EncodeUint8Slice(v.Col.([]uint8))) + return buf.Bytes(), nil + case types.T_uint16: + buf.Write(encoding.EncodeType(v.Typ)) + nb, err := v.Nsp.Show() + if err != nil { + return nil, err + } + buf.Write(encoding.EncodeUint32(uint32(len(nb)))) + if len(nb) > 0 { + buf.Write(nb) + } + buf.Write(encoding.EncodeUint16Slice(v.Col.([]uint16))) + return buf.Bytes(), nil + case types.T_uint32: + buf.Write(encoding.EncodeType(v.Typ)) + nb, err := v.Nsp.Show() + if err != nil { + return nil, err + } + buf.Write(encoding.EncodeUint32(uint32(len(nb)))) + if len(nb) > 0 { + buf.Write(nb) + } + buf.Write(encoding.EncodeUint32Slice(v.Col.([]uint32))) + return buf.Bytes(), nil + case types.T_uint64: + buf.Write(encoding.EncodeType(v.Typ)) + nb, err := v.Nsp.Show() + if err != nil { + return nil, err + } + buf.Write(encoding.EncodeUint32(uint32(len(nb)))) + if len(nb) > 0 { + buf.Write(nb) + } + buf.Write(encoding.EncodeUint64Slice(v.Col.([]uint64))) + return buf.Bytes(), nil + case types.T_decimal: + buf.Write(encoding.EncodeType(v.Typ)) nb, err := v.Nsp.Show() if err != nil { return nil, err @@ -283,10 +401,22 @@ func (v *Vector) Show() ([]byte, error) { if len(nb) > 0 { buf.Write(nb) } - buf.Write(encoding.EncodeBoolSlice(v.Col.([]bool))) + buf.Write(encoding.EncodeDecimalSlice(v.Col.([]types.Decimal))) return buf.Bytes(), nil - case types.T_float: - buf.WriteByte(byte(v.Typ)) + case types.T_float32: + buf.Write(encoding.EncodeType(v.Typ)) + nb, err := v.Nsp.Show() + if err != nil { + return nil, err + } + buf.Write(encoding.EncodeUint32(uint32(len(nb)))) + if len(nb) > 0 { + buf.Write(nb) + } + buf.Write(encoding.EncodeFloat32Slice(v.Col.([]float32))) + return buf.Bytes(), nil + case types.T_float64: + buf.Write(encoding.EncodeType(v.Typ)) nb, err := v.Nsp.Show() if err != nil { return nil, err @@ -297,8 +427,44 @@ func (v *Vector) Show() ([]byte, error) { } buf.Write(encoding.EncodeFloat64Slice(v.Col.([]float64))) return buf.Bytes(), nil - case types.T_bytes, types.T_json: - buf.WriteByte(byte(v.Typ)) + case types.T_date: + buf.Write(encoding.EncodeType(v.Typ)) + nb, err := v.Nsp.Show() + if err != nil { + return nil, err + } + buf.Write(encoding.EncodeUint32(uint32(len(nb)))) + if len(nb) > 0 { + buf.Write(nb) + } + buf.Write(encoding.EncodeDateSlice(v.Col.([]types.Date))) + return buf.Bytes(), nil + case types.T_datetime: + buf.Write(encoding.EncodeType(v.Typ)) + nb, err := v.Nsp.Show() + if err != nil { + return nil, err + } + buf.Write(encoding.EncodeUint32(uint32(len(nb)))) + if len(nb) > 0 { + buf.Write(nb) + } + buf.Write(encoding.EncodeDatetimeSlice(v.Col.([]types.Datetime))) + return buf.Bytes(), nil + case types.T_sel: + buf.Write(encoding.EncodeType(v.Typ)) + nb, err := v.Nsp.Show() + if err != nil { + return nil, err + } + buf.Write(encoding.EncodeUint32(uint32(len(nb)))) + if len(nb) > 0 { + buf.Write(nb) + } + buf.Write(encoding.EncodeInt64Slice(v.Col.([]int64))) + return buf.Bytes(), nil + case types.T_char, types.T_varchar, types.T_json: + buf.Write(encoding.EncodeType(v.Typ)) nb, err := v.Nsp.Show() if err != nil { return nil, err @@ -307,7 +473,7 @@ func (v *Vector) Show() ([]byte, error) { if len(nb) > 0 { buf.Write(nb) } - Col := v.Col.(*Bytes) + Col := v.Col.(*types.Bytes) cnt := int32(len(Col.Os)) buf.Write(encoding.EncodeInt32(cnt)) if cnt == 0 { @@ -317,55 +483,164 @@ func (v *Vector) Show() ([]byte, error) { buf.Write(Col.Data) return buf.Bytes(), nil default: - return nil, fmt.Errorf("unsupport encoding type %s", v.Typ) + return nil, fmt.Errorf("unsupport encoding type %s", v.Typ.Oid) } } -func (v *Vector) Read(data []byte, p *process.Process) error { - if err := p.Alloc(int64(cap(data))); err != nil { - return err - } +func (v *Vector) Read(data []byte) error { v.Data = data data = data[mempool.CountSize:] - switch typ := types.T(data[0]); typ { - case types.T_int: - size := encoding.DecodeUint32(data[1:]) + typ := encoding.DecodeType(data[:encoding.TypeSize]) + data = data[encoding.TypeSize:] + switch typ.Oid { + case types.T_int8: + size := encoding.DecodeUint32(data) + if size == 0 { + v.Col = encoding.DecodeInt8Slice(data[4:]) + } else { + data = data[4:] + if err := v.Nsp.Read(data[:size]); err != nil { + return err + } + v.Col = encoding.DecodeInt8Slice(data[size:]) + } + case types.T_int16: + size := encoding.DecodeUint32(data) + if size == 0 { + v.Col = encoding.DecodeInt16Slice(data[4:]) + } else { + data = data[4:] + if err := v.Nsp.Read(data[:size]); err != nil { + return err + } + v.Col = encoding.DecodeInt16Slice(data[size:]) + } + case types.T_int32: + size := encoding.DecodeUint32(data) if size == 0 { - v.Col = encoding.DecodeInt64Slice(data[5:]) + v.Col = encoding.DecodeInt32Slice(data[4:]) } else { - data = data[5:] + data = data[4:] + if err := v.Nsp.Read(data[:size]); err != nil { + return err + } + v.Col = encoding.DecodeInt32Slice(data[size:]) + } + case types.T_int64: + size := encoding.DecodeUint32(data) + if size == 0 { + v.Col = encoding.DecodeInt64Slice(data[4:]) + } else { + data = data[4:] if err := v.Nsp.Read(data[:size]); err != nil { return err } v.Col = encoding.DecodeInt64Slice(data[size:]) } - case types.T_bool: - size := encoding.DecodeUint32(data[1:]) + case types.T_uint8: + size := encoding.DecodeUint32(data) + if size == 0 { + v.Col = encoding.DecodeUint8Slice(data[4:]) + } else { + data = data[4:] + if err := v.Nsp.Read(data[:size]); err != nil { + return err + } + v.Col = encoding.DecodeUint8Slice(data[size:]) + } + case types.T_uint16: + size := encoding.DecodeUint32(data) + if size == 0 { + v.Col = encoding.DecodeUint16Slice(data[4:]) + } else { + data = data[4:] + if err := v.Nsp.Read(data[:size]); err != nil { + return err + } + v.Col = encoding.DecodeUint16Slice(data[size:]) + } + case types.T_uint32: + size := encoding.DecodeUint32(data) + if size == 0 { + v.Col = encoding.DecodeUint32Slice(data[4:]) + } else { + data = data[4:] + if err := v.Nsp.Read(data[:size]); err != nil { + return err + } + v.Col = encoding.DecodeUint32Slice(data[size:]) + } + case types.T_uint64: + size := encoding.DecodeUint32(data) + if size == 0 { + v.Col = encoding.DecodeUint64Slice(data[4:]) + } else { + data = data[4:] + if err := v.Nsp.Read(data[:size]); err != nil { + return err + } + v.Col = encoding.DecodeUint64Slice(data[size:]) + } + case types.T_decimal: + size := encoding.DecodeUint32(data) + if size == 0 { + v.Col = encoding.DecodeDecimalSlice(data[4:]) + } else { + data = data[4:] + if err := v.Nsp.Read(data[:size]); err != nil { + return err + } + v.Col = encoding.DecodeDecimalSlice(data[size:]) + } + case types.T_float32: + size := encoding.DecodeUint32(data) if size == 0 { - v.Col = encoding.DecodeBoolSlice(data[5:]) + v.Col = encoding.DecodeFloat32Slice(data[4:]) } else { - data = data[5:] + data = data[4:] if err := v.Nsp.Read(data[:size]); err != nil { return err } - v.Col = encoding.DecodeBoolSlice(data[size:]) + v.Col = encoding.DecodeFloat32Slice(data[size:]) } - case types.T_float: - size := encoding.DecodeUint32(data[1:]) + case types.T_float64: + size := encoding.DecodeUint32(data) if size == 0 { - v.Col = encoding.DecodeFloat64Slice(data[5:]) + v.Col = encoding.DecodeFloat64Slice(data[4:]) } else { - data = data[5:] + data = data[4:] if err := v.Nsp.Read(data[:size]); err != nil { return err } v.Col = encoding.DecodeFloat64Slice(data[size:]) } - case types.T_bytes, types.T_json: - Col := v.Col.(*Bytes) + case types.T_date: + size := encoding.DecodeUint32(data) + if size == 0 { + v.Col = encoding.DecodeDateSlice(data[4:]) + } else { + data = data[4:] + if err := v.Nsp.Read(data[:size]); err != nil { + return err + } + v.Col = encoding.DecodeDateSlice(data[size:]) + } + case types.T_datetime: + size := encoding.DecodeUint32(data) + if size == 0 { + v.Col = encoding.DecodeDatetimeSlice(data[4:]) + } else { + data = data[4:] + if err := v.Nsp.Read(data[:size]); err != nil { + return err + } + v.Col = encoding.DecodeDatetimeSlice(data[size:]) + } + case types.T_char, types.T_varchar, types.T_json: + Col := v.Col.(*types.Bytes) Col.Reset() - size := encoding.DecodeUint32(data[1:]) - data = data[5:] + size := encoding.DecodeUint32(data) + data = data[4:] if size > 0 { if err := v.Nsp.Read(data[:size]); err != nil { return err @@ -378,7 +653,7 @@ func (v *Vector) Read(data []byte, p *process.Process) error { } data = data[4:] Col.Os = make([]uint32, cnt) - Col.Ns = encoding.DecodeUint32Slice(data[: 4*cnt : 4*cnt]) + Col.Ns = encoding.DecodeUint32Slice(data[:4*cnt]) Col.Data = data[4*cnt:] { o := uint32(0) @@ -387,15 +662,40 @@ func (v *Vector) Read(data []byte, p *process.Process) error { o += n } } - default: - return fmt.Errorf("unsupport decoding type %s", typ) } return nil } func (v *Vector) String() string { - switch v.Typ { - case types.T_int: + switch v.Typ.Oid { + case types.T_int8: + col := v.Col.([]int8) + if len(col) == 1 { + if v.Nsp.Contains(0) { + fmt.Print("null") + } else { + return fmt.Sprintf("%v", col[0]) + } + } + case types.T_int16: + col := v.Col.([]int16) + if len(col) == 1 { + if v.Nsp.Contains(0) { + fmt.Print("null") + } else { + return fmt.Sprintf("%v", col[0]) + } + } + case types.T_int32: + col := v.Col.([]int32) + if len(col) == 1 { + if v.Nsp.Contains(0) { + fmt.Print("null") + } else { + return fmt.Sprintf("%v", col[0]) + } + } + case types.T_int64: col := v.Col.([]int64) if len(col) == 1 { if v.Nsp.Contains(0) { @@ -404,8 +704,44 @@ func (v *Vector) String() string { return fmt.Sprintf("%v", col[0]) } } - case types.T_bool: - col := v.Col.([]bool) + case types.T_uint8: + col := v.Col.([]uint8) + if len(col) == 1 { + if v.Nsp.Contains(0) { + fmt.Print("null") + } else { + return fmt.Sprintf("%v", col[0]) + } + } + case types.T_uint16: + col := v.Col.([]uint16) + if len(col) == 1 { + if v.Nsp.Contains(0) { + fmt.Print("null") + } else { + return fmt.Sprintf("%v", col[0]) + } + } + case types.T_uint32: + col := v.Col.([]uint32) + if len(col) == 1 { + if v.Nsp.Contains(0) { + fmt.Print("null") + } else { + return fmt.Sprintf("%v", col[0]) + } + } + case types.T_uint64: + col := v.Col.([]uint64) + if len(col) == 1 { + if v.Nsp.Contains(0) { + fmt.Print("null") + } else { + return fmt.Sprintf("%v", col[0]) + } + } + case types.T_decimal: + col := v.Col.([]types.Decimal) if len(col) == 1 { if v.Nsp.Contains(0) { fmt.Print("null") @@ -413,7 +749,16 @@ func (v *Vector) String() string { return fmt.Sprintf("%v", col[0]) } } - case types.T_float: + case types.T_float32: + col := v.Col.([]float32) + if len(col) == 1 { + if v.Nsp.Contains(0) { + fmt.Print("null") + } else { + return fmt.Sprintf("%v", col[0]) + } + } + case types.T_float64: col := v.Col.([]float64) if len(col) == 1 { if v.Nsp.Contains(0) { @@ -422,22 +767,31 @@ func (v *Vector) String() string { return fmt.Sprintf("%v", col[0]) } } - case types.T_bytes: - col := v.Col.(*Bytes) - if len(col.Os) == 1 { + case types.T_date: + col := v.Col.([]types.Date) + if len(col) == 1 { if v.Nsp.Contains(0) { fmt.Print("null") } else { - return fmt.Sprintf("%s", col.Data[:col.Ns[0]]) + return fmt.Sprintf("%v", col[0]) } } - case types.T_json: - col := v.Col.(*Bytes) - if len(col.Os) == 1 { + case types.T_datetime: + col := v.Col.([]types.Datetime) + if len(col) == 1 { if v.Nsp.Contains(0) { fmt.Print("null") } else { - return fmt.Sprintf("%s", col.Data[:col.Ns[0]]) + return fmt.Sprintf("%v", col[0]) + } + } + case types.T_sel: + col := v.Col.([]int64) + if len(col) == 1 { + if v.Nsp.Contains(0) { + fmt.Print("null") + } else { + return fmt.Sprintf("%v", col[0]) } } case types.T_tuple: @@ -449,6 +803,16 @@ func (v *Vector) String() string { return fmt.Sprintf("%v", col[0]) } } + case types.T_char, types.T_varchar, types.T_json: + col := v.Col.(*types.Bytes) + if len(col.Os) == 1 { + if v.Nsp.Contains(0) { + fmt.Print("null") + } else { + return fmt.Sprintf("%s", col.Data[:col.Ns[0]]) + } + } + } return fmt.Sprintf("%v-%s", v.Col, v.Nsp) } diff --git a/pkg/encoding/encoding.go b/pkg/encoding/encoding.go index 0dd4c654f7bcccd551c42489c7e1d93a1495be67..691349f68ef3f4023b15529f944c6f7578a48fde 100644 --- a/pkg/encoding/encoding.go +++ b/pkg/encoding/encoding.go @@ -3,10 +3,23 @@ package encoding import ( "bytes" "encoding/gob" + "matrixbase/pkg/container/types" "reflect" "unsafe" ) +var TypeSize int +var DateSize int +var DatetimeSize int +var DecimalSize int + +func init() { + TypeSize = int(unsafe.Sizeof(types.Type{})) + DateSize = int(unsafe.Sizeof(types.Date{})) + DatetimeSize = int(unsafe.Sizeof(types.Datetime{})) + DecimalSize = int(unsafe.Sizeof(types.Decimal{})) +} + func Encode(v interface{}) ([]byte, error) { var buf bytes.Buffer @@ -20,6 +33,33 @@ func Decode(data []byte, v interface{}) error { return gob.NewDecoder(bytes.NewReader(data)).Decode(v) } +func EncodeType(v types.Type) []byte { + hp := reflect.SliceHeader{Data: uintptr(unsafe.Pointer(&v)), Len: TypeSize, Cap: TypeSize} + return *(*[]byte)(unsafe.Pointer(&hp)) +} + +func DecodeType(v []byte) types.Type { + return *(*types.Type)(unsafe.Pointer(&v[0])) +} + +func EncodeInt16(v int16) []byte { + hp := reflect.SliceHeader{Data: uintptr(unsafe.Pointer(&v)), Len: 2, Cap: 2} + return *(*[]byte)(unsafe.Pointer(&hp)) +} + +func DecodeInt16(v []byte) int16 { + return *(*int16)(unsafe.Pointer(&v[0])) +} + +func EncodeUint16(v uint16) []byte { + hp := reflect.SliceHeader{Data: uintptr(unsafe.Pointer(&v)), Len: 2, Cap: 2} + return *(*[]byte)(unsafe.Pointer(&hp)) +} + +func DecodeUint16(v []byte) uint16 { + return *(*uint16)(unsafe.Pointer(&v[0])) +} + func EncodeInt32(v int32) []byte { hp := reflect.SliceHeader{Data: uintptr(unsafe.Pointer(&v)), Len: 4, Cap: 4} return *(*[]byte)(unsafe.Pointer(&hp)) @@ -38,6 +78,15 @@ func DecodeUint32(v []byte) uint32 { return *(*uint32)(unsafe.Pointer(&v[0])) } +func EncodeInt64(v int64) []byte { + hp := reflect.SliceHeader{Data: uintptr(unsafe.Pointer(&v)), Len: 8, Cap: 8} + return *(*[]byte)(unsafe.Pointer(&hp)) +} + +func DecodeInt64(v []byte) int64 { + return *(*int64)(unsafe.Pointer(&v[0])) +} + func EncodeUint64(v uint64) []byte { hp := reflect.SliceHeader{Data: uintptr(unsafe.Pointer(&v)), Len: 8, Cap: 8} return *(*[]byte)(unsafe.Pointer(&hp)) @@ -47,14 +96,52 @@ func DecodeUint64(v []byte) uint64 { return *(*uint64)(unsafe.Pointer(&v[0])) } -func EncodeBoolSlice(v []bool) []byte { +func EncodeInt8Slice(v []int8) []byte { + hp := *(*reflect.SliceHeader)(unsafe.Pointer(&v)) + return *(*[]byte)(unsafe.Pointer(&hp)) +} + +func DecodeInt8Slice(v []byte) []int8 { + hp := *(*reflect.SliceHeader)(unsafe.Pointer(&v)) + return *(*[]int8)(unsafe.Pointer(&hp)) +} + +func EncodeUint8Slice(v []uint8) []byte { + hp := *(*reflect.SliceHeader)(unsafe.Pointer(&v)) + return *(*[]byte)(unsafe.Pointer(&hp)) +} + +func DecodeUint8Slice(v []byte) []uint8 { + hp := *(*reflect.SliceHeader)(unsafe.Pointer(&v)) + return *(*[]uint8)(unsafe.Pointer(&hp)) +} + +func EncodeInt16Slice(v []int16) []byte { + hp := *(*reflect.SliceHeader)(unsafe.Pointer(&v)) + hp.Len *= 2 + hp.Cap *= 2 + return *(*[]byte)(unsafe.Pointer(&hp)) +} + +func DecodeInt16Slice(v []byte) []int16 { + hp := *(*reflect.SliceHeader)(unsafe.Pointer(&v)) + hp.Len /= 2 + hp.Cap /= 2 + return *(*[]int16)(unsafe.Pointer(&hp)) +} + +func EncodeUint16Slice(v []uint16) []byte { hp := *(*reflect.SliceHeader)(unsafe.Pointer(&v)) + hp.Len *= 2 + hp.Cap *= 2 return *(*[]byte)(unsafe.Pointer(&hp)) } -func DecodeBoolSlice(v []byte) []bool { +func DecodeUint16Slice(v []byte) []uint16 { hp := *(*reflect.SliceHeader)(unsafe.Pointer(&v)) - return *(*[]bool)(unsafe.Pointer(&hp)) + hp.Len /= 2 + hp.Cap /= 2 + return *(*[]uint16)(unsafe.Pointer(&hp)) } func EncodeInt32Slice(v []int32) []byte { @@ -99,6 +186,34 @@ func DecodeInt64Slice(v []byte) []int64 { return *(*[]int64)(unsafe.Pointer(&hp)) } +func EncodeUint64Slice(v []uint64) []byte { + hp := *(*reflect.SliceHeader)(unsafe.Pointer(&v)) + hp.Len *= 8 + hp.Cap *= 8 + return *(*[]byte)(unsafe.Pointer(&hp)) +} + +func DecodeUint64Slice(v []byte) []uint64 { + hp := *(*reflect.SliceHeader)(unsafe.Pointer(&v)) + hp.Len /= 8 + hp.Cap /= 8 + return *(*[]uint64)(unsafe.Pointer(&hp)) +} + +func EncodeFloat32Slice(v []float32) []byte { + hp := *(*reflect.SliceHeader)(unsafe.Pointer(&v)) + hp.Len *= 4 + hp.Cap *= 4 + return *(*[]byte)(unsafe.Pointer(&hp)) +} + +func DecodeFloat32Slice(v []byte) []float32 { + hp := *(*reflect.SliceHeader)(unsafe.Pointer(&v)) + hp.Len /= 4 + hp.Cap /= 4 + return *(*[]float32)(unsafe.Pointer(&hp)) +} + func EncodeFloat64Slice(v []float64) []byte { hp := *(*reflect.SliceHeader)(unsafe.Pointer(&v)) hp.Len *= 8 @@ -113,6 +228,48 @@ func DecodeFloat64Slice(v []byte) []float64 { return *(*[]float64)(unsafe.Pointer(&hp)) } +func EncodeDateSlice(v []types.Date) []byte { + hp := *(*reflect.SliceHeader)(unsafe.Pointer(&v)) + hp.Len *= DateSize + hp.Cap *= DateSize + return *(*[]byte)(unsafe.Pointer(&hp)) +} + +func DecodeDateSlice(v []byte) []types.Date { + hp := *(*reflect.SliceHeader)(unsafe.Pointer(&v)) + hp.Len /= DateSize + hp.Cap /= DateSize + return *(*[]types.Date)(unsafe.Pointer(&hp)) +} + +func EncodeDatetimeSlice(v []types.Datetime) []byte { + hp := *(*reflect.SliceHeader)(unsafe.Pointer(&v)) + hp.Len *= DatetimeSize + hp.Cap *= DatetimeSize + return *(*[]byte)(unsafe.Pointer(&hp)) +} + +func DecodeDatetimeSlice(v []byte) []types.Datetime { + hp := *(*reflect.SliceHeader)(unsafe.Pointer(&v)) + hp.Len /= DatetimeSize + hp.Cap /= DatetimeSize + return *(*[]types.Datetime)(unsafe.Pointer(&hp)) +} + +func EncodeDecimalSlice(v []types.Decimal) []byte { + hp := *(*reflect.SliceHeader)(unsafe.Pointer(&v)) + hp.Len *= DecimalSize + hp.Cap *= DecimalSize + return *(*[]byte)(unsafe.Pointer(&hp)) +} + +func DecodeDecimalSlice(v []byte) []types.Decimal { + hp := *(*reflect.SliceHeader)(unsafe.Pointer(&v)) + hp.Len /= DecimalSize + hp.Cap /= DecimalSize + return *(*[]types.Decimal)(unsafe.Pointer(&hp)) +} + func EncodeStringSlice(vs []string) []byte { var o int32 var buf bytes.Buffer diff --git a/pkg/sql/colexec/unittest/projection_test.go b/pkg/sql/colexec/unittest/projection_test.go index 74a83674459da3692cb1404609c596c856361d0b..d523a435eb4eb2526c6f7db79bd485a2e226c2fe 100644 --- a/pkg/sql/colexec/unittest/projection_test.go +++ b/pkg/sql/colexec/unittest/projection_test.go @@ -20,10 +20,10 @@ func TestProjection(t *testing.T) { proc := process.New(guest.New(1<<20, host.New(1<<20)), mempool.New(1<<32, 8)) { - ins = append(ins, vm.Instruction{vm.Projection, projection.Argument{[]string{"uid"}}}) + ins = append(ins, vm.Instruction{vm.Projection, projection.Argument{[]string{"uid", "price"}}}) ins = append(ins, vm.Instruction{vm.Output, nil}) } - p := pipeline.New([]uint64{1}, []string{"uid"}, ins) + p := pipeline.New([]uint64{1, 1}, []string{"uid", "price"}, ins) p.Run(segments(proc), proc) fmt.Printf("guest: %v, host: %v\n", proc.Size(), proc.HostSize()) } diff --git a/pkg/vectorize/shuffle/shuffle.go b/pkg/vectorize/shuffle/shuffle.go new file mode 100644 index 0000000000000000000000000000000000000000..2455164041021163d93e67235a89761951b71a13 --- /dev/null +++ b/pkg/vectorize/shuffle/shuffle.go @@ -0,0 +1,220 @@ +package shuffle + +import "matrixbase/pkg/container/types" + +var ( + i8Shuffle func([]int8, []int64) []int8 + i16Shuffle func([]int16, []int64) []int16 + i32Shuffle func([]int32, []int64) []int32 + i64Shuffle func([]int64, []int64) []int64 + + ui8Shuffle func([]uint8, []int64) []uint8 + ui16Shuffle func([]uint16, []int64) []uint16 + ui32Shuffle func([]uint32, []int64) []uint32 + ui64Shuffle func([]uint64, []int64) []uint64 + + float32Shuffle func([]float32, []int64) []float32 + float64Shuffle func([]float64, []int64) []float64 + + decimalShuffle func([]types.Decimal, []int64) []types.Decimal + + dateShuffle func([]types.Date, []int64) []types.Date + datetimeShuffle func([]types.Datetime, []int64) []types.Datetime + + tupleShuffle func([][]interface{}, []int64) [][]interface{} + + sShuffle func(*types.Bytes, []int64) *types.Bytes +) + +func init() { + i8Shuffle = i8ShufflePure + i16Shuffle = i16ShufflePure + i32Shuffle = i32ShufflePure + i64Shuffle = i64ShufflePure + + ui8Shuffle = ui8ShufflePure + ui16Shuffle = ui16ShufflePure + ui32Shuffle = ui32ShufflePure + ui64Shuffle = ui64ShufflePure + + float32Shuffle = float32ShufflePure + float64Shuffle = float64ShufflePure + + decimalShuffle = decimalShufflePure + + dateShuffle = dateShufflePure + datetimeShuffle = datetimeShufflePure + + tupleShuffle = tupleShufflePure + + sShuffle = sShufflePure +} + +func I8Shuffle(vs []int8, sels []int64) []int8 { + return i8Shuffle(vs, sels) +} + +func I16Shuffle(vs []int16, sels []int64) []int16 { + return i16Shuffle(vs, sels) +} + +func I32Shuffle(vs []int32, sels []int64) []int32 { + return i32Shuffle(vs, sels) +} + +func I64Shuffle(vs []int64, sels []int64) []int64 { + return i64Shuffle(vs, sels) +} + +func Ui8Shuffle(vs []uint8, sels []int64) []uint8 { + return ui8Shuffle(vs, sels) +} + +func Ui16Shuffle(vs []uint16, sels []int64) []uint16 { + return ui16Shuffle(vs, sels) +} + +func Ui32Shuffle(vs []uint32, sels []int64) []uint32 { + return ui32Shuffle(vs, sels) +} + +func Ui64Shuffle(vs []uint64, sels []int64) []uint64 { + return ui64Shuffle(vs, sels) +} + +func Float32Shuffle(vs []float32, sels []int64) []float32 { + return float32Shuffle(vs, sels) +} + +func Float64Shuffle(vs []float64, sels []int64) []float64 { + return float64Shuffle(vs, sels) +} + +func DecimalShuffle(vs []types.Decimal, sels []int64) []types.Decimal { + return decimalShuffle(vs, sels) +} + +func DateShuffle(vs []types.Date, sels []int64) []types.Date { + return dateShuffle(vs, sels) +} + +func DatetimeShuffle(vs []types.Datetime, sels []int64) []types.Datetime { + return datetimeShuffle(vs, sels) +} + +func TupleShuffle(vs [][]interface{}, sels []int64) [][]interface{} { + return tupleShuffle(vs, sels) +} + +func SShuffle(vs *types.Bytes, sels []int64) *types.Bytes { + return sShuffle(vs, sels) +} + +func i8ShufflePure(vs []int8, sels []int64) []int8 { + for i, sel := range sels { + vs[i] = vs[sel] + } + return vs[:len(sels)] +} + +func i16ShufflePure(vs []int16, sels []int64) []int16 { + for i, sel := range sels { + vs[i] = vs[sel] + } + return vs[:len(sels)] +} + +func i32ShufflePure(vs []int32, sels []int64) []int32 { + for i, sel := range sels { + vs[i] = vs[sel] + } + return vs[:len(sels)] +} + +func i64ShufflePure(vs []int64, sels []int64) []int64 { + for i, sel := range sels { + vs[i] = vs[sel] + } + return vs[:len(sels)] +} + +func ui8ShufflePure(vs []uint8, sels []int64) []uint8 { + for i, sel := range sels { + vs[i] = vs[sel] + } + return vs[:len(sels)] +} + +func ui16ShufflePure(vs []uint16, sels []int64) []uint16 { + for i, sel := range sels { + vs[i] = vs[sel] + } + return vs[:len(sels)] +} + +func ui32ShufflePure(vs []uint32, sels []int64) []uint32 { + for i, sel := range sels { + vs[i] = vs[sel] + } + return vs[:len(sels)] +} + +func ui64ShufflePure(vs []uint64, sels []int64) []uint64 { + for i, sel := range sels { + vs[i] = vs[sel] + } + return vs[:len(sels)] +} + +func float32ShufflePure(vs []float32, sels []int64) []float32 { + for i, sel := range sels { + vs[i] = vs[sel] + } + return vs[:len(sels)] +} + +func float64ShufflePure(vs []float64, sels []int64) []float64 { + for i, sel := range sels { + vs[i] = vs[sel] + } + return vs[:len(sels)] +} + +func decimalShufflePure(vs []types.Decimal, sels []int64) []types.Decimal { + for i, sel := range sels { + vs[i] = vs[sel] + } + return vs[:len(sels)] +} + +func dateShufflePure(vs []types.Date, sels []int64) []types.Date { + for i, sel := range sels { + vs[i] = vs[sel] + } + return vs[:len(sels)] +} + +func datetimeShufflePure(vs []types.Datetime, sels []int64) []types.Datetime { + for i, sel := range sels { + vs[i] = vs[sel] + } + return vs[:len(sels)] +} + +func tupleShufflePure(vs [][]interface{}, sels []int64) [][]interface{} { + for i, sel := range sels { + vs[i] = vs[sel] + } + return vs[:len(sels)] +} + +func sShufflePure(vs *types.Bytes, sels []int64) *types.Bytes { + os, ns := vs.Os, vs.Ns + for i, sel := range sels { + os[i] = os[sel] + ns[i] = ns[sel] + } + vs.Os = os[:len(sels)] + vs.Ns = ns[:len(sels)] + return vs +} diff --git a/pkg/vm/engine/logEngine/kv/cache/cache.go b/pkg/vm/engine/logEngine/kv/cache/cache.go index 2d7b080e2552a56c655e39379358b6c89f51b395..af441861559bbf23ccf7310d6ea7d060f6e78acb 100644 --- a/pkg/vm/engine/logEngine/kv/cache/cache.go +++ b/pkg/vm/engine/logEngine/kv/cache/cache.go @@ -4,7 +4,7 @@ import ( "container/list" "io/ioutil" "matrixbase/pkg/logger" - "matrixbase/pkg/vm/mempool" + "matrixbase/pkg/vm/process" "os" "path" @@ -64,9 +64,9 @@ func (c *Cache) Set(k string, s int64) error { return err } -func (c *Cache) Get(k string, mp *mempool.Mempool) ([]byte, *aio.AIO, aio.RequestId, bool, error) { +func (c *Cache) Get(k string, proc *process.Process) ([]byte, *aio.AIO, aio.RequestId, bool, error) { c.RLock() - v, a, id, ok, err := c.get(k, mp) + v, a, id, ok, err := c.get(k, proc) c.RUnlock() return v, a, id, ok, err } @@ -81,10 +81,10 @@ func (c *Cache) del(k string) error { return nil } -func (c *Cache) get(k string, mp *mempool.Mempool) ([]byte, *aio.AIO, aio.RequestId, bool, error) { +func (c *Cache) get(k string, proc *process.Process) ([]byte, *aio.AIO, aio.RequestId, bool, error) { if e, ok := c.mp[k]; ok { c.lt.MoveToFront(e) - data, a, id, err := readFile(path.Join(c.path, k), mp) + data, a, id, err := readFile(path.Join(c.path, k), proc) return data, a, id, true, err } @@ -123,7 +123,7 @@ func (c *Cache) reduce() { } } -func readFile(name string, mp *mempool.Mempool) ([]byte, *aio.AIO, aio.RequestId, error) { +func readFile(name string, proc *process.Process) ([]byte, *aio.AIO, aio.RequestId, error) { a, err := aio.NewAIO(name, os.O_RDONLY, 0666) if err != nil { return nil, nil, 0, err @@ -132,8 +132,11 @@ func readFile(name string, mp *mempool.Mempool) ([]byte, *aio.AIO, aio.RequestId if err != nil { return nil, nil, 0, err } - size := int(fi.Size()) - data := mp.Alloc(size) + size := fi.Size() + data, err := proc.Alloc(size) + if err != nil { + return nil, nil, 0, err + } id, err := a.ReadAt(data, 0) if err != nil { a.Close() diff --git a/pkg/vm/engine/logEngine/kv/kv.go b/pkg/vm/engine/logEngine/kv/kv.go index bb6dcc904b825f877a4ce83491b4744e527df056..a73fb7f5d8c552ffbd10f7857006bc30dfb90fda 100644 --- a/pkg/vm/engine/logEngine/kv/kv.go +++ b/pkg/vm/engine/logEngine/kv/kv.go @@ -3,7 +3,7 @@ package kv import ( "matrixbase/pkg/vm/engine/logEngine/kv/cache" "matrixbase/pkg/vm/engine/logEngine/kv/s3" - "matrixbase/pkg/vm/mempool" + "matrixbase/pkg/vm/process" aio "github.com/traetox/goaio" ) @@ -32,11 +32,11 @@ func (a *KV) Set(k string, v []byte) error { return a.re.Set(k, v) } -func (a *KV) Get(k string, mp *mempool.Mempool) ([]byte, *aio.AIO, aio.RequestId, error) { - if v, ap, id, ok, err := a.kc.Get(k, mp); ok { +func (a *KV) Get(k string, proc *process.Process) ([]byte, *aio.AIO, aio.RequestId, error) { + if v, ap, id, ok, err := a.kc.Get(k, proc); ok { return v, ap, id, err } - v, ap, id, err := a.re.Get(k, mp) + v, ap, id, err := a.re.Get(k, proc) if err != nil { return nil, nil, 0, err } diff --git a/pkg/vm/engine/logEngine/kv/s3/s3.go b/pkg/vm/engine/logEngine/kv/s3/s3.go index e442dc1127fcfb4e758111a6c9527a003355920a..f3a8f95a2d1c596947be9475ce2f41d595f5be5e 100644 --- a/pkg/vm/engine/logEngine/kv/s3/s3.go +++ b/pkg/vm/engine/logEngine/kv/s3/s3.go @@ -3,7 +3,7 @@ package s3 import ( "bytes" "encoding/gob" - "matrixbase/pkg/vm/mempool" + "matrixbase/pkg/vm/process" "os" "path" @@ -56,7 +56,7 @@ func (a *KV) Set(k string, v []byte) error { return err } -func (a *KV) Get(k string, mp *mempool.Mempool) ([]byte, *aio.AIO, aio.RequestId, error) { +func (a *KV) Get(k string, proc *process.Process) ([]byte, *aio.AIO, aio.RequestId, error) { name := path.Join(a.cfg.Path, k) fp, err := os.Create(name) if err != nil { @@ -70,10 +70,10 @@ func (a *KV) Get(k string, mp *mempool.Mempool) ([]byte, *aio.AIO, aio.RequestId return nil, nil, 0, err } fp.Close() - return readFile(name, mp) + return readFile(name, proc) } -func readFile(name string, mp *mempool.Mempool) ([]byte, *aio.AIO, aio.RequestId, error) { +func readFile(name string, proc *process.Process) ([]byte, *aio.AIO, aio.RequestId, error) { a, err := aio.NewAIO(name, os.O_RDONLY, 0666) if err != nil { return nil, nil, 0, err @@ -82,8 +82,11 @@ func readFile(name string, mp *mempool.Mempool) ([]byte, *aio.AIO, aio.RequestId if err != nil { return nil, nil, 0, err } - size := int(fi.Size()) - data := mp.Alloc(size) + size := fi.Size() + data, err := proc.Alloc(size) + if err != nil { + return nil, nil, 0, err + } id, err := a.ReadAt(data, 0) if err != nil { a.Close() diff --git a/pkg/vm/engine/logEngine/segment/segment.go b/pkg/vm/engine/logEngine/segment/segment.go index 9e1af113cedfa3190af38f061c4081c209438206..e9cb2a1308b22c7c9d570741267929bdeaeba376 100644 --- a/pkg/vm/engine/logEngine/segment/segment.go +++ b/pkg/vm/engine/logEngine/segment/segment.go @@ -25,7 +25,7 @@ func (s *Segment) Read(cs []uint64, attrs []string, proc *process.Process) (*bat bat.Is = make([]batch.Info, len(attrs)) for i, attr := range attrs { md := s.mp[attr] - data, ap, id, err := s.db.Get(s.id+"."+attr, proc.Mp) + data, ap, id, err := s.db.Get(s.id+"."+attr, proc) if err != nil { for j := 0; j < i; j++ { bat.Is[i].Wg.Wait() diff --git a/pkg/vm/engine/memEngine/engine.go b/pkg/vm/engine/memEngine/engine.go index 0a93b4127bdb5d2b565031654519194e9b649001..e31cdef61212b872d5a9b4d00b95ced4acc8cf3b 100644 --- a/pkg/vm/engine/memEngine/engine.go +++ b/pkg/vm/engine/memEngine/engine.go @@ -7,10 +7,13 @@ import ( "matrixbase/pkg/vm/engine/memEngine/meta" "matrixbase/pkg/vm/mempool" "matrixbase/pkg/vm/metadata" + "matrixbase/pkg/vm/mmu/guest" + "matrixbase/pkg/vm/mmu/host" + "matrixbase/pkg/vm/process" ) func New(db *kv.KV) *memEngine { - return &memEngine{db, mempool.New(4<<20, 16)} + return &memEngine{db, process.New(guest.New(1<<20, host.New(1<<20)), mempool.New(1<<32, 16))} } func (e *memEngine) Delete(name string) error { @@ -36,11 +39,11 @@ func (e *memEngine) Relations() []engine.Relation { func (e *memEngine) Relation(name string) (engine.Relation, error) { var md meta.Metadata - data, err := e.db.Get(name, e.mp) + data, err := e.db.Get(name, e.proc) if err != nil { return nil, err } - defer e.mp.Free(data) + defer e.proc.Free(data) if err := encoding.Decode(data[mempool.CountSize:], &md); err != nil { return nil, err } diff --git a/pkg/vm/engine/memEngine/kv/kv.go b/pkg/vm/engine/memEngine/kv/kv.go index f7a1ed746c3136a59857487bfe35e96760743429..943799bd78ab7940b7e5704e4a850cd94e766607 100644 --- a/pkg/vm/engine/memEngine/kv/kv.go +++ b/pkg/vm/engine/memEngine/kv/kv.go @@ -1,6 +1,9 @@ package kv -import "matrixbase/pkg/vm/mempool" +import ( + "matrixbase/pkg/vm/mempool" + "matrixbase/pkg/vm/process" +) func New() *KV { return &KV{make(map[string][]byte)} @@ -20,12 +23,15 @@ func (a *KV) Set(k string, v []byte) error { return nil } -func (a *KV) Get(k string, mp *mempool.Mempool) ([]byte, error) { +func (a *KV) Get(k string, proc *process.Process) ([]byte, error) { v, ok := a.mp[k] if !ok { return nil, NotExist } - data := mp.Alloc(len(v)) + data, err := proc.Alloc(int64(len(v))) + if err != nil { + return nil, err + } copy(data[mempool.CountSize:], v) return data[:len(v)+mempool.CountSize], nil } diff --git a/pkg/vm/engine/memEngine/segment/segment.go b/pkg/vm/engine/memEngine/segment/segment.go index b7035b7156ed0a6381304ae6fa6e6f56d536d746..b4a90fe01f32a85b4e1b186dc59f3cae2bf268af 100644 --- a/pkg/vm/engine/memEngine/segment/segment.go +++ b/pkg/vm/engine/memEngine/segment/segment.go @@ -28,7 +28,7 @@ func (s *Segment) Read(cs []uint64, attrs []string, proc *process.Process) (*bat bat := batch.New(attrs) for i, attr := range attrs { md := s.mp[attr] - vec, err := s.read(s.id+"."+attr, md.Alg, md.Type, proc.Mp) + vec, err := s.read(s.id+"."+attr, md.Alg, md.Type, proc) if err != nil { for j := 0; j < i; j++ { copy(bat.Vecs[j].Data, mempool.OneCount) @@ -42,24 +42,28 @@ func (s *Segment) Read(cs []uint64, attrs []string, proc *process.Process) (*bat return bat, nil } -func (s *Segment) read(id string, alg int, typ types.T, mp *mempool.Mempool) (*vector.Vector, error) { - data, err := s.db.Get(id, mp) +func (s *Segment) read(id string, alg int, typ types.Type, proc *process.Process) (*vector.Vector, error) { + data, err := s.db.Get(id, proc) if err != nil { return nil, err } if alg == compress.Lz4 { n := int(encoding.DecodeInt32(data[len(data)-4:])) - buf := mp.Alloc(n) + buf, err := proc.Alloc(int64(n)) + if err != nil { + proc.Free(data) + return nil, err + } if _, err := compress.Decompress(data[mempool.CountSize:len(data)-4], buf[mempool.CountSize:], alg); err != nil { - mp.Free(data) + proc.Free(data) return nil, err } data = buf[:mempool.CountSize+n] - mp.Free(data) + proc.Free(data) } vec := vector.New(typ) - if err := vec.Read(data, s.proc); err != nil { - mp.Free(data) + if err := vec.Read(data); err != nil { + proc.Free(data) return nil, err } return vec, nil diff --git a/pkg/vm/engine/memEngine/testEngine.go b/pkg/vm/engine/memEngine/testEngine.go index 2bc9364f1eac24ec3328bd9d21e8c6ffe8d31d89..49edc924ebce95b83b2821566efbb41eb1aede43 100644 --- a/pkg/vm/engine/memEngine/testEngine.go +++ b/pkg/vm/engine/memEngine/testEngine.go @@ -18,9 +18,21 @@ func NewTestEngine() engine.Engine { var attrs []metadata.Attribute { - attrs = append(attrs, metadata.Attribute{compress.Lz4, types.T_bytes, "orderId"}) - attrs = append(attrs, metadata.Attribute{compress.Lz4, types.T_bytes, "uid"}) - attrs = append(attrs, metadata.Attribute{compress.Lz4, types.T_float, "price"}) + attrs = append(attrs, metadata.Attribute{ + Alg: compress.Lz4, + Name: "orderId", + Type: types.Type{types.T(types.T_varchar), 24, 0, 0}, + }) + attrs = append(attrs, metadata.Attribute{ + Alg: compress.Lz4, + Name: "uid", + Type: types.Type{types.T(types.T_varchar), 24, 0, 0}, + }) + attrs = append(attrs, metadata.Attribute{ + Alg: compress.Lz4, + Name: "price", + Type: types.Type{types.T(types.T_float64), 8, 8, 0}, + }) } if err := e.Create("test", attrs); err != nil { log.Fatal(err) @@ -34,7 +46,7 @@ func NewTestEngine() engine.Engine { bat := batch.New([]string{"orderId", "uid", "price"}) { { - vec := vector.New(types.T_bytes) + vec := vector.New(types.Type{types.T(types.T_varchar), 24, 0, 0}) vs := make([][]byte, 10) for i := 0; i < 10; i++ { vs[i] = []byte(fmt.Sprintf("%v", i)) @@ -45,7 +57,7 @@ func NewTestEngine() engine.Engine { bat.Vecs[0] = vec } { - vec := vector.New(types.T_bytes) + vec := vector.New(types.Type{types.T(types.T_varchar), 24, 0, 0}) vs := make([][]byte, 10) for i := 0; i < 10; i++ { vs[i] = []byte(fmt.Sprintf("%v", i%4)) @@ -56,7 +68,7 @@ func NewTestEngine() engine.Engine { bat.Vecs[1] = vec } { - vec := vector.New(types.T_float) + vec := vector.New(types.Type{types.T(types.T_float64), 8, 8, 0}) vs := make([]float64, 10) for i := 0; i < 10; i++ { vs[i] = float64(i) @@ -66,7 +78,6 @@ func NewTestEngine() engine.Engine { } bat.Vecs[2] = vec } - } if err := r.Write(bat); err != nil { log.Fatal(err) @@ -75,7 +86,7 @@ func NewTestEngine() engine.Engine { { bat := batch.New([]string{"orderId", "uid", "price"}) { - vec := vector.New(types.T_bytes) + vec := vector.New(types.Type{types.T(types.T_varchar), 24, 0, 0}) vs := make([][]byte, 10) for i := 10; i < 20; i++ { vs[i-10] = []byte(fmt.Sprintf("%v", i)) @@ -86,7 +97,7 @@ func NewTestEngine() engine.Engine { bat.Vecs[0] = vec } { - vec := vector.New(types.T_bytes) + vec := vector.New(types.Type{types.T(types.T_varchar), 24, 0, 0}) vs := make([][]byte, 10) for i := 10; i < 20; i++ { vs[i-10] = []byte(fmt.Sprintf("%v", i%4)) @@ -97,7 +108,7 @@ func NewTestEngine() engine.Engine { bat.Vecs[1] = vec } { - vec := vector.New(types.T_float) + vec := vector.New(types.Type{types.T(types.T_float64), 8, 8, 0}) vs := make([]float64, 10) for i := 10; i < 20; i++ { vs[i-10] = float64(i) diff --git a/pkg/vm/engine/memEngine/types.go b/pkg/vm/engine/memEngine/types.go index 5b51ef3318bb8f058db368ab03c271be7e585ea9..0d114612e345a172ea3f3c78e886c1bffacac43c 100644 --- a/pkg/vm/engine/memEngine/types.go +++ b/pkg/vm/engine/memEngine/types.go @@ -3,13 +3,13 @@ package memEngine import ( "matrixbase/pkg/vm/engine/memEngine/kv" "matrixbase/pkg/vm/engine/memEngine/meta" - "matrixbase/pkg/vm/mempool" + "matrixbase/pkg/vm/process" ) // standalone memory engine type memEngine struct { - db *kv.KV - mp *mempool.Mempool + db *kv.KV + proc *process.Process } type relation struct { diff --git a/pkg/vm/mempool/mempool.go b/pkg/vm/mempool/mempool.go index 4ec5e6321002b1e05718b6d532a68f69184679f0..ae9ff85150ba285bddac045fa71e9380daa4b81a 100644 --- a/pkg/vm/mempool/mempool.go +++ b/pkg/vm/mempool/mempool.go @@ -38,11 +38,11 @@ func (m *Mempool) Alloc(size int) []byte { return data } -func (m *Mempool) Free(data []byte) { +func (m *Mempool) Free(data []byte) bool { count := encoding.DecodeUint64(data[:8]) copy(data, encoding.EncodeUint64(count-1)) if count > 1 { - return + return false } size := cap(data) if size <= m.maxSize { @@ -51,8 +51,9 @@ func (m *Mempool) Free(data []byte) { if len(b.slots) < b.nslot { b.slots = append(b.slots, data) } - return + return true } } } + return true } diff --git a/pkg/vm/metadata/types.go b/pkg/vm/metadata/types.go index 8859e37f2e32bd3bd49eb00b05b7eb8c4d54cb32..b3f61eff59b13e98c6e6b99d7309d93064e1ec3b 100644 --- a/pkg/vm/metadata/types.go +++ b/pkg/vm/metadata/types.go @@ -10,7 +10,7 @@ type Node struct { } type Attribute struct { - Alg int // compression algorithm - Type types.T // type of attribute - Name string // name of attribute + Alg int // compression algorithm + Name string // name of attribute + Type types.Type // type of attribute } diff --git a/pkg/vm/process/process.go b/pkg/vm/process/process.go index 94f8de0b583d2adcbdac2e43b0019f852dc44f0e..2a0aba95eb84e7249956a2d1da76bddea77bfad1 100644 --- a/pkg/vm/process/process.go +++ b/pkg/vm/process/process.go @@ -20,10 +20,17 @@ func (p *Process) HostSize() int64 { return p.Gm.HostSize() } -func (p *Process) Free(size int64) { - p.Gm.Free(size) +func (p *Process) Free(data []byte) { + if p.Mp.Free(data) { + p.Gm.Free(int64(cap(data))) + } } -func (p *Process) Alloc(size int64) error { - return p.Gm.Alloc(size) +func (p *Process) Alloc(size int64) ([]byte, error) { + data := p.Mp.Alloc(int(size)) + if err := p.Gm.Alloc(int64(cap(data))); err != nil { + p.Mp.Free(data) + return nil, err + } + return data, nil } diff --git a/pkg/vm/process/types.go b/pkg/vm/process/types.go index 9955ef5f917d5b42ad44964a5dc483860796e721..6cea871f7bdd489cd8ca8fb756510f269b7dd30b 100644 --- a/pkg/vm/process/types.go +++ b/pkg/vm/process/types.go @@ -8,8 +8,10 @@ import ( /* type Process interface { Size() int64 - Free(int64) - Alloc(int64) error + HostSize() int64 + + Free([]byte) + Alloc(int64) ([]byte, error) } */