/**
 * Dispatches group-related cluster messages by command id.
 *
 * @param commandId      the command id of the cluster message
 * @param clusterMessage the cluster message carrying the header and protobuf body
 * @since 1.0
 */
private void doGroup(short commandId, MyClusterMessage clusterMessage) {
    logger.debug("MyClusterMessageListener#doGroup");
    IMHeader header = clusterMessage.getHeader();
    try {
        MessageLite body = clusterMessage.getMessage();
        switch (commandId) {
            case GroupCmdID.CID_GROUP_CHANGE_MEMBER_NOTIFY_VALUE: // todebug
                groupChangeMemberNotify(header, body);
                break;
            default:
                logger.warn("Unsupported command id {}", commandId);
                break;
        }
    } catch (IOException e) {
        logger.error("decode failed.", e);
    }
}
/**
 * Attempts to extract a protocol buffer from the specified extra.
 *
 * @throws MalformedDataException if the intent is null, the extra is missing or not a byte
 *     array, or the protocol buffer could not be parsed.
 */
@NonNull
public static <T extends MessageLite> T extract(
        @NonNull String extraName,
        @NonNull Parser<T> protoParser,
        @NonNull String failureDescription,
        @Nullable Intent intent) throws MalformedDataException {
    if (intent == null) {
        throw new MalformedDataException(failureDescription);
    }
    byte[] protoBytes = intent.getByteArrayExtra(extraName);
    if (protoBytes == null) {
        throw new MalformedDataException(failureDescription);
    }
    try {
        return protoParser.parseFrom(protoBytes);
    } catch (IOException ex) {
        throw new MalformedDataException(failureDescription, ex);
    }
}
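A minimal caller sketch for extract(); the MyRequest message type and the extra name below are illustrative placeholders, not part of the original code.

// Hypothetical usage: MyRequest is an assumed generated proto class and
// "com.example.EXTRA_REQUEST" an assumed extra key.
MyRequest request = extract(
        "com.example.EXTRA_REQUEST",
        MyRequest.parser(),
        "Intent did not carry a valid MyRequest payload",
        getIntent());  // may legitimately be null; extract() then throws MalformedDataException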
@Override
public void pushShield(IMHeader header, MessageLite body, ChannelHandlerContext ctx) {
    IMPushShieldReq pushShieldReq = (IMPushShieldReq) body;
    long userId = super.getUserId(ctx);
    IMHeader resHeader = null;
    IMPushShieldRsp pushShieldRsp = null;
    try {
        BaseModel<Integer> pushShieldRes = loginService.pushShield(userId, pushShieldReq.getShieldStatus());
        pushShieldRsp = IMPushShieldRsp.newBuilder()
                .setUserId(userId)
                .setResultCode(pushShieldRes.getCode())
                .build();
        resHeader = header.clone();
        resHeader.setCommandId((short) LoginCmdID.CID_LOGIN_RES_PUSH_SHIELD_VALUE);
        ctx.writeAndFlush(new IMProtoMessage<>(resHeader, pushShieldRsp));
    } catch (Exception e) {
        logger.error("Server-side exception", e);
        // Note: resHeader/pushShieldRsp may still be null here if the failure happened early.
        ctx.writeAndFlush(new IMProtoMessage<>(resHeader, pushShieldRsp));
    }
}
public static MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
    switch (rpcType) {
        case RpcType.ACK_VALUE:
            return Ack.getDefaultInstance();
        case RpcType.HANDSHAKE_VALUE:
            return BitControlHandshake.getDefaultInstance();
        case RpcType.RESP_FRAGMENT_HANDLE_VALUE:
            return FragmentHandle.getDefaultInstance();
        case RpcType.RESP_FRAGMENT_STATUS_VALUE:
            return FragmentStatus.getDefaultInstance();
        case RpcType.RESP_BIT_STATUS_VALUE:
            return BitStatus.getDefaultInstance();
        case RpcType.RESP_QUERY_STATUS_VALUE:
            return QueryProfile.getDefaultInstance();
        default:
            throw new UnsupportedOperationException();
    }
}
@Override
protected ServerHandshakeHandler<BitClientHandshake> getHandshakeHandler(final BitServerConnection connection) {
    return new ServerHandshakeHandler<BitClientHandshake>(RpcType.HANDSHAKE, BitClientHandshake.PARSER) {

        @Override
        public MessageLite getHandshakeResponse(BitClientHandshake inbound) throws Exception {
            // logger.debug("Handling handshake from other bit. {}", inbound);
            if (inbound.getRpcVersion() != DataRpcConfig.RPC_VERSION) {
                // Arguments ordered to match the message: expected first, actual second.
                throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.",
                        DataRpcConfig.RPC_VERSION, inbound.getRpcVersion()));
            }
            if (inbound.getChannel() != RpcChannel.BIT_DATA) {
                throw new RpcException(String.format("Invalid NodeMode. Expected BIT_DATA but received %s.",
                        inbound.getChannel()));
            }
            return BitServerHandshake.newBuilder().setRpcVersion(DataRpcConfig.RPC_VERSION).build();
        }
    };
}
/**
 * Handles the current kick-user message.
 *
 * @param body the kick-user message body
 * @since 1.0 李春生
 */
private void handleKickUser(MessageLite body) {
    // Cast the body to check that it really is a kickUser message; if it is, continue with the
    // handling below, otherwise a ClassCastException is thrown.
    IMServerKickUser kickUser = (IMServerKickUser) body;
    long userId = kickUser.getUserId();
    int clientType = kickUser.getClientType().getNumber();
    int reason = kickUser.getReason();
    logger.debug("HandleKickUser, userId={}, clientType={}, reason={}", userId, clientType, reason);

    ClientUser clientUser = ClientUserManager.getUserById(userId);
    if (clientUser != null) {
        // Kick the user, selecting connections by client type.
        clientUser.kickSameClientType(clientType, reason, null);
    }
}
@Override
protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
    switch (rpcType) {
        case RpcType.ACK_VALUE:
            return Ack.getDefaultInstance();
        case RpcType.HANDSHAKE_VALUE:
            return BitToUserHandshake.getDefaultInstance();
        case RpcType.QUERY_HANDLE_VALUE:
            return QueryId.getDefaultInstance();
        case RpcType.QUERY_RESULT_VALUE:
            return QueryResult.getDefaultInstance();
        case RpcType.QUERY_DATA_VALUE:
            return QueryData.getDefaultInstance();
    }
    throw new RpcException(String.format("Unable to deal with RpcType of %d", rpcType));
}
/**
 * Reads a list of protos, using the provided parser, from the provided input stream.
 *
 * @throws IOException if the proto list could not be parsed.
 */
public static <T extends MessageLite> List<T> readMessageList(
        InputStream stream, Parser<T> parser) throws IOException {
    DataInputStream dis = new DataInputStream(stream);
    int messageCount = dis.readInt();
    ArrayList<T> messages = new ArrayList<>(messageCount);
    for (int i = 0; i < messageCount; i++) {
        messages.add(parser.parseDelimitedFrom(stream));
    }
    return messages;
}
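A hedged read-side sketch; the file name and the MyRecord message type are assumptions for illustration, and the file is expected to have been produced by writeMessageListTo further below.

// Illustrative only: MyRecord and "records.bin" are placeholders.
try (InputStream in = new FileInputStream("records.bin")) {
    List<MyRecord> records = readMessageList(in, MyRecord.parser());
    System.out.println("Read " + records.size() + " records");
}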
/**
 * Converts a protobuf byte array to a message object according to the serviceId and commandId.
 *
 * @param serviceId the service id
 * @param commandId the command id
 * @param bytes     the protobuf bytes to be parsed
 * @return the parsed message object
 * @throws IOException if the service or command id is unknown, or parsing fails
 * @since 1.0
 */
public static MessageLite getMessage(final int serviceId, final int commandId, final byte[] bytes)
        throws IOException {
    Map<Integer, ProtobufParseMap.Parsing> parserMap = parseServiceMap.get(serviceId);
    if (parserMap == null) {
        throw new IOException("Unknown protocol service: " + serviceId);
    }
    ProtobufParseMap.Parsing parser = parserMap.get(commandId);
    if (parser == null) {
        throw new IOException(
                "Unknown protocol commandId: service=" + serviceId + ",command=" + commandId);
    }
    return parser.process(bytes);
}
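A hedged call sketch for getMessage(); the numeric ids and the byte source below are placeholders, not real protocol values.

// Illustrative only: the ids and receivedBytes() are placeholders.
int serviceId = 3;                 // e.g. a "group" service in this hypothetical numbering
int commandId = 0x0401;            // e.g. a change-member notify command
byte[] payload = receivedBytes();  // hypothetical source of the raw protobuf bytes
MessageLite msg = getMessage(serviceId, commandId, payload);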
@Override
protected void encode(ChannelHandlerContext context, OutboundRpcMessage message, List<Object> out)
        throws Exception {
    if (message.mode != RpcMode.RESPONSE_FAILURE) {
        out.add(message);
        return;
    }
    final MessageLite pBody = message.pBody;
    if (!(pBody instanceof DremioPBError)) {
        out.add(message);
        return;
    }
    DremioPBError error = (DremioPBError) pBody;
    DremioPBError newError = ErrorCompatibility.convertIfNecessary(error);
    out.add(new OutboundRpcMessage(message.mode, message.rpcType, message.coordinationId, newError,
            message.dBodies));
}
/**
 * Handles file-related messages.
 *
 * @param ctx       the channel
 * @param commandId the command id
 * @param header    the message header
 * @param body      the message body
 * @since 1.0
 */
public void doFile(ChannelHandlerContext ctx, short commandId, IMHeader header, MessageLite body) {
    // Check whether the user has logged in.
    if (!hasLogin(ctx)) {
        return;
    }
    switch (commandId) {
        case FileCmdID.CID_FILE_REQUEST_VALUE:
            imFileHandle.fileReq(header, body, ctx);
            break;
        case FileCmdID.CID_FILE_HAS_OFFLINE_REQ_VALUE:
            imFileHandle.hasOfflineReq(header, body, ctx);
            break;
        case FileCmdID.CID_FILE_ADD_OFFLINE_REQ_VALUE:
            imFileHandle.addOfflineReq(header, body, ctx);
            break;
        case FileCmdID.CID_FILE_DEL_OFFLINE_REQ_VALUE:
            imFileHandle.delOfflineReq(header, body, ctx);
            break;
        default:
            logger.warn("Unsupported command id {}", commandId);
            break;
    }
}
/**
 * Dispatches switch-service (P2P) cluster messages by command id.
 *
 * @param commandId      the command id of the cluster message
 * @param clusterMessage the cluster message carrying the header and protobuf body
 * @since 1.0
 */
private void doSwitch(short commandId, MyClusterMessage clusterMessage) {
    logger.debug("MyClusterMessageListener#doSwitch");
    IMHeader header = clusterMessage.getHeader();
    try {
        MessageLite body = clusterMessage.getMessage();
        switch (commandId) {
            case SwitchServiceCmdID.CID_SWITCH_P2P_CMD_VALUE: // todebug
                switchP2p(header, body);
                break; // was missing: without it, execution fell through to the default warning
            default:
                logger.warn("Unsupported command id {}", commandId);
                break;
        }
    } catch (IOException e) {
        logger.error("decode failed.", e);
    }
}
@Override
protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
    switch (rpcType) {
        case RpcType.ACK_VALUE:
            return Ack.getDefaultInstance();
        case RpcType.HANDSHAKE_VALUE:
            return BitToUserHandshake.getDefaultInstance();
        case RpcType.QUERY_HANDLE_VALUE:
            return QueryId.getDefaultInstance();
        case RpcType.QUERY_RESULT_VALUE:
            return QueryResult.getDefaultInstance();
        case RpcType.QUERY_DATA_VALUE:
            return QueryData.getDefaultInstance();
        case RpcType.QUERY_PLAN_FRAGMENTS_VALUE:
            return QueryPlanFragments.getDefaultInstance();
        case RpcType.CATALOGS_VALUE:
            return GetCatalogsResp.getDefaultInstance();
        case RpcType.SCHEMAS_VALUE:
            return GetSchemasResp.getDefaultInstance();
        case RpcType.TABLES_VALUE:
            return GetTablesResp.getDefaultInstance();
        case RpcType.COLUMNS_VALUE:
            return GetColumnsResp.getDefaultInstance();
        case RpcType.PREPARED_STATEMENT_VALUE:
            return CreatePreparedStatementResp.getDefaultInstance();
        case RpcType.SERVER_META_VALUE:
            return GetServerMetaResp.getDefaultInstance();
    }
    throw new RpcException(String.format("Unable to deal with RpcType of %d", rpcType));
}
/**
 * Attempts to read a buffered message from the underlying connection. This is more efficient
 * than attempting to actually read from the underlying connection for each message, which ends
 * up making a final "empty" read from the non-blocking connection rather than simply consuming
 * all buffered data.
 *
 * TODO: It would be ideal if there was a way to consume data as we go, instead of buffering it
 * all and then consuming it. However, this only matters for streams of medium-sized messages
 * with a huge backlog, which should be rare? The C++ implementation has a similar issue.
 *
 * @param builder message builder the message is parsed into
 * @return true if a message was read, false if there is not enough buffered data to read a
 *     message.
 */
public boolean readBufferedMessage(MessageLite.Builder builder) {
    try {
        if (nextMessageLength == -1) {
            if (connection.available() < 4) {
                return false;
            }
            input.setLimit(4);
            nextMessageLength = codedInput.readRawLittleEndian32();
        }
        assert nextMessageLength >= 0;
        if (connection.available() < nextMessageLength) {
            assert 0 <= connection.available() && connection.available() < nextMessageLength;
            return false;
        }

        // Parse the response for the next RPC
        // TODO: Add .available() to CodedInputStream to avoid many copies to internal buffer?
        // or make CodedInputStream wrap a non-blocking interface like C++?
        input.setLimit(nextMessageLength);
        builder.mergeFrom(codedInput);
        assert codedInput.isAtEnd();
        codedInput.resetSizeCounter();
        nextMessageLength = -1;
        return true;
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
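A hedged sketch of the intended call pattern: drain whatever the non-blocking connection has buffered, handing each complete message to a handler. The reader object, the SomeResponse type, and handleResponse are assumptions for illustration, not names from the original class.

// All names below are placeholders.
SomeResponse.Builder builder = SomeResponse.newBuilder();
while (reader.readBufferedMessage(builder)) {   // `reader` is the object owning connection/codedInput
    handleResponse(builder.build());            // hypothetical per-message handler
    builder.clear();                            // reuse the builder for the next message
}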
/**
 * Builds the Libra message wrapping the given protobuf body.
 *
 * @param uid the user id the message belongs to
 * @param msg the protobuf body
 * @return the assembled LibraMessage
 * @throws Exception if no module id is registered for the body's class
 */
public static LibraMessage createLibraMessage(long uid, MessageLite msg) throws Exception {
    Integer moduleId = handlerMgr.searchModuleIdByClass(msg.getClass().getName());
    if (moduleId == null) {
        LibraLog.error("protocolId is null, msg =:" + JsonUtil.ObjectToJsonString(msg));
        throw new Exception("LibraEncoder.encodeHeader >>> protocolId is null");
    }
    LibraHead head = LibraHead.createHead(uid, moduleId);
    LibraMessage message = new LibraMessage();
    message.setHead(head);
    message.setBody(msg);
    return message;
}
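A hedged usage sketch; ChatTextMsg is an illustrative protobuf type assumed to be registered with handlerMgr, and the uid is a placeholder.

// Illustrative only: ChatTextMsg and the uid are placeholders.
MessageLite chatBody = ChatTextMsg.newBuilder().setContent("hello").build();
LibraMessage outbound = createLibraMessage(10001L, chatBody);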
@SuppressWarnings("unchecked") public <REQUEST extends MessageLite, RESPONSE extends MessageLite> SendEndpointCreator<REQUEST, RESPONSE> register( int id, ReceiveHandler<REQUEST, RESPONSE> handler) { Preconditions.checkArgument(id > -1 && id < 2048, "A request id must be between 0 and 2047."); Preconditions.checkNotNull(handler); Preconditions.checkArgument(!handlers.containsKey(id), "Only a single handler can be registered per id. You tried to register a handler for id %d twice.", id); handlers.put(id, (ReceiveHandler<MessageLite, MessageLite>) handler); return new EndpointCreator<REQUEST, RESPONSE>(proxyFactory, new PseudoEnum(id), (Class<RESPONSE>) handler.getDefaultResponse().getClass(), timeoutMillis); }
@Override
protected ServerHandshakeHandler<BitControlHandshake> getHandshakeHandler(final ControlConnection connection) {
    return new ServerHandshakeHandler<BitControlHandshake>(RpcType.HANDSHAKE, BitControlHandshake.PARSER) {

        @Override
        public MessageLite getHandshakeResponse(BitControlHandshake inbound) throws Exception {
            // logger.debug("Handling handshake from other bit. {}", inbound);
            if (inbound.getRpcVersion() != ControlRpcConfig.RPC_VERSION) {
                // Arguments ordered to match the message: expected first, actual second.
                throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.",
                        ControlRpcConfig.RPC_VERSION, inbound.getRpcVersion()));
            }
            if (!inbound.hasEndpoint()
                    || inbound.getEndpoint().getAddress().isEmpty()
                    || inbound.getEndpoint().getControlPort() < 1) {
                throw new RpcException(String.format(
                        "RPC didn't provide valid counter endpoint information. Received %s.",
                        inbound.getEndpoint()));
            }
            connection.setEndpoint(inbound.getEndpoint());

            // look up the connection manager for the remote endpoint.
            ControlConnectionManager manager = connectionRegistry.getConnectionManager(inbound.getEndpoint());

            // update the close handler.
            proxyCloseHandler.setHandler(
                    manager.getCloseHandlerCreator().getHandler(connection, proxyCloseHandler.getHandler()));

            // add to the connection manager.
            manager.addExternalConnection(connection);
            return BitControlHandshake.newBuilder().setRpcVersion(ControlRpcConfig.RPC_VERSION).build();
        }
    };
}
@Override
public MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
    switch (rpcType) {
        case 1:
        case 2:
            return NodeEndpoint.getDefaultInstance();
        default:
            throw new UnsupportedOperationException();
    }
}
OutboundRpcMessage(RpcMode mode, int rpcTypeNumber, int coordinationId, MessageLite pBody, ByteBuf... dBodies) {
    super(mode, rpcTypeNumber, coordinationId);
    this.pBody = pBody;

    // Netty doesn't traditionally release the reference on an unreadable buffer. However, we
    // need to, so that if we send an empty or unwritable buffer, we still release it. Otherwise
    // we get weird memory leaks when sending empty vectors.
    List<ByteBuf> bufs = Lists.newArrayList();
    for (ByteBuf d : dBodies) {
        if (d.readableBytes() == 0) {
            d.release();
        } else {
            bufs.add(d);
        }
    }
    this.dBodies = bufs.toArray(new ByteBuf[bufs.size()]);
}
public static MessageLite getResponseDefaultInstanceClient(int rpcType) throws RpcException {
    switch (rpcType) {
        case RpcType.ACK_VALUE:
            return Ack.getDefaultInstance();
        case RpcType.HANDSHAKE_VALUE:
            return BitServerHandshake.getDefaultInstance();
        default:
            throw new UnsupportedOperationException();
    }
}
public static MessageLite getResponseDefaultInstanceServer(int rpcType) throws RpcException {
    switch (rpcType) {
        case RpcType.ACK_VALUE:
            return Ack.getDefaultInstance();
        case RpcType.HANDSHAKE_VALUE:
            return BitClientHandshake.getDefaultInstance();
        case RpcType.REQ_RECORD_BATCH_VALUE:
            return FragmentRecordBatch.getDefaultInstance();
        default:
            throw new UnsupportedOperationException();
    }
}
@Override
protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
    // A user server only expects acknowledgments on messages it creates.
    switch (rpcType) {
        case RpcType.ACK_VALUE:
            return Ack.getDefaultInstance();
        default:
            throw new UnsupportedOperationException();
    }
}
public <R extends MessageLite, C extends RpcCommand<R, CONNECTION_TYPE>> void runCommand(C cmd) {
    // logger.info(String.format("Running command %s sending to host %s:%d", cmd, host, port));
    if (closed.get()) {
        cmd.connectionFailed(FailureType.CONNECTION, new IOException("Connection has been closed"));
    }

    CONNECTION_TYPE connection = connectionHolder.get();
    if (connection != null) {
        if (connection.isActive()) {
            cmd.connectionAvailable(connection);
            // logger.info("Connection available and active, command run inline.");
            return;
        } else {
            // Remove the old connection. (Don't worry if we fail; someone else should have done it.)
            connectionHolder.compareAndSet(connection, null);
        }
    }

    /*
     * We've arrived here without a connection; let's make sure only one of us makes a connection.
     * (FYI, another endpoint could create a reverse connection.)
     */
    synchronized (this) {
        connection = connectionHolder.get();
        if (connection != null) {
            cmd.connectionAvailable(connection);
        } else {
            logger.info("[{}]: No connection active, opening new connection to {}:{}.", name, host, port);
            BasicClient<?, CONNECTION_TYPE, OUTBOUND_HANDSHAKE, ?> client = getNewClient();
            ConnectionListeningFuture<R, C> future = new ConnectionListeningFuture<R, C>(cmd);
            client.connectAsClient(future, handshake, host, port);
            // logger.info("Connection available and active, command now being run inline.");
            future.waitAndRun();
            // logger.info("Connection available. Command now run.");
        }
        return;
    }
}
/**
 * Dispatches a query for the specified data type, carrying the specified protocol buffer
 * message (if required). The response to this query will be provided to the specified callback.
 * A {@link #DEFAULT_TIMEOUT_MS default timeout} will be used.
 */
public void queryFor(
        @NonNull String dataType,
        @Nullable MessageLite queryMessage,
        @NonNull QueryCallback callback) {
    queryFor(dataType, queryMessage, DEFAULT_TIMEOUT_MS, callback);
}
/**
 * Dispatches a query for the specified data type, carrying the specified protocol buffer
 * message (if required). The response to this query will be provided to the specified callback.
 */
public void queryFor(
        @NonNull String dataType,
        @Nullable MessageLite queryMessage,
        long timeoutInMs,
        @NonNull QueryCallback callback) {
    queryFor(dataType, queryMessage != null ? queryMessage.toByteArray() : null, timeoutInMs, callback);
}
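A hedged caller sketch; the data-type string, the query proto, and the callback instance below are placeholders (the real QueryCallback contract is defined elsewhere in this codebase).

// All names below are illustrative, not part of the original API surface.
MessageLite statusQuery = buildStatusQuery();   // hypothetical helper producing the query proto
QueryCallback onResult = makeStatusCallback();  // hypothetical helper producing the callback
queryFor("status/battery", statusQuery, 5_000L, onResult);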
/**
 * Creates a {@link ByteString} by serializing the list of protos. Use
 * {@link #readMessageList(ByteString, Parser)} to deserialize.
 */
public static <T extends MessageLite> ByteString writeMessageList(List<T> protos) {
    Output output = ByteString.newOutput();
    try {
        writeMessageListTo(output, protos);
    } catch (IOException ex) {
        // Writing to an in-memory output should never fail; preserve the cause just in case.
        throw new IllegalStateException("Unable to write protobufs to memory", ex);
    }
    return output.toByteString();
}
/**
 * Writes the provided list of protos to the provided output stream.
 *
 * @throws IOException if the protos cannot be written to the provided output stream.
 */
public static <T extends MessageLite> void writeMessageListTo(
        OutputStream stream, List<T> protos) throws IOException {
    DataOutputStream dos = new DataOutputStream(stream);
    dos.writeInt(protos.size());
    for (MessageLite proto : protos) {
        proto.writeDelimitedTo(stream);
    }
}
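A hedged round-trip sketch pairing writeMessageListTo with readMessageList above; MyRecord and the two record instances are placeholders.

// Illustrative only: MyRecord, recordA and recordB are placeholders.
List<MyRecord> original = Arrays.asList(recordA, recordB);
ByteArrayOutputStream out = new ByteArrayOutputStream();
writeMessageListTo(out, original);
List<MyRecord> roundTripped =
        readMessageList(new ByteArrayInputStream(out.toByteArray()), MyRecord.parser());
// roundTripped should equal `original` element for element.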
@Override
public void switchP2p(IMHeader header, MessageLite body, ChannelHandlerContext ctx) {
    // IMSwitchService.IMP2PCmdMsg p2pCmdMsg = (IMSwitchService.IMP2PCmdMsg) body;
    // long toId = p2pCmdMsg.getToUserId();
    // FIXME the logic below still needs to be confirmed
    // ClientConnection clientConn = ClientConnectionMap.getClientByUserId(String.valueOf(toId));
    // if (clientConn != null) {
    //     clientConn.getCtx().writeAndFlush(new IMProtoMessage<IMSwitchService.IMP2PCmdMsg>(header.clone(), p2pCmdMsg));
    // }
    IMSwitchService.IMP2PCmdMsg p2pCmdMsg = (IMSwitchService.IMP2PCmdMsg) body;

    // Stamp the sender's user id onto the message.
    long userId = super.getUserId(ctx);
    p2pCmdMsg = p2pCmdMsg.toBuilder().setFromUserId(userId).build();
    // Wrap the rebuilt message (not the original body) so the fromUserId is preserved.
    IMProtoMessage<MessageLite> switchP2pMsg = new IMProtoMessage<MessageLite>(header, p2pCmdMsg);

    long toId = p2pCmdMsg.getToUserId();
    long fromId = p2pCmdMsg.getFromUserId();
    ClientUser toClientUser = ClientUserManager.getUserById(toId);
    ClientUser fromClientUser = ClientUserManager.getUserById(fromId);
    // TODO: confirm whether this handling is correct.
    if (toClientUser != null) {
        toClientUser.broadcast(switchP2pMsg, ctx);
    }
    if (fromClientUser != null) {
        fromClientUser.broadcast(switchP2pMsg, null);
    }
    ClientUserManager.broadCast(switchP2pMsg, SysConstant.CLIENT_TYPE_FLAG_BOTH);

    // Forward through the router / server cluster.
    // routerHandler.send(header, body);
    messageServerCluster.send(header, p2pCmdMsg);
}
@Override
protected void encode(final ChannelHandlerContext ctx, final IMProtoMessage<MessageLite> protoMessage,
        final ByteBuf out) throws Exception {
    try {
        logger.debug("Protobuf encode started.");
        // [HEADER] data
        IMHeader header = protoMessage.getHeader();
        byte[] bytes = protoMessage.getBody().toByteArray();
        int length = bytes.length;
        // Total packet length = fixed header length + body length.
        header.setLength(SysConstant.PROTOCOL_HEADER_LENGTH + length);

        byte[] allbytes = header.encode().array();
        allbytes = Arrays.copyOf(allbytes, SysConstant.PROTOCOL_HEADER_LENGTH + length);
        // Copy the body right after the header, which occupies the first
        // PROTOCOL_HEADER_LENGTH bytes of the packet.
        System.arraycopy(bytes, 0, allbytes, SysConstant.PROTOCOL_HEADER_LENGTH, length);
        out.writeBytes(allbytes);
        logger.debug("Sent protobuf: commandId={}", header.getCommandId());
    } catch (Exception e) {
        logger.error("Encode failed.", e);
    } finally {
        logger.debug("Protobuf encode finished.");
    }
}
public void broadcast(IMProtoMessage<MessageLite> message, ChannelHandlerContext fromCtx) {
    for (ChannelHandlerContext conn : connMap.values()) {
        if (conn != fromCtx) {
            logger.debug("Sending message to {}", conn.channel().remoteAddress());
            conn.writeAndFlush(message);
            // conn > AddToSendList
        }
    }
}

public void broadcastWithOutMobile(IMProtoMessage<MessageLite> message, ChannelHandlerContext fromCtx) {
    for (ChannelHandlerContext conn : connMap.values()) {
        if (conn != fromCtx && CommonUtils.isPc(conn.attr(CLIENT_TYPE).get())) {
            logger.debug("Sending message to {}", conn.channel().remoteAddress());
            conn.writeAndFlush(message);
        }
    }
}

public void broadcastToMobile(IMProtoMessage<MessageLite> message, ChannelHandlerContext fromCtx) {
    for (ChannelHandlerContext conn : connMap.values()) {
        if (conn != fromCtx && CommonUtils.isMobile(conn.attr(CLIENT_TYPE).get())) {
            logger.debug("Sending message to {}", conn.channel().remoteAddress());
            conn.writeAndFlush(message);
        }
    }
}

public void broadcaseMessage(IMProtoMessage<MessageLite> message, long messageId, ChannelHandlerContext fromCtx,
        long fromId) {
    for (ChannelHandlerContext conn : connMap.values()) {
        if (conn != fromCtx) {
            logger.debug("Sending message to {}", conn.channel().remoteAddress());
            conn.writeAndFlush(message);
            // conn AddToSendList
        }
    }
}
/**
 * Handles buddy-list (contacts) related message types.
 *
 * @param ctx       the channel
 * @param commandId the command id
 * @param header    the message header
 * @param body      the message body
 * @since 1.0
 */
public void doBuddyList(ChannelHandlerContext ctx, short commandId, IMHeader header, MessageLite body) {
    // Check whether the user has logged in.
    if (!hasLogin(ctx)) {
        return;
    }
    logger.info("doBuddyList");
    switch (commandId) {
        case BuddyListCmdID.CID_BUDDY_LIST_RECENT_CONTACT_SESSION_REQUEST_VALUE:
            imBuddyListHandler.recentContactReq(header, body, ctx);
            break;
        case BuddyListCmdID.CID_BUDDY_LIST_USER_INFO_REQUEST_VALUE:
            imBuddyListHandler.userInfoReq(header, body, ctx);
            break;
        case BuddyListCmdID.CID_BUDDY_LIST_REMOVE_SESSION_REQ_VALUE:
            imBuddyListHandler.removeSessionReq(header, body, ctx); // todebug
            break;
        case BuddyListCmdID.CID_BUDDY_LIST_ALL_USER_REQUEST_VALUE:
            imBuddyListHandler.allUserReq(header, body, ctx);
            break;
        case BuddyListCmdID.CID_BUDDY_LIST_USERS_STATUS_REQUEST_VALUE:
            imBuddyListHandler.userStatusReq(header, body, ctx);
            break;
        case BuddyListCmdID.CID_BUDDY_LIST_CHANGE_AVATAR_REQUEST_VALUE:
            imBuddyListHandler.changeAvaterReq(header, body, ctx); // todebug
            break;
        case BuddyListCmdID.CID_BUDDY_LIST_DEPARTMENT_REQUEST_VALUE:
            imBuddyListHandler.departmentReq(header, body, ctx);
            break;
        case BuddyListCmdID.CID_BUDDY_LIST_CHANGE_SIGN_INFO_REQUEST_VALUE:
            imBuddyListHandler.changeSignInfoReq(header, body, ctx);
            break;
        default:
            logger.warn("Unsupported command id {}", commandId);
            break;
    }
}
public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(
        RpcOutcomeListener<RECEIVE> outcomeListener, EnumLite rpcType, SEND protobufBody,
        Class<RECEIVE> clazz, ByteBuf... dataBodies) {
    assert rpcConfig.checkSend(rpcType, protobufBody.getClass(), clazz);
    connection.send(new ProxyListener<RECEIVE>(outcomeListener), RpcType.MESSAGE,
            msg(rpcType, protobufBody), FabricMessage.class, dataBodies);
}
/**
 * Handles group-related message types.
 *
 * @param ctx       the channel
 * @param commandId the command id
 * @param header    the message header
 * @param body      the message body
 * @since 1.0
 */
public void doGroup(ChannelHandlerContext ctx, short commandId, IMHeader header, MessageLite body) {
    // Check whether the user has logged in.
    if (!hasLogin(ctx)) {
        return;
    }
    logger.info("doGroup");
    switch (commandId) {
        case GroupCmdID.CID_GROUP_NORMAL_LIST_REQUEST_VALUE:
            imGroupHandler.normalListReq(header, body, ctx);
            break;
        case GroupCmdID.CID_GROUP_INFO_REQUEST_VALUE: // todebug
            imGroupHandler.groupInfoReq(header, body, ctx);
            break;
        case GroupCmdID.CID_GROUP_CREATE_REQUEST_VALUE: // todebug
            // imGroupHandler.groupCreateReq(header, body, ctx);
            imGroupHandler.createGroupReq(header, body, ctx);
            break;
        case GroupCmdID.CID_GROUP_CHANGE_MEMBER_REQUEST_VALUE: // todebug
            // imGroupHandler.groupChangeMemberReq(header, body, ctx);
            imGroupHandler.changeMemberReq(header, body, ctx);
            break;
        case GroupCmdID.CID_GROUP_SHIELD_GROUP_REQUEST_VALUE: // todebug
            imGroupHandler.groupShieldReq(header, body, ctx);
            break;
        default:
            logger.warn("Unsupported command id {}", commandId);
            break;
    }
}
/**
 * Handles other (miscellaneous) message types.
 *
 * @param ctx       the channel
 * @param commandId the command id
 * @param header    the message header
 * @param body      the message body
 * @since 1.0
 */
public void doOther(ChannelHandlerContext ctx, short commandId, IMHeader header, MessageLite body) {
    logger.info("doOther");
    switch (commandId) {
        case OtherCmdID.CID_OTHER_HEARTBEAT_VALUE:
            imOtherHandler.hearBeat(header, body, ctx);
            break;
        case OtherCmdID.CID_OTHER_STOP_RECV_PACKET_VALUE: // may not need to be implemented?
            imOtherHandler.StopReceivePacket(header, body, ctx);
            break;
        default:
            logger.warn("Unsupported command id {}", commandId);
            break;
    }
}
/**
 * Handles P2P switch messages.
 *
 * @param ctx       the channel
 * @param commandId the command id
 * @param header    the message header
 * @param body      the message body
 * @since 1.0
 */
public void doSwitch(ChannelHandlerContext ctx, short commandId, IMHeader header, MessageLite body) {
    logger.info("doSwitch");
    switch (commandId) {
        case SwitchServiceCmdID.CID_SWITCH_P2P_CMD_VALUE: // todebug
            imSwitchHandler.switchP2p(header, body, ctx);
            break;
        default:
            logger.warn("Unsupported command id {}", commandId);
            break;
    }
}
/**
 * Dispatches "other" cluster messages by command id.
 *
 * @param commandId      the command id of the cluster message
 * @param clusterMessage the cluster message carrying the header and protobuf body
 * @param member         the cluster member the message originated from
 * @since 1.0
 */
private void doOther(short commandId, MyClusterMessage clusterMessage, Member member) {
    logger.debug("MyClusterMessageListener#doOther");
    IMHeader header = clusterMessage.getHeader();
    try {
        MessageLite body = clusterMessage.getMessage();
        switch (commandId) {
            case OtherCmdID.CID_OTHER_SERVER_KICK_USER_VALUE:
                if (!member.localMember()) {
                    handleKickUser(body);
                }
                break;
            case OtherCmdID.CID_OTHER_LOGIN_STATUS_NOTIFY_VALUE:
                if (!member.localMember()) {
                    handlePCLoginStatusNotify(header, body);
                }
                break;
            case OtherCmdID.CID_OTHER_HEARTBEAT_VALUE: // no handling needed
                break;
            case OtherCmdID.CID_OTHER_ROLE_SET_VALUE: // not needed for now
                break;
            default:
                logger.warn("Unsupported command id {}", commandId);
                break;
        }
    } catch (IOException e) {
        logger.error("decode failed.", e);
    }
}