rss· 投稿· 设为首页· 加入收藏· 繁體版
当前位置: 火魔网 » 程序开发 » C#

C#实现的一个先进先出的环形内存流 (1)

//from http://www.cnblogs.com/jasonsun/archive/2008/06/19/1226167.html

创建一个流然后一个线程往流中写入数据,而另一个线程从流中读取数据。

    派生于 System.IO.Stream     环形缓冲区     自动扩展容量

/*
* Author: Jason Sun (相交的平行线) * Email: sjshjz@hotmail.com * Copyright (C) Jason Sun. All Rights Reserved.
*/

using System;
using System.IO;

namespace Common     /**//// <summary>     /// </summary>
    public sealed class RingMemoryStream : Stream         private const int INITCAPACITY = 4 * 1024; // 默认初始容量
        private const int INCREMENTSIZE = 4 * 1024;// 自动扩展时一次最少扩展多少字节

        private object m_synObject = new object(); // 同步对象

        private byte[] m_buffer;    // 内部缓冲区
        private int m_capacity;     // 缓冲区的容量
        private int m_length;       // 缓冲区中当前有效数据的长度
        private bool m_expandable; // 是否可自动扩展容量
        private int m_maxCapacity; // 可扩展到的最大容量

        private int m_rPos;         // 读指针的偏移位置(指向流中第一个有效字节)
        private int m_wPos;         // 写指针的偏移位置(指向流中最后一个有效字节之后的那个字节)

        /**//// <summary>
        /// 使用默认的初始容量以及可自动扩展初始化 RingMemoryStream 类的新实例。
        /// </summary>
        public RingMemoryStream()
            : this(INITCAPACITY)         }

        /**//// <summary>
        /// 使用指定的初始容量以及可自动扩展初始化 RingMemoryStream 类的新实例。
        /// </summary>
        public RingMemoryStream(int capacity)
            : this(capacity, true)         }

        /**//// <summary>
        /// 使用指定的容量和是否可自动扩展初始化 RingMemoryStream 类的新实例。
        /// </summary>
        /// <param name="capacity">指定流的初始容量。</param>
        /// <param name="expandable">是否可自动扩展。</param>
        public RingMemoryStream(int capacity, bool expandable)
            : this(capacity,expandable,-1)         }

        /**//// <summary>
        /// 使用指定的容量和是否可扩展以及最大可扩展到的字节数初始化 RingMemoryStream 类的新实例。
        /// </summary>
        /// <param name="capacity">指定流的初始容量。</param>
        /// <param name="expandable">是否可自动扩展。</param>
        /// <param name="maxCapacity">最大可扩展到的字节数。指定 -1 表示不限制最大可扩展到的容量。如果 expandable 为 false,则此参数无意义。</param>
        public RingMemoryStream(int capacity, bool expandable, int maxCapacity)             if (capacity < 0)                 throw new ArgumentOutOfRangeException("capacity");             if (expandable && (maxCapacity != -1 && maxCapacity < capacity))                 throw new ArgumentOutOfRangeException("maxCapacity");
            }

            m_length = 0;
            m_capacity = capacity;
            m_expandable = expandable;
            m_maxCapacity = maxCapacity;

            m_buffer = new byte[m_capacity];
            m_rPos = 0;
            m_wPos = 0;
        }

        /**//// <summary>         /// </summary>
        public override bool CanRead             get { return true; }
        }

        /**//// <summary>         /// </summary>
        public override bool CanSeek             get { return false; }
        }

        /**//// <summary>         /// </summary>
        public override bool CanWrite             get { return true; }
        }

        /**//// <summary>         /// </summary>
        public override long Length             get                 lock (m_synObject)                     return m_length;             }
        }

        /**//// <summary>
        /// 已重写。RingMemoryStream 流不支持 Position 属性,在设置或获取 Position 属性的值时将引发 NotSupportedException 异常。
        /// </summary>
        public override long Position             get                 throw new NotSupportedException();             set                 throw new NotSupportedException();         }

        /**//// <summary>
        /// 已重写。调用此方法时将引发 NotSupportedException 异常。
        /// </summary>
        public override void Flush()             throw new NotSupportedException();
        }

        /**//// <summary>
        /// 已重写。调用此方法时将引发 NotSupportedException 异常。
        /// </summary>
        public override long Seek(long offset, SeekOrigin origin)             throw new NotSupportedException();
        }

        /**//// <summary>
        /// 已重写。调用此方法时将引发 NotSupportedException 异常。
        /// </summary>
        public override void SetLength(long value)             throw new NotSupportedException();
        }

        /**//// <summary>
        /// 已重写。从当前流中读取字节块并将数据写入 buffer 中。已读取的数据将被流抛弃。
        /// </summary>
        /// <param name="buffer">包含所读取到的字节。</param>
        /// <param name="offset">buffer 中的字节偏移量。</param>
        /// <param name="count">最多读取的字节数。</param>
        /// <returns>成功读取到的总字节数。如果没有读取到任何字节则为 0。</returns>
        public override int Read(byte[] buffer, int offset, int count)             if (buffer == null)                 throw new ArgumentNullException("buffer");             if (offset < 0 || count < 0)                 throw new ArgumentOutOfRangeException("offset 或 count 不能为负数。");             if ((buffer.Length - offset) < count)                 throw new ArgumentException("buffer 的长度减去 offset 的结果小于 count。");
            }

            lock (m_synObject)                 // 真正要读取的字节数
                int readLen = Math.Min(m_length, count);
                if (readLen == 0)                     return 0;
                }

                ReadInternal(buffer,offset,readLen);

                return readLen;         }

        /**//// <summary>         /// </summary>
        /// <param name="buffer">要写入当前流的字节块。</param>
        /// <param name="offset">buffer 中的偏移量,从此处开始写入。</param>
        /// <param name="count">最多写入的字节数。</param>
        /// <remarks>         /// 则会引发 NotSupportedException 异常。
        /// </remarks>
        public override void Write(byte[] buffer, int offset, int count)             if (buffer == null)                 throw new ArgumentNullException("buffer");             if (offset < 0 || count < 0)                 throw new ArgumentOutOfRangeException("offset 或 count 不能为负数。");             if ((buffer.Length - offset) < count)                 throw new ArgumentException("buffer 的长度减去 offset 的结果小于 count。");
            }

            lock (m_synObject)                 // 要往流中写入 buffer 中的数据,流的容量至少要是这么多
                int minCapacityNeeded = m_length + count;

                // 如果需要扩展流则扩展流
                ExpandStream(minCapacityNeeded);

                // 如果无法再容纳下指定的字节数
                if (minCapacityNeeded > m_capacity)                     throw new NotSupportedException("无法再往流中写入 " + count + " 个字节。");
                }

                this.WriteInternal(buffer, offset, count);         }

        private void WriteInternal(byte[] buffer, int offset, int count)             if (m_rPos > m_wPos)                 Buffer.BlockCopy(buffer, offset, m_buffer, m_wPos, count);             else                 int afterWritePosLen = m_capacity - m_wPos;

                // 如果 m_rPos 之后的字节数够用
                if (afterWritePosLen >= count)                     Buffer.BlockCopy(buffer, offset, m_buffer, m_wPos, count);                 else                     Buffer.BlockCopy(buffer, offset, m_buffer, m_wPos, afterWritePosLen);
                    int restLen = count - afterWritePosLen;
                    Buffer.BlockCopy(buffer, afterWritePosLen, m_buffer, 0, restLen);             }
            m_wPos += count;
            m_wPos %= m_capacity;
            m_length += count;
        }

        private void ReadInternal(byte[] buffer, int offset, int count)             if (m_rPos < m_wPos)                 Buffer.BlockCopy(m_buffer, m_rPos, buffer, offset, count);             else                 int afterReadPosLen = m_capacity - m_rPos;

                // 如果 m_rPos 之后的字节数够用
                if (afterReadPosLen >= count)                     Buffer.BlockCopy(m_buffer, m_rPos, buffer, offset, count);                 else                     Buffer.BlockCopy(m_buffer, m_rPos, buffer, offset, afterReadPosLen);
                    int restLen = count - afterReadPosLen;
                    Buffer.BlockCopy(m_buffer, 0, buffer, afterReadPosLen, restLen);             }

            m_rPos += count;
            m_rPos %= m_capacity;
            m_length -= count;
        }

        // 扩展流
        private void ExpandStream(int minSize)             // 不支持扩展
            if (!m_expandable)                 return;
            }

            // 不需要扩展
            if (m_capacity >= minSize)                 return;
            }

            // 如果无法再扩展
            if (m_maxCapacity != -1 && (m_maxCapacity - m_capacity) < INCREMENTSIZE)                 return;
            }

            // 计算要扩展几块(INCREMENTSIZE 的倍数)
            int blocksNum = (int)Math.Ceiling((double)(minSize - m_capacity) / INCREMENTSIZE);

            // 创建新的缓冲区,并把旧缓冲区中的数据复制到新缓冲区中
            byte[] buffNew = new byte[m_capacity + blocksNum * INCREMENTSIZE];
            int strLen = m_length;
            ReadInternal(buffNew, 0, m_length);
            m_buffer = buffNew;
            m_rPos = 0;
            m_wPos = strLen;
            m_capacity = buffNew.Length;
            m_length = strLen;     }   
}

顶一下
(0)
踩一下
(0)