Task Fusion #113 (Open)

shivsundram wants to merge 57 commits into base: branch-24.03.

Commits (57; the diff below shows changes from 55 commits)
8ff0261  task fusion and legality constraints (shivsundram, Sep 27, 2021)
4a9248f  makeshift serializer for inline ops (shivsundram, Sep 27, 2021)
99cd51c  reductions scalars, opids, need to remove dynamic allocations (shivsundram, Sep 29, 2021)
a4e21d8  fusion metadata passed via serialization now (shivsundram, Sep 29, 2021)
9b57fc8  remove redundant store partitioning (shivsundram, Sep 30, 2021)
bf7973a  remove creation of deferred arrays (shivsundram, Oct 1, 2021)
b6121e7  optimized packing, some transform packing (shivsundram, Oct 1, 2021)
e3708bf  more stuff (shivsundram, Oct 4, 2021)
8eca06f  partial fusion (Oct 14, 2021)
dbf8ea9  merge attempt (Oct 14, 2021)
a2c3874  second merge attempt (Oct 15, 2021)
303866b  finishing merge (Oct 15, 2021)
e8b9021  fix future stuff (Oct 15, 2021)
7b0ee30  merge conflict (Oct 15, 2021)
8238083  re add serializer, fix horrible merge bug (Oct 15, 2021)
46856c7  debugging crap (Oct 22, 2021)
6b4182b  trying merge (Oct 22, 2021)
db65e43  op registry working (Oct 25, 2021)
ddd8dc7  Change the pip package name to match the conda package and update ver… (marcinz, Oct 27, 2021)
ef677e6  Fix the version of Legion to a particular commit (marcinz, Oct 28, 2021)
ba955e2  Do not find a default branch for the release (marcinz, Oct 28, 2021)
de337cf  Change the Legion checkout target (marcinz, Oct 28, 2021)
78335cc  Bumped up the version of pyarrow (marcinz, Oct 28, 2021)
369909d  gpu descriptors (Nov 1, 2021)
ab0c044  Remove back edges from partition symbols back to operations to avoid (magnatelee, Nov 1, 2021)
a9e1014  Merge branch 'branch-21.10' into cycle-fix (magnatelee, Nov 1, 2021)
54d3bb8  Make sure we don't create cycles between region fields and attachments (magnatelee, Nov 1, 2021)
dd46bdd  Merge pull request #84 from magnatelee/cycle-fix (magnatelee, Nov 2, 2021)
02103ad  Merge pull request #78 from lightsighter/interpreter-check (magnatelee, Nov 3, 2021)
d6ccdd2  Handle cases where one instance is used by multiple mappings (magnatelee, Nov 3, 2021)
94a1fba  Merge branch 'branch-21.10' into mapper-bug-fix (magnatelee, Nov 3, 2021)
12d13d1  Merge pull request #91 from magnatelee/mapper-bug-fix (magnatelee, Nov 3, 2021)
51dd00f  Fix import of legion CFFI (manopapad, Nov 5, 2021)
f348080  Merge pull request #97 from manopapad/fixinit (manopapad, Nov 5, 2021)
f388f07  Make sure we flush deferred detachments (magnatelee, Nov 5, 2021)
345f275  reduction fix (Nov 6, 2021)
ab96ebc  Merge pull request #99 from magnatelee/detachment-fix (magnatelee, Nov 6, 2021)
1f9a655  put new constraint stuff back in (Nov 13, 2021)
e2afa73  constant optimization (Nov 15, 2021)
80aa90f  better constant opt (Nov 22, 2021)
f9eb119  terminal dots (Nov 22, 2021)
ba55358  useless merge (Nov 22, 2021)
78366c8  merging new branch (Nov 22, 2021)
f07381e  reuse partitions (Dec 1, 2021)
ea12377  install.py (Dec 3, 2021)
d7a8dab  new way of applying constraints (Dec 6, 2021)
fe66dad  minor cleanup (Dec 7, 2021)
a73dec1  more cleanup (Dec 7, 2021)
437e67e  use alignment info when fusing (Dec 12, 2021)
9594cb5  new apply methods (Dec 12, 2021)
239ae35  removing serializer code (Dec 13, 2021)
b49557b  more cleanup (Dec 13, 2021)
e05e3ff  remove fusion reference from core (Dec 13, 2021)
a59e142  remove comments (Dec 13, 2021)
399d070  more cleanup (Dec 13, 2021)
5bb4df5  partitioning fix (Apr 20, 2022)
36c40cb  choose midpoint partition (Jun 14, 2022)
install.py (6 changes: 3 additions, 3 deletions)

@@ -794,8 +794,8 @@ def driver():
     )

@shivsundram (Author): all changes to this file will be reverted before merge

     parser.add_argument(
         "--cuda",
-        action=BooleanFlag,
-        default=os.environ.get("USE_CUDA", "0") == "1",
+        action= BooleanFlag,
+        default=True,#os.environ.get("USE_CUDA", "0") == "1",
         help="Build Legate with CUDA support.",
     )
     parser.add_argument(

@@ -895,7 +895,7 @@ def driver():
         "--clean",
         dest="clean_first",
         action=BooleanFlag,
-        default=True,
+        default=False,
         help="Clean before build, and pull latest Legion.",
     )
     parser.add_argument(
legate/core/corelib.py (3 changes: 2 additions, 1 deletion)

@@ -24,7 +24,7 @@
 class CoreLib(Library):
     def __init__(self):
         self._lib = None

     def get_name(self):
         return "legate.core"

@@ -38,6 +38,7 @@ def get_c_header(self):
     def initialize(self, shared_lib):
         self._lib = shared_lib
         shared_lib.legate_parse_config()
+        #self.fused_id = self._lib.LEGATE_CORE_FUSED_TASK_ID

     def get_registration_callback(self):
         return "legate_core_perform_registration"
legate/core/launcher.py (22 changes: 20 additions, 2 deletions)

@@ -534,6 +534,8 @@ def __init__(self, context, task_id, mapper_id=0, tag=0):
         self._sharding_space = None
         self._point = None
         self._output_regions = list()
+        self._is_fused = False
+        self._fusion_metadata = None

     @property
     def library_task_id(self):

@@ -577,7 +579,6 @@ def add_store(self, args, store, proj, perm, tag, flags):
         else:
             region = store.storage.region
             field_id = store.storage.field.field_id
-
         req = RegionReq(region, perm, proj, tag, flags)

         self._req_analyzer.insert(req, field_id)

@@ -643,21 +644,35 @@ def set_sharding_space(self, space):
     def set_point(self, point):
         self._point = point

+    def add_fusion_metadata(self, is_fused, fusion_metadata):
+        self._is_fused = is_fused
+        self._fusion_metadata = fusion_metadata
+
     @staticmethod
     def pack_args(argbuf, args):
         argbuf.pack_32bit_uint(len(args))
         for arg in args:
             arg.pack(argbuf)

+    @staticmethod
+    def pack_fusion_metadata(argbuf, is_fused, fusion_metadata):
+        argbuf.pack_bool(is_fused)
+        if is_fused:
+            fusion_metadata.pack(argbuf)
+
     def build_task(self, launch_domain, argbuf):
         self._req_analyzer.analyze_requirements()
         self._out_analyzer.analyze_requirements()

+        #pack fusion metadata
+        self.pack_fusion_metadata(argbuf, self._is_fused, self._fusion_metadata)
Contributor: I'm not a fan of this design. The fusion task shouldn't be any different from a normal Legate task. I believe the fusion metadata can be passed as a bunch of scalar arguments to the fusion task. If we do that, then we don't need to make every place in the core handle fusion. Making the default path aware of task fusion doesn't sound like a good design.

@shivsundram (Author), Dec 14, 2021: This is definitely worth discussing (I pondered this myself). I did think about making them scalars, but I wasn't sure whether storing all the metadata that way has other implications or downsides (e.g., is there a maximum number of scalars we can give a task?), since we'd be sending quite a few scalars to the task: multiple lists, each of which can have more than 50 scalars, and in 4 or 5 of these lists the number of scalars equals the fused-op length, even with potential deduplication of stores. If we did this, the task's scalars array would mix "metadata" scalars (which are really a bunch of lists) with the actual scalars used by the sub-tasks, which doesn't really adhere to the abstraction of having a dedicated "scalars" array in the context data structure. I therefore chose to serialize it as Legate task argument data. As you stated, the downside is that the core/default path has to be "fusion aware".

@shivsundram (Author), Dec 14, 2021: But if you don't see any potential downsides of making them all scalars, then it could totally be implemented scalar-based. We would mostly just have to move book-keeping logic from the deserializer into the fused Legate task.

Contributor: I feel what's actually missing here is the ability to pass an arbitrary chunk of data (probably a memoryview object in Python) to a task, which is in theory possible. We should probably extend the buffer builder interface to take an arbitrary memoryview object and pack it as a single contiguous chunk. The assumption here is that the task knows how to interpret those bytes. It shouldn't be too difficult for me to add it.

         self.pack_args(argbuf, self._inputs)
         self.pack_args(argbuf, self._outputs)
         self.pack_args(argbuf, self._reductions)
         self.pack_args(argbuf, self._scalars)

         task = IndexTask(
             self.legion_task_id,
             launch_domain,

@@ -683,6 +698,9 @@ def build_single_task(self, argbuf):
     def build_single_task(self, argbuf):
         self._req_analyzer.analyze_requirements()
         self._out_analyzer.analyze_requirements()

+        #pack fusion metadata
+        self.pack_fusion_metadata(argbuf, self._is_fused, self._fusion_metadata)

         self.pack_args(argbuf, self._inputs)
         self.pack_args(argbuf, self._outputs)
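A rough sketch of the buffer-builder extension proposed in the thread above: pack an arbitrary chunk of bytes (a memoryview) as one contiguous task argument. This is a minimal sketch, assuming only the fmt/args/size bookkeeping visible in the legion.py diff below; the class here is a stand-in and pack_buffer is a hypothetical name, not an existing API:

    import struct

    class BufferBuilder:
        # Minimal stand-in for legate.core.legion.BufferBuilder; only the
        # fmt/args/size bookkeeping visible in the legion.py diff is assumed.
        def __init__(self):
            self.fmt = ["="]  # assumed leading byte-order marker
            self.args = []
            self.size = 0

        def pack_32bit_uint(self, arg):
            self.fmt.append("I")
            self.size += 4
            self.args.append(arg)

        def pack_buffer(self, buf):
            # Hypothetical extension: pack an opaque, contiguous chunk of
            # bytes. The receiving task is assumed to know how to interpret
            # them.
            view = memoryview(buf).cast("B")   # flatten to raw bytes
            self.pack_32bit_uint(view.nbytes)  # length prefix for the consumer
            self.fmt.append("%ds" % view.nbytes)
            self.size += view.nbytes
            self.args.append(view.tobytes())

        def get_string(self):
            return struct.pack("".join(self.fmt), *self.args)

With something like this, the fusion metadata could be serialized once on the Python side and handed to the task as one opaque chunk, without teaching pack_args or the default task path anything about fusion.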
legate/core/legion.py (8 changes: 7 additions, 1 deletion)

@@ -4859,6 +4859,12 @@ def pack_32bit_int(self, arg):
         self.size += 4
         self.add_arg(arg, legion.LEGION_TYPE_INT32)

+    def pack_32bit_int_arr(self, arg):
+        self.fmt.append(str(len(arg))+"i")
+        size = len(arg)
+        self.size += 4*size
+        self.args += arg
+
     def pack_64bit_int(self, arg):
         self.fmt.append("q")
         self.size += 8

@@ -5043,7 +5049,7 @@ def pack_dtype(self, dtype):
     def get_string(self):
         if self.string is None or self.arglen != len(self.args):
             fmtstr = "".join(self.fmt)
-            assert len(fmtstr) == len(self.args) + 1
+            #assert len(fmtstr) == len(self.args) + 1
@shivsundram (Author): This assertion is being removed for performance reasons. The new method pack_32bit_int_arr above is a much faster way of packing a set of 32-bit ints than appending them individually, but it breaks the invariant that len(fmtstr) == len(self.args) + 1.

Contributor: I'll probably generalize this code, as I'm sure this is a utility not just for 32-bit integers.

@shivsundram (Author): Makes sense; I found it faster to batch the packing of ints this way.

             self.string = struct.pack(fmtstr, *self.args)
             self.arglen = len(self.args)
         return self.string
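A minimal sketch of the generalization suggested in the thread above, assuming the same fmt/args/size fields used by the buffer builder; pack_array and its element-size table are hypothetical, not part of this PR:

    # Batch-pack any homogeneous numeric array into a BufferBuilder-like
    # object, given the struct format code of its elements.
    _ELEM_SIZE = {"i": 4, "I": 4, "q": 8, "Q": 8, "f": 4, "d": 8}

    def pack_array(builder, arr, code):
        # One "<N><code>" entry in fmt covers N args at once; this bulk
        # append is exactly what breaks the len(fmtstr) == len(self.args) + 1
        # invariant that the commented-out assertion used to enforce.
        builder.fmt.append(str(len(arr)) + code)
        builder.size += _ELEM_SIZE[code] * len(arr)
        builder.args += list(arr)

    # The PR's pack_32bit_int_arr is then just the code == "i" special case:
    # pack_array(builder, arg, "i")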
legate/core/operation.py (36 changes: 32 additions, 4 deletions)

@@ -20,7 +20,11 @@
-from .legion import Future
 from .store import Store
 from .utils import OrderedSet
+
+from .legion import (
+    FieldSpace,
+    Future
+)


 class Operation(object):
     def __init__(self, context, mapper_id=0, op_id=0):

@@ -30,6 +34,7 @@ def __init__(self, context, mapper_id=0, op_id=0):
         self._inputs = []
         self._outputs = []
         self._reductions = []
+        self._is_fused = False
         self._input_parts = []
         self._output_parts = []
         self._reduction_parts = []

@@ -145,11 +150,18 @@ def add_broadcast(self, store):
     def add_constraint(self, constraint):
         self._constraints.append(constraint)

+    def has_constraint(self, store1, store2):
+        part1 = self._get_unique_partition(store1)
+        part2 = self._get_unique_partition(store2)
+        cons = [str(con) for con in self._constraints]
+        return (str(part1 == part2) in cons) or (str(part2 == part1) in cons)
+
     def execute(self):
         self._context.runtime.submit(self)

     def get_tag(self, strategy, part):
         if strategy.is_key_part(part):
+            return 0
@shivsundram (Author), Dec 6, 2021: This one (i.e., a hack) is interesting, and will be removed. When creating a fused operation, if one of the stores has a partition with tag=0, but another reference to the same store with the same partition has tag=1 in the same fused op, the runtime will complain that these two partitions are aliased, even though theoretically nothing is wrong. I will likely change this to ignore tags when coalescing within a fused op.

@magnatelee (Contributor), Dec 14, 2021: This sounds like an auto-partitioner issue: the auto-parallelizer marks a partition as the "key" partition used to distribute tasks across processors, and it conveys that information in the tag. With task fusion, the auto-parallelizer shouldn't pick a tag for a particular task, but for the whole window, so that the partition has the same tag throughout the window.

             return 1 # LEGATE_CORE_KEY_STORE_TAG
         else:
             return 0
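A sketch of the per-window tag selection @magnatelee describes, under the assumptions that a Strategy exposes is_key_part, a fusion window is a list of ops, and ops can enumerate their partition symbols; the accessor partition_symbols() is hypothetical:

    # Decide the key-partition tag once for the whole fusion window, so two
    # references to the same store can never disagree on the tag.
    LEGATE_CORE_KEY_STORE_TAG = 1  # value assumed from the comment above

    def compute_window_tags(strategy, window):
        tags = {}
        for op in window:
            for part in op.partition_symbols():  # hypothetical accessor
                if part not in tags:
                    tags[part] = (
                        LEGATE_CORE_KEY_STORE_TAG
                        if strategy.is_key_part(part)
                        else 0
                    )
        return tags

    # get_tag would then reduce to a lookup, tags[part], leaving no
    # per-op choice that could produce aliased-partition complaints.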
@@ -180,6 +192,7 @@ def __init__(self, context, task_id, mapper_id=0, op_id=0):
         self._task_id = task_id
         self._scalar_args = []
         self._futures = []
+        self._fusion_metadata = None

     def get_name(self):
         libname = self.context.library.get_name()

@@ -195,14 +208,29 @@ def add_dtype_arg(self, dtype):
     def add_future(self, future):
         self._futures.append(future)

+    def add_fusion_metadata(self, fusion_metadata):
+        self._is_fused = True
+        self._fusion_metadata = fusion_metadata
+
     def launch(self, strategy):
         launcher = TaskLauncher(self.context, self._task_id, self.mapper_id)

-        for input, input_part in zip(self._inputs, self._input_parts):
+        if self._is_fused:
+            launcher.add_fusion_metadata(self._is_fused, self._fusion_metadata)
+        if self._is_fused: #fused ops re-use encapsulated unfused partitions
+            input_parts = self._unfused_input_parts
+            output_parts = self._unfused_output_parts
+            reduction_parts = self._unfused_reduction_parts
+        else:
+            input_parts = self._input_parts
+            output_parts = self._output_parts
+            reduction_parts = self._reduction_parts
@shivsundram (Author), Dec 6, 2021: Currently, a hash method is used to map a store (or rather its constraints) to its partition in solver.py. For a given store (among the inputs, outputs, or reductions), the hash ingests its task's absolute id, which is problematic for fusion: the fused task has a different absolute id than the unfused tasks. To take care of this we set input_parts to self._unfused_input_parts, so we can hash into the original "unfused" partitions. Otherwise, the method for hashing into a store's partition will complain that the partition for the current store doesn't exist for the fused op.

@shivsundram (Author), Dec 6, 2021: There are two ways of generating the set of partitions for a fused op:
- One is to create the fused op, then repartition all the stores of the fused op (even though they were already partitioned in their unfused form).
- The other is to build a superstrategy of all the individual "unfused" partitions.
The second is much cheaper (partitioning stores twice has significant, tangible overhead) and is what the code currently uses.
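A minimal sketch of that second, "superstrategy" approach, assuming a Strategy wraps a mapping from partition symbols to chosen partitions (real Strategy internals may differ):

    # Union the partition assignments already computed for the unfused ops
    # into one strategy for the fused op, instead of running the partitioner
    # a second time over the fused op's stores.
    def build_superstrategy(unfused_strategies):
        merged = {}
        for strategy in unfused_strategies:
            # _assignments: assumed dict from partition symbol to partition.
            merged.update(strategy._assignments)
        return merged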

+        for input, input_part in zip(self._inputs, input_parts):
             proj = strategy.get_projection(input_part)
             tag = self.get_tag(strategy, input_part)
             launcher.add_input(input, proj, tag=tag)
-        for output, output_part in zip(self._outputs, self._output_parts):
+        for output, output_part in zip(self._outputs, output_parts):
             if output.unbound:
                 continue
             proj = strategy.get_projection(output_part)

@@ -212,7 +240,7 @@ def launch(self, strategy):
             # We update the key partition of a store only when it gets updated
             output.set_key_partition(partition)
         for ((reduction, redop), reduction_part) in zip(
-            self._reductions, self._reduction_parts
+            self._reductions, reduction_parts
         ):
             partition = strategy.get_partition(reduction_part)
             can_read_write = partition.is_disjoint_for(strategy, reduction)
legate/core/partition.py (2 changes: 0 additions, 2 deletions)

@@ -180,10 +180,8 @@ def construct(self, region, complete=False):
         transform = Transform(tile_shape.ndim, tile_shape.ndim)
         for idx, size in enumerate(tile_shape):
             transform.trans[idx, idx] = size
-
         lo = Shape((0,) * tile_shape.ndim) + self._offset
         hi = self._tile_shape - 1 + self._offset
-
         extent = Rect(hi, lo, exclusive=False)

         color_space = self._runtime.find_or_create_index_space(