项目需求
最近在做一个和支付相关的项目。由于上游通道对每个商户的交易额度有上限风控,我们需要实现一个商户轮询机制:通过使用多个商户号来提高整体交易上限,以满足交易需求。
需求分析
通过需求分析可知,商户的交易额度是共享资源,因此涉及共享资源的同步问题:我们需要控制每个商户的可用交易额度,并在额度不足时及时切换上送的交易商户号,保证在多线程环境下也能正确运行。
功能实现
不说那么多了,直接贴代码。
根据业务需求,设计接口:
[java] view plain copy
/**
 * Operations on a quota that can be restored, consumed and reset.
 *
 * <p>Each mutating operation reports its outcome as a {@code boolean}; what
 * exactly counts as success is defined by the implementing class.
 */
public interface IncOrDecInterface {

    /**
     * Restores quota.
     *
     * <p>Intended for failed transactions: the amount that was reserved for the
     * transaction is given back to the merchant's usable quota.
     *
     * @param amount the amount to give back
     * @return the implementation's success flag
     */
    boolean inc(Long amount);

    /**
     * Reduces quota.
     *
     * <p>Intended for new transactions: the merchant's usable quota is reduced
     * by the transaction amount.
     *
     * @param amount the transaction amount to deduct
     * @return the implementation's success flag
     */
    boolean dec(Long amount);

    /**
     * Replaces the merchant's usable quota.
     *
     * <p>Merchant quotas are refreshed once a day; this operation exists to
     * reset or update the usable quota at that point.
     *
     * @param amount the new usable quota
     * @return the implementation's success flag
     */
    boolean update(Long amount);

    /**
     * Reports whether the object is currently locked.
     *
     * @return {@code true} if the object holds a lock
     */
    boolean isLook();
}
商户对象
[java] view plain copy
- public class Merchant implements IncOrDecInterface{
- private static final Logger log = LoggerFactory.getLogger(UnsPayMerchant.class);
- private String merchantNo;
- private String signKey;
- private volatile long maxTransactionAmount;
- private Lock lock = new ReentrantLock();
- private boolean isLook = false;
- public String getMerchantNo() {
- return merchantNo;
- }
- public void setMerchantNo(String merchantNo) {
- this.merchantNo = merchantNo;
- }
- public String getSignKey() {
- return signKey;
- }
- public void setSignKey(String signKey) {
- this.signKey = signKey;
- }
- public long getMaxTransactionAmount() {
- try {
- lock.lock();
- return maxTransactionAmount;
- }finally {
- lock.unlock();
- }
- }
- public void setMaxTransactionAmount(long maxTransactionAmount) {
- this.maxTransactionAmount = maxTransactionAmount;
- }
- @Override
- public boolean inc(Long amount) {
- try{
- lock.lock();
- isLook = true;
- log.info("商户号[{}] 当前额度[{}] 充值金额[{}]",this.merchantNo,this.maxTransactionAmount,amount);
- this.maxTransactionAmount =this.maxTransactionAmount + amount;
- Thread.sleep(1000);
- return true;
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- isLook = false;
- }
- return false;
- }
- @Override
- public boolean dec(Long amount) {
- try{
- lock.lock();
- isLook = true;
- log.info("当前线程[{}] 商户号[{}] 当前额度[{}] 消费金额[{}]",Thread.currentThread().getName(),this.merchantNo ,this.maxTransactionAmount, amount);
- if(this.maxTransactionAmount >= amount){
- this.maxTransactionAmount = this.maxTransactionAmount - amount;
- return true;
- }
- } finally {
- lock.unlock();
- isLook = false;
- }
- return false;
- }
- @Override
- public boolean update(Long amount) {
- try{
- lock.lock();
- isLook = true;
- log.info("商户号[{}] 当前额度[{}] 恢复额度[{}]", this.merchantNo, this.maxTransactionAmount, amount);
- this.maxTransactionAmount = amount;
- return true;
- }finally {
- lock.unlock();
- isLook = false;
- }
- }
- @Override
- public boolean isLook() {
- return isLook;
- }
- }
商户容器
[java] view plain copy
- public class MerchantContainer {
- private static final Logger log = LoggerFactory.getLogger(MerchantContainer.class);
- private static ConcurrentLinkedQueue<UnsPayMerchant> unsPayMerchants = new ConcurrentLinkedQueue<>();
- /**
- * 获取商户信息
- * @param amount
- * @return
- */
- public static UnsPayMerchant getMerchant(Long amount) {
- //判断是否有可用额度的商户
- while(unsPayMerchants.size() > 0) {
- for (UnsPayMerchant unsPayMerchant : unsPayMerchants) {
- if(unsPayMerchant.isLook()){
- continue;
- }else{
- if(unsPayMerchant.getMaxTransactionAmount() >= amount){
- unsPayMerchant.dec(amount);
- }else{
- unsPayMerchants.remove(unsPayMerchant);
- continue;
- }
- }
- return unsPayMerchant;
- }
- }
- return null;
- }
- /**
- * 交易失败恢复商户可用额度
- *
- * @param unsPayMerchantFailure
- * @param amount
- * @return
- */
- public static boolean payFailure(UnsPayMerchant unsPayMerchantFailure, Long amount) {
- for (UnsPayMerchant unsPayMerchant : unsPayMerchants) {
- if (unsPayMerchantFailure.getMerchantNo().equals(unsPayMerchant.getMerchantNo())) {
- if(unsPayMerchant.isLook()){
- continue;
- }else {
- unsPayMerchant.inc(amount);
- }
- return true;
- } else {
- unsPayMerchants.add(unsPayMerchantFailure);
- }
- }
- return false;
- }
- /**
- * 更新商户额度
- *
- * @param merchant
- */
- public static void updateMaxAmount(UnsPayMerchant merchant) {
- for (UnsPayMerchant unsPayMerchant : unsPayMerchants) {
- if (unsPayMerchant.getMerchantNo().equals(merchant.getMerchantNo())) {
- unsPayMerchant.update(merchant.getMaxTransactionAmount());
- return;
- }
- }
- log.info("向商户池添加商户[{}] 额度为[{}] 当前商户池有[{}]个商户。", merchant.getMerchantNo(), merchant.getMaxTransactionAmount(), unsPayMerchants.size());
- unsPayMerchants.add(merchant);
- }
测试类
[java] view plain copy
- public class UnspayMerchantLoopTest {
- private static Thread[] threads = new Thread[8];
- /**
- * 前置执行任务,准备可以使用的商户数据。
- */
- @Before
- public void init(){
- final UnsPayMerchant unsPayMerchant = new UnsPayMerchant();
- unsPayMerchant.setMerchantNo("101");
- unsPayMerchant.setMaxTransactionAmount(10000);
- UnsPayMerchant unsPayMerchant1 = new UnsPayMerchant();
- unsPayMerchant1.setMerchantNo("102");
- unsPayMerchant1.setMaxTransactionAmount(10000);
- UnsPayMerchant unsPayMerchant2 = new UnsPayMerchant();
- unsPayMerchant2.setMerchantNo("103");
- unsPayMerchant2.setMaxTransactionAmount(10000);
- MerchantContainer.updateMaxAmount(unsPayMerchant);
- MerchantContainer.updateMaxAmount(unsPayMerchant1);
- MerchantContainer.updateMaxAmount(unsPayMerchant2);
- }
- @Test
- public void merLoop() {
- doMerchantPay();
- doPayFailuer();
- doReloadMerchant();
- for (Thread thread : threads) {
- try {
- thread.join();//等待线程执行结束
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * 模拟商户交易
- * 启动5个线程,每执行一次休眠6秒。
- */
- private void doMerchantPay(){
- for (int i = 0; i < 5; i++) {
- Thread upLoadThread = new Thread(new Runnable() {
- @Override
- public void run() {
- Random random = new Random();
- int maxAmount = 1000;
- int minAmount = 0;
- while (true) {
- UnsPayMerchant merchant = MerchantContainer.getMerchant((long) random.nextInt(maxAmount) % (maxAmount - minAmount + 1) + minAmount);
- if (merchant == null) {
- System.out.println(Thread.currentThread().getName() + "额度用完了!");
- break;
- }
- try {
- Thread.sleep(6000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- });
- upLoadThread.start();
- threads[i] = upLoadThread;
- }
- }
- /**
- * 模拟交易失败时,返回商户额度。
- * 启动2个线程,每执行一次休眠5秒。
- */
- private void doPayFailuer(){
- for (int i = 5; i < 7; i++) {
- Thread upLoadThread = new Thread(new Runnable() {
- @Override
- public void run() {
- String[] strings = new String[]{"101", "102", "103"};
- Random random = new Random();
- int maxAmount = 2000;
- int minAmount = 0;
- while (true) {
- for (int i = 0; i < 3; i++) {
- UnsPayMerchant unsPayMerchant3 = new UnsPayMerchant();
- unsPayMerchant3.setMerchantNo(strings[i]);
- MerchantContainer.payFailure(unsPayMerchant3, (long) random.nextInt(maxAmount) % (maxAmount - minAmount + 1) + minAmount);
- }
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- });
- upLoadThread.start();
- threads[i] = upLoadThread;
- }
- }
- /**
- * 恢复商户可用额度
- * 可用于商户每天恢复额度功能。
- * 单线程,每20秒执行一次。
- */
- private void doReloadMerchant(){
- Thread upLoadThread = new Thread(new Runnable() {
- @Override
- public void run() {
- String[] strings = new String[]{"101", "102", "103"};
- Random random = new Random();
- int maxAmount = 2000;
- int minAmount = 0;
- while (true) {
- for (int i = 0; i < 3; i++) {
- UnsPayMerchant unsPayMerchant3 = new UnsPayMerchant();
- unsPayMerchant3.setMerchantNo(strings[i]);
- unsPayMerchant3.setMaxTransactionAmount(1000);
- MerchantContainer.updateMaxAmount(unsPayMerchant3);
- }
- try {
- Thread.sleep(20000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- });
- upLoadThread.start();
- threads[7] = upLoadThread;
- }
- }
登录 | 立即注册