[docs]classNonUniqueFeature(Exception):""" Exception raised when a non-unique feature is found on the manager """pass
[docs]classStreamFeature(ET.Element):""" Class to manage server features """def__init__(self,tag:str="stream:features",**extra:str)->None:attrib:Dict[str,str]={"xmlns":"http://etherx.jabber.org/streams"}super().__init__(tag,attrib,**extra)self._features:Dict[str,ET.Element]={}
[docs]defregister(self,feature:ET.Element):""" Register a new feature """iffeature.taginself._features.keys():raiseNonUniqueFeature("Feature already registered")self._features[feature.tag]=feature
[docs]defunregister(self,feature):""" Unregister a feature """iffeature.taginself._features:delself._features[feature.tag]
[docs]defreset(self):""" Remove all features registered in the StreamFeature object. """self._features.clear()
[docs]deftostring(self)->str:""" Return a string representation of the xml message """returnself.to_bytes().decode()
[docs]defto_bytes(self)->bytes:""" Return an encoded xml message """res=self.__copy__()for_,featureinself._features.items():res.append(feature)returnET.tostring(res)