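I recently used Mina to develop a Java NIO server program, so I took the opportunity to study this Apache framework.

First, what is Mina? Here is how the official website (http://mina./) describes it: Apache Mina (Multipurpose Infrastructure for Network Applications) is a network application framework that helps users develop high-performance, highly scalable network applications. It provides an abstract, event-driven, asynchronous API built on Java NIO, so that applications on top of various transports (such as TCP/IP and UDP/IP) can be developed quickly and efficiently.

Apache Mina is also described as:
- an NIO framework
- a client/server framework (the typical C/S architecture)
- a networking socket library
- an event-driven asynchronous API (note: JDK 7 also added asynchronous APIs)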
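In short, we can think of it as a communication framework that wraps low-level I/O operations and offers a higher-level API.

The Mina website and the wider web already provide fairly rich documentation, so here I only briefly cover Mina's structure and some sample code.

Mina 2.x reworked the code and package structure, which reduced complexity and improved robustness. As a result the API changed considerably and is incompatible with Mina 1.x in many places. This article uses Mina 2.0.4.

1. Mina's structure

Mina's communication flow is roughly as shown in the figure above. The components are:

(1.) IoService: this interface is responsible for establishing sockets on one thread. It has its own Selector and listens for new connections. (Mina uses Java NIO underneath, so it is a typical Reactor-pattern architecture with event-driven programming. Mina allows user-defined thread models: single-threaded, multi-threaded, thread pools and so on. Unlike classic blocking Java sockets, Mina uses non-blocking sockets. Internally it keeps the business logic and data of each connection (session) isolated and uses polling to share CPU time among sessions, so you no longer need to worry about driving each socket connection with its own thread.)

(2.) IoProcessor: this interface runs on another thread and checks whether data is being read or written on the channels, which means it also has its own Selector. This differs from hand-written Java NIO code, where we usually use a single Selector and do not separate the IoService and IoProcessor roles. In addition, IoProcessor invokes the filters registered on the IoService and calls the IoHandler after the filter chain.

(3.) IoFilter: this interface defines a set of interceptors, which can cover logging, blacklist filtering, data encoding (write direction) and decoding (read direction), and more. Encoding and decoding are the most important part, and the part you will spend most of your attention on when using Mina.

(4.) IoHandler: this interface is where you write the business logic, that is, where data is received and sent.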
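To make the relationship between the filter chain and the handler more concrete, here is a minimal plain-Java sketch of the idea. It is not Mina's API: the names `PipelineSketch`, `Filter`, `Handler` and `Pipeline` are hypothetical, and the sketch only models the inbound (read) direction, where each filter may transform or drop a message before the handler sees it.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class PipelineSketch {

    /** Hypothetical stand-in for an IoFilter: transforms an inbound message, or returns null to drop it. */
    interface Filter {
        Object onReceive(Object message);
    }

    /** Hypothetical stand-in for an IoHandler: the business logic at the end of the chain. */
    interface Handler {
        void messageReceived(Object message);
    }

    /** Runs the filters in registration order, then hands the result to the handler. */
    static final class Pipeline {
        private final List<Filter> filters = new ArrayList<>();
        private final Handler handler;

        Pipeline(Handler handler) {
            this.handler = handler;
        }

        Pipeline addLast(Filter filter) {
            filters.add(filter);
            return this;
        }

        void fireMessageReceived(Object message) {
            Object current = message;
            for (Filter filter : filters) {
                current = filter.onReceive(current);
                if (current == null) {
                    return; // a filter dropped the message; the handler never sees it
                }
            }
            handler.messageReceived(current);
        }
    }

    /** Decodes raw bytes to text, drops blank messages, and collects what reaches the handler. */
    static List<String> demo(byte[]... packets) {
        List<String> received = new ArrayList<>();
        Pipeline pipeline = new Pipeline(msg -> received.add((String) msg))
                .addLast(msg -> new String((byte[]) msg, StandardCharsets.UTF_8)) // "codec" step
                .addLast(msg -> ((String) msg).trim().isEmpty() ? null : msg);    // "blacklist" style step
        for (byte[] packet : packets) {
            pipeline.fireMessageReceived(packet);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo(
                "hello".getBytes(StandardCharsets.UTF_8),
                "   ".getBytes(StandardCharsets.UTF_8),
                "world".getBytes(StandardCharsets.UTF_8))); // prints [hello, world]
    }
}
```

The order in which filters are added matters: the second filter can only work with Strings because the decoding step runs before it.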
2. The general Mina programming workflow

2.1 Overall flow

Setting up the server-side resources: create an Acceptor, configure its Filters (either Mina's built-in filters or custom ones), and then configure the event-driven IoHandler that holds the business logic.

Setting up the client-side resources: Mina uses a unified programming model, so building the client is broadly similar to building the server, except that here you create a Connector.

2.2 Sample program (using mina-core-2.0.4.jar)

The following simple sample program helps to understand how Mina works. It implements basic instant messaging: multiple clients can connect to the server at the same time and talk to each other as in a chat room.

2.2.1 Building a custom TextLineCodecFactory

To understand Mina's code and how it runs, we write an imitation of Mina's built-in TextLineCodecFactory. Working together with ProtocolCodecFilter, this CodecFactory converts between low-level data (the binary byte stream) and high-level data (typed objects such as String). Here it implements line-based reading: when '\n' is encountered, the bytes read so far are treated as one line, wrapped in a String, and passed on to the next Filter or to the Handler.

A CodecFactory is a factory: it supplies a Decoder and an Encoder that decode and encode the data, with IoBuffer as the carrier. (Decoding turns binary data into high-level objects; encoding turns high-level objects into a binary stream.)

1) Create the Decoder (MyTextLineDecoder), which implements the ProtocolDecoder interface

package com.mai.mina.diyCodecFilter;

import java.nio.charset.Charset;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;

public class MyTextLineDecoder implements ProtocolDecoder {

    // Session attribute key under which the unfinished line is stored.
    private static final String BUFFER_KEY = MyTextLineDecoder.class.getName() + ".buffer";

    private final Charset charset = Charset.forName("UTF-8");

    // The partial line is kept in the session rather than in a field, so it
    // survives even though the factory returns a new decoder on every call.
    private IoBuffer lineBuffer(IoSession session) {
        IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER_KEY);
        if (buf == null) {
            buf = IoBuffer.allocate(100).setAutoExpand(true);
            session.setAttribute(BUFFER_KEY, buf);
        }
        return buf;
    }

    @Override
    public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput output) throws Exception {
        IoBuffer buf = lineBuffer(session);
        while (in.hasRemaining()) {
            byte b = in.get();
            if (b == '\n') {
                // End of line: turn the collected bytes into a String.
                buf.flip();
                byte[] bytes = new byte[buf.limit()];
                buf.get(bytes);
                buf.clear();
                String message = new String(bytes, charset);
                // Drop the '\r' that remains when the peer ends lines with "\r\n".
                if (message.endsWith("\r")) {
                    message = message.substring(0, message.length() - 1);
                }
                output.write(message);
            } else {
                buf.put(b);
            }
        }
    }

    @Override
    public void dispose(IoSession session) throws Exception {
        session.removeAttribute(BUFFER_KEY);
    }

    @Override
    public void finishDecode(IoSession session, ProtocolDecoderOutput output) throws Exception {
        // Nothing to flush: an unterminated trailing line is discarded.
    }
}

2) Create the Encoder (MyTextLineEncoder), which implements the ProtocolEncoder interface

package com.mai.mina.diyCodecFilter;
import java.nio.charset.Charset;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.apache.mina.filter.codec.textline.LineDelimiter;

public class MyTextLineEncoder implements ProtocolEncoder {

    private final Charset charset = Charset.forName("UTF-8");

    @Override
    public void dispose(IoSession session) throws Exception {
        // Nothing to release.
    }

    @Override
    public void encode(IoSession session, Object message, ProtocolEncoderOutput output) throws Exception {
        IoBuffer buf = IoBuffer.allocate(100).setAutoExpand(true);
        // Write the message text followed by a line delimiter, so that the
        // receiving decoder can find the end of the line.
        buf.putString(message.toString(), charset.newEncoder());
        buf.putString(LineDelimiter.DEFAULT.getValue(), charset.newEncoder());
        buf.flip();
        output.write(buf);
    }
}

3) Create MyTextLineCodecFactory, which implements the ProtocolCodecFactory interface

package com.mai.mina.diyCodecFilter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;

public class MyTextLineCodecFactory implements ProtocolCodecFactory {

    @Override
    public ProtocolDecoder getDecoder(IoSession session) throws Exception {
        return new MyTextLineDecoder();
    }

    @Override
    public ProtocolEncoder getEncoder(IoSession session) throws Exception {
        return new MyTextLineEncoder();
    }
}

2.2.2 Setting up the server-side resources (Acceptor configuration and Handler)
1) Create a custom IoHandler (MyServerHandleDemo1), which implements the IoHandler interface

package com.mai.mina.diyChat;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
import java.util.logging.Logger;

import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;

public class MyServerHandleDemo1 implements IoHandler {

    private Logger logger = Logger.getLogger(this.getClass().getName());

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        logger.warning("server caught an exception: " + cause.getMessage());
    }

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        String messageStr = message.toString();
        DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = format.format(new Date());
        logger.info(messageStr + "\t" + dateStr);

        // Broadcast the message to every session managed by this service,
        // including the sender.
        Collection<IoSession> sessions = session.getService().getManagedSessions().values();
        for (IoSession tempSession : sessions) {
            tempSession.write(messageStr + "\t" + dateStr);
        }
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        logger.info("server sent message: " + message.toString());
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        // The session is already closed when this callback fires, so there is
        // nothing left to close here.
        logger.info("there is a session closed");
    }

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        logger.info("there is a session created");
        session.write("welcome to the chat room");
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        logger.info(session.getId() + " (session ID) is idle in the state --> " + status);
    }

    @Override
    public void sessionOpened(IoSession session) throws Exception {
        // Nothing to do.
    }
}

2) Create the Acceptor, which also serves as the server's startup class (SimpleMinaServer)

package com.mai.mina.diyChat;
import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LogLevel;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.SocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

import com.mai.mina.diyCodecFilter.MyTextLineCodecFactory;

public class SimpleMinaServer {

    SocketAcceptor acceptor = null;

    SimpleMinaServer() {
        acceptor = new NioSocketAcceptor();
    }

    public boolean bind() {
        // Configure the CodecFactory.
        acceptor.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new MyTextLineCodecFactory()));
        LoggingFilter log = new LoggingFilter();
        log.setMessageReceivedLogLevel(LogLevel.INFO);
        acceptor.getFilterChain().addLast("logger", log);
        // Configure the handler.
        acceptor.setHandler(new MyServerHandleDemo1());
        // Report a session as idle after 30 seconds without reads or writes.
        acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30);
        try {
            acceptor.bind(new InetSocketAddress(8888));
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    public static void main(String args[]) {
        SimpleMinaServer server = new SimpleMinaServer();
        if (!server.bind()) {
            System.out.println("Server failed to start");
        } else {
            System.out.println("Server started successfully");
        }
    }
}

2.2.3 Setting up the client-side resources
1) Create a custom IoHandler (MyClientHandleDemo1). It extends IoHandlerAdapter, a convenience implementation of the IoHandler interface, so only the callbacks of interest need to be overridden.

package com.mai.mina.diyChat;

import java.util.logging.Logger;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;

public class MyClientHandleDemo1 extends IoHandlerAdapter {

    private ChatPanel messagePanel = null;
    private Logger logger = Logger.getLogger(this.getClass().getName());

    MyClientHandleDemo1() {
    }

    MyClientHandleDemo1(ChatPanel messagePanel) {
        this.messagePanel = messagePanel;
    }

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        String messageStr = message.toString();
        logger.info("receive a message is : " + messageStr);
        // Show the message in the GUI when one is attached.
        if (messagePanel != null) {
            messagePanel.showMsg(messageStr);
        }
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        logger.info("client sent a message: " + message.toString());
    }
}

2) Create the Connector (SimpleMinaClient)

package com.mai.mina.diyChat;
import java.net.InetSocketAddress;

import org.apache.mina.core.future.CloseFuture;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LogLevel;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.SocketConnector;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

import com.mai.mina.diyCodecFilter.MyTextLineCodecFactory;

public class SimpleMinaClient {

    public SocketConnector connector = null;
    public ConnectFuture future;
    public IoSession session = null;
    private ChatPanel messagePanel = null;

    SimpleMinaClient() {
    }

    SimpleMinaClient(ChatPanel messagePanel) {
        this.messagePanel = messagePanel;
    }

    boolean connect() {
        try {
            connector = new NioSocketConnector();
            connector.setConnectTimeoutMillis(3000);
            connector.getFilterChain().addLast("codec",
                    new ProtocolCodecFilter(new MyTextLineCodecFactory()));
            LoggingFilter log = new LoggingFilter();
            log.setMessageReceivedLogLevel(LogLevel.INFO);
            connector.getFilterChain().addLast("logger", log);
            connector.setHandler(new MyClientHandleDemo1(messagePanel));

            // connect() is asynchronous: wait for the result, then fetch the session.
            // getSession() throws if the connection attempt failed.
            future = connector.connect(new InetSocketAddress("127.0.0.1", 8888));
            future.awaitUninterruptibly();
            session = future.getSession();
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    public void setAttribute(Object key, Object value) {
        session.setAttribute(key, value);
    }

    void sentMsg(String message) {
        session.write(message);
    }

    boolean close() {
        // Wait up to one second for the session to close, then release the
        // connector's resources.
        CloseFuture future = session.getCloseFuture();
        future.awaitUninterruptibly(1000);
        connector.dispose();
        return true;
    }

    public SocketConnector getConnector() {
        return connector;
    }

    public IoSession getSession() {
        return session;
    }

    public static void main(String[] args) {
        SimpleMinaClient client = new SimpleMinaClient();
        if (client.connect()) {
            client.sentMsg("hello , sever !");
            client.close();
        }
    }
}

At this point the basic Mina communication is in place. Next we build a GUI for the client, which makes it easier to exercise the features and demonstrate the message exchange.

2.2.4 Building the client GUI (ChatPanel, which uses SimpleMinaClient for the actual communication)

package com.mai.mina.diyChat;

import java.awt.BorderLayout;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.awt.event.WindowAdapter;
import java.awt.event.WindowEvent;
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JLabel;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JTextArea;
import javax.swing.JTextField;
import javax.swing.SwingUtilities;

// Requires commons-lang 2.x on the classpath.
import org.apache.commons.lang.math.RandomUtils;

public class ChatPanel extends javax.swing.JPanel {
    private JPanel northPanel;
    private JLabel headLabel;
    private JScrollPane jScrollPane1;
    private JScrollPane jScrollPane2;
    private JButton exitB;
    private JButton clearMsgB;
    private JButton sentB;
    private JButton connectB;
    private JTextArea messageText;
    private JTextField nameText;
    private JLabel nameLabel;
    private JTextArea messageArea;
    private JPanel southPanel;
    private SimpleMinaClient client = null;
    private boolean connected = false;
    private String username = null;

    {
        // Set the Windows look and feel. On other platforms this fails, the
        // stack trace is printed, and the default look and feel is kept.
        try {
            javax.swing.UIManager.setLookAndFeel("com.sun.java.swing.plaf.windows.WindowsLookAndFeel");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void connect() {
        if (client.connect()) {
            username = nameText.getText().trim();
            if (username.isEmpty()) {
                // No nickname entered: generate one ("游客" means "guest").
                username = "游客" + RandomUtils.nextInt(1000);
                nameText.setText(username);
            }
            connected = true;
            dealUIWithFlag();
        } else {
            connected = false;
            dealUIWithFlag();
            showMsg("Failed to connect to the server...");
        }
    }

    // Called from the network handler as well as from the UI, so the update
    // is handed to the Swing event dispatch thread.
    public void showMsg(final String msg) {
        SwingUtilities.invokeLater(new Runnable() {
            public void run() {
                messageArea.append(msg);
                messageArea.append("\n");
                // Move the caret to the end so the newest message is visible.
                messageArea.setCaretPosition(messageArea.getDocument().getLength());
            }
        });
    }

    public void sentMsg() {
        String message = username + ":" + messageText.getText();
        client.sentMsg(message);
        messageText.setText("");
        messageText.requestFocus();
    }

    // Enables or disables the controls according to the connection state.
    public void dealUIWithFlag() {
        if (connected) {
            nameText.setEnabled(false);
            connectB.setEnabled(false);
            sentB.setEnabled(true);
            clearMsgB.setEnabled(true);
            exitB.setEnabled(true);
        } else {
            nameText.setEnabled(true);
            connectB.setEnabled(true);
            sentB.setEnabled(false);
            clearMsgB.setEnabled(false);
            exitB.setEnabled(false);
        }
    }

    public void closeTheClient() {
        if (client.close()) {
            showMsg("Disconnected...");
            connected = false;
            dealUIWithFlag();
        } else {
            showMsg("Unable to disconnect...");
        }
    }

    public static void main(String[] args) {
        JFrame frame = new JFrame();
        frame.getContentPane().add(new ChatPanel());
        frame.addWindowListener(new WindowAdapter() {
            public void windowClosing(WindowEvent e) {
                System.exit(0);
            }
        });
        frame.pack();
        frame.setLocationByPlatform(true);
        frame.setVisible(true);
    }

    public ChatPanel() {
        super();
        client = new SimpleMinaClient(this);
        initGUI();
        dealUIWithFlag();
    }

    private void initGUI() {
        try {
            this.setPreferredSize(new java.awt.Dimension(400, 339));
            this.setLayout(null);
            {
                northPanel = new JPanel();
                BorderLayout northPanelLayout = new BorderLayout();
                northPanel.setLayout(northPanelLayout);
                this.add(northPanel);
                northPanel.setBounds(0, 0, 400, 188);
                {
                    headLabel = new JLabel();
                    northPanel.add(headLabel, BorderLayout.NORTH);
                    // "Welcome (test ip:port --> 127.0.0.1:8888)"
                    headLabel.setText("\u6b22\u8fce\u4f7f\u7528 (\u6d4b\u8bd5Ip:port --> 127.0.0.1:8888)");
                    headLabel.setPreferredSize(new java.awt.Dimension(397, 19));
                }
                {
                    jScrollPane1 = new JScrollPane();
                    northPanel.add(jScrollPane1, BorderLayout.CENTER);
                    jScrollPane1.setPreferredSize(new java.awt.Dimension(400, 169));
                    {
                        messageArea = new JTextArea();
                        jScrollPane1.setViewportView(messageArea);
                        messageArea.setPreferredSize(new java.awt.Dimension(398, 145));
                        messageArea.setEditable(false);
                        messageArea.setLineWrap(true);
                        messageArea.setWrapStyleWord(true);
                    }
                }
            }
            {
                southPanel = new JPanel();
                this.add(southPanel);
                southPanel.setBounds(0, 194, 400, 145);
                southPanel.setLayout(null);
                {
                    nameLabel = new JLabel();
                    southPanel.add(nameLabel);
                    nameLabel.setText("\u6635\u79f0:"); // "Nickname:"
                    nameLabel.setBounds(10, 12, 35, 15);
                }
                {
                    nameText = new JTextField();
                    southPanel.add(nameText);
                    nameText.setText("\u6e38\u5ba2"); // "Guest"
                    nameText.setBounds(45, 9, 96, 21);
                }
                {
                    jScrollPane2 = new JScrollPane();
                    southPanel.add(jScrollPane2);
                    jScrollPane2.setBounds(15, 37, 364, 69);
                    {
                        messageText = new JTextArea();
                        jScrollPane2.setViewportView(messageText);
                        messageText.setBounds(101, 72, 362, 75);
                        messageText.setPreferredSize(new java.awt.Dimension(362, 54));
                        messageText.setLineWrap(true);
                        messageText.setWrapStyleWord(true);
                    }
                }
                {
                    connectB = new JButton();
                    southPanel.add(connectB);
                    connectB.setText("\u8fde\u63a5\u670d\u52a1\u5668"); // "Connect to server"
                    connectB.setBounds(179, 8, 93, 23);
                    connectB.addActionListener(new ActionListener() {
                        public void actionPerformed(ActionEvent evt) {
                            System.out.println("connectB.actionPerformed, event=" + evt);
                            connect();
                        }
                    });
                }
                {
                    sentB = new JButton();
                    southPanel.add(sentB);
                    sentB.setText("\u53d1\u9001"); // "Send"
                    sentB.setBounds(261, 116, 57, 23);
                    sentB.addActionListener(new ActionListener() {
                        public void actionPerformed(ActionEvent evt) {
                            System.out.println("sentB.actionPerformed, event=" + evt);
                            sentMsg();
                        }
                    });
                }
                {
                    clearMsgB = new JButton();
                    southPanel.add(clearMsgB);
                    clearMsgB.setText("\u6e05\u7a7a"); // "Clear"
                    clearMsgB.setBounds(324, 116, 57, 23);
                    clearMsgB.addActionListener(new ActionListener() {
                        public void actionPerformed(ActionEvent evt) {
                            System.out.println("clearMsgB.actionPerformed, event=" + evt);
                            messageText.setText("");
                        }
                    });
                }
                {
                    exitB = new JButton();
                    southPanel.add(exitB);
                    exitB.setText("\u6ce8\u9500"); // "Log out"
                    exitB.setBounds(282, 8, 57, 23);
                    exitB.addActionListener(new ActionListener() {
                        public void actionPerformed(ActionEvent evt) {
                            System.out.println("exitB.actionPerformed, event=" + evt);
                            closeTheClient();
                        }
                    });
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3. Results
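First start the server by running the SimpleMinaServer class. When it starts successfully, the startup success message is printed to the console. Then run the client, ChatPanel.

Note: the above is only a simple message exchange. In practice Mina is more often used for handling custom protocols, so the part most worth studying is the Filter. When you have time, have a look at the source code.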
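As an illustration of what custom protocol handling involves, here is a minimal plain-Java sketch of a length-prefixed frame decoder. It uses java.nio.ByteBuffer rather than Mina's IoBuffer, and the class name `LengthPrefixedDecoderSketch` and the frame layout (a 4-byte big-endian length followed by a UTF-8 body) are assumptions chosen for the example, not something from the program above. The point it shows is that a decoder must keep leftover bytes between calls, because TCP may deliver a frame in several pieces or several frames in one piece. Mina ships a CumulativeProtocolDecoder base class for this kind of accumulation; the sketch only demonstrates the underlying idea.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class LengthPrefixedDecoderSketch {

    // Bytes received so far that do not yet form a complete frame.
    private ByteBuffer pending = ByteBuffer.allocate(0);

    /** Feeds one chunk of received data and returns every message completed by it. */
    public List<String> decode(byte[] chunk) {
        // Append the new chunk to whatever was left over from earlier calls.
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk);
        merged.flip();

        List<String> messages = new ArrayList<>();
        while (merged.remaining() >= 4) {
            merged.mark();
            int length = merged.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (merged.remaining() < length) {
                // Body incomplete: rewind to the header and wait for more data.
                merged.reset();
                break;
            }
            byte[] body = new byte[length];
            merged.get(body);
            messages.add(new String(body, StandardCharsets.UTF_8));
        }
        // Keep the undecoded tail (possibly empty) for the next call.
        pending = merged.slice();
        return messages;
    }

    /** Encodes one message as a 4-byte big-endian length followed by its UTF-8 bytes. */
    public static byte[] encode(String message) {
        byte[] body = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }
}
```

In a real Mina filter the same logic would read from the IoBuffer passed to the decoder and keep the pending bytes per session, since one connection's leftover data must never be mixed with another's.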