如何解决MqttConnection:客户端未连接,因此不在Java中发送消息
我尝试使用mqtt paho发布主题或发布消息mqtt.fx,但是它显示消息I / MqttConnection:客户端未连接,所以未发送消息。我的英语不好,但我希望您理解我的问题。我工作了几个小时,但找不到任何解决方案。 这是主要活动:
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
pahoMqttClient = new PahoMqttClient();
textMessage = (EditText) findViewById(R.id.textMessage);
publishMessage = (Button) findViewById(R.id.publishMessage);
subscribe = (Button) findViewById(R.id.subscribe);
unSubscribe = (Button) findViewById(R.id.unSubscribe);
subscribeTopic = (EditText) findViewById(R.id.subscribeTopic);
unSubscribeTopic = (EditText) findViewById(R.id.unSubscribeTopic);
client = pahoMqttClient.getMqttClient(getApplicationContext(),Constants.MQTT_BROKER_URL,Constants.CLIENT_ID);
publishMessage.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
String msg = textMessage.getText().toString().trim();
if (!msg.isEmpty()) {
try {
pahoMqttClient.publishMessage(client,msg,1,Constants.PUBLISH_TOPIC);
} catch (MqttException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
});
subscribe.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
String topic = subscribeTopic.getText().toString().trim();
if (!topic.isEmpty()) {
try {
pahoMqttClient.subscribe(client,topic,1);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
});
unSubscribe.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
String topic = unSubscribeTopic.getText().toString().trim();
if (!topic.isEmpty()) {
try {
pahoMqttClient.unSubscribe(client,topic);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
});
Intent intent = new Intent(MainActivity.this,MqttMessageService.class);
startService(intent);
}
}
这是我的客户代码:
public class PahoMqttClient {
private static final String TAG = "PahoMqttClient";
private MqttAndroidClient mqttAndroidClient;
public MqttAndroidClient getMqttClient(Context context,String brokerUrl,String clientId) {
mqttAndroidClient = new MqttAndroidClient(context,brokerUrl,clientId);
try {
IMqttToken token = mqttAndroidClient.connect(getMqttConnectionOption());
token.setActionCallback(new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
mqttAndroidClient.setBufferOpts(getDisconnectedBufferOptions());
Log.d(TAG,"Success");
}
@Override
public void onFailure(IMqttToken asyncActionToken,Throwable exception) {
Log.d(TAG,"Failure " + exception.toString());
}
});
} catch (MqttException e) {
e.printStackTrace();
}
return mqttAndroidClient;
}
public void disconnect(@NonNull MqttAndroidClient client) throws MqttException {
IMqttToken mqttToken = client.disconnect();
mqttToken.setActionCallback(new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken iMqttToken) {
Log.d(TAG,"Successfully disconnected");
}
@Override
public void onFailure(IMqttToken iMqttToken,Throwable throwable) {
Log.d(TAG,"Failed to disconnected " + throwable.toString());
}
});
}
@NonNull
private DisconnectedBufferOptions getDisconnectedBufferOptions() {
DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
disconnectedBufferOptions.setBufferEnabled(true);
disconnectedBufferOptions.setBufferSize(100);
disconnectedBufferOptions.setPersistBuffer(false);
disconnectedBufferOptions.setDeleteOldestMessages(false);
return disconnectedBufferOptions;
}
@NonNull
private MqttConnectOptions getMqttConnectionOption() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setCleanSession(false);
mqttConnectOptions.setAutomaticReconnect(true);
//mqttConnectOptions.setWill(Constants.PUBLISH_TOPIC,"I am going offline".getBytes(),true);
//mqttConnectOptions.setUserName("ngbllzzy");
//mqttConnectOptions.setPassword("WtjhZKl3OPoK".toCharArray());
return mqttConnectOptions;
}
public void publishMessage(@NonNull MqttAndroidClient client,@NonNull String msg,int qos,@NonNull String topic)
throws MqttException,UnsupportedEncodingException {
byte[] encodedPayload = new byte[0];
encodedPayload = msg.getBytes("UTF-8");
MqttMessage message = new MqttMessage(encodedPayload);
message.setId(320);
message.setRetained(true);
message.setQos(qos);
client.publish(topic,message);
}
public void subscribe(@NonNull MqttAndroidClient client,@NonNull final String topic,int qos) throws MqttException {
IMqttToken token = client.subscribe(topic,qos);
token.setActionCallback(new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken iMqttToken) {
Log.d(TAG,"Subscribe Successfully " + topic);
}
@Override
public void onFailure(IMqttToken iMqttToken,Throwable throwable) {
Log.e(TAG,"Subscribe Failed " + topic);
}
});
}
public void unSubscribe(@NonNull MqttAndroidClient client,@NonNull final String topic) throws MqttException {
IMqttToken token = client.unsubscribe(topic);
token.setActionCallback(new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken iMqttToken) {
Log.d(TAG,"UnSubscribe Successfully " + topic);
}
@Override
public void onFailure(IMqttToken iMqttToken,"UnSubscribe Failed " + topic);
}
});
}
} 在这里,我为经纪人URL,客户ID和发布主题创建了单独的类。
public class Constants {
public static final String MQTT_BROKER_URL = "tcp://127.0.0.1:1883";
public static final String PUBLISH_TOPIC = "test";
public static final String CLIENT_ID = "ef9c9a7c92d04effaead3f22227703ae";
}
And finally here it is message service class:
public class MqttMessageService extends Service {
private static final String TAG = "MqttMessageService";
private PahoMqttClient pahoMqttClient;
private MqttAndroidClient mqttAndroidClient;
public MqttMessageService() {
}
@Override
public void onCreate() {
super.onCreate();
Log.d(TAG,"onCreate");
pahoMqttClient = new PahoMqttClient();
mqttAndroidClient = pahoMqttClient.getMqttClient(getApplicationContext(),Constants.CLIENT_ID);
mqttAndroidClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean b,String s) {
}
@Override
public void connectionLost(Throwable throwable) {
}
@Override
public void messageArrived(String s,MqttMessage mqttMessage) throws Exception {
setMessageNotification(s,new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
}
@Override
public int onStartCommand(Intent intent,int flags,int startId) {
Log.d(TAG,"onStartCommand");
return START_STICKY;
}
@Override
public IBinder onBind(Intent intent) {
// TODO: Return the communication channel to the service.
throw new UnsupportedOperationException("Not yet implemented");
}
@Override
public void onDestroy() {
super.onDestroy();
Log.d(TAG,"onDestroy");
}
private void setMessageNotification(@NonNull String topic,@NonNull String msg) {
NotificationCompat.Builder mBuilder =
new NotificationCompat.Builder(this)
.setSmallIcon(R.drawable.ic_message_black_24dp)
.setContentTitle(topic)
.setContentText(msg);
Intent resultIntent = new Intent(this,MainActivity.class);
TaskStackBuilder stackBuilder = TaskStackBuilder.create(this);
stackBuilder.addParentStack(MainActivity.class);
stackBuilder.addNextIntent(resultIntent);
PendingIntent resultPendingIntent =
stackBuilder.getPendingIntent(0,PendingIntent.FLAG_UPDATE_CURRENT);
mBuilder.setContentIntent(resultPendingIntent);
NotificationManager mNotificationManager =
(NotificationManager) getSystemService(Context.NOTIFICATION_SERVICE);
mNotificationManager.notify(100,mBuilder.build());
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。