本例子为简单的消息发送与接收,主机地址与端口均采用默认值。
如Send1 , 声明Queue后消息被注入通道,(实际msg仍然先注入了默认的Exchange)
消息发送端:
package sunf.rabbitMQTest;import java.io.IOException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;/** * MQ消息发送者测试1 * * 对应 :SomeConsumerTest * * 本例中信息被直接从channel流向queue。 * 信息会被所有消费该队列信息的客户---均分。 * * @author SUNF * */public class Send1 { public static void main(String[] args) throws IOException, Exception { ConnectionFactory cf = new ConnectionFactory(); cf.setHost(RabbitMQConstants.VHOST_IP); Connection conn = cf.newConnection(); //新连接 Channel channel = conn.createChannel(); //声明此队列并且持久化 channel.queueDeclare(RabbitMQConstants.QUEUE_NAME, true/*持久化*/, false, false, null); for(int i=0;i<10;i++){ String message = "能量+" + i; //持久化消息 channel.basicPublish("", RabbitMQConstants.QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println("[消息已发送--------------->>" + message); } channel.close(); conn.close(); }}
运行结果:
[消息已发送--------------->>能量+0[消息已发送--------------->>能量+1[消息已发送--------------->>能量+2[消息已发送--------------->>能量+3[消息已发送--------------->>能量+4[消息已发送--------------->>能量+5[消息已发送--------------->>能量+6[消息已发送--------------->>能量+7[消息已发送--------------->>能量+8[消息已发送--------------->>能量+9
接收端:
接收端模拟四个接收者同时接受一个Queue的信息,结果信息被均分到4个客户。
package sunf.rabbitMQTest;import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;/** * MQ单通道,多用户模拟 * * @author SUNF * */public class ConsumerTest1 { private static ConnectionFactory factory; private static Connection connection; public static void main(String[] args) { //消费者1 ConsumerTest1.createConsumer("超人"); ConsumerTest1.createConsumer("葫芦娃"); ConsumerTest1.createConsumer("闪电侠"); ConsumerTest1.createConsumer("蜘蛛侠"); } public static void createConsumer(final String consumerName){ new Thread(){ public void run(){ try { ConsumerTest1.receiveMessage(consumerName); } catch (Exception e) { e.printStackTrace(); } } }.start(); } /** * @param consumerName 消费者名称 * @throws IOException * @throws Exception */ public static void receiveMessage(String consumerName) throws IOException, Exception{ if(factory == null){ synchronized(ConsumerTest1.class){ if(factory == null){ factory = new ConnectionFactory(); factory.setHost(RabbitMQConstants.VHOST_IP); connection = factory.newConnection(); } } } Channel channel = connection.createChannel(); channel.queueDeclare(RabbitMQConstants.QUEUE_NAME, true, false, false, null); System.out.println("["+consumerName+"] 准备完毕,可以接收消息------"); channel.basicQos(1);//告诉RabbitMQ同一时间给一个消息给消费者 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RabbitMQConstants.QUEUE_NAME, false, consumer); while(true){ QueueingConsumer.Delivery dy = consumer.nextDelivery(); String msg = new String(dy.getBody()); System.out.println("("+ consumerName +") 收到的消息------->> " + msg); channel.basicAck(dy.getEnvelope().getDeliveryTag(), false);//下一个消息 } }}
结果显示,队列中的10条信息被均分到不同的接收者上。
注意:由于线程开启速度、顺序和电脑环境原因,显示结果可能并不一定与实际相符,测试时尽可能先开启接收端,再启动发送端发布消息。
[超人] 准备完毕,可以接收消息------[葫芦娃] 准备完毕,可以接收消息------[蜘蛛侠] 准备完毕,可以接收消息------[闪电侠] 准备完毕,可以接收消息------(葫芦娃) 收到的消息------->> 能量+0(闪电侠) 收到的消息------->> 能量+1(蜘蛛侠) 收到的消息------->> 能量+2(超人) 收到的消息------->> 能量+3(葫芦娃) 收到的消息------->> 能量+4(闪电侠) 收到的消息------->> 能量+5(蜘蛛侠) 收到的消息------->> 能量+6(超人) 收到的消息------->> 能量+7(葫芦娃) 收到的消息------->> 能量+8(闪电侠) 收到的消息------->> 能量+9
常量类
package sunf.rabbitMQTest;//public class RabbitMQConstants { public static final String VHOST_IP = "127.0.0.1"; public static final String QUEUE_NAME = "hello2"; public static final String EXC_NAME = "java";}