Skip to content
Snippets Groups Projects
Unverified Commit 4023340f authored by binbinHan's avatar binbinHan Committed by GitHub
Browse files

del default env init (#5537)


* del default env init

* refine

* fix bug

Co-authored-by: default avataroneflow-ci-bot <69100618+oneflow-ci-bot@users.noreply.github.com>
parent 6ac11e00
No related branches found
No related tags found
No related merge requests found
Showing
with 35 additions and 90 deletions
......@@ -26,7 +26,6 @@ ONEFLOW_API_PYBIND11_MODULE("", m) {
m.def("IsEnvInited", &IsEnvInited);
m.def("InitEnv", &InitEnv);
m.def("InitDefaultEnv", &InitDefaultEnv);
m.def("DestroyEnv", &DestroyEnv, py::call_guard<py::gil_scoped_release>());
m.def("CurrentMachineId", &CurrentMachineId);
......
......@@ -58,16 +58,7 @@ inline Maybe<void> SetIsMultiClient(bool is_multi_client) {
return Maybe<void>::Ok();
}
inline Maybe<bool> IsEnvInited() {
return Global<EnvGlobalObjectsScope>::Get() != nullptr
&& !JUST(Global<EnvGlobalObjectsScope>::Get()->is_default_physical_env());
}
inline Maybe<void> DestroyDefaultEnv() {
if (Global<EnvGlobalObjectsScope>::Get() == nullptr) { return Maybe<void>::Ok(); }
Global<EnvGlobalObjectsScope>::Delete();
return Maybe<void>::Ok();
}
inline Maybe<bool> IsEnvInited() { return Global<EnvGlobalObjectsScope>::Get() != nullptr; }
inline Maybe<void> DestroyEnv() {
if (Global<EnvGlobalObjectsScope>::Get() == nullptr) { return Maybe<void>::Ok(); }
......@@ -80,24 +71,10 @@ inline Maybe<void> DestroyEnv() {
return Maybe<void>::Ok();
}
inline Maybe<void> InitDefaultEnv(const std::string& env_proto_str) {
EnvProto env_proto;
CHECK_OR_RETURN(TxtString2PbMessage(env_proto_str, &env_proto))
<< "failed to parse env_proto" << env_proto_str;
CHECK_ISNULL_OR_RETURN(Global<EnvGlobalObjectsScope>::Get());
// Global<T>::New is not allowed to be called here
// because glog is not constructed yet and LOG(INFO) has bad bahavior
Global<EnvGlobalObjectsScope>::SetAllocated(new EnvGlobalObjectsScope());
JUST(Global<EnvGlobalObjectsScope>::Get()->Init(env_proto));
if (!GlobalProcessCtx::IsThisProcessMaster()) { JUST(Cluster::WorkerLoop()); }
return Maybe<void>::Ok();
}
inline Maybe<void> InitEnv(const std::string& env_proto_str, bool is_multi_client) {
EnvProto env_proto;
CHECK_OR_RETURN(TxtString2PbMessage(env_proto_str, &env_proto))
<< "failed to parse env_proto" << env_proto_str;
JUST(DestroyDefaultEnv());
CHECK_ISNULL_OR_RETURN(Global<EnvGlobalObjectsScope>::Get());
// Global<T>::New is not allowed to be called here
// because glog is not constructed yet and LOG(INFO) has bad bahavior
......
......@@ -32,10 +32,6 @@ inline void InitEnv(const std::string& env_proto_str, bool is_multi_client) {
return oneflow::InitEnv(env_proto_str, is_multi_client).GetOrThrow();
}
inline void InitDefaultEnv(const std::string& env_proto_str) {
return oneflow::InitDefaultEnv(env_proto_str).GetOrThrow();
}
inline void DestroyEnv() { return oneflow::DestroyEnv().GetOrThrow(); }
inline long long CurrentMachineId() { return oneflow::CurrentMachineId().GetOrThrow(); }
......
......@@ -33,7 +33,7 @@ ONEFLOW_API_PYBIND11_MODULE("", m) {
// multi-client lazy global session context
m.def("CreateMultiClientSessionContext", &CreateMultiClientSessionContext);
m.def("InitMultiClientSessionContext", &InitMultiClientSessionContext);
m.def("DestroyMultiClientSessionContext", &DestroyMultiClientSessionContext);
m.def("TryDestroyMultiClientSessionContext", &TryDestroyMultiClientSessionContext);
using namespace oneflow;
m.def("NewSessionId", &NewSessionId);
......
......@@ -126,8 +126,13 @@ inline Maybe<void> InitMultiClientSessionContext(const std::string& config_proto
return Maybe<void>::Ok();
}
inline Maybe<void> DestroyMultiClientSessionContext() {
Global<MultiClientSessionContext>::Delete();
inline Maybe<void> TryDestroyMultiClientSessionContext() {
// Global<T>::Delete is not allowed to be called here
// because glog is not constructed yet and LOG(INFO) has bad bahavior
if (Global<MultiClientSessionContext>::Get() != nullptr) {
delete Global<MultiClientSessionContext>::Get();
Global<MultiClientSessionContext>::SetAllocated(nullptr);
}
return Maybe<void>::Ok();
}
......
......@@ -42,8 +42,8 @@ inline void InitMultiClientSessionContext(const std::string& config_proto_str) {
return oneflow::InitMultiClientSessionContext(config_proto_str).GetOrThrow();
}
inline void DestroyMultiClientSessionContext() {
return oneflow::DestroyMultiClientSessionContext().GetOrThrow();
inline void TryDestroyMultiClientSessionContext() {
return oneflow::TryDestroyMultiClientSessionContext().GetOrThrow();
}
#endif // ONEFLOW_API_PYTHON_SESSION_SESSION_API_H_
......@@ -47,12 +47,6 @@ def EnvResource():
return text_format.Parse(resource, resource_util.Resource())
def InitDefaultEnv(env_proto):
assert type(env_proto) is env_pb2.EnvProto
env_proto_str = text_format.MessageToString(env_proto)
oneflow._oneflow_internal.InitDefaultEnv(env_proto_str)
def InitEnv(env_proto, is_multi_client):
assert type(env_proto) is env_pb2.EnvProto
env_proto_str = text_format.MessageToString(env_proto)
......
......@@ -93,16 +93,6 @@ def env_init(is_multi_client):
return True
def init_default_physical_env():
default_physical_env_proto = _DefaultEnvProto()
log_dir = os.getenv("ONEFLOW_TEST_LOG_DIR")
if log_dir:
default_physical_env_proto.cpp_logging_conf.log_dir = log_dir
default_physical_env_proto.is_default_physical_env = True
CompleteEnvProto(default_physical_env_proto, False)
c_api_util.InitDefaultEnv(default_physical_env_proto)
@oneflow_export("env.current_resource", "current_resource")
def api_get_current_resource() -> resource_util.Resource:
r"""Get current resources, such as:machine nums, cpu/gpu device nums,
......
......@@ -48,6 +48,13 @@ def TryCloseDefaultSession():
del _sess_id2sess[default_sess_id]
def TryCloseAllSession():
global _sess_id2sess
for sess_id in _sess_id2sess.keys():
_sess_id2sess[sess_id].TryClose()
_sess_id2sess.clear()
def try_init_default_session(func):
@functools.wraps(func)
def Func(*args, **kwargs):
......
......@@ -22,5 +22,4 @@ message EnvProto {
optional int32 data_port = 3 [default = -1];
optional CppLoggingConf cpp_logging_conf = 4;
optional BootstrapConf ctrl_bootstrap_conf = 5;
optional bool is_default_physical_env = 6 [default = false];
}
......@@ -51,13 +51,8 @@ std::string LogDir(const std::string& log_dir) {
return v;
}
void InitLogging(const CppLoggingConf& logging_conf, bool default_physical_env) {
if (!default_physical_env) {
FLAGS_log_dir = LogDir(logging_conf.log_dir());
} else {
std::string default_env_log_path = JoinPath(logging_conf.log_dir(), "default_physical_env_log");
FLAGS_log_dir = LogDir(default_env_log_path);
}
void InitLogging(const CppLoggingConf& logging_conf) {
FLAGS_log_dir = LogDir(logging_conf.log_dir());
FLAGS_logtostderr = logging_conf.logtostderr();
FLAGS_logbuflevel = logging_conf.logbuflevel();
FLAGS_stderrthreshold = 1; // 1=WARNING
......@@ -111,8 +106,7 @@ void ClearAllSymbolAndIdCache() {
} // namespace
Maybe<void> EnvGlobalObjectsScope::Init(const EnvProto& env_proto) {
is_default_physical_env_ = env_proto.is_default_physical_env();
InitLogging(env_proto.cpp_logging_conf(), JUST(is_default_physical_env_));
InitLogging(env_proto.cpp_logging_conf());
#ifdef WITH_CUDA
InitGlobalCudaDeviceProp();
#endif
......
......@@ -19,7 +19,6 @@ limitations under the License.
#include "oneflow/core/common/maybe.h"
#include "oneflow/core/job/env_desc.h"
#include "oneflow/core/framework/device.h"
#include "oneflow/core/common/error.h"
namespace oneflow {
......@@ -28,17 +27,15 @@ class ParallelDesc;
class EnvGlobalObjectsScope final {
public:
OF_DISALLOW_COPY_AND_MOVE(EnvGlobalObjectsScope);
EnvGlobalObjectsScope() : is_default_physical_env_(Error::ValueError("Not initialized")) {}
EnvGlobalObjectsScope() = default;
~EnvGlobalObjectsScope();
Maybe<void> Init(const EnvProto& env_proto);
const Maybe<bool>& is_default_physical_env() const { return is_default_physical_env_; }
const std::shared_ptr<const ParallelDesc>& MutParallelDesc4Device(const Device& device);
private:
std::mutex mutex_;
Maybe<bool> is_default_physical_env_;
HashMap<Device, std::shared_ptr<const ParallelDesc>> device2parallel_desc_;
};
......
......@@ -83,14 +83,18 @@ oneflow._oneflow_internal.EnableEagerEnvironment(True)
del env_util
# capture oneflow methods so that they can be still accessed after `del oneflow`
def _SyncOnMasterFn():
import oneflow
if oneflow.python.framework.distribute.is_multi_client():
oneflow._oneflow_internal.eager.multi_client.Sync()
elif oneflow.python.framework.distribute.get_rank() == 0:
oneflow._oneflow_internal.eager.single_client.Sync()
def Sync():
if not oneflow._oneflow_internal.IsEnvInited():
return
if oneflow.python.framework.distribute.is_multi_client():
oneflow._oneflow_internal.eager.multi_client.Sync()
elif oneflow.python.framework.distribute.get_rank() == 0:
oneflow._oneflow_internal.eager.single_client.Sync()
return Sync
atexit.register(oneflow._oneflow_internal.SetShuttingDown)
......
......@@ -45,12 +45,6 @@ def EnvResource():
return text_format.Parse(resource, resource_util.Resource())
def InitDefaultEnv(env_proto):
assert type(env_proto) is env_pb2.EnvProto
env_proto_str = text_format.MessageToString(env_proto)
oneflow._oneflow_internal.InitDefaultEnv(env_proto_str)
def InitEnv(env_proto, is_multi_client):
assert type(env_proto) is env_pb2.EnvProto
env_proto_str = text_format.MessageToString(env_proto)
......
......@@ -83,16 +83,6 @@ def env_init():
return True
def init_default_physical_env():
default_physical_env_proto = _DefaultEnvProto()
log_dir = os.getenv("ONEFLOW_TEST_LOG_DIR")
if log_dir:
default_physical_env_proto.cpp_logging_conf.log_dir = log_dir
default_physical_env_proto.is_default_physical_env = True
CompleteEnvProto(default_physical_env_proto, False)
c_api_util.InitDefaultEnv(default_physical_env_proto)
@oneflow_export("env.current_resource", "current_resource")
def api_get_current_resource() -> resource_util.Resource:
r"""Get current resources, such as:machine nums, cpu/gpu device nums,
......
......@@ -53,7 +53,7 @@ class MultiClientSession(object):
def TryClose(self):
if self.status_ != self.Status.CLOSED:
oneflow._oneflow_internal.DestroyMultiClientSessionContext()
oneflow._oneflow_internal.TryDestroyMultiClientSessionContext()
oneflow._oneflow_internal.ClearSessionById(self.id)
self.status_ = self.Status.CLOSED
......
......@@ -58,7 +58,6 @@ time.sleep(1)
del time
oneflow._oneflow_internal.SetIsMultiClient(False)
env_util.init_default_physical_env()
session_context.OpenDefaultSession(
session_util.Session(oneflow._oneflow_internal.NewSessionId())
)
......@@ -114,12 +113,12 @@ INVALID_SPLIT_AXIS = oneflow._oneflow_internal.INVALID_SPLIT_AXIS
import atexit
from oneflow.compatible.single_client.python.framework.session_context import (
TryCloseDefaultSession,
TryCloseAllSession,
)
atexit.register(TryCloseDefaultSession)
atexit.register(TryCloseAllSession)
del TryCloseDefaultSession
del TryCloseAllSession
del atexit
import sys
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment