2023年12月18日发(作者:)
public: void dr_cb(RdKafka::Message &message) { std::cout << "Message delivery for (" << () << " bytes): " << () << std::endl; if (()) std::cout << "Key: " << *(()) << ";" << std::endl; }};class KafkaProducerEventCallBack : public RdKafka::EventCb {public: void event_cb(RdKafka::Event &event) { switch (()) { case RdKafka::Event::EVENT_ERROR: std::cerr << "ERROR (" << RdKafka::err2str(()) << "): " << () << std::endl; if (() == RdKafka::ERR__ALL_BROKERS_DOWN) break;
case RdKafka::Event::EVENT_STATS: std::cerr << ""STATS": " << () << std::endl; break; case RdKafka::Event::EVENT_LOG: fprintf(stderr, "LOG-%i-%s: %sn", ty(), ().c_str(), ().c_str()); break; default: std::cerr << "EVENT " << () << " (" << RdKafka::err2str(()) << "): " << () << std::endl; break; } }};class KafkaProducerClient{public:
KafkaProducerClient(const string &brokers, const string &topics, int nPpartition = 0); virtual ~KafkaProducerClient(); bool Init(); void Send(const string &msg); void Stop();private: RdKafka::Producer *m_pProducer = NULL; RdKafka::Topic *m_pTopic = NULL; KafkaProducerCallBack m_producerCallBack; KafkaProducerEventCallBack m_producerEventCallBack; std::string m_strTopics; std::string m_strBroker; bool m_bRun = false; int m_nPpartition = 0;};
#include "KafkaProducerClient.h"
KafkaProducerClient::KafkaProducerClient(const string &brokers, const string &topics, int nPpartition /*= 1*/)
: m_bRun(true), m_strTopics(topics), m_strBroker(brokers), m_nPpartition(nPpartition){
}
KafkaProducerClient::~KafkaProducerClient(){ Stop();}
bool KafkaProducerClient::Init(){ string errstr = ""; RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); /*设置broker list*/ if (conf->set("", m_strBroker, errstr) != RdKafka::Conf::CONF_OK){ std::cerr << "RdKafka conf set brokerlist failed :" << errstr.c_str() << endl; }
conf->set("dr_cb", &m_producerCallBack, errstr); conf->set("event_cb", &m_producerEventCallBack, errstr); m_pProducer = RdKafka::Producer::create(conf, errstr); if (!m_pProducer) { std::cerr << "Failed to create producer: " << errstr << std::endl; return false; }
m_pTopic = RdKafka::Topic::create(m_pProducer, m_strTopics, tconf, errstr); if (!m_pTopic) { std::cerr << "Failed to create topic: " << errstr << std::endl; return false; } return true;}
void KafkaProducerClient::Send(const string &msg){
if (!m_bRun) return; RdKafka::ErrorCode resp = m_pProducer->produce(m_pTopic, m_nPpartition, RdKafka::Producer::RK_MSG_COPY /* Copy payload */, const_cast
m_pProducer->poll(0);}
void KafkaProducerClient::Stop(){
delete m_pTopic; delete m_pProducer;
private: void Msg(RdKafka::Message *msg, void *opt);private: std::string m_strBrokers; std::string m_strTopics; std::string m_strGroupid; int64_t m_nLastOffset = 0; RdKafka::Consumer *m_pKafkaConsumer = nullptr; RdKafka::Topic *m_pTopic = nullptr; int64_t m_nCurrentOffset = RdKafka::Topic::OFFSET_BEGINNING; int32_t m_nPartition = 0; bool m_bRun = false;};#endif // KAFKACONSUMMER_H
#include "KafkaConsumerClient.h"KafkaConsumerClient::KafkaConsumerClient(const std::string& brokers, const std::string& topics, std::string groupid, int32_t nPartition /*= 0*/, int64_t offse:m_strBrokers(brokers),
m_strTopics(topics),m_strGroupid(groupid),m_nPartition(nPartition),m_nCurrentOffset(offset){}KafkaConsumerClient::~KafkaConsumerClient(){ Stop();}bool KafkaConsumerClient::Init(){ RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); if(!conf){ std::cerr << "RdKafka create global conf failed" < RdKafka::Conf *tconf = nullptr; /*创建kafka topic的配置*/ tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); if(!tconf){ std::cerr << "RdKafka create topic conf failed"< } return true;}void KafkaConsumerClient::Msg(RdKafka::Message *message, void *opt){ switch(message->err()){ case RdKafka::ERR__TIMED_OUT: break; case RdKafka::ERR_NO_ERROR: cout<<("*sn", static_cast cout << "Consume failed: " << message->errstr()<


发布评论