001: /**
002: * $RCSfile: $
003: * $Revision: $
004: * $Date: $
005: *
006: * Copyright (C) 2007 Jive Software. All rights reserved.
007: *
008: * This software is published under the terms of the GNU Public License (GPL),
009: * a copy of which is included in this distribution.
010: */package org.jivesoftware.openfire.pep;
011:
012: import org.dom4j.DocumentHelper;
013: import org.dom4j.Element;
014: import org.dom4j.QName;
015: import org.jivesoftware.openfire.PacketRouter;
016: import org.jivesoftware.openfire.SessionManager;
017: import org.jivesoftware.openfire.XMPPServer;
018: import org.jivesoftware.openfire.commands.AdHocCommandManager;
019: import org.jivesoftware.openfire.entitycaps.EntityCapabilities;
020: import org.jivesoftware.openfire.entitycaps.EntityCapabilitiesManager;
021: import org.jivesoftware.openfire.pubsub.*;
022: import org.jivesoftware.openfire.pubsub.models.AccessModel;
023: import org.jivesoftware.openfire.pubsub.models.PublisherModel;
024: import org.jivesoftware.openfire.roster.Roster;
025: import org.jivesoftware.openfire.roster.RosterItem;
026: import org.jivesoftware.openfire.session.ClientSession;
027: import org.jivesoftware.openfire.user.UserNotFoundException;
028: import org.jivesoftware.util.FastDateFormat;
029: import org.jivesoftware.util.LocaleUtils;
030: import org.jivesoftware.util.StringUtils;
031: import org.xmpp.packet.JID;
032: import org.xmpp.packet.Message;
033: import org.xmpp.packet.Packet;
034: import org.xmpp.packet.PacketExtension;
035:
036: import java.util.*;
037: import java.util.concurrent.ConcurrentHashMap;
038: import java.util.concurrent.LinkedBlockingQueue;
039:
040: /**
041: * A PEPService is a {@link PubSubService} for use with XEP-0163: "Personal Eventing via
042: * Pubsub" Version 1.0
043: *
044: * @author Armando Jagucki
045: *
046: */
047: public class PEPService implements PubSubService {
048: /**
049: * The bare JID that this service is identified by.
050: */
051: private String serviceOwnerJID;
052:
053: /**
054: * Collection node that acts as the root node of the entire node hierarchy.
055: */
056: private CollectionNode rootCollectionNode = null;
057:
058: /**
059: * Nodes managed by this service, table: key nodeID (String); value Node
060: */
061: private Map<String, Node> nodes = new ConcurrentHashMap<String, Node>();
062:
063: /**
064: * The packet router for the server.
065: */
066: private PacketRouter router = null;
067:
068: /**
069: * Default configuration to use for newly created leaf nodes.
070: */
071: private DefaultNodeConfiguration leafDefaultConfiguration;
072:
073: /**
074: * Default configuration to use for newly created collection nodes.
075: */
076: private DefaultNodeConfiguration collectionDefaultConfiguration;
077:
078: /**
079: * Permission policy for creating nodes. A true value means that only the
080: * service admin may create nodes; other users are not allowed to.
081: */
082: private boolean nodeCreationRestricted = true;
083:
084: /**
085: * Keep a registry of the presence's show value of users that subscribed to
086: * a node of the pep service and for which the node only delivers
087: * notifications for online users or node subscriptions deliver events based
088: * on the user presence show value. Offline users will not have an entry in
089: * the map. Note: Key-> bare JID and Value-> Map whose key is full JID of
090: * connected resource and value is show value of the last received presence.
091: */
092: private Map<String, Map<String, String>> barePresences = new ConcurrentHashMap<String, Map<String, String>>();
093:
094: /**
095: * Queue that holds the items that need to be added to the database.
096: */
097: private Queue<PublishedItem> itemsToAdd = new LinkedBlockingQueue<PublishedItem>();
098:
099: /**
100: * Queue that holds the items that need to be deleted from the database.
101: */
102: private Queue<PublishedItem> itemsToDelete = new LinkedBlockingQueue<PublishedItem>();
103:
104: /**
105: * Manager that keeps the list of ad-hoc commands and processing command
106: * requests.
107: */
108: private AdHocCommandManager adHocCommandManager;
109:
110: /**
111: * Used to handle filtered-notifications.
112: */
113: private EntityCapabilitiesManager entityCapsManager = EntityCapabilitiesManager
114: .getInstance();
115:
116: /**
117: * The time to elapse between each execution of the maintenance process.
118: * Default is 2 minutes.
119: */
120: private int items_task_timeout = 2 * 60 * 1000;
121:
122: /**
123: * Task that saves or deletes published items from the database.
124: */
125: private PublishedItemTask publishedItemTask;
126:
127: /**
128: * Timer to save published items to the database or remove deleted or old
129: * items.
130: */
131: private Timer timer = new Timer("PEP service maintenance");
132:
133: /**
134: * Date format to use for time stamps in delayed event notifications.
135: */
136: private static final FastDateFormat fastDateFormat;
137:
138: static {
139: fastDateFormat = FastDateFormat.getInstance(
140: "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", TimeZone
141: .getTimeZone("UTC"));
142: }
143:
144: /**
145: * Constructs a PEPService.
146: *
147: * @param server the XMPP server.
148: * @param bareJID the bare JID (service ID) of the user owning the service.
149: */
150: public PEPService(XMPPServer server, String bareJID) {
151: this .serviceOwnerJID = bareJID;
152: router = server.getPacketRouter();
153:
154: // Initialize the ad-hoc commands manager to use for this pep service
155: adHocCommandManager = new AdHocCommandManager();
156: adHocCommandManager.addCommand(new PendingSubscriptionsCommand(
157: this ));
158:
159: // Save or delete published items from the database every 2 minutes
160: // starting in 2 minutes (default values)
161: publishedItemTask = new PublishedItemTask(this );
162: timer.schedule(publishedItemTask, items_task_timeout,
163: items_task_timeout);
164:
165: // Load default configuration for leaf nodes
166: leafDefaultConfiguration = PubSubPersistenceManager
167: .loadDefaultConfiguration(this , true);
168: if (leafDefaultConfiguration == null) {
169: // Create and save default configuration for leaf nodes;
170: leafDefaultConfiguration = new DefaultNodeConfiguration(
171: true);
172: leafDefaultConfiguration
173: .setAccessModel(AccessModel.presence);
174: leafDefaultConfiguration
175: .setPublisherModel(PublisherModel.publishers);
176: leafDefaultConfiguration.setDeliverPayloads(true);
177: leafDefaultConfiguration.setLanguage("English");
178: leafDefaultConfiguration.setMaxPayloadSize(5120);
179: leafDefaultConfiguration.setNotifyConfigChanges(true);
180: leafDefaultConfiguration.setNotifyDelete(true);
181: leafDefaultConfiguration.setNotifyRetract(true);
182: leafDefaultConfiguration.setPersistPublishedItems(false);
183: leafDefaultConfiguration.setMaxPublishedItems(-1);
184: leafDefaultConfiguration.setPresenceBasedDelivery(false);
185: leafDefaultConfiguration.setSendItemSubscribe(true);
186: leafDefaultConfiguration.setSubscriptionEnabled(true);
187: leafDefaultConfiguration.setReplyPolicy(null);
188: PubSubPersistenceManager.createDefaultConfiguration(this ,
189: leafDefaultConfiguration);
190: }
191: // Load default configuration for collection nodes
192: collectionDefaultConfiguration = PubSubPersistenceManager
193: .loadDefaultConfiguration(this , false);
194: if (collectionDefaultConfiguration == null) {
195: // Create and save default configuration for collection nodes;
196: collectionDefaultConfiguration = new DefaultNodeConfiguration(
197: false);
198: collectionDefaultConfiguration
199: .setAccessModel(AccessModel.presence);
200: collectionDefaultConfiguration
201: .setPublisherModel(PublisherModel.publishers);
202: collectionDefaultConfiguration.setDeliverPayloads(false);
203: collectionDefaultConfiguration.setLanguage("English");
204: collectionDefaultConfiguration.setNotifyConfigChanges(true);
205: collectionDefaultConfiguration.setNotifyDelete(true);
206: collectionDefaultConfiguration.setNotifyRetract(true);
207: collectionDefaultConfiguration
208: .setPresenceBasedDelivery(false);
209: collectionDefaultConfiguration.setSubscriptionEnabled(true);
210: collectionDefaultConfiguration.setReplyPolicy(null);
211: collectionDefaultConfiguration
212: .setAssociationPolicy(CollectionNode.LeafNodeAssociationPolicy.all);
213: collectionDefaultConfiguration.setMaxLeafNodes(-1);
214: PubSubPersistenceManager.createDefaultConfiguration(this ,
215: collectionDefaultConfiguration);
216: }
217:
218: // Load nodes to memory
219: PubSubPersistenceManager.loadNodes(this );
220: // Ensure that we have a root collection node
221: if (nodes.isEmpty()) {
222: // Create root collection node
223: JID creatorJID = new JID(bareJID);
224: rootCollectionNode = new CollectionNode(this , null,
225: bareJID, creatorJID);
226: // Add the creator as the node owner
227: rootCollectionNode.addOwner(creatorJID);
228: // Save new root node
229: rootCollectionNode.saveToDB();
230: } else {
231: rootCollectionNode = (CollectionNode) getNode(bareJID);
232: }
233: }
234:
235: public void addNode(Node node) {
236: nodes.put(node.getNodeID(), node);
237: }
238:
239: public void removeNode(String nodeID) {
240: nodes.remove(nodeID);
241: }
242:
243: public Node getNode(String nodeID) {
244: return nodes.get(nodeID);
245: }
246:
247: public Collection<Node> getNodes() {
248: return nodes.values();
249: }
250:
251: public CollectionNode getRootCollectionNode() {
252: return rootCollectionNode;
253: }
254:
255: public JID getAddress() {
256: return new JID(serviceOwnerJID);
257: }
258:
259: public String getServiceID() {
260: // The bare JID of the user is the service ID for PEP
261: return serviceOwnerJID;
262: }
263:
264: public DefaultNodeConfiguration getDefaultNodeConfiguration(
265: boolean leafType) {
266: if (leafType) {
267: return leafDefaultConfiguration;
268: }
269: return collectionDefaultConfiguration;
270: }
271:
272: public Collection<String> getShowPresences(JID subscriber) {
273: return PubSubEngine.getShowPresences(this , subscriber);
274: }
275:
276: public boolean canCreateNode(JID creator) {
277: // Node creation is always allowed for sysadmin
278: if (isNodeCreationRestricted() && !isServiceAdmin(creator)) {
279: // The user is not allowed to create nodes
280: return false;
281: }
282: return true;
283: }
284:
285: /**
286: * Returns true if the the prober is allowed to see the presence of the probee.
287: *
288: * @param prober the user that is trying to probe the presence of another user.
289: * @param probee the username of the uset that is being probed.
290: * @return true if the the prober is allowed to see the presence of the probee.
291: * @throws UserNotFoundException If the probee does not exist in the local server or the prober
292: * is not present in the roster of the probee.
293: */
294: private boolean canProbePresence(JID prober, JID probee)
295: throws UserNotFoundException {
296: Roster roster;
297: roster = XMPPServer.getInstance().getRosterManager().getRoster(
298: prober.getNode());
299: RosterItem item = roster.getRosterItem(probee);
300:
301: if (item.getSubStatus() == RosterItem.SUB_BOTH
302: || item.getSubStatus() == RosterItem.SUB_FROM) {
303: return true;
304: }
305:
306: return false;
307: }
308:
309: public boolean isCollectionNodesSupported() {
310: return true;
311: }
312:
313: public boolean isInstantNodeSupported() {
314: return true;
315: }
316:
317: public boolean isMultipleSubscriptionsEnabled() {
318: return false;
319: }
320:
321: public boolean isServiceAdmin(JID user) {
322: // Here we consider a 'service admin' to be the user that this PEPService
323: // is associated with.
324: if (serviceOwnerJID.equals(user.toBareJID())) {
325: return true;
326: } else {
327: return false;
328: }
329: }
330:
331: public boolean isNodeCreationRestricted() {
332: return nodeCreationRestricted;
333: }
334:
335: public void presenceSubscriptionNotRequired(Node node, JID user) {
336: PubSubEngine.presenceSubscriptionNotRequired(this , node, user);
337: }
338:
339: public void presenceSubscriptionRequired(Node node, JID user) {
340: PubSubEngine.presenceSubscriptionRequired(this , node, user);
341: }
342:
343: public void send(Packet packet) {
344: router.route(packet);
345: }
346:
347: public void broadcast(Node node, Message message,
348: Collection<JID> jids) {
349: message.setFrom(getAddress());
350: for (JID jid : jids) {
351: message.setTo(jid);
352: message.setID(node.getNodeID() + "__" + jid.toBareJID()
353: + "__" + StringUtils.randomString(5));
354: router.route(message);
355: }
356: }
357:
358: public void sendNotification(Node node, Message message,
359: JID recipientJID) {
360: message.setTo(recipientJID);
361: message.setFrom(getAddress());
362: message.setID(node.getNodeID() + "__"
363: + recipientJID.toBareJID() + "__"
364: + StringUtils.randomString(5));
365:
366: // If the recipient subscribed with a bare JID and this PEPService can retrieve
367: // presence information for the recipient, collect all of their full JIDs and
368: // send the notification to each below.
369: Set<JID> recipientFullJIDs = new HashSet<JID>();
370: if (XMPPServer.getInstance().isLocal(recipientJID)) {
371: if (recipientJID.getResource() == null) {
372: for (ClientSession clientSession : SessionManager
373: .getInstance().getSessions(
374: recipientJID.getNode())) {
375: recipientFullJIDs.add(clientSession.getAddress());
376: }
377: }
378: } else {
379: // Since recipientJID is not local, try to get presence info from cached known remote
380: // presences.
381: Map<String, Set<JID>> knownRemotePresences = XMPPServer
382: .getInstance().getIQPEPHandler()
383: .getKnownRemotePresenes();
384:
385: Set<JID> remotePresenceSet = knownRemotePresences
386: .get(getAddress().toBareJID());
387: if (remotePresenceSet != null) {
388: for (JID remotePresence : remotePresenceSet) {
389: if (recipientJID.toBareJID().equals(
390: remotePresence.toBareJID())) {
391: recipientFullJIDs.add(remotePresence);
392: }
393: }
394: }
395: }
396:
397: if (recipientFullJIDs.isEmpty()) {
398: router.route(message);
399: return;
400: }
401:
402: for (JID recipientFullJID : recipientFullJIDs) {
403: // Include an Extended Stanza Addressing "replyto" extension specifying the publishing
404: // resource. However, only include the extension if the receiver has a presence subscription
405: // to the service owner.
406: try {
407: JID publisher = null;
408:
409: // Get the ID of the node that had an item published to or retracted from.
410: Element itemsElement = message.getElement().element(
411: "event").element("items");
412: String nodeID = itemsElement.attributeValue("node");
413:
414: // Get the ID of the item that was published or retracted.
415: String itemID = null;
416: Element itemElement = itemsElement.element("item");
417: if (itemElement == null) {
418: Element retractElement = itemsElement
419: .element("retract");
420: if (retractElement != null) {
421: itemID = retractElement.attributeValue("id");
422: }
423: } else {
424: itemID = itemElement.attributeValue("id");
425: }
426:
427: // Check if the recipientFullJID is interested in notifications for this node.
428: // If the recipient has not yet requested any notification filtering, continue and send
429: // the notification.
430: EntityCapabilities entityCaps = entityCapsManager
431: .getEntityCapabilities(recipientFullJID);
432: if (entityCaps != null) {
433: if (!entityCaps.containsFeature(nodeID + "+notify")) {
434: return;
435: }
436: }
437:
438: // Get the full JID of the item publisher from the node that was published to.
439: // This full JID will be used as the "replyto" address in the addressing extension.
440: if (node.isCollectionNode()) {
441: for (Node leafNode : node.getNodes()) {
442: if (leafNode.getNodeID().equals(nodeID)) {
443: publisher = leafNode.getPublishedItem(
444: itemID).getPublisher();
445:
446: // Ensure the recipientJID has access to receive notifications for items published to the leaf node.
447: AccessModel accessModel = leafNode
448: .getAccessModel();
449: if (!accessModel.canAccessItems(leafNode,
450: recipientFullJID, publisher)) {
451: return;
452: }
453:
454: break;
455: }
456: }
457: } else {
458: publisher = node.getPublishedItem(itemID)
459: .getPublisher();
460: }
461:
462: // Ensure the recipient is subscribed to the service owner's (publisher's) presence.
463: if (canProbePresence(publisher, recipientFullJID)) {
464: Element addresses = DocumentHelper
465: .createElement(QName
466: .get("addresses",
467: "http://jabber.org/protocol/address"));
468: Element address = addresses.addElement("address");
469: address.addAttribute("type", "replyto");
470: address.addAttribute("jid", publisher.toString());
471:
472: Message extendedMessage = message.createCopy();
473: extendedMessage.addExtension(new PacketExtension(
474: addresses));
475:
476: extendedMessage.setTo(recipientFullJID);
477: router.route(extendedMessage);
478: }
479: } catch (IndexOutOfBoundsException e) {
480: // Do not add addressing extension to message.
481: } catch (UserNotFoundException e) {
482: // Do not add addressing extension to message.
483: router.route(message);
484: } catch (NullPointerException e) {
485: try {
486: if (canProbePresence(getAddress(), recipientFullJID)) {
487: message.setTo(recipientFullJID);
488: }
489: } catch (UserNotFoundException e1) {
490: // Do nothing
491: }
492: router.route(message);
493: }
494: }
495: }
496:
497: /**
498: * Sends an event notification for the last published item of each leaf node under the
499: * root collection node to the recipient JID. If the recipient has no subscription to
500: * the root collection node, has not yet been authorized, or is pending to be
501: * configured -- then no notifications are going to be sent.<p>
502: *
503: * Depending on the subscription configuration the event notifications may or may not have
504: * a payload, may not be sent if a keyword (i.e. filter) was defined and it was not matched.
505: *
506: * @param recipientJID the recipient that is to receive the last published item notifications.
507: */
508: public void sendLastPublishedItems(JID recipientJID) {
509: // Ensure the recipient has a subscription to this service's root collection node.
510: NodeSubscription subscription = rootCollectionNode
511: .getSubscription(recipientJID);
512: if (subscription == null) {
513: subscription = rootCollectionNode.getSubscription(new JID(
514: recipientJID.toBareJID()));
515: }
516: if (subscription == null) {
517: return;
518: }
519:
520: // Send the last published item of each leaf node to the recipient.
521: for (Node leafNode : rootCollectionNode.getNodes()) {
522: // Retrieve last published item for the leaf node.
523: PublishedItem leafLastPublishedItem = null;
524: leafLastPublishedItem = leafNode.getLastPublishedItem();
525: if (leafLastPublishedItem == null) {
526: continue;
527: }
528:
529: // Check if the published item can be sent to the subscriber
530: if (!subscription.canSendPublicationEvent(
531: leafLastPublishedItem.getNode(),
532: leafLastPublishedItem)) {
533: return;
534: }
535:
536: // Send event notification to the subscriber
537: Message notification = new Message();
538: Element event = notification.getElement().addElement(
539: "event", "http://jabber.org/protocol/pubsub#event");
540: Element items = event.addElement("items");
541: items.addAttribute("node", leafLastPublishedItem.getNode()
542: .getNodeID());
543: Element item = items.addElement("item");
544: if (((LeafNode) leafLastPublishedItem.getNode())
545: .isItemRequired()) {
546: item.addAttribute("id", leafLastPublishedItem.getID());
547: }
548: if (leafLastPublishedItem.getNode().isPayloadDelivered()
549: && leafLastPublishedItem.getPayload() != null) {
550: item.add(leafLastPublishedItem.getPayload()
551: .createCopy());
552: }
553: // Add a message body (if required)
554: if (subscription.isIncludingBody()) {
555: notification
556: .setBody(LocaleUtils
557: .getLocalizedString("pubsub.notification.message.body"));
558: }
559: // Include date when published item was created
560: notification.getElement().addElement("x", "jabber:x:delay")
561: .addAttribute(
562: "stamp",
563: fastDateFormat.format(leafLastPublishedItem
564: .getCreationDate()));
565: // Send the event notification to the subscriber
566: this .sendNotification(subscription.getNode(), notification,
567: subscription.getJID());
568: }
569: }
570:
571: public void queueItemToAdd(PublishedItem newItem) {
572: PubSubEngine.queueItemToAdd(this , newItem);
573:
574: }
575:
576: public void queueItemToRemove(PublishedItem removedItem) {
577: PubSubEngine.queueItemToRemove(this , removedItem);
578:
579: }
580:
581: public Map<String, Map<String, String>> getBarePresences() {
582: return barePresences;
583: }
584:
585: public Queue<PublishedItem> getItemsToAdd() {
586: return itemsToAdd;
587: }
588:
589: public Queue<PublishedItem> getItemsToDelete() {
590: return itemsToDelete;
591: }
592:
593: public AdHocCommandManager getManager() {
594: return adHocCommandManager;
595: }
596:
597: public PublishedItemTask getPublishedItemTask() {
598: return publishedItemTask;
599: }
600:
601: public void setPublishedItemTask(PublishedItemTask task) {
602: publishedItemTask = task;
603: }
604:
605: public Timer getTimer() {
606: return timer;
607: }
608:
609: public int getItemsTaskTimeout() {
610: return items_task_timeout;
611: }
612:
613: public void setItemsTaskTimeout(int timeout) {
614: items_task_timeout = timeout;
615: }
616:
617: }
|