博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rabbitmq java queue
阅读量:5314 次
发布时间:2019-06-14

本文共 3891 字,大约阅读时间需要 12 分钟。

1.连接抽象封装

package com.pccw.rabbitmq;import com.rabbitmq.client.Channel;import java.io.IOException;import com.rabbitmq.client.*;public abstract class EndPoint {    protected Channel channel;    protected Connection connection;    protected String endPointName;    public EndPoint(String endpointName) throws IOException {        this.endPointName = endpointName;        // Create a connection factory        ConnectionFactory factory = new ConnectionFactory();        // hostname of your rabbitmq server        factory.setHost("192.168.220.132");        factory.setPort(5672);        factory.setUsername("guest");        factory.setPassword("guest");                connection = factory.newConnection();        // creating a channel        channel = connection.createChannel();        // declaring a queue for this channel. If queue does not exist,        // it will be created on the server.        channel.queueDeclare(endpointName, false, false, false, null);    }    public void close() throws IOException {        this.channel.close();        this.connection.close();    }}
View Code

2.生产者

package com.pccw.rabbitmq;import java.io.IOException;import java.io.Serializable;import org.apache.commons.lang.SerializationUtils;public class Producer extends EndPoint{        public Producer(String endPointName) throws IOException{        super(endPointName);    }    public void sendMessage(Serializable object) throws IOException {        channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object));    }   }
View Code

3.消费者

package com.pccw.rabbitmq;import java.io.IOException;import java.util.*;import org.apache.commons.lang.SerializationUtils;import com.rabbitmq.client.*;public class QueueConsumer extends EndPoint implements Runnable, Consumer {    public QueueConsumer(String endPointName) throws IOException {        super(endPointName);    }    public void run() {        try {            // start consuming messages. Auto acknowledge messages.            channel.basicConsume(endPointName, true, this);        } catch (IOException e) {            e.printStackTrace();        }    }    /**     * Called when consumer is registered.     */    public void handleConsumeOk(String consumerTag) {        System.out.println("Consumer " + consumerTag + " registered");    }    /**     * Called when new message is available.     */    public void handleDelivery(String arg0, Envelope arg1, com.rabbitmq.client.AMQP.BasicProperties arg2, byte[] body)            throws IOException {        Map map = (HashMap) SerializationUtils.deserialize(body);        System.out.println(map.get("message number"));    }    public void handleCancel(String consumerTag) {    }    public void handleCancelOk(String consumerTag) {    }    public void handleRecoverOk(String consumerTag) {    }    public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {    }}
View Code

4.生产者测试类

package com.pccw.rabbitmq;import java.util.HashMap;public class MainProducer {    public MainProducer() throws Exception{              Producer producer = new Producer("queue");                for (int i = 0; i < 1; i++) {            HashMap message = new HashMap();            message.put("message number", "中国"+i);            producer.sendMessage(message);            System.out.println("Message Number "+ i +" sent.");        }        producer.close();    }        public static void main(String[] args) throws Exception{      new MainProducer();    }}
View Code

5.消费者测试类

package com.pccw.rabbitmq;public class MainConsumer {    public MainConsumer() throws Exception{                QueueConsumer consumer = new QueueConsumer("queue");        Thread consumerThread = new Thread(consumer);        consumerThread.start();    }      public static void main(String[] args) throws Exception{      new MainConsumer();    }}
View Code

 

转载于:https://www.cnblogs.com/rigid/p/7380693.html

你可能感兴趣的文章
C++ 删除字符串的两种实现方式
查看>>
ORA-01502: 索引'P_ABCD.PK_WEB_BASE'或这类索引的分区处于不可用状态
查看>>
Java抽象类和接口的比较
查看>>
开发进度一
查看>>
MyBaits学习
查看>>
管道,数据共享,进程池
查看>>
CSS
查看>>
[LeetCode] 55. Jump Game_ Medium tag: Dynamic Programming
查看>>
[Cypress] Stub a Post Request for Successful Form Submission with Cypress
查看>>
程序集的混淆及签名
查看>>
判断9X9数组是否是数独的java代码
查看>>
00-自测1. 打印沙漏
查看>>
UNITY在VS中调试
查看>>
SDUTOJ3754_黑白棋(纯模拟)
查看>>
Scala入门(1)Linux下Scala(2.12.1)安装
查看>>
如何改善下面的代码 领导说了很耗资源
查看>>
Quartus II 中常见Warning 原因及解决方法
查看>>
php中的isset和empty的用法区别
查看>>
Android ViewPager 动画效果
查看>>
pip和easy_install使用方式
查看>>