First_Hack_At_API

Revision 1 as of 2010-05-13 11:10:05

Clear message

danni on #telepathy

import gobject

from telepathy.client import Channel from telepathy.interfaces import (

  • CONN_INTERFACE, CHANNEL_INTERFACE_GROUP, CHANNEL_TYPE_TEXT, CHANNEL_INTERFACE, CONNECTION_INTERFACE_REQUESTS, CHANNEL_INTERFACE_TUBE, CHANNEL_TYPE_DBUS_TUBE)

from telepathy.constants import (

  • CONNECTION_HANDLE_TYPE_CONTACT, CONNECTION_HANDLE_TYPE_ROOM, CONNECTION_STATUS_CONNECTED, CONNECTION_STATUS_DISCONNECTED, CONNECTION_STATUS_CONNECTING, SOCKET_ACCESS_CONTROL_CREDENTIALS, TUBE_CHANNEL_STATE_LOCAL_PENDING, TUBE_CHANNEL_STATE_REMOTE_PENDING, TUBE_CHANNEL_STATE_OPEN, TUBE_CHANNEL_STATE_NOT_OFFERED)

class Connection(object):

  • def init(self):

    • self.object_path = "....."

class Account(object):

  • def init(self):

    • self.object_path = "....." self.connection = Connection()

class MessageTubeHandler(object):

  • gsignals = {'handle-channel' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,

    • (gobject.TYPE_PYOBJECT,gobject.TYPE_PYOBJECT)),
    • }

    def init(self, client_name):

    • bus_name = '.'.join ([CLIENT, client_name]) object_path = '/' + bus_name.replace('.', '/')

      bus_name = dbus.service.BusName(bus_name, bus=dbus.SessionBus())

      telepathy.server.Handler.init(self, bus_name, object_path) telepathy.server.DBusProperties.init(self) self._implement_property_get(CLIENT, {

      • 'Interfaces': lambda: [ CLIENT_HANDLER ],
      • })
      self._implement_property_get(CLIENT_HANDLER, {
      • 'HandlerChannelFilter': lambda: dbus.Array([

        • dbus.Dictionary({ }, signature='sv')
        • ], signature='a{sv}')
      • })

    def HandleChannels(self, account, connection, channels, requests_satisfied,

    • user_action_time, handler_info):
    • for channel in channels:
      • self.emit('handle-channel', MessageTube(account, channel))

class MessageTube(object):

  • def init(self, account, channel_path=None, contact_id=None):

    • self._account = account self._connection = account.connection if contact_id:
      • request a channel
      if channel:
      • self.tube = Channel(self.conn.dbus_proxy.bus_name, path) self.tube[CHANNEL_INTERFACE_TUBE].connect_to_signal(
        • "TubeChannelStateChanged", self.tube_channel_state_changed_cb) self.tube[CHANNEL_INTERFACE].connect_to_signal("Closed",

          • self.tube_closed_cb)
          self._on_channel_created(props)
    def _on_channel_created(self, props):
    • initiator_id = props[CHANNEL_INTERFACE + ".InitiatorID"]

      service = props[CHANNEL_TYPE_DBUS_TUBE + ".ServiceName"] state = self.tube[PROPERTIES_IFACE].Get(CHANNEL_INTERFACE_TUBE, 'State') print "new D-Bus tube offered by %s. Service: %s. State: %s" % (

      • initiator_id, service, tube_state[state])
    def send_message(self, msg):
    • pass
    def _on_message_received(self, dict):
    • """on_message_received - internal functionm, do not call directly""" emit("message_recieved",self.contact, dict)

    gsignals = {'message-received' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,

    • (gobject.TYPE_PYOBJECT,gobject.TYPE_PYOBJECT)),
    • }

class Client(Boo):

  • def init(self):

    • self.tube_handler = MessageTubeHandler("tubey") self.tube_handler.connect("handle-channel", self.on_handle_channel)

    def on_handle_channel(self, tube):
    • self.image_tube = tube tube.connect("message-received", self.on_message_received)
    def on_message_received(self, tube, msg):
    • if "img_url" in msg:
      • self._download_image(msg["img_url"])
    def on_contact_selected(self, contact, account):
    • self.image_tube = tp.MessageTube(contact, account) msg = {} msg["note"] = self.label1.get_text() self.image_tube.send(msg)

    def on_tube_initiated(self, tube):
    • if "kenvandine" in tube.contact.props.identifier:
      • return

http://wiki.laptop.org/go/Sugar.presence