Commit d6c0b021 authored by 张超魁

Basic

parent 00886efc
.idea
__pycache__
bin
lib
venv
*.cfg
*.rdb
import logging

# Service-level settings for the iSula scheduling service.
INPUT_DIR = "job_files"      # uploaded job inputs, one subdirectory per workspace
OUTPUT_DIR = "results"       # job outputs, one subdirectory per workspace
LOG_LEVEL = logging.DEBUG
redis_host = "localhost"
redis_port = 6379
ISULA_VOLUME_PATH = "/home/zck1022/python/isula_schedule/volumes"  # host directory bind-mounted into containers
ISULA_RM_TIMEOUT = 10        # seconds allowed for `isula rm -f`
TIMER_POLL_INTERVAL = 1      # polling interval (seconds) used by the timeout helpers
PREFIX = "schedule_test"     # container / volume name prefix
import json

import redis_queue
import vmms_isula

# Queue consumer: pops jobs that the Flask service pushed onto the "common"
# queue and runs them one at a time on the local iSula backend.
q = redis_queue.RedisQueue("common")
vmms = vmms_isula.LocalIsula()
while True:
    job = json.loads(q.get())
    vmms.copy_in(job)
    vmms.run_job(job)
    vmms.copy_out(job)
main.py 0 → 100644
import json
import logging
import os

from flask import Flask, jsonify, request

import config
import redis_queue
from utils import *

logging.basicConfig(level=config.LOG_LEVEL,
                    format='%(asctime)s|%(levelname)s|%(filename)s[%(lineno)d]|%(message)s')

app = Flask("iSula Schedule")
q = redis_queue.RedisQueue("common")
# TODO: authentication


@app.get("/")
def main():
    logging.debug("hello")
    return jsonify({"hello": "world"})
# Maybe workspace_id is better?
# Maybe put is better
@app.get("/create/<string:workspace_name>")
def create(workspace_name):
    logging.info("Ready to create workspace [%s]." % workspace_name)
    input_path = "%s/%s" % (config.INPUT_DIR, workspace_name)
    output_path = "%s/%s" % (config.OUTPUT_DIR, workspace_name)
    if os.path.exists(input_path) or os.path.exists(output_path):
        logging.debug("workspace [%s] already exists." % workspace_name)
        return jsonify(json_message(FAILURE, error_messages["WORKSPACE_EXIST"]))
    try:
        os.makedirs(input_path)
        os.makedirs(output_path)
    except NotADirectoryError:
        logging.debug("workspace directory creation failed.")
        return jsonify(json_message(FAILURE, error_messages["DIR_CREATE_FAIL"]))
    except Exception:
        logging.debug("unexpected exception while creating workspace [%s]." % workspace_name)
        return jsonify(json_message(FAILURE, error_messages["OTHER"]))
    logging.info("workspace [%s] created successfully." % workspace_name)
    return jsonify(json_message(SUCCESS, "created successfully"))
@app.post("/upload/<string:workspace_name>")
def upload(workspace_name):
    logging.info("Ready to receive a file for workspace [%s]." % workspace_name)
    path = "%s/%s" % (config.INPUT_DIR, workspace_name)
    if "file" not in request.files:
        logging.debug("post request does not have the file part.")
        return jsonify(json_message(FAILURE, error_messages["NO_FILE"]))
    file = request.files['file']
    if file.filename == '':
        logging.debug("filename is empty.")
        return jsonify(json_message(FAILURE, error_messages["EMPTY_FILENAME"]))
    file.save(os.path.join(path, file.filename))
    logging.info("received [%s] into [%s]" % (file.filename, workspace_name))
    return jsonify(json_message(SUCCESS, "uploaded successfully."))
"""
{
"image": "ubuntu",
"image_version": "1.0",
"files": [],
"output_file": "filename",
"timeout": 0,
"command": "sh script.sh",
"callback_url": "..."
}
"""
@app.post("/execute/<string:workspace_name>")
def execute(workspace_name):
    logging.info("execute workspace [%s]." % workspace_name)
    job = request.json
    job["name"] = workspace_name
    q.put(json.dumps(job))
    return jsonify(json_message(SUCCESS, "success"))
@app.get("/output/<string:workspace_name>/<string:output_file>")
def output(workspace_name, output_file):
    # set header
    logging.info("get workspace [%s] result file [%s]." % (workspace_name, output_file))
    output_path = "%s/%s/%s" % (config.OUTPUT_DIR, workspace_name, output_file)
    if not os.path.exists(output_path):
        logging.debug("output file [%s/%s] does not exist." % (workspace_name, output_file))
        return jsonify(json_message(FAILURE, error_messages["OUTPUT_NOT_EXIST"]))
    with open(output_path, "r", encoding="utf8") as f:
        output_dict = {"output": f.read()}
    return jsonify(json_message(SUCCESS, "success", output_dict))
if __name__ == '__main__':
    app.run()
import redis

import config

redis_connection = None


def connect_redis():
    global redis_connection
    if redis_connection is None:
        redis_connection = redis.StrictRedis(host=config.redis_host, port=config.redis_port, db=0)
    return redis_connection


class RedisQueue:
    """Simple FIFO queue backed by a Redis list."""

    def __init__(self, queue_name):
        self.queue_name = queue_name
        self.__db = connect_redis()

    def qsize(self):
        return self.__db.llen(self.queue_name)

    def put(self, obj):
        self.__db.rpush(self.queue_name, obj)

    def get(self, timeout=0, blocked=True):
        if blocked:
            # BLPOP returns a (queue_name, value) pair, or None on timeout;
            # return only the payload so callers can json.loads() it directly.
            item = self.__db.blpop(self.queue_name, timeout=timeout)
            return item[1] if item else None
        return self.__db.lpop(self.queue_name)
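# Example usage (producer/consumer sketch; queue name taken from this repo,
# payload is hypothetical):
#   q = RedisQueue("common")
#   q.put('{"name": "demo"}')   # main.py pushes serialized jobs like this
#   payload = q.get()           # blocks on BLPOP and returns the raw payload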
import vmms_isula

# Smoke test for the iSula backend: copy a script into a fresh volume,
# run it in a container, and collect the output file.
job = {
    "name": "test1",
    "image": "ubuntu",
    "image_version": "latest",
    "files": ["t.sh"],
    "output_file": "out.txt",
    "timeout": 0,
    "command": "sh t.sh > out.txt",
    "callback_url": "..."
}
li = vmms_isula.LocalIsula()
li.copy_in(job)
li.run_job(job)
li.copy_out(job)
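For reference, a hedged end-to-end sketch of driving the service over HTTP with the requests library; it mirrors the create / upload / execute / output routes in main.py, but the host, port, workspace name, and file names here are assumptions, not part of this commit:

import requests

BASE = "http://127.0.0.1:5000"   # assumed Flask default; adjust to the deployment
ws = "demo"

requests.get("%s/create/%s" % (BASE, ws))                        # create the workspace
with open("t.sh", "rb") as f:                                    # upload an input file
    requests.post("%s/upload/%s" % (BASE, ws), files={"file": f})
requests.post("%s/execute/%s" % (BASE, ws), json={               # queue the job
    "image": "ubuntu", "image_version": "latest", "files": ["t.sh"],
    "output_file": "out.txt", "timeout": 60,
    "command": "sh t.sh > out.txt", "callback_url": "..."})
print(requests.get("%s/output/%s/out.txt" % (BASE, ws)).json())  # fetch the result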
utils.py 0 → 100644
SUCCESS = 0
FAILURE = 1

error_messages = {
    "DIR_CREATE_FAIL": "directory creation failed, please check INPUT_DIR/OUTPUT_DIR in config.py",
    "WORKSPACE_EXIST": "workspace already exists.",
    "NO_FILE": "no file in request, please check the post request",
    "EMPTY_FILENAME": "filename is empty, please check the request",
    "OUTPUT_NOT_EXIST": "output file does not exist",
    "OTHER": "other failure, please check the log",
}


def json_message(status: int, message: str, other=None) -> dict:
    if other is None:
        other = {}
    return {"status": status, "message": message, **other}
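# Example of the merged response shape (illustrative values):
#   json_message(SUCCESS, "success", {"output": "hi"})
#   -> {"status": 0, "message": "success", "output": "hi"}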
import subprocess
import re
import time
import os
import shutil
import config
def timeout(command, time_out=1):
    """
    $ timeout time_out command > /dev/null 2>&1

    timeout - Run a unix command with a timeout. Return -1 on
    timeout, otherwise return the return value from the command, which
    is typically 0 for success, 1-255 for failure.
    """
    # Launch the command
    # $ command > /dev/null 2>&1
    print(command)
    p = subprocess.Popen(
        command, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
    )
    # Wait for the command to complete, polling until the deadline
    t = 0.0
    while t < time_out and p.poll() is None:
        time.sleep(config.TIMER_POLL_INTERVAL)
        t += config.TIMER_POLL_INTERVAL
    # Determine why the while loop terminated
    if p.poll() is None:
        # Still running: kill it and report a timeout
        try:
            os.kill(p.pid, 9)
        except OSError:
            pass
        returncode = -1
    else:
        returncode = p.poll()
    return returncode
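# Rough behaviour sketch (assumed, following the polling loop above):
#   timeout(["sleep", "5"], time_out=2)  -> polls for ~2s, kills the process, returns -1
#   timeout(["true"], time_out=2)        -> returns 0 once the command exits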
def timeout_with_return_status(command, time_out, return_value=0):
    """
    run $ timeout time_out command > /dev/null 2>&1
    until time_out or ret_code == return_value

    timeoutWithReturnStatus - Run a Unix command with a timeout,
    until the expected value is returned by the command; on timeout,
    return the last error code obtained from the command.
    """
    # $ command > /dev/null 2>&1
    p = subprocess.Popen(
        command, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
    )
    t = 0.0
    ret = -1
    while t < time_out:
        ret = p.poll()
        if ret is None:
            # Still running: wait and poll again
            time.sleep(config.TIMER_POLL_INTERVAL)
            t += config.TIMER_POLL_INTERVAL
        elif ret == return_value:
            return ret
        else:
            # Finished with an unexpected code: re-run the command
            p = subprocess.Popen(
                command, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
            )
    return ret
class LocalIsula(object):
    def __init__(self):
        # Refuse to start without a configured volume root
        if len(config.ISULA_VOLUME_PATH) == 0:
            exit(1)

    def instance_name(self, job_name):
        return "%s-%s" % (config.PREFIX, job_name)

    def get_volume_path(self, instance_name):
        # The trailing "" keeps a path separator at the end
        return os.path.join(config.ISULA_VOLUME_PATH, instance_name, "")

    def copy_in(self, job):
        instance_name = self.instance_name(job["name"])
        volume_path = self.get_volume_path(instance_name)
        # Create a fresh volume directory for this job
        os.makedirs(volume_path, exist_ok=True)
        for file in job["files"]:
            # Copy each input file from the workspace into the volume
            shutil.copy(os.path.join(config.INPUT_DIR, job["name"], file),
                        os.path.join(volume_path, file))
        return 0
    def run_job(self, job):
        instance_name = self.instance_name(job["name"])
        volume_path = self.get_volume_path(instance_name)
        # Mount the job volume at /home/mount and run the command inside it
        args = ["isula", "run", "--net", "host", "--annotation", "native.umask=normal",
                "--name", instance_name,
                "-v", "%s:%s" % (volume_path, "/home/mount"),
                "{}:{}".format(job["image"], job["image_version"]),
                "sh", "-c", "cd /home/mount; %s" % job["command"]]
        ret = timeout(args, job["timeout"] * 2)
        return ret
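    # For the sample job in the test script, the assembled invocation is roughly the
    # following (the volume path depends on config.ISULA_VOLUME_PATH and is shown
    # here only as an illustration):
    #   isula run --net host --annotation native.umask=normal \
    #     --name schedule_test-test1 -v <ISULA_VOLUME_PATH>/schedule_test-test1/:/home/mount \
    #     ubuntu:latest sh -c 'cd /home/mount; sh t.sh > out.txt'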
    def copy_out(self, job):
        instance_name = self.instance_name(job["name"])
        volume_path = self.get_volume_path(instance_name)
        # Move the job's output file from the volume into the results workspace
        shutil.move(os.path.join(volume_path, job["output_file"]),
                    os.path.join(config.OUTPUT_DIR, job["name"], job["output_file"]))
        self.destroy_vm(job)
        return 0

    def destroy_vm(self, job):
        instance_name = self.instance_name(job["name"])
        volume_path = self.get_volume_path(instance_name)
        timeout(["isula", "rm", "-f", instance_name], config.ISULA_RM_TIMEOUT)
        # Remove the instance's volume directory if it is still around
        if os.path.isdir(volume_path):
            shutil.rmtree(volume_path)
    def get_images(self):
        """getImages - Executes `isula images` and returns a list of
        images that can be used to boot an isula container. This
        function does a lot of parsing and so can break easily.
        """
        result = set()
        o = subprocess.check_output("isula images", shell=True).decode("utf-8")
        o_l = o.split("\n")
        o_l.pop()        # drop the trailing empty line
        o_l.reverse()
        o_l.pop()        # drop the header row (last after the reverse)
        for row in o_l:
            row_l = row.split(" ")
            # Keep only the image name, stripping any registry prefix
            result.add(re.sub(r".*/([^/]*)", r"\1", row_l[0]))
        return list(result)
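    # Illustrative parse (assumed docker-style `isula images` column layout):
    #   REPOSITORY   TAG     IMAGE ID   CREATED   SIZE
    #   ubuntu       latest  ...        ...       ...
    # would yield ["ubuntu"].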