Skip to content

Commit

Permalink
Add CSI test to power-save (twisted)
Browse files Browse the repository at this point in the history
  • Loading branch information
rufferson committed Apr 5, 2021
1 parent c10f950 commit c125c3a
Showing 1 changed file with 45 additions and 1 deletion.
46 changes: 45 additions & 1 deletion tests/twisted/power-save.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from gabbletest import exec_test, GoogleXmlStream, make_result_iq, \
send_error_reply, disconnect_conn, make_presence, sync_stream, elem, \
acknowledge_iq
acknowledge_iq, XmppAuthenticator, StreamEvent
from servicetest import call_async, assertEquals, EventPattern, \
assertContains, sync_dbus
import ns
Expand Down Expand Up @@ -208,8 +208,52 @@ def test_disconnect(q, bus, conn, stream):

disconnect_conn(q, conn, stream)

NS_CSI="urn:xmpp:csi:0"
class CsiAuthenticator(XmppAuthenticator):
def __init__(self):
super().__init__('test', 'pass')

def streamIQ(self):
super().streamIQ()
self.xmlstream.addObserver(f"/active[@xmlns='{NS_CSI}']",
lambda x: self.xmlstream.event_func(StreamEvent('stream-active', x, self.xmlstream)))
self.xmlstream.addObserver(f"/inactive[@xmlns='{NS_CSI}']",
lambda x: self.xmlstream.event_func(StreamEvent('stream-inactive', x, self.xmlstream)))


def test_csi(q, bus, conn, stream):
assertContains(cs.CONN_IFACE_POWER_SAVING,
conn.Get(cs.CONN, "Interfaces",
dbus_interface=cs.PROPERTIES_IFACE))

assertEquals (False, conn.Get(cs.CONN_IFACE_POWER_SAVING,
"PowerSavingActive",
dbus_interface=cs.PROPERTIES_IFACE))

call_async(q, conn.PowerSaving, 'SetPowerSaving', True)

q.expect('stream-inactive')

q.expect_many(EventPattern('dbus-return', method='SetPowerSaving'),
EventPattern('dbus-signal', signal='PowerSavingChanged',
args=[True]))
call_async(q, conn.PowerSaving, 'SetPowerSaving', False)

q.expect('stream-active')

q.expect_many(
EventPattern('dbus-return', method='SetPowerSaving'),
EventPattern('dbus-signal', signal='PowerSavingChanged',
args=[False]))


if __name__ == '__main__':
exec_test(test, protocol=GoogleXmlStream)
exec_test(test_local_queueing)
exec_test(test_error, protocol=GoogleXmlStream)
exec_test(test_disconnect, protocol=GoogleXmlStream)

# The same powersave engine is used hence only checking
# CSI signaling
XmppAuthenticator.features[NS_CSI] = 'csi'
exec_test(test_csi, authenticator=CsiAuthenticator())

0 comments on commit c125c3a

Please sign in to comment.