PubSubClient Library
Es geht um die Library die auf GitHub unter folgendem Link zu finden ist.
https://github.com/knolleary/pubsubclient
Als Orientierungshilfe ist sie ein gelungener Einstieg, wenn auch die Einschränkungen etwas weh tun.
It can only publish QoS 0 messages. It can subscribe at QoS 0 or QoS 1.
Unten auf der Seite gibt es einen Hinweis, dass die Library nicht mit dem ENC28J60 funktioniert, aber nicht warum!
Also dem Link folgen und die empfohlene Library gleich mit untersuchen:-(
https://github.com/njh/NanodeMQTT
Empfohlen wird die Library für das Nanode-Board. Auf der Seite sind aber schon viele Links tot, so dass ich davon ausgehe, dass das Board sein Lebensende überschritten hat. Die Kurzanalyse des Boards und die entsprechenden Libs packe ich in eine neue Seite.
1 /*
2
3 PubSubClient.cpp - A simple client for MQTT.
4 Nick O'Leary
5 http://knolleary.net
6 */
7
8 #include "PubSubClient.h"
9 #include "Arduino.h"
10
11 PubSubClient::PubSubClient() {
12 this->_state = MQTT_DISCONNECTED;
13 this->_client = NULL;
14 this->stream = NULL;
15 setCallback(NULL);
16 this->bufferSize = 0;
17 setBufferSize(MQTT_MAX_PACKET_SIZE);
18 setKeepAlive(MQTT_KEEPALIVE);
19 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
20 }
21
22 PubSubClient::PubSubClient(Client& client) {
23 this->_state = MQTT_DISCONNECTED;
24 setClient(client);
25 this->stream = NULL;
26 this->bufferSize = 0;
27 setBufferSize(MQTT_MAX_PACKET_SIZE);
28 setKeepAlive(MQTT_KEEPALIVE);
29 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
30 }
31
32 PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) {
33 this->_state = MQTT_DISCONNECTED;
34 setServer(addr, port);
35 setClient(client);
36 this->stream = NULL;
37 this->bufferSize = 0;
38 setBufferSize(MQTT_MAX_PACKET_SIZE);
39 setKeepAlive(MQTT_KEEPALIVE);
40 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
41 }
42 PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) {
43 this->_state = MQTT_DISCONNECTED;
44 setServer(addr,port);
45 setClient(client);
46 setStream(stream);
47 this->bufferSize = 0;
48 setBufferSize(MQTT_MAX_PACKET_SIZE);
49 setKeepAlive(MQTT_KEEPALIVE);
50 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
51 }
52 PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
53 this->_state = MQTT_DISCONNECTED;
54 setServer(addr, port);
55 setCallback(callback);
56 setClient(client);
57 this->stream = NULL;
58 this->bufferSize = 0;
59 setBufferSize(MQTT_MAX_PACKET_SIZE);
60 setKeepAlive(MQTT_KEEPALIVE);
61 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
62 }
63 PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
64 this->_state = MQTT_DISCONNECTED;
65 setServer(addr,port);
66 setCallback(callback);
67 setClient(client);
68 setStream(stream);
69 this->bufferSize = 0;
70 setBufferSize(MQTT_MAX_PACKET_SIZE);
71 setKeepAlive(MQTT_KEEPALIVE);
72 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
73 }
74
75 PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) {
76 this->_state = MQTT_DISCONNECTED;
77 setServer(ip, port);
78 setClient(client);
79 this->stream = NULL;
80 this->bufferSize = 0;
81 setBufferSize(MQTT_MAX_PACKET_SIZE);
82 setKeepAlive(MQTT_KEEPALIVE);
83 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
84 }
85 PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) {
86 this->_state = MQTT_DISCONNECTED;
87 setServer(ip,port);
88 setClient(client);
89 setStream(stream);
90 this->bufferSize = 0;
91 setBufferSize(MQTT_MAX_PACKET_SIZE);
92 setKeepAlive(MQTT_KEEPALIVE);
93 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
94 }
95 PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
96 this->_state = MQTT_DISCONNECTED;
97 setServer(ip, port);
98 setCallback(callback);
99 setClient(client);
100 this->stream = NULL;
101 this->bufferSize = 0;
102 setBufferSize(MQTT_MAX_PACKET_SIZE);
103 setKeepAlive(MQTT_KEEPALIVE);
104 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
105 }
106 PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
107 this->_state = MQTT_DISCONNECTED;
108 setServer(ip,port);
109 setCallback(callback);
110 setClient(client);
111 setStream(stream);
112 this->bufferSize = 0;
113 setBufferSize(MQTT_MAX_PACKET_SIZE);
114 setKeepAlive(MQTT_KEEPALIVE);
115 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
116 }
117
118 PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) {
119 this->_state = MQTT_DISCONNECTED;
120 setServer(domain,port);
121 setClient(client);
122 this->stream = NULL;
123 this->bufferSize = 0;
124 setBufferSize(MQTT_MAX_PACKET_SIZE);
125 setKeepAlive(MQTT_KEEPALIVE);
126 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
127 }
128 PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) {
129 this->_state = MQTT_DISCONNECTED;
130 setServer(domain,port);
131 setClient(client);
132 setStream(stream);
133 this->bufferSize = 0;
134 setBufferSize(MQTT_MAX_PACKET_SIZE);
135 setKeepAlive(MQTT_KEEPALIVE);
136 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
137 }
138 PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
139 this->_state = MQTT_DISCONNECTED;
140 setServer(domain,port);
141 setCallback(callback);
142 setClient(client);
143 this->stream = NULL;
144 this->bufferSize = 0;
145 setBufferSize(MQTT_MAX_PACKET_SIZE);
146 setKeepAlive(MQTT_KEEPALIVE);
147 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
148 }
149 PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
150 this->_state = MQTT_DISCONNECTED;
151 setServer(domain,port);
152 setCallback(callback);
153 setClient(client);
154 setStream(stream);
155 this->bufferSize = 0;
156 setBufferSize(MQTT_MAX_PACKET_SIZE);
157 setKeepAlive(MQTT_KEEPALIVE);
158 setSocketTimeout(MQTT_SOCKET_TIMEOUT);
159 }
160
161 PubSubClient::~PubSubClient() {
162 free(this->buffer);
163 }
164
165 boolean PubSubClient::connect(const char *id) {
166 return connect(id,NULL,NULL,0,0,0,0,1);
167 }
168
169 boolean PubSubClient::connect(const char *id, const char *user, const char *pass) {
170 return connect(id,user,pass,0,0,0,0,1);
171 }
172
173 boolean PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) {
174 return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage,1);
175 }
176
177 boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) {
178 return connect(id,user,pass,willTopic,willQos,willRetain,willMessage,1);
179 }
180
181 boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage, boolean cleanSession) {
182 if (!connected()) {
183 int result = 0;
184
185
186 if(_client->connected()) {
187 result = 1;
188 } else {
189 if (domain != NULL) {
190 result = _client->connect(this->domain, this->port);
191 } else {
192 result = _client->connect(this->ip, this->port);
193 }
194 }
195
196 if (result == 1) {
197 nextMsgId = 1;
198 // Leave room in the buffer for header and variable length field
199 uint16_t length = MQTT_MAX_HEADER_SIZE;
200 unsigned int j;
201
202 #if MQTT_VERSION == MQTT_VERSION_3_1
203 uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION};
204 #define MQTT_HEADER_VERSION_LENGTH 9
205 #elif MQTT_VERSION == MQTT_VERSION_3_1_1
206 uint8_t d[7] = {0x00,0x04,'M','Q','T','T',MQTT_VERSION};
207 #define MQTT_HEADER_VERSION_LENGTH 7
208 #endif
209 for (j = 0;j<MQTT_HEADER_VERSION_LENGTH;j++) {
210 this->buffer[length++] = d[j];
211 }
212
213 uint8_t v;
214 if (willTopic) {
215 v = 0x04|(willQos<<3)|(willRetain<<5);
216 } else {
217 v = 0x00;
218 }
219 if (cleanSession) {
220 v = v|0x02;
221 }
222
223 if(user != NULL) {
224 v = v|0x80;
225
226 if(pass != NULL) {
227 v = v|(0x80>>1);
228 }
229 }
230 this->buffer[length++] = v;
231
232 this->buffer[length++] = ((this->keepAlive) >> 8);
233 this->buffer[length++] = ((this->keepAlive) & 0xFF);
234
235 CHECK_STRING_LENGTH(length,id)
236 length = writeString(id,this->buffer,length);
237 if (willTopic) {
238 CHECK_STRING_LENGTH(length,willTopic)
239 length = writeString(willTopic,this->buffer,length);
240 CHECK_STRING_LENGTH(length,willMessage)
241 length = writeString(willMessage,this->buffer,length);
242 }
243
244 if(user != NULL) {
245 CHECK_STRING_LENGTH(length,user)
246 length = writeString(user,this->buffer,length);
247 if(pass != NULL) {
248 CHECK_STRING_LENGTH(length,pass)
249 length = writeString(pass,this->buffer,length);
250 }
251 }
252
253 write(MQTTCONNECT,this->buffer,length-MQTT_MAX_HEADER_SIZE);
254
255 lastInActivity = lastOutActivity = millis();
256
257 while (!_client->available()) {
258 unsigned long t = millis();
259 if (t-lastInActivity >= ((int32_t) this->socketTimeout*1000UL)) {
260 _state = MQTT_CONNECTION_TIMEOUT;
261 _client->stop();
262 return false;
263 }
264 }
265 uint8_t llen;
266 uint32_t len = readPacket(&llen);
267
268 if (len == 4) {
269 if (buffer[3] == 0) {
270 lastInActivity = millis();
271 pingOutstanding = false;
272 _state = MQTT_CONNECTED;
273 return true;
274 } else {
275 _state = buffer[3];
276 }
277 }
278 _client->stop();
279 } else {
280 _state = MQTT_CONNECT_FAILED;
281 }
282 return false;
283 }
284 return true;
285 }
286
287 // reads a byte into result
288 boolean PubSubClient::readByte(uint8_t * result) {
289 uint32_t previousMillis = millis();
290 while(!_client->available()) {
291 yield();
292 uint32_t currentMillis = millis();
293 if(currentMillis - previousMillis >= ((int32_t) this->socketTimeout * 1000)){
294 return false;
295 }
296 }
297 *result = _client->read();
298 return true;
299 }
300
301 // reads a byte into result[*index] and increments index
302 boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){
303 uint16_t current_index = *index;
304 uint8_t * write_address = &(result[current_index]);
305 if(readByte(write_address)){
306 *index = current_index + 1;
307 return true;
308 }
309 return false;
310 }
311
312 uint32_t PubSubClient::readPacket(uint8_t* lengthLength) {
313 uint16_t len = 0;
314 if(!readByte(this->buffer, &len)) return 0;
315 bool isPublish = (this->buffer[0]&0xF0) == MQTTPUBLISH;
316 uint32_t multiplier = 1;
317 uint32_t length = 0;
318 uint8_t digit = 0;
319 uint16_t skip = 0;
320 uint32_t start = 0;
321
322 do {
323 if (len == 5) {
324 // Invalid remaining length encoding - kill the connection
325 _state = MQTT_DISCONNECTED;
326 _client->stop();
327 return 0;
328 }
329 if(!readByte(&digit)) return 0;
330 this->buffer[len++] = digit;
331 length += (digit & 127) * multiplier;
332 multiplier <<=7; //multiplier *= 128
333 } while ((digit & 128) != 0);
334 *lengthLength = len-1;
335
336 if (isPublish) {
337 // Read in topic length to calculate bytes to skip over for Stream writing
338 if(!readByte(this->buffer, &len)) return 0;
339 if(!readByte(this->buffer, &len)) return 0;
340 skip = (this->buffer[*lengthLength+1]<<8)+this->buffer[*lengthLength+2];
341 start = 2;
342 if (this->buffer[0]&MQTTQOS1) {
343 // skip message id
344 skip += 2;
345 }
346 }
347 uint32_t idx = len;
348
349 for (uint32_t i = start;i<length;i++) {
350 if(!readByte(&digit)) return 0;
351 if (this->stream) {
352 if (isPublish && idx-*lengthLength-2>skip) {
353 this->stream->write(digit);
354 }
355 }
356
357 if (len < this->bufferSize) {
358 this->buffer[len] = digit;
359 len++;
360 }
361 idx++;
362 }
363
364 if (!this->stream && idx > this->bufferSize) {
365 len = 0; // This will cause the packet to be ignored.
366 }
367 return len;
368 }
369
370 boolean PubSubClient::loop() {
371 if (connected()) {
372 unsigned long t = millis();
373 if ((t - lastInActivity > this->keepAlive*1000UL) || (t - lastOutActivity > this->keepAlive*1000UL)) {
374 if (pingOutstanding) {
375 this->_state = MQTT_CONNECTION_TIMEOUT;
376 _client->stop();
377 return false;
378 } else {
379 this->buffer[0] = MQTTPINGREQ;
380 this->buffer[1] = 0;
381 _client->write(this->buffer,2);
382 lastOutActivity = t;
383 lastInActivity = t;
384 pingOutstanding = true;
385 }
386 }
387 if (_client->available()) {
388 uint8_t llen;
389 uint16_t len = readPacket(&llen);
390 uint16_t msgId = 0;
391 uint8_t *payload;
392 if (len > 0) {
393 lastInActivity = t;
394 uint8_t type = this->buffer[0]&0xF0;
395 if (type == MQTTPUBLISH) {
396 if (callback) {
397 uint16_t tl = (this->buffer[llen+1]<<8)+this->buffer[llen+2]; /* topic length in bytes */
398 memmove(this->buffer+llen+2,this->buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */
399 this->buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */
400 char *topic = (char*) this->buffer+llen+2;
401 // msgId only present for QOS>0
402 if ((this->buffer[0]&0x06) == MQTTQOS1) {
403 msgId = (this->buffer[llen+3+tl]<<8)+this->buffer[llen+3+tl+1];
404 payload = this->buffer+llen+3+tl+2;
405 callback(topic,payload,len-llen-3-tl-2);
406
407 this->buffer[0] = MQTTPUBACK;
408 this->buffer[1] = 2;
409 this->buffer[2] = (msgId >> 8);
410 this->buffer[3] = (msgId & 0xFF);
411 _client->write(this->buffer,4);
412 lastOutActivity = t;
413
414 } else {
415 payload = this->buffer+llen+3+tl;
416 callback(topic,payload,len-llen-3-tl);
417 }
418 }
419 } else if (type == MQTTPINGREQ) {
420 this->buffer[0] = MQTTPINGRESP;
421 this->buffer[1] = 0;
422 _client->write(this->buffer,2);
423 } else if (type == MQTTPINGRESP) {
424 pingOutstanding = false;
425 }
426 } else if (!connected()) {
427 // readPacket has closed the connection
428 return false;
429 }
430 }
431 return true;
432 }
433 return false;
434 }
435
436 boolean PubSubClient::publish(const char* topic, const char* payload) {
437 return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0,false);
438 }
439
440 boolean PubSubClient::publish(const char* topic, const char* payload, boolean retained) {
441 return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0,retained);
442 }
443
444 boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) {
445 return publish(topic, payload, plength, false);
446 }
447
448 boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
449 if (connected()) {
450 if (this->bufferSize < MQTT_MAX_HEADER_SIZE + 2+strnlen(topic, this->bufferSize) + plength) {
451 // Too long
452 return false;
453 }
454 // Leave room in the buffer for header and variable length field
455 uint16_t length = MQTT_MAX_HEADER_SIZE;
456 length = writeString(topic,this->buffer,length);
457
458 // Add payload
459 uint16_t i;
460 for (i=0;i<plength;i++) {
461 this->buffer[length++] = payload[i];
462 }
463
464 // Write the header
465 uint8_t header = MQTTPUBLISH;
466 if (retained) {
467 header |= 1;
468 }
469 return write(header,this->buffer,length-MQTT_MAX_HEADER_SIZE);
470 }
471 return false;
472 }
473
474 boolean PubSubClient::publish_P(const char* topic, const char* payload, boolean retained) {
475 return publish_P(topic, (const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0, retained);
476 }
477
478 boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
479 uint8_t llen = 0;
480 uint8_t digit;
481 unsigned int rc = 0;
482 uint16_t tlen;
483 unsigned int pos = 0;
484 unsigned int i;
485 uint8_t header;
486 unsigned int len;
487 int expectedLength;
488
489 if (!connected()) {
490 return false;
491 }
492
493 tlen = strnlen(topic, this->bufferSize);
494
495 header = MQTTPUBLISH;
496 if (retained) {
497 header |= 1;
498 }
499 this->buffer[pos++] = header;
500 len = plength + 2 + tlen;
501 do {
502 digit = len & 127; //digit = len %128
503 len >>= 7; //len = len / 128
504 if (len > 0) {
505 digit |= 0x80;
506 }
507 this->buffer[pos++] = digit;
508 llen++;
509 } while(len>0);
510
511 pos = writeString(topic,this->buffer,pos);
512
513 rc += _client->write(this->buffer,pos);
514
515 for (i=0;i<plength;i++) {
516 rc += _client->write((char)pgm_read_byte_near(payload + i));
517 }
518
519 lastOutActivity = millis();
520
521 expectedLength = 1 + llen + 2 + tlen + plength;
522
523 return (rc == expectedLength);
524 }
525
526 boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, boolean retained) {
527 if (connected()) {
528 // Send the header and variable length field
529 uint16_t length = MQTT_MAX_HEADER_SIZE;
530 length = writeString(topic,this->buffer,length);
531 uint8_t header = MQTTPUBLISH;
532 if (retained) {
533 header |= 1;
534 }
535 size_t hlen = buildHeader(header, this->buffer, plength+length-MQTT_MAX_HEADER_SIZE);
536 uint16_t rc = _client->write(this->buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen));
537 lastOutActivity = millis();
538 return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen)));
539 }
540 return false;
541 }
542
543 int PubSubClient::endPublish() {
544 return 1;
545 }
546
547 size_t PubSubClient::write(uint8_t data) {
548 lastOutActivity = millis();
549 return _client->write(data);
550 }
551
552 size_t PubSubClient::write(const uint8_t *buffer, size_t size) {
553 lastOutActivity = millis();
554 return _client->write(buffer,size);
555 }
556
557 size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, uint16_t length) {
558 uint8_t lenBuf[4];
559 uint8_t llen = 0;
560 uint8_t digit;
561 uint8_t pos = 0;
562 uint16_t len = length;
563 do {
564
565 digit = len & 127; //digit = len %128
566 len >>= 7; //len = len / 128
567 if (len > 0) {
568 digit |= 0x80;
569 }
570 lenBuf[pos++] = digit;
571 llen++;
572 } while(len>0);
573
574 buf[4-llen] = header;
575 for (int i=0;i<llen;i++) {
576 buf[MQTT_MAX_HEADER_SIZE-llen+i] = lenBuf[i];
577 }
578 return llen+1; // Full header size is variable length bit plus the 1-byte fixed header
579 }
580
581 boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
582 uint16_t rc;
583 uint8_t hlen = buildHeader(header, buf, length);
584
585 #ifdef MQTT_MAX_TRANSFER_SIZE
586 uint8_t* writeBuf = buf+(MQTT_MAX_HEADER_SIZE-hlen);
587 uint16_t bytesRemaining = length+hlen; //Match the length type
588 uint8_t bytesToWrite;
589 boolean result = true;
590 while((bytesRemaining > 0) && result) {
591 bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE)?MQTT_MAX_TRANSFER_SIZE:bytesRemaining;
592 rc = _client->write(writeBuf,bytesToWrite);
593 result = (rc == bytesToWrite);
594 bytesRemaining -= rc;
595 writeBuf += rc;
596 }
597 return result;
598 #else
599 rc = _client->write(buf+(MQTT_MAX_HEADER_SIZE-hlen),length+hlen);
600 lastOutActivity = millis();
601 return (rc == hlen+length);
602 #endif
603 }
604
605 boolean PubSubClient::subscribe(const char* topic) {
606 return subscribe(topic, 0);
607 }
608
609 boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
610 size_t topicLength = strnlen(topic, this->bufferSize);
611 if (topic == 0) {
612 return false;
613 }
614 if (qos > 1) {
615 return false;
616 }
617 if (this->bufferSize < 9 + topicLength) {
618 // Too long
619 return false;
620 }
621 if (connected()) {
622 // Leave room in the buffer for header and variable length field
623 uint16_t length = MQTT_MAX_HEADER_SIZE;
624 nextMsgId++;
625 if (nextMsgId == 0) {
626 nextMsgId = 1;
627 }
628 this->buffer[length++] = (nextMsgId >> 8);
629 this->buffer[length++] = (nextMsgId & 0xFF);
630 length = writeString((char*)topic, this->buffer,length);
631 this->buffer[length++] = qos;
632 return write(MQTTSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE);
633 }
634 return false;
635 }
636
637 boolean PubSubClient::unsubscribe(const char* topic) {
638 size_t topicLength = strnlen(topic, this->bufferSize);
639 if (topic == 0) {
640 return false;
641 }
642 if (this->bufferSize < 9 + topicLength) {
643 // Too long
644 return false;
645 }
646 if (connected()) {
647 uint16_t length = MQTT_MAX_HEADER_SIZE;
648 nextMsgId++;
649 if (nextMsgId == 0) {
650 nextMsgId = 1;
651 }
652 this->buffer[length++] = (nextMsgId >> 8);
653 this->buffer[length++] = (nextMsgId & 0xFF);
654 length = writeString(topic, this->buffer,length);
655 return write(MQTTUNSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE);
656 }
657 return false;
658 }
659
660 void PubSubClient::disconnect() {
661 this->buffer[0] = MQTTDISCONNECT;
662 this->buffer[1] = 0;
663 _client->write(this->buffer,2);
664 _state = MQTT_DISCONNECTED;
665 _client->flush();
666 _client->stop();
667 lastInActivity = lastOutActivity = millis();
668 }
669
670 uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t pos) {
671 const char* idp = string;
672 uint16_t i = 0;
673 pos += 2;
674 while (*idp) {
675 buf[pos++] = *idp++;
676 i++;
677 }
678 buf[pos-i-2] = (i >> 8);
679 buf[pos-i-1] = (i & 0xFF);
680 return pos;
681 }
682
683
684 boolean PubSubClient::connected() {
685 boolean rc;
686 if (_client == NULL ) {
687 rc = false;
688 } else {
689 rc = (int)_client->connected();
690 if (!rc) {
691 if (this->_state == MQTT_CONNECTED) {
692 this->_state = MQTT_CONNECTION_LOST;
693 _client->flush();
694 _client->stop();
695 }
696 } else {
697 return this->_state == MQTT_CONNECTED;
698 }
699 }
700 return rc;
701 }
702
703 PubSubClient& PubSubClient::setServer(uint8_t * ip, uint16_t port) {
704 IPAddress addr(ip[0],ip[1],ip[2],ip[3]);
705 return setServer(addr,port);
706 }
707
708 PubSubClient& PubSubClient::setServer(IPAddress ip, uint16_t port) {
709 this->ip = ip;
710 this->port = port;
711 this->domain = NULL;
712 return *this;
713 }
714
715 PubSubClient& PubSubClient::setServer(const char * domain, uint16_t port) {
716 this->domain = domain;
717 this->port = port;
718 return *this;
719 }
720
721 PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE) {
722 this->callback = callback;
723 return *this;
724 }
725
726 PubSubClient& PubSubClient::setClient(Client& client){
727 this->_client = &client;
728 return *this;
729 }
730
731 PubSubClient& PubSubClient::setStream(Stream& stream){
732 this->stream = &stream;
733 return *this;
734 }
735
736 int PubSubClient::state() {
737 return this->_state;
738 }
739
740 boolean PubSubClient::setBufferSize(uint16_t size) {
741 if (size == 0) {
742 // Cannot set it back to 0
743 return false;
744 }
745 if (this->bufferSize == 0) {
746 this->buffer = (uint8_t*)malloc(size);
747 } else {
748 uint8_t* newBuffer = (uint8_t*)realloc(this->buffer, size);
749 if (newBuffer != NULL) {
750 this->buffer = newBuffer;
751 } else {
752 return false;
753 }
754 }
755 this->bufferSize = size;
756 return (this->buffer != NULL);
757 }
758
759 uint16_t PubSubClient::getBufferSize() {
760 return this->bufferSize;
761 }
762 PubSubClient& PubSubClient::setKeepAlive(uint16_t keepAlive) {
763 this->keepAlive = keepAlive;
764 return *this;
765 }
766 PubSubClient& PubSubClient::setSocketTimeout(uint16_t timeout) {
767 this->socketTimeout = timeout;
768 return *this;
769 }
Die Library besteht nur aus einem Programm (cpp+h,769 lines (689 sloc) 23.4 KB). Das Programm hat alleine 14(!) Konstruktoren. Irre. Die fliegen bei mir alle bis auf einen raus! Weiterhin 5 Überladungen für das connect und 6 für das publish, da kann man locker schon mal 200 Zeilen Code einsparen.
MQTT-Control
- connect 181
- publish 6-fach 436 public
- beginPublish 526
- endPublish 543
- subscribe 605
- unsubscribe 637
- disconnect 660
- readByte 282 2-fach benutzt von: readPacket
- readPacket 312
- write 552
- loop 370 public
- buildHeader 557
- write 581
- writeString 670 benutzt: connect, publish, publish_P, subscribe, unsubscribe, disconnect
- connected 684 benutzt von: subscribe, unsubscribe, connect, loop, publish, publish_P, beginPublish
Konstruktor:
- setServer 703 3-fach
- setCallback 721
- setClient 726
- setStream 731
- setBufferSize 740
- setKeepAlive 762
- setSocketTimeout 766
- state 736
- getBufferSize 759
Quellen
https://www.eclipse.org/paho/files/mqttdoc/MQTTClient/html/index.html
Die automatische Aktualisierung der Kommentare aktivieren.