001: package org.drools.common;
002:
003: /*
004: * Copyright 2005 JBoss Inc
005: *
006: * Licensed under the Apache License, Version 2.0 (the "License");
007: * you may not use this file except in compliance with the License.
008: * You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing, software
013: * distributed under the License is distributed on an "AS IS" BASIS,
014: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015: * See the License for the specific language governing permissions and
016: * limitations under the License.
017: */
018:
019: import java.io.ByteArrayInputStream;
020: import java.io.ByteArrayOutputStream;
021: import java.io.Externalizable;
022: import java.io.IOException;
023: import java.io.InputStream;
024: import java.io.ObjectInput;
025: import java.io.ObjectOutput;
026: import java.io.ObjectOutputStream;
027: import java.io.Serializable;
028: import java.util.ArrayList;
029: import java.util.Collections;
030: import java.util.HashMap;
031: import java.util.HashSet;
032: import java.util.Iterator;
033: import java.util.List;
034: import java.util.Map;
035: import java.util.Set;
036: import java.util.Map.Entry;
037:
038: import org.drools.FactException;
039: import org.drools.PackageIntegrationException;
040: import org.drools.RuleBase;
041: import org.drools.RuleBaseConfiguration;
042: import org.drools.RuleIntegrationException;
043: import org.drools.StatefulSession;
044: import org.drools.rule.CompositePackageClassLoader;
045: import org.drools.rule.InvalidPatternException;
046: import org.drools.rule.MapBackedClassLoader;
047: import org.drools.rule.Package;
048: import org.drools.rule.PackageCompilationData;
049: import org.drools.rule.Rule;
050: import org.drools.ruleflow.common.core.Process;
051: import org.drools.spi.FactHandleFactory;
052: import org.drools.util.ObjectHashSet;
053:
054: /**
055: * Implementation of <code>RuleBase</code>.
056: *
057: * @author <a href="mailto:bob@werken.com">bob mcwhirter</a>
058: * @author <a href="mailto:mark.proctor@jboss.com">Mark Proctor</a>
059: *
060: * @version $Id: RuleBaseImpl.java,v 1.5 2005/08/14 22:44:12 mproctor Exp $
061: */
062: abstract public class AbstractRuleBase implements InternalRuleBase,
063: Externalizable {
064: // ------------------------------------------------------------
065: // Instance members
066: // ------------------------------------------------------------
067: protected String id;
068:
069: protected int workingMemoryCounter;
070:
071: protected RuleBaseConfiguration config;
072:
073: protected Map pkgs;
074:
075: protected Map processes;
076:
077: protected Map agendaGroupRuleTotals;
078:
079: protected transient CompositePackageClassLoader packageClassLoader;
080:
081: protected transient MapBackedClassLoader classLoader;
082:
083: /** The fact handle factory. */
084: protected FactHandleFactory factHandleFactory;
085:
086: protected Map globals;
087:
088: private ReloadPackageCompilationData reloadPackageCompilationData = null;
089:
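/**
 * The stateful sessions currently tracked by this rule base. Entries are added by
 * addStatefulSession and removed by disposeStatefulSession. The field is transient
 * and is declared as an ObjectHashSet (not a WeakHashMap).
 */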
094: protected transient ObjectHashSet statefulSessions;
095:
096: /**
097: * Default constructor - for Externalizable. This should never be used by a user, as it
098: * will result in an invalid state for the instance.
099: */
100: public AbstractRuleBase() {
101:
102: }
103:
104: public synchronized int nextWorkingMemoryCounter() {
105: return this .workingMemoryCounter++;
106: }
107:
/**
 * Creates a rule base with empty package, process, global and session collections.
 *
 * @param id
 *            identifier of this rule base; <code>"default"</code> is used when <code>null</code>
 * @param config
 *            configuration to use; a new <code>RuleBaseConfiguration</code> is created when
 *            <code>null</code>. The configuration in use is made immutable.
 * @param factHandleFactory
 *            the fact handle factory, stored as given
 */
public AbstractRuleBase(final String id,
                        final RuleBaseConfiguration config,
                        final FactHandleFactory factHandleFactory) {
    this.id = (id == null) ? "default" : id;

    if (config == null) {
        this.config = new RuleBaseConfiguration();
    } else {
        this.config = config;
    }
    this.config.makeImmutable();
    this.factHandleFactory = factHandleFactory;

    // the agenda group totals map is only created for sequential configurations
    if (this.config.isSequential()) {
        this.agendaGroupRuleTotals = new HashMap();
    }

    // both loaders are parented on the calling thread's context class loader
    this.packageClassLoader = new CompositePackageClassLoader(Thread.currentThread().getContextClassLoader());
    this.classLoader = new MapBackedClassLoader(Thread.currentThread().getContextClassLoader());
    this.packageClassLoader.addClassLoader(this.classLoader);

    this.pkgs = new HashMap();
    this.processes = new HashMap();
    this.globals = new HashMap();
    this.statefulSessions = new ObjectHashSet();
}
141:
142: // ------------------------------------------------------------
143: // Instance methods
144: // ------------------------------------------------------------
145:
146: /**
147: * Handles the write serialization of the Package. Patterns in Rules may reference generated data which cannot be serialized by default methods.
148: * The Package uses PackageCompilationData to hold a reference to the generated bytecode. The generated bytecode must be restored before any Rules.
149: *
150: */
151: public void doWriteExternal(final ObjectOutput stream,
152: final Object[] objects) throws IOException {
153: stream.writeObject(this .pkgs);
154:
155: // Rules must be restored by an ObjectInputStream that can resolve using a given ClassLoader to handle seaprately by storing as
156: // a byte[]
157: final ByteArrayOutputStream bos = new ByteArrayOutputStream();
158: final ObjectOutput out = new ObjectOutputStream(bos);
159: out.writeObject(this .id);
160: out.writeObject(this .agendaGroupRuleTotals);
161: out.writeObject(this .factHandleFactory);
162: out.writeObject(this .globals);
163: out.writeObject(this .config);
164:
165: for (int i = 0, length = objects.length; i < length; i++) {
166: out.writeObject(objects[i]);
167: }
168:
169: stream.writeObject(bos.toByteArray());
170: }
171:
172: /**
173: * Handles the read serialization of the Package. Patterns in Rules may reference generated data which cannot be serialized by default methods.
174: * The Package uses PackageCompilationData to hold a reference to the generated bytecode; which must be restored before any Rules.
175: * A custom ObjectInputStream, able to resolve classes against the bytecode in the PackageCompilationData, is used to restore the Rules.
176: *
177: */
178: public void doReadExternal(final ObjectInput stream,
179: final Object[] objects) throws IOException,
180: ClassNotFoundException {
181: // PackageCompilationData must be restored before Rules as it has the ClassLoader needed to resolve the generated code references in Rules
182: this .pkgs = (Map) stream.readObject();
183:
184: if (stream instanceof DroolsObjectInputStream) {
185: DroolsObjectInputStream parentStream = (DroolsObjectInputStream) stream;
186: parentStream.setRuleBase(this );
187: this .packageClassLoader = new CompositePackageClassLoader(
188: parentStream.getClassLoader());
189: this .classLoader = new MapBackedClassLoader(parentStream
190: .getClassLoader());
191: } else {
192: this .packageClassLoader = new CompositePackageClassLoader(
193: Thread.currentThread().getContextClassLoader());
194: this .classLoader = new MapBackedClassLoader(Thread
195: .currentThread().getContextClassLoader());
196: }
197:
198: this .packageClassLoader.addClassLoader(this .classLoader);
199:
200: for (final Iterator it = this .pkgs.values().iterator(); it
201: .hasNext();) {
202: this .packageClassLoader
203: .addClassLoader(((Package) it.next())
204: .getPackageCompilationData()
205: .getClassLoader());
206: }
207:
208: // Return the rules stored as a byte[]
209: final byte[] bytes = (byte[]) stream.readObject();
210:
211: // Use a custom ObjectInputStream that can resolve against a given classLoader
212: final DroolsObjectInputStream childStream = new DroolsObjectInputStream(
213: new ByteArrayInputStream(bytes),
214: this .packageClassLoader);
215: childStream.setRuleBase(this );
216:
217: this .id = (String) childStream.readObject();
218: this .agendaGroupRuleTotals = (Map) childStream.readObject();
219: this .factHandleFactory = (FactHandleFactory) childStream
220: .readObject();
221: this .globals = (Map) childStream.readObject();
222:
223: this .config = (RuleBaseConfiguration) childStream.readObject();
224:
225: this .statefulSessions = new ObjectHashSet();
226:
227: for (int i = 0, length = objects.length; i < length; i++) {
228: objects[i] = childStream.readObject();
229: }
230: }
231:
232: /**
233: * @return the id
234: */
235: public String getId() {
236: return this .id;
237: }
238:
239: /**
240: * @see RuleBase
241: */
242: public StatefulSession newStatefulSession() {
243: return newStatefulSession(true);
244: }
245:
246: /**
247: * @see RuleBase
248: */
249: abstract public StatefulSession newStatefulSession(
250: boolean keepReference);
251:
252: public synchronized void disposeStatefulSession(
253: final StatefulSession statefulSession) {
254: this .statefulSessions.remove(statefulSession);
255: }
256:
257: /**
258: * @see RuleBase
259: */
260: public FactHandleFactory getFactHandleFactory() {
261: return this .factHandleFactory;
262: }
263:
264: public FactHandleFactory newFactHandleFactory() {
265: return this .factHandleFactory.newInstance();
266: }
267:
268: public Process[] getProcesses() {
269: return (Process[]) this .processes.values().toArray(
270: new Process[this .processes.size()]);
271: }
272:
273: public Package[] getPackages() {
274: return (Package[]) this .pkgs.values().toArray(
275: new Package[this .pkgs.size()]);
276: }
277:
278: public Map getPackagesMap() {
279: return this .pkgs;
280: }
281:
282: public Map getGlobals() {
283: return this .globals;
284: }
285:
286: public Map getAgendaGroupRuleTotals() {
287: return this .agendaGroupRuleTotals;
288: }
289:
290: /**
291: * Add a <code>Package</code> to the network. Iterates through the
292: * <code>Package</code> adding Each individual <code>Rule</code> to the
293: * network. Before update network each referenced <code>WorkingMemory</code>
294: * is locked.
295: *
296: * @param pkg
297: * The package to add.
298: * @throws PackageIntegrationException
299: *
300: * @throws RuleIntegrationException
301: * if an error prevents complete construction of the network for
302: * the <code>Rule</code>.
303: * @throws FactException
304: * @throws InvalidPatternException
305: */
306: public synchronized void addPackage(final Package newPkg)
307: throws PackageIntegrationException {
308: newPkg.checkValidity();
309:
310: synchronized (this .pkgs) {
311: final Package pkg = (Package) this .pkgs.get(newPkg
312: .getName());
313: // INVARIANT: lastAquiredLock always contains the index of the last aquired lock +1
314: // in the working memory array
315: int lastAquiredLock = 0;
316: // get a snapshot of current working memories for locking
317: final InternalWorkingMemory[] wms = getWorkingMemories();
318:
319: try {
320: // Iterate each workingMemory and lock it
321: // This is so we don't update the Rete network during propagation
322: for (lastAquiredLock = 0; lastAquiredLock < wms.length; lastAquiredLock++) {
323: wms[lastAquiredLock].getLock().lock();
324: }
325:
326: if (pkg != null) {
327: mergePackage(pkg, newPkg);
328: } else {
329: this .pkgs.put(newPkg.getName(), newPkg);
330: }
331:
332: final Map newGlobals = newPkg.getGlobals();
333:
334: // Check that the global data is valid, we cannot change the type
335: // of an already declared global variable
336: for (final Iterator it = newGlobals.keySet().iterator(); it
337: .hasNext();) {
338: final String identifier = (String) it.next();
339: final Class type = (Class) newGlobals
340: .get(identifier);
341: final boolean f = this .globals
342: .containsKey(identifier);
343: if (f) {
344: final boolean y = !this .globals.get(identifier)
345: .equals(type);
346: if (f && y) {
347: throw new PackageIntegrationException(pkg);
348: }
349: }
350: }
351: this .globals.putAll(newGlobals);
352:
353: final Rule[] rules = newPkg.getRules();
354:
355: for (int i = 0; i < rules.length; ++i) {
356: addRule(rules[i]);
357: }
358:
359: //and now the rule flows
360: if (newPkg.getRuleFlows() != Collections.EMPTY_MAP) {
361: Map flows = newPkg.getRuleFlows();
362: for (Iterator iter = flows.entrySet().iterator(); iter
363: .hasNext();) {
364: Entry flow = (Entry) iter.next();
365: this .processes.put(flow.getKey(), flow
366: .getValue());
367: }
368: }
369:
370: this .packageClassLoader.addClassLoader(newPkg
371: .getPackageCompilationData().getClassLoader());
372:
373: } finally {
374: // Iterate each workingMemory and attempt to fire any rules, that were activated as a result
375: // of the new rule addition. Unlock after fireAllRules();
376:
377: // as per the INVARIANT defined above, we need to iterate from lastAquiredLock-1 to 0.
378: for (lastAquiredLock--; lastAquiredLock > -1; lastAquiredLock--) {
379: wms[lastAquiredLock].fireAllRules();
380: wms[lastAquiredLock].getLock().unlock();
381: }
382: }
383: }
384:
385: }
386:
387: /**
388: * Merge a new package with an existing package.
389: * Most of the work is done by the concrete implementations,
390: * but this class does some work (including combining imports, compilation data, globals,
391: * and the actual Rule objects into the package).
392: */
393: private void mergePackage(final Package pkg, final Package newPkg)
394: throws PackageIntegrationException {
395: final Map globals = pkg.getGlobals();
396: final Set imports = pkg.getImports();
397:
398: // First update the binary files
399: // @todo: this probably has issues if you add classes in the incorrect order - functions, rules, invokers.
400: final PackageCompilationData compilationData = pkg
401: .getPackageCompilationData();
402: final PackageCompilationData newCompilationData = newPkg
403: .getPackageCompilationData();
404: final String[] files = newCompilationData.list();
405: for (int i = 0, length = files.length; i < length; i++) {
406: compilationData.write(files[i], newCompilationData
407: .read(files[i]));
408: }
409:
410: // Merge imports
411: imports.addAll(newPkg.getImports());
412:
413: // Add invokers
414: compilationData
415: .putAllInvokers(newCompilationData.getInvokers());
416:
417: if (compilationData.isDirty()) {
418: if (this .reloadPackageCompilationData == null) {
419: this .reloadPackageCompilationData = new ReloadPackageCompilationData();
420: }
421: this .reloadPackageCompilationData
422: .addPackageCompilationData(compilationData);
423: }
424:
425: // Add globals
426: for (final Iterator it = newPkg.getGlobals().keySet()
427: .iterator(); it.hasNext();) {
428: final String identifier = (String) it.next();
429: final Class type = (Class) globals.get(identifier);
430: if (globals.containsKey(identifier)
431: && !globals.get(identifier).equals(type)) {
432: throw new PackageIntegrationException(
433: "Unable to merge new Package", newPkg);
434: }
435: }
436: globals.putAll(newPkg.getGlobals());
437:
438: //Add rules into the RuleBase package
439: //as this is needed for individual rule removal later on
440: final Rule[] newRules = newPkg.getRules();
441: for (int i = 0; i < newRules.length; i++) {
442: final Rule newRule = newRules[i];
443: if (pkg.getRule(newRule.getName()) == null) {
444: pkg.addRule(newRule);
445: }
446: }
447:
448: //and now the rule flows
449: if (newPkg.getRuleFlows() != Collections.EMPTY_MAP) {
450: Map flows = newPkg.getRuleFlows();
451: for (Iterator iter = flows.values().iterator(); iter
452: .hasNext();) {
453: Process flow = (Process) iter.next();
454: pkg.addRuleFlow(flow);
455: }
456: }
457: }
458:
459: protected synchronized void addRule(final Rule rule)
460: throws InvalidPatternException {
461: if (!rule.isValid()) {
462: throw new IllegalArgumentException(
463: "The rule called "
464: + rule.getName()
465: + " is not valid. Check for compile errors reported.");
466: }
467: }
468:
/**
 * Removes a package from this rule base: its rules, its class loader, the globals that no
 * other package declares, its rule flows, and finally the package entry itself.
 * <p>
 * Every working memory returned by {@link #getWorkingMemories()} is locked for the duration
 * of the update. Afterwards <code>fireAllRules()</code> is called on each locked working
 * memory before its lock is released.
 *
 * @param packageName
 *            name of the package to remove
 * @throws IllegalArgumentException
 *             if no package with that name is registered
 */
public synchronized void removePackage(final String packageName) {
    synchronized (this.pkgs) {
        final Package pkg = (Package) this.pkgs.get(packageName);
        if (pkg == null) {
            throw new IllegalArgumentException("Package name '" + packageName
                                               + "' does not exist for this Rule Base.");
        }

        // INVARIANT: lastAquiredLock always contains the index of the last acquired lock +1
        // in the working memory array
        int lastAquiredLock = 0;
        // get a snapshot of current working memories for locking
        final InternalWorkingMemory[] wms = getWorkingMemories();

        try {
            // Iterate each workingMemory and lock it
            // This is so we don't update the Rete network during propagation
            for (lastAquiredLock = 0; lastAquiredLock < wms.length; lastAquiredLock++) {
                wms[lastAquiredLock].getLock().lock();
            }

            final Rule[] rules = pkg.getRules();

            for (int i = 0; i < rules.length; ++i) {
                removeRule(rules[i]);
            }

            this.packageClassLoader.removeClassLoader(pkg.getPackageCompilationData().getClassLoader());

            // getting the list of globals referenced by the remaining packages
            final Set referencedGlobals = new HashSet();
            for (final Iterator it = this.pkgs.values().iterator(); it.hasNext();) {
                final Package pkgref = (Package) it.next();
                if (pkgref != pkg) {
                    referencedGlobals.addAll(pkgref.getGlobals().keySet());
                }
            }
            // removing globals declared inside the package that are not shared
            for (final Iterator it = pkg.getGlobals().keySet().iterator(); it.hasNext();) {
                final String globalName = (String) it.next();
                if (!referencedGlobals.contains(globalName)) {
                    this.globals.remove(globalName);
                }
            }
            // and now the rule flows
            final Map flows = pkg.getRuleFlows();
            for (final Iterator iter = flows.keySet().iterator(); iter.hasNext();) {
                removeProcess((String) iter.next());
            }
            // removing the package itself from the list
            this.pkgs.remove(pkg.getName());

            // Clear the package last: the globals and rule flows above are read from the
            // package, so clearing earlier could leave stale globals/processes behind if
            // clear() empties those collections.
            pkg.clear();
        } finally {
            // Iterate each workingMemory and attempt to fire any rules that were activated as
            // a result of the package removal. Unlock after fireAllRules().

            // as per the INVARIANT defined above, we need to iterate from lastAquiredLock-1 to 0.
            for (lastAquiredLock--; lastAquiredLock > -1; lastAquiredLock--) {
                wms[lastAquiredLock].fireAllRules();
                wms[lastAquiredLock].getLock().unlock();
            }
        }
    }
}
541:
542: public void removeRule(final String packageName,
543: final String ruleName) {
544: synchronized (this .pkgs) {
545: final Package pkg = (Package) this .pkgs.get(packageName);
546: if (pkg == null) {
547: throw new IllegalArgumentException("Package name '"
548: + packageName
549: + "' does not exist for this Rule Base.");
550: }
551:
552: final Rule rule = pkg.getRule(ruleName);
553: if (rule == null) {
554: throw new IllegalArgumentException("Rule name '"
555: + ruleName
556: + "' does not exist in the Package '"
557: + packageName + "'.");
558: }
559:
560: // INVARIANT: lastAquiredLock always contains the index of the last aquired lock +1
561: // in the working memory array
562: int lastAquiredLock = 0;
563: // get a snapshot of current working memories for locking
564: final InternalWorkingMemory[] wms = getWorkingMemories();
565:
566: PackageCompilationData compilationData = null;
567:
568: try {
569: // Iterate each workingMemory and lock it
570: // This is so we don't update the Rete network during propagation
571: for (lastAquiredLock = 0; lastAquiredLock < wms.length; lastAquiredLock++) {
572: wms[lastAquiredLock].getLock().lock();
573: }
574:
575: removeRule(rule);
576: compilationData = pkg.removeRule(rule);
577: if (this .reloadPackageCompilationData == null) {
578: this .reloadPackageCompilationData = new ReloadPackageCompilationData();
579: }
580: this .reloadPackageCompilationData
581: .addPackageCompilationData(compilationData);
582:
583: } finally {
584: // Iterate each workingMemory and attempt to fire any rules, that were activated as a result
585: // of the new rule addition. Unlock after fireAllRules();
586:
587: // as per the INVARIANT defined above, we need to iterate from lastAquiredLock-1 to 0.
588: for (lastAquiredLock--; lastAquiredLock > -1; lastAquiredLock--) {
589: wms[lastAquiredLock].getLock().unlock();
590: }
591: }
592: }
593: }
594:
595: protected abstract void removeRule(Rule rule);
596:
597: public void removeFunction(String packageName, String functionName) {
598: synchronized (this .pkgs) {
599: final Package pkg = (Package) this .pkgs.get(packageName);
600: if (pkg == null) {
601: throw new IllegalArgumentException("Package name '"
602: + packageName
603: + "' does not exist for this Rule Base.");
604: }
605:
606: PackageCompilationData compilationData = pkg
607: .removeFunction(functionName);
608: if (compilationData == null) {
609: throw new IllegalArgumentException("function name '"
610: + packageName
611: + "' does not exist in the Package '"
612: + packageName + "'.");
613: }
614:
615: if (this .reloadPackageCompilationData == null) {
616: this .reloadPackageCompilationData = new ReloadPackageCompilationData();
617: }
618: this .reloadPackageCompilationData
619: .addPackageCompilationData(compilationData);
620: }
621: }
622:
623: public synchronized void addProcess(final Process process) {
624: synchronized (this .pkgs) {
625: this .processes.put(process.getId(), process);
626: }
627:
628: }
629:
630: public synchronized void removeProcess(final String id) {
631: synchronized (this .pkgs) {
632: this .processes.remove(id);
633: }
634: }
635:
636: public Process getProcess(final String id) {
637: Process process = null;
638: synchronized (this .pkgs) {
639: process = (Process) this .processes.get(id);
640: }
641: return process;
642: }
643:
644: protected synchronized void addStatefulSession(
645: final StatefulSession statefulSession) {
646: this .statefulSessions.add(statefulSession);
647: }
648:
649: public Package getPackage(String name) {
650: return (Package) this .pkgs.get(name);
651: }
652:
653: public StatefulSession[] getStatefulSessions() {
654: return (StatefulSession[]) this .statefulSessions
655: .toArray(new StatefulSession[this .statefulSessions
656: .size()]);
657: }
658:
659: public InternalWorkingMemory[] getWorkingMemories() {
660: return (InternalWorkingMemory[]) this .statefulSessions
661: .toArray(new InternalWorkingMemory[this .statefulSessions
662: .size()]);
663: }
664:
665: public RuleBaseConfiguration getConfiguration() {
666: return this .config;
667: }
668:
669: public StatefulSession newStatefulSession(final InputStream stream)
670: throws IOException, ClassNotFoundException {
671: return newStatefulSession(stream, true);
672: }
673:
674: public StatefulSession newStatefulSession(final InputStream stream,
675: final boolean keepReference) throws IOException,
676: ClassNotFoundException {
677:
678: if (this .config.isSequential()) {
679: throw new RuntimeException(
680: "Cannot have a stateful rule session, with sequential configuration set to true");
681: }
682:
683: final DroolsObjectInputStream streamWithLoader = new DroolsObjectInputStream(
684: stream, this .packageClassLoader);
685:
686: final AbstractWorkingMemory workingMemory = (AbstractWorkingMemory) streamWithLoader
687: .readObject();
688:
689: synchronized (this .pkgs) {
690: workingMemory.setRuleBase(this );
691: return (StatefulSession) workingMemory;
692: }
693: }
694:
695: public void addClass(String className, byte[] bytes) {
696: this .classLoader.addClass(className, bytes);
697: }
698:
699: public CompositePackageClassLoader getCompositePackageClassLoader() {
700: return this .packageClassLoader;
701: }
702:
703: public MapBackedClassLoader getMapBackedClassLoader() {
704: return this .classLoader;
705: }
706:
707: public void executeQueuedActions() {
708: synchronized (this .pkgs) {
709: if (this .reloadPackageCompilationData != null) {
710: this .reloadPackageCompilationData.execute(this );
711: }
712: }
713: }
714:
715: public static class ReloadPackageCompilationData implements
716: RuleBaseAction {
717: private Set set;
718:
719: public void addPackageCompilationData(
720: PackageCompilationData packageCompilationData) {
721: if (set == null) {
722: this .set = new HashSet();
723: }
724:
725: this .set.add(packageCompilationData);
726: }
727:
728: public void execute(InternalRuleBase ruleBase) {
729: for (Iterator it = this .set.iterator(); it.hasNext();) {
730: PackageCompilationData packageCompilationData = (PackageCompilationData) it
731: .next();
732: packageCompilationData.reload();
733: }
734: }
735: }
736:
737: public static interface RuleBaseAction extends Serializable {
738: public void execute(InternalRuleBase ruleBase);
739: }
740: }
|