Skip to content

Commit

Permalink
Merge pull request #114 from smileisak/freeze-all-operators
Browse files Browse the repository at this point in the history
Freeze all operators with same priority and warn
  • Loading branch information
Sergey Vasilyev authored Jul 8, 2019
2 parents 51da125 + 4e0cd0a commit fac95fa
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 11 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,9 @@ junit.xml
# Documentation
docs/_build
docs/packages

# VirtualEnv
env

# VSCode
.vscode
3 changes: 2 additions & 1 deletion docs/peering.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ notices that other operators start with a higher priority, it freezes
its operation until those operators stop working.

This is done to prevent collisions of multiple operators handling
the same objects.
the same objects. If two operators runs with the same priority all operators
issue a warning and freeze, so that the cluster becomes not served anymore.

To set the operator's priority, use :option:`--priority`:

Expand Down
24 changes: 14 additions & 10 deletions kopf/engines/peering.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,13 @@
import os
import random
import socket
from typing import Optional, Mapping, Iterable, Union
from typing import Iterable, Mapping, Optional, Union

import iso8601

from kopf.clients import fetching
from kopf.clients import patching
from kopf.reactor import registries

logger = logging.getLogger(__name__)

# The CRD info on the special sync-object.
Expand Down Expand Up @@ -231,12 +230,14 @@ async def peers_handler(
if not freeze.is_set():
logger.info(f"Freezing operations in favour of {prio_peers}.")
freeze.set()
else:
if same_peers:
logger.warning(f"Possibly conflicting operators with the same priority: {same_peers}.")
if freeze.is_set():
logger.info(f"Resuming operations after the freeze.")
freeze.clear()

elif same_peers:
logger.warning(f"Possibly conflicting operators with the same priority: {same_peers}.")
logger.warning(f"Freezed all Operators: {peers}")
freeze.set()
elif freeze.is_set():
logger.info(f"Resuming operations after the freeze. Conflicting operators with the same priority are gone")
freeze.clear()


async def peers_keepalive(
Expand All @@ -256,9 +257,12 @@ async def peers_keepalive(
await asyncio.sleep(max(1, int(ourselves.lifetime.total_seconds() - 10)))
finally:
try:
await ourselves.disappear()
except:
await asyncio.shield(ourselves.disappear())
except asyncio.CancelledError:
# It is the cancellation of `keepalive()`, not of the shielded `disappear()`.
pass
except Exception:
logger.exception(f"Couldn't remove self from the peering. Ignoring.")


def detect_own_id() -> str:
Expand Down

0 comments on commit fac95fa

Please sign in to comment.