Changes for handling zk scale down (#120)
* changes for handling zk scale down

Signed-off-by: pbelgundi <[email protected]>

* code changes after testing

Signed-off-by: pbelgundi <[email protected]>

* changes for dependency injection

Signed-off-by: pbelgundi <[email protected]>

* fix unit tests

Signed-off-by: pbelgundi <[email protected]>

* Add license header

Signed-off-by: pbelgundi <[email protected]>
pbelgundi authored Feb 28, 2020
1 parent 8c1b44b commit d84aefd
Showing 10 changed files with 237 additions and 73 deletions.
6 changes: 6 additions & 0 deletions docker/bin/zookeeperStart.sh
@@ -88,17 +88,23 @@ if [[ "$WRITE_CONFIGURATION" == true ]]; then
echo $MYID > $MYID_FILE
echo "server.${MYID}=${ZKCONFIG}" > $DYNCONFIG
else
set -e
ZKURL=$(zkConnectionString)
CONFIG=`java -Dlog4j.configuration=file:"$LOG4J_CONF" -jar /root/zu.jar get-all $ZKURL`
echo Writing configuration gleaned from zookeeper ensemble
echo "$CONFIG" | grep -v "^version="> $DYNCONFIG
set +e
fi
fi

if [[ "$REGISTER_NODE" == true ]]; then
ROLE=observer
ZKURL=$(zkConnectionString)
ZKCONFIG=$(zkConfig)
set -e
echo Registering node and writing local configuration to disk.
java -Dlog4j.configuration=file:"$LOG4J_CONF" -jar /root/zu.jar add $ZKURL $MYID $ZKCONFIG $DYNCONFIG
set +e
fi

ZOOCFGDIR=/data/conf
6 changes: 4 additions & 2 deletions docker/bin/zookeeperTeardown.sh
@@ -24,8 +24,10 @@ ZKURL=$(zkConnectionString)
set -e
MYID=`cat $MYID_FILE`

echo "Cluster size=$CLUSTER_SIZE, MyId=$MYID"
if [[ "$CLUSTER_SIZE" -lt "$MYID" ]]; then
ZNODE_PATH="/zookeeper-operator/$CLUSTER_NAME"
CLUSTERSIZE=`java -Dlog4j.configuration=file:"$LOG4J_CONF" -jar /root/zu.jar sync $ZKURL $ZNODE_PATH`
echo "CLUSTER_SIZE=$CLUSTERSIZE, MyId=$MYID"
if [[ -n "$CLUSTERSIZE" && "$CLUSTERSIZE" -lt "$MYID" ]]; then
# If ClusterSize < MyId, this server is being permanently removed.
java -Dlog4j.configuration=file:"$LOG4J_CONF" -jar /root/zu.jar remove $ZKURL $MYID
echo $?
27 changes: 25 additions & 2 deletions docker/zu/src/main/java/io/pravega/zookeeper/Main.kt
@@ -11,6 +11,7 @@
package io.pravega.zookeeper

import org.apache.zookeeper.data.Stat
import org.apache.zookeeper.AsyncCallback.VoidCallback
import java.io.File

const val OBSERVER = "observer"
@@ -20,7 +21,7 @@ const val PARTICIPANT= "participant"
* Utility to Register a server with the Zookeeper Ensemble
*/
fun main(args: Array<String>) {
val message = "Usage: zu <add | get-all | get | remove | get-role> [options...]"
val message = "Usage: zu <add | get-all | get | remove | get-role | sync> [options...]"
if (args.isEmpty()) {
help(message)
}
@@ -31,9 +32,31 @@ fun main(args: Array<String>) {
"get" == args[0] -> runGet(args)
"remove" == args[0] -> runRemove(args)
"get-role" == args[0] -> runGetRole(args)
"sync" == args[0] -> runSync(args)
else -> help(message)
}
}

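/**
 * Forces a sync on the given znode, then prints (and returns) the value stored
 * after "=", i.e. the ensemble's CLUSTER_SIZE, so zookeeperTeardown.sh can read
 * the latest cluster size before deciding whether this server is being removed.
 */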
fun runSync(args: Array<String>, suppressOutput: Boolean = false): String {
if (args.size < 3) {
help("Usage: zu sync <zk-url> <path>")
}
var (_, zkUrl, path) = args
return try {
val zk = newZookeeperAdminClient(zkUrl)
zk.sync(path, null, null)
val dataArr = zk.getData(path, null, null)
val clusterSize = String(dataArr).substringAfter("=").trim()
if (! suppressOutput) {
print(clusterSize)
}
clusterSize
} catch (e: Exception) {
System.err.println("Error performing zookeeper sync operation:")
e.printStackTrace(System.err)
System.exit(1)
""
}
}

fun runGetAll(args: Array<String>, suppressOutput: Boolean = false): String {
@@ -134,4 +157,4 @@ fun runRemove(args: Array<String>) {
fun help(message: String) {
System.err.println(message)
System.exit(1)
}
}
2 changes: 2 additions & 0 deletions pkg/apis/zookeeper/v1beta1/zookeepercluster_types.go
@@ -139,6 +139,8 @@ type ZookeeperClusterStatus struct {

// ExternalClientEndpoint is the internal client IP and port
ExternalClientEndpoint string `json:"externalClientEndpoint"`

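// MetaRootCreated denotes whether the cluster's metadata root znode has been created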
MetaRootCreated bool `json:"metaRootCreated"`
}

// MembersStatus is the status of the members of the cluster with both
10 changes: 0 additions & 10 deletions pkg/apis/zookeeper/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

92 changes: 42 additions & 50 deletions pkg/controller/zookeepercluster/zookeepercluster_controller.go
@@ -14,7 +14,6 @@ import (
"context"
"fmt"
"strconv"
"strings"
"time"

"k8s.io/client-go/kubernetes/scheme"
@@ -60,7 +59,7 @@ func AddZookeeperReconciler(mgr manager.Manager) error {

// newReconciler returns a new reconcile.Reconciler
func newZookeeperClusterReconciler(mgr manager.Manager) reconcile.Reconciler {
return &ReconcileZookeeperCluster{client: mgr.GetClient(), scheme: mgr.GetScheme()}
return &ReconcileZookeeperCluster{client: mgr.GetClient(), scheme: mgr.GetScheme(), zkClient: new(zk.DefaultZookeeperClient)}
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
@@ -111,10 +110,10 @@ var _ reconcile.Reconciler = &ReconcileZookeeperCluster{}
type ReconcileZookeeperCluster struct {
// This client, initialized using mgr.Client() above, is a split client
// that reads objects from the cache and writes to the apiserver
client client.Client
scheme *runtime.Scheme
log logr.Logger
skipSTSReconcile int
client client.Client
scheme *runtime.Scheme
log logr.Logger
zkClient zk.ZookeeperClient
}

type reconcileFun func(cluster *zookeeperv1beta1.ZookeeperCluster) error
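The zk.ZookeeperClient type used for the new zkClient field is not shown in this commit. Judging from the calls the reconciler makes (Connect, CreateNode, UpdateNode, NodeExists, Close) and from MockZookeeperClient in the tests below, the interface presumably looks roughly like this sketch; the package path and exact signatures are assumptions, not the committed source:

package zk

import (
	"github.com/pravega/zookeeper-operator/pkg/apis/zookeeper/v1beta1" // assumed module path
)

// ZookeeperClient abstracts the ZooKeeper operations the reconciler needs,
// so either a real client (DefaultZookeeperClient) or a test mock can be injected.
type ZookeeperClient interface {
	Connect(zkUri string) error
	CreateNode(z *v1beta1.ZookeeperCluster, zNodePath string) error
	UpdateNode(path string, data string, version int32) error
	NodeExists(zNodePath string) (int32, error)
	Close()
}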
@@ -190,24 +189,24 @@ func (r *ReconcileZookeeperCluster) reconcileStatefulSet(instance *zookeeperv1be
} else {
foundSTSSize := *foundSts.Spec.Replicas
newSTSSize := *sts.Spec.Replicas
if newSTSSize < foundSTSSize {
// We're dealing with STS scale down
if !r.isConfigMapInSync(instance) {
r.log.Info("Skipping StatefulSet reconcile as ConfigMap not updated yet.")
return nil
if newSTSSize != foundSTSSize {
zkUri := utils.GetZkServiceUri(instance)
err = r.zkClient.Connect(zkUri)
if err != nil {
return fmt.Errorf("Error storing cluster size %v", err)
}
/*
After updating ConfigMap we need to wait for changes to sync to the volume,
failing which `zookeeperTeardown.sh` won't get invoked for the pods that are being scaled down
and these will stay in the ensemble config forever.
For details see:
https://kubernetes.io/docs/tasks/configure-pod-container/configure-pod-configmap/#mounted-configmaps-are-updated-automatically
*/
r.skipSTSReconcile++
if r.skipSTSReconcile < 6 {
r.log.Info("Waiting for Config Map update to sync...Skipping STS Reconcile")
return nil
defer r.zkClient.Close()
r.log.Info("Connected to ZK", "ZKURI", zkUri)

path := utils.GetMetaPath(instance)
version, err := r.zkClient.NodeExists(path)
if err != nil {
return fmt.Errorf("Error doing exists check for znode %s: %v", path, err)
}

data := "CLUSTER_SIZE=" + strconv.Itoa(int(newSTSSize))
r.log.Info("Updating Cluster Size.", "New Data:", data, "Version", version)
r.zkClient.UpdateNode(path, data, version)
}
return r.updateStatefulSet(instance, foundSts, sts)
}
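The DefaultZookeeperClient implementation injected in newZookeeperClusterReconciler is likewise outside this diff. A minimal sketch of its Connect, UpdateNode, and Close methods, assuming the operator wraps the github.com/samuel/go-zookeeper client; the library choice, field names, and timeout are assumptions:

package zk

import (
	"fmt"
	"time"

	libzk "github.com/samuel/go-zookeeper/zk" // assumed ZooKeeper client library
)

// DefaultZookeeperClient (sketch): holds a live connection behind the
// ZookeeperClient interface so the reconciler never talks to ZooKeeper directly.
type DefaultZookeeperClient struct {
	conn *libzk.Conn
}

func (c *DefaultZookeeperClient) Connect(zkUri string) error {
	// zkUri is the client-service address returned by utils.GetZkServiceUri.
	conn, _, err := libzk.Connect([]string{zkUri}, 10*time.Second)
	if err != nil {
		return fmt.Errorf("could not connect to %s: %v", zkUri, err)
	}
	c.conn = conn
	return nil
}

// UpdateNode overwrites the znode data, e.g. "CLUSTER_SIZE=3", at the version
// obtained from NodeExists, giving optimistic-concurrency protection.
func (c *DefaultZookeeperClient) UpdateNode(path string, data string, version int32) error {
	_, err := c.conn.Set(path, []byte(data), version)
	return err
}

func (c *DefaultZookeeperClient) Close() {
	c.conn.Close()
}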
@@ -225,35 +224,9 @@ func (r *ReconcileZookeeperCluster) updateStatefulSet(instance *zookeeperv1beta1
}
instance.Status.Replicas = foundSts.Status.Replicas
instance.Status.ReadyReplicas = foundSts.Status.ReadyReplicas
r.skipSTSReconcile = 0
return nil
}

func (r *ReconcileZookeeperCluster) isConfigMapInSync(instance *zookeeperv1beta1.ZookeeperCluster) bool {
cm := zk.MakeConfigMap(instance)
foundCm := &corev1.ConfigMap{}
err := r.client.Get(context.TODO(), types.NamespacedName{
Name: cm.Name,
Namespace: cm.Namespace,
}, foundCm)
if err != nil {
r.log.Error(err, "Error getting config map.")
return false
} else {
// found config map, now check number of replicas in configMap
envStr := foundCm.Data["env.sh"]
splitSlice := strings.Split(envStr, "CLUSTER_SIZE=")
if len(splitSlice) < 2 {
r.log.Error(err, "Error: Could not find cluster size in configmap.")
return false
}
cs := strings.TrimSpace(splitSlice[1])
clusterSize, _ := strconv.Atoi(cs)
return (int32(clusterSize) == instance.Spec.Replicas)
}
return false
}

func (r *ReconcileZookeeperCluster) reconcileClientService(instance *zookeeperv1beta1.ZookeeperCluster) (err error) {
svc := zk.MakeClientService(instance)
if err = controllerutil.SetControllerReference(instance, svc, r.scheme); err != nil {
@@ -424,6 +397,24 @@ func (r *ReconcileZookeeperCluster) reconcileClusterStatus(instance *zookeeperv1
}
instance.Status.Members.Ready = readyMembers
instance.Status.Members.Unready = unreadyMembers

//If Cluster is in a ready state...
if instance.Spec.Replicas == instance.Status.ReadyReplicas && (!instance.Status.MetaRootCreated) {
r.log.Info("Cluster is Ready, Creating ZK Metadata...")
zkUri := utils.GetZkServiceUri(instance)
err := r.zkClient.Connect(zkUri)
if err != nil {
return fmt.Errorf("Error creating cluster metaroot. Connect to zk failed %v", err)
}
defer r.zkClient.Close()
metaPath := utils.GetMetaPath(instance)
r.log.Info("Connected to zookeeper:", "ZKUri", zkUri, "Creating Path", metaPath)
if err := r.zkClient.CreateNode(instance, metaPath); err != nil {
return fmt.Errorf("Error creating cluster metadata path %s, %v", metaPath, err)
}
r.log.Info("Metadata znode created.")
instance.Status.MetaRootCreated = true
}
r.log.Info("Updating zookeeper status",
"StatefulSet.Namespace", instance.Namespace,
"StatefulSet.Name", instance.Name)
@@ -435,8 +426,9 @@ func YAMLExporterReconciler(zookeepercluster *zookeeperv1beta1.ZookeeperCluster)
var scheme = scheme.Scheme
scheme.AddKnownTypes(zookeeperv1beta1.SchemeGroupVersion, zookeepercluster)
return &ReconcileZookeeperCluster{
client: fake.NewFakeClient(zookeepercluster),
scheme: scheme,
client: fake.NewFakeClient(zookeepercluster),
scheme: scheme,
zkClient: new(zk.DefaultZookeeperClient),
}
}
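utils.GetMetaPath and CreateNode are also not part of this diff. The path layout is grounded in zookeeperTeardown.sh above (ZNODE_PATH="/zookeeper-operator/$CLUSTER_NAME") and the data format in reconcileStatefulSet ("CLUSTER_SIZE=N"); the rest of this sketch, which continues the DefaultZookeeperClient sketch above, is illustrative only:

// GetMetaPath (sketch): the same layout zookeeperTeardown.sh reads via `zu sync`.
func GetMetaPath(z *v1beta1.ZookeeperCluster) string {
	return fmt.Sprintf("/zookeeper-operator/%s", z.Name)
}

// CreateNode (sketch): creates the metadata znode and seeds it with the current
// cluster size so scaled-down pods can compare it against their myid.
func (c *DefaultZookeeperClient) CreateNode(z *v1beta1.ZookeeperCluster, zNodePath string) error {
	data := fmt.Sprintf("CLUSTER_SIZE=%d", z.Spec.Replicas)
	// Ensure the parent exists; ignore ErrNodeExists so the call stays idempotent.
	if _, err := c.conn.Create("/zookeeper-operator", nil, 0, libzk.WorldACL(libzk.PermAll)); err != nil && err != libzk.ErrNodeExists {
		return err
	}
	_, err := c.conn.Create(zNodePath, []byte(data), 0, libzk.WorldACL(libzk.PermAll))
	return err
}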

pkg/controller/zookeepercluster/zookeepercluster_controller_test.go
@@ -36,15 +36,41 @@ func TestZookeepercluster(t *testing.T) {
RunSpecs(t, "ZookeeperCluster Controller Spec")
}

type MockZookeeperClient struct {
// dummy struct
}

func (client *MockZookeeperClient) Connect(zkUri string) (err error) {
// do nothing
return nil
}

func (client *MockZookeeperClient) CreateNode(zoo *v1beta1.ZookeeperCluster, zNodePath string) (err error) {
return nil
}

func (client *MockZookeeperClient) UpdateNode(path string, data string, version int32) (err error) {
return nil
}

func (client *MockZookeeperClient) NodeExists(zNodePath string) (version int32, err error) {
return 0, nil
}

func (client *MockZookeeperClient) Close() {
return
}

var _ = Describe("ZookeeperCluster Controller", func() {
const (
Name = "example"
Namespace = "default"
)

var (
s = scheme.Scheme
r *ReconcileZookeeperCluster
s = scheme.Scheme
mockZkClient = new(MockZookeeperClient)
r *ReconcileZookeeperCluster
)

Context("Reconcile", func() {
@@ -78,7 +104,7 @@ var _ = Describe("ZookeeperCluster Controller", func() {

BeforeEach(func() {
cl = fake.NewFakeClient(z)
r = &ReconcileZookeeperCluster{client: cl, scheme: s}
r = &ReconcileZookeeperCluster{client: cl, scheme: s, zkClient: mockZkClient}
res, err = r.Reconcile(req)
})

@@ -108,15 +134,15 @@ var _ = Describe("ZookeeperCluster Controller", func() {
BeforeEach(func() {
z.WithDefaults()
cl = fake.NewFakeClient(z)
r = &ReconcileZookeeperCluster{client: cl, scheme: s}
r = &ReconcileZookeeperCluster{client: cl, scheme: s, zkClient: mockZkClient}
res, err = r.Reconcile(req)
})

It("should not error", func() {
Ω(err).To(BeNil())
})

It("should requeue after ReconfileTime delay", func() {
It("should requeue after ReconcileTime delay", func() {
Ω(res.RequeueAfter).To(Equal(ReconcileTime))
})

@@ -177,7 +203,7 @@ var _ = Describe("ZookeeperCluster Controller", func() {
st := zk.MakeStatefulSet(z)
next.Spec.Replicas = 6
cl = fake.NewFakeClient([]runtime.Object{next, st}...)
r = &ReconcileZookeeperCluster{client: cl, scheme: s}
r = &ReconcileZookeeperCluster{client: cl, scheme: s, zkClient: mockZkClient}
res, err = r.Reconcile(req)
})

@@ -204,7 +230,7 @@ var _ = Describe("ZookeeperCluster Controller", func() {
next := z.DeepCopy()
st := zk.MakeStatefulSet(z)
cl = fake.NewFakeClient([]runtime.Object{next, st}...)
r = &ReconcileZookeeperCluster{client: cl, scheme: s}
r = &ReconcileZookeeperCluster{client: cl, scheme: s, zkClient: mockZkClient}
res, err = r.Reconcile(req)
})

@@ -232,7 +258,7 @@ var _ = Describe("ZookeeperCluster Controller", func() {
next.Spec.Ports[0].ContainerPort = 2182
svc := zk.MakeClientService(z)
cl = fake.NewFakeClient([]runtime.Object{next, svc}...)
r = &ReconcileZookeeperCluster{client: cl, scheme: s}
r = &ReconcileZookeeperCluster{client: cl, scheme: s, zkClient: mockZkClient}
res, err = r.Reconcile(req)
})
