001: // Informa -- RSS Library for Java
002: // Copyright (c) 2002 by Niko Schmuck
003: //
004: // Niko Schmuck
005: // http://sourceforge.net/projects/informa
006: // mailto:niko_schmuck@users.sourceforge.net
007: //
008: // This library is free software.
009: //
010: // You may redistribute it and/or modify it under the terms of the GNU
011: // Lesser General Public License as published by the Free Software Foundation.
012: //
013: // Version 2.1 of the license should be included with this distribution in
014: // the file LICENSE. If the license is not included with this distribution,
015: // you may find a copy at the FSF web site at 'www.gnu.org' or 'www.fsf.org',
016: // or you may write to the Free Software Foundation, 675 Mass Ave, Cambridge,
017: // MA 02139 USA.
018: //
019: // This library is distributed in the hope that it will be useful,
020: // but WITHOUT ANY WARRANTY; without even the implied warranty of
021: // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
022: // Lesser General Public License for more details.
023: //
024:
025: // $Id: PersistChanGrpMgr.java,v 1.27 2006/12/04 23:43:27 italobb Exp $
026:
027: package de.nava.informa.utils;
028:
029: import java.util.*;
030:
031: import org.hibernate.*;
032:
033: import org.apache.commons.logging.Log;
034: import org.apache.commons.logging.LogFactory;
035:
036: import de.nava.informa.core.ChannelBuilderException;
037: import de.nava.informa.impl.hibernate.*;
038:
039: /**
040: * PersistChanGrpMgr - Controls and Manages a single Hibernate based Informa
041: * ChannelGroup. Provides for threaded Updating of the Channel Group,
042: * persistence management, session management etc.
043: *
044: * N O T T H R E A D S A F E
045: *
046: */
047: public class PersistChanGrpMgr {
048:
049: private static final int DEFAULT_STARTDELAY = 1 * 1000; // ms
050: private static final int DEFAULT_PERIOD = 10 * 60 * 1000; // ms
051: private static final int DEFAULT_ACCEPTERRORS = 10;
052:
053: private static final int DBG_DEFAULT_STARTDELAY = 100; // ms
054: private static final int DBG_DEFAULT_PERIOD = 20000; // ms
055: private static final int DBG_DEFAULT_ACCEPTERRORS = 10;
056:
057: private static Log logger = LogFactory
058: .getLog(PersistChanGrpMgr.class);
059: private ChannelBuilder builder;
060: private ChannelGroup group;
061: private SessionHandler handler;
062: private PersistChanGrpMgrObserverIF globalChannelObserver;
063: private boolean activated = false;
064: private PersistChanGrpMgrTask task;
065: private int pollingCounter;
066:
067: int taskStartDelay;
068: int taskPeriod;
069: int acceptNrErrors;
070:
071: /**
072: * Constructor.
073: *
074: * @param handler - SessionHandler to use. This needs to have been built by caller.
075: *
076: * @param debug - - true will run this in debug mode, which basically means that threads are run
077: * with no delays thereby revealing threading bugs.
078: */
079: public PersistChanGrpMgr(SessionHandler handler, boolean debug) {
080: if (handler == null)
081: throw new IllegalStateException("Invalid handler");
082: this .handler = handler;
083: builder = new ChannelBuilder(handler);
084: pollingCounter = 0;
085:
086: if (debug) {
087: taskStartDelay = DBG_DEFAULT_STARTDELAY;
088: taskPeriod = DBG_DEFAULT_PERIOD;
089: acceptNrErrors = DBG_DEFAULT_ACCEPTERRORS;
090: } else {
091: taskStartDelay = DEFAULT_STARTDELAY;
092: taskPeriod = DEFAULT_PERIOD;
093: acceptNrErrors = DEFAULT_ACCEPTERRORS;
094: }
095: }
096:
097: /**
098: * Called to create a Group.
099: *
100: * @param name - Text name of the group
101: * @return - Channel Group being managed by this PersistChanGrpMgr
102: */
103: public ChannelGroup createGroup(String name) {
104: logger.debug("Creating Persistent Group: " + name);
105: if (group != null)
106: throw new IllegalStateException(
107: "Can't call createGroup twice in a row.");
108: if (activated)
109: throw new IllegalStateException(
110: "Can't create groups while activated.");
111:
112: ChannelGroup result = null;
113: synchronized (builder) {
114: result = findChannelGroup(name);
115: if (result == null) {
116: try {
117: builder.beginTransaction();
118: result = (ChannelGroup) builder
119: .createChannelGroup(name);
120: builder.endTransaction();
121: } catch (ChannelBuilderException e) {
122: try {
123: builder.endTransaction();
124: } catch (ChannelBuilderException e1) {
125: e1.printStackTrace();
126: }
127: e.printStackTrace();
128: }
129: }
130: group = result;
131: }
132: logger.info("createGroup(\"" + name + "\" yielded: " + result);
133: return result;
134: }
135:
136: /**
137: * Deletes persistent group.
138: */
139: public void deleteGroup() {
140: if (group == null)
141: return;
142: logger.debug("Deleting Persistent Group: " + group.getTitle());
143:
144: synchronized (builder) {
145: try {
146: builder.beginTransaction();
147: builder.reload(group);
148:
149: // Remove group from links with channels
150: Channel[] chans = (Channel[]) group.getChannels()
151: .toArray(new Channel[0]);
152: for (int i = 0; i < chans.length; i++) {
153: Channel chan = chans[i];
154:
155: final Set<ChannelGroup> grps = chan.getGroups();
156: grps.remove(group);
157: group.getChannels().remove(chan);
158:
159: // Delete channel if it was the last group it was assigned to
160: if (grps.size() == 0)
161: builder.delete(chan);
162: }
163:
164: builder.delete(group);
165:
166: builder.endTransaction();
167: group = null;
168: } catch (ChannelBuilderException e) {
169: logger.error("Unable to delete Persistent Group: "
170: + e.getMessage());
171: builder.resetTransaction();
172: }
173: }
174: }
175:
176: /**
177: * Check if this PersistChanGrp has specified CHannel as a member already
178: *
179: * @param achannel - candidate channel to check
180: * @return TRUE = yes
181: */
182: public boolean hasChannel(final Channel achannel) {
183: return group.getChannels().contains(achannel);
184: }
185:
186: /**
187: * Add a channel to this Persisten Channel Group. If Channel already exists then just add it,
188: * if it doesn't then create it and add it.
189: *
190: * @param url the url of the rss feed
191: * @return Channel so created or located
192: */
193: public Channel addChannel(String url) {
194: if (activated)
195: throw new IllegalStateException(
196: "can't add Channels while activated.");
197: Channel achannel = null;
198: synchronized (builder) {
199: try {
200: builder.beginTransaction();
201: builder.reload(group);
202:
203: achannel = findChannel(url);
204: if (achannel == null) { // Channel is not in the database
205: achannel = newChannel(url);
206: logger.debug("Added New Channel: " + url);
207: } else { // Channel is in the database, but it may not already be in this group
208: if (!hasChannel(achannel)) {
209: logger.debug("Loaded existing channel" + url);
210: group.add(achannel);
211: achannel.getGroups().add(group);
212: }
213: }
214: builder.endTransaction();
215: } catch (Exception e) {
216: e.printStackTrace();
217: builder.resetTransaction();
218: }
219: return achannel;
220: }
221: }
222:
223: /**
224: * Move a Channel from this PersistentChannelGroup to a different one
225: *
226: * @param channel channel in this PersistentChannelGroup that is being moved.
227: * @param destGrp destination where the Channel is going to
228: */
229: public void moveChannelTo(Channel channel, PersistChanGrpMgr destGrp) {
230: if (activated || destGrp.isActivated())
231: throw new IllegalStateException(
232: "can't move Channels while activated.");
233: synchronized (builder) {
234: try {
235: builder.beginTransaction();
236: builder.reload(group);
237: builder.reload(channel);
238: ChannelGroup dstGroup = builder.reload(destGrp
239: .getChannelGroup());
240:
241: group.remove(channel);
242: channel.getGroups().remove(group);
243:
244: dstGroup.add(channel);
245: channel.getGroups().add(dstGroup);
246:
247: builder.endTransaction();
248: } catch (Exception e) {
249: e.printStackTrace();
250: builder.resetTransaction();
251: }
252: }
253: }
254:
255: /**
256: * Delete specified channel from this PersistChanGrpMgr. Status indicates whether Channel was
257: * previously part of this group.
258: *
259: * @param channel - Channel being removed from the Group.
260: * @return true if channel was deleted, false if channel was not a member to begin with
261: */
262: public boolean deleteChannel(Channel channel) {
263: boolean result = false;
264: if (activated)
265: throw new IllegalStateException(
266: "can't delete Channels while activated.");
267: synchronized (builder) {
268: try {
269: builder.beginTransaction();
270: builder.reload(group);
271: builder.reload(channel);
272:
273: if (hasChannel(channel)) {
274: group.remove(channel);
275: channel.getGroups().remove(group);
276: builder.delete(channel);
277:
278: result = true;
279: }
280:
281: builder.endTransaction();
282: } catch (Exception e) {
283: e.printStackTrace();
284: builder.resetTransaction();
285: }
286: }
287:
288: return result;
289: }
290:
291: /**
292: * Delete specified item from specified Channel
293: *
294: * @param channel - Channel to delete from
295: * @param item - Item to delete from that channel
296: *
297: * @return number of items left in the channel AFTER the deletion.
298: */
299: public int deleteItemFromChannel(Channel channel, Item item) {
300: if (activated)
301: throw new IllegalStateException(
302: "can't delete Items while activated");
303: int result = 0;
304: synchronized (builder) {
305: try {
306: builder.beginTransaction();
307: builder.reload(channel);
308: builder.reload(item);
309:
310: channel.removeItem(item);
311: builder.delete(item);
312:
313: result = channel.getItems().size();
314:
315: builder.endTransaction();
316: } catch (ChannelBuilderException e) {
317: e.printStackTrace();
318: builder.resetTransaction();
319: }
320: }
321: return result;
322: }
323:
324: /**
325: * Return number of Items currently in specified Channel
326: *
327: * @param channel Channel to query
328: * @return number of Items
329: */
330: public int getItemCount(Channel channel) {
331: if (activated)
332: throw new IllegalStateException(
333: "can't count Items while activated");
334: int result = 0;
335: synchronized (builder) {
336: try {
337: builder.beginTransaction();
338: builder.reload(channel);
339:
340: result = channel.getItems().size();
341:
342: builder.endTransaction();
343: } catch (ChannelBuilderException e) {
344: e.printStackTrace();
345: builder.resetTransaction();
346: }
347: }
348: return result;
349: }
350:
351: /*
352: * Notification handlers.
353: *
354: * With persistent Channels we have an alternate notification mechanism because keeping the
355: * observer in the ChannelIF doesn't work because that doesn't get persisted so the setting is
356: * lost between sessions. We might want to consider rearchitecting and not storing the observers
357: * in the ChannelIFs at all. Same goes for the Items.
358: */
359:
360: /**
361: * notifyChannelsAndItems - Notify both item and channel listeners for a channel and all its
362: * items. This is useful if the client wants to treat a Channel that was recently read in by
363: * hibernate in a consistent way with listeners.
364: *
365: * @param channel - Relevant channel.
366: */
367: public void notifyChannelsAndItems(Channel channel) {
368: synchronized (builder) {
369: try {
370: builder.beginTransaction();
371: builder.reload(channel);
372:
373: notifyChannelRetrieved(channel);
374: notifyItems(channel);
375:
376: builder.endTransaction();
377: } catch (ChannelBuilderException e) {
378: e.printStackTrace();
379: builder.resetTransaction();
380: }
381: }
382: }
383:
384: /**
385: * Send notifications for all the items of this channel that they have been added.
386: *
387: * @param channelHandle -
388: */
389: public void notifyItems(Channel channelHandle) {
390: if (globalChannelObserver != null) {
391: Iterator iterChan = channelHandle.getItems().iterator();
392: while (iterChan.hasNext()) {
393: notifyItemAdded((Item) iterChan.next());
394: }
395: }
396: }
397:
398: /**
399: * notifyChannelsAndItems - Call notifyChannelAndItems(channels) across all channels in this
400: * PersistentChanGrpMgr.
401: */
402: public void notifyChannelsAndItems() {
403: Iterator chanIter = group.getChannels().iterator();
404: while (chanIter.hasNext()) {
405: notifyChannelsAndItems((Channel) chanIter.next());
406: }
407: }
408:
409: /**
410: * Send notifications about all Channels in this group (but not their items) -
411: */
412: public void notifyChannels() {
413: // Iterator chanIter = group.getChannels().iterator();
414: Iterator chanIter = channelIterator();
415: while (chanIter.hasNext()) {
416: notifyChannelRetrieved((Channel) chanIter.next());
417: }
418: }
419:
420: /**
421: * Send notification that specified channel was retrieved.
422: *
423: * @param chan -
424: */
425: public void notifyChannelRetrieved(Channel chan) {
426: if (globalChannelObserver != null) {
427: try {
428: globalChannelObserver.channelRetrieved(chan);
429: } catch (Exception e) {
430: // We don't need any troubles with observer exceptions.
431: logger.error(e.getMessage(), e);
432: }
433: }
434: }
435:
436: /**
437: * Send notification that specified item was retrieved.
438: *
439: * @param newItem -
440: */
441: public void notifyItemAdded(Item newItem) {
442: if (globalChannelObserver != null) {
443: try {
444: globalChannelObserver.itemAdded(newItem);
445: } catch (Exception e) {
446: // We don't need any troubles with observer exceptions.
447: logger.error(e.getMessage(), e);
448: }
449: }
450: }
451:
452: /**
453: * Notify that the PersistChanGrpMgrTask is currently in the middle of its 'run()' method.
454: *
455: * @param isPolling true - start polling, false- end
456: */
457: public void notifyPolling(boolean isPolling) {
458: if (globalChannelObserver != null) {
459: try {
460: globalChannelObserver.pollingNow(group.getTitle(),
461: pollingCounter, isPolling);
462: } catch (Exception e) {
463: // We don't need any troubles with observer interrupt our polling.
464: logger.error(e.getMessage(), e);
465: }
466: }
467: }
468:
469: /**
470: * Setup the one and only Global observer. Note this is not an observer chain, but just a single
471: * one.
472: *
473: * @param obser Observer to register
474: */
475: public void setGlobalObserver(PersistChanGrpMgrObserverIF obser) {
476: globalChannelObserver = obser;
477: }
478:
479: /**
480: * activate -
481: * -
482: */
483: public synchronized void activate() {
484: if (activated)
485: return;
486:
487: task = new PersistChanGrpMgrTask(this , taskPeriod);
488: task.start();
489:
490: activated = true;
491: }
492:
493: /**
494: * Simply return whether we are currently activated (that is, running the tasks that download and
495: * process RSS. Will also return true if the PersistChanGrpMgrTask is still in the middle of
496: * finishing.
497: *
498: * @return true = activated
499: */
500: public boolean isActivated() {
501: return activated || (task != null && task.isRunning());
502: }
503:
504: /**
505: * Bump up polling counter by one.
506: *
507: */
508: public void incrPollingCounter() {
509: pollingCounter++;
510: }
511:
512: /**
513: * Return how many times the task has polled the feed since this PersistChanGrp was built
514: *
515: * @return polling count so far
516: */
517: public int getPollingCounter() {
518: return pollingCounter;
519: }
520:
521: /**
522: * Interrupts the update task and return immediately. Do not waits for task to stop.
523: */
524: public synchronized void deActivate() {
525: deActivate(false);
526: }
527:
528: /**
529: * Interrupts the update task and return immediately. Waits for task to finish
530: * if <code>waitForFinish</code> argument set.
531: *
532: * @param waitForFinish TRUE to wait until task actually finishes.
533: */
534: public synchronized void deActivate(final boolean waitForFinish) {
535: if (task != null)
536: logger.debug("deActivate(" + task.getName() + ") "
537: + activated);
538: else
539: logger.debug("deActivate task = null");
540:
541: if (!activated)
542: return;
543:
544: task.interrupt(waitForFinish);
545:
546: activated = false;
547: }
548:
549: /**
550: * Change parameters of how this PersistChanGrpMgr works. Only allowed when this PersistChanGrp is
551: * inactive.
552: *
553: * @param startDel ms before starting (-1 means don't change.)
554: * @param period ms between iterations (-1 means don't change.)
555: * @param acceptErr number of errors before putting a channel offline (-1 means don't change)
556: */
557: public void setParams(final int startDel, final int period,
558: final int acceptErr) {
559: if (activated)
560: throw new IllegalStateException(
561: "can't setParams while activated");
562: if (startDel != -1)
563: taskStartDelay = startDel;
564: if (period != -1)
565: taskPeriod = period;
566: if (acceptErr != -1)
567: acceptNrErrors = acceptErr;
568: }
569:
570: /**
571: * Create an iterator to iterate across all the channels in this group.
572: * @return the iterator
573: */
574: public Iterator channelIterator() {
575: Iterator ret = null;
576: synchronized (builder) {
577: try {
578: builder.beginTransaction();
579: builder.reload(group);
580:
581: ret = group.getAll().iterator();
582:
583: builder.endTransaction();
584: } catch (Exception e) {
585: e.printStackTrace();
586: }
587: }
588: return ret;
589: }
590:
591: /**
592: * Get currently associated ChannelBuilder
593: *
594: * @return the current cb
595: */
596: public ChannelBuilder getBuilder() {
597: return builder;
598: }
599:
600: /**
601: * Get currently assocaited ChannelGrouo
602: *
603: * @return the cg
604: */
605: public ChannelGroup getChannelGroup() {
606: return group;
607: }
608:
609: /**
610: * Get currently assocaited SessionHandler
611: *
612: * @return the sh
613: */
614: public SessionHandler getHandler() {
615: return handler;
616: }
617:
618: /**
619: * @return acceptable number of errors
620: */
621: public int getAcceptNrErrors() {
622: return acceptNrErrors;
623: }
624:
625: /**
626: * Return nicely formatted string for this object
627: *
628: * @return - the string
629: */
630: public String toString() {
631: String result = "";
632: synchronized (builder) {
633: try {
634: builder.beginTransaction();
635: builder.reload(group);
636:
637: result = group.getTitle() + "["
638: + group.getChannels().size() + "]";
639:
640: builder.endTransaction();
641: } catch (Exception e) {
642: e.printStackTrace();
643: }
644: }
645: return result;
646: }
647:
648: /**
649: * newChannel -
650: *
651: * @param url
652: * @return -
653: */
654: private Channel newChannel(String url) {
655: Channel channel;
656: synchronized (builder) {
657: channel = (Channel) builder
658: .createChannel("[uninitialized channel]");
659: }
660: channel.setLocationString(url);
661:
662: group.add(channel);
663: channel.getGroups().add(group);
664:
665: return channel;
666: }
667:
668: /**
669: * Search for a Channel for the indicated url (i.e. the site url or xml feed url)
670: *
671: * N.B: This method assumes that a builder.beginTransaction() has been performed.
672: *
673: * @param url
674: * @return - Channel or null if none found
675: */
676: private Channel findChannel(String url) {
677: Channel achan = null;
678: synchronized (builder) {
679: Session sess = builder.getSession();
680: try {
681: Query q = sess
682: .createQuery("from Channel chan where chan.locationString = :url order by chan.id desc");
683: q.setParameter("url", url, Hibernate.STRING);
684: final List channels = q.list();
685:
686: // List channels will contain 0 or more Channels in the database for said url.
687:
688: final int size = channels.size();
689: if (size > 0) {
690: if (size > 1) {
691: logger.error("Multiple Channels for " + url
692: + " found.");
693: }
694: achan = (Channel) channels.get(0);
695: }
696: } catch (HibernateException e) {
697: achan = null;
698: e.printStackTrace();
699: }
700: }
701:
702: logger.info("findChannel: " + url + "->" + achan);
703: return achan;
704: }
705:
706: /**
707: * Search for the indicated Channel Group.
708: *
709: * @param name - Name of ChannelGroup to locate in database
710: * @return - ChannelGroup or null if none found by that name
711: */
712: private ChannelGroup findChannelGroup(String name) {
713: ChannelGroup result = null;
714: synchronized (builder) {
715: try {
716: builder.beginTransaction();
717:
718: final Session sess = builder.getSession();
719: final Query q = sess
720: .createQuery("from ChannelGroup as grp where grp.title = :title");
721: q.setParameter("title", name, Hibernate.STRING);
722: final List results = q.list();
723:
724: final int size = results.size();
725: if (size > 0) {
726: if (size > 1) {
727: logger.error("Multiple Channel Groups called "
728: + name + " found.");
729: }
730: result = (ChannelGroup) results.get(0);
731: }
732:
733: builder.endTransaction();
734: } catch (Exception e) {
735: e.printStackTrace();
736: builder.resetTransaction();
737: }
738: }
739:
740: logger.info("findChannelGroup: " + name + "->" + result);
741: return result;
742: }
743: }
|