PubSubClient Library

Aus TippvomTibb
Version vom 7. Februar 2021, 08:46 Uhr von Chris T. Ludwig (Diskussion | Beiträge)
(Unterschied) ← Nächstältere Version | Aktuelle Version (Unterschied) | Nächstjüngere Version → (Unterschied)
Zur Navigation springen Zur Suche springen

Es geht um die Library die auf GitHub unter folgendem Link zu finden ist.

https://github.com/knolleary/pubsubclient

Als Orientierungshilfe ist sie ein gelungener Einstieg, wenn auch die Einschränkungen etwas weh tun.

It can only publish QoS 0 messages. It can subscribe at QoS 0 or QoS 1.

Unten auf der Seite gibt es einen Hinweis, dass die Library nicht mit dem ENC28J60 funktioniert, aber nicht warum!

Also dem Link folgen und die empfohlene Library gleich mit untersuchen:-(

https://github.com/njh/NanodeMQTT

Empfohlen wird die Library für das Nanode-Board. Auf der Seite sind aber schon viele Links tot, so dass ich davon ausgehe, dass das Board sein Lebensende überschritten hat. Die Kurzanalyse des Boards und die entsprechenden Libs packe ich in eine neue Seite.


Receiver Sketch
  1 /*
  2 
  3   PubSubClient.cpp - A simple client for MQTT.
  4   Nick O'Leary
  5   http://knolleary.net
  6 */
  7 
  8 #include "PubSubClient.h"
  9 #include "Arduino.h"
 10 
 11 PubSubClient::PubSubClient() {
 12     this->_state = MQTT_DISCONNECTED;
 13     this->_client = NULL;
 14     this->stream = NULL;
 15     setCallback(NULL);
 16     this->bufferSize = 0;
 17     setBufferSize(MQTT_MAX_PACKET_SIZE);
 18     setKeepAlive(MQTT_KEEPALIVE);
 19     setSocketTimeout(MQTT_SOCKET_TIMEOUT);
 20 }
 21 
 22 PubSubClient::PubSubClient(Client& client) {
 23     this->_state = MQTT_DISCONNECTED;
 24     setClient(client);
 25     this->stream = NULL;
 26     this->bufferSize = 0;
 27     setBufferSize(MQTT_MAX_PACKET_SIZE);
 28     setKeepAlive(MQTT_KEEPALIVE);
 29     setSocketTimeout(MQTT_SOCKET_TIMEOUT);
 30 }
 31 
 32 PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) {
 33     this->_state = MQTT_DISCONNECTED;
 34     setServer(addr, port);
 35     setClient(client);
 36     this->stream = NULL;
 37     this->bufferSize = 0;
 38     setBufferSize(MQTT_MAX_PACKET_SIZE);
 39     setKeepAlive(MQTT_KEEPALIVE);
 40     setSocketTimeout(MQTT_SOCKET_TIMEOUT);
 41 }
 42 PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) {
 43     this->_state = MQTT_DISCONNECTED;
 44     setServer(addr,port);
 45     setClient(client);
 46     setStream(stream);
 47     this->bufferSize = 0;
 48     setBufferSize(MQTT_MAX_PACKET_SIZE);
 49     setKeepAlive(MQTT_KEEPALIVE);
 50     setSocketTimeout(MQTT_SOCKET_TIMEOUT);
 51 }
 52 PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
 53     this->_state = MQTT_DISCONNECTED;
 54     setServer(addr, port);
 55     setCallback(callback);
 56     setClient(client);
 57     this->stream = NULL;
 58     this->bufferSize = 0;
 59     setBufferSize(MQTT_MAX_PACKET_SIZE);
 60     setKeepAlive(MQTT_KEEPALIVE);
 61     setSocketTimeout(MQTT_SOCKET_TIMEOUT);
 62 }
 63 PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
 64     this->_state = MQTT_DISCONNECTED;
 65     setServer(addr,port);
 66     setCallback(callback);
 67     setClient(client);
 68     setStream(stream);
 69     this->bufferSize = 0;
 70     setBufferSize(MQTT_MAX_PACKET_SIZE);
 71     setKeepAlive(MQTT_KEEPALIVE);
 72     setSocketTimeout(MQTT_SOCKET_TIMEOUT);
 73 }
 74 
 75 PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) {
 76     this->_state = MQTT_DISCONNECTED;
 77     setServer(ip, port);
 78     setClient(client);
 79     this->stream = NULL;
 80     this->bufferSize = 0;
 81     setBufferSize(MQTT_MAX_PACKET_SIZE);
 82     setKeepAlive(MQTT_KEEPALIVE);
 83     setSocketTimeout(MQTT_SOCKET_TIMEOUT);
 84 }
 85 PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) {
 86     this->_state = MQTT_DISCONNECTED;
 87     setServer(ip,port);
 88     setClient(client);
 89     setStream(stream);
 90     this->bufferSize = 0;
 91     setBufferSize(MQTT_MAX_PACKET_SIZE);
 92     setKeepAlive(MQTT_KEEPALIVE);
 93     setSocketTimeout(MQTT_SOCKET_TIMEOUT);
 94 }
 95 PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
 96     this->_state = MQTT_DISCONNECTED;
 97     setServer(ip, port);
 98     setCallback(callback);
 99     setClient(client);
100     this->stream = NULL;
101     this->bufferSize = 0;
102     setBufferSize(MQTT_MAX_PACKET_SIZE);
103     setKeepAlive(MQTT_KEEPALIVE);
104     setSocketTimeout(MQTT_SOCKET_TIMEOUT);
105 }
106 PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
107     this->_state = MQTT_DISCONNECTED;
108     setServer(ip,port);
109     setCallback(callback);
110     setClient(client);
111     setStream(stream);
112     this->bufferSize = 0;
113     setBufferSize(MQTT_MAX_PACKET_SIZE);
114     setKeepAlive(MQTT_KEEPALIVE);
115     setSocketTimeout(MQTT_SOCKET_TIMEOUT);
116 }
117 
118 PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) {
119     this->_state = MQTT_DISCONNECTED;
120     setServer(domain,port);
121     setClient(client);
122     this->stream = NULL;
123     this->bufferSize = 0;
124     setBufferSize(MQTT_MAX_PACKET_SIZE);
125     setKeepAlive(MQTT_KEEPALIVE);
126     setSocketTimeout(MQTT_SOCKET_TIMEOUT);
127 }
128 PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) {
129     this->_state = MQTT_DISCONNECTED;
130     setServer(domain,port);
131     setClient(client);
132     setStream(stream);
133     this->bufferSize = 0;
134     setBufferSize(MQTT_MAX_PACKET_SIZE);
135     setKeepAlive(MQTT_KEEPALIVE);
136     setSocketTimeout(MQTT_SOCKET_TIMEOUT);
137 }
138 PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
139     this->_state = MQTT_DISCONNECTED;
140     setServer(domain,port);
141     setCallback(callback);
142     setClient(client);
143     this->stream = NULL;
144     this->bufferSize = 0;
145     setBufferSize(MQTT_MAX_PACKET_SIZE);
146     setKeepAlive(MQTT_KEEPALIVE);
147     setSocketTimeout(MQTT_SOCKET_TIMEOUT);
148 }
149 PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
150     this->_state = MQTT_DISCONNECTED;
151     setServer(domain,port);
152     setCallback(callback);
153     setClient(client);
154     setStream(stream);
155     this->bufferSize = 0;
156     setBufferSize(MQTT_MAX_PACKET_SIZE);
157     setKeepAlive(MQTT_KEEPALIVE);
158     setSocketTimeout(MQTT_SOCKET_TIMEOUT);
159 }
160 
161 PubSubClient::~PubSubClient() {
162   free(this->buffer);
163 }
164 
165 boolean PubSubClient::connect(const char *id) {
166     return connect(id,NULL,NULL,0,0,0,0,1);
167 }
168 
169 boolean PubSubClient::connect(const char *id, const char *user, const char *pass) {
170     return connect(id,user,pass,0,0,0,0,1);
171 }
172 
173 boolean PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) {
174     return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage,1);
175 }
176 
177 boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) {
178     return connect(id,user,pass,willTopic,willQos,willRetain,willMessage,1);
179 }
180 
181 boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage, boolean cleanSession) {
182     if (!connected()) {
183         int result = 0;
184 
185 
186         if(_client->connected()) {
187             result = 1;
188         } else {
189             if (domain != NULL) {
190                 result = _client->connect(this->domain, this->port);
191             } else {
192                 result = _client->connect(this->ip, this->port);
193             }
194         }
195 
196         if (result == 1) {
197             nextMsgId = 1;
198             // Leave room in the buffer for header and variable length field
199             uint16_t length = MQTT_MAX_HEADER_SIZE;
200             unsigned int j;
201 
202 #if MQTT_VERSION == MQTT_VERSION_3_1
203             uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION};
204 #define MQTT_HEADER_VERSION_LENGTH 9
205 #elif MQTT_VERSION == MQTT_VERSION_3_1_1
206             uint8_t d[7] = {0x00,0x04,'M','Q','T','T',MQTT_VERSION};
207 #define MQTT_HEADER_VERSION_LENGTH 7
208 #endif
209             for (j = 0;j<MQTT_HEADER_VERSION_LENGTH;j++) {
210                 this->buffer[length++] = d[j];
211             }
212 
213             uint8_t v;
214             if (willTopic) {
215                 v = 0x04|(willQos<<3)|(willRetain<<5);
216             } else {
217                 v = 0x00;
218             }
219             if (cleanSession) {
220                 v = v|0x02;
221             }
222 
223             if(user != NULL) {
224                 v = v|0x80;
225 
226                 if(pass != NULL) {
227                     v = v|(0x80>>1);
228                 }
229             }
230             this->buffer[length++] = v;
231 
232             this->buffer[length++] = ((this->keepAlive) >> 8);
233             this->buffer[length++] = ((this->keepAlive) & 0xFF);
234 
235             CHECK_STRING_LENGTH(length,id)
236             length = writeString(id,this->buffer,length);
237             if (willTopic) {
238                 CHECK_STRING_LENGTH(length,willTopic)
239                 length = writeString(willTopic,this->buffer,length);
240                 CHECK_STRING_LENGTH(length,willMessage)
241                 length = writeString(willMessage,this->buffer,length);
242             }
243 
244             if(user != NULL) {
245                 CHECK_STRING_LENGTH(length,user)
246                 length = writeString(user,this->buffer,length);
247                 if(pass != NULL) {
248                     CHECK_STRING_LENGTH(length,pass)
249                     length = writeString(pass,this->buffer,length);
250                 }
251             }
252 
253             write(MQTTCONNECT,this->buffer,length-MQTT_MAX_HEADER_SIZE);
254 
255             lastInActivity = lastOutActivity = millis();
256 
257             while (!_client->available()) {
258                 unsigned long t = millis();
259                 if (t-lastInActivity >= ((int32_t) this->socketTimeout*1000UL)) {
260                     _state = MQTT_CONNECTION_TIMEOUT;
261                     _client->stop();
262                     return false;
263                 }
264             }
265             uint8_t llen;
266             uint32_t len = readPacket(&llen);
267 
268             if (len == 4) {
269                 if (buffer[3] == 0) {
270                     lastInActivity = millis();
271                     pingOutstanding = false;
272                     _state = MQTT_CONNECTED;
273                     return true;
274                 } else {
275                     _state = buffer[3];
276                 }
277             }
278             _client->stop();
279         } else {
280             _state = MQTT_CONNECT_FAILED;
281         }
282         return false;
283     }
284     return true;
285 }
286 
287 // reads a byte into result
288 boolean PubSubClient::readByte(uint8_t * result) {
289    uint32_t previousMillis = millis();
290    while(!_client->available()) {
291      yield();
292      uint32_t currentMillis = millis();
293      if(currentMillis - previousMillis >= ((int32_t) this->socketTimeout * 1000)){
294        return false;
295      }
296    }
297    *result = _client->read();
298    return true;
299 }
300 
301 // reads a byte into result[*index] and increments index
302 boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){
303   uint16_t current_index = *index;
304   uint8_t * write_address = &(result[current_index]);
305   if(readByte(write_address)){
306     *index = current_index + 1;
307     return true;
308   }
309   return false;
310 }
311 
312 uint32_t PubSubClient::readPacket(uint8_t* lengthLength) {
313     uint16_t len = 0;
314     if(!readByte(this->buffer, &len)) return 0;
315     bool isPublish = (this->buffer[0]&0xF0) == MQTTPUBLISH;
316     uint32_t multiplier = 1;
317     uint32_t length = 0;
318     uint8_t digit = 0;
319     uint16_t skip = 0;
320     uint32_t start = 0;
321 
322     do {
323         if (len == 5) {
324             // Invalid remaining length encoding - kill the connection
325             _state = MQTT_DISCONNECTED;
326             _client->stop();
327             return 0;
328         }
329         if(!readByte(&digit)) return 0;
330         this->buffer[len++] = digit;
331         length += (digit & 127) * multiplier;
332         multiplier <<=7; //multiplier *= 128
333     } while ((digit & 128) != 0);
334     *lengthLength = len-1;
335 
336     if (isPublish) {
337         // Read in topic length to calculate bytes to skip over for Stream writing
338         if(!readByte(this->buffer, &len)) return 0;
339         if(!readByte(this->buffer, &len)) return 0;
340         skip = (this->buffer[*lengthLength+1]<<8)+this->buffer[*lengthLength+2];
341         start = 2;
342         if (this->buffer[0]&MQTTQOS1) {
343             // skip message id
344             skip += 2;
345         }
346     }
347     uint32_t idx = len;
348 
349     for (uint32_t i = start;i<length;i++) {
350         if(!readByte(&digit)) return 0;
351         if (this->stream) {
352             if (isPublish && idx-*lengthLength-2>skip) {
353                 this->stream->write(digit);
354             }
355         }
356 
357         if (len < this->bufferSize) {
358             this->buffer[len] = digit;
359             len++;
360         }
361         idx++;
362     }
363 
364     if (!this->stream && idx > this->bufferSize) {
365         len = 0; // This will cause the packet to be ignored.
366     }
367     return len;
368 }
369 
370 boolean PubSubClient::loop() {
371     if (connected()) {
372         unsigned long t = millis();
373         if ((t - lastInActivity > this->keepAlive*1000UL) || (t - lastOutActivity > this->keepAlive*1000UL)) {
374             if (pingOutstanding) {
375                 this->_state = MQTT_CONNECTION_TIMEOUT;
376                 _client->stop();
377                 return false;
378             } else {
379                 this->buffer[0] = MQTTPINGREQ;
380                 this->buffer[1] = 0;
381                 _client->write(this->buffer,2);
382                 lastOutActivity = t;
383                 lastInActivity = t;
384                 pingOutstanding = true;
385             }
386         }
387         if (_client->available()) {
388             uint8_t llen;
389             uint16_t len = readPacket(&llen);
390             uint16_t msgId = 0;
391             uint8_t *payload;
392             if (len > 0) {
393                 lastInActivity = t;
394                 uint8_t type = this->buffer[0]&0xF0;
395                 if (type == MQTTPUBLISH) {
396                     if (callback) {
397                         uint16_t tl = (this->buffer[llen+1]<<8)+this->buffer[llen+2]; /* topic length in bytes */
398                         memmove(this->buffer+llen+2,this->buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */
399                         this->buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */
400                         char *topic = (char*) this->buffer+llen+2;
401                         // msgId only present for QOS>0
402                         if ((this->buffer[0]&0x06) == MQTTQOS1) {
403                             msgId = (this->buffer[llen+3+tl]<<8)+this->buffer[llen+3+tl+1];
404                             payload = this->buffer+llen+3+tl+2;
405                             callback(topic,payload,len-llen-3-tl-2);
406 
407                             this->buffer[0] = MQTTPUBACK;
408                             this->buffer[1] = 2;
409                             this->buffer[2] = (msgId >> 8);
410                             this->buffer[3] = (msgId & 0xFF);
411                             _client->write(this->buffer,4);
412                             lastOutActivity = t;
413 
414                         } else {
415                             payload = this->buffer+llen+3+tl;
416                             callback(topic,payload,len-llen-3-tl);
417                         }
418                     }
419                 } else if (type == MQTTPINGREQ) {
420                     this->buffer[0] = MQTTPINGRESP;
421                     this->buffer[1] = 0;
422                     _client->write(this->buffer,2);
423                 } else if (type == MQTTPINGRESP) {
424                     pingOutstanding = false;
425                 }
426             } else if (!connected()) {
427                 // readPacket has closed the connection
428                 return false;
429             }
430         }
431         return true;
432     }
433     return false;
434 }
435 
436 boolean PubSubClient::publish(const char* topic, const char* payload) {
437     return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0,false);
438 }
439 
440 boolean PubSubClient::publish(const char* topic, const char* payload, boolean retained) {
441     return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0,retained);
442 }
443 
444 boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) {
445     return publish(topic, payload, plength, false);
446 }
447 
448 boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
449     if (connected()) {
450         if (this->bufferSize < MQTT_MAX_HEADER_SIZE + 2+strnlen(topic, this->bufferSize) + plength) {
451             // Too long
452             return false;
453         }
454         // Leave room in the buffer for header and variable length field
455         uint16_t length = MQTT_MAX_HEADER_SIZE;
456         length = writeString(topic,this->buffer,length);
457 
458         // Add payload
459         uint16_t i;
460         for (i=0;i<plength;i++) {
461             this->buffer[length++] = payload[i];
462         }
463 
464         // Write the header
465         uint8_t header = MQTTPUBLISH;
466         if (retained) {
467             header |= 1;
468         }
469         return write(header,this->buffer,length-MQTT_MAX_HEADER_SIZE);
470     }
471     return false;
472 }
473 
474 boolean PubSubClient::publish_P(const char* topic, const char* payload, boolean retained) {
475     return publish_P(topic, (const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0, retained);
476 }
477 
478 boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
479     uint8_t llen = 0;
480     uint8_t digit;
481     unsigned int rc = 0;
482     uint16_t tlen;
483     unsigned int pos = 0;
484     unsigned int i;
485     uint8_t header;
486     unsigned int len;
487     int expectedLength;
488 
489     if (!connected()) {
490         return false;
491     }
492 
493     tlen = strnlen(topic, this->bufferSize);
494 
495     header = MQTTPUBLISH;
496     if (retained) {
497         header |= 1;
498     }
499     this->buffer[pos++] = header;
500     len = plength + 2 + tlen;
501     do {
502         digit = len  & 127; //digit = len %128
503         len >>= 7; //len = len / 128
504         if (len > 0) {
505             digit |= 0x80;
506         }
507         this->buffer[pos++] = digit;
508         llen++;
509     } while(len>0);
510 
511     pos = writeString(topic,this->buffer,pos);
512 
513     rc += _client->write(this->buffer,pos);
514 
515     for (i=0;i<plength;i++) {
516         rc += _client->write((char)pgm_read_byte_near(payload + i));
517     }
518 
519     lastOutActivity = millis();
520 
521     expectedLength = 1 + llen + 2 + tlen + plength;
522 
523     return (rc == expectedLength);
524 }
525 
526 boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, boolean retained) {
527     if (connected()) {
528         // Send the header and variable length field
529         uint16_t length = MQTT_MAX_HEADER_SIZE;
530         length = writeString(topic,this->buffer,length);
531         uint8_t header = MQTTPUBLISH;
532         if (retained) {
533             header |= 1;
534         }
535         size_t hlen = buildHeader(header, this->buffer, plength+length-MQTT_MAX_HEADER_SIZE);
536         uint16_t rc = _client->write(this->buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen));
537         lastOutActivity = millis();
538         return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen)));
539     }
540     return false;
541 }
542 
543 int PubSubClient::endPublish() {
544  return 1;
545 }
546 
547 size_t PubSubClient::write(uint8_t data) {
548     lastOutActivity = millis();
549     return _client->write(data);
550 }
551 
552 size_t PubSubClient::write(const uint8_t *buffer, size_t size) {
553     lastOutActivity = millis();
554     return _client->write(buffer,size);
555 }
556 
557 size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, uint16_t length) {
558     uint8_t lenBuf[4];
559     uint8_t llen = 0;
560     uint8_t digit;
561     uint8_t pos = 0;
562     uint16_t len = length;
563     do {
564 
565         digit = len  & 127; //digit = len %128
566         len >>= 7; //len = len / 128
567         if (len > 0) {
568             digit |= 0x80;
569         }
570         lenBuf[pos++] = digit;
571         llen++;
572     } while(len>0);
573 
574     buf[4-llen] = header;
575     for (int i=0;i<llen;i++) {
576         buf[MQTT_MAX_HEADER_SIZE-llen+i] = lenBuf[i];
577     }
578     return llen+1; // Full header size is variable length bit plus the 1-byte fixed header
579 }
580 
581 boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
582     uint16_t rc;
583     uint8_t hlen = buildHeader(header, buf, length);
584 
585 #ifdef MQTT_MAX_TRANSFER_SIZE
586     uint8_t* writeBuf = buf+(MQTT_MAX_HEADER_SIZE-hlen);
587     uint16_t bytesRemaining = length+hlen;  //Match the length type
588     uint8_t bytesToWrite;
589     boolean result = true;
590     while((bytesRemaining > 0) && result) {
591         bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE)?MQTT_MAX_TRANSFER_SIZE:bytesRemaining;
592         rc = _client->write(writeBuf,bytesToWrite);
593         result = (rc == bytesToWrite);
594         bytesRemaining -= rc;
595         writeBuf += rc;
596     }
597     return result;
598 #else
599     rc = _client->write(buf+(MQTT_MAX_HEADER_SIZE-hlen),length+hlen);
600     lastOutActivity = millis();
601     return (rc == hlen+length);
602 #endif
603 }
604 
605 boolean PubSubClient::subscribe(const char* topic) {
606     return subscribe(topic, 0);
607 }
608 
609 boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
610     size_t topicLength = strnlen(topic, this->bufferSize);
611     if (topic == 0) {
612         return false;
613     }
614     if (qos > 1) {
615         return false;
616     }
617     if (this->bufferSize < 9 + topicLength) {
618         // Too long
619         return false;
620     }
621     if (connected()) {
622         // Leave room in the buffer for header and variable length field
623         uint16_t length = MQTT_MAX_HEADER_SIZE;
624         nextMsgId++;
625         if (nextMsgId == 0) {
626             nextMsgId = 1;
627         }
628         this->buffer[length++] = (nextMsgId >> 8);
629         this->buffer[length++] = (nextMsgId & 0xFF);
630         length = writeString((char*)topic, this->buffer,length);
631         this->buffer[length++] = qos;
632         return write(MQTTSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE);
633     }
634     return false;
635 }
636 
637 boolean PubSubClient::unsubscribe(const char* topic) {
638 	size_t topicLength = strnlen(topic, this->bufferSize);
639     if (topic == 0) {
640         return false;
641     }
642     if (this->bufferSize < 9 + topicLength) {
643         // Too long
644         return false;
645     }
646     if (connected()) {
647         uint16_t length = MQTT_MAX_HEADER_SIZE;
648         nextMsgId++;
649         if (nextMsgId == 0) {
650             nextMsgId = 1;
651         }
652         this->buffer[length++] = (nextMsgId >> 8);
653         this->buffer[length++] = (nextMsgId & 0xFF);
654         length = writeString(topic, this->buffer,length);
655         return write(MQTTUNSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE);
656     }
657     return false;
658 }
659 
660 void PubSubClient::disconnect() {
661     this->buffer[0] = MQTTDISCONNECT;
662     this->buffer[1] = 0;
663     _client->write(this->buffer,2);
664     _state = MQTT_DISCONNECTED;
665     _client->flush();
666     _client->stop();
667     lastInActivity = lastOutActivity = millis();
668 }
669 
670 uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t pos) {
671     const char* idp = string;
672     uint16_t i = 0;
673     pos += 2;
674     while (*idp) {
675         buf[pos++] = *idp++;
676         i++;
677     }
678     buf[pos-i-2] = (i >> 8);
679     buf[pos-i-1] = (i & 0xFF);
680     return pos;
681 }
682 
683 
684 boolean PubSubClient::connected() {
685     boolean rc;
686     if (_client == NULL ) {
687         rc = false;
688     } else {
689         rc = (int)_client->connected();
690         if (!rc) {
691             if (this->_state == MQTT_CONNECTED) {
692                 this->_state = MQTT_CONNECTION_LOST;
693                 _client->flush();
694                 _client->stop();
695             }
696         } else {
697             return this->_state == MQTT_CONNECTED;
698         }
699     }
700     return rc;
701 }
702 
703 PubSubClient& PubSubClient::setServer(uint8_t * ip, uint16_t port) {
704     IPAddress addr(ip[0],ip[1],ip[2],ip[3]);
705     return setServer(addr,port);
706 }
707 
708 PubSubClient& PubSubClient::setServer(IPAddress ip, uint16_t port) {
709     this->ip = ip;
710     this->port = port;
711     this->domain = NULL;
712     return *this;
713 }
714 
715 PubSubClient& PubSubClient::setServer(const char * domain, uint16_t port) {
716     this->domain = domain;
717     this->port = port;
718     return *this;
719 }
720 
721 PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE) {
722     this->callback = callback;
723     return *this;
724 }
725 
726 PubSubClient& PubSubClient::setClient(Client& client){
727     this->_client = &client;
728     return *this;
729 }
730 
731 PubSubClient& PubSubClient::setStream(Stream& stream){
732     this->stream = &stream;
733     return *this;
734 }
735 
736 int PubSubClient::state() {
737     return this->_state;
738 }
739 
740 boolean PubSubClient::setBufferSize(uint16_t size) {
741     if (size == 0) {
742         // Cannot set it back to 0
743         return false;
744     }
745     if (this->bufferSize == 0) {
746         this->buffer = (uint8_t*)malloc(size);
747     } else {
748         uint8_t* newBuffer = (uint8_t*)realloc(this->buffer, size);
749         if (newBuffer != NULL) {
750             this->buffer = newBuffer;
751         } else {
752             return false;
753         }
754     }
755     this->bufferSize = size;
756     return (this->buffer != NULL);
757 }
758 
759 uint16_t PubSubClient::getBufferSize() {
760     return this->bufferSize;
761 }
762 PubSubClient& PubSubClient::setKeepAlive(uint16_t keepAlive) {
763     this->keepAlive = keepAlive;
764     return *this;
765 }
766 PubSubClient& PubSubClient::setSocketTimeout(uint16_t timeout) {
767     this->socketTimeout = timeout;
768     return *this;
769 }

Die Library besteht nur aus einem Programm (cpp+h,769 lines (689 sloc) 23.4 KB). Das Programm hat alleine 14(!) Konstruktoren. Irre. Die fliegen bei mir alle bis auf einen raus! Weiterhin 5 Überladungen für das connect und 6 für das publish, da kann man locker schon mal 200 Zeilen Code einsparen.


MQTT-Control

  • connect 181
  • publish 6-fach 436 public
  • beginPublish 526
  • endPublish 543
  • subscribe 605
  • unsubscribe 637
  • disconnect 660
  • readByte 282 2-fach benutzt von: readPacket
  • readPacket 312
  • write 552
  • loop 370 public
  • buildHeader 557
  • write 581
  • writeString 670 benutzt: connect, publish, publish_P, subscribe, unsubscribe, disconnect
  • connected 684 benutzt von: subscribe, unsubscribe, connect, loop, publish, publish_P, beginPublish


Konstruktor:

  • setServer 703 3-fach
  • setCallback 721
  • setClient 726
  • setStream 731
  • setBufferSize 740
  • setKeepAlive 762
  • setSocketTimeout 766
  • state 736
  • getBufferSize 759


Quellen

https://www.eclipse.org/paho/files/mqttdoc/MQTTClient/html/index.html

Request for Comments


Kommentar hinzufügen
TippvomTibb freut sich über alle Kommentare. Sofern du nicht anonym bleiben möchtest, registriere dich bitte oder melde dich an.