First_Hack_At_API
danni on #telepathy
import gobject
from telepathy.client import Channel from telepathy.interfaces import (
- CONN_INTERFACE, CHANNEL_INTERFACE_GROUP, CHANNEL_TYPE_TEXT, CHANNEL_INTERFACE, CONNECTION_INTERFACE_REQUESTS, CHANNEL_INTERFACE_TUBE, CHANNEL_TYPE_DBUS_TUBE)
from telepathy.constants import (
- CONNECTION_HANDLE_TYPE_CONTACT, CONNECTION_HANDLE_TYPE_ROOM, CONNECTION_STATUS_CONNECTED, CONNECTION_STATUS_DISCONNECTED, CONNECTION_STATUS_CONNECTING, SOCKET_ACCESS_CONTROL_CREDENTIALS, TUBE_CHANNEL_STATE_LOCAL_PENDING, TUBE_CHANNEL_STATE_REMOTE_PENDING, TUBE_CHANNEL_STATE_OPEN, TUBE_CHANNEL_STATE_NOT_OFFERED)
class Connection(object):
def init(self):
- self.object_path = "....."
class Account(object):
def init(self):
- self.object_path = "....." self.connection = Connection()
class MessageTubeHandler(object):
gsignals = {'handle-channel' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
- (gobject.TYPE_PYOBJECT,gobject.TYPE_PYOBJECT)),
- }
def init(self, client_name):
- bus_name = '.'.join ([CLIENT, client_name]) object_path = '/' + bus_name.replace('.', '/')
bus_name = dbus.service.BusName(bus_name, bus=dbus.SessionBus())
telepathy.server.Handler.init(self, bus_name, object_path) telepathy.server.DBusProperties.init(self) self._implement_property_get(CLIENT, {
- 'Interfaces': lambda: [ CLIENT_HANDLER ],
- })
'HandlerChannelFilter': lambda: dbus.Array([
- dbus.Dictionary({ }, signature='sv')
- ], signature='a{sv}')
- })
def HandleChannels(self, account, connection, channels, requests_satisfied,
- user_action_time, handler_info):
- for channel in channels:
self.emit('handle-channel', MessageTube(account, channel))
class MessageTube(object):
def init(self, account, channel_path=None, contact_id=None):
- self._account = account self._connection = account.connection if contact_id:
- request a channel
- self.tube = Channel(self.conn.dbus_proxy.bus_name, path) self.tube[CHANNEL_INTERFACE_TUBE].connect_to_signal(
"TubeChannelStateChanged", self.tube_channel_state_changed_cb) self.tube[CHANNEL_INTERFACE].connect_to_signal("Closed",
- self.tube_closed_cb)
- initiator_id = props[CHANNEL_INTERFACE + ".InitiatorID"]
service = props[CHANNEL_TYPE_DBUS_TUBE + ".ServiceName"] state = self.tube[PROPERTIES_IFACE].Get(CHANNEL_INTERFACE_TUBE, 'State') print "new D-Bus tube offered by %s. Service: %s. State: %s" % (
- initiator_id, service, tube_state[state])
- pass
- """on_message_received - internal functionm, do not call directly""" emit("message_recieved",self.contact, dict)
gsignals = {'message-received' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
- (gobject.TYPE_PYOBJECT,gobject.TYPE_PYOBJECT)),
- }
- self._account = account self._connection = account.connection if contact_id:
class Client(Boo):
def init(self):
self.tube_handler = MessageTubeHandler("tubey") self.tube_handler.connect("handle-channel", self.on_handle_channel)
- self.image_tube = tube tube.connect("message-received", self.on_message_received)
- if "img_url" in msg:
- self._download_image(msg["img_url"])
self.image_tube = tp.MessageTube(contact, account) msg = {} msg["note"] = self.label1.get_text() self.image_tube.send(msg)
- if "kenvandine" in tube.contact.props.identifier:
- return