# other_agents.py
#
# Heuristic action selectors. Each function inspects ``machine.avail_slot``
# and the pending entries of ``job_slot.slot`` and returns an index into
# ``job_slot.slot``, or ``len(job_slot.slot)`` to select no job.
# Strategies: resource-alignment packing, shortest-job-first, a weighted
# blend of the two, and uniform random choice.
import numpy as np
# TODO - Test
def get_parker_action(machine, job_slot):
    """Pick the feasible pending job best aligned with the free resources.

    Every non-empty entry of ``job_slot.slot`` is checked against the first
    ``job.len`` rows of ``machine.avail_slot``. A job is feasible when
    subtracting ``job.res_vec`` from that window leaves no negative entry.
    Among feasible jobs, the winner is the one with the largest dot product
    between the first row of the window and ``job.res_vec``. Ties keep the
    lowest index, and a score that is not strictly positive never wins.

    Args:
        machine: Object exposing ``avail_slot``, indexed as a 2-D array
            (presumably time steps x resource types -- confirm with caller).
        job_slot: Object exposing ``slot``, a sequence whose entries are
            either ``None`` or jobs with ``len`` and ``res_vec`` attributes.

    Returns:
        Index of the chosen job in ``job_slot.slot``, or
        ``len(job_slot.slot)`` when no job qualifies.
    """
    best_score = 0
    act = len(job_slot.slot)  # default: select nothing

    for idx, job in enumerate(job_slot.slot):
        if job is None:  # empty slot, no pending job here
            continue

        window = machine.avail_slot[: job.len, :]
        if not np.all(window - job.res_vec >= 0):  # not enough resources
            continue

        score = window[0, :].dot(job.res_vec)
        if score > best_score:
            best_score = score
            act = idx

    return act
def get_sjf_action(machine, job_slot):
    """Pick the shortest feasible pending job (shortest-job-first).

    Every non-empty entry of ``job_slot.slot`` is checked against the first
    ``job.len`` rows of ``machine.avail_slot``. A job is feasible when
    subtracting ``job.res_vec`` from that window leaves no negative entry.
    Feasible jobs are scored ``1 / job.len`` and the highest score wins, so
    the shortest job is chosen. Ties keep the lowest index.

    Args:
        machine: Object exposing ``avail_slot``, indexed as a 2-D array
            (presumably time steps x resource types -- confirm with caller).
        job_slot: Object exposing ``slot``, a sequence whose entries are
            either ``None`` or jobs with ``len`` and ``res_vec`` attributes.

    Returns:
        Index of the chosen job in ``job_slot.slot``, or
        ``len(job_slot.slot)`` when no job qualifies.
    """
    best_score = 0
    act = len(job_slot.slot)  # if no action is available, hold

    # Iterate over the slots themselves; calling range() on the list
    # raises TypeError.
    for idx, job in enumerate(job_slot.slot):
        if job is None:  # empty slot
            continue

        window = machine.avail_slot[: job.len, :]
        if not np.all(window - job.res_vec >= 0):  # not enough resources
            continue

        score = 1 / float(job.len)
        if score > best_score:
            best_score = score
            act = idx

    return act
def get_packer_sjf_action(machine, job_slot, knob):
    """Pick a feasible pending job by blending packing and shortest-job scores.

    Every non-empty entry of ``job_slot.slot`` is checked against the first
    ``job.len`` rows of ``machine.avail_slot``. A job is feasible when
    subtracting ``job.res_vec`` from that window leaves no negative entry.
    Each feasible job gets two scores:

    * alignment: dot product of the first row of the window with
      ``job.res_vec``;
    * shortest-job: ``1 / job.len``.

    The combined score is ``knob * alignment + (1 - knob) * shortest_job``.
    The highest combined score wins, ties keep the lowest index, and a score
    that is not strictly positive never wins.

    Args:
        machine: Object exposing ``avail_slot``, indexed as a 2-D array
            (presumably time steps x resource types -- confirm with caller).
        job_slot: Object exposing ``slot``, a sequence whose entries are
            either ``None`` or jobs with ``len`` and ``res_vec`` attributes.
        knob: Weight applied to the alignment score; ``1 - knob`` weights
            the shortest-job score.

    Returns:
        Index of the chosen job in ``job_slot.slot``, or
        ``len(job_slot.slot)`` when no job qualifies.
    """
    best_score = 0
    act = len(job_slot.slot)  # default: select nothing

    for idx, job in enumerate(job_slot.slot):
        if job is None:  # empty slot
            continue

        window = machine.avail_slot[: job.len, :]
        if not np.all(window - job.res_vec >= 0):  # not enough resources
            continue

        align_score = window[0, :].dot(job.res_vec)
        sjf_score = 1 / float(job.len)
        score = knob * align_score + (1 - knob) * sjf_score

        if score > best_score:
            best_score = score
            act = idx

    return act
def get_random_action(machine, job_slot):
    """Draw a uniformly random action index.

    Args:
        machine: Unused; accepted so the signature matches the other
            selectors in this module.
        job_slot: Object exposing ``slot``, a sequence of job entries.

    Returns:
        An integer drawn by ``np.random.randint`` from
        ``0 .. len(job_slot.slot)`` inclusive.
    """
    # One action per slot plus one extra index so the empty action is allowed.
    action_count = 1 + len(job_slot.slot)
    return np.random.randint(action_count)