001: package org.enhydra.shark.corba.poa;
002:
003: import java.util.HashMap;
004: import java.util.Map;
005:
006: import javax.transaction.UserTransaction;
007:
008: import org.omg.CORBA.ORB;
009: import org.omg.WorkflowModel.WfRequester;
010:
011: /**
012: * WfRequesterImpl - Workflow Requester implementation
013: */
014: public class WfLinkingRequesterForCORBA implements
015: org.enhydra.shark.api.client.wfmodel.WfRequester {
016:
017: // transient private Collective __collective;
018: private ORB orb;
019:
020: static ORB the_orb;
021:
022: static Map requesters = new HashMap();
023:
024: static Map collectives = new HashMap();
025:
026: static boolean ignoreProblematicRequester = true;
027:
028: public WfLinkingRequesterForCORBA() {
029:
030: }
031:
032: public WfLinkingRequesterForCORBA(ORB orb) {
033: this .orb = orb;
034: }
035:
036: public static void setOrb(ORB the_Orb) {
037: the_orb = the_Orb;
038: }
039:
040: public static synchronized void setIgnoreProblematicRequesterProcess(
041: boolean ignore) {
042: ignoreProblematicRequester = ignore;
043: }
044:
045: public static synchronized void setCORBARequester(String procId,
046: WfRequester myCORBARequester) {
047: requesters.put(procId, myCORBARequester);
048: }
049:
050: /*
051: * public WfLinkingRequesterForCORBA(ORB orb, Collective toJoin, WfRequester
052: * myCORBARequester) { // if (WfLinkingRequesterForCORBA.myServer==null) { //
053: * WfLinkingRequesterForCORBA.myServer=server; // } this.orb = orb; __collective =
054: * toJoin; this.myCORBARequester = myCORBARequester; }
055: */
056: public static synchronized void removeCORBARequester(String procId) {
057: requesters.remove(procId);
058:
059: }
060:
061: public static WfRequester getCORBARequester(String procId) {
062: return (WfRequester) requesters.get(procId);
063: }
064:
065: public static synchronized void addCollective(String pId,
066: Collective c) {
067: collectives.put(pId, c);
068: }
069:
070: public static synchronized void emptyCollective(String pId, ORB orb) {
071: Collective c = (Collective) collectives.remove(pId);
072: if (c != null) {
073: c.__disband(orb);
074: }
075: }
076:
077: /**
078: * Gets the number of processes.
079: */
080: public int how_many_performer()
081: throws org.enhydra.shark.api.client.wfbase.BaseException {
082: throw new org.enhydra.shark.api.client.wfbase.BaseException();
083: }
084:
085: /**
086: * Gets an iterator of processes.
087: */
088: public org.enhydra.shark.api.client.wfmodel.WfProcessIterator get_iterator_performer()
089: throws org.enhydra.shark.api.client.wfbase.BaseException {
090: throw new org.enhydra.shark.api.client.wfbase.BaseException();
091: }
092:
093: /**
094: * A list of processes
095: *
096: * @return List of WfProcess objects.
097: */
098: public org.enhydra.shark.api.client.wfmodel.WfProcess[] get_sequence_performer(
099: int max_number)
100: throws org.enhydra.shark.api.client.wfbase.BaseException {
101: throw new org.enhydra.shark.api.client.wfbase.BaseException();
102: }
103:
104: /**
105: * Checks if a WfProcess is associated with this requester object
106: *
107: * @return true if the process is found.
108: */
109: public boolean is_member_of_performer(
110: org.enhydra.shark.api.client.wfmodel.WfProcess member)
111: throws org.enhydra.shark.api.client.wfbase.BaseException {
112: throw new org.enhydra.shark.api.client.wfbase.BaseException();
113: }
114:
115: public void receive_event(
116: org.enhydra.shark.api.client.wfmodel.WfEventAudit event)
117: throws org.enhydra.shark.api.client.wfbase.BaseException,
118: org.enhydra.shark.api.client.wfmodel.InvalidPerformer {
119: String procId = null;
120: boolean ne = false;
121: boolean excH = false;
122: UserTransaction ut = null;
123: boolean e = SharkCORBAServer.doesTransactionExist();
124: try {
125: ut = SharkCORBAUtilities.beginTransaction(e);
126: procId = event.process_key();
127: WfRequester req = WfLinkingRequesterForCORBA
128: .getCORBARequester(procId);
129: if (req != null) {
130: try {
131: ne = req._non_existent();
132: } catch (Exception ex) {
133: ne = true;
134: }
135: if (!ne) {
136: Collective __collective = new Collective.CollectiveCORBA();
137: WfLinkingRequesterForCORBA.addCollective(procId,
138: __collective);
139: if (orb == null)
140: orb = the_orb;
141: req.receive_event(SharkCORBAUtilities
142: .makeCORBAEventAudit(__collective, event));
143: System.out
144: .println("External requester for process "
145: + procId + " notified.");
146: } else {
147: String msg = "External CORBA requester for process "
148: + procId
149: + " can't be found due to some network problem or client application shutdown";
150: if (ignoreProblematicRequester) {
151: msg += " - it'll be further ignored!";
152: } else {
153: msg += "!";
154: }
155: System.out.println(msg);
156: }
157: } else {
158: String msg = "Can't find CORBA external requester for process "
159: + procId
160: + ". It might be caused by shark CORBA server shutdown, shark cluster usage,...";
161: if (ignoreProblematicRequester) {
162: msg += " or because shark CORBA setup to ignore requester notification if some problem previously occured.";
163: }
164: System.out.println(msg);
165: }
166: SharkCORBAUtilities.commitTransaction(ut, e);
167: } catch (Throwable ex) {
168: SharkCORBAUtilities.rollbackTransaction(ut, e);
169: String msg = "Problem accessing CORBA external requester for process "
170: + procId
171: + ". Problem could be caused by some network problem or some problem in the implementation of requester's code.";
172: if (ignoreProblematicRequester) {
173: msg += " - it'll be further ignored!";
174: } else {
175: msg += "!";
176: }
177: System.out.println(msg);
178: excH = true;
179: }
180: if (ignoreProblematicRequester && (ne || excH)) {
181: WfLinkingRequesterForCORBA.removeCORBARequester(procId);
182: }
183:
184: }
185:
186: }
|