diff --git a/pkg/frontend/load.go b/pkg/frontend/load.go
index 3d1400059bbbb90f403cd24c0f1be7f6bba96e57..7abfcc329ee43e5647d060e7aa8f70230072456c 100644
--- a/pkg/frontend/load.go
+++ b/pkg/frontend/load.go
@@ -128,6 +128,11 @@ func newNotifyEvent(t notifyEventType,e error,w *WriteBatchHandler)*notifyEvent
 	}
 }
 
+type PoolElement struct {
+	bat *batch.Batch
+	lineArray [][]string
+}
+
 type ParseLineHandler struct {
 	SharePart
 	DebugTime
@@ -140,7 +145,7 @@ type ParseLineHandler struct {
 	simdCsvConcurrencyCountOfWriteBatch int
 	//wait write routines to quit
 	simdCsvWaitWriteRoutineToQuit *sync.WaitGroup
-	simdCsvBatchPool chan *batch.Batch
+	simdCsvBatchPool chan *PoolElement
 	simdCsvNotiyEventChan chan *notifyEvent
 	closeOnce sync.Once
 
@@ -152,6 +157,7 @@ type WriteBatchHandler struct {
 	DebugTime
 
 	batchData *batch.Batch
+	pl *PoolElement
 	batchFilled int
 	simdCsvErr error
 
@@ -213,6 +219,7 @@ func (plh *ParseLineHandler) getLineOutFromSimdCsvRoutine() error {
 		//step 2 : append line into line array
 		plh.simdCsvLineArray[plh.lineIdx] = lineOut.Line
 		plh.lineIdx++
+		plh.lineCount++
 		plh.maxFieldCnt = Max(plh.maxFieldCnt,len(lineOut.Line))
 		//for _, ll := range lineOut.Line {
 		//	plh.bytes += uint64(utf8.RuneCount([]byte(ll)))
@@ -292,7 +299,7 @@ func (plh *ParseLineHandler) close() {
 /*
 alloc space for the batch
 */
-func makeBatch(handler *ParseLineHandler) *batch.Batch {
+func makeBatch(handler *ParseLineHandler) *PoolElement {
 	batchData := batch.New(true,handler.attrName)
 
 	//fmt.Printf("----- batchSize %d attrName %v \n",batchSize,handler.attrName)
@@ -336,7 +343,10 @@ func makeBatch(handler *ParseLineHandler) *batch.Batch {
 		batchData.Vecs[i] = vec
 	}
 
-	return batchData
+	return &PoolElement{
+		bat : batchData,
+		lineArray: make([][]string,handler.batchSize),
+	}
 }
 
 /*
@@ -397,7 +407,7 @@ func initParseLineHandler(handler *ParseLineHandler) error {
 alloc a batch from the pool.
 if the pool does not have batch anymore, the caller routine will be suspended.
 */
-func allocBatch(handler *ParseLineHandler ) *batch.Batch{
+func allocBatch(handler *ParseLineHandler ) *PoolElement{
 	batchData := <- handler.simdCsvBatchPool
 	return batchData
 }
@@ -405,10 +415,10 @@ func allocBatch(handler *ParseLineHandler ) *batch.Batch{
 /*
 return a batch into the pool
 */
-func releaseBatch(handler *ParseLineHandler,bat *batch.Batch) {
+func releaseBatch(handler *ParseLineHandler,pl *PoolElement) {
 	//clear batch
 	//clear vector.nulls.Nulls
-	for _, vec := range bat.Vecs {
+	for _, vec := range pl.bat.Vecs {
 		vec.Nsp = &nulls.Nulls{}
 		switch vec.Typ.Oid {
 		case types.T_char, types.T_varchar:
@@ -416,7 +426,7 @@
 			vBytes := vec.Col.(*types.Bytes)
 			vBytes.Data = vBytes.Data[:0]
 		}
 	}
-	handler.simdCsvBatchPool <- bat
+	handler.simdCsvBatchPool <- pl
 }
 
 /**
@@ -435,7 +445,13 @@ func initWriteBatchHandler(handler *ParseLineHandler,wHandler *WriteBatchHandler
 	wHandler.closeRef = handler.closeRef
 	wHandler.lineCount = handler.lineCount
 
-	wHandler.batchData = allocBatch(handler)
+	wHandler.pl = allocBatch(handler)
+	wHandler.simdCsvLineArray = wHandler.pl.lineArray
+	for i := 0; i < handler.lineIdx; i++ {
+		wHandler.simdCsvLineArray[i] = handler.simdCsvLineArray[i]
+	}
+
+	wHandler.batchData = wHandler.pl.bat
 
 	return nil
 }
@@ -539,6 +555,11 @@ func rowToColumnAndSaveToStorage(handler *WriteBatchHandler, forceConvert bool)
 		//fmt.Printf("line %d %v \n",i,line)
 		//wait_a := time.Now()
 		rowIdx := batchBegin + i
+		offset := i + 1
+		base := handler.lineCount - uint64(fetchCnt)
+		//fmt.Println(line)
+		//fmt.Printf("------ linecount %d fetchcnt %d base %d offset %d \n",
+		//	handler.lineCount,fetchCnt,base,offset)
 		//record missing column
 		for k := 0; k < len(columnFLags); k++ {
 			columnFLags[k] = 0
@@ -581,7 +602,7 @@ func rowToColumnAndSaveToStorage(handler *WriteBatchHandler, forceConvert bool)
 					if err != nil {
 						logutil.Errorf("parse field[%v] err:%v", field, err)
 						if !ignoreFieldError {
-							return makeParsedFailedError(vec.Typ.String(),field,vecAttr,handler.lineCount,i)
+							return makeParsedFailedError(vec.Typ.String(),field,vecAttr,base,offset)
 						}
 						//mysql warning ER_TRUNCATED_WRONG_VALUE_FOR_FIELD
 						result.Warnings++
@@ -599,7 +620,7 @@ func rowToColumnAndSaveToStorage(handler *WriteBatchHandler, forceConvert bool)
 					if err != nil {
 						logutil.Errorf("parse field[%v] err:%v", field, err)
 						if !ignoreFieldError {
-							return makeParsedFailedError(vec.Typ.String(),field,vecAttr,handler.lineCount,i)
+							return makeParsedFailedError(vec.Typ.String(),field,vecAttr,base,offset)
 						}
 						result.Warnings++
 						d = 0
@@ -616,7 +637,7 @@ func rowToColumnAndSaveToStorage(handler *WriteBatchHandler, forceConvert bool)
 					if err != nil {
 						logutil.Errorf("parse field[%v] err:%v", field, err)
 						if !ignoreFieldError {
-							return makeParsedFailedError(vec.Typ.String(),field,vecAttr,handler.lineCount,i)
+							return makeParsedFailedError(vec.Typ.String(),field,vecAttr,base,offset)
 						}
 						result.Warnings++
 						d = 0
@@ -633,7 +654,7 @@ func rowToColumnAndSaveToStorage(handler *WriteBatchHandler, forceConvert bool)
 					if err != nil {
 						logutil.Errorf("parse field[%v] err:%v", field, err)
 						if !ignoreFieldError {
-							return makeParsedFailedError(vec.Typ.String(),field,vecAttr,handler.lineCount,i)
+							return makeParsedFailedError(vec.Typ.String(),field,vecAttr,base,offset)
 						}
 						result.Warnings++
 						d = 0
@@ -650,7 +671,7 @@ func rowToColumnAndSaveToStorage(handler *WriteBatchHandler, forceConvert bool)
 					if err != nil {
 						logutil.Errorf("parse field[%v] err:%v", field, err)
 						if !ignoreFieldError {
-							return makeParsedFailedError(vec.Typ.String(),field,vecAttr,handler.lineCount,i)
+							return makeParsedFailedError(vec.Typ.String(),field,vecAttr,base,offset)
 						}
 						result.Warnings++
 						d = 0
@@ -667,7 +688,7 @@ func rowToColumnAndSaveToStorage(handler *WriteBatchHandler, forceConvert bool)
 					if err != nil {
 						logutil.Errorf("parse field[%v] err:%v", field, err)
 						if !ignoreFieldError {
-							return makeParsedFailedError(vec.Typ.String(),field,vecAttr,handler.lineCount,i)
+							return makeParsedFailedError(vec.Typ.String(),field,vecAttr,base,offset)
 						}
 						result.Warnings++
 						d = 0
@@ -684,7 +705,7 @@ func rowToColumnAndSaveToStorage(handler *WriteBatchHandler, forceConvert bool)
 					if err != nil {
 						logutil.Errorf("parse field[%v] err:%v", field, err)
 						if !ignoreFieldError {
-							return makeParsedFailedError(vec.Typ.String(),field,vecAttr,handler.lineCount,i)
+							return makeParsedFailedError(vec.Typ.String(),field,vecAttr,base,offset)
 						}
 						result.Warnings++
 						d = 0
@@ -701,7 +722,7 @@ func rowToColumnAndSaveToStorage(handler *WriteBatchHandler, forceConvert bool)
 					if err != nil {
 						logutil.Errorf("parse field[%v] err:%v", field, err)
 						if !ignoreFieldError {
-							return makeParsedFailedError(vec.Typ.String(),field,vecAttr,handler.lineCount,i)
+							return makeParsedFailedError(vec.Typ.String(),field,vecAttr,base,offset)
 						}
 						result.Warnings++
 						d = 0
@@ -718,7 +739,7 @@ func rowToColumnAndSaveToStorage(handler *WriteBatchHandler, forceConvert bool)
 					if err != nil {
 						logutil.Errorf("parse field[%v] err:%v", field, err)
 						if !ignoreFieldError {
-							return makeParsedFailedError(vec.Typ.String(),field,vecAttr,handler.lineCount,i)
+							return makeParsedFailedError(vec.Typ.String(),field,vecAttr,base,offset)
 						}
 						result.Warnings++
 						d = 0
@@ -737,7 +758,7 @@ func rowToColumnAndSaveToStorage(handler *WriteBatchHandler, forceConvert bool)
 					if err != nil {
 						logutil.Errorf("parse field[%v] err:%v", field, err)
 						if !ignoreFieldError {
-							return makeParsedFailedError(vec.Typ.String(),field,vecAttr,handler.lineCount,i)
+							return makeParsedFailedError(vec.Typ.String(),field,vecAttr,base,offset)
 						}
 						result.Warnings++
 						d = 0
@@ -1071,7 +1092,7 @@ func rowToColumnAndSaveToStorage(handler *WriteBatchHandler, forceConvert bool)
 			fillBlank += time.Since(wait_b)
 			handler.choose_false += time.Since(wait_d)
 		}
-		handler.lineCount += uint64(fetchCnt)
+
 		handler.batchFilled = batchBegin + fetchCnt
 
 		//if handler.batchFilled == handler.batchSize {
@@ -1239,7 +1260,6 @@ func saveLinesToStorage(handler *ParseLineHandler, force bool) error {
 	writeHandler := &WriteBatchHandler{
 		SharePart:SharePart{
 			lineIdx: handler.lineIdx,
-			simdCsvLineArray: handler.simdCsvLineArray[:handler.lineIdx],
 			maxFieldCnt: handler.maxFieldCnt,
 		},
 	}
@@ -1257,8 +1277,9 @@ func saveLinesToStorage(handler *ParseLineHandler, force bool) error {
 		err = rowToColumnAndSaveToStorage(writeHandler,force)
 		writeHandler.simdCsvErr = err
 
-		releaseBatch(handler,writeHandler.batchData)
+		releaseBatch(handler, writeHandler.pl)
 		writeHandler.batchData = nil
+		writeHandler.simdCsvLineArray = nil
 
 		if err != nil {
 			handler.simdCsvNotiyEventChan <- newNotifyEvent(NOTIFY_EVENT_WRITE_BATCH_ERROR,err,writeHandler)
@@ -1283,7 +1304,6 @@ func (mce *MysqlCmdExecutor) LoadLoop(load *tree.Load, dbHandler engine.Database
 			logutil.Errorf("loadLoop panic")
 		}
 	}()
-	var err error
 
 	ses := mce.routine.GetSession()
 	//begin:= time.Now()
@@ -1331,7 +1351,7 @@ func (mce *MysqlCmdExecutor) LoadLoop(load *tree.Load, dbHandler engine.Database
 	handler.simdCsvConcurrencyCountOfWriteBatch = Min(int(ses.Pu.SV.GetLoadDataConcurrencyCount()),runtime.NumCPU())
 	handler.simdCsvConcurrencyCountOfWriteBatch = Max(1,handler.simdCsvConcurrencyCountOfWriteBatch)
-	handler.simdCsvBatchPool = make(chan *batch.Batch, handler.simdCsvConcurrencyCountOfWriteBatch)
+	handler.simdCsvBatchPool = make(chan *PoolElement, handler.simdCsvConcurrencyCountOfWriteBatch)
 
 	//fmt.Printf("-----write concurrent count %d \n",handler.simdCsvConcurrencyCountOfWriteBatch)
 
@@ -1385,7 +1405,7 @@ func (mce *MysqlCmdExecutor) LoadLoop(load *tree.Load, dbHandler engine.Database
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		err = handler.getLineOutFromSimdCsvRoutine()
+		err := handler.getLineOutFromSimdCsvRoutine()
 		if err != nil {
 			logutil.Errorf("get line from simdcsv failed. err:%v",err)
 			handler.simdCsvNotiyEventChan <- newNotifyEvent(NOTIFY_EVENT_OUTPUT_SIMDCSV_ERROR,err,nil)
@@ -1400,7 +1420,7 @@ func (mce *MysqlCmdExecutor) LoadLoop(load *tree.Load, dbHandler engine.Database
 		defer wg.Done()
 
 		wait_b := time.Now()
-		err = handler.simdCsvReader.ReadLoop(handler.simdCsvGetParsedLinesChan)
+		err := handler.simdCsvReader.ReadLoop(handler.simdCsvGetParsedLinesChan)
 		if err != nil {
 			handler.simdCsvNotiyEventChan <- newNotifyEvent(NOTIFY_EVENT_READ_SIMDCSV_ERROR,err,nil)
 		}
@@ -1429,7 +1449,6 @@ func (mce *MysqlCmdExecutor) LoadLoop(load *tree.Load, dbHandler engine.Database
 			case NOTIFY_EVENT_READ_SIMDCSV_ERROR,
 				NOTIFY_EVENT_OUTPUT_SIMDCSV_ERROR,
 				NOTIFY_EVENT_WRITE_BATCH_ERROR:
-				fmt.Println(ne.err)
 				if !errorCanBeIgnored(ne.err) {
 					err = ne.err
 					quit = true
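For context on the hunks above: the core pattern this patch switches to is a fixed-size buffered channel used as a free list of `PoolElement` values, where `allocBatch` blocks when the pool is empty and `releaseBatch` clears an element and puts it back. A minimal, self-contained sketch of that pattern is shown below. It is an illustration only, not MatrixOne code: `Batch`, `newPool`, `alloc`, and `release` are hypothetical stand-ins for `batch.Batch`, the pool construction in `LoadLoop`, `allocBatch`, and `releaseBatch`.

```go
package main

import "fmt"

// Batch is a simplified stand-in for batch.Batch: just a slice of column buffers.
type Batch struct {
	cols [][]string
}

// PoolElement pairs a reusable batch with its own line buffer, mirroring the
// struct added by this patch (bat + lineArray), so each write goroutine works
// on a private copy of the parsed lines.
type PoolElement struct {
	bat       *Batch
	lineArray [][]string
}

// newPool pre-allocates size elements and parks them in a buffered channel.
// Receiving from the channel is the "alloc" (it blocks when the pool is empty);
// sending back is the "release".
func newPool(size, batchSize int) chan *PoolElement {
	pool := make(chan *PoolElement, size)
	for i := 0; i < size; i++ {
		pool <- &PoolElement{
			bat:       &Batch{cols: make([][]string, batchSize)},
			lineArray: make([][]string, batchSize),
		}
	}
	return pool
}

// alloc suspends the caller until an element is available, like allocBatch.
func alloc(pool chan *PoolElement) *PoolElement { return <-pool }

// release clears per-use state and returns the element, like releaseBatch.
func release(pool chan *PoolElement, pe *PoolElement) {
	for i := range pe.bat.cols {
		pe.bat.cols[i] = pe.bat.cols[i][:0] // keep capacity, drop contents
	}
	pool <- pe
}

func main() {
	pool := newPool(2, 4) // 2 concurrent writers, 4 rows per batch
	pe := alloc(pool)
	pe.lineArray[0] = []string{"a", "b"} // a writer fills its private buffer
	fmt.Println(len(pe.lineArray), cap(pool))
	release(pool, pe)
}
```

In the patch itself, `initWriteBatchHandler` points the write handler's `simdCsvLineArray` at the pooled element's `lineArray` and copies the parser's accumulated lines into it, which is why the `@@ -1239` hunk no longer passes the shared `handler.simdCsvLineArray[:handler.lineIdx]` slice through `SharePart`.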