ParallelDeflateOutputStream.cs :  » Development » DotNetZip » Ionic » Zlib » C# / CSharp Open Source

Home
C# / CSharp Open Source
1.2.6.4 mono .net core
2.2.6.4 mono core
3.Aspect Oriented Frameworks
4.Bloggers
5.Build Systems
6.Business Application
7.Charting Reporting Tools
8.Chat Servers
9.Code Coverage Tools
10.Content Management Systems CMS
11.CRM ERP
12.Database
13.Development
14.Email
15.Forum
16.Game
17.GIS
18.GUI
19.IDEs
20.Installers Generators
21.Inversion of Control Dependency Injection
22.Issue Tracking
23.Logging Tools
24.Message
25.Mobile
26.Network Clients
27.Network Servers
28.Office
29.PDF
30.Persistence Frameworks
31.Portals
32.Profilers
33.Project Management
34.RSS RDF
35.Rule Engines
36.Script
37.Search Engines
38.Sound Audio
39.Source Control
40.SQL Clients
41.Template Engines
42.Testing
43.UML
44.Web Frameworks
45.Web Service
46.Web Testing
47.Wiki Engines
48.Windows Presentation Foundation
49.Workflows
50.XML Parsers
C# / C Sharp
C# / C Sharp by API
C# / CSharp Tutorial
C# / CSharp Open Source » Development » DotNetZip 
DotNetZip » Ionic » Zlib » ParallelDeflateOutputStream.cs
//#define Trace

// ParallelDeflateOutputStream.cs
// ------------------------------------------------------------------
//
// A DeflateStream that does compression only, it uses a
// divide-and-conquer approach with multiple threads to exploit multiple
// CPUs for the DEFLATE computation.
//
// last saved:
// Time-stamp: <2010-January-20 19:24:58>
// ------------------------------------------------------------------
//
// Copyright (c) 2009-2010 by Dino Chiesa
// All rights reserved!
//
// ------------------------------------------------------------------

using System;
using System.Threading;
using Ionic.Zlib;
using System.IO;


namespace Ionic.Zlib{
        internal class WorkItem
        {
            internal enum Status { None=0, Filling=1, Filled=2, Compressing=3, Compressed=4, Writing=5, Done=6 }
            public byte[] buffer;
            public byte[] compressed;
            public int status;
            public int crc;
            public int index;
            public int inputBytesAvailable;
            public int compressedBytesAvailable;
            public ZlibCodec compressor;

            public WorkItem(int size, Ionic.Zlib.CompressionLevel compressLevel, CompressionStrategy strategy)
            {
                buffer= new byte[size];
                // alloc 5 bytes overhead for every block (margin of safety= 2)
                int n = size + ((size / 32768)+1) * 5 * 2;
                compressed= new byte[n];

                status = (int)Status.None;
                compressor = new ZlibCodec();
                compressor.InitializeDeflate(compressLevel, false);
                compressor.OutputBuffer = compressed;
                compressor.InputBuffer = buffer;
            }
        }

    /// <summary>
    ///   A class for compressing and decompressing streams using the
    ///   Deflate algorithm with multiple threads.
    /// </summary>
    ///
    /// <remarks>
    /// <para>
    ///   This class is for compression only, and that can be only
    ///   through writing.
    /// </para>
    ///
    /// <para>
    ///   For more information on the Deflate algorithm, see IETF RFC 1951, "DEFLATE
    ///   Compressed Data Format Specification version 1.3."
    /// </para>
    ///
    /// <para>
    ///   This class is similar to <see cref="Ionic.Zlib.DeflateStream"/>, except
    ///   that this implementation uses an approach that employs multiple worker
    ///   threads to perform the DEFLATE.  On a multi-cpu or multi-core computer,
    ///   the performance of this class can be significantly higher than the
    ///   single-threaded DeflateStream, particularly for larger streams.  How
    ///   large?  Anything over 10mb is a good candidate for parallel compression.
    /// </para>
    ///
    /// <para>
    ///   The tradeoff is that this class uses more memory and more CPU than the
    ///   vanilla DeflateStream, and also is less efficient as a compressor. For
    ///   large files the size of the compressed data stream can be less than 1%
    ///   larger than the size of a compressed data stream from the vanialla
    ///   DeflateStream.  For smaller files the difference can be larger.  The
    ///   difference will also be larger if you set the BufferSize to be lower
    ///   than the default value.  Your mileage may vary. Finally, for small
    ///   files, the ParallelDeflateOutputStream can be much slower than the vanilla
    ///   DeflateStream, because of the overhead of using the thread pool.
    /// </para>
    ///
    /// </remarks>
    /// <seealso cref="Ionic.Zlib.DeflateStream" />
    public class ParallelDeflateOutputStream : System.IO.Stream
    {

        private static readonly int IO_BUFFER_SIZE_DEFAULT = 64 * 1024;  // 128k

        private System.Collections.Generic.List<WorkItem> _pool;
        private bool                        _leaveOpen;
        private System.IO.Stream            _outStream;
        private int                         _nextToFill, _nextToWrite;
        private int                         _bufferSize = IO_BUFFER_SIZE_DEFAULT;
        private ManualResetEvent            _writingDone;
        private ManualResetEvent            _sessionReset;
        private bool                        _noMoreInputForThisSegment;
        private object                      _outputLock = new object();
        private bool                        _isClosed;
        private bool                        _isDisposed;
        private bool                        _firstWriteDone;
        private int                         _pc;
        private int                         _Crc32;
        private Int64                       _totalBytesProcessed;
        private Ionic.Zlib.CompressionLevel _compressLevel;
        private volatile Exception          _pendingException;
        private object                      _eLock = new Object();  // protects _pendingException

        // This bitfield is used only when Trace is defined.
        //private TraceBits _DesiredTrace = TraceBits.Write | TraceBits.WriteBegin |
        //TraceBits.WriteDone | TraceBits.Lifecycle | TraceBits.Fill | TraceBits.Flush |
        //TraceBits.Session;

        //private TraceBits _DesiredTrace = TraceBits.WriteBegin | TraceBits.WriteDone | TraceBits.Synch | TraceBits.Lifecycle  | TraceBits.Session ;

        private TraceBits _DesiredTrace = TraceBits.WriterThread | TraceBits.Synch | TraceBits.Lifecycle  | TraceBits.Session ;

        /// <summary>
        /// Create a ParallelDeflateOutputStream.
        /// </summary>
        /// <remarks>
        ///
        /// <para>
        ///   This stream compresses data written into it via the DEFLATE
        ///   algorithm (see RFC 1951), and writes out the compressed byte stream.
        /// </para>
        ///
        /// <para>
        ///   The instance will use the default compression level, the default
        ///   buffer sizes and the default number of threads and buffers per
        ///   thread.
        /// </para>
        ///
        /// <para>
        ///   This class is similar to <see cref="Ionic.Zlib.DeflateStream"/>,
        ///   except that this implementation uses an approach that employs
        ///   multiple worker threads to perform the DEFLATE.  On a multi-cpu or
        ///   multi-core computer, the performance of this class can be
        ///   significantly higher than the single-threaded DeflateStream,
        ///   particularly for larger streams.  How large?  Anything over 10mb is
        ///   a good candidate for parallel compression.
        /// </para>
        ///
        /// </remarks>
        ///
        /// <example>
        ///
        /// This example shows how to use a ParallelDeflateOutputStream to compress
        /// data.  It reads a file, compresses it, and writes the compressed data to
        /// a second, output file.
        ///
        /// <code>
        /// byte[] buffer = new byte[WORKING_BUFFER_SIZE];
        /// int n= -1;
        /// String outputFile = fileToCompress + ".compressed";
        /// using (System.IO.Stream input = System.IO.File.OpenRead(fileToCompress))
        /// {
        ///     using (var raw = System.IO.File.Create(outputFile))
        ///     {
        ///         using (Stream compressor = new ParallelDeflateOutputStream(raw))
        ///         {
        ///             while ((n= input.Read(buffer, 0, buffer.Length)) != 0)
        ///             {
        ///                 compressor.Write(buffer, 0, n);
        ///             }
        ///         }
        ///     }
        /// }
        /// </code>
        /// <code lang="VB">
        /// Dim buffer As Byte() = New Byte(4096) {}
        /// Dim n As Integer = -1
        /// Dim outputFile As String = (fileToCompress &amp; ".compressed")
        /// Using input As Stream = File.OpenRead(fileToCompress)
        ///     Using raw As FileStream = File.Create(outputFile)
        ///         Using compressor As Stream = New ParallelDeflateOutputStream(raw)
        ///             Do While (n &lt;&gt; 0)
        ///                 If (n &gt; 0) Then
        ///                     compressor.Write(buffer, 0, n)
        ///                 End If
        ///                 n = input.Read(buffer, 0, buffer.Length)
        ///             Loop
        ///         End Using
        ///     End Using
        /// End Using
        /// </code>
        /// </example>
        /// <param name="stream">The stream to which compressed data will be written.</param>
        public ParallelDeflateOutputStream(System.IO.Stream stream)
            : this(stream, CompressionLevel.Default, CompressionStrategy.Default, false)
        {
        }

        /// <summary>
        ///   Create a ParallelDeflateOutputStream using the specified CompressionLevel.
        /// </summary>
        /// <remarks>
        ///   See the <see cref="ParallelDeflateOutputStream(System.IO.Stream)"/>
        ///   constructor for example code.
        /// </remarks>
        /// <param name="stream">The stream to which compressed data will be written.</param>
        /// <param name="level">A tuning knob to trade speed for effectiveness.</param>
        public ParallelDeflateOutputStream(System.IO.Stream stream, CompressionLevel level)
            : this(stream, level, CompressionStrategy.Default, false)
        {
        }

        /// <summary>
        /// Create a ParallelDeflateOutputStream and specify whether to leave the captive stream open
        /// when the ParallelDeflateOutputStream is closed.
        /// </summary>
        /// <remarks>
        ///   See the <see cref="ParallelDeflateOutputStream(System.IO.Stream)"/>
        ///   constructor for example code.
        /// </remarks>
        /// <param name="stream">The stream to which compressed data will be written.</param>
        /// <param name="leaveOpen">
        ///    true if the application would like the stream to remain open after inflation/deflation.
        /// </param>
        public ParallelDeflateOutputStream(System.IO.Stream stream, bool leaveOpen)
            : this(stream, CompressionLevel.Default, CompressionStrategy.Default, leaveOpen)
        {
        }

        /// <summary>
        /// Create a ParallelDeflateOutputStream and specify whether to leave the captive stream open
        /// when the ParallelDeflateOutputStream is closed.
        /// </summary>
        /// <remarks>
        ///   See the <see cref="ParallelDeflateOutputStream(System.IO.Stream)"/>
        ///   constructor for example code.
        /// </remarks>
        /// <param name="stream">The stream to which compressed data will be written.</param>
        /// <param name="level">A tuning knob to trade speed for effectiveness.</param>
        /// <param name="leaveOpen">
        ///    true if the application would like the stream to remain open after inflation/deflation.
        /// </param>
        public ParallelDeflateOutputStream(System.IO.Stream stream, CompressionLevel level, bool leaveOpen)
            : this(stream, CompressionLevel.Default, CompressionStrategy.Default, leaveOpen)
        {
        }

        /// <summary>
        /// Create a ParallelDeflateOutputStream using the specified
        /// CompressionLevel and CompressionStrategy, and specifying whether to
        /// leave the captive stream open when the ParallelDeflateOutputStream is
        /// closed.
        /// </summary>
        /// <remarks>
        ///   See the <see cref="ParallelDeflateOutputStream(System.IO.Stream)"/>
        ///   constructor for example code.
        /// </remarks>
        /// <param name="stream">The stream to which compressed data will be written.</param>
        /// <param name="level">A tuning knob to trade speed for effectiveness.</param>
        /// <param name="strategy">
        ///   By tweaking this parameter, you may be able to optimize the compression for
        ///   data with particular characteristics.
        /// </param>
        /// <param name="leaveOpen">
        ///    true if the application would like the stream to remain open after inflation/deflation.
        /// </param>
        public ParallelDeflateOutputStream(System.IO.Stream stream,
                                           CompressionLevel level,
                                           CompressionStrategy strategy,
                                           bool leaveOpen)
        {
            TraceOutput(TraceBits.Lifecycle | TraceBits.Session, "-------------------------------------------------------");
            TraceOutput(TraceBits.Lifecycle | TraceBits.Session, "Create {0:X8}", this.GetHashCode());
            _compressLevel= level;
            _leaveOpen = leaveOpen;
            Strategy = strategy;

            BuffersPerCore = 4; // default

            _writingDone = new ManualResetEvent(false);
            _sessionReset = new ManualResetEvent(false);

            _outStream = stream;
        }


        /// <summary>
        ///   The ZLIB strategy to be used during compression.
        /// </summary>
        ///
        public CompressionStrategy Strategy
        {
            get;
            private set;
        }

        /// <summary>
        /// The number of buffers per CPU or CPU core.
        /// </summary>
        ///
        /// <remarks>
        /// <para>
        ///   This property sets the number of memory buffers to create, for every
        ///   CPU or CPU core in the machine.  The divide-and-conquer approach
        ///   taken by this class assumes a single thread from the application
        ///   will call Write().  There will be multiple background threads that
        ///   then compress (DEFLATE) the data written into the stream, and also a
        ///   single output thread, also operating in the background, aggregating
        ///   those results and finally emitting the output.
        /// </para>
        ///
        /// <para>
        ///   The default value is 4.  Different values may deliver better or
        ///   worse results, depending on the dynamic performance characteristics
        ///   of your storage and compute resources.
        /// </para>
        ///
        /// <para>
        ///   The total amount of storage space allocated for buffering will be
        ///   (n*M*S*2), where n is the number of CPUs, M is the multiple (this
        ///   property), S is the size of each buffer (<see cref="BufferSize"/>),
        ///   and there are 2 buffers used by the compressor, one for input and
        ///   one for output. For example, if your machine has 4 cores, and you
        ///   set BuffersPerCore to 3, and you retain the default buffer size of
        ///   128k, then the ParallelDeflateOutputStream will use 3mb of buffer
        ///   memory in total.
        /// </para>
        ///
        /// <para>
        ///   The application can set this value at any time, but it is effective
        ///   only before the first call to Write(), which is when the buffers are
        ///   allocated.
        /// </para>
        /// </remarks>
        public int BuffersPerCore
        {
            get; set;
        }

        /// <summary>
        ///   The size of the buffers used by the compressor threads.
        /// </summary>
        /// <remarks>
        ///
        /// <para>
        ///   The default buffer size is 128k. The application can set this value at any
        ///   time, but it is effective only before the first Write().
        /// </para>
        ///
        /// <para>
        ///   Larger buffer sizes implies larger memory consumption but allows
        ///   more efficient compression. Using smaller buffer sizes consumes less
        ///   memory but result in less effective compression.  For example, using
        ///   the default buffer size of 128k, the compression delivered is within
        ///   1% of the compression delivered by the single-threaded <see
        ///   cref="Ionic.Zlib.DeflateStream"/>.  On the other hand, using a
        ///   BufferSize of 8k can result in a compressed data stream that is 5%
        ///   larger than that delivered by the single-threaded
        ///   <c>DeflateStream</c>.  Excessively small buffer sizes can also cause
        ///   the speed of the ParallelDeflateOutputStream to drop, because of
        ///   larger thread scheduling overhead dealing with many many small
        ///   buffers.
        /// </para>
        ///
        /// <para>
        ///   The total amount of storage space allocated for buffering will be
        ///   (n*M*S*2), where n is the number of CPUs, M is the multiple (<see
        ///   cref="BuffersPerCore"/>), S is the size of each buffer (this
        ///   property), and there are 2 buffers used by the compressor, one for
        ///   input and one for output. For example, if your machine has a total
        ///   of 4 cores, and if you set <see cref="BuffersPerCore"/> to 3, and
        ///   you keep the default buffer size of 128k, then the
        ///   <c>ParallelDeflateOutputStream</c> will use 3mb of buffer memory in
        ///   total.
        /// </para>
        ///
        /// </remarks>
        public int BufferSize
        {
            get { return _bufferSize;}
            set
            {
                if (value < 1024)
                    throw new ArgumentException();
                _bufferSize = value;
            }
        }

        /// <summary>
        /// The CRC32 for the data that was written out, prior to compression.
        /// </summary>
        /// <remarks>
        /// This value is meaningful only after a call to Close().
        /// </remarks>
        public int Crc32 { get { return _Crc32; } }


        /// <summary>
        /// The total number of uncompressed bytes processed by the ParallelDeflateOutputStream.
        /// </summary>
        /// <remarks>
        /// This value is meaningful only after a call to Close().
        /// </remarks>
        public Int64 BytesProcessed { get { return _totalBytesProcessed; } }


        private void _InitializePoolOfWorkItems()
        {
            _pool = new System.Collections.Generic.List<WorkItem>();
            for(int i=0; i < BuffersPerCore * Environment.ProcessorCount; i++)
                _pool.Add(new WorkItem(_bufferSize, _compressLevel, Strategy));
            _pc = _pool.Count;

            for(int i=0; i < _pc; i++)
                _pool[i].index= i;

            // set the pointers
            _nextToFill= _nextToWrite= 0;
        }


        private void _KickoffWriter()
        {
            if (!ThreadPool.QueueUserWorkItem(new WaitCallback(this._PerpetualWriterMethod)))
                throw new Exception("Cannot enqueue writer thread.");
        }


        /// <summary>
        ///   Write data to the stream.
        /// </summary>
        ///
        /// <remarks>
        ///
        /// <para>
        ///   To use the ParallelDeflateOutputStream to compress data, create a
        ///   ParallelDeflateOutputStream with CompressionMode.Compress, passing a
        ///   writable output stream.  Then call Write() on that
        ///   ParallelDeflateOutputStream, providing uncompressed data as input.  The
        ///   data sent to the output stream will be the compressed form of the data
        ///   written.
        /// </para>
        ///
        /// <para>
        ///   To decompress data, use the <see cref="Ionic.Zlib.DeflateStream"/> class.
        /// </para>
        ///
        /// </remarks>
        /// <param name="buffer">The buffer holding data to write to the stream.</param>
        /// <param name="offset">the offset within that data array to find the first byte to write.</param>
        /// <param name="count">the number of bytes to write.</param>
        public override void Write(byte[] buffer, int offset, int count)
        {
            // Fill a work buffer; when full, flip state to 'Filled'

            if (_isClosed)
                throw new NotSupportedException();

            // dispense any exceptions that occurred on the BG threads
            if (_pendingException != null)
                throw _pendingException;

            if (count == 0) return;

            if (!_firstWriteDone)
            {
                // Want to do this on first Write, first session, and not in the
                // constructor.  We want to allow the BufferSize and BuffersPerCore to
                // change after construction, but before first Write.
                _InitializePoolOfWorkItems();

                // Only do this once (ever), the first time Write() is called:
                _KickoffWriter();

                // Release the writer thread.
                TraceOutput(TraceBits.Synch, "Synch    _sessionReset.Set()          Write (first)");
                _sessionReset.Set();

                _firstWriteDone = true;
            }


            do
            {
                int ix = _nextToFill % _pc;
                WorkItem workitem = _pool[ix];
                lock(workitem)
                {
                    TraceOutput(TraceBits.Fill,
                                   "Fill     lock     wi({0}) stat({1}) iba({2}) nf({3})",
                                   workitem.index,
                                   workitem.status,
                                   workitem.inputBytesAvailable,
                                   _nextToFill
                                   );

                    // If the status is what we want, then use the workitem.
                    if (workitem.status == (int)WorkItem.Status.None ||
                        workitem.status == (int)WorkItem.Status.Done ||
                        workitem.status == (int)WorkItem.Status.Filling)
                    {
                        workitem.status = (int)WorkItem.Status.Filling;
                        int limit = ((workitem.buffer.Length - workitem.inputBytesAvailable) > count)
                            ? count
                            : (workitem.buffer.Length - workitem.inputBytesAvailable);

                        // copy from the provided buffer to our workitem, starting at
                        // the tail end of whatever data we might have in there currently.
                        Array.Copy(buffer, offset, workitem.buffer, workitem.inputBytesAvailable, limit);

                        count -= limit;
                        offset += limit;
                        workitem.inputBytesAvailable += limit;
                        if (workitem.inputBytesAvailable==workitem.buffer.Length)
                        {
                            workitem.status = (int)WorkItem.Status.Filled;
                            // No need for interlocked.increment: the Write() method
                            // is documented as not multi-thread safe, so we can assume Write()
                            // calls come in from only one thread.
                            _nextToFill++;

                            TraceOutput(TraceBits.Fill,
                                           "Fill     QUWI     wi({0}) stat({1}) iba({2}) nf({3})",
                                           workitem.index,
                                           workitem.status,
                                           workitem.inputBytesAvailable,
                                           _nextToFill
                                           );

                            if (!ThreadPool.QueueUserWorkItem( _DeflateOne, workitem ))
                                throw new Exception("Cannot enqueue workitem");
                        }

                    }
                    else
                    {
                        int wcycles= 0;

                        while (workitem.status != (int)WorkItem.Status.None &&
                               workitem.status != (int)WorkItem.Status.Done &&
                               workitem.status != (int)WorkItem.Status.Filling)
                        {
                            TraceOutput(TraceBits.Fill,
                                           "Fill     waiting  wi({0}) stat({1}) nf({2})",
                                           workitem.index,
                                           workitem.status,
                                           _nextToFill);

                            wcycles++;

                            Monitor.Pulse(workitem);
                            Monitor.Wait(workitem);

                            if (workitem.status == (int)WorkItem.Status.None ||
                                workitem.status == (int)WorkItem.Status.Done ||
                                workitem.status == (int)WorkItem.Status.Filling)
                                TraceOutput(TraceBits.Fill,
                                               "Fill     A-OK     wi({0}) stat({1}) iba({2}) cyc({3})",
                                               workitem.index,
                                               workitem.status,
                                               workitem.inputBytesAvailable,
                                               wcycles);
                        }
                    }
                }
            }
            while (count > 0);  // until no more to write

            return;
        }



        /// <summary>
        /// Flush the stream.
        /// </summary>
        public override void Flush()
        {
            _Flush(false);
        }


        private void _Flush(bool lastInput)
        {
            if (_isClosed)
                throw new NotSupportedException();

            // pass any partial buffer out to the compressor workers:
            WorkItem workitem = _pool[_nextToFill % _pc];
            lock(workitem)
            {
                if ( workitem.status == (int)WorkItem.Status.Filling)
                {
                    workitem.status = (int)WorkItem.Status.Filled;
                    _nextToFill++;

                    // When flush is called from Close(), we set _noMore.
                    // can't do it before updating nextToFill, though.
                    if (lastInput)
                        _noMoreInputForThisSegment= true;

                    TraceOutput(TraceBits.Flush,
                                   "Flush    filled   wi({0})  iba({1}) nf({2}) nomore({3})",
                                   workitem.index, workitem.inputBytesAvailable, _nextToFill, _noMoreInputForThisSegment);

                    if (!ThreadPool.QueueUserWorkItem( _DeflateOne, workitem ))
                        throw new Exception("Cannot enqueue workitem");

                    //Monitor.Pulse(workitem);
                }
                else
                {
                    // When flush is called from Close(), we set _noMore.
                    // Gotta do this whether or not there is another packet to send along.
                    if (lastInput)
                        _noMoreInputForThisSegment= true;

                    TraceOutput(TraceBits.Flush,
                                   "Flush    noaction wi({0}) stat({1}) nf({2})  nomore({3})",
                                   workitem.index, workitem.status, _nextToFill, _noMoreInputForThisSegment);
                }
            }
        }


        /// <summary>
        /// Close the stream.
        /// </summary>
        /// <remarks>
        /// You must call Close on the stream to guarantee that all of the data written in has
        /// been compressed, and the compressed data has been written out.
        /// </remarks>
        public override void Close()
        {
            TraceOutput(TraceBits.Session, "Close {0:X8}", this.GetHashCode());

            if (_isClosed) return;

            _Flush(true);

            //System.Diagnostics.StackTrace st = new System.Diagnostics.StackTrace(1);
            //System.Console.WriteLine(st.ToString());

            // need to get Writer off the workitem, in case he's waiting forever
            WorkItem workitem = _pool[_nextToFill % _pc];
            lock(workitem)
            {
                Monitor.PulseAll(workitem);
            }

            // wait for the writer to complete his work
            TraceOutput(TraceBits.Synch, "Synch    _writingDone.WaitOne(begin)  Close");
            _writingDone.WaitOne();
            TraceOutput(TraceBits.Synch, "Synch    _writingDone.WaitOne(done)   Close");

            TraceOutput(TraceBits.Session, "-------------------------------------------------------");
            if (!_leaveOpen)
                _outStream.Close();

            _isClosed= true;
        }



//        /// <summary>The destructor</summary>
//         ~ParallelDeflateOutputStream()
//         {
//             TraceOutput(TraceBits.Lifecycle, "Destructor  {0:X8}", this.GetHashCode());
//             // call Dispose with false.  Since we're in the
//             // destructor call, the managed resources will be
//             // disposed of anyways.
//             Dispose(false);
//         }



        // workitem 10030 - implement a new Dispose method

        /// <summary>Dispose the object</summary>
        /// <remarks>
        ///   <para>
        ///     Because ParallelDeflateOutputStream is IDisposable, the
        ///     application must call this method when finished using the instance.
        ///   </para>
        ///   <para>
        ///     This method is generally called implicitly upon exit from
        ///     a <c>using</c> scope in C# (<c>Using</c> in VB).
        ///   </para>
        /// </remarks>
        new public void  Dispose()
        {
            TraceOutput(TraceBits.Lifecycle, "Dispose  {0:X8}", this.GetHashCode());
            _isDisposed= true;
            _pool = null;
            TraceOutput(TraceBits.Synch, "Synch    _sessionReset.Set()  Dispose");
            _sessionReset.Set();  // tell writer to die
            Dispose(true);
        }



        /// <summary>The Dispose method</summary>
        protected override void Dispose(bool disposeManagedResources)
        {
            if (disposeManagedResources)
            {
                // dispose managed resources
                _writingDone.Close();
                _sessionReset.Close();
            }
        }


        /// <summary>
        ///   Resets the stream for use with another stream.
        /// </summary>
        /// <remarks>
        ///   Because the ParallelDeflateOutputStream is expensive to create, it
        ///   has been designed so that it can be recycled and re-used.  You have
        ///   to call Close() on the stream first, then you can call Reset() on
        ///   it, to use it again on another stream.
        /// </remarks>
        ///
        /// <example>
        /// <code>
        /// ParallelDeflateOutputStream deflater = null;
        /// foreach (var inputFile in listOfFiles)
        /// {
        ///     string outputFile = inputFile + ".compressed";
        ///     using (System.IO.Stream input = System.IO.File.OpenRead(inputFile))
        ///     {
        ///         using (var outStream = System.IO.File.Create(outputFile))
        ///         {
        ///             if (deflater == null)
        ///                 deflater = new ParallelDeflateOutputStream(outStream,
        ///                                                            CompressionLevel.Best,
        ///                                                            CompressionStrategy.Default,
        ///                                                            true);
        ///             deflater.Reset(outStream);
        ///
        ///             while ((n= input.Read(buffer, 0, buffer.Length)) != 0)
        ///             {
        ///                 deflater.Write(buffer, 0, n);
        ///             }
        ///         }
        ///     }
        /// }
        /// </code>
        /// </example>
        public void Reset(Stream stream)
        {
            TraceOutput(TraceBits.Session, "-------------------------------------------------------");
            TraceOutput(TraceBits.Session, "Reset {0:X8} firstDone({1})", this.GetHashCode(), _firstWriteDone);

            if (!_firstWriteDone) return;

            if (_noMoreInputForThisSegment)
            {
                // wait til done writing:
                TraceOutput(TraceBits.Synch, "Synch    _writingDone.WaitOne(begin)  Reset");
                _writingDone.WaitOne();
                TraceOutput(TraceBits.Synch, "Synch    _writingDone.WaitOne(done)   Reset");

                // reset all status
                foreach (var workitem in _pool)
                    workitem.status = (int) WorkItem.Status.None;

                _noMoreInputForThisSegment= false;
                _nextToFill= _nextToWrite= 0;
                _totalBytesProcessed = 0L;
                _Crc32= 0;
                _isClosed= false;

                TraceOutput(TraceBits.Synch, "Synch    _writingDone.Reset()         Reset");
                _writingDone.Reset();
            }
            else
            {
                TraceOutput(TraceBits.Synch, "Synch                           Reset noMore=false");
            }

            _outStream = stream;

            // release the writer thread for the next "session"
            TraceOutput(TraceBits.Synch, "Synch    _sessionReset.Set()          Reset");
            _sessionReset.Set();
        }



        private void _PerpetualWriterMethod(object state)
        {
            TraceOutput(TraceBits.WriterThread, "_PerpetualWriterMethod START");

            try
            {
                do
                {
                    // wait for the next session
                    TraceOutput(TraceBits.Synch | TraceBits.WriterThread, "Synch    _sessionReset.WaitOne(begin) PWM");
                    _sessionReset.WaitOne();
                    TraceOutput(TraceBits.Synch | TraceBits.WriterThread, "Synch    _sessionReset.WaitOne(done)  PWM");

                    if (_isDisposed) break;

                    TraceOutput(TraceBits.Synch | TraceBits.WriterThread, "Synch    _sessionReset.Reset()        PWM");
                    _sessionReset.Reset();

                    // repeatedly write buffers as they become ready
                    WorkItem workitem = null;
                    Ionic.Zlib.CRC32 c= new Ionic.Zlib.CRC32();
                    do
                    {
                        workitem = _pool[_nextToWrite % _pc];
                        lock(workitem)
                        {
                            if (_noMoreInputForThisSegment)
                                TraceOutput(TraceBits.Write,
                                               "Write    drain    wi({0}) stat({1}) canuse({2})  cba({3})",
                                               workitem.index,
                                               workitem.status,
                                               (workitem.status == (int)WorkItem.Status.Compressed),
                                               workitem.compressedBytesAvailable);

                            do
                            {
                                if (workitem.status == (int)WorkItem.Status.Compressed)
                                {
                                    TraceOutput(TraceBits.WriteBegin,
                                                   "Write    begin    wi({0}) stat({1})              cba({2})",
                                                   workitem.index,
                                                   workitem.status,
                                                   workitem.compressedBytesAvailable);

                                    workitem.status = (int)WorkItem.Status.Writing;
                                    _outStream.Write(workitem.compressed, 0, workitem.compressedBytesAvailable);
                                    c.Combine(workitem.crc, workitem.inputBytesAvailable);
                                    _totalBytesProcessed += workitem.inputBytesAvailable;
                                    _nextToWrite++;
                                    workitem.inputBytesAvailable= 0;
                                    workitem.status = (int)WorkItem.Status.Done;

                                    TraceOutput(TraceBits.WriteDone,
                                                   "Write    done     wi({0}) stat({1})              cba({2})",
                                                   workitem.index,
                                                   workitem.status,
                                                   workitem.compressedBytesAvailable);


                                    Monitor.Pulse(workitem);
                                    break;
                                }
                                else
                                {
                                    int wcycles = 0;
                                    // I've locked a workitem I cannot use.
                                    // Therefore, wake someone else up, and then release the lock.
                                    while (workitem.status != (int)WorkItem.Status.Compressed)
                                    {
                                        TraceOutput(TraceBits.WriteWait,
                                                       "Write    waiting  wi({0}) stat({1}) nw({2}) nf({3}) nomore({4})",
                                                       workitem.index,
                                                       workitem.status,
                                                       _nextToWrite, _nextToFill,
                                                       _noMoreInputForThisSegment );

                                        if (_noMoreInputForThisSegment && _nextToWrite == _nextToFill)
                                            break;

                                        wcycles++;

                                        // wake up someone else
                                        Monitor.Pulse(workitem);
                                        // release and wait
                                        Monitor.Wait(workitem);

                                        if (workitem.status == (int)WorkItem.Status.Compressed)
                                            TraceOutput(TraceBits.WriteWait,
                                                           "Write    A-OK     wi({0}) stat({1}) iba({2}) cba({3}) cyc({4})",
                                                           workitem.index,
                                                           workitem.status,
                                                           workitem.inputBytesAvailable,
                                                           workitem.compressedBytesAvailable,
                                                           wcycles);
                                    }

                                    if (_noMoreInputForThisSegment && _nextToWrite == _nextToFill)
                                        break;

                                }
                            }
                            while (true);
                        }

                        if (_noMoreInputForThisSegment)
                            TraceOutput(TraceBits.Write,
                                           "Write    nomore  nw({0}) nf({1}) break({2})",
                                           _nextToWrite, _nextToFill, (_nextToWrite == _nextToFill));

                        if (_noMoreInputForThisSegment && _nextToWrite == _nextToFill)
                            break;

                    } while (true);


                    // Finish:
                    // After writing a series of buffers, closing each one with
                    // Flush.Sync, we now write the final one as Flush.Finish, and
                    // then stop.
                    byte[] buffer = new byte[128];
                    ZlibCodec compressor = new ZlibCodec();
                    int rc = compressor.InitializeDeflate(_compressLevel, false);
                    compressor.InputBuffer = null;
                    compressor.NextIn = 0;
                    compressor.AvailableBytesIn = 0;
                    compressor.OutputBuffer = buffer;
                    compressor.NextOut = 0;
                    compressor.AvailableBytesOut = buffer.Length;
                    rc = compressor.Deflate(FlushType.Finish);

                    if (rc != ZlibConstants.Z_STREAM_END && rc != ZlibConstants.Z_OK)
                        throw new Exception("deflating: " + compressor.Message);

                    if (buffer.Length - compressor.AvailableBytesOut > 0)
                    {
                        TraceOutput(TraceBits.WriteBegin,
                                       "Write    begin    flush bytes({0})",
                                       buffer.Length - compressor.AvailableBytesOut);

                        _outStream.Write(buffer, 0, buffer.Length - compressor.AvailableBytesOut);

                        TraceOutput(TraceBits.WriteBegin,
                                       "Write    done     flush");
                    }

                    compressor.EndDeflate();

                    _Crc32 = c.Crc32Result;

                    // signal that writing is complete:
                    TraceOutput(TraceBits.Synch, "Synch    _writingDone.Set()           PWM");
                    _writingDone.Set();
                }
                while (true);
            }
            catch (System.Exception exc1)
            {
                lock(_eLock)
                {
                    // expose the exception to the main thread
                    if (_pendingException!=null)
                        _pendingException = exc1;
                }
            }

            TraceOutput(TraceBits.WriterThread, "_PerpetualWriterMethod FINIS");
        }




        private void _DeflateOne(Object wi)
        {
            WorkItem workitem = (WorkItem) wi;
            try
            {
                // compress one buffer
                int myItem = workitem.index;

                lock(workitem)
                {
                    if (workitem.status != (int)WorkItem.Status.Filled)
                        throw new InvalidOperationException();

                    Ionic.Zlib.CRC32 crc = new CRC32();

                    // use the workitem:
                    // calc CRC on the buffer
                    crc.SlurpBlock(workitem.buffer, 0, workitem.inputBytesAvailable);

                    // deflate it
                    DeflateOneSegment(workitem);

                    // update status
                    workitem.status = (int)WorkItem.Status.Compressed;
                    workitem.crc = crc.Crc32Result;

                    TraceOutput(TraceBits.Compress,
                                   "Compress          wi({0}) stat({1}) len({2})",
                                   workitem.index,
                                   workitem.status,
                                   workitem.compressedBytesAvailable
                                   );

                    // release the item
                    Monitor.Pulse(workitem);
                }
            }
            catch (System.Exception exc1)
            {
                lock(_eLock)
                {
                    // expose the exception to the main thread
                    if (_pendingException!=null)
                        _pendingException = exc1;
                }
            }
        }




        private bool DeflateOneSegment(WorkItem workitem)
        {
            ZlibCodec compressor = workitem.compressor;
            int rc= 0;
            compressor.ResetDeflate();
            compressor.NextIn = 0;

            compressor.AvailableBytesIn = workitem.inputBytesAvailable;

            // step 1: deflate the buffer
            compressor.NextOut = 0;
            compressor.AvailableBytesOut =  workitem.compressed.Length;
            do
            {
                compressor.Deflate(FlushType.None);
            }
            while (compressor.AvailableBytesIn > 0 || compressor.AvailableBytesOut == 0);

            // step 2: flush (sync)
            rc = compressor.Deflate(FlushType.Sync);

            workitem.compressedBytesAvailable= (int) compressor.TotalBytesOut;
            return true;
        }


        [System.Diagnostics.ConditionalAttribute("Trace")]
        private void TraceOutput(TraceBits bits, string format, params object[] varParams)
        {
            if ((bits & _DesiredTrace) != 0)
            {
                lock(_outputLock)
                {
                    int tid = Thread.CurrentThread.GetHashCode();
                    Console.ForegroundColor = (ConsoleColor) (tid % 8 + 8);
                    Console.Write("{0:000} PDOS ", tid);
                    Console.WriteLine(format, varParams);
                    Console.ResetColor();
                }
            }
        }


        // used only when Trace is defined
        [Flags]
        enum TraceBits
        {
            None         = 0,
            Write        = 1,    // write out
            WriteBegin   = 2,    // begin to write out
            WriteDone    = 4,    // done writing out
            WriteWait    = 8,    // write thread waiting for buffer
            Flush        = 16,
            Compress     = 32,   // async compress
            Fill         = 64,   // filling buffers, when caller invokes Write()
            Lifecycle    = 128,  // constructor/disposer
            Session      = 256,  // Close/Reset
            Synch        = 512,  // thread synchronization
            WriterThread = 1024, // writer thread
        }



        /// <summary>
        /// Indicates whether the stream supports Seek operations.
        /// </summary>
        /// <remarks>
        /// Always returns false.
        /// </remarks>
        public override bool CanSeek
        {
            get { return false; }
        }


        /// <summary>
        /// Indicates whether the stream supports Read operations.
        /// </summary>
        /// <remarks>
        /// Always returns false.
        /// </remarks>
        public override bool CanRead
        {
            get {return false;}
        }

        /// <summary>
        /// Indicates whether the stream supports Write operations.
        /// </summary>
        /// <remarks>
        /// Returns true if the provided stream is writable.
        /// </remarks>
        public override bool CanWrite
        {
            get { return _outStream.CanWrite; }
        }

        /// <summary>
        /// Reading this property always throws a NotImplementedException.
        /// </summary>
        public override long Length
        {
            get { throw new NotImplementedException(); }
        }

        /// <summary>
        /// Reading or Writing this property always throws a NotImplementedException.
        /// </summary>
        public override long Position
        {
            get { throw new NotImplementedException(); }
            set { throw new NotImplementedException(); }
        }

        /// <summary>
        /// This method always throws a NotImplementedException.
        /// </summary>
        public override int Read(byte[] buffer, int offset, int count)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// This method always throws a NotImplementedException.
        /// </summary>
        public override long Seek(long offset, System.IO.SeekOrigin origin)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// This method always throws a NotImplementedException.
        /// </summary>
        public override void SetLength(long value)
        {
            throw new NotImplementedException();
        }

    }

}


www.java2v.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.