diff --git a/gen/go/proto/resources/common/cron/cron.go b/gen/go/proto/resources/common/cron/cron.go index 4f8ff96cc..8d5c0dd8f 100644 --- a/gen/go/proto/resources/common/cron/cron.go +++ b/gen/go/proto/resources/common/cron/cron.go @@ -1,5 +1,9 @@ package cron +import "time" + +const DefaultCronTimeout = 10 * time.Second + func (x *Cronjob) Merge(in *Cronjob) *Cronjob { x.Schedule = in.Schedule @@ -22,6 +26,14 @@ func (x *Cronjob) Merge(in *Cronjob) *Cronjob { return x } +func (x *Cronjob) GetRunTimeout() time.Duration { + if x.Timeout == nil { + return DefaultCronTimeout + } + + return x.Timeout.AsDuration() +} + func (x *CronjobData) Merge(in *CronjobData) *CronjobData { x.Data = in.Data diff --git a/gen/go/proto/resources/common/cron/cron.pb.go b/gen/go/proto/resources/common/cron/cron.pb.go index 4ba11bcf0..2ca2fb2a4 100644 --- a/gen/go/proto/resources/common/cron/cron.pb.go +++ b/gen/go/proto/resources/common/cron/cron.pb.go @@ -84,8 +84,10 @@ type Cronjob struct { Schedule string `protobuf:"bytes,2,opt,name=schedule,proto3" json:"schedule,omitempty"` State CronjobState `protobuf:"varint,3,opt,name=state,proto3,enum=resources.common.cron.CronjobState" json:"state,omitempty"` NextScheduleTime *timestamp.Timestamp `protobuf:"bytes,4,opt,name=next_schedule_time,json=nextScheduleTime,proto3" json:"next_schedule_time,omitempty"` - LastAttemptTime *timestamp.Timestamp `protobuf:"bytes,5,opt,name=last_attempt_time,json=lastAttemptTime,proto3" json:"last_attempt_time,omitempty"` - Data *CronjobData `protobuf:"bytes,6,opt,name=data,proto3" json:"data,omitempty"` + LastAttemptTime *timestamp.Timestamp `protobuf:"bytes,5,opt,name=last_attempt_time,json=lastAttemptTime,proto3,oneof" json:"last_attempt_time,omitempty"` + StartedTime *timestamp.Timestamp `protobuf:"bytes,6,opt,name=started_time,json=startedTime,proto3,oneof" json:"started_time,omitempty"` + Timeout *durationpb.Duration `protobuf:"bytes,7,opt,name=timeout,proto3,oneof" json:"timeout,omitempty"` + Data *CronjobData `protobuf:"bytes,8,opt,name=data,proto3" json:"data,omitempty"` } func (x *Cronjob) Reset() { @@ -153,6 +155,20 @@ func (x *Cronjob) GetLastAttemptTime() *timestamp.Timestamp { return nil } +func (x *Cronjob) GetStartedTime() *timestamp.Timestamp { + if x != nil { + return x.StartedTime + } + return nil +} + +func (x *Cronjob) GetTimeout() *durationpb.Duration { + if x != nil { + return x.Timeout + } + return nil +} + func (x *Cronjob) GetData() *CronjobData { if x != nil { return x.Data @@ -388,6 +404,51 @@ func (x *CronjobCompletedEvent) GetData() *CronjobData { return nil } +type GenericCronData struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Attributes map[string]string `protobuf:"bytes,1,rep,name=attributes,proto3" json:"attributes,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (x *GenericCronData) Reset() { + *x = GenericCronData{} + mi := &file_resources_common_cron_cron_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GenericCronData) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GenericCronData) ProtoMessage() {} + +func (x *GenericCronData) ProtoReflect() protoreflect.Message { + mi := &file_resources_common_cron_cron_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GenericCronData.ProtoReflect.Descriptor instead. +func (*GenericCronData) Descriptor() ([]byte, []int) { + return file_resources_common_cron_cron_proto_rawDescGZIP(), []int{5} +} + +func (x *GenericCronData) GetAttributes() map[string]string { + if x != nil { + return x.Attributes + } + return nil +} + var File_resources_common_cron_cron_proto protoreflect.FileDescriptor var file_resources_common_cron_cron_proto_rawDesc = []byte{ @@ -400,7 +461,7 @@ var file_resources_common_cron_cron_proto_rawDesc = []byte{ 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x23, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, - 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xc6, 0x02, 0x0a, 0x07, 0x43, 0x72, + 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x80, 0x04, 0x0a, 0x07, 0x43, 0x72, 0x6f, 0x6e, 0x6a, 0x6f, 0x62, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x63, 0x68, @@ -412,65 +473,88 @@ var file_resources_common_cron_cron_proto_rawDesc = []byte{ 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x2e, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x10, 0x6e, 0x65, - 0x78, 0x74, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x4a, + 0x78, 0x74, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x4f, 0x0a, 0x11, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x2e, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, - 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, 0x6c, 0x61, 0x73, 0x74, 0x41, - 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x36, 0x0a, 0x04, 0x64, 0x61, - 0x74, 0x61, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, - 0x72, 0x63, 0x65, 0x73, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x63, 0x72, 0x6f, 0x6e, - 0x2e, 0x43, 0x72, 0x6f, 0x6e, 0x6a, 0x6f, 0x62, 0x44, 0x61, 0x74, 0x61, 0x52, 0x04, 0x64, 0x61, - 0x74, 0x61, 0x22, 0x84, 0x01, 0x0a, 0x0b, 0x43, 0x72, 0x6f, 0x6e, 0x6a, 0x6f, 0x62, 0x44, 0x61, - 0x74, 0x61, 0x12, 0x3d, 0x0a, 0x0a, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, - 0x65, 0x73, 0x2e, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x54, 0x69, 0x6d, - 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, - 0x74, 0x12, 0x2d, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x88, 0x01, 0x01, - 0x42, 0x07, 0x0a, 0x05, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x22, 0x72, 0x0a, 0x15, 0x43, 0x72, 0x6f, - 0x6e, 0x6a, 0x6f, 0x62, 0x4c, 0x6f, 0x63, 0x6b, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x53, 0x74, 0x61, - 0x74, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x68, 0x6f, 0x73, 0x74, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x68, 0x6f, 0x73, 0x74, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x3d, - 0x0a, 0x0a, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x2e, 0x74, - 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, - 0x6d, 0x70, 0x52, 0x09, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x22, 0x51, 0x0a, - 0x15, 0x43, 0x72, 0x6f, 0x6e, 0x6a, 0x6f, 0x62, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, - 0x72, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x38, 0x0a, 0x07, 0x63, 0x72, 0x6f, 0x6e, 0x6a, 0x6f, - 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, - 0x63, 0x65, 0x73, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x63, 0x72, 0x6f, 0x6e, 0x2e, - 0x43, 0x72, 0x6f, 0x6e, 0x6a, 0x6f, 0x62, 0x52, 0x07, 0x63, 0x72, 0x6f, 0x6e, 0x6a, 0x6f, 0x62, - 0x22, 0xf8, 0x01, 0x0a, 0x15, 0x43, 0x72, 0x6f, 0x6e, 0x6a, 0x6f, 0x62, 0x43, 0x6f, 0x6d, 0x70, - 0x6c, 0x65, 0x74, 0x65, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, - 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, - 0x0a, 0x06, 0x73, 0x75, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, - 0x73, 0x75, 0x63, 0x65, 0x73, 0x73, 0x12, 0x38, 0x0a, 0x07, 0x65, 0x6e, 0x64, 0x44, 0x61, 0x74, - 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, - 0x63, 0x65, 0x73, 0x2e, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x54, 0x69, - 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x07, 0x65, 0x6e, 0x64, 0x44, 0x61, 0x74, 0x65, - 0x12, 0x33, 0x0a, 0x07, 0x65, 0x6c, 0x61, 0x70, 0x73, 0x65, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x65, 0x6c, - 0x61, 0x70, 0x73, 0x65, 0x64, 0x12, 0x3b, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x2e, - 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x63, 0x72, 0x6f, 0x6e, 0x2e, 0x43, 0x72, 0x6f, 0x6e, - 0x6a, 0x6f, 0x62, 0x44, 0x61, 0x74, 0x61, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x88, - 0x01, 0x01, 0x42, 0x07, 0x0a, 0x05, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x2a, 0x7e, 0x0a, 0x0c, 0x43, - 0x72, 0x6f, 0x6e, 0x6a, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x1d, 0x0a, 0x19, 0x43, - 0x52, 0x4f, 0x4e, 0x4a, 0x4f, 0x42, 0x5f, 0x53, 0x54, 0x41, 0x54, 0x45, 0x5f, 0x55, 0x4e, 0x53, - 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x19, 0x0a, 0x15, 0x43, 0x52, - 0x4f, 0x4e, 0x4a, 0x4f, 0x42, 0x5f, 0x53, 0x54, 0x41, 0x54, 0x45, 0x5f, 0x57, 0x41, 0x49, 0x54, - 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x19, 0x0a, 0x15, 0x43, 0x52, 0x4f, 0x4e, 0x4a, 0x4f, 0x42, - 0x5f, 0x53, 0x54, 0x41, 0x54, 0x45, 0x5f, 0x50, 0x45, 0x4e, 0x44, 0x49, 0x4e, 0x47, 0x10, 0x02, - 0x12, 0x19, 0x0a, 0x15, 0x43, 0x52, 0x4f, 0x4e, 0x4a, 0x4f, 0x42, 0x5f, 0x53, 0x54, 0x41, 0x54, - 0x45, 0x5f, 0x52, 0x55, 0x4e, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x03, 0x42, 0x48, 0x5a, 0x46, 0x67, - 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x66, 0x69, 0x76, 0x65, 0x6e, 0x65, - 0x74, 0x2d, 0x61, 0x70, 0x70, 0x2f, 0x66, 0x69, 0x76, 0x65, 0x6e, 0x65, 0x74, 0x2f, 0x67, 0x65, - 0x6e, 0x2f, 0x67, 0x6f, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x72, 0x65, 0x73, 0x6f, 0x75, - 0x72, 0x63, 0x65, 0x73, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x63, 0x72, 0x6f, 0x6e, - 0x3b, 0x63, 0x72, 0x6f, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x48, 0x00, 0x52, 0x0f, 0x6c, 0x61, 0x73, + 0x74, 0x41, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x88, 0x01, 0x01, 0x12, + 0x46, 0x0a, 0x0c, 0x73, 0x74, 0x61, 0x72, 0x74, 0x65, 0x64, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, + 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, + 0x73, 0x2e, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x54, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x48, 0x01, 0x52, 0x0b, 0x73, 0x74, 0x61, 0x72, 0x74, 0x65, 0x64, + 0x54, 0x69, 0x6d, 0x65, 0x88, 0x01, 0x01, 0x12, 0x38, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, + 0x75, 0x74, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x48, 0x02, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x88, 0x01, + 0x01, 0x12, 0x36, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x22, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, + 0x6f, 0x6e, 0x2e, 0x63, 0x72, 0x6f, 0x6e, 0x2e, 0x43, 0x72, 0x6f, 0x6e, 0x6a, 0x6f, 0x62, 0x44, + 0x61, 0x74, 0x61, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x42, 0x14, 0x0a, 0x12, 0x5f, 0x6c, 0x61, + 0x73, 0x74, 0x5f, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x42, + 0x0f, 0x0a, 0x0d, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x65, 0x64, 0x5f, 0x74, 0x69, 0x6d, 0x65, + 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x22, 0x84, 0x01, 0x0a, + 0x0b, 0x43, 0x72, 0x6f, 0x6e, 0x6a, 0x6f, 0x62, 0x44, 0x61, 0x74, 0x61, 0x12, 0x3d, 0x0a, 0x0a, + 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x1e, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x2e, 0x74, 0x69, 0x6d, + 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x52, 0x09, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x12, 0x2d, 0x0a, 0x04, 0x64, + 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x48, + 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x88, 0x01, 0x01, 0x42, 0x07, 0x0a, 0x05, 0x5f, 0x64, + 0x61, 0x74, 0x61, 0x22, 0x72, 0x0a, 0x15, 0x43, 0x72, 0x6f, 0x6e, 0x6a, 0x6f, 0x62, 0x4c, 0x6f, + 0x63, 0x6b, 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x1a, 0x0a, 0x08, + 0x68, 0x6f, 0x73, 0x74, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, + 0x68, 0x6f, 0x73, 0x74, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x3d, 0x0a, 0x0a, 0x75, 0x70, 0x64, 0x61, + 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x72, + 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x2e, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, + 0x6d, 0x70, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x75, 0x70, + 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x22, 0x51, 0x0a, 0x15, 0x43, 0x72, 0x6f, 0x6e, 0x6a, + 0x6f, 0x62, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x45, 0x76, 0x65, 0x6e, 0x74, + 0x12, 0x38, 0x0a, 0x07, 0x63, 0x72, 0x6f, 0x6e, 0x6a, 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x1e, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x2e, 0x63, 0x6f, + 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x63, 0x72, 0x6f, 0x6e, 0x2e, 0x43, 0x72, 0x6f, 0x6e, 0x6a, 0x6f, + 0x62, 0x52, 0x07, 0x63, 0x72, 0x6f, 0x6e, 0x6a, 0x6f, 0x62, 0x22, 0xf8, 0x01, 0x0a, 0x15, 0x43, + 0x72, 0x6f, 0x6e, 0x6a, 0x6f, 0x62, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x45, + 0x76, 0x65, 0x6e, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x75, 0x63, 0x65, + 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x73, 0x75, 0x63, 0x65, 0x73, 0x73, + 0x12, 0x38, 0x0a, 0x07, 0x65, 0x6e, 0x64, 0x44, 0x61, 0x74, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x1e, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x2e, 0x74, 0x69, + 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x52, 0x07, 0x65, 0x6e, 0x64, 0x44, 0x61, 0x74, 0x65, 0x12, 0x33, 0x0a, 0x07, 0x65, 0x6c, + 0x61, 0x70, 0x73, 0x65, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, + 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x65, 0x6c, 0x61, 0x70, 0x73, 0x65, 0x64, 0x12, + 0x3b, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, + 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, + 0x2e, 0x63, 0x72, 0x6f, 0x6e, 0x2e, 0x43, 0x72, 0x6f, 0x6e, 0x6a, 0x6f, 0x62, 0x44, 0x61, 0x74, + 0x61, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x88, 0x01, 0x01, 0x42, 0x07, 0x0a, 0x05, + 0x5f, 0x64, 0x61, 0x74, 0x61, 0x22, 0xa8, 0x01, 0x0a, 0x0f, 0x47, 0x65, 0x6e, 0x65, 0x72, 0x69, + 0x63, 0x43, 0x72, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x12, 0x56, 0x0a, 0x0a, 0x61, 0x74, 0x74, + 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x36, 0x2e, + 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, + 0x2e, 0x63, 0x72, 0x6f, 0x6e, 0x2e, 0x47, 0x65, 0x6e, 0x65, 0x72, 0x69, 0x63, 0x43, 0x72, 0x6f, + 0x6e, 0x44, 0x61, 0x74, 0x61, 0x2e, 0x41, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0a, 0x61, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, + 0x73, 0x1a, 0x3d, 0x0a, 0x0f, 0x41, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, + 0x2a, 0x7e, 0x0a, 0x0c, 0x43, 0x72, 0x6f, 0x6e, 0x6a, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x65, + 0x12, 0x1d, 0x0a, 0x19, 0x43, 0x52, 0x4f, 0x4e, 0x4a, 0x4f, 0x42, 0x5f, 0x53, 0x54, 0x41, 0x54, + 0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, + 0x19, 0x0a, 0x15, 0x43, 0x52, 0x4f, 0x4e, 0x4a, 0x4f, 0x42, 0x5f, 0x53, 0x54, 0x41, 0x54, 0x45, + 0x5f, 0x57, 0x41, 0x49, 0x54, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x19, 0x0a, 0x15, 0x43, 0x52, + 0x4f, 0x4e, 0x4a, 0x4f, 0x42, 0x5f, 0x53, 0x54, 0x41, 0x54, 0x45, 0x5f, 0x50, 0x45, 0x4e, 0x44, + 0x49, 0x4e, 0x47, 0x10, 0x02, 0x12, 0x19, 0x0a, 0x15, 0x43, 0x52, 0x4f, 0x4e, 0x4a, 0x4f, 0x42, + 0x5f, 0x53, 0x54, 0x41, 0x54, 0x45, 0x5f, 0x52, 0x55, 0x4e, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x03, + 0x42, 0x48, 0x5a, 0x46, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x66, + 0x69, 0x76, 0x65, 0x6e, 0x65, 0x74, 0x2d, 0x61, 0x70, 0x70, 0x2f, 0x66, 0x69, 0x76, 0x65, 0x6e, + 0x65, 0x74, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x67, 0x6f, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, + 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, + 0x2f, 0x63, 0x72, 0x6f, 0x6e, 0x3b, 0x63, 0x72, 0x6f, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( @@ -486,7 +570,7 @@ func file_resources_common_cron_cron_proto_rawDescGZIP() []byte { } var file_resources_common_cron_cron_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_resources_common_cron_cron_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_resources_common_cron_cron_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_resources_common_cron_cron_proto_goTypes = []any{ (CronjobState)(0), // 0: resources.common.cron.CronjobState (*Cronjob)(nil), // 1: resources.common.cron.Cronjob @@ -494,27 +578,32 @@ var file_resources_common_cron_cron_proto_goTypes = []any{ (*CronjobLockOwnerState)(nil), // 3: resources.common.cron.CronjobLockOwnerState (*CronjobSchedulerEvent)(nil), // 4: resources.common.cron.CronjobSchedulerEvent (*CronjobCompletedEvent)(nil), // 5: resources.common.cron.CronjobCompletedEvent - (*timestamp.Timestamp)(nil), // 6: resources.timestamp.Timestamp - (*anypb.Any)(nil), // 7: google.protobuf.Any - (*durationpb.Duration)(nil), // 8: google.protobuf.Duration + (*GenericCronData)(nil), // 6: resources.common.cron.GenericCronData + nil, // 7: resources.common.cron.GenericCronData.AttributesEntry + (*timestamp.Timestamp)(nil), // 8: resources.timestamp.Timestamp + (*durationpb.Duration)(nil), // 9: google.protobuf.Duration + (*anypb.Any)(nil), // 10: google.protobuf.Any } var file_resources_common_cron_cron_proto_depIdxs = []int32{ 0, // 0: resources.common.cron.Cronjob.state:type_name -> resources.common.cron.CronjobState - 6, // 1: resources.common.cron.Cronjob.next_schedule_time:type_name -> resources.timestamp.Timestamp - 6, // 2: resources.common.cron.Cronjob.last_attempt_time:type_name -> resources.timestamp.Timestamp - 2, // 3: resources.common.cron.Cronjob.data:type_name -> resources.common.cron.CronjobData - 6, // 4: resources.common.cron.CronjobData.updated_at:type_name -> resources.timestamp.Timestamp - 7, // 5: resources.common.cron.CronjobData.data:type_name -> google.protobuf.Any - 6, // 6: resources.common.cron.CronjobLockOwnerState.updated_at:type_name -> resources.timestamp.Timestamp - 1, // 7: resources.common.cron.CronjobSchedulerEvent.cronjob:type_name -> resources.common.cron.Cronjob - 6, // 8: resources.common.cron.CronjobCompletedEvent.endDate:type_name -> resources.timestamp.Timestamp - 8, // 9: resources.common.cron.CronjobCompletedEvent.elapsed:type_name -> google.protobuf.Duration - 2, // 10: resources.common.cron.CronjobCompletedEvent.data:type_name -> resources.common.cron.CronjobData - 11, // [11:11] is the sub-list for method output_type - 11, // [11:11] is the sub-list for method input_type - 11, // [11:11] is the sub-list for extension type_name - 11, // [11:11] is the sub-list for extension extendee - 0, // [0:11] is the sub-list for field type_name + 8, // 1: resources.common.cron.Cronjob.next_schedule_time:type_name -> resources.timestamp.Timestamp + 8, // 2: resources.common.cron.Cronjob.last_attempt_time:type_name -> resources.timestamp.Timestamp + 8, // 3: resources.common.cron.Cronjob.started_time:type_name -> resources.timestamp.Timestamp + 9, // 4: resources.common.cron.Cronjob.timeout:type_name -> google.protobuf.Duration + 2, // 5: resources.common.cron.Cronjob.data:type_name -> resources.common.cron.CronjobData + 8, // 6: resources.common.cron.CronjobData.updated_at:type_name -> resources.timestamp.Timestamp + 10, // 7: resources.common.cron.CronjobData.data:type_name -> google.protobuf.Any + 8, // 8: resources.common.cron.CronjobLockOwnerState.updated_at:type_name -> resources.timestamp.Timestamp + 1, // 9: resources.common.cron.CronjobSchedulerEvent.cronjob:type_name -> resources.common.cron.Cronjob + 8, // 10: resources.common.cron.CronjobCompletedEvent.endDate:type_name -> resources.timestamp.Timestamp + 9, // 11: resources.common.cron.CronjobCompletedEvent.elapsed:type_name -> google.protobuf.Duration + 2, // 12: resources.common.cron.CronjobCompletedEvent.data:type_name -> resources.common.cron.CronjobData + 7, // 13: resources.common.cron.GenericCronData.attributes:type_name -> resources.common.cron.GenericCronData.AttributesEntry + 14, // [14:14] is the sub-list for method output_type + 14, // [14:14] is the sub-list for method input_type + 14, // [14:14] is the sub-list for extension type_name + 14, // [14:14] is the sub-list for extension extendee + 0, // [0:14] is the sub-list for field type_name } func init() { file_resources_common_cron_cron_proto_init() } @@ -522,6 +611,7 @@ func file_resources_common_cron_cron_proto_init() { if File_resources_common_cron_cron_proto != nil { return } + file_resources_common_cron_cron_proto_msgTypes[0].OneofWrappers = []any{} file_resources_common_cron_cron_proto_msgTypes[1].OneofWrappers = []any{} file_resources_common_cron_cron_proto_msgTypes[4].OneofWrappers = []any{} type x struct{} @@ -530,7 +620,7 @@ func file_resources_common_cron_cron_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_resources_common_cron_cron_proto_rawDesc, NumEnums: 1, - NumMessages: 5, + NumMessages: 7, NumExtensions: 0, NumServices: 0, }, diff --git a/gen/go/proto/resources/common/cron/cron.pb.validate.go b/gen/go/proto/resources/common/cron/cron.pb.validate.go index d01263471..17f7daa77 100644 --- a/gen/go/proto/resources/common/cron/cron.pb.validate.go +++ b/gen/go/proto/resources/common/cron/cron.pb.validate.go @@ -92,11 +92,11 @@ func (m *Cronjob) validate(all bool) error { } if all { - switch v := interface{}(m.GetLastAttemptTime()).(type) { + switch v := interface{}(m.GetData()).(type) { case interface{ ValidateAll() error }: if err := v.ValidateAll(); err != nil { errors = append(errors, CronjobValidationError{ - field: "LastAttemptTime", + field: "Data", reason: "embedded message failed validation", cause: err, }) @@ -104,49 +104,119 @@ func (m *Cronjob) validate(all bool) error { case interface{ Validate() error }: if err := v.Validate(); err != nil { errors = append(errors, CronjobValidationError{ - field: "LastAttemptTime", + field: "Data", reason: "embedded message failed validation", cause: err, }) } } - } else if v, ok := interface{}(m.GetLastAttemptTime()).(interface{ Validate() error }); ok { + } else if v, ok := interface{}(m.GetData()).(interface{ Validate() error }); ok { if err := v.Validate(); err != nil { return CronjobValidationError{ - field: "LastAttemptTime", + field: "Data", reason: "embedded message failed validation", cause: err, } } } - if all { - switch v := interface{}(m.GetData()).(type) { - case interface{ ValidateAll() error }: - if err := v.ValidateAll(); err != nil { - errors = append(errors, CronjobValidationError{ - field: "Data", + if m.LastAttemptTime != nil { + + if all { + switch v := interface{}(m.GetLastAttemptTime()).(type) { + case interface{ ValidateAll() error }: + if err := v.ValidateAll(); err != nil { + errors = append(errors, CronjobValidationError{ + field: "LastAttemptTime", + reason: "embedded message failed validation", + cause: err, + }) + } + case interface{ Validate() error }: + if err := v.Validate(); err != nil { + errors = append(errors, CronjobValidationError{ + field: "LastAttemptTime", + reason: "embedded message failed validation", + cause: err, + }) + } + } + } else if v, ok := interface{}(m.GetLastAttemptTime()).(interface{ Validate() error }); ok { + if err := v.Validate(); err != nil { + return CronjobValidationError{ + field: "LastAttemptTime", reason: "embedded message failed validation", cause: err, - }) + } } - case interface{ Validate() error }: + } + + } + + if m.StartedTime != nil { + + if all { + switch v := interface{}(m.GetStartedTime()).(type) { + case interface{ ValidateAll() error }: + if err := v.ValidateAll(); err != nil { + errors = append(errors, CronjobValidationError{ + field: "StartedTime", + reason: "embedded message failed validation", + cause: err, + }) + } + case interface{ Validate() error }: + if err := v.Validate(); err != nil { + errors = append(errors, CronjobValidationError{ + field: "StartedTime", + reason: "embedded message failed validation", + cause: err, + }) + } + } + } else if v, ok := interface{}(m.GetStartedTime()).(interface{ Validate() error }); ok { if err := v.Validate(); err != nil { - errors = append(errors, CronjobValidationError{ - field: "Data", + return CronjobValidationError{ + field: "StartedTime", reason: "embedded message failed validation", cause: err, - }) + } } } - } else if v, ok := interface{}(m.GetData()).(interface{ Validate() error }); ok { - if err := v.Validate(); err != nil { - return CronjobValidationError{ - field: "Data", - reason: "embedded message failed validation", - cause: err, + + } + + if m.Timeout != nil { + + if all { + switch v := interface{}(m.GetTimeout()).(type) { + case interface{ ValidateAll() error }: + if err := v.ValidateAll(); err != nil { + errors = append(errors, CronjobValidationError{ + field: "Timeout", + reason: "embedded message failed validation", + cause: err, + }) + } + case interface{ Validate() error }: + if err := v.Validate(); err != nil { + errors = append(errors, CronjobValidationError{ + field: "Timeout", + reason: "embedded message failed validation", + cause: err, + }) + } + } + } else if v, ok := interface{}(m.GetTimeout()).(interface{ Validate() error }); ok { + if err := v.Validate(); err != nil { + return CronjobValidationError{ + field: "Timeout", + reason: "embedded message failed validation", + cause: err, + } } } + } if len(errors) > 0 { @@ -847,3 +917,105 @@ var _ interface { Cause() error ErrorName() string } = CronjobCompletedEventValidationError{} + +// Validate checks the field values on GenericCronData with the rules defined +// in the proto definition for this message. If any rules are violated, the +// first error encountered is returned, or nil if there are no violations. +func (m *GenericCronData) Validate() error { + return m.validate(false) +} + +// ValidateAll checks the field values on GenericCronData with the rules +// defined in the proto definition for this message. If any rules are +// violated, the result is a list of violation errors wrapped in +// GenericCronDataMultiError, or nil if none found. +func (m *GenericCronData) ValidateAll() error { + return m.validate(true) +} + +func (m *GenericCronData) validate(all bool) error { + if m == nil { + return nil + } + + var errors []error + + // no validation rules for Attributes + + if len(errors) > 0 { + return GenericCronDataMultiError(errors) + } + + return nil +} + +// GenericCronDataMultiError is an error wrapping multiple validation errors +// returned by GenericCronData.ValidateAll() if the designated constraints +// aren't met. +type GenericCronDataMultiError []error + +// Error returns a concatenation of all the error messages it wraps. +func (m GenericCronDataMultiError) Error() string { + var msgs []string + for _, err := range m { + msgs = append(msgs, err.Error()) + } + return strings.Join(msgs, "; ") +} + +// AllErrors returns a list of validation violation errors. +func (m GenericCronDataMultiError) AllErrors() []error { return m } + +// GenericCronDataValidationError is the validation error returned by +// GenericCronData.Validate if the designated constraints aren't met. +type GenericCronDataValidationError struct { + field string + reason string + cause error + key bool +} + +// Field function returns field value. +func (e GenericCronDataValidationError) Field() string { return e.field } + +// Reason function returns reason value. +func (e GenericCronDataValidationError) Reason() string { return e.reason } + +// Cause function returns cause value. +func (e GenericCronDataValidationError) Cause() error { return e.cause } + +// Key function returns key value. +func (e GenericCronDataValidationError) Key() bool { return e.key } + +// ErrorName returns error name. +func (e GenericCronDataValidationError) ErrorName() string { return "GenericCronDataValidationError" } + +// Error satisfies the builtin error interface +func (e GenericCronDataValidationError) Error() string { + cause := "" + if e.cause != nil { + cause = fmt.Sprintf(" | caused by: %v", e.cause) + } + + key := "" + if e.key { + key = "key for " + } + + return fmt.Sprintf( + "invalid %sGenericCronData.%s: %s%s", + key, + e.field, + e.reason, + cause) +} + +var _ error = GenericCronDataValidationError{} + +var _ interface { + Field() string + Reason() string + Key() bool + Cause() error + ErrorName() string +} = GenericCronDataValidationError{} diff --git a/gen/go/proto/services/calendar/calendar.go b/gen/go/proto/services/calendar/calendar.go index c88045ef3..9f02ac61a 100644 --- a/gen/go/proto/services/calendar/calendar.go +++ b/gen/go/proto/services/calendar/calendar.go @@ -7,6 +7,7 @@ import ( "github.com/fivenet-app/fivenet/pkg/access" "github.com/fivenet-app/fivenet/pkg/config/appconfig" "github.com/fivenet-app/fivenet/pkg/events" + "github.com/fivenet-app/fivenet/pkg/housekeeper" "github.com/fivenet-app/fivenet/pkg/mstlystcdata" "github.com/fivenet-app/fivenet/pkg/notifi" "github.com/fivenet-app/fivenet/pkg/perms" @@ -33,6 +34,20 @@ var ( tUserProps = table.FivenetUserProps ) +func init() { + housekeeper.AddTable(&housekeeper.Table{ + Table: table.FivenetCalendar, + TimestampColumn: table.FivenetCalendar.DeletedAt, + MinDays: 60, + }) + + housekeeper.AddTable(&housekeeper.Table{ + Table: table.FivenetCalendarEntries, + TimestampColumn: table.FivenetCalendarEntries.DeletedAt, + MinDays: 60, + }) +} + type Server struct { CalendarServiceServer diff --git a/gen/go/proto/services/docstore/docstore.go b/gen/go/proto/services/docstore/docstore.go index ec39bb560..84feba51f 100644 --- a/gen/go/proto/services/docstore/docstore.go +++ b/gen/go/proto/services/docstore/docstore.go @@ -16,6 +16,7 @@ import ( "github.com/fivenet-app/fivenet/pkg/grpc/auth" "github.com/fivenet-app/fivenet/pkg/grpc/auth/userinfo" "github.com/fivenet-app/fivenet/pkg/grpc/errswrap" + "github.com/fivenet-app/fivenet/pkg/housekeeper" "github.com/fivenet-app/fivenet/pkg/html/htmldiffer" "github.com/fivenet-app/fivenet/pkg/html/htmlsanitizer" "github.com/fivenet-app/fivenet/pkg/mstlystcdata" @@ -38,6 +39,8 @@ import ( const ( DocsDefaultPageSize = 16 DocShortContentLength = 128 + + housekeeperMinDays = 60 ) var ( @@ -50,6 +53,37 @@ var ( tDUserAccess = table.FivenetDocumentsUserAccess.AS("user_access") ) +func init() { + housekeeper.AddTable(&housekeeper.Table{ + Table: table.FivenetDocuments, + TimestampColumn: table.FivenetDocuments.DeletedAt, + MinDays: housekeeperMinDays, + }) + + housekeeper.AddTable(&housekeeper.Table{ + Table: table.FivenetDocumentsTemplates, + TimestampColumn: table.FivenetDocumentsTemplates.DeletedAt, + MinDays: housekeeperMinDays, + }) + + housekeeper.AddTable(&housekeeper.Table{ + Table: table.FivenetDocumentsComments, + TimestampColumn: table.FivenetDocumentsComments.DeletedAt, + MinDays: housekeeperMinDays, + }) + + housekeeper.AddTable(&housekeeper.Table{ + Table: table.FivenetDocumentsReferences, + TimestampColumn: table.FivenetDocumentsReferences.DeletedAt, + MinDays: housekeeperMinDays, + }) + housekeeper.AddTable(&housekeeper.Table{ + Table: table.FivenetDocumentsRelations, + TimestampColumn: table.FivenetDocumentsRelations.DeletedAt, + MinDays: housekeeperMinDays, + }) +} + type Server struct { DocStoreServiceServer diff --git a/gen/go/proto/services/docstore/workflow_state.go b/gen/go/proto/services/docstore/workflow_state.go index 718c936c3..a68943f85 100644 --- a/gen/go/proto/services/docstore/workflow_state.go +++ b/gen/go/proto/services/docstore/workflow_state.go @@ -59,7 +59,7 @@ type WorkflowParams struct { func NewWorkflow(p WorkflowParams) *Workflow { w := &Workflow{ - logger: p.Logger, + logger: p.Logger.Named("docstore.workflow"), tracer: p.TP.Tracer("docstore_workflow"), db: p.DB, notif: p.Notif, @@ -71,7 +71,7 @@ func NewWorkflow(p WorkflowParams) *Workflow { p.LC.Append(fx.StartHook(func(ctx context.Context) error { if err := p.Cron.RegisterCronjob(ctx, &cron.Cronjob{ Name: "docstore.workflow_run", - Schedule: "*/15 * * * * *", // Every minute + Schedule: "@always", // Every minute }); err != nil { return err } diff --git a/gen/go/proto/services/mailer/mailer.go b/gen/go/proto/services/mailer/mailer.go index 3afe14f2b..81bd4bbc1 100644 --- a/gen/go/proto/services/mailer/mailer.go +++ b/gen/go/proto/services/mailer/mailer.go @@ -6,6 +6,7 @@ import ( "github.com/fivenet-app/fivenet/gen/go/proto/resources/mailer" "github.com/fivenet-app/fivenet/pkg/access" "github.com/fivenet-app/fivenet/pkg/events" + "github.com/fivenet-app/fivenet/pkg/housekeeper" "github.com/fivenet-app/fivenet/pkg/mstlystcdata" "github.com/fivenet-app/fivenet/pkg/perms" "github.com/fivenet-app/fivenet/pkg/server/audit" @@ -14,6 +15,32 @@ import ( "google.golang.org/grpc" ) +func init() { + housekeeper.AddTable(&housekeeper.Table{ + Table: table.FivenetMailerEmails, + TimestampColumn: table.FivenetMailerEmails.DeletedAt, + MinDays: 60, + }) + + housekeeper.AddTable(&housekeeper.Table{ + Table: table.FivenetMailerThreads, + TimestampColumn: table.FivenetMailerThreads.DeletedAt, + MinDays: 60, + }) + + housekeeper.AddTable(&housekeeper.Table{ + Table: table.FivenetMailerMessages, + TimestampColumn: table.FivenetMailerMessages.DeletedAt, + MinDays: 60, + }) + + housekeeper.AddTable(&housekeeper.Table{ + Table: table.FivenetMailerTemplates, + TimestampColumn: table.FivenetMailerTemplates.DeletedAt, + MinDays: 60, + }) +} + type Server struct { MailerServiceServer diff --git a/gen/go/proto/services/qualifications/qualifications.go b/gen/go/proto/services/qualifications/qualifications.go index 267a47a61..aef55c6ce 100644 --- a/gen/go/proto/services/qualifications/qualifications.go +++ b/gen/go/proto/services/qualifications/qualifications.go @@ -16,6 +16,7 @@ import ( "github.com/fivenet-app/fivenet/pkg/config" "github.com/fivenet-app/fivenet/pkg/grpc/auth" "github.com/fivenet-app/fivenet/pkg/grpc/errswrap" + "github.com/fivenet-app/fivenet/pkg/housekeeper" "github.com/fivenet-app/fivenet/pkg/mstlystcdata" "github.com/fivenet-app/fivenet/pkg/notifi" "github.com/fivenet-app/fivenet/pkg/perms" @@ -39,6 +40,32 @@ const ( QualificationsLabelDefaultFormat = "%abbr%: %name%" ) +func init() { + housekeeper.AddTable(&housekeeper.Table{ + Table: table.FivenetQualifications, + TimestampColumn: table.FivenetQualifications.DeletedAt, + MinDays: 30, + }) + + housekeeper.AddTable(&housekeeper.Table{ + Table: table.FivenetQualificationsExamUsers, + TimestampColumn: table.FivenetQualificationsExamUsers.EndsAt, + MinDays: 30, + }) + + housekeeper.AddTable(&housekeeper.Table{ + Table: table.FivenetQualificationsRequests, + TimestampColumn: table.FivenetQualificationsRequests.DeletedAt, + MinDays: 30, + }) + + housekeeper.AddTable(&housekeeper.Table{ + Table: table.FivenetQualificationsResults, + TimestampColumn: table.FivenetQualificationsResults.DeletedAt, + MinDays: 30, + }) +} + var tQuali = table.FivenetQualifications.AS("qualification") type Server struct { diff --git a/gen/go/proto/services/wiki/wiki.go b/gen/go/proto/services/wiki/wiki.go index d56518882..abf9e0f90 100644 --- a/gen/go/proto/services/wiki/wiki.go +++ b/gen/go/proto/services/wiki/wiki.go @@ -16,6 +16,7 @@ import ( "github.com/fivenet-app/fivenet/pkg/grpc/auth" "github.com/fivenet-app/fivenet/pkg/grpc/auth/userinfo" "github.com/fivenet-app/fivenet/pkg/grpc/errswrap" + "github.com/fivenet-app/fivenet/pkg/housekeeper" "github.com/fivenet-app/fivenet/pkg/html/htmldiffer" "github.com/fivenet-app/fivenet/pkg/mstlystcdata" "github.com/fivenet-app/fivenet/pkg/perms" @@ -36,6 +37,14 @@ import ( const defaultWikiUpperLimit = 250 +func init() { + housekeeper.AddTable(&housekeeper.Table{ + Table: table.FivenetWikiPages, + TimestampColumn: table.FivenetWikiPages.DeletedAt, + MinDays: 60, + }) +} + var ( tPage = table.FivenetWikiPages.AS("page") tPageShort = table.FivenetWikiPages.AS("pageshort") diff --git a/gen/grpc-api.md b/gen/grpc-api.md index bd1d3d426..532c45fc7 100644 --- a/gen/grpc-api.md +++ b/gen/grpc-api.md @@ -79,6 +79,8 @@ - [CronjobData](#resources-common-cron-CronjobData) - [CronjobLockOwnerState](#resources-common-cron-CronjobLockOwnerState) - [CronjobSchedulerEvent](#resources-common-cron-CronjobSchedulerEvent) + - [GenericCronData](#resources-common-cron-GenericCronData) + - [GenericCronData.AttributesEntry](#resources-common-cron-GenericCronData-AttributesEntry) - [CronjobState](#resources-common-cron-CronjobState) @@ -1855,7 +1857,9 @@ | schedule | [string](#string) | | | | state | [CronjobState](#resources-common-cron-CronjobState) | | | | next_schedule_time | [resources.timestamp.Timestamp](#resources-timestamp-Timestamp) | | | -| last_attempt_time | [resources.timestamp.Timestamp](#resources-timestamp-Timestamp) | | | +| last_attempt_time | [resources.timestamp.Timestamp](#resources-timestamp-Timestamp) | optional | | +| started_time | [resources.timestamp.Timestamp](#resources-timestamp-Timestamp) | optional | | +| timeout | [google.protobuf.Duration](#google-protobuf-Duration) | optional | | | data | [CronjobData](#resources-common-cron-CronjobData) | | | @@ -1928,6 +1932,37 @@ + + + +### GenericCronData + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| attributes | [GenericCronData.AttributesEntry](#resources-common-cron-GenericCronData-AttributesEntry) | repeated | | + + + + + + + + +### GenericCronData.AttributesEntry + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| key | [string](#string) | | | +| value | [string](#string) | | | + + + + + diff --git a/gen/ts/resources/common/cron/cron.ts b/gen/ts/resources/common/cron/cron.ts index 2f256eec1..28c6a3a07 100644 --- a/gen/ts/resources/common/cron/cron.ts +++ b/gen/ts/resources/common/cron/cron.ts @@ -10,8 +10,8 @@ import { UnknownFieldHandler } from "@protobuf-ts/runtime"; import type { PartialMessage } from "@protobuf-ts/runtime"; import { reflectionMergePartial } from "@protobuf-ts/runtime"; import { MessageType } from "@protobuf-ts/runtime"; -import { Duration } from "../../../google/protobuf/duration"; import { Any } from "../../../google/protobuf/any"; +import { Duration } from "../../../google/protobuf/duration"; import { Timestamp } from "../../timestamp/timestamp"; /** * @generated from protobuf message resources.common.cron.Cronjob @@ -34,11 +34,19 @@ export interface Cronjob { */ nextScheduleTime?: Timestamp; /** - * @generated from protobuf field: resources.timestamp.Timestamp last_attempt_time = 5; + * @generated from protobuf field: optional resources.timestamp.Timestamp last_attempt_time = 5; */ lastAttemptTime?: Timestamp; /** - * @generated from protobuf field: resources.common.cron.CronjobData data = 6; + * @generated from protobuf field: optional resources.timestamp.Timestamp started_time = 6; + */ + startedTime?: Timestamp; + /** + * @generated from protobuf field: optional google.protobuf.Duration timeout = 7; + */ + timeout?: Duration; + /** + * @generated from protobuf field: resources.common.cron.CronjobData data = 8; */ data?: CronjobData; } @@ -102,6 +110,17 @@ export interface CronjobCompletedEvent { */ data?: CronjobData; } +/** + * @generated from protobuf message resources.common.cron.GenericCronData + */ +export interface GenericCronData { + /** + * @generated from protobuf field: map attributes = 1; + */ + attributes: { + [key: string]: string; + }; +} /** * @generated from protobuf enum resources.common.cron.CronjobState */ @@ -132,7 +151,9 @@ class Cronjob$Type extends MessageType { { no: 3, name: "state", kind: "enum", T: () => ["resources.common.cron.CronjobState", CronjobState, "CRONJOB_STATE_"] }, { no: 4, name: "next_schedule_time", kind: "message", T: () => Timestamp }, { no: 5, name: "last_attempt_time", kind: "message", T: () => Timestamp }, - { no: 6, name: "data", kind: "message", T: () => CronjobData } + { no: 6, name: "started_time", kind: "message", T: () => Timestamp }, + { no: 7, name: "timeout", kind: "message", T: () => Duration }, + { no: 8, name: "data", kind: "message", T: () => CronjobData } ]); } create(value?: PartialMessage): Cronjob { @@ -161,10 +182,16 @@ class Cronjob$Type extends MessageType { case /* resources.timestamp.Timestamp next_schedule_time */ 4: message.nextScheduleTime = Timestamp.internalBinaryRead(reader, reader.uint32(), options, message.nextScheduleTime); break; - case /* resources.timestamp.Timestamp last_attempt_time */ 5: + case /* optional resources.timestamp.Timestamp last_attempt_time */ 5: message.lastAttemptTime = Timestamp.internalBinaryRead(reader, reader.uint32(), options, message.lastAttemptTime); break; - case /* resources.common.cron.CronjobData data */ 6: + case /* optional resources.timestamp.Timestamp started_time */ 6: + message.startedTime = Timestamp.internalBinaryRead(reader, reader.uint32(), options, message.startedTime); + break; + case /* optional google.protobuf.Duration timeout */ 7: + message.timeout = Duration.internalBinaryRead(reader, reader.uint32(), options, message.timeout); + break; + case /* resources.common.cron.CronjobData data */ 8: message.data = CronjobData.internalBinaryRead(reader, reader.uint32(), options, message.data); break; default: @@ -191,12 +218,18 @@ class Cronjob$Type extends MessageType { /* resources.timestamp.Timestamp next_schedule_time = 4; */ if (message.nextScheduleTime) Timestamp.internalBinaryWrite(message.nextScheduleTime, writer.tag(4, WireType.LengthDelimited).fork(), options).join(); - /* resources.timestamp.Timestamp last_attempt_time = 5; */ + /* optional resources.timestamp.Timestamp last_attempt_time = 5; */ if (message.lastAttemptTime) Timestamp.internalBinaryWrite(message.lastAttemptTime, writer.tag(5, WireType.LengthDelimited).fork(), options).join(); - /* resources.common.cron.CronjobData data = 6; */ + /* optional resources.timestamp.Timestamp started_time = 6; */ + if (message.startedTime) + Timestamp.internalBinaryWrite(message.startedTime, writer.tag(6, WireType.LengthDelimited).fork(), options).join(); + /* optional google.protobuf.Duration timeout = 7; */ + if (message.timeout) + Duration.internalBinaryWrite(message.timeout, writer.tag(7, WireType.LengthDelimited).fork(), options).join(); + /* resources.common.cron.CronjobData data = 8; */ if (message.data) - CronjobData.internalBinaryWrite(message.data, writer.tag(6, WireType.LengthDelimited).fork(), options).join(); + CronjobData.internalBinaryWrite(message.data, writer.tag(8, WireType.LengthDelimited).fork(), options).join(); let u = options.writeUnknownFields; if (u !== false) (u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer); @@ -436,3 +469,66 @@ class CronjobCompletedEvent$Type extends MessageType { * @generated MessageType for protobuf message resources.common.cron.CronjobCompletedEvent */ export const CronjobCompletedEvent = new CronjobCompletedEvent$Type(); +// @generated message type with reflection information, may provide speed optimized methods +class GenericCronData$Type extends MessageType { + constructor() { + super("resources.common.cron.GenericCronData", [ + { no: 1, name: "attributes", kind: "map", K: 9 /*ScalarType.STRING*/, V: { kind: "scalar", T: 9 /*ScalarType.STRING*/ } } + ]); + } + create(value?: PartialMessage): GenericCronData { + const message = globalThis.Object.create((this.messagePrototype!)); + message.attributes = {}; + if (value !== undefined) + reflectionMergePartial(this, message, value); + return message; + } + internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: GenericCronData): GenericCronData { + let message = target ?? this.create(), end = reader.pos + length; + while (reader.pos < end) { + let [fieldNo, wireType] = reader.tag(); + switch (fieldNo) { + case /* map attributes */ 1: + this.binaryReadMap1(message.attributes, reader, options); + break; + default: + let u = options.readUnknownField; + if (u === "throw") + throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`); + let d = reader.skip(wireType); + if (u !== false) + (u === true ? UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d); + } + } + return message; + } + private binaryReadMap1(map: GenericCronData["attributes"], reader: IBinaryReader, options: BinaryReadOptions): void { + let len = reader.uint32(), end = reader.pos + len, key: keyof GenericCronData["attributes"] | undefined, val: GenericCronData["attributes"][any] | undefined; + while (reader.pos < end) { + let [fieldNo, wireType] = reader.tag(); + switch (fieldNo) { + case 1: + key = reader.string(); + break; + case 2: + val = reader.string(); + break; + default: throw new globalThis.Error("unknown map entry field for field resources.common.cron.GenericCronData.attributes"); + } + } + map[key ?? ""] = val ?? ""; + } + internalBinaryWrite(message: GenericCronData, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter { + /* map attributes = 1; */ + for (let k of globalThis.Object.keys(message.attributes)) + writer.tag(1, WireType.LengthDelimited).fork().tag(1, WireType.LengthDelimited).string(k).tag(2, WireType.LengthDelimited).string(message.attributes[k]).join(); + let u = options.writeUnknownFields; + if (u !== false) + (u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer); + return writer; + } +} +/** + * @generated MessageType for protobuf message resources.common.cron.GenericCronData + */ +export const GenericCronData = new GenericCronData$Type(); diff --git a/main.go b/main.go index e368781fc..0c4ef5b71 100644 --- a/main.go +++ b/main.go @@ -43,6 +43,7 @@ import ( "github.com/fivenet-app/fivenet/pkg/grpc" "github.com/fivenet-app/fivenet/pkg/grpc/auth" "github.com/fivenet-app/fivenet/pkg/grpc/auth/userinfo" + "github.com/fivenet-app/fivenet/pkg/housekeeper" "github.com/fivenet-app/fivenet/pkg/html/htmldiffer" "github.com/fivenet-app/fivenet/pkg/html/htmlsanitizer" "github.com/fivenet-app/fivenet/pkg/lang" @@ -90,6 +91,7 @@ type WorkerCmd struct { ModuleUserTracker bool `help:"Start User tracker module" default:"true"` ModuleJobsTimeclock bool `help:"Start Jobs timeclock housekeeper module" default:"true"` ModuleDocsWorkflow bool `help:"Start Docstore Workflow module" default:"true"` + ModuleHousekeeper bool `help:"Start Housekeepr module" default:"true"` } func (c *WorkerCmd) Run(ctx *Context) error { @@ -113,6 +115,9 @@ func (c *WorkerCmd) Run(ctx *Context) error { if c.ModuleDocsWorkflow { fxOpts = append(fxOpts, fx.Invoke(func(*pbdocstore.Workflow) {})) } + if c.ModuleHousekeeper { + fxOpts = append(fxOpts, fx.Invoke(func(*housekeeper.Housekeeper) {})) + } // Only run cron agent in worker fxOpts = append(fxOpts, fx.Invoke(func(*croner.Agent) {})) @@ -186,6 +191,7 @@ func getFxBaseOpts(startTimeout time.Duration) []fx.Option { server.HTTPServerModule, centrumstate.StateModule, storage.Module, + housekeeper.Module, fx.Provide( mstlystcdata.NewCache, diff --git a/pkg/croner/cron.go b/pkg/croner/cron.go index 1b99b6a51..a9f8747df 100644 --- a/pkg/croner/cron.go +++ b/pkg/croner/cron.go @@ -11,6 +11,7 @@ import ( "github.com/adhocore/gronx" "github.com/fivenet-app/fivenet/gen/go/proto/resources/common/cron" + "github.com/fivenet-app/fivenet/gen/go/proto/resources/timestamp" "github.com/fivenet-app/fivenet/pkg/config" "github.com/fivenet-app/fivenet/pkg/events" "github.com/fivenet-app/fivenet/pkg/nats/locks" @@ -121,7 +122,6 @@ func (c *Cron) RegisterCronjob(ctx context.Context, job *cron.Cronjob) error { if err != nil && !errors.Is(err, jetstream.ErrKeyNotFound) { return fmt.Errorf("failed to load existing cron job %s. %w", job.Name, err) } - if cj != nil { // Merge with existing cronjob data cj.Merge(job) @@ -129,6 +129,18 @@ func (c *Cron) RegisterCronjob(ctx context.Context, job *cron.Cronjob) error { cj = job } + if cj.State == cron.CronjobState_CRONJOB_STATE_UNSPECIFIED { + cj.State = cron.CronjobState_CRONJOB_STATE_PENDING + } + + if cj.NextScheduleTime == nil { + nextTime, err := gronx.NextTick(cj.Schedule, false) + if err != nil { + return err + } + cj.NextScheduleTime = timestamp.New(nextTime) + } + if cj.Data == nil { cj.Data = &cron.CronjobData{} } diff --git a/pkg/croner/scheduler.go b/pkg/croner/scheduler.go index ac3bd3653..117bf3527 100644 --- a/pkg/croner/scheduler.go +++ b/pkg/croner/scheduler.go @@ -153,6 +153,12 @@ func (s *Scheduler) start(ctx context.Context) { return true } + // Check if the cron job is already/still running and under the timeout check + if job.StartedTime != nil && (job.State == cron.CronjobState_CRONJOB_STATE_RUNNING && + time.Since(job.StartedTime.AsTime()) <= job.GetRunTimeout()) { + return true + } + ok, err := s.gron.IsDue(job.Schedule, t) if err != nil { s.logger.Error("failed to chek cron job due time", zap.String("job_name", key), zap.String("schedule", job.Schedule)) @@ -165,9 +171,22 @@ func (s *Scheduler) start(ctx context.Context) { wg.Add(1) go func() { defer wg.Done() + if err := s.runCronjob(ctx, job); err != nil { s.logger.Error("failed to trigger cron job run", zap.String("job_name", job.Name)) } + + if err := s.store.ComputeUpdate(ctx, key, true, func(key string, existing *cron.Cronjob) (*cron.Cronjob, bool, error) { + if existing == nil { + return existing, false, nil + } + + existing.State = cron.CronjobState_CRONJOB_STATE_RUNNING + + return existing, true, nil + }); err != nil { + s.logger.Error("failed to update status of cron job", zap.String("job_name", job.Name)) + } }() return true @@ -238,13 +257,14 @@ func (s *Scheduler) watchForCompletions(msg jetstream.Msg) { return existing, false, nil } + existing.State = cron.CronjobState_CRONJOB_STATE_PENDING + nextTime, err := gronx.NextTick(existing.Schedule, false) if err != nil { return existing, false, err } - - existing.LastAttemptTime = timestamp.New(time.Now()) existing.NextScheduleTime = timestamp.New(nextTime) + existing.LastAttemptTime = timestamp.New(time.Now()) existing.Data.Merge(job.Data) diff --git a/pkg/housekeeper/housekeeper.go b/pkg/housekeeper/housekeeper.go index f0702544f..8c161ec8b 100644 --- a/pkg/housekeeper/housekeeper.go +++ b/pkg/housekeeper/housekeeper.go @@ -1,8 +1,20 @@ package housekeeper import ( + "context" + "database/sql" + "fmt" + "slices" + + "github.com/fivenet-app/fivenet/gen/go/proto/resources/common/cron" + "github.com/fivenet-app/fivenet/pkg/croner" jet "github.com/go-jet/jet/v2/mysql" + tracesdk "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" "go.uber.org/fx" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" ) var Module = fx.Module("db_housekeeper", @@ -11,20 +23,137 @@ var Module = fx.Module("db_housekeeper", ), ) -type Table struct { - Table jet.Table - DeletedAt jet.Column - MinDays int -} +const DefaultRetentionDays = 30 -type Housekeeper struct{} +type Housekeeper struct { + logger *zap.Logger + tracer trace.Tracer + + db *sql.DB +} type Params struct { fx.In + + LC fx.Lifecycle + + Logger *zap.Logger + DB *sql.DB + TP *tracesdk.TracerProvider + + Cron croner.ICron + CronHandlers *croner.Handlers } +const lastTableMapIndex = "last_key" + func New(p Params) *Housekeeper { - h := &Housekeeper{} + h := &Housekeeper{ + logger: p.Logger.Named("housekeeper"), + tracer: p.TP.Tracer("housekeeper"), + db: p.DB, + } + + p.LC.Append(fx.StartHook(func(ctx context.Context) error { + if err := p.Cron.RegisterCronjob(ctx, &cron.Cronjob{ + Name: "housekeeper.run", + Schedule: "*/20 * * * * * *", // Every 5 minutes + }); err != nil { + return err + } + + return nil + })) + + p.CronHandlers.Add("housekeeper.run", func(ctx context.Context, data *cron.CronjobData) error { + ctx, span := h.tracer.Start(ctx, "housekeeper.run") + defer span.End() + + dest := &cron.GenericCronData{ + Attributes: map[string]string{}, + } + if data.Data == nil { + data.Data = &anypb.Any{} + } + + if err := anypb.UnmarshalTo(data.Data, dest, proto.UnmarshalOptions{}); err != nil { + h.logger.Error("failed to unmarshal document workflow cron data", zap.Error(err)) + } + + if err := h.runHousekeeper(ctx, dest); err != nil { + return fmt.Errorf("error during docstore workflow handling. %w", err) + } + + if err := data.Data.MarshalFrom(dest); err != nil { + return fmt.Errorf("failed to marshal updated document workflow cron data. %w", err) + } + + return nil + }) return h } + +func (h *Housekeeper) runHousekeeper(ctx context.Context, data *cron.GenericCronData) error { + keys := []string{} + for key := range tablesList { + keys = append(keys, key) + } + slices.Sort(keys) + if len(keys) == 0 { + return nil + } + + lastTblKey, ok := data.Attributes[lastTableMapIndex] + if !ok { + // Take first table + lastTblKey = keys[0] + } else { + idx := slices.Index(keys, lastTblKey) + if idx == -1 || len(keys) <= idx+1 { + h.logger.Warn("last table key not found in keys, falling back to first table") + lastTblKey = keys[0] + } else { + lastTblKey = keys[idx+1] + } + } + + tbl, ok := tablesList[lastTblKey] + if !ok { + return nil + } + + condition := jet.AND( + tbl.TimestampColumn.IS_NOT_NULL(), + tbl.TimestampColumn.LT_EQ( + jet.CURRENT_DATE().SUB(jet.INTERVAL(tbl.MinDays, jet.DAY)), + ), + ) + + if tbl.Condition != nil { + condition = condition.AND(tbl.Condition) + } + + stmt := tbl.Table. + DELETE(). + WHERE(condition). + LIMIT(2000) + + res, err := stmt.ExecContext(ctx, h.db) + if err != nil { + return err + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + return err + } + + if rowsAffected > 0 { + h.logger.Info("housekeeper run deleted rows", zap.String("table", lastTblKey), zap.Int64("rows", rowsAffected)) + } + + data.Attributes[lastTableMapIndex] = lastTblKey + + return nil +} diff --git a/pkg/housekeeper/tables.go b/pkg/housekeeper/tables.go new file mode 100644 index 000000000..553016954 --- /dev/null +++ b/pkg/housekeeper/tables.go @@ -0,0 +1,30 @@ +package housekeeper + +import ( + "sync" + + jet "github.com/go-jet/jet/v2/mysql" +) + +var ( + tablesMu = sync.Mutex{} + tablesList = map[string]*Table{} +) + +type Table struct { + Table jet.Table + TimestampColumn jet.ColumnTimestamp + Condition jet.BoolExpression + MinDays int +} + +func AddTable(tbl *Table) { + tablesMu.Lock() + defer tablesMu.Unlock() + + if tbl.MinDays < 30 { + tbl.MinDays = 30 + } + + tablesList[tbl.Table.TableName()] = tbl +} diff --git a/proto/resources/common/cron/cron.proto b/proto/resources/common/cron/cron.proto index 67bb348fb..554ba0c64 100644 --- a/proto/resources/common/cron/cron.proto +++ b/proto/resources/common/cron/cron.proto @@ -14,9 +14,12 @@ message Cronjob { CronjobState state = 3; resources.timestamp.Timestamp next_schedule_time = 4; - resources.timestamp.Timestamp last_attempt_time = 5; + optional resources.timestamp.Timestamp last_attempt_time = 5; + optional resources.timestamp.Timestamp started_time = 6; - CronjobData data = 6; + optional google.protobuf.Duration timeout = 7; + + CronjobData data = 8; } enum CronjobState { @@ -48,3 +51,7 @@ message CronjobCompletedEvent { optional CronjobData data = 5; } + +message GenericCronData { + map attributes = 1; +}