-
Notifications
You must be signed in to change notification settings - Fork 63
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Task Fusion #113
base: branch-24.03
Are you sure you want to change the base?
Task Fusion #113
Changes from all commits
8ff0261
4a9248f
99cd51c
a4e21d8
9b57fc8
bf7973a
b6121e7
e3708bf
8eca06f
dbf8ea9
a2c3874
303866b
e8b9021
7b0ee30
8238083
46856c7
6b4182b
db65e43
ddd8dc7
ef677e6
ba955e2
de337cf
78335cc
369909d
ab0c044
a9e1014
54d3bb8
dd46bdd
02103ad
d6ccdd2
94a1fba
12d13d1
51dd00f
f348080
f388f07
345f275
ab96ebc
1f9a655
e2afa73
80aa90f
f9eb119
ba55358
78366c8
f07381e
ea12377
d7a8dab
fe66dad
a73dec1
437e67e
9594cb5
239ae35
b49557b
e05e3ff
a59e142
399d070
5bb4df5
36c40cb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -534,6 +534,8 @@ def __init__(self, context, task_id, mapper_id=0, tag=0): | |
self._sharding_space = None | ||
self._point = None | ||
self._output_regions = list() | ||
self._is_fused = False | ||
self._fusion_metadata = None | ||
|
||
@property | ||
def library_task_id(self): | ||
|
@@ -577,7 +579,6 @@ def add_store(self, args, store, proj, perm, tag, flags): | |
else: | ||
region = store.storage.region | ||
field_id = store.storage.field.field_id | ||
|
||
req = RegionReq(region, perm, proj, tag, flags) | ||
|
||
self._req_analyzer.insert(req, field_id) | ||
|
@@ -643,21 +644,35 @@ def set_sharding_space(self, space): | |
def set_point(self, point): | ||
self._point = point | ||
|
||
def add_fusion_metadata(self, is_fused, fusion_metadata): | ||
self._is_fused = is_fused | ||
self._fusion_metadata = fusion_metadata | ||
|
||
@staticmethod | ||
def pack_args(argbuf, args): | ||
argbuf.pack_32bit_uint(len(args)) | ||
for arg in args: | ||
arg.pack(argbuf) | ||
|
||
|
||
@staticmethod | ||
def pack_fusion_metadata(argbuf, is_fused, fusion_metadata): | ||
argbuf.pack_bool(is_fused) | ||
if is_fused: | ||
fusion_metadata.pack(argbuf) | ||
|
||
|
||
def build_task(self, launch_domain, argbuf): | ||
self._req_analyzer.analyze_requirements() | ||
self._out_analyzer.analyze_requirements() | ||
|
||
#pack fusion metadata | ||
self.pack_fusion_metadata(argbuf, self._is_fused, self._fusion_metadata) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm not a fan of this design. The fusion task shouldn't be any different from a normal Legate task. I believe the fusion metadata can be passed as a bunch of scalar arguments to the fusion task. If we do that, then we don't need to make every place in the core handle fusion. Making the default path aware of task fusion doesn't sound like a good design. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is def worth discussing (I pondered this myself): There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. But if you don't see any potential downsides of making them all scalars, then yeah it could totally be implemented to be scalar based. Would mostly just have to move book-keeping logic from the deserializer into the Fused Legate Task There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I feel what's actually missing here is an ability to pass an arbitrary chunk of data (probably a |
||
|
||
self.pack_args(argbuf, self._inputs) | ||
self.pack_args(argbuf, self._outputs) | ||
self.pack_args(argbuf, self._reductions) | ||
self.pack_args(argbuf, self._scalars) | ||
|
||
task = IndexTask( | ||
self.legion_task_id, | ||
launch_domain, | ||
|
@@ -683,6 +698,9 @@ def build_task(self, launch_domain, argbuf): | |
def build_single_task(self, argbuf): | ||
self._req_analyzer.analyze_requirements() | ||
self._out_analyzer.analyze_requirements() | ||
|
||
#pack fusion metadata | ||
self.pack_fusion_metadata(argbuf, self._is_fused, self._fusion_metadata) | ||
|
||
self.pack_args(argbuf, self._inputs) | ||
self.pack_args(argbuf, self._outputs) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4859,6 +4859,12 @@ def pack_32bit_int(self, arg): | |
self.size += 4 | ||
self.add_arg(arg, legion.LEGION_TYPE_INT32) | ||
|
||
def pack_32bit_int_arr(self, arg): | ||
self.fmt.append(str(len(arg))+"i") | ||
size = len(arg) | ||
self.size += 4*size | ||
self.args += arg | ||
|
||
def pack_64bit_int(self, arg): | ||
self.fmt.append("q") | ||
self.size += 8 | ||
|
@@ -5043,7 +5049,7 @@ def pack_dtype(self, dtype): | |
def get_string(self): | ||
if self.string is None or self.arglen != len(self.args): | ||
fmtstr = "".join(self.fmt) | ||
assert len(fmtstr) == len(self.args) + 1 | ||
#assert len(fmtstr) == len(self.args) + 1 | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This assertion is being removed for performance reasons. The new method There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'll probably generalize this code, as I'm sure this is a utility not just for 32-bit integers. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. makes sense; I found it faster to batch the packing of ints this way. |
||
self.string = struct.pack(fmtstr, *self.args) | ||
self.arglen = len(self.args) | ||
return self.string | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,11 @@ | |
from .legion import Future | ||
from .store import Store | ||
from .utils import OrderedSet | ||
|
||
from .legion import ( | ||
FieldSpace, | ||
Future | ||
) | ||
|
||
|
||
class Operation(object): | ||
def __init__(self, context, mapper_id=0, op_id=0): | ||
|
@@ -30,6 +34,7 @@ def __init__(self, context, mapper_id=0, op_id=0): | |
self._inputs = [] | ||
self._outputs = [] | ||
self._reductions = [] | ||
self._is_fused = False | ||
self._input_parts = [] | ||
self._output_parts = [] | ||
self._reduction_parts = [] | ||
|
@@ -145,11 +150,18 @@ def add_broadcast(self, store): | |
def add_constraint(self, constraint): | ||
self._constraints.append(constraint) | ||
|
||
def has_constraint(self, store1, store2): | ||
part1 = self._get_unique_partition(store1) | ||
part2 = self._get_unique_partition(store2) | ||
cons = [str(con) for con in self._constraints] | ||
return (str(part1 == part2) in cons) or (str(part2==part1) in cons) | ||
|
||
def execute(self): | ||
self._context.runtime.submit(self) | ||
|
||
def get_tag(self, strategy, part): | ||
if strategy.is_key_part(part): | ||
return 0 | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This one (ie hack) is interesting, and will be removed. When creating a fused operation, if one of the stores has a partition with tag=0, but another reference to the same store w/ the same partition has tag=1 in the same fused op, the runtime will complain that these 2 partitions are aliased, even though theoretically nothing is wrong. I will likely change this to ignore tags when coalescing within a fused op There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This sounds like an auto-partitioner issue: the auto-parallelizer marks a partition as the "key" partition used to distribute tasks across processors. And the auto-parallelizer conveys that information in the tag. With task fusion, the auto-parallelizer shouldn't pick a tag for a particular task, but for the whole window, so that the partition has the same tag throughout the window. |
||
return 1 # LEGATE_CORE_KEY_STORE_TAG | ||
else: | ||
return 0 | ||
|
@@ -180,6 +192,7 @@ def __init__(self, context, task_id, mapper_id=0, op_id=0): | |
self._task_id = task_id | ||
self._scalar_args = [] | ||
self._futures = [] | ||
self._fusion_metadata = None | ||
|
||
def get_name(self): | ||
libname = self.context.library.get_name() | ||
|
@@ -195,14 +208,24 @@ def add_dtype_arg(self, dtype): | |
def add_future(self, future): | ||
self._futures.append(future) | ||
|
||
def add_fusion_metadata(self, fusion_metadata): | ||
self._is_fused = True | ||
self._fusion_metadata = fusion_metadata | ||
|
||
def launch(self, strategy): | ||
launcher = TaskLauncher(self.context, self._task_id, self.mapper_id) | ||
|
||
for input, input_part in zip(self._inputs, self._input_parts): | ||
if self._is_fused: | ||
launcher.add_fusion_metadata(self._is_fused, self._fusion_metadata) | ||
input_parts = self._input_parts | ||
output_parts = self._output_parts | ||
reduction_parts = self._reduction_parts | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There are two ways of generating the set of partitions for a fused op: |
||
for input, input_part in zip(self._inputs, input_parts): | ||
proj = strategy.get_projection(input_part) | ||
tag = self.get_tag(strategy, input_part) | ||
launcher.add_input(input, proj, tag=tag) | ||
for output, output_part in zip(self._outputs, self._output_parts): | ||
for output, output_part in zip(self._outputs, output_parts): | ||
if output.unbound: | ||
continue | ||
proj = strategy.get_projection(output_part) | ||
|
@@ -212,7 +235,7 @@ def launch(self, strategy): | |
# We update the key partition of a store only when it gets updated | ||
output.set_key_partition(partition) | ||
for ((reduction, redop), reduction_part) in zip( | ||
self._reductions, self._reduction_parts | ||
self._reductions, reduction_parts | ||
): | ||
partition = strategy.get_partition(reduction_part) | ||
can_read_write = partition.is_disjoint_for(strategy, reduction) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all changes to this file will be reverted before merge