Skip to content

Commit

Permalink
Stop writing failed actions with the invocation ID as part of their key (#7985)
Browse files Browse the repository at this point in the history

Nothing is reading this, since now executions are stored as
ExecuteResponses for all actions (passed or failed).
  • Loading branch information
vanja-p authored Dec 2, 2024
1 parent 0b39287 commit 4c5fc19
Show file tree
Hide file tree
Showing 10 changed files with 21 additions and 76 deletions.
4 changes: 2 additions & 2 deletions enterprise/server/execution_service/execution_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (es *ExecutionService) GetExecution(ctx context.Context, req *espb.GetExecu
// that owns the execution, switch to that group's ctx.
// Also if the execution was done anonymously, switch to
// anonymous ctx.
res, err := execution.GetCachedExecuteResponse(ctx, es.env, ex.ExecutionId)
res, err := execution.GetCachedExecuteResponse(ctx, es.env.GetActionCacheClient(), ex.ExecutionId)
if err != nil {
return err
}
Expand Down Expand Up @@ -166,7 +166,7 @@ func (es *ExecutionService) WaitExecution(req *espb.WaitExecutionRequest, stream
// WriteExecutionProfile writes the uncompressed JSON execution profile in
// Google's Trace Event Format.
func (es *ExecutionService) WriteExecutionProfile(ctx context.Context, w io.Writer, executionID string) error {
res, err := execution.GetCachedExecuteResponse(ctx, es.env, executionID)
res, err := execution.GetCachedExecuteResponse(ctx, es.env.GetActionCacheClient(), executionID)
if err != nil {
return status.WrapError(err, "get cached execute response")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func testExecuteAndPublishOperation(t *testing.T, platformOverrides map[string]s
// Should also be able to fetch the ExecuteResponse from cache. See field
// comment on Execution.execute_response_digest for notes on serialization
// format.
cachedExecuteResponse, err := execution.GetCachedExecuteResponse(ctx, env, taskID)
cachedExecuteResponse, err := execution.GetCachedExecuteResponse(ctx, env.GetActionCacheClient(), taskID)
require.NoError(t, err)
assert.Empty(t, cmp.Diff(expectedExecuteResponse, cachedExecuteResponse, protocmp.Transform()))

Expand Down Expand Up @@ -440,7 +440,7 @@ func TestMarkFailed(t *testing.T) {
assert.Equal(t, executionID, ex.ExecutionID)

// ExecuteResponse should be cached after marking failed
executeResponse, err := execution.GetCachedExecuteResponse(ctx, env, executionID)
executeResponse, err := execution.GetCachedExecuteResponse(ctx, env.GetActionCacheClient(), executionID)
require.NoError(t, err)
assert.Equal(t, "It didn't work", executeResponse.GetStatus().GetMessage())

Expand Down
12 changes: 2 additions & 10 deletions enterprise/server/remote_execution/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,18 +399,10 @@ func (s *Executor) ExecuteTaskAndStreamResults(ctx context.Context, st *repb.Sch
md.WorkerCompletedTimestamp = timestamppb.Now()
actionResult.ExecutionMetadata = md

// If the action failed or do_not_cache is set, upload information about the error via a failed
// ActionResult under an invocation-specific digest, which will not ever be seen by bazel but
// may be viewed via the Buildbuddy UI.
if task.GetAction().GetDoNotCache() || cmdResult.Error != nil || cmdResult.ExitCode != 0 {
resultDigest, err := digest.AddInvocationIDToDigest(req.GetActionDigest(), digestFunction, task.GetInvocationId())
if err != nil {
if !task.GetAction().GetDoNotCache() && cmdResult.Error == nil && cmdResult.ExitCode == 0 {
if err := cachetools.UploadActionResult(ctx, acClient, adInstanceDigest, actionResult); err != nil {
return finishWithErrFn(status.UnavailableErrorf("Error uploading action result: %s", err.Error()))
}
adInstanceDigest = digest.NewResourceName(resultDigest, req.GetInstanceName(), rspb.CacheType_AC, digestFunction)
}
if err := cachetools.UploadActionResult(ctx, acClient, adInstanceDigest, actionResult); err != nil {
return finishWithErrFn(status.UnavailableErrorf("Error uploading action result: %s", err.Error()))
}

// If there's an error that we know the client won't retry, return an error
Expand Down
1 change: 1 addition & 0 deletions enterprise/server/test/integration/remote_execution/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_test(
"//enterprise/server/testutil/buildbuddy_enterprise",
"//enterprise/server/testutil/testexecutor",
"//enterprise/server/testutil/testredis",
"//enterprise/server/util/execution",
"//proto:build_event_stream_go_proto",
"//proto:remote_execution_go_proto",
"//server/build_event_protocol/build_event_handler",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,7 @@ func (r *Env) AddBuildBuddyServerWithOptions(opts *BuildBuddyServerOptions) *Bui
env.SetAuthDB(r.testEnv.GetAuthDB())
env.SetUserDB(r.testEnv.GetUserDB())
env.SetInvocationDB(r.testEnv.GetInvocationDB())
env.SetActionCacheClient(r.GetActionResultStorageClient())

server := newBuildBuddyServer(r.t, env, opts)
r.buildBuddyServers[server] = struct{}{}
Expand Down Expand Up @@ -918,21 +919,6 @@ func (r *Env) DownloadOutputsToNewTempDir(res *CommandResult) string {
return tmpDir
}

// GetActionResultForFailedAction looks up the ActionResult stored for cmd
// under the invocation-specific digest, i.e. the command's action digest
// combined with invocationID via digest.AddInvocationIDToDigest.
//
// The lookup is issued against the client returned by
// GetActionResultStorageClient, using the instance name and digest function
// from the command's action resource name. If the invocation-specific digest
// cannot be computed, the test is failed immediately.
//
// The supplied ctx is propagated to the RPC so that caller deadlines,
// cancellation and request metadata are honored.
func (r *Env) GetActionResultForFailedAction(ctx context.Context, cmd *Command, invocationID string) (*repb.ActionResult, error) {
	rn := cmd.GetActionResourceName()
	actionResultDigest, err := digest.AddInvocationIDToDigest(rn.GetDigest(), rn.GetDigestFunction(), invocationID)
	if err != nil {
		assert.FailNow(r.t, fmt.Sprintf("unable to attach invocation ID %q to digest", invocationID))
	}
	req := &repb.GetActionResultRequest{
		InstanceName:   rn.GetInstanceName(),
		ActionDigest:   actionResultDigest,
		DigestFunction: rn.GetDigestFunction(),
	}
	acClient := r.GetActionResultStorageClient()
	// Use the caller's ctx rather than context.Background() so the request
	// is cancelled together with the caller.
	return acClient.GetActionResult(ctx, req)
}

func (r *Env) GetStdoutAndStderr(ctx context.Context, actionResult *repb.ActionResult, instanceName string) (string, string, error) {
stdout := ""
if actionResult.GetStdoutDigest() != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/buildbuddy-io/buildbuddy/enterprise/server/testutil/buildbuddy_enterprise"
"github.com/buildbuddy-io/buildbuddy/enterprise/server/testutil/testexecutor"
"github.com/buildbuddy-io/buildbuddy/enterprise/server/testutil/testredis"
"github.com/buildbuddy-io/buildbuddy/enterprise/server/util/execution"
"github.com/buildbuddy-io/buildbuddy/server/build_event_protocol/build_event_handler"
"github.com/buildbuddy-io/buildbuddy/server/interfaces"
"github.com/buildbuddy-io/buildbuddy/server/metrics"
Expand Down Expand Up @@ -99,8 +100,8 @@ func TestActionResultCacheWithSuccessfulAction(t *testing.T) {
res := cmd.Wait()
assert.Equal(t, 0, res.ExitCode, "exit code should be propagated")

_, err := rbe.GetActionResultForFailedAction(context.Background(), cmd, invocationID)
assert.True(t, status.IsNotFoundError(err))
_, err := cachetools.GetActionResult(context.Background(), rbe.GetActionResultStorageClient(), cmd.GetActionResourceName())
assert.NoError(t, err)
}

func TestActionResultCacheWithFailedAction(t *testing.T) {
Expand All @@ -127,11 +128,11 @@ func TestActionResultCacheWithFailedAction(t *testing.T) {
assert.Equal(t, 5, res.ExitCode, "exit code should be propagated")

ctx := context.Background()
failedActionResult, err := rbe.GetActionResultForFailedAction(ctx, cmd, invocationID)
assert.NoError(t, err)
assert.Equal(t, int32(5), failedActionResult.GetExitCode(), "exit code should be set in action result")
stdout, stderr, err := rbe.GetStdoutAndStderr(ctx, failedActionResult, res.InstanceName)
assert.NoError(t, err)
execRes, err := execution.GetCachedExecuteResponse(ctx, rbe.GetActionResultStorageClient(), res.ID)
require.NoError(t, err)
assert.Equal(t, int32(5), execRes.GetResult().GetExitCode(), "exit code should be set in action result")
stdout, stderr, err := rbe.GetStdoutAndStderr(ctx, execRes.GetResult(), res.InstanceName)
require.NoError(t, err)
assert.Equal(t, "hello\n", stdout, "stdout should be propagated")
assert.Equal(t, "bye\n", stderr, "stderr should be propagated")

Expand Down Expand Up @@ -194,11 +195,11 @@ func TestSimpleCommand_Timeout_StdoutStderrStillVisible(t *testing.T) {
assert.Equal(t, "ExampleStderr\n", stderr, "stderr should be propagated")
taskCount := testmetrics.CounterValue(t, metrics.RemoteExecutionTasksStartedCount)
assert.Equal(t, 1, int(taskCount-initialTaskCount), "unexpected number of tasks started")
ar, err = rbe.GetActionResultForFailedAction(ctx, cmd, invocationID)
execRes, err := execution.GetCachedExecuteResponse(ctx, rbe.GetActionResultStorageClient(), res.ID)
require.NoError(t, err)
assert.Empty(
t,
cmp.Diff(res.ActionResult, ar, protocmp.Transform()),
cmp.Diff(res.ActionResult, execRes.GetResult(), protocmp.Transform()),
"failed action result should match what was sent in the ExecuteResponse")
}

Expand Down
1 change: 0 additions & 1 deletion enterprise/server/util/execution/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ go_library(
"//proto:execution_stats_go_proto",
"//proto:remote_execution_go_proto",
"//proto:stored_invocation_go_proto",
"//server/environment",
"//server/remote_cache/digest",
"//server/tables",
"//server/util/proto",
Expand Down
5 changes: 2 additions & 3 deletions enterprise/server/util/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"strings"
"time"

"github.com/buildbuddy-io/buildbuddy/server/environment"
"github.com/buildbuddy-io/buildbuddy/server/remote_cache/digest"
"github.com/buildbuddy-io/buildbuddy/server/tables"
"github.com/buildbuddy-io/buildbuddy/server/util/proto"
Expand Down Expand Up @@ -104,7 +103,7 @@ func TableExecToClientProto(in *tables.Execution) (*espb.Execution, error) {
return out, nil
}

func GetCachedExecuteResponse(ctx context.Context, env environment.Env, taskID string) (*repb.ExecuteResponse, error) {
func GetCachedExecuteResponse(ctx context.Context, ac repb.ActionCacheClient, taskID string) (*repb.ExecuteResponse, error) {
rn, err := digest.ParseUploadResourceName(taskID)
if err != nil {
return nil, err
Expand All @@ -118,7 +117,7 @@ func GetCachedExecuteResponse(ctx context.Context, env environment.Env, taskID s
InstanceName: rn.GetInstanceName(),
DigestFunction: rn.GetDigestFunction(),
}
rsp, err := env.GetActionCacheClient().GetActionResult(ctx, req)
rsp, err := ac.GetActionResult(ctx, req)
if err != nil {
return nil, err
}
Expand Down
19 changes: 0 additions & 19 deletions server/remote_cache/digest/digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,25 +415,6 @@ func ComputeForFile(path string, digestType repb.DigestFunction_Value) (*repb.Di
return Compute(f, digestType)
}

// AddInvocationIDToDigest derives a new digest from an existing one and an
// invocation ID. The hash string of the input digest and the invocation ID
// are fed, in that order, into a hasher obtained from HashForDigestType, and
// the hex-encoded sum becomes the new digest's hash. SizeBytes is carried
// over unchanged from the input digest.
//
// Returns a FailedPrecondition error when digest is nil, or whatever error
// HashForDigestType reports for the requested digestType.
//
// This is only to be used for failed action results.
func AddInvocationIDToDigest(digest *repb.Digest, digestType repb.DigestFunction_Value, invocationID string) (*repb.Digest, error) {
	if digest == nil {
		return nil, status.FailedPreconditionError("nil digest")
	}

	hasher, err := HashForDigestType(digestType)
	if err != nil {
		return nil, err
	}

	// Order matters: the original hash first, then the invocation ID.
	for _, part := range []string{digest.Hash, invocationID} {
		hasher.Write([]byte(part))
	}

	combined := &repb.Digest{
		Hash:      fmt.Sprintf("%x", hasher.Sum(nil)),
		SizeBytes: digest.SizeBytes,
	}
	return combined, nil
}

// isResourceName reports whether url matches the pattern compiled in matcher.
func isResourceName(url string, matcher *regexp.Regexp) bool {
	matched := matcher.MatchString(url)
	return matched
}
Expand Down
16 changes: 1 addition & 15 deletions tools/cas/cas.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ var (

instanceName = flag.String("remote_instance_name", "", "Remote instance name")
apiKey = flag.String("api_key", "", "API key to attach to the outgoing context")
invocationID = flag.String("invocation_id", "", "Invocation ID. This is required when fetching the result of a failed action. Otherwise, it's optional.")

showMetadata = flag.Bool("metadata", false, "Whether to fetch and log metadata for the digest (printed to stderr).")
showMetadataOnly = flag.Bool("metadata_only", false, "Whether to *only* fetch metadata, not the contents. This will print the metadata to stdout instead of stderr.")
Expand All @@ -55,10 +54,6 @@ var (
// Show stderr contents:
//
// bazel run //tools/cas -- -target=grpcs://remote.buildbuddy.dev -digest=HASH/SIZE -type=stderr
//
// Show a failed action result proto (requires invocation ID):
//
// bazel run //tools/cas -- -target=grpcs://remote.buildbuddy.dev -digest=HASH/SIZE -type=ActionResult -invocation_id=IID
func main() {
flag.Parse()
if *target == "" {
Expand Down Expand Up @@ -143,16 +138,7 @@ func main() {
if *blobType == "ActionResult" {
ar, err := cachetools.GetActionResult(ctx, acClient, ind)
if err != nil {
log.Infof("Could not fetch ActionResult; maybe the action failed. Attempting to fetch failed action using invocation ID = %q", *invocationID)
failedDigest, err := digest.AddInvocationIDToDigest(ind.GetDigest(), ind.GetDigestFunction(), *invocationID)
if err != nil {
log.Fatal(err.Error())
}
ind := digest.NewResourceName(failedDigest, ind.GetInstanceName(), rspb.CacheType_AC, repb.DigestFunction_SHA256)
ar, err = cachetools.GetActionResult(ctx, acClient, ind)
if err != nil {
log.Fatal(err.Error())
}
log.Fatal(err.Error())
}
printMessage(ar)
return
Expand Down

0 comments on commit 4c5fc19

Please sign in to comment.