`
cloudeagle_bupt
  • 浏览: 529534 次
文章分类
社区版块
存档分类
最新评论

环形缓冲区读取磁盘

 
阅读更多
package ringBuffer.reader;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.LinkedHashMap;

public class MsgReader {

	public static void main(String[] args) {
		long start = System.currentTimeMillis();
		MsgReader smh = new MsgReader();
		Reader rd = smh.new Reader();

		try {
			new Thread(rd).start();
			for (Integer j = 0; j < 5; j++) {
				LinkedHashMap<Integer, String> msgs = rd.getMsg(j);
				System.out.println(" msg : " + msgs.get(0));
			}
			rd.interrupt();
			rd.join(); // 这里主线程会否比子线程先跑完? 加上join
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println(" last : " + (System.currentTimeMillis() - start));
		System.exit(0);
	}

	class Reader extends Thread {
		int bufferSize = 2 * 1024 * 1024;
		private byte[] buffer = new byte[bufferSize];
		private Integer filePos = 0; // 文件指针
		private Integer bufPos = 0; // 缓冲区指针
		String outputFile = "F:\\test\\ringBuf.txt";
		private int fileLength = 0;
		boolean isRead = true;
		long stepNum = 0; 
		RandomAccessFile raf;
		LinkedHashMap<Long, LinkedHashMap<Integer, String>> lastJobMsgs = new LinkedHashMap<Long, LinkedHashMap<Integer, String>>();

		public Reader() {
			try {
				raf = new RandomAccessFile(outputFile, "rw");
				fileLength = (int) raf.length(); // 中间文件一般不会超过int大小?
			} catch (IOException e) {
				e.printStackTrace();
			}
		}

		@Override
		public void run() {
			try {
				while (!Thread.interrupted()) {
					synchronized (this) {
						while (isRead) {
							raf.seek(filePos);
							int readLength= 0 ;
							if ((fileLength - filePos + bufPos) > bufferSize) {
								raf.read(buffer, bufPos, bufferSize - bufPos);
								filePos = filePos + bufferSize - bufPos;
								readLength = bufferSize ; //满读
							} else {
								readLength = fileLength - filePos;
								raf.read(buffer, bufPos, readLength);//部分读
								readLength = bufPos + readLength ;
								filePos = (int) raf.length();
							}
							bufPos= 0;  //每次新读取处理都从buf的0处开始
							fromBufToMsg(buffer, bufPos, readLength);
							isRead = false;
							this.notifyAll();
						}
					}
				}
				raf.close();
			} catch (IOException e) {
				e.printStackTrace();
			}  
		}

		public void fromBufToMsg(byte[] buffer, int pos, int len) throws IOException {
			String str = new String(buffer,pos, len);
			String[] map = str.split("R");
			if (map.length > 1) { // 包含结果

			} else {
				String[] superSteps = str.split("S");
				for (int i = 0; i < superSteps.length; i++) {
					LinkedHashMap<Integer, String> keyValues = new LinkedHashMap<Integer, String>();
					if (i == superSteps.length - 1) { // 最后一组可能取不全,要做处理
						if (str.lastIndexOf("S") != (str.length() - 1)) {
							System.arraycopy(buffer, bufPos, buffer, 0,
									(bufferSize - bufPos)); // 把最后Step的信息放到开头
							bufPos = bufferSize - bufPos;
							break;
						}
						bufPos = 0; // 恰好取整
					}
					String[] vertexMsgs = superSteps[i].split("V");
					for (String vertexMsg : vertexMsgs) {
						String[] keyMsgs = vertexMsg.split("-");
						Integer key = new Integer(keyMsgs[0]);
						if (keyMsgs.length < 2)
							continue;
						keyValues.put(key, keyMsgs[1]);
					}
					bufPos = bufPos + superSteps[i].length() + 1;
					lastJobMsgs.put(stepNum++, keyValues);
				}
			}
			return;
		}

		public LinkedHashMap<Integer, String> getMsg(long i) {
			synchronized (this) {
				try {
					while (lastJobMsgs.get(i) == null) {
						isRead = true;
						this.wait();
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				return lastJobMsgs.get(i);
			}
		}
	}
}

版权声明:本文为博主原创文章,未经博主允许不得转载。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics