环信MQTT地址:https://www.easemob.com/product/mqtt

什么是MQTT?

MQTT 本身是一种物联网通讯协议。由IBM在1999年设计,经过多年发展,最终成为了当下物联网的标准协议。
MQTT协议的优点(相较于HTTP)低开销、低带宽、可以隐藏客户端IP、端口(所有的消息都有MQTT服务器代理转发)、允许客户端自由创建topic等。

环信的Github项目可谓是一言难尽,没有任何相关SDK详细的说明!(还需要自行官网找,github项目区看不到!信息没有联通!)我们还是按照我的教程将sdk导入Maven吧!

进入正文:开发环境配置

1、自己编译SDK

Java项目SDK下载地址:https://github.com/easemob/easemob-mqtt-server-sdk/

将下载的代码打成Jar包(最终的目的是导入Maven仓库)

进到 src同级路径 输入

mvn package 

# package install二选一 如果不想手动将jar包打入maven仓库 可以直接install
mvn install

2、将jar导入Maven仓库

找到刚才编译的target目录下的jar包,然后cmd

mvn install:install-file -Dfile=huanxin-mqtt-sdk-1.0.0.RELEASE.jar -DgroupId=org.example -DartifactId=huanxin-mqtt-sdk -Dversion=1.0.0.RELEASE -Dpackaging=jar

以后我们的Maven就可以引用了

        <dependency>
            <groupId>org.example</groupId>
            <artifactId>huanxin-mqtt-sdk</artifactId>
            <version>1.0.0.RELEASE</version>
        </dependency>

SDK 引入成功,接下来我就就可以使用SDK了

SDK 代码编写

服务端SDK文档:https://docs-im.easemob.com/mqtt/serversdkdownload#%E6%9C%8D%E5%8A%A1%E5%99%A8sdk

import com.easemob.sdk.Client;
import com.easemob.sdk.Config;
import com.easemob.sdk.api.*;
import com.easemob.sdk.domain.*;
import com.xunliao.zhenliaowms.pojo.ResultPage;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author : zanglikun
 * @date : 2021/12/11 14:33
 * @Version: 1.0
 * @Desc : 环信MQTT接入服务Demo
 */
@Api(tags = "环信MQTT服务")
@RestController
@RequestMapping("/easemobMQTT")
@Component
@Slf4j
public class MQTT {
    //SDK初始化  https://docs-im.easemob.com/mqtt/serversdkdownload#sdk%E5%88%9D%E5%A7%8B%E5%8C%96
    /*
     cilentId、cilentSercret 均在环信应用概况-应用详情-开发者ID查看
     RestApi 在环信MQTT服务概况-服务配置-REST API地址
    */
    @Bean
    public static Client createClient() {
        Config config = new Config(); // 实例化应用配置类
        config.setAppClientId("你的AppClientId")
                .setAppClientSecret("你的AppClientSecret")
                .setRestApi("https://api.cn1.mqtt.chat/app/vms2j0");
        Client client = new Client(config);  // 实例化应用客户端
        return client;
    }

    @Autowired
    private Client client;

    /**
     * 获取临时访问Token https://docs-im.easemob.com/mqtt/serversdkdownload#%E8%8E%B7%E5%8F%96%E4%B8%B4%E6%97%B6%E8%AE%BF%E9%97%AEtoken
     * 需要用一个环信用户的信息,去随便找一个账号密码即可,未来使用此Token来发送消息
     */
    @ApiOperation("获取临时访问Token")
    @PostMapping("/getToken")
    public ResultPage GetToken(String username, String password) {
        try {
            ApplyTokenRequest applyTokenRequest = new ApplyTokenRequest();  // 创建请求参数类
            applyTokenRequest.setUsername(username);  // 设置请求参数
            applyTokenRequest.setPassword(password);  // 设置请求参数
            ResponseOk<TokenInfo> responseOk = client.applyToken(applyTokenRequest);  // 调用应用客户端发起请求
            if (responseOk.getCode() == 200) {  // 如果返回code为200,请求正常返回
                String access_toke = responseOk.getBody().getAccess_token(); // 获取返回对象字段值
                Long expires_time = responseOk.getBody().getExpires_in();
                //System.out.println("获取的临时Token是:" + access_toke);
                //System.out.println("临时Token失效时间至:" + expires_time);
                return new ResultPage(true, "获取Token成功!有效期:" + expires_time, access_toke);
            }
        } catch (Exception e) {
            log.error("获取账号密码:{},{}:Token出现异常", username, password);
        }
        return new ResultPage(false, "获取Token失败!", null);
    }

    /**
     * 服务端应用发消息  https://docs-im.easemob.com/mqtt/serversdkdownload#%E5%8F%91%E9%80%81%E6%B6%88%E6%81%AF
     * ClientID 由2部分组成,以@连接。我这里使用的是账号@MQTT里面的AppID AppID唯一,但是账号一定不能用相同的!
     * Payload 支持json、xml、raw格式
     */
    @ApiOperation("服务端应用发消息")
    @PostMapping("/serverSendMsg")
    public ResultPage ServerSendMsg(String topic, String account, String message) {
        try {
            if (StringUtils.isBlank(message) || message.length() > 50) {
                return new ResultPage(false, "抱歉,您输入的内容为空或者长度大于50!", null);
            }
            if (StringUtils.isBlank(topic) || StringUtils.isBlank(account)) {
                return new ResultPage(false, "抱歉,您输入的账号ID或者主题为空!", null);
            }
            
            String[] topics = new String[]{"all-server"};
            SendMessage sendMessage = new SendMessage();
            sendMessage.setClientid(account + "@vms2j0");
            sendMessage.setPayload(message);
            sendMessage.setTopics(topics);
            ResponseOk<ChatInfo> responseOk = client.sendMessage(sendMessage);
            if (responseOk.getCode() == 200) {
                ChatInfo chatInfo = responseOk.getBody();
                //System.out.println(responseOk.getMsg());
                //System.out.println(chatInfo.toString());
                //System.out.println("发送消息的结果是" + chatInfo);
                return new ResultPage(true, "服务端发送消息成功!", chatInfo);
            } else {
                return new ResultPage(false, "服务端发送消息成功,但请求状态不是200", responseOk.getBody(), responseOk.getCode());
            }
        } catch (Exception e) {
            log.error("服务端发送消息失败,数据依次是:", topic, account, message);
        }
        return new ResultPage(false, "服务端发送消息失败!", null);
    }

    /**
     * 查询MQTT客户端连接记录  https://docs-im.easemob.com/mqtt/serversdkdownload#%E6%9F%A5%E8%AF%A2%E5%AE%A2%E6%88%B7%E7%AB%AF%E8%BF%9E%E6%8E%A5%E8%AE%B0%E5%BD%95
     * 传入时间 并指定CilentID 由 账号@MQTT的AppID组成
     * 用后台发消息,不会由连接记录
     */
    @ApiOperation("查询MQTT客户端连接记录")
    @PostMapping("/findMQTTClientLinkRecord")
    public ResultPage mainnnnn(String accout, String beginTime, String endTime, Integer pageNum, Integer pageSize) throws Exception {
        try {
            QueryMqttRecordDevice queryMqttRecordDevice = new QueryMqttRecordDevice();
            queryMqttRecordDevice.setBeginTime("2021-12-11 13:00:00");
            queryMqttRecordDevice.setEndTime("2021-12-25 17:00:00");
            queryMqttRecordDevice.setCurrentPage(1);
            queryMqttRecordDevice.setPageSize(10);
            queryMqttRecordDevice.setClientid(accout + "@vms2j0");
            ResponseOk<DeviceRecordInfo> responseOk = client.queryMqttRecordDevice(queryMqttRecordDevice);
            if (responseOk.getCode() == 200) {
                DeviceRecordInfo deviceRecordInfo = responseOk.getBody();
                //System.out.println(deviceRecordInfo);
                return new ResultPage(true, "查询MQTT客户端链接成功!", deviceRecordInfo);
            } else {
                return new ResultPage(true, "查询MQTT客户端链接成功 但状态不是200", responseOk.getBody(), responseOk.getCode());
            }
        } catch (Exception e) {
            log.error("查询MQTT客户端链接失败!");
        }
        return new ResultPage(false, "查询MQTT客户端链接失败!", null);
    }

    /**
     * 根据Client ID查询指定客户端当前session连接信息  https://docs-im.easemob.com/mqtt/serversdkdownload#%E6%9F%A5%E8%AF%A2%E5%AE%A2%E6%88%B7%E7%AB%AFsession%E4%BF%A1%E6%81%AF
     * 依旧是传入指定CilentID 由 账号@MQTT的AppID
     * 目前暂时没发现有啥用
     */
    @ApiOperation("查询某个账号的Session信息")
    @PostMapping("/findSessionInfo")
    public ResultPage FindSessionInfo(String account, Integer pageNum, Integer pageSize) throws Exception {
        try {
            QuerySessionByClientId querySessionByClientId = new QuerySessionByClientId();
            querySessionByClientId.setClientid(account + "@vms2j0");
            querySessionByClientId.setCurrentPage(1);
            querySessionByClientId.setPageSize(10);
            ResponseOk<ClientInfo> responseOk = client.querySessionByClientId(querySessionByClientId);
            if (responseOk.getCode() == 200) {
                ClientInfo clientInfo = responseOk.getBody();
                //System.out.println(clientInfo);
                return new ResultPage(true, "查询:" + account + " 的Session成功", clientInfo);
            } else {
                return new ResultPage(true, "查询:" + account + " 的Session 状态不是200", responseOk.getBody(), responseOk.getCode());
            }
        } catch (Exception e) {
            log.error("查询:{},Session失败", account);
        }
        return new ResultPage(false, "查询:" + account + " 的Session失败", null);
    }

    /**
     * 查询客户端消息发送&投递记录  https://docs-im.easemob.com/mqtt/serversdkdownload#%E6%9F%A5%E8%AF%A2%E5%AE%A2%E6%88%B7%E7%AB%AF%E6%B6%88%E6%81%AF%E5%8F%91%E9%80%81_%E6%8A%95%E9%80%92%E8%AE%B0%E5%BD%95
     * 依旧是传入指定ClientID 由 账号@MQTT的AppID
     */
    @ApiOperation("查询客户端消息发送&投递记录")
    @PostMapping("/findClientMsg")
    public ResultPage mains(String account, String beginTime, String endTime, Integer pageNum, Integer pageSize) throws Exception {
        try {
            QueryMqttRecordMessageOfClient queryMqttRecordMessageOfClient = new QueryMqttRecordMessageOfClient();
            queryMqttRecordMessageOfClient.setClientid(account + "@vms2j0");
            queryMqttRecordMessageOfClient.setBeginTime("2021-12-11 15:00:00");
            queryMqttRecordMessageOfClient.setEndTime("2021-12-25 18:49:00");
            queryMqttRecordMessageOfClient.setCurrentPage(pageNum);
            queryMqttRecordMessageOfClient.setPageSize(pageSize);
            queryMqttRecordMessageOfClient.setOrder(OrderParams.desc);
            ResponseOk<MqttMessageInfo> responseOk = client.queryMqttRecordMessageOfClient(queryMqttRecordMessageOfClient);
            if (responseOk.getCode() == 200) {
                MqttMessageInfo mqttMessageInfo = responseOk.getBody();
                //System.out.println(mqttMessageInfo);
                return new ResultPage(true, "查询客户端消息发送&投递记录成功!", mqttMessageInfo);
            } else {
                return new ResultPage(false, "查询客户端消息发送 状态结果不是200!", responseOk.getBody(), responseOk.getCode());
            }
        } catch (Exception e) {
            log.error("查询客户端消息发送&投递记录出现异常!");
        }
        return new ResultPage(false, "查询客户端消息发送&投递记录", null);
    }

}

拿到这个Demo代码,你就可以使用了!自己整改一下到业务就完事了!

特殊说明:
上述文章均是作者实际操作后产出。烦请各位,请勿直接盗用!转载记得标注原文链接:www.zanglikun.com
第三方平台不会及时更新本文最新内容。如果发现本文资料不全,可访问本人的Java博客搜索:标题关键字。以获取最新全部资料 ❤