TEZ-4061: InputAttemptIdentifier and CompositeInputAttemptIdentifier cannot be compared for equality #326
💔 -1 overall
This message was automatically generated. |
@abstractdog |
@@ -282,6 +282,7 @@ private void processShufflePayload(DataMovementEventPayloadProto shufflePayload,

  private void processInputFailedEvent(InputFailedEvent ife) {
    InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion());
    LOG.info("Marking obsolete input: " + inputContext.getSourceVertexName() + " " + srcAttemptIdentifier);
Please use {} placeholder formatting for this log message instead of string concatenation.
fixed
@@ -50,6 +50,12 @@ public InputAttemptIdentifier expand(int inputIdentifierOffset) {
    return new InputAttemptIdentifier(getInputIdentifier() + inputIdentifierOffset, getAttemptNumber(), getPathComponent(), isShared(), getFetchTypeInfo(), getSpillEventId());
  }

  public boolean include(int thatInputIdentifier, int thatAttemptNumber) {
A few notes:
- nit: this is more like "includes", because the method answers a question (whether this composite identifier includes another one) rather than taking an action
- this method would be simpler with a single param: includes(InputAttemptIdentifier inputAttemptIdentifier)
- can you please add a method javadoc explaining what exactly "includes" means in this context
I agree that includes is a more appropriate name for the method. I changed the method's name and signature, and added a javadoc to it.
@@ -50,6 +50,12 @@ public InputAttemptIdentifier expand(int inputIdentifierOffset) {
    return new InputAttemptIdentifier(getInputIdentifier() + inputIdentifierOffset, getAttemptNumber(), getPathComponent(), isShared(), getFetchTypeInfo(), getSpillEventId());
  }

  public boolean include(int thatInputIdentifier, int thatAttemptNumber) {
    return
        super.getInputIdentifier() <= thatInputIdentifier && thatInputIdentifier < (super.getInputIdentifier() + inputIdentifierCount) &&
can you reuse e.g. guava's Range for better readability?
https://www.geeksforgeeks.org/range-class-guava-java/
Sure, we can. Modified to use guava's Range.
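For readers following the two threads above, here is a minimal sketch of what a single-parameter includes method with a half-open range check might look like. It is not the code that was merged. The classes SimpleInputAttemptId and SimpleCompositeInputAttemptId are hypothetical, stripped-down stand-ins for the Tez identifier classes, and the attempt-number equality check is an assumption, since the quoted hunk is cut off after the range condition. The sketch uses plain comparisons to stay dependency-free; with Guava on the classpath, the same check could be written as Range.closedOpen(lower, upper).contains(value).

```java
/** Hypothetical, simplified stand-in for Tez's InputAttemptIdentifier. */
class SimpleInputAttemptId {
  private final int inputIdentifier;
  private final int attemptNumber;

  SimpleInputAttemptId(int inputIdentifier, int attemptNumber) {
    this.inputIdentifier = inputIdentifier;
    this.attemptNumber = attemptNumber;
  }

  int getInputIdentifier() {
    return inputIdentifier;
  }

  int getAttemptNumber() {
    return attemptNumber;
  }
}

/** Hypothetical stand-in for a composite identifier covering a run of consecutive inputs. */
class SimpleCompositeInputAttemptId extends SimpleInputAttemptId {
  private final int inputIdentifierCount;

  SimpleCompositeInputAttemptId(int inputIdentifier, int attemptNumber, int inputIdentifierCount) {
    super(inputIdentifier, attemptNumber);
    this.inputIdentifierCount = inputIdentifierCount;
  }

  /**
   * Returns true if this composite identifier covers the given identifier, that is,
   * if the other input identifier lies in the half-open range
   * [inputIdentifier, inputIdentifier + inputIdentifierCount)
   * and (assumption) both identifiers carry the same attempt number.
   */
  boolean includes(SimpleInputAttemptId other) {
    int lower = getInputIdentifier();          // inclusive lower bound
    int upper = lower + inputIdentifierCount;  // exclusive upper bound
    int candidate = other.getInputIdentifier();
    boolean inRange = lower <= candidate && candidate < upper;
    return inRange && getAttemptNumber() == other.getAttemptNumber();
  }
}

class IncludesDemo {
  public static void main(String[] args) {
    // Covers input identifiers 10, 11 and 12 for attempt 0.
    SimpleCompositeInputAttemptId composite = new SimpleCompositeInputAttemptId(10, 0, 3);
    System.out.println(composite.includes(new SimpleInputAttemptId(12, 0))); // last covered identifier
    System.out.println(composite.includes(new SimpleInputAttemptId(13, 0))); // one past the end
  }
}
```

Taking the other identifier as a single argument keeps call sites short and lets the javadoc state the boundary semantics in one place, which is where off-by-one mistakes tend to hide.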
...library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
    }
    // Avoid adding attempts which have been marked as OBSOLETE
    if (isObsoleteInputAttemptIdentifier(input)) {
      LOG.info("Skipping obsolete input: " + input);
The original behavior didn't log anything when skipping an obsolete input. Is this an intentional change?
ShuffleScheduler still doesn't log anything in the same situation, so this should either be a LOG.debug or go back to:
if (alreadyCompleted || isObsoleteInputAttemptIdentifier(input)) {
Which one do you prefer?
I think it would be better to synchronize the behaviour of ShuffleManager and ShuffleScheduler. Modified to use a single if clause.
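As an illustration of the single-if approach agreed on above, the following sketch folds the "already completed" and "obsolete" checks into one condition that skips the input without logging. ObsoleteAwareInputTracker and all of its fields and methods are hypothetical names made up for this example; they are not the actual ShuffleManager or ShuffleScheduler members, and inputs are reduced to plain integers.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical tracker, used only to illustrate the combined skip condition. */
class ObsoleteAwareInputTracker {
  private final Set<Integer> completedInputs = new HashSet<>();
  private final Set<Integer> obsoleteInputs = new HashSet<>();
  private final Set<Integer> pendingInputs = new HashSet<>();

  void markCompleted(int inputId) {
    completedInputs.add(inputId);
  }

  void markObsolete(int inputId) {
    obsoleteInputs.add(inputId);
  }

  private boolean isObsolete(int inputId) {
    return obsoleteInputs.contains(inputId);
  }

  /** Returns true if the input was newly queued for fetching, false if it was skipped. */
  boolean addKnownInput(int inputId) {
    boolean alreadyCompleted = completedInputs.contains(inputId);
    // Single clause: completed and obsolete inputs are both skipped silently.
    if (alreadyCompleted || isObsolete(inputId)) {
      return false;
    }
    return pendingInputs.add(inputId);
  }
}

class SkipDemo {
  public static void main(String[] args) {
    ObsoleteAwareInputTracker tracker = new ObsoleteAwareInputTracker();
    tracker.markCompleted(1);
    tracker.markObsolete(2);
    System.out.println(tracker.addKnownInput(1)); // skipped: already completed
    System.out.println(tracker.addKnownInput(2)); // skipped: obsolete
    System.out.println(tracker.addKnownInput(3)); // queued
  }
}
```

Keeping both checks in one clause means the two skip paths cannot drift apart in logging or side effects.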
Thanks a lot for this patch @ngsg, sorry that we forgot about it.
@abstractdog, thank you for your review. I've updated the patch to address your comments. Could you please review the changes? Thank you.
💔 -1 overall
This message was automatically generated. |
Thanks @ngsg for addressing the comments! I believe this is quite close to being merged.
@abstractdog , I've added a unit test that checks the boundary of the range inside |
🎊 +1 overall
This message was automatically generated. |
https://issues.apache.org/jira/browse/TEZ-4061