Commit 43921eac authored by i-robot, committed by Gitee

!21 fix num_parallel_workers greater than CPU cores

Merge pull request !21 from zhaoting/master
parents 6513f021 1844d42a
......@@ -259,6 +259,7 @@ You can start training using python or shell scripts. The usage of shell scripts
`CKPT_PATH`, `FREEZE_LAYER` and `FILTER_HEAD` are optional; when `CKPT_PATH` is set, `FREEZE_LAYER` must be set as well. `FREEZE_LAYER` should be in ["none", "backbone"]; if you set `FREEZE_LAYER`="backbone", the backbone parameters are frozen during training and the head parameters are not loaded from the checkpoint. If `FILTER_HEAD`=True, the head parameters are not loaded from the checkpoint.
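For illustration only, a minimal sketch of what these options could do inside `train.py`; the `head.` parameter prefix and the `load_pretrained` helper name are assumptions, not the repository's actual implementation.

```python
# Hypothetical sketch: "head." prefix and helper name are assumptions.
from mindspore import load_checkpoint, load_param_into_net

def load_pretrained(net, ckpt_path, freeze_layer="none", filter_head=False):
    param_dict = load_checkpoint(ckpt_path)
    if filter_head or freeze_layer == "backbone":
        # per the description above, the head parameters are not loaded
        param_dict = {k: v for k, v in param_dict.items() if not k.startswith("head.")}
    load_param_into_net(net, param_dict)
    if freeze_layer == "backbone":
        # freeze every parameter outside the head
        for param in net.trainable_params():
            if not param.name.startswith("head."):
                param.requires_grad = False
    return net
```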
> RANK_TABLE_FILE is the HCCL configuration file when running on Ascend.
> If you run standalone, please set `RANK_TABLE_FILE=None`.
> The common restrictions on using the distributed service are as follows. For details, see the HCCL documentation.
>
> - In a single-node system, a cluster of 1, 2, 4, or 8 devices is supported. In a multi-node system, a cluster of 8 x N devices is supported.
......
......@@ -260,6 +260,7 @@ The overall MobileNetV2 network architecture is as follows:
`CKPT_PATH`, `FREEZE_LAYER` and `FILTER_HEAD` are optional; if `CKPT_PATH` is set, `FREEZE_LAYER` must be set as well. `FREEZE_LAYER` can be ["none", "backbone"]; if `FREEZE_LAYER`="backbone", the backbone parameters are frozen during training and the head parameters are not loaded from the checkpoint. If `FILTER_HEAD`=True, the head parameters are not loaded from the checkpoint.
> RANK_TABLE_FILE is the HCCL configuration file for running distributed jobs on Ascend.
> When running on a single card, please set RANK_TABLE_FILE=None.
> The common restrictions on using the distributed service are listed below; see the corresponding HCCL documentation for details.
>
> - In a single-node scenario, device clusters of 1, 2, 4, or 8 cards are supported; in a multi-node scenario, clusters of 8*n cards are supported.
......
......@@ -46,10 +46,6 @@ run_ascend()
echo "error: DATASET_PATH=$6 is not a directory or file"
exit 1
fi
RUN_DISTRIBUTE=True
if [ $2 -eq 1 ] ; then
RUN_DISTRIBUTE=False
fi
BASEPATH=$(cd "`dirname $0`" || exit; pwd)
CONFIG_FILE="${BASEPATH}/../$2"
......@@ -69,6 +65,22 @@ run_ascend()
fi
mkdir ../train
cd ../train || exit
RUN_DISTRIBUTE=True
if [ $3 -eq 1 ] ; then
RUN_DISTRIBUTE=False
export DEVICE_ID=${CANDIDATE_DEVICE[0]}
export RANK_ID=0
nohup python ${BASEPATH}/../train.py \
--run_distribute=$RUN_DISTRIBUTE \
--config_path=$CONFIG_FILE \
--platform=$1 \
--dataset_path=$6 \
--pretrain_ckpt=$PRETRAINED_CKPT \
--freeze_layer=$FREEZE_LAYER \
--filter_head=$FILTER_HEAD \
&> ../train.log &
exit 1
fi
cpus=`cat /proc/cpuinfo| grep "processor"| wc -l`
avg=`expr $cpus \/ $RANK_SIZE`
gap=`expr $avg \- 1`
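The three lines above split the host's CPU cores evenly across the RANK_SIZE device processes; in similar MindSpore launch scripts these values feed a per-rank taskset core binding, but that use is an assumption here since the rest of the hunk is truncated. The same arithmetic in Python, as a sketch:

```python
# Sketch of the core-range arithmetic above; the taskset binding is assumed.
import multiprocessing

def core_range(rank_id, rank_size):
    cpus = multiprocessing.cpu_count()   # cat /proc/cpuinfo | grep "processor" | wc -l
    avg = cpus // rank_size              # cores per rank
    start = rank_id * avg
    end = start + avg - 1                # "gap" = avg - 1 gives the inclusive upper bound
    return start, end                    # e.g. rank 1 of 8 on a 96-core host -> (12, 23)
```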
......@@ -120,14 +132,6 @@ run_gpu()
GPU: sh run_train.sh GPU [DEVICE_NUM] [VISIBLE_DEVICES(0,1,2,3,4,5,6,7)] [DATASET_PATH]"
exit 1
fi;
if [ $2 -eq 1 ] ; then
RUN_DISTRIBUTE=False
elif [ $2 -gt 1 ] && [ $2 -le 8 ] ; then
RUN_DISTRIBUTE=True
else
echo "error: DEVICE_NUM=$2 is not in (1-8)"
exit 1
fi;
if [ ! -d $4 ]
then
......@@ -147,6 +151,26 @@ run_gpu()
cd ../train || exit
export CUDA_VISIBLE_DEVICES="$3"
if [ $2 -eq 1 ] ; then
RUN_DISTRIBUTE=False
nohup python ${BASEPATH}/../train.py \
--config_path=$CONFIG_FILE \
--platform=$1 \
--run_distribute=$RUN_DISTRIBUTE \
--dataset_path=$4 \
--pretrain_ckpt=$PRETRAINED_CKPT \
--freeze_layer=$FREEZE_LAYER \
--filter_head=$FILTER_HEAD \
&> ../train.log &
exit 1
elif [ $2 -gt 1 ] && [ $2 -le 8 ] ; then
RUN_DISTRIBUTE=True
else
echo "error: DEVICE_NUM=$2 is not in (1-8)"
exit 1
fi;
mpirun -n $2 --allow-run-as-root --output-filename log_output --merge-stderr-to-stdout \
python ${BASEPATH}/../train.py \
--config_path=$CONFIG_FILE \
......
......@@ -76,7 +76,7 @@ def create_dataset(dataset_path, do_train, config, repeat_num=1, enable_cache=Fa
type_cast_op = C2.TypeCast(mstype.int32)
data_set = data_set.map(operations=trans, input_columns="image", num_parallel_workers=num_workers)
data_set = data_set.map(operations=type_cast_op, input_columns="label", num_parallel_workers=8)
data_set = data_set.map(operations=type_cast_op, input_columns="label", num_parallel_workers=num_workers)
# apply shuffle operations
data_set = data_set.shuffle(buffer_size=buffer_size)
......
......@@ -15,15 +15,12 @@
"""
create train or eval dataset.
"""
import os
import multiprocessing
import mindspore.common.dtype as mstype
import mindspore.dataset as ds
import mindspore.dataset.vision.c_transforms as C
import mindspore.dataset.transforms.c_transforms as C2
from mindspore.communication.management import init, get_rank, get_group_size
from src.model_utils.config import config
from src.model_utils.device_adapter import get_device_num, get_rank_id
def create_dataset1(dataset_path, do_train, repeat_num=1, batch_size=32, train_image_size=224, eval_image_size=224,
target="Ascend", distribute=False, enable_cache=False, cache_session_id=None):
......@@ -42,15 +39,8 @@ def create_dataset1(dataset_path, do_train, repeat_num=1, batch_size=32, train_i
Returns:
dataset
"""
if target == "Ascend":
device_num, rank_id = _get_rank_info()
else:
if distribute:
init()
rank_id = get_rank()
device_num = get_group_size()
else:
device_num = 1
device_num, rank_id = _get_rank_info(distribute)
_check_num_parallel_workers(max_num_parallel_workers=12)
ds.config.set_prefetch_size(64)
if device_num == 1:
data_set = ds.Cifar10Dataset(dataset_path, num_parallel_workers=12, shuffle=True)
......@@ -113,15 +103,8 @@ def create_dataset2(dataset_path, do_train, repeat_num=1, batch_size=32, train_i
Returns:
dataset
"""
if target == "Ascend":
device_num, rank_id = _get_rank_info()
else:
if distribute:
init()
rank_id = get_rank()
device_num = get_group_size()
else:
device_num = 1
device_num, rank_id = _get_rank_info(distribute)
_check_num_parallel_workers(max_num_parallel_workers=12)
ds.config.set_prefetch_size(64)
if device_num == 1:
......@@ -192,16 +175,7 @@ def create_dataset_pynative(dataset_path, do_train, repeat_num=1, batch_size=32,
Returns:
dataset
"""
ds.config.set_numa_enable(True)
if target == "Ascend":
device_num, rank_id = _get_rank_info()
else:
if distribute:
init()
rank_id = get_rank()
device_num = get_group_size()
else:
device_num = 1
device_num, rank_id = _get_rank_info(distribute)
if device_num == 1:
data_set = ds.ImageFolderDataset(dataset_path, num_parallel_workers=8, shuffle=True)
......@@ -269,16 +243,8 @@ def create_dataset3(dataset_path, do_train, repeat_num=1, batch_size=32, train_i
Returns:
dataset
"""
if target == "Ascend":
device_num, rank_id = _get_rank_info()
else:
if distribute:
init()
rank_id = get_rank()
device_num = get_group_size()
else:
device_num = 1
rank_id = 1
device_num, rank_id = _get_rank_info(distribute)
_check_num_parallel_workers(max_num_parallel_workers=8)
if device_num == 1:
data_set = ds.ImageFolderDataset(dataset_path, num_parallel_workers=8, shuffle=True)
else:
......@@ -346,15 +312,8 @@ def create_dataset4(dataset_path, do_train, repeat_num=1, batch_size=32, train_i
Returns:
dataset
"""
if target == "Ascend":
device_num, rank_id = _get_rank_info()
else:
if distribute:
init()
rank_id = get_rank()
device_num = get_group_size()
else:
device_num = 1
device_num, rank_id = _get_rank_info(distribute)
_check_num_parallel_workers(max_num_parallel_workers=12)
ds.config.set_prefetch_size(64)
if device_num == 1:
data_set = ds.ImageFolderDataset(dataset_path, num_parallel_workers=12, shuffle=True)
......@@ -404,25 +363,25 @@ def create_dataset4(dataset_path, do_train, repeat_num=1, batch_size=32, train_i
return data_set
def _get_rank_info():
def _get_rank_info(distribute):
"""
get rank size and rank id
"""
rank_size = int(os.environ.get("RANK_SIZE", 1))
if config.device_target == "Ascend":
if rank_size > 1:
rank_size = get_device_num()
rank_id = get_rank_id()
else:
rank_size = 1
rank_id = 0
if distribute:
init()
rank_id = get_rank()
device_num = get_group_size()
else:
if rank_size > 1:
rank_size = get_group_size()
rank_id = get_rank()
else:
rank_size = 1
rank_id = 0
return rank_size, rank_id
rank_id = 0
device_num = 1
return device_num, rank_id
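A brief usage sketch of the refactored helper: each `create_dataset*` function now passes its `distribute` flag instead of branching on the device target, and shards the dataset only when more than one device is in use (the sharding arguments follow the pattern already used in this file).

```python
# Usage sketch, mirroring the create_dataset* functions above.
device_num, rank_id = _get_rank_info(distribute)
if device_num == 1:
    data_set = ds.ImageFolderDataset(dataset_path, num_parallel_workers=8, shuffle=True)
else:
    data_set = ds.ImageFolderDataset(dataset_path, num_parallel_workers=8, shuffle=True,
                                     num_shards=device_num, shard_id=rank_id)
```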
def _check_num_parallel_workers(max_num_parallel_workers=None):
"""
Check the num_parallel_workers used in dataset operations.
If max_num_parallel_workers exceeds the number of CPU cores, cap the global dataset worker count at the number of CPU cores.
"""
cores = multiprocessing.cpu_count()
if max_num_parallel_workers is not None and cores < max_num_parallel_workers:
print("The num_parallel_workers {} is set too large, now set it {}".format(max_num_parallel_workers, cores))
ds.config.set_num_parallel_workers(cores)
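What the helper does in practice, assuming a host with fewer cores than the requested worker count: it lowers MindSpore's global dataset worker default rather than any per-operation argument.

```python
# Usage sketch (assumed 4-core host): requesting 12 workers triggers the clamp.
_check_num_parallel_workers(max_num_parallel_workers=12)
print(ds.config.get_num_parallel_workers())  # now 4, i.e. multiprocessing.cpu_count()
```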
......@@ -53,8 +53,7 @@ def eval_net():
# create dataset
dataset = create_dataset(dataset_path=config.data_path,
do_train=False,
batch_size=config.batch_size,
target=target)
batch_size=config.batch_size)
# define net
net = squeezenet(num_classes=config.class_num)
......
......@@ -15,7 +15,7 @@
"""
create train or eval dataset.
"""
import os
import multiprocessing
import mindspore.common.dtype as mstype
import mindspore.dataset as ds
import mindspore.dataset.vision.c_transforms as C
......@@ -27,7 +27,7 @@ def create_dataset_cifar(dataset_path,
do_train,
repeat_num=1,
batch_size=32,
target="Ascend"):
run_distribute=False):
"""
create a train or evaluate cifar10 dataset
Args:
......@@ -35,19 +35,12 @@ def create_dataset_cifar(dataset_path,
do_train(bool): whether dataset is used for train or eval.
repeat_num(int): the repeat times of dataset. Default: 1
batch_size(int): the batch size of dataset. Default: 32
target(str): the device target. Default: Ascend
run_distribute(bool): whether to run in distributed mode or not. Default: False
Returns:
dataset
"""
if target == "Ascend":
device_num, rank_id = _get_rank_info()
elif target == "CPU":
device_num = 1
else:
init()
rank_id = get_rank()
device_num = get_group_size()
device_num, rank_id = _get_rank_info(run_distribute)
_check_num_parallel_workers(8)
if device_num == 1:
data_set = ds.Cifar10Dataset(dataset_path,
......@@ -102,7 +95,7 @@ def create_dataset_imagenet(dataset_path,
do_train,
repeat_num=1,
batch_size=32,
target="Ascend"):
run_distribute=False):
"""
create a train or eval imagenet dataset
......@@ -111,17 +104,13 @@ def create_dataset_imagenet(dataset_path,
do_train(bool): whether dataset is used for train or eval.
repeat_num(int): the repeat times of dataset. Default: 1
batch_size(int): the batch size of dataset. Default: 32
target(str): the device target. Default: Ascend
run_distribute(bool): whether to run in distributed mode or not. Default: False
Returns:
dataset
"""
if target == "Ascend":
device_num, rank_id = _get_rank_info()
else:
init()
rank_id = get_rank()
device_num = get_group_size()
device_num, rank_id = _get_rank_info(run_distribute)
_check_num_parallel_workers(10)
if device_num == 1:
data_set = ds.ImageFolderDataset(dataset_path,
......@@ -174,17 +163,25 @@ def create_dataset_imagenet(dataset_path,
return data_set
def _get_rank_info():
def _get_rank_info(distribute):
"""
get rank size and rank id
"""
rank_size = int(os.environ.get("RANK_SIZE", 1))
if rank_size > 1:
rank_size = get_group_size()
if distribute:
init()
rank_id = get_rank()
device_num = get_group_size()
else:
rank_size = 1
rank_id = 0
device_num = 1
return device_num, rank_id
return rank_size, rank_id
def _check_num_parallel_workers(max_num_parallel_workers=None):
"""
Check the num_parallel_workers used in dataset operations.
If max_num_parallel_workers exceeds the number of CPU cores, cap the global dataset worker count at the number of CPU cores.
"""
cores = multiprocessing.cpu_count()
if max_num_parallel_workers is not None and cores < max_num_parallel_workers:
print("The num_parallel_workers {} is set too large, now set it {}".format(max_num_parallel_workers, cores))
ds.config.set_num_parallel_workers(cores)
......@@ -81,7 +81,7 @@ def train_net():
do_train=True,
repeat_num=1,
batch_size=config.batch_size,
target=target)
run_distribute=config.run_distribute)
step_size = dataset.get_dataset_size()
# define net
......
......@@ -19,6 +19,7 @@ from __future__ import division
import os
import json
import multiprocessing
import xml.etree.ElementTree as et
import numpy as np
import cv2
......@@ -393,6 +394,10 @@ def data_to_mindrecord_byte_image(dataset="coco", is_training=True, prefix="ssd.
def create_ssd_dataset(mindrecord_file, batch_size=32, repeat_num=10, device_num=1, rank=0,
is_training=True, num_parallel_workers=6, use_multiprocessing=True):
"""Create SSD dataset with MindDataset."""
cores = multiprocessing.cpu_count()
if cores < num_parallel_workers:
print("The num_parallel_workers {} is set too large, now set it {}".format(num_parallel_workers, cores))
num_parallel_workers = cores
ds = de.MindDataset(mindrecord_file, columns_list=["img_id", "image", "annotation"], num_shards=device_num,
shard_id=rank, num_parallel_workers=num_parallel_workers, shuffle=is_training)
decode = C.Decode()
......
......@@ -14,6 +14,7 @@
# ============================================================================
"""Dataset preprocessing."""
import os
import multiprocessing
import math as m
import numpy as np
from PIL import Image
......@@ -67,7 +68,8 @@ def transpose_hwc2whc(image):
return image
def create_dataset(dataset_path, batch_size=1, num_shards=1, shard_id=0, device_target='Ascend'):
def create_dataset(dataset_path, batch_size=1, num_shards=1, shard_id=0, device_target='Ascend',
num_parallel_workers=8):
"""
create train or evaluation dataset for warpctc
......@@ -77,8 +79,12 @@ def create_dataset(dataset_path, batch_size=1, num_shards=1, shard_id=0, device_
num_shards(int): number of devices
shard_id(int): rank id
device_target(str): platform of training, support Ascend and GPU
num_parallel_workers(int): Number of data processing threads.
"""
cores = multiprocessing.cpu_count()
if num_parallel_workers > cores:
print("The num_parallel_workers {} is set too large, now set it {}".format(num_parallel_workers, cores))
num_parallel_workers = cores
dataset = _CaptchaDataset(dataset_path, config.max_captcha_digits, device_target)
data_set = ds.GeneratorDataset(dataset, ["image", "label"], shuffle=True, num_shards=num_shards, shard_id=shard_id)
image_trans = [
......@@ -97,11 +103,15 @@ def create_dataset(dataset_path, batch_size=1, num_shards=1, shard_id=0, device_
c.TypeCast(mstype.int32)
]
if device_target == 'Ascend':
data_set = data_set.map(operations=image_trans, input_columns=["image"], num_parallel_workers=8)
data_set = data_set.map(operations=transpose_hwc2whc, input_columns=["image"], num_parallel_workers=8)
data_set = data_set.map(operations=image_trans, input_columns=["image"],
num_parallel_workers=num_parallel_workers)
data_set = data_set.map(operations=transpose_hwc2whc, input_columns=["image"],
num_parallel_workers=num_parallel_workers)
else:
data_set = data_set.map(operations=image_trans_gpu, input_columns=["image"], num_parallel_workers=8)
data_set = data_set.map(operations=label_trans, input_columns=["label"], num_parallel_workers=8)
data_set = data_set.map(operations=image_trans_gpu, input_columns=["image"],
num_parallel_workers=num_parallel_workers)
data_set = data_set.map(operations=label_trans, input_columns=["label"],
num_parallel_workers=num_parallel_workers)
data_set = data_set.batch(batch_size, drop_remainder=True)
return data_set
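A hedged usage sketch of the extended signature; the dataset path is a placeholder, and the worker count is capped to the host's CPU core count inside `create_dataset` as shown above.

```python
# Hypothetical call; the path is a placeholder.
data_set = create_dataset("/path/to/captcha/train", batch_size=64,
                          num_shards=1, shard_id=0, device_target="Ascend",
                          num_parallel_workers=8)
```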
......@@ -46,12 +46,12 @@ if __name__ == '__main__':
rank_id = 0
group_size = 1
config = train_config
data_sink = (args.device_target == "GPU")
data_sink = (args.device_target != "CPU")
context.set_context(mode=context.GRAPH_MODE, device_target=args.device_target, save_graphs=False)
if args.device_target == "GPU":
context.set_context(enable_graph_kernel=True)
if args.is_distributed:
init('nccl')
init()
rank_id = get_rank()
group_size = get_group_size()
context.reset_auto_parallel_context()
......
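The switch from `init('nccl')` to `init()` lets MindSpore choose the collective backend from the configured device target (HCCL on Ascend, NCCL on GPU), so the same training entry works on both. A minimal sketch of that setup; the `setup_distributed` helper is an illustrative assumption, not the script's actual code.

```python
# Sketch of device-agnostic distributed setup, mirroring the change above.
from mindspore import context
from mindspore.communication.management import init, get_rank, get_group_size

def setup_distributed(device_target, is_distributed):
    context.set_context(mode=context.GRAPH_MODE, device_target=device_target, save_graphs=False)
    rank_id, group_size = 0, 1
    if is_distributed:
        init()                      # selects HCCL on Ascend, NCCL on GPU
        rank_id = get_rank()
        group_size = get_group_size()
    return rank_id, group_size
```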
......@@ -13,8 +13,8 @@
# limitations under the License.
# ============================================================================
"""Face detection yolov3 data pre-process."""
import multiprocessing
import numpy as np
import mindspore.dataset.vision.py_transforms as P
import mindspore.dataset as de
......@@ -246,6 +246,11 @@ compose_map_func = (preprocess_fn)
def create_dataset(args):
"""Create dataset object."""
args.logger.info('start create dataloader')
cores = multiprocessing.cpu_count()
max_num_parallel_workers = 16
if cores < max_num_parallel_workers:
print("The num_parallel_workers {} is set too large, now set it {}".format(max_num_parallel_workers, cores))
de.config.set_num_parallel_workers(cores)
ds = de.MindDataset(args.mindrecord_path + "0", columns_list=["image", "annotation"], num_shards=args.world_size,
shard_id=args.local_rank)
......