Skip to content

Commit

Permalink
Merge pull request #78 from ChannelFinder/recceiverid
Browse files Browse the repository at this point in the history
Add recceiver id, which is used by cleanOnStart and cleanOnStop
  • Loading branch information
jacomago authored Apr 3, 2024
2 parents e599f50 + cd16911 commit 5a055a2
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 48 deletions.
27 changes: 20 additions & 7 deletions server/cf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,28 @@ procs = cf
# stored in /etc/channelfinderapi.conf as described in the client

[cf]
# a space-separated list of infotags to set as CF Properties
#infotags = archive foo bar blah
# List environment variables that should be written as channel finder properties
# cf-store application
# Uncomment line below to turn on the feature to add alias records to channelfinder
# alias = on
# Uncomment line below to turn on the feature to add EPICS record type to channelfinder
# recordType = on
# Uncomment line below to turn on the feature to add description field to channelfinder
# recordDesc = on
# The size limit for finding channels (ie the value of the '~size' query parameter)
# If not specified then the fallback is the server default
# findSizeLimit = 10000
# Mark all channels as 'Inactive' when processor is stopped (default: True)
# cleanOnStop = True
# Mark all channels as 'Inactive' when processor is started (default: True)
# cleanOnStart = True
# Specify an optional id for the recceiver to be used with cleanOnStart and cleanOnStop
# default value is the hostname of the machine the python interpreter is started on
# recceiverID = recc1
# Debug output file location.
# Produces no file when not defined.
# debug_file_loc = /home/devuser/recsyncdata.txt
#
# Comma-separated list of VARIABLE:PropertyName,
# specifying which environment VARIABLEs to pass on to the channel finder
# and defining the corresponding PropertyName
#environment_vars=ENGINEER:Engineer,EPICS_BASE:EpicsVersion,PWD:WorkingDirectory
# Turn on optional alias and recordType properties
#alias = on
#recordType = on
#recordDesc = on
7 changes: 5 additions & 2 deletions server/demo.conf
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,12 @@ idkey = 42
# If not specified then the fallback is the server default
# findSizeLimit = 10000
# Mark all channels as 'Inactive' when processor is stopped (default: True)
#cleanOnStop = True
# cleanOnStop = True
# Mark all channels as 'Inactive' when processor is started (default: True)
#cleanOnStart = True
# cleanOnStart = True
# Specify an optional id for the recceiver to be used with cleanOnStart and cleanOnStop
# default value is the hostname of the machine the python interpreter is started on
# recceiverID = recc1
# Debug output file location.
# Produces no file when not defined.
# debug_file_loc = /home/devuser/recsyncdata.txt
Expand Down
81 changes: 42 additions & 39 deletions server/recceiver/cfstore.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-

import logging
import socket
_log = logging.getLogger(__name__)

from zope.interface import implementer
Expand All @@ -17,6 +18,8 @@
import datetime
import os
import json
from channelfinder import ChannelFinderClient


# ITRANSACTION FORMAT:
#
Expand All @@ -30,6 +33,8 @@

__all__ = ['CFProcessor']

RECCEIVERID_KEY = 'recceiverID'
RECCEIVERID_DEFAULT = socket.gethostname()

@implementer(interfaces.IProcessor)
class CFProcessor(service.Service):
Expand Down Expand Up @@ -68,11 +73,10 @@ def _startServiceWithLock(self):
Using the default python cf-client. The url, username, and
password are provided by the channelfinder._conf module.
"""
from channelfinder import ChannelFinderClient
self.client = ChannelFinderClient()
try:
cf_props = [prop['name'] for prop in self.client.getAllProperties()]
reqd_props = {'hostName', 'iocName', 'pvStatus', 'time', 'iocid'}
reqd_props = {'hostName', 'iocName', 'pvStatus', 'time', 'iocid', RECCEIVERID_KEY}
if (self.conf.get('alias', 'default') == 'on'):
reqd_props.add('alias')
if (self.conf.get('recordType', 'default') == 'on'):
Expand Down Expand Up @@ -224,7 +228,7 @@ def _commitWithThread(self, TR):
pvInfoByName = {}
for rid, (info) in pvInfo.items():
if info["pvName"] in pvInfoByName:
_log.warn("Commit contains multiple records with PV name: %s (%s)", pv, iocid)
_log.warn("Commit contains multiple records with PV name: %s (%s)", info["pvName"], iocid)
continue
pvInfoByName[info["pvName"]] = info
_log.debug("Add record: %s: %s", rid, info)
Expand All @@ -246,42 +250,38 @@ def _commitWithThread(self, TR):
self.iocs[iocid]["channelcount"] += 1
for pv in delrec:
if iocid in self.channel_dict[pv]:
self.channel_dict[pv].remove(iocid)
if iocid in self.iocs:
self.iocs[iocid]["channelcount"] -= 1
if self.iocs[iocid]['channelcount'] == 0:
self.iocs.pop(iocid, None)
elif self.iocs[iocid]['channelcount'] < 0:
_log.error("Channel count negative: %s", iocid)
if len(self.channel_dict[pv]) <= 0: # case: channel has no more iocs
del self.channel_dict[pv]
self.remove_channel(pv, iocid)
"""In case, alias exists"""
if (self.conf.get('alias', 'default' == 'on')):
if pv in pvInfoByName and "aliases" in pvInfoByName[pv]:
for a in pvInfoByName[pv]["aliases"]:
self.channel_dict[a].remove(iocid)
if iocid in self.iocs:
self.iocs[iocid]["channelcount"] -= 1
if self.iocs[iocid]['channelcount'] == 0:
self.iocs.pop(iocid, None)
elif self.iocs[iocid]['channelcount'] < 0:
_log.error("Channel count negative: %s", iocid)
if len(self.channel_dict[a]) <= 0: # case: channel has no more iocs
del self.channel_dict[a]
self.remove_channel(a, iocid)
poll(__updateCF__, self, pvInfoByName, delrec, hostName, iocName, iocid, owner, time)
dict_to_file(self.channel_dict, self.iocs, self.conf)

def remove_channel(self, a, iocid):
self.channel_dict[a].remove(iocid)
if iocid in self.iocs:
self.iocs[iocid]["channelcount"] -= 1
if self.iocs[iocid]['channelcount'] == 0:
self.iocs.pop(iocid, None)
elif self.iocs[iocid]['channelcount'] < 0:
_log.error("Channel count negative: %s", iocid)
if len(self.channel_dict[a]) <= 0: # case: channel has no more iocs
del self.channel_dict[a]

def clean_service(self):
"""
Marks all channels as "Inactive" until the recsync server is back up
"""
sleep = 1
retry_limit = 5
owner = self.conf.get('username', 'cfstore')
recceiverid = self.conf.get(RECCEIVERID_KEY, RECCEIVERID_DEFAULT)
while 1:
try:
_log.info("CF Clean Started")
channels = self.client.findByArgs(prepareFindArgs(self.conf, [('pvStatus', 'Active')]))
channels = self.client.findByArgs(prepareFindArgs(self.conf, [('pvStatus', 'Active'), (RECCEIVERID_KEY, recceiverid)]))
if channels is not None:
new_channels = []
for ch in channels or []:
Expand Down Expand Up @@ -326,6 +326,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
channels_dict = proc.channel_dict
iocs = proc.iocs
conf = proc.conf
recceiverid = conf.get(RECCEIVERID_KEY, RECCEIVERID_DEFAULT)
new = set(pvInfoByName.keys())

if iocid in iocs:
Expand Down Expand Up @@ -354,12 +355,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
if len(new) == 0 or ch[u'name'] in delrec: # case: empty commit/del, remove all reference to ioc
if ch[u'name'] in channels_dict:
ch[u'owner'] = iocs[channels_dict[ch[u'name']][-1]]["owner"]
ch[u'properties'] = __merge_property_lists([
{u'name': 'hostName', u'owner': owner, u'value': iocs[channels_dict[ch[u'name']][-1]]["hostname"]},
{u'name': 'iocName', u'owner': owner, u'value': iocs[channels_dict[ch[u'name']][-1]]["iocname"]},
{u'name': 'iocid', u'owner': owner, u'value': channels_dict[ch[u'name']][-1]},
{u'name': 'pvStatus', u'owner': owner, u'value': 'Active'},
{u'name': 'time', u'owner': owner, u'value': iocTime}],
ch[u'properties'] = __merge_property_lists(ch_create_properties(owner, iocTime, recceiverid, channels_dict, iocs, ch),
ch[u'properties'])
if (conf.get('recordType', 'default') == 'on'):
ch[u'properties'] = __merge_property_lists(ch[u'properties'].append({u'name': 'recordType', u'owner': owner, u'value': iocs[channels_dict[ch[u'name']][-1]]["recordType"]}), ch[u'properties'])
Expand All @@ -371,12 +367,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
for a in pvInfoByName[ch[u'name']]["aliases"]:
if a[u'name'] in channels_dict:
a[u'owner'] = iocs[channels_dict[a[u'name']][-1]]["owner"]
a[u'properties'] = __merge_property_lists([
{u'name': 'hostName', u'owner': owner, u'value': iocs[channels_dict[a[u'name']][-1]]["hostname"]},
{u'name': 'iocName', u'owner': owner, u'value': iocs[channels_dict[a[u'name']][-1]]["iocname"]},
{u'name': 'iocid', u'owner': owner, u'value': channels_dict[a[u'name']][-1]},
{u'name': 'pvStatus', u'owner': owner, u'value': 'Active'},
{u'name': 'time', u'owner': owner, u'value': iocTime}],
a[u'properties'] = __merge_property_lists(ch_create_properties(owner, iocTime, recceiverid, channels_dict, iocs, ch),
a[u'properties'])
if (conf.get('recordType', 'default') == 'on'):
ch[u'properties'] = __merge_property_lists(ch[u'properties'].append({u'name': 'recordType', u'owner': owner, u'value': iocs[channels_dict[a[u'name']][-1]]["recordType"]}), ch[u'properties'])
Expand Down Expand Up @@ -463,11 +454,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
raise defer.CancelledError()

for pv in new:
newProps = [{u'name': 'hostName', u'owner': owner, u'value': hostName},
{u'name': 'iocName', u'owner': owner, u'value': iocName},
{u'name': 'iocid', u'owner': owner, u'value': iocid},
{u'name': 'pvStatus', u'owner': owner, u'value': "Active"},
{u'name': 'time', u'owner': owner, u'value': iocTime}]
newProps = create_properties(owner, iocTime, recceiverid, hostName, iocName, iocid)
if (conf.get('recordType', 'default') == 'on'):
newProps.append({u'name': 'recordType', u'owner': owner, u'value': pvInfoByName[pv]['recordType']})
if pv in pvInfoByName and "infoProperties" in pvInfoByName[pv]:
Expand Down Expand Up @@ -521,6 +508,22 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
if proc.cancelled:
raise defer.CancelledError()

def create_properties(owner, iocTime, recceiverid, hostName, iocName, iocid):
return [
{u'name': 'hostName', u'owner': owner, u'value': hostName},
{u'name': 'iocName', u'owner': owner, u'value': iocName},
{u'name': 'iocid', u'owner': owner, u'value': iocid},
{u'name': 'pvStatus', u'owner': owner, u'value': 'Active'},
{u'name': 'time', u'owner': owner, u'value': iocTime},
{u'name': RECCEIVERID_KEY, u'owner': owner, u'value': recceiverid}]

def ch_create_properties(owner, iocTime, recceiverid, channels_dict, iocs, ch):
return create_properties(owner, iocTime, recceiverid,
iocs[channels_dict[ch[u'name']][-1]]["hostname"],
iocs[channels_dict[ch[u'name']][-1]]["iocname"],
channels_dict[ch[u'name']][-1])


def __merge_property_lists(newProperties, oldProperties):
"""
Merges two lists of properties ensuring that there are no 2 properties with
Expand Down

0 comments on commit 5a055a2

Please sign in to comment.