博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
即时通信系统Openfire分析之四:消息路由
阅读量:4347 次
发布时间:2019-06-07

本文共 17582 字,大约阅读时间需要 58 分钟。

  两个人的孤独

  两个人的孤独,大抵是,你每发出去一句话,都要经由无数网络、由几百个计算机处理后,出在他的面前,而他就在你不远处。

  连接建立之后

  Openfire使用MINA网络框架,并设置ConnectionHandler为MINA的处理器,连接的启停、消息的收发,都在这个类中做中转。这是我们中分析的内容。

  那么,当客户端与服务器的建立起连接以后,信息交换中,消息到了ConnectionHandler之后,是如何实现路由的,本文来一探究竟。

  ConnectionHandler类,MINA的处理器

  ConnectionHandler是个抽象类,openfire中的有四种handle,分别为:ServerConnectionHandler、ClientConnectionHandler、ComponentConnectionHandler、MultiplexerConnectionHandler,代表了S2S、C2S等四种不同的消息类型,这四个handle都继承自ConnectionHandler。

  而ConnectionHandler继承org.apache.mina.core.service.IoHandlerAdapter,IoHandlerAdapter实现了IoHandler接口。

  类关系如下:

|-- IoHandler(接口)    |-- IoHandlerAdapter(默认的空实现,实际的handler继承它就行)        |-- ConnectionHandler            |-- ServerConnectionHandler            |-- ClientConnectionHandler            |-- ComponentConnectionHandler            |-- MultiplexerConnectionHandler

  IoHandler中定义了消息响应中所需要的一系列方法:

public interface IoHandler  {      //创建session    public void sessionCreated(IoSession session) throws Exception    //开启session      public void sessionOpened(IoSession iosession)  throws Exception;      //关闭session      public void sessionClosed(IoSession iosession)  throws Exception;      //session空闲      public void sessionIdle(IoSession iosession, IdleStatus idlestatus)  throws Exception;      //异常处理      public void exceptionCaught(IoSession iosession, Throwable throwable)  throws Exception;       //接收消息      public void messageReceived(IoSession iosession, Object obj)  throws Exception;      //发送消息      public void messageSent(IoSession iosession, Object obj)  throws Exception;  }

  ConnectionHandler中覆写这些方法,并注入到MINA的适配器NioSocketAcceptor中,当接收到连接与进行交互时,将相应调用ConnectionHandler中覆写的方法。

  消息路由

  下面分析ConnectionHandler的消息响应机制,以C2S的message消息为例。

  ConnectionHandler除了实现IoHandler内定义的方法外,还定义了如下三个抽象方法: 

// 创建NIOConnectionabstract NIOConnection createNIOConnection(IoSession session);// 创建StanzaHandlerabstract StanzaHandler createStanzaHandler(NIOConnection connection);// 从数据库中获取闲置timeoutabstract int getMaxIdleTime();

  这三个方法,在具体的Handler子类里面实现,在sessionOpened()中调用,根据连接类型创建不同的NIOConnection、StanzaHandler的对象。

   ConnectionHandler.sessionOpened()

@Overridepublic void sessionOpened(IoSession session) throws Exception {    final XMLLightweightParser parser = new XMLLightweightParser(StandardCharsets.UTF_8);    session.setAttribute(XML_PARSER, parser);    final NIOConnection connection = createNIOConnection(session);    session.setAttribute(CONNECTION, connection);    session.setAttribute(HANDLER, createStanzaHandler(connection));    final int idleTime = getMaxIdleTime() / 2;    if (idleTime > 0) {        session.getConfig().setIdleTime(IdleStatus.READER_IDLE, idleTime);    }}

  其中,NIOConnection是对IoSession的一次包装,将MINA框架的IoSession转化为Openfire的Connection。StanzaHandler负责数据节的处理。 

  当服务器接收到客户端发送的消息时,MINA框架调用IoHandler.messageReceived将消息传递到指定的处理器ConnectionHandler中的messageReceived()方法。

    ConnectionHandler.messageReceived()

@Overridepublic void messageReceived(IoSession session, Object message) throws Exception {    StanzaHandler handler = (StanzaHandler) session.getAttribute(HANDLER);    final XMPPPacketReader parser = PARSER_CACHE.get();    updateReadBytesCounter(session);    try {        handler.process((String) message, parser);    } catch (Exception e) {        final Connection connection = (Connection) session.getAttribute(CONNECTION);        if (connection != null) {            connection.close();        }    }}

  消息由StanzaHandler处理,C2S消息具体的实现类是ClientStanzaHandler。

  StanzaHandler.process()方法如下:

public void process(String stanza, XMPPPacketReader reader) throws Exception {    boolean initialStream = stanza.startsWith("

  上面省略掉部分代码,可以看到执行了如下操作:

  (1)若Session未创建,则创建之

  (2)调用本类的process(Element doc)

  Session的创建中,涉及到Session的管理,路由表的构建等重点内容,在下一章再专门做讲解。这里先提两点:1、此处的Session,只是预创建,还未能用于通信;2、在与客户端完成资源绑定的时候,该Session才真正可用。

  而process(Element doc)如下,只保留了和message相关的代码: 

private void process(Element doc) throws UnauthorizedException {    if (doc == null) {        return;    }    // Ensure that connection was secured if TLS was required    if (connection.getTlsPolicy() == Connection.TLSPolicy.required &&            !connection.isSecure()) {        closeNeverSecuredConnection();        return;    }        String tag = doc.getName();    if ("message".equals(tag)) {        Message packet;        try {            packet = new Message(doc, !validateJIDs());        }        catch (IllegalArgumentException e) {            Log.debug("Rejecting packet. JID malformed", e);            // The original packet contains a malformed JID so answer with an error.            Message reply = new Message();            reply.setID(doc.attributeValue("id"));            reply.setTo(session.getAddress());            reply.getElement().addAttribute("from", doc.attributeValue("to"));            reply.setError(PacketError.Condition.jid_malformed);            session.process(reply);            return;        }        processMessage(packet);    }    ......}

  将Element转化为Message对象,然后在StanzaHandler.processMessage()中,调用包路由PacketRouterImpl模块发送消息。

protected void processMessage(Message packet) throws UnauthorizedException {    router.route(packet);    session.incrementClientPacketCount();}

  Openfire有三种数据包:IQ、Message、Presence,对应的路由器也有三种:IQRouter、MessageRouter、PresenceRouter。

  PacketRouterImpl是对这三种路由器统一做包装,对于message消息,调用的是MessageRouter中的route()方法。

  PacketRouterImpl.route()如下:

@Overridepublic void route(Message packet) {    messageRouter.route(packet);}

  MessageRouter.route()中消息的发送,分如下两步:

  (1)调用路由表,将消息发给Message中指定的接收者ToJID。

  (2)通过session,将消息原路返回给发送方(当发送方收到推送回来的消息,表示消息已发送成功)

  MessageRouter.route()代码如下:

public void route(Message packet) {    if (packet == null) {        throw new NullPointerException();    }    ClientSession session = sessionManager.getSession(packet.getFrom());    try {        // Invoke the interceptors before we process the read packet        InterceptorManager.getInstance().invokeInterceptors(packet, session, true, false);                if (session == null || session.getStatus() == Session.STATUS_AUTHENTICATED) {            JID recipientJID = packet.getTo();            ......            boolean isAcceptable = true;            if (session instanceof LocalClientSession) {                .....            }            if (isAcceptable) {                boolean isPrivate = packet.getElement().element(QName.get("private", "urn:xmpp:carbons:2")) != null;                try {                    // Deliver stanza to requested route                    routingTable.routePacket(recipientJID, packet, false);                } catch (Exception e) {                    log.error("Failed to route packet: " + packet.toXML(), e);                routingFailed(recipientJID, packet);                }                // Sent carbon copies to other resources of the sender:                // When a client sends a 
of type "chat" if (packet.getType() == Message.Type.chat && !isPrivate && session != null) { List
routes = routingTable.getRoutes(packet.getFrom().asBareJID(), null); for (JID route : routes) { if (!route.equals(session.getAddress())) { ClientSession clientSession = sessionManager.getSession(route); if (clientSession != null && clientSession.isMessageCarbonsEnabled()) { Message message = new Message(); message.setType(packet.getType()); message.setFrom(packet.getFrom().asBareJID()); message.setTo(route); message.addExtension(new Sent(new Forwarded(packet))); clientSession.process(message); } } } } } } ......}

  其中,routingTable.routePacket(recipientJID, packet, false)是发送消息的关键代码。

  路由模块中,对消息的发送做了封装,在任何需要发送消息的地方,例如自定义插件中,只需要调用下面这个方法,就能完成消息的发送:

XMPPServer.getInstance().getRoutingTable().routePacket(to, message, true);

  路由表中保存了该连接的Session对象,Session中携带有连接创建时生成的Connection对象,而从上一章我们知道,Connection是对MINA的Iosession的封装。

  换言之,其实路由表的消息发送功能,就是通过Connection调用MINA底层来实现的。答案是否是如此?下面来看看。

  路由表中的消息发送

  路由表中的其他细节,我们暂时不关注过多,目前主要看它的消息发送流程:

  消息发送的方法RoutingTableImpl.routePacket():

@Overridepublic void routePacket(JID jid, Packet packet, boolean fromServer) throws PacketException {    boolean routed = false;    try {        if (serverName.equals(jid.getDomain())) {            // Packet sent to our domain.            routed = routeToLocalDomain(jid, packet, fromServer);        }        else if (jid.getDomain().endsWith(serverName) && hasComponentRoute(jid)) {            // Packet sent to component hosted in this server            routed = routeToComponent(jid, packet, routed);        }        else {            // Packet sent to remote server            routed = routeToRemoteDomain(jid, packet, routed);        }    } catch (Exception ex) {        Log.error("Primary packet routing failed", ex);     }    if (!routed) {        if (Log.isDebugEnabled()) {            Log.debug("Failed to route packet to JID: {} packet: {}", jid, packet.toXML());        }        if (packet instanceof IQ) {            iqRouter.routingFailed(jid, packet);        }        else if (packet instanceof Message) {            messageRouter.routingFailed(jid, packet);        }        else if (packet instanceof Presence) {            presenceRouter.routingFailed(jid, packet);        }    }}

  这里有几个分支:

|-- routeToLocalDomain      路由到本地|-- routeToComponent        路由到组件|-- routeToRemoteDomain     路由到远程

 

  对于单机情况的消息,调用的是routeToLocalDomain()。

  RoutingTableImpl.routeToLocalDomain()

private boolean routeToLocalDomain(JID jid, Packet packet, boolean fromServer) {    boolean routed = false;    Element privateElement = packet.getElement().element(QName.get("private", "urn:xmpp:carbons:2"));    boolean isPrivate = privateElement != null;    // The receiving server and SHOULD remove the 
element before delivering to the recipient. packet.getElement().remove(privateElement); if (jid.getResource() == null) { // Packet sent to a bare JID of a user if (packet instanceof Message) { // Find best route of local user routed = routeToBareJID(jid, (Message) packet, isPrivate); } else { throw new PacketException("Cannot route packet of type IQ or Presence to bare JID: " + packet.toXML()); } } else { // Packet sent to local user (full JID) ClientRoute clientRoute = usersCache.get(jid.toString()); if (clientRoute == null) { clientRoute = anonymousUsersCache.get(jid.toString()); } if (clientRoute != null) { if (!clientRoute.isAvailable() && routeOnlyAvailable(packet, fromServer) && !presenceUpdateHandler.hasDirectPresence(packet.getTo(), packet.getFrom())) { Log.debug("Unable to route packet. Packet should only be sent to available sessions and the route is not available. {} ", packet.toXML()); routed = false; } else { if (localRoutingTable.isLocalRoute(jid)) { if (packet instanceof Message) { Message message = (Message) packet; if (message.getType() == Message.Type.chat && !isPrivate) { List
routes = getRoutes(jid.asBareJID(), null); for (JID route : routes) { if (!route.equals(jid)) { ClientSession clientSession = getClientRoute(route); if (clientSession.isMessageCarbonsEnabled()) { Message carbon = new Message(); // The wrapping message SHOULD maintain the same 'type' attribute value; carbon.setType(message.getType()); // the 'from' attribute MUST be the Carbons-enabled user's bare JID carbon.setFrom(route.asBareJID()); // and the 'to' attribute MUST be the full JID of the resource receiving the copy carbon.setTo(route); carbon.addExtension(new Received(new Forwarded(message))); try { localRoutingTable.getRoute(route.toString()).process(carbon); } catch (UnauthorizedException e) { Log.error("Unable to route packet " + packet.toXML(), e); } } } } } } // This is a route to a local user hosted in this node try { localRoutingTable.getRoute(jid.toString()).process(packet); routed = true; } catch (UnauthorizedException e) { Log.error("Unable to route packet " + packet.toXML(), e); } } else { // This is a route to a local user hosted in other node if (remotePacketRouter != null) { routed = remotePacketRouter .routePacket(clientRoute.getNodeID().toByteArray(), jid, packet); if (!routed) { removeClientRoute(jid); // drop invalid client route } } } } } } return routed;}

  上面的关键代码中是这一段:

try {    localRoutingTable.getRoute(route.toString()).process(carbon);} catch (UnauthorizedException e) {    Log.error("Unable to route packet " + packet.toXML(), e);}

  可以看出,RoutingTable的路由功能,是通过localRoutingTable实现的。

  LocalRoutingTable中用一个容器保存了所有的路由:

Map
routes = new ConcurrentHashMap<>();

  RoutingTableImpl中通过调用LocalRoutingTable的add、get、remove等方法,实现对路由的管理。

  localRoutingTable.getRoute()方法实现从路由表中获取RoutableChannelHandler对象,那么具体消息是如何通过路由发出去的?

  要解释这个问题,先来看一下与RoutableChannelHandler相关的继承和派生关系,如下:

|-- ChannelHandler    |-- RoutableChannelHandler        |-- Session            |-- LocalSession                |-- LocalClientSession

  也就是说,其实localRoutingTable.getRoute(route.toString()).process(carbon)最终调用的是LacalSession.process()。

  LacalSession.process()代码如下:

@Overridepublic void process(Packet packet) {    // Check that the requested packet can be processed    if (canProcess(packet)) {        // Perform the actual processing of the packet. This usually implies sending        // the packet to the entity        try {            InterceptorManager.getInstance().invokeInterceptors(packet, this, false, false);                        deliver(packet);            InterceptorManager.getInstance().invokeInterceptors(packet, this, false, true);        }        catch (PacketRejectedException e) {            // An interceptor rejected the packet so do nothing        }        catch (Exception e) {            Log.error(LocaleUtils.getLocalizedString("admin.error"), e);        }    }    ......}

  其中的deliver()是LacalSession定义的一个插象方法,由其子类来实现。

  有一点值得提一下,在deliver()前后,都做了拦截,方便在发送的前后做一些额外的处理。

  继续讲回deliver(),对于C2S连接类型来说,它是在LocalClientSession类中实现。

  LocalClientSession.deliver()代码如下:

@Overridepublic void deliver(Packet packet) throws UnauthorizedException {    conn.deliver(packet);    streamManager.sentStanza(packet);}

  此时的发送方法conn.deliver()中的conn,就是来自最初在sessionOpened()被调用时创建的NIOConnection对象。

  NIOConnection.deliver():

@Overridepublic void deliver(Packet packet) throws UnauthorizedException {    if (isClosed()) {        backupDeliverer.deliver(packet);    }    else {        boolean errorDelivering = false;        IoBuffer buffer = IoBuffer.allocate(4096);        buffer.setAutoExpand(true);        try {            buffer.putString(packet.getElement().asXML(), encoder.get());            if (flashClient) {                buffer.put((byte) '\0');            }            buffer.flip();                        ioSessionLock.lock();            try {                ioSession.write(buffer);            } finally {                ioSessionLock.unlock();            }        }        catch (Exception e) {            Log.debug("Error delivering packet:\n" + packet, e);            errorDelivering = true;        }        if (errorDelivering) {            close();            // Retry sending the packet again. Most probably if the packet is a            // Message it will be stored offline            backupDeliverer.deliver(packet);        }        else {            session.incrementServerPacketCount();        }    }}

  NIOConnection.deliver()中,通过其内部包装的IoSession对象,调用write()方法将数据流写入网卡中,完成消息的发送。

  ConnectionHandler.messageSent()

  消息发送完成,MINA回调:

@Overridepublic void messageSent(IoSession session, Object message) throws Exception {    super.messageSent(session, message);    // Update counter of written btyes    updateWrittenBytesCounter(session);        System.out.println("Fordestiny-SEND: "+ioBufferToString(message));}

 

  至此,系统完成了一条消息的接收、转发。


 

  其实消息的路由中,除了消息的整个流通路径之外,怎么保证消息能够准确的发送到对应的客户端是至关重要的。这方面Openfire是如何处理的,在下个章节做解析,即Openfire的会话管理和路由表。Over!

 

转载于:https://www.cnblogs.com/Fordestiny/p/7486889.html

你可能感兴趣的文章
(简单)华为Nova青春 WAS-AL00的USB调试模式在哪里开启的流程
查看>>
图论知识,博客
查看>>
[原创]一篇无关技术的小日记(仅作暂存)
查看>>
20145303刘俊谦 Exp7 网络欺诈技术防范
查看>>
原生和jQuery的ajax用法
查看>>
iOS开发播放文本
查看>>
20145202马超《java》实验5
查看>>
JQuery 事件
查看>>
main(argc,argv[])
查看>>
第四阶段 15_Linux tomcat安装与配置
查看>>
NAS 创建大文件
查看>>
学习笔记-模块之xml文件处理
查看>>
接口测试用例
查看>>
Sybase IQ导出文件的几种方式
查看>>
MySQL存储过程定时任务
查看>>
Python中and(逻辑与)计算法则
查看>>
POJ 3267 The Cow Lexicon(动态规划)
查看>>
设计原理+设计模式
查看>>
tomcat 7服务器跨域问题解决
查看>>
前台实现ajax 需注意的地方
查看>>