activemq端口怎么调(activemq详解)
-
-
类目:知识大全
-
联系人:
-
微信号:
-
Q Q 号:
-
手机号:
-
浏览量:
327
【商户信息】
【货源详情】
一.下载和安装
直接去官网(
http://activemq.apache.org/)下载最新版本就可以了。 这是免安装的,所以解压缩就可以了。 安装完成后进入bin目录,然后双击activemq.bat文件。 (在linux上,在bin目录下执行activemq start。 )
二.访问控制台
在浏览器中输入http://ip:8161/admin/
三.修改端口号
61616是对外服务端口号
8161是控制器的端口号
端口号冲突时,可以更改两个端口号。 更改cd conf、activemq.xml以更改中的61616端口。 修改jetty.xml,然后修改中的8161端口。
queue队列模式:
与rabbitmq简单队列模式一样,如果多个消费者消耗同一队列中的消息,则缺省情况下也会消耗轮询机制
示例代码:
公共类产品制造商{
publicstaticfinalstringborker _ URL=' TCP ://127.0.0.1336061616 ';
publicstaticfinalstringqueue _ name=' queue1';
publicstaticvoidmain (string [ ] args ) throws JMSException {
//办工厂
activemqconnectionfactoryfactory=newactivemqconnectionfactory (borker _ URL;
//创建TCP连接
connection connection=factory.create connection (;
//建立连接
connection.start (;
session session=connection.create session (false,Session.AUTO_ACKNOWLEDGE );
//创建队列()消息目标) )。
queue queue=session.create queue (queue _ name );
//做生产者
messageproducerproducer=session.create producer (queue;
//消息非持久化
producer.setdeliverymode (delivery mode.non _ persistent;
//消息持久性缺省情况下是持久性的
//producer.setdeliverymode (delivery mode.persistent );
//创建消息
textmessagemessage=session.create text message;
//发送信息
producer.send (消息;
producer.close (;
session.close (;
connection.close (;
System.out.println ('发送成功!' );
}
}
公共类消费者{
publicstaticfinalstringborker _ URL=' TCP ://127.0.0.1336061616 ';
publicstaticfinalstringqueue _ name=' queue1';
publicstaticvoidmain (string [ ] args ) throws JMSException {
//办工厂
activemqconnectionfactoryfactory=newactivemqconnectionfactory (borker _ URL;
//创建TCP连接
connection connection=factory.create connection (;
//建立连接
connection.start (;
session session=connection.create session (false,Session.AUTO_ACKNOWLEDGE );
//创建/声明队列()将消息发送到) )
queue queue=session.create queue (queue _ name );
//做消费者
messageconsumerconsumer=session.create consumer (queue;
//拦截消耗
consumer.setmessagelistener (消息- {
textmessagetextmessage=(文本消息)消息;
try {
system.out.println(1日收到消息。 (textMessage.getText ) );
}catch(JMSexceptione ) {
e .打印任务跟踪(;
}
);
}
}
topic队列模式:
被称为发布订阅模型,是生产者向某个topic主题的消费者发送信息的模型。 默认情况下,这种模式需要先启动消费者。 否则,即使生产者发布了某个topic主题的消息,消费者也无法消费。 只要消费者没有提前订阅并进行消息持久化处理,消费者就可以消费预先推送的消息。
代码:
公共类产品制造商{
publicstaticfinalstringborker _ URL=' TCP ://127.0.0.1336061616 ';
publicstaticfinalstringtopic _ name=' topic1';
publicstaticvoidmain (string [ ] args ) throws JMSException {
//办工厂
activemqconnectionfactoryfactory=newactivemqconnectionfactory (borker _ URL;
//异步送达
factory.setuseasyncsend(true;
//创建TCP连接
connection connection=factory.create connection (;
session session=connection.create session (false,Session.AUTO_ACKNOWLEDGE );
创建/声明topic (消息目标)
topic topic=session.create topic (topic _ name;
//做生产者
activemqmessageproducerproducer=(activemqmessageproducer ) session.createproducer ) topic;
//持续化
producer.setdeliverymode (delivery mode.persistent;
//建立连接
connection.start (;
//创建消息
textmessagemessage=session.create text message;
//发送消息,异步发送回调函数
producer.send(message,new AsyncCallback ) )。
@Override
公共语音上的success
system.out.println(success );
}
@Override
publicvoidonexception (jmsexceptione ) {
system.out.println('fail );
}
);
producer.close (;
session.close (;
connection.close (;
System.out.println ('发送成功!' );
}
}
公共类消费者1 {
publicstaticfinalstringborker _ URL=' TCP ://127.0.0.1336061616 ';
publicstaticfinalstringtopic _ name=' topic1';
publicstaticvoidmain (string [ ] args ) throws JMSException {
//办工厂
activemqconnectionfactoryfactory=newactivemqconnectionfactory (borker _ URL;
//创建TCP连接
connection connection=factory.create connection (;
制定clientId
connection.setclientid('my );
session session=connection.create session (false,Session.AUTO_ACKNOWLEDGE );
创建/声明topic (消息目标)
topic topic=session.create topic (topic _ name;
//订阅主题
topicsubscribersubscriber=session.createdurablesubscriber (topic,' remark ' );
//建立连接
connection.start (;
while (真)。
//receive阻止线程
//接收订阅的新闻
textmessagemessage=(text message ) subscriber.receive (;
System.out.println ('已接收到消息。' message.getText ) );
}
}
}
如何保证消息的可靠性
回答这个问题主要从持久化、办公、受领几个方面入手
消息持久化的核心代码:
//queue模式的消息持久化缺省情况下被持久化
producer.setdeliverymode (delivery mode.persistent;
topic topic=session.create topic (topic _ name;
activemqmessageproducerproducer=(activemqmessageproducer ) session.createproducer ) topic;
producer.setdeliverymode (delivery mode.persistent;
connection.start (;
事务的核心代码(偏生产者):
//将参数设置为true
connection.createsession(false,Session.AUTO_ACKNOWLEDGE );
//提交事务
session.commit (;
签收的核心代码(偏消费者):
//将参数设置为手动提交
connection.createsession(false,Session.CLIENT_ACKNOWLEDGE;
//消息签名
message.acknowledge (;
注意:若是既开启事务,又开启手动签收,以事务为准,只要事务被提交了也默认消息被签收了
性能提升:
1.利用nio的协议比tcp的性能高,
配置方式:在conf目录下activemq.xml配置如下
浏览器
.
传输连接器
transportconnectorname=' nio ' uri=' nio ://0.0.0. 0336061616 ' /
/transportConnectors
.
/broker
第二步是代码访问方式由tcp改为nio
//办工厂
activemqconnectionfactoryfactory=newactivemqconnectionfactory (' nio ://127.0.0.1:61616 ' );
2.jdbc+Journaling提高只有jdbc持久化的性能,它在做持久化入数据库之前,会先将数据保存到Journaling文件中,之后才慢慢同步到数据库中,等于中间加了一层缓冲层。
把数据库mysql的驱动包放到lib目录下
配置方式:在conf目录下activemq.xml照着下面配置,其中有个createTablesOnStartup属性,默认值是true,表示每次启动后去数据库自动建表
永久适配器
kahadbdirectory=' $ { ActiveMQ.data }/kah ADB ' /
/持续适配器
//上面是默认配置,已更改为以下配置
永久适配器
journalpersistenceadapterfactoryjournallogfiles='5' data directory=' $ { basedir }/ActiveMQ-data ' data source=' # mmmata
/持续适配器
//以下代码写在beans节点中
bean id=' MySQL-ds ' class=' org.Apache.com mons.dbcp.basic data source ' destroy-method=' close '
property name=' driver class name ' value=' com.MySQL.JDBC.driver ' /
property name=' URL ' value=' JDBC : MySQL 3360//localhost/ActiveMQ? relaxAutoCommit=true'/
property name=' username ' value=' ActiveMQ ' /
property name=' password ' value=' ActiveMQ ' /
property name=' poolpreparedstatements ' value=' true ' /
/bean