2023年12月18日发(作者:)

public: void dr_cb(RdKafka::Message &message) { std::cout << "Message delivery for (" << () << " bytes): " << () << std::endl; if (()) std::cout << "Key: " << *(()) << ";" << std::endl; }};class KafkaProducerEventCallBack : public RdKafka::EventCb {public: void event_cb(RdKafka::Event &event) { switch (()) { case RdKafka::Event::EVENT_ERROR: std::cerr << "ERROR (" << RdKafka::err2str(()) << "): " << () << std::endl; if (() == RdKafka::ERR__ALL_BROKERS_DOWN) break;

case RdKafka::Event::EVENT_STATS: std::cerr << ""STATS": " << () << std::endl; break; case RdKafka::Event::EVENT_LOG: fprintf(stderr, "LOG-%i-%s: %sn", ty(), ().c_str(), ().c_str()); break; default: std::cerr << "EVENT " << () << " (" << RdKafka::err2str(()) << "): " << () << std::endl; break; } }};class KafkaProducerClient{public:

KafkaProducerClient(const string &brokers, const string &topics, int nPpartition = 0); virtual ~KafkaProducerClient(); bool Init(); void Send(const string &msg); void Stop();private: RdKafka::Producer *m_pProducer = NULL; RdKafka::Topic *m_pTopic = NULL; KafkaProducerCallBack m_producerCallBack; KafkaProducerEventCallBack m_producerEventCallBack; std::string m_strTopics; std::string m_strBroker; bool m_bRun = false; int m_nPpartition = 0;};

#include "KafkaProducerClient.h"

KafkaProducerClient::KafkaProducerClient(const string &brokers, const string &topics, int nPpartition /*= 1*/)

: m_bRun(true), m_strTopics(topics), m_strBroker(brokers), m_nPpartition(nPpartition){

}

KafkaProducerClient::~KafkaProducerClient(){ Stop();}

bool KafkaProducerClient::Init(){ string errstr = ""; RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); /*设置broker list*/ if (conf->set("", m_strBroker, errstr) != RdKafka::Conf::CONF_OK){ std::cerr << "RdKafka conf set brokerlist failed :" << errstr.c_str() << endl; }

conf->set("dr_cb", &m_producerCallBack, errstr); conf->set("event_cb", &m_producerEventCallBack, errstr); m_pProducer = RdKafka::Producer::create(conf, errstr); if (!m_pProducer) { std::cerr << "Failed to create producer: " << errstr << std::endl; return false; }

m_pTopic = RdKafka::Topic::create(m_pProducer, m_strTopics, tconf, errstr); if (!m_pTopic) { std::cerr << "Failed to create topic: " << errstr << std::endl; return false; } return true;}

void KafkaProducerClient::Send(const string &msg){

if (!m_bRun) return; RdKafka::ErrorCode resp = m_pProducer->produce(m_pTopic, m_nPpartition, RdKafka::Producer::RK_MSG_COPY /* Copy payload */, const_cast(msg.c_str()), (), NULL, NULL); if (resp != RdKafka::ERR_NO_ERROR) std::cerr << "Produce failed: " << RdKafka::err2str(resp) << std::endl; else std::cerr << "Produced message (" << () << " bytes)" << std::endl;

m_pProducer->poll(0);}

void KafkaProducerClient::Stop(){

delete m_pTopic; delete m_pProducer;

private: void Msg(RdKafka::Message *msg, void *opt);private: std::string m_strBrokers; std::string m_strTopics; std::string m_strGroupid; int64_t m_nLastOffset = 0; RdKafka::Consumer *m_pKafkaConsumer = nullptr; RdKafka::Topic *m_pTopic = nullptr; int64_t m_nCurrentOffset = RdKafka::Topic::OFFSET_BEGINNING; int32_t m_nPartition = 0; bool m_bRun = false;};#endif // KAFKACONSUMMER_H

#include "KafkaConsumerClient.h"KafkaConsumerClient::KafkaConsumerClient(const std::string& brokers, const std::string& topics, std::string groupid, int32_t nPartition /*= 0*/, int64_t offse:m_strBrokers(brokers),

m_strTopics(topics),m_strGroupid(groupid),m_nPartition(nPartition),m_nCurrentOffset(offset){}KafkaConsumerClient::~KafkaConsumerClient(){ Stop();}bool KafkaConsumerClient::Init(){ RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); if(!conf){ std::cerr << "RdKafka create global conf failed" <

RdKafka::Conf *tconf = nullptr; /*创建kafka topic的配置*/ tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); if(!tconf){ std::cerr << "RdKafka create topic conf failed"<set("", "smallest", errstr) != RdKafka::Conf::CONF_OK){; std::cerr << "RdKafka conf set failed:" << errstr.c_str() << endl; } m_pTopic = RdKafka::Topic::create(m_pKafkaConsumer, m_strTopics, tconf, errstr); if(!m_pTopic){ std::cerr << "RdKafka create topic failed :" << errstr.c_str() << endl; } delete tconf; RdKafka::ErrorCode resp = m_pKafkaConsumer->start(m_pTopic, m_nPartition, m_nLastOffset); if (resp != RdKafka::ERR_NO_ERROR){ std::cerr << "failed to start consumer : " << errstr.c_str() << endl;

} return true;}void KafkaConsumerClient::Msg(RdKafka::Message *message, void *opt){ switch(message->err()){ case RdKafka::ERR__TIMED_OUT: break; case RdKafka::ERR_NO_ERROR: cout<<("*sn", static_cast(message->len()), static_cast(message->payload())) << endl; m_nLastOffset = message->offset(); break; case RdKafka::ERR__PARTITION_EOF: cout << "Reached the end of the queue, offset: " << m_nLastOffset << endl; break; case RdKafka::ERR__UNKNOWN_TOPIC: case RdKafka::ERR__UNKNOWN_PARTITION:

cout << "Consume failed: " << message->errstr()<consume(m_pTopic, m_nPartition, timeout_ms); Msg(msg, nullptr); m_pKafkaConsumer->poll(0); delete msg; } m_pKafkaConsumer->stop(m_pTopic, m_nPartition);