1. iOS 智能家居 MQTT 使用场景
(网关与服务器之间、服务器与 App 之间均通过 MQTT 通信)
Zigbee 设备 -> 网关 -> 服务器 -> App
2. mqtt client 使用流程
1. Podfile文件内容, mqtt client 安装依赖
# CocoaPods dependencies for the MqttClient demo target.
platform :ios,'9.0'
use_frameworks!
target 'MqttClient' do
# Silence compiler warnings that come from the pods themselves.
inhibit_all_warnings!
pod 'AFNetworking', '~>3.0'
# MQTT client library (imported in the sample as <MQTTClient/MQTTClient.h>).
pod 'MQTTClient'
end
pod install
2、 创建连接 、设置 mqttSession 代理
/// Connects to the MQTT broker, creating the session on first use.
///
/// A new connection attempt is made when the session's user name or password
/// differ from the current `uid`/`token`, or when the session is neither
/// connected nor connecting. Nothing happens while `uid` or `token` is empty,
/// because the credentials and the client id are derived from them.
///
/// @param sender The control that triggered the action (unused).
- (IBAction)connectionMqtt:(id)sender {
    // Without a login there is nothing to authenticate with; connecting anyway
    // would send nil credentials and the client id "app:(null)".
    if (self.uid.length == 0 || self.token.length == 0) {
        NSLog(@"%@", @"Mqtt 未登录, 缺少 uid/token, 取消连接");
        return;
    }

    MQTTSessionStatus status = self.mqttSession.status;
    BOOL active = (status == MQTTSessionStatusConnected ||
                   status == MQTTSessionStatusConnecting);
    BOOL sameCredentials = [self.mqttSession.userName isEqualToString:self.uid] &&
                           [self.mqttSession.password isEqualToString:self.token];
    if (active && sameCredentials) {
        return;
    }

    // Drop whatever connection the session currently holds before reconnecting.
    [self internalClose];

    if (!self.mqttSession) {
        MQTTCFSocketTransport *transport = [[MQTTCFSocketTransport alloc] init];
        transport.host = kMQTTHost;
        transport.port = kMQTTPort;

        MQTTSession *session = [[MQTTSession alloc] init];
        session.transport = transport;
        session.keepAliveInterval = 5;
        session.delegate = self;
        self.mqttSession = session;
        [MQTTLog setLogLevel:DDLogLevelDebug];
    }

    self.mqttSession.userName = self.uid;
    self.mqttSession.clientId = [NSString stringWithFormat:@"app:%@", self.uid];
    self.mqttSession.password = self.token;
    [self.mqttSession connect];
}
3、 连接状态监听
-(void)connected:(MQTTSession *)session{} 连接成功回调,订阅topic
-(void)connectionRefused:(MQTTSession *)session error:(NSError *)error
连接被拒绝
-(void)handleEvent:(MQTTSession *)session event:(MQTTSessionEvent)eventCode error:(NSError *)error{}
连接状态回调
/// Closes the connection to the MQTT broker.
/// @note Queued tasks must not be cleared here for now: doing so would also
///       remove every task that was added before a connection existed.
- (void)internalClose {
    NSString *notice = @"关闭mqtt连接";
    NSLog(@"%@", notice);

    MQTTSession *session = self.mqttSession;
    [session disconnect];
    // The `subscribed` flag is deliberately left untouched here.
}
/// MQTTSessionDelegate callback for a successful connection.
///
/// Subscribes to this user's RPC reply topic (`/<uid>/rpc/reply`) and records
/// the outcome in `subscribed`, which was previously only ever reset.
///
/// @param session The session that connected.
- (void)connected:(MQTTSession *)session {
    NSLog(@"%@", @"连接成功");
    NSString *subTopic = [NSString stringWithFormat:@"/%@/rpc/reply", self.uid];

    // The session keeps the handler until the broker answers; capture self
    // weakly so the pending handler does not keep the controller alive.
    __weak typeof(self) weakSelf = self;
    [self.mqttSession subscribeToTopic:subTopic
                               atLevel:MQTTQosLevelExactlyOnce
                      subscribeHandler:^(NSError *error, NSArray<NSNumber *> *gQoss) {
        NSLog(@"订阅回调: %@--%@", error, gQoss);
        weakSelf.subscribed = (error == nil);
    }];
}
/// MQTTSessionDelegate callback for a closed connection; clears the
/// `subscribed` flag.
- (void)connectionClosed:(MQTTSession *)session {
    [self setSubscribed:NO];
}
/// MQTTSessionDelegate callback for a refused connection.
///
/// Logs an authentication-specific message for bad-credential and
/// not-authorized codes, and a generic message for every other code.
///
/// @param session The session whose connection was refused.
/// @param error   The error whose `code` selects the log message.
- (void)connectionRefused:(MQTTSession *)session error:(NSError *)error {
    NSInteger code = error.code;
    BOOL authFailure = (code == MQTTSessionErrorConnackBadUsernameOrPassword ||
                        code == MQTTSessionErrorConnackNotAuthorized);
    NSString *message = authFailure ? @"Mqtt 鉴权错误... 连接被拒绝" : @"Mqtt 连接被拒绝";
    NSLog(@"%@", message);
}
/// MQTTSessionDelegate connection-state callback.
///
/// Logs one line describing `eventCode`, then the error if one was supplied.
///
/// @param session   The session reporting the event.
/// @param eventCode The connection event.
/// @param error     Optional error that accompanies the event.
- (void)handleEvent:(MQTTSession *)session event:(MQTTSessionEvent)eventCode error:(NSError *)error {
    NSString *description = nil;
    switch (eventCode) {
        case MQTTSessionEventConnected:
            description = @"2222222 链接MQTT 成功";
            break;
        case MQTTSessionEventConnectionRefused:
            description = @"MQTT拒绝链接";
            break;
        case MQTTSessionEventConnectionClosed:
            description = @"MQTT链接关闭";
            break;
        case MQTTSessionEventConnectionError:
            description = @"MQTT 链接错误";
            break;
        case MQTTSessionEventProtocolError:
            description = @"MQTT 不可接受的协议";
            break;
        default:
            // Any remaining event, e.g. MQTTSessionEventConnectionClosedByBroker.
            description = @"MQTT链接 其他错误";
            break;
    }
    NSLog(@"%@", description);

    if (error != nil) {
        NSLog(@"链接报错 -- %@", error);
    }
}
3、 订阅 topic 以后,服务器推送消息到 App 时触发的回调接口
-(void)newMessage:(MQTTSession *)session data:(NSData *)data onTopic:(NSString *)topic qos:(MQTTQosLevel)qos retained:(BOOL)retained mid:(unsigned int)mid{}
/// MQTTSessionDelegate callback for a successful connection.
///
/// Subscribes to this user's RPC reply topic (`/<uid>/rpc/reply`) and records
/// the outcome in `subscribed`, which was previously only ever reset.
///
/// @param session The session that connected.
- (void)connected:(MQTTSession *)session {
    NSLog(@"%@", @"连接成功");
    NSString *subTopic = [NSString stringWithFormat:@"/%@/rpc/reply", self.uid];

    // The session keeps the handler until the broker answers; capture self
    // weakly so the pending handler does not keep the controller alive.
    __weak typeof(self) weakSelf = self;
    [self.mqttSession subscribeToTopic:subTopic
                               atLevel:MQTTQosLevelExactlyOnce
                      subscribeHandler:^(NSError *error, NSArray<NSNumber *> *gQoss) {
        NSLog(@"订阅回调: %@--%@", error, gQoss);
        weakSelf.subscribed = (error == nil);
    }];
}
/// MQTTSessionDelegate callback for a message received on a subscribed topic.
///
/// Decodes the payload as JSON and logs its message id and function name.
/// Payloads that are empty, are not valid JSON, or do not decode to a
/// dictionary are logged and ignored.
///
/// @param session  The session that delivered the message.
/// @param data     Raw payload, expected to hold a JSON object.
/// @param topic    Topic the message arrived on.
/// @param qos      Delivery QoS level.
/// @param retained Whether the message is a retained one.
/// @param mid      MQTT message identifier.
- (void)newMessage:(MQTTSession *)session data:(NSData *)data onTopic:(NSString *)topic qos:(MQTTQosLevel)qos retained:(BOOL)retained mid:(unsigned int)mid {
    // +JSONObjectWithData:options:error: raises an exception for nil data,
    // so reject missing or empty payloads before parsing.
    if (data.length == 0) {
        NSLog(@"mqtt 收到空消息, topic=%@", topic);
        return;
    }

    NSError *parseError = nil;
    id result = [NSJSONSerialization JSONObjectWithData:data options:0 error:&parseError];
    NSLog(@"mqtt result=%@", result);
    if (![result isKindOfClass:NSDictionary.class]) {
        if (result == nil) {
            NSLog(@"mqtt 消息解析失败: %@", parseError);
        }
        return;
    }

    NSDictionary *message = result;
    // The id key shows up in two spellings; accept either.
    id msgId = message[@"msgId"] ?: message[@"msgid"];
    NSString *func = [message[kFunc] isKindOfClass:NSString.class] ? message[kFunc] : @"不是字符串";
    NSLog(@"收到mqtt消息: %@---%@", msgId, func);
}
4. app到服务器
主题名: NSString* const MQTTServerTopic = @"/request/app/func";
App 经服务器透传到网关
主题名: NSString* const MQTTGWTopic = @"/type/gwid/call";(其中 gwid 为对应网关的 sn,完整代码中以 %@ 占位)
// Publish `payload` to the server topic and report the real outcome:
// the publish error (nil on success) plus a matching success flag.
[self.mqttSession publishData:payload onTopic:MQTTServerTopic retain:NO qos:MQTTQosLevelExactlyOnce publishHandler:^(NSError *error) {
    NSLog(@"%@", @"发送获取设备列表");
    // Calling a nil block crashes, so guard the optional completion.
    if (completion) {
        completion(error, error == nil, nil);
    }
}];
发送过去以后,在第 3 步的回调中收到消息
/// Requests the list of bound devices by invoking the server function
/// "getAllBindDevice"; logs a line when the call completes.
///
/// @param sender The control that triggered the action (unused).
- (IBAction)getDevicesList:(id)sender {
    NSString *funcName = @"getAllBindDevice";
    NSDictionary *params = @{@"msgtype": @"request"};
    void (^onDone)(NSError * _Nullable, BOOL, id _Nullable) =
        ^(NSError * _Nullable error, BOOL b, id _Nullable rs) {
            NSLog(@"%@", @"完成");
        };
    [self performServerFunc:funcName withParams:params completion:onDone];
}
#pragma mark MQTT服务器相关接口。
/// Publishes a server function call on `MQTTServerTopic`.
///
/// The payload is a JSON object built from `funcParams` plus the function
/// name (under `kFunc`), the current `uid` and a fresh `msgId`.
///
/// @param func       Name of the server function; must not be empty.
/// @param funcParams Extra parameters merged into the payload; may be nil.
/// @param completion Optional. Receives the error (nil on success), a success
///                   flag, and a result object that is always nil here.
- (void)performServerFunc:(NSString *)func withParams:(NSDictionary *)funcParams completion:(void (^)(NSError * _Nullable, BOOL, id _Nullable))completion
{
    NSAssert(func.length, @"%s__%d__执行的功能不能为空", __FILE__, __LINE__);
    NSMutableDictionary *newParam = [NSMutableDictionary dictionaryWithDictionary:funcParams ?: @{}];
    newParam[kFunc] = func;
    newParam[@"uid"] = self.uid;
    newParam[@"msgId"] = @([self nextMsgId]);

    // +dataWithJSONObject:options:error: raises on objects that are not JSON
    // compatible, so validate caller-supplied parameters first.
    NSError *encodeError = nil;
    NSData *payload = nil;
    if ([NSJSONSerialization isValidJSONObject:newParam]) {
        payload = [NSJSONSerialization dataWithJSONObject:newParam options:0 error:&encodeError];
    } else {
        encodeError = [NSError errorWithDomain:NSCocoaErrorDomain
                                          code:NSPropertyListWriteInvalidError
                                      userInfo:@{NSLocalizedDescriptionKey: @"MQTT payload is not a valid JSON object"}];
    }
    if (payload == nil) {
        NSLog(@"发送失败 - %@", encodeError);
        if (completion) {
            completion(encodeError, NO, nil);
        }
        return;
    }

    [self.mqttSession publishData:payload onTopic:MQTTServerTopic retain:NO qos:MQTTQosLevelExactlyOnce publishHandler:^(NSError *error) {
        NSLog(@"%@", @"发送获取设备列表");
        if (error) {
            NSLog(@"发送失败 - %@", error);
        } else {
            NSLog(@"发送成功");
        }
        // Report the real outcome; calling a nil block would crash.
        if (completion) {
            completion(error, error == nil, nil);
        }
    }];
}
5. 取消订阅
// Unsubscribe from `subTopic` and log the error object passed to the handler.
[self.mqttSession unsubscribeTopic:subTopic unsubscribeHandler:^(NSError *error) {
NSLog(@"取消订阅: %@",error);
}];
/// Unsubscribes from this user's RPC reply topic (`/<uid>/rpc/reply`).
///
/// On success the `subscribed` flag is cleared so it keeps matching the
/// actual subscription state.
///
/// @param sender The control that triggered the action (unused).
- (IBAction)cancleSubsribe:(id)sender {
    NSString *subTopic = [NSString stringWithFormat:@"/%@/rpc/reply", self.uid];

    // Capture self weakly: the session holds the handler until the broker replies.
    __weak typeof(self) weakSelf = self;
    [self.mqttSession unsubscribeTopic:subTopic unsubscribeHandler:^(NSError *error) {
        if (error) {
            NSLog(@"取消订阅失败--%@", error);
            return;
        }
        NSLog(@"取消订阅成功");
        weakSelf.subscribed = NO;
    }];
}
完整代码:
//
// ViewController.m
// MqttClient
//
#import "ViewController.h"
#import <MQTTClient/MQTTClient.h>
#import "KDSHttpManager.h"
#import "KDSHttpManager+Login.h"
#import "KDSTool.h"
// Base URL of the HTTP API; left empty in this sample.
#define kBaseURL @""
// MQTT broker host assigned to the socket transport; left empty in this sample
// (presumably to be replaced with the real broker address — confirm).
#define kMQTTHost @""
#pragma mark - 主题名
// Topic for requests sent from the app to the server.
NSString* const MQTTServerTopic = @"/request/app/func";
// Topic format for requests passed through to a gateway.
NSString* const MQTTGWTopic = @"/type/%@/call";
//#define MQTTGWTopic(gwuuid) [NSString stringWithFormat:MQTTGWTopic, gwuuid]///< Macro that builds the topic for publishing to a gateway.
// Replace %@ with the serial number (sn) of the target gateway.
// MQTT broker port assigned to the socket transport.
#define kMQTTPort 1883
// Payload key that carries the function name.
static NSString * const kFunc = @"func";
/// Private state of the MQTT demo view controller.
@interface ViewController () <MQTTSessionDelegate>

/// MQTT session; created on first use by `connectionMqtt:`.
@property (nonatomic, strong) MQTTSession *mqttSession;

/// User id returned by login; used as MQTT user name and inside topic names.
@property (nonatomic, copy) NSString *uid;

/// Token returned by login; used as the MQTT password.
@property (nonatomic, copy) NSString *token;

/// Subscription flag; cleared when the connection closes.
@property (nonatomic, assign) BOOL subscribed;

/// Next message id to hand out; see `nextMsgId`.
@property (nonatomic, assign) NSInteger msgId;

@end
@implementation ViewController
/// Initialises the message-id counter once the view has loaded.
- (void)viewDidLoad {
    [super viewDidLoad];

    // Message ids start at 1.
    self.msgId = 1;
}
// Step 1. Create the connection and set the mqttSession delegate.
/// Connects to the MQTT broker, creating the session on first use.
///
/// A new connection attempt is made when the session's user name or password
/// differ from the current `uid`/`token`, or when the session is neither
/// connected nor connecting. Nothing happens while `uid` or `token` is empty,
/// because the credentials and the client id are derived from them.
///
/// @param sender The control that triggered the action (unused).
- (IBAction)connectionMqtt:(id)sender {
    // Without a login there is nothing to authenticate with; connecting anyway
    // would send nil credentials and the client id "app:(null)".
    if (self.uid.length == 0 || self.token.length == 0) {
        NSLog(@"%@", @"Mqtt 未登录, 缺少 uid/token, 取消连接");
        return;
    }

    MQTTSessionStatus status = self.mqttSession.status;
    BOOL active = (status == MQTTSessionStatusConnected ||
                   status == MQTTSessionStatusConnecting);
    BOOL sameCredentials = [self.mqttSession.userName isEqualToString:self.uid] &&
                           [self.mqttSession.password isEqualToString:self.token];
    if (active && sameCredentials) {
        return;
    }

    // Drop whatever connection the session currently holds before reconnecting.
    [self internalClose];

    if (!self.mqttSession) {
        MQTTCFSocketTransport *transport = [[MQTTCFSocketTransport alloc] init];
        transport.host = kMQTTHost;
        transport.port = kMQTTPort;

        MQTTSession *session = [[MQTTSession alloc] init];
        session.transport = transport;
        session.keepAliveInterval = 5;
        session.delegate = self;
        self.mqttSession = session;
        [MQTTLog setLogLevel:DDLogLevelDebug];
    }

    self.mqttSession.userName = self.uid;
    self.mqttSession.clientId = [NSString stringWithFormat:@"app:%@", self.uid];
    self.mqttSession.password = self.token;
    [self.mqttSession connect];
}
/// Closes the connection to the MQTT broker.
/// @note Queued tasks must not be cleared here for now: doing so would also
///       remove every task that was added before a connection existed.
- (void)internalClose {
    NSString *notice = @"关闭mqtt连接";
    NSLog(@"%@", notice);

    MQTTSession *session = self.mqttSession;
    [session disconnect];
    // The `subscribed` flag is deliberately left untouched here.
}
/* Step 2. Connection-succeeded callback. */
/// MQTTSessionDelegate callback for a successful connection.
///
/// Subscribes to this user's RPC reply topic (`/<uid>/rpc/reply`) and records
/// the outcome in `subscribed`, which was previously only ever reset.
///
/// @param session The session that connected.
- (void)connected:(MQTTSession *)session {
    NSLog(@"%@", @"连接成功");
    NSString *subTopic = [NSString stringWithFormat:@"/%@/rpc/reply", self.uid];

    // The session keeps the handler until the broker answers; capture self
    // weakly so the pending handler does not keep the controller alive.
    __weak typeof(self) weakSelf = self;
    [self.mqttSession subscribeToTopic:subTopic
                               atLevel:MQTTQosLevelExactlyOnce
                      subscribeHandler:^(NSError *error, NSArray<NSNumber *> *gQoss) {
        NSLog(@"订阅回调: %@--%@", error, gQoss);
        weakSelf.subscribed = (error == nil);
    }];
}
/// MQTTSessionDelegate callback for a closed connection; clears the
/// `subscribed` flag.
- (void)connectionClosed:(MQTTSession *)session {
    [self setSubscribed:NO];
}
/// MQTTSessionDelegate callback for a refused connection.
///
/// Logs an authentication-specific message for bad-credential and
/// not-authorized codes, and a generic message for every other code.
///
/// @param session The session whose connection was refused.
/// @param error   The error whose `code` selects the log message.
- (void)connectionRefused:(MQTTSession *)session error:(NSError *)error {
    NSInteger code = error.code;
    BOOL authFailure = (code == MQTTSessionErrorConnackBadUsernameOrPassword ||
                        code == MQTTSessionErrorConnackNotAuthorized);
    NSString *message = authFailure ? @"Mqtt 鉴权错误... 连接被拒绝" : @"Mqtt 连接被拒绝";
    NSLog(@"%@", message);
}
/// MQTTSessionDelegate connection-state callback.
///
/// Logs one line describing `eventCode`, then the error if one was supplied.
///
/// @param session   The session reporting the event.
/// @param eventCode The connection event.
/// @param error     Optional error that accompanies the event.
- (void)handleEvent:(MQTTSession *)session event:(MQTTSessionEvent)eventCode error:(NSError *)error {
    NSString *description = nil;
    switch (eventCode) {
        case MQTTSessionEventConnected:
            description = @"2222222 链接MQTT 成功";
            break;
        case MQTTSessionEventConnectionRefused:
            description = @"MQTT拒绝链接";
            break;
        case MQTTSessionEventConnectionClosed:
            description = @"MQTT链接关闭";
            break;
        case MQTTSessionEventConnectionError:
            description = @"MQTT 链接错误";
            break;
        case MQTTSessionEventProtocolError:
            description = @"MQTT 不可接受的协议";
            break;
        default:
            // Any remaining event, e.g. MQTTSessionEventConnectionClosedByBroker.
            description = @"MQTT链接 其他错误";
            break;
    }
    NSLog(@"%@", description);

    if (error != nil) {
        NSLog(@"链接报错 -- %@", error);
    }
}
/* Step 3. Message received. */
/// MQTTSessionDelegate callback for a message received on a subscribed topic.
///
/// Decodes the payload as JSON and logs its message id and function name.
/// Payloads that are empty, are not valid JSON, or do not decode to a
/// dictionary are logged and ignored.
///
/// @param session  The session that delivered the message.
/// @param data     Raw payload, expected to hold a JSON object.
/// @param topic    Topic the message arrived on.
/// @param qos      Delivery QoS level.
/// @param retained Whether the message is a retained one.
/// @param mid      MQTT message identifier.
- (void)newMessage:(MQTTSession *)session data:(NSData *)data onTopic:(NSString *)topic qos:(MQTTQosLevel)qos retained:(BOOL)retained mid:(unsigned int)mid {
    // +JSONObjectWithData:options:error: raises an exception for nil data,
    // so reject missing or empty payloads before parsing.
    if (data.length == 0) {
        NSLog(@"mqtt 收到空消息, topic=%@", topic);
        return;
    }

    NSError *parseError = nil;
    id result = [NSJSONSerialization JSONObjectWithData:data options:0 error:&parseError];
    NSLog(@"mqtt result=%@", result);
    if (![result isKindOfClass:NSDictionary.class]) {
        if (result == nil) {
            NSLog(@"mqtt 消息解析失败: %@", parseError);
        }
        return;
    }

    NSDictionary *message = result;
    // The id key shows up in two spellings; accept either.
    id msgId = message[@"msgId"] ?: message[@"msgid"];
    NSString *func = [message[kFunc] isKindOfClass:NSString.class] ? message[kFunc] : @"不是字符串";
    NSLog(@"收到mqtt消息: %@---%@", msgId, func);
}
// Log the user in to obtain the token and uid.
/// Logs in through `KDSHttpManager` and stores the returned `uid` and `token`,
/// which the MQTT connection later uses as its credentials.
///
/// @param sender The control that triggered the action (unused).
- (IBAction)login:(id)sender {
    // FIXME: demo credentials are hard-coded in source. Read them from user
    // input before shipping and keep real accounts out of version control.
    int source = 1;
    NSString *username = @"8615875568850";
    NSString *passWord = @"a123456";
    [[KDSHttpManager sharedManager] login:source username:username password:passWord success:^(KDSUser * _Nonnull user) {
        NSLog(@"userid==%@", user.uid);
        // NOTE(review): this prints the auth token to the console; drop it
        // outside of debugging.
        NSLog(@"token==%@", user.token);
        self.uid = user.uid;
        self.token = user.token;
        // NOTE(review): the plain-text password is handed to KDSTool here;
        // confirm how that helper stores it.
        [KDSTool setDefaultLoginAccount:username];
        [KDSTool setDefaultLoginPassWord:passWord];
        // Give the HTTP manager the token as well.
        [KDSHttpManager sharedManager].token = user.token;
    } error:^(NSError * _Nonnull error) {
        NSLog(@"error=%@", error.localizedDescription);
    } failure:^(NSError * _Nonnull error) {
        // Include the error itself; a bare "failure" line cannot be diagnosed.
        NSLog(@"failure: %@", error);
    }];
}
// Step 4. Fetch data: app-to-server request.
/// Requests the list of bound devices by invoking the server function
/// "getAllBindDevice"; logs a line when the call completes.
///
/// @param sender The control that triggered the action (unused).
- (IBAction)getDevicesList:(id)sender {
    NSString *funcName = @"getAllBindDevice";
    NSDictionary *params = @{@"msgtype": @"request"};
    void (^onDone)(NSError * _Nullable, BOOL, id _Nullable) =
        ^(NSError * _Nullable error, BOOL b, id _Nullable rs) {
            NSLog(@"%@", @"完成");
        };
    [self performServerFunc:funcName withParams:params completion:onDone];
}
#pragma mark MQTT服务器相关接口。
/// Publishes a server function call on `MQTTServerTopic`.
///
/// The payload is a JSON object built from `funcParams` plus the function
/// name (under `kFunc`), the current `uid` and a fresh `msgId`.
///
/// @param func       Name of the server function; must not be empty.
/// @param funcParams Extra parameters merged into the payload; may be nil.
/// @param completion Optional. Receives the error (nil on success), a success
///                   flag, and a result object that is always nil here.
- (void)performServerFunc:(NSString *)func withParams:(NSDictionary *)funcParams completion:(void (^)(NSError * _Nullable, BOOL, id _Nullable))completion
{
    NSAssert(func.length, @"%s__%d__执行的功能不能为空", __FILE__, __LINE__);
    NSMutableDictionary *newParam = [NSMutableDictionary dictionaryWithDictionary:funcParams ?: @{}];
    newParam[kFunc] = func;
    newParam[@"uid"] = self.uid;
    newParam[@"msgId"] = @([self nextMsgId]);

    // +dataWithJSONObject:options:error: raises on objects that are not JSON
    // compatible, so validate caller-supplied parameters first.
    NSError *encodeError = nil;
    NSData *payload = nil;
    if ([NSJSONSerialization isValidJSONObject:newParam]) {
        payload = [NSJSONSerialization dataWithJSONObject:newParam options:0 error:&encodeError];
    } else {
        encodeError = [NSError errorWithDomain:NSCocoaErrorDomain
                                          code:NSPropertyListWriteInvalidError
                                      userInfo:@{NSLocalizedDescriptionKey: @"MQTT payload is not a valid JSON object"}];
    }
    if (payload == nil) {
        NSLog(@"发送失败 - %@", encodeError);
        if (completion) {
            completion(encodeError, NO, nil);
        }
        return;
    }

    [self.mqttSession publishData:payload onTopic:MQTTServerTopic retain:NO qos:MQTTQosLevelExactlyOnce publishHandler:^(NSError *error) {
        NSLog(@"%@", @"发送获取设备列表");
        if (error) {
            NSLog(@"发送失败 - %@", error);
        } else {
            NSLog(@"发送成功");
        }
        // Report the real outcome; calling a nil block would crash.
        if (completion) {
            completion(error, error == nil, nil);
        }
    }];
}
/// Returns the current message id and advances the counter by one, holding a
/// lock on `self` while doing so.
- (NSInteger)nextMsgId {
    NSInteger issued;
    @synchronized (self) {
        issued = self.msgId;
        self.msgId = issued + 1;
    }
    return issued;
}
// Step 5. Unsubscribe.
/// Unsubscribes from this user's RPC reply topic (`/<uid>/rpc/reply`).
///
/// On success the `subscribed` flag is cleared so it keeps matching the
/// actual subscription state.
///
/// @param sender The control that triggered the action (unused).
- (IBAction)cancleSubsribe:(id)sender {
    NSString *subTopic = [NSString stringWithFormat:@"/%@/rpc/reply", self.uid];

    // Capture self weakly: the session holds the handler until the broker replies.
    __weak typeof(self) weakSelf = self;
    [self.mqttSession unsubscribeTopic:subTopic unsubscribeHandler:^(NSError *error) {
        if (error) {
            NSLog(@"取消订阅失败--%@", error);
            return;
        }
        NSLog(@"取消订阅成功");
        weakSelf.subscribed = NO;
    }];
}
@end
源码下载地址 :https://download.csdn.net/download/dreams_deng/12572615