Skip to content

Commit

Permalink
Merge pull request #15 from openebs/dev
Browse files Browse the repository at this point in the history
Fixes for replica registration and rebuilding
  • Loading branch information
kmova authored Sep 27, 2017
2 parents 3d6d1c3 + 0c4f446 commit f311bcc
Show file tree
Hide file tree
Showing 18 changed files with 97 additions and 83 deletions.
12 changes: 2 additions & 10 deletions app/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,7 @@ func rmBackup(c *cli.Context) error {
return fmt.Errorf("Missing required parameter backup")
}

if err := task.RmBackup(backup); err != nil {
return err
}

return nil
return task.RmBackup(backup)
}

func restoreBackup(c *cli.Context) error {
Expand All @@ -137,11 +133,7 @@ func restoreBackup(c *cli.Context) error {
return fmt.Errorf("Missing required parameter backup")
}

if err := task.RestoreBackup(backup); err != nil {
return err
}

return nil
return task.RestoreBackup(backup)
}

func inspectBackup(c *cli.Context) error {
Expand Down
2 changes: 1 addition & 1 deletion backend/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (f *Wrapper) SetRevisionCounter(counter int64) error {
return nil
}

func (f *Wrapper) UpdatePeerDetails(replicaCount int64, quorumReplicaCount int64) error {
func (f *Wrapper) UpdatePeerDetails(replicaCount int, quorumReplicaCount int) error {
return nil
}

Expand Down
8 changes: 5 additions & 3 deletions backend/remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ func (r *Remote) GetRevisionCounter() (int64, error) {
if replica.State != "open" && replica.State != "dirty" {
return 0, fmt.Errorf("Invalid state %v for getting revision counter", replica.State)
}
return replica.RevisionCounter, nil
counter, _ := strconv.ParseInt(replica.RevisionCounter, 10, 64)
return counter, nil
}

func (r *Remote) GetVolUsage() (types.VolUsage, error) {
Expand Down Expand Up @@ -175,10 +176,11 @@ func (r *Remote) GetVolUsage() (types.VolUsage, error) {

func (r *Remote) SetRevisionCounter(counter int64) error {
logrus.Infof("Set revision counter of %s to : %v", r.name, counter)
return r.doAction("setrevisioncounter", &map[string]int64{"counter": counter})
localRevCount := strconv.FormatInt(counter, 10)
return r.doAction("setrevisioncounter", &map[string]string{"counter": localRevCount})
}

func (r *Remote) UpdatePeerDetails(replicaCount int64, quorumReplicaCount int64) error {
func (r *Remote) UpdatePeerDetails(replicaCount int, quorumReplicaCount int) error {
logrus.Infof("Update peer details of %s ", r.name)
return r.doAction("updatepeerdetails",
&map[string]interface{}{
Expand Down
10 changes: 2 additions & 8 deletions backup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,7 @@ func doBackupDelete(c *cli.Context) error {
}
backupURL = UnescapeURL(backupURL)

if err := backupstore.DeleteDeltaBlockBackup(backupURL); err != nil {
return err
}
return nil
return backupstore.DeleteDeltaBlockBackup(backupURL)
}

func cmdBackupRestore(c *cli.Context) {
Expand Down Expand Up @@ -269,10 +266,7 @@ func doBackupRestore(c *cli.Context) error {
return err
}

if err := createNewSnapshotMetafile(toFile + ".meta"); err != nil {
return err
}
return nil
return createNewSnapshotMetafile(toFile + ".meta")
}

func createNewSnapshotMetafile(file string) error {
Expand Down
16 changes: 9 additions & 7 deletions controller/client/controller_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -187,13 +188,14 @@ func (c *ControllerClient) GetVolume() (*rest.Volume, error) {
return &volumes.Data[0], nil
}

func (c *ControllerClient) Register(address string, revisionCount int64, peerDetails types.PeerDetails, replicaType string, upTime time.Duration) error {
err := c.post("/register", &types.RegReplica{
Address: address,
RevCount: revisionCount,
PeerDetail: peerDetails,
RepType: replicaType,
UpTime: upTime,
func (c *ControllerClient) Register(address string, revisionCount int64, peerDetails types.PeerDetails, replicaType string, upTime time.Duration, state string) error {
err := c.post("/register", &rest.RegReplica{
Address: address,
RevCount: strconv.FormatInt(revisionCount, 10),
PeerDetails: peerDetails,
RepType: replicaType,
UpTime: upTime,
RepState: state,
}, nil)
return err
}
Expand Down
13 changes: 8 additions & 5 deletions controller/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ type Controller struct {
size int64
sectorSize int64
replicas []types.Replica
replicaCount int64
replicaCount int
quorumReplicas []types.Replica
quorumReplicaCount int64
quorumReplicaCount int
factory types.BackendFactory
backend *replicator
frontend types.Frontend
Expand Down Expand Up @@ -220,6 +220,10 @@ func (c *Controller) registerReplica(register types.RegReplica) error {
return nil
}
}
fmt.Println("STATE =", register.RepState)
if register.RepState == "rebuilding" {
return nil
}

if c.MaxRevReplica == "" {
c.MaxRevReplica = register.Address
Expand All @@ -229,13 +233,13 @@ func (c *Controller) registerReplica(register types.RegReplica) error {
c.MaxRevReplica = register.Address
}

if ((int64)(len(c.RegisteredReplicas)) >= c.replicaCount/2) && ((int64)(len(c.RegisteredReplicas)+len(c.RegisteredQuorumReplicas)) > (c.quorumReplicaCount+c.replicaCount)/2) {
if ((len(c.RegisteredReplicas)) >= c.replicaCount/2) && ((len(c.RegisteredReplicas) + len(c.RegisteredQuorumReplicas)) > (c.quorumReplicaCount+c.replicaCount)/2) {
c.signalToAdd()
c.StartSignalled = true
return nil
}

if c.RegisteredReplicas[c.MaxRevReplica].PeerDetail.ReplicaCount == 0 {
if c.RegisteredReplicas[c.MaxRevReplica].PeerDetail.ReplicaCount <= 1 {
c.signalToAdd()
c.StartSignalled = true
return nil
Expand Down Expand Up @@ -392,7 +396,6 @@ func (c *Controller) addReplicaNoLock(newBackend types.Backend, address string,
Address: address,
Mode: types.WO,
})
c.replicaCount++

c.backend.AddBackend(address, newBackend)

Expand Down
7 changes: 7 additions & 0 deletions controller/rebuild.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ func (c *Controller) VerifyRebuildReplica(address string) error {
}
logrus.Debugf("WO replica %v's chain verified, update mode to RW", address)
c.setReplicaModeNoLock(address, types.RW)
if len(c.replicas) > c.replicaCount {
c.replicaCount = len(c.replicas)
}
if len(c.quorumReplicas) > c.quorumReplicaCount {
c.quorumReplicaCount = len(c.quorumReplicas)
}
c.backend.UpdatePeerDetails(c.replicaCount, c.quorumReplicaCount)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion controller/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func (r *replicator) SetQuorumRevisionCounter(address string, counter int64) err
return nil
}

func (r *replicator) UpdatePeerDetails(replicaCount int64, quorumReplicaCount int64) error {
func (r *replicator) UpdatePeerDetails(replicaCount int, quorumReplicaCount int) error {

for _, backend := range r.backends {
if err := backend.backend.UpdatePeerDetails(replicaCount, quorumReplicaCount); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion controller/rest/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,10 @@ type PrepareRebuildOutput struct {
type RegReplica struct {
client.Resource
Address string `json:"Address"`
RevCount int64 `json:"RevCount"`
RevCount string `json:"RevCount"`
PeerDetails types.PeerDetails `json:"PeerDetails"`
RepType string `json:"RepType"`
RepState string `json:"RepState"`
UpTime time.Duration `json:"UpTime"`
}

Expand Down
14 changes: 8 additions & 6 deletions controller/rest/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rest

import (
"net/http"
"strconv"

"github.com/gorilla/mux"
"github.com/openebs/jiva/types"
Expand Down Expand Up @@ -39,18 +40,19 @@ func (s *Server) GetReplica(rw http.ResponseWriter, req *http.Request) error {
}

func (s *Server) RegisterReplica(rw http.ResponseWriter, req *http.Request) error {
var regReplica RegReplica
var (
regReplica RegReplica
localRevCount int64
)
apiContext := api.GetApiContext(req)
if err := apiContext.Read(&regReplica); err != nil {
return err
}

local := types.RegReplica{Address: regReplica.Address, RevCount: regReplica.RevCount, PeerDetail: regReplica.PeerDetails, RepType: regReplica.RepType, UpTime: regReplica.UpTime}
if err := s.c.RegisterReplica(local); err != nil {
return err
}
localRevCount, _ = strconv.ParseInt(regReplica.RevCount, 10, 64)
local := types.RegReplica{Address: regReplica.Address, RevCount: localRevCount, PeerDetail: regReplica.PeerDetails, RepType: regReplica.RepType, UpTime: regReplica.UpTime, RepState: regReplica.RepState}
return s.c.RegisterReplica(local)

return nil
}

func (s *Server) CreateReplica(rw http.ResponseWriter, req *http.Request) error {
Expand Down
5 changes: 1 addition & 4 deletions replica/peer_details.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ func (r *Replica) writePeerDetails(peerDetails types.PeerDetails) error {
if r.peerFile == nil {
return fmt.Errorf("BUG: peer file wasn't initialized")
}
if err := r.encodeToFile(&peerDetails, peerDetailsFile); err != nil {
return err
}
return nil
return r.encodeToFile(&peerDetails, peerDetailsFile)
}

func (r *Replica) openPeerFile(isCreate bool) error {
Expand Down
17 changes: 8 additions & 9 deletions replica/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ func CreateTempReplica() (*Replica, error) {
return r, nil
}

func CreateTempServer() (*Server, error) {
return &Server{
dir: Dir,
}, nil
}

func ReadInfo(dir string) (Info, error) {
var info Info
err := (&Replica{dir: dir}).unmarshalFile(volumeMetaData, &info)
Expand Down Expand Up @@ -297,10 +303,7 @@ func (r *Replica) Resize(obj interface{}) error {
}
}
r.info.Size = sizeInBytes
if err := r.encodeToFile(&r.info, volumeMetaData); err != nil {
return err
}
return nil
return r.encodeToFile(&r.info, volumeMetaData)
}

func (r *Replica) Reload() (*Replica, error) {
Expand Down Expand Up @@ -336,11 +339,7 @@ func (r *Replica) RemoveDiffDisk(name string) error {
return err
}

if err := r.rmDisk(name); err != nil {
return err
}

return nil
return r.rmDisk(name)
}

func (r *Replica) hardlinkDisk(target, source string) error {
Expand Down
14 changes: 7 additions & 7 deletions replica/rest/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ type Replica struct {
Chain []string `json:"chain"`
Disks map[string]replica.DiskInfo `json:"disks"`
RemainSnapshots int `json:"remainsnapshots"`
RevisionCounter int64 `json:"revisioncounter"`
RevisionCounter string `json:"revisioncounter"`
ReplicaCounter int64 `json:"replicacounter"`
UsedLogicalBlocks string `json:"usedlogicalblocks"`
UsedBlocks string `json:"usedblocks"`
}

type Stats struct {
client.Resource
ReplicaCounter int64 `json:"replicacounter"`
RevisionCounter int64 `json:"revisioncounter"`
ReplicaCounter int64 `json:"replicacounter"`
RevisionCounter string `json:"revisioncounter"`
}

type CreateInput struct {
Expand Down Expand Up @@ -91,7 +91,7 @@ type PrepareRemoveDiskOutput struct {

type RevisionCounter struct {
client.Resource
Counter int64 `json:"counter"`
Counter string `json:"counter"`
}

type ReplicaCounter struct {
Expand All @@ -101,8 +101,8 @@ type ReplicaCounter struct {

type PeerDetails struct {
client.Resource
ReplicaCount int64 `json:"replicacount"`
QuorumReplicaCount int64 `json:"quorumreplicacount"`
ReplicaCount int `json:"replicacount"`
QuorumReplicaCount int `json:"quorumreplicacount"`
}

type Action struct {
Expand Down Expand Up @@ -193,7 +193,7 @@ func NewReplica(context *api.ApiContext, state replica.State, info replica.Info,
r.Chain, _ = rep.DisplayChain()
r.Disks = rep.ListDisks()
r.RemainSnapshots = rep.GetRemainSnapshotCounts()
r.RevisionCounter = rep.GetRevisionCounter()
r.RevisionCounter = strconv.FormatInt(rep.GetRevisionCounter(), 10)
}

return r
Expand Down
5 changes: 3 additions & 2 deletions replica/rest/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (s *Server) GetStats(rw http.ResponseWriter, req *http.Request) error {
Actions: map[string]string{},
Links: map[string]string{},
},
RevisionCounter: stats.RevisionCounter,
RevisionCounter: strconv.FormatInt(stats.RevisionCounter, 10),
ReplicaCounter: stats.ReplicaCounter,
}
apiContext.Write(resp)
Expand Down Expand Up @@ -239,7 +239,8 @@ func (s *Server) SetRevisionCounter(rw http.ResponseWriter, req *http.Request) e
if err := apiContext.Read(&input); err != nil && err != io.EOF {
return err
}
return s.doOp(req, s.s.SetRevisionCounter(input.Counter))
counter, _ := strconv.ParseInt(input.Counter, 10, 64)
return s.doOp(req, s.s.SetRevisionCounter(counter))
}

func (s *Server) UpdatePeerDetails(rw http.ResponseWriter, req *http.Request) error {
Expand Down
20 changes: 19 additions & 1 deletion replica/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,29 @@ func (s *Server) Status() (State, Info) {
return Open, info
}
}

func (s *Server) PrevStatus() (State, Info) {
info, err := ReadInfo(s.dir)
if os.IsNotExist(err) {
return Initial, Info{}
} else if err != nil {
return Error, Info{}
}
switch {
case info.Rebuilding:
return Rebuilding, info
case info.Dirty:
return Dirty, info
}

return Closed, info
}

func (s *Server) Stats() *types.Stats {
r := s.r
return &types.Stats{
RevisionCounter: r.revisionCache,
ReplicaCounter: r.peerCache.ReplicaCount,
ReplicaCounter: int64(r.peerCache.ReplicaCount),
}
}

Expand Down
5 changes: 1 addition & 4 deletions sync/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,7 @@ func (t *Task) RmBackup(backup string) error {
return fmt.Errorf("Cannot find a suitable replica for remove backup")
}

if err := t.rmBackup(replica, backup); err != nil {
return err
}
return nil
return t.rmBackup(replica, backup)
}

func (t *Task) rmBackup(replicaInController *rest.Replica, backup string) error {
Expand Down
Loading

0 comments on commit f311bcc

Please sign in to comment.