eis/src/zmqp/ProducerMQ.cpp

72 lines
2.3 KiB
C++

#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/library/ActiveMQCPP.h>
#include <activemq/util/Config.h>
#include <cms/BytesMessage.h>
#include <cms/Connection.h>
#include <cms/ExceptionListener.h>
#include <cms/MapMessage.h>
#include <cms/MessageListener.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <decaf/lang/Integer.h>
#include <decaf/lang/Long.h>
#include <decaf/lang/Runnable.h>
#include <decaf/lang/System.h>
#include <decaf/lang/Thread.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <iostream>
#include <log4cplus/LOG.h>
#include <memory>
#include <stdio.h>
#include <stdlib.h>
#include <zmqp/ProducerMQ.h>
using namespace log4cplus;
using namespace activemq::core;
using namespace decaf::util::concurrent;
using namespace decaf::util;
using namespace decaf::lang;
using namespace cms;
using namespace std;
int ProducerMQ::Publish(const string &brokerurl, const string &desurl,
const string &sendmsg, bool istopic) {
LOG d("roducerMQ::Publish");
// d.Debug()<<"brokerurl:"<<brokerurl<<",desurl"<<desurl<<",sendmsg"<<sendmsg<<",istopic:"<<istopic<<endl;
try {
ActiveMQConnectionFactory factory;
factory.setBrokerURI(brokerurl);
std::shared_ptr<TextMessage> message;
// std::shared_ptr<Connection>
// connection(factory.createConnection("admin", "admin"));
std::shared_ptr<Connection> connection(factory.createConnection());
connection->start();
std::shared_ptr<Session> session(connection->createSession());
std::shared_ptr<Destination> dest;
if (!istopic) {
dest.reset(session->createQueue(desurl));
} else {
dest.reset(session->createTopic(desurl));
}
std::shared_ptr<MessageProducer> producer(
session->createProducer(dest.get()));
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
message.reset(session->createTextMessage(sendmsg));
producer->send(message.get());
// producer->send(message.get(),DeliveryMode::NON_PERSISTENT,4,10000);
connection->close();
// d.Debug() << "session " << session.use_count() << std::endl;
// d.Debug() << "message " << message.use_count() << std::endl;
// activemq::library::ActiveMQCPP::shutdownLibrary();
} catch (CMSException &e) {
e.printStackTrace();
// activemq::library::ActiveMQCPP::shutdownLibrary();
}
return 0;
}