JobPlus知识库 IT 工业智能4.0 文章
多线程共享资源操作

项目需求

      最近在做一个和支付相关的项目,由于上游通道对每个商户交易额度有上限风控。因此我们需要实现一个商户轮询的机制,通过使用多个商户号,来提高交易上限,满足交易需求。

需求分析

      通过需求分析,我们知道商户的交易额度是共享资源,因此涉及到了共享资源同步的问题,需要我们控制商户的交易额度,及时切换上送的交易商户号,保证在多线程的情况下运行。

功能实现

不说那么多了,直接贴代码。

根据业务需求,设计接口:

[java] view plain copy

  1. public interface IncOrDecInterface {  
  2.     /** 
  3.      * 恢复额度 
  4.      * 当交易失败时,恢复商户可使用的交易额度。 
  5.      * @param amount 
  6.      */  
  7.     boolean inc(Long amount);  
  8.   
  9.     /** 
  10.      * 减少额度 
  11.      * 交易时,根据交易金额减少商户可使用额度。 
  12.      * @param amount 
  13.      */  
  14.     boolean dec(Long amount);  
  15.   
  16.     /** 
  17.      * 更新商户可使用额度 
  18.      * 由于商户额度每天更新一次,因此设计此接口,用于重置或更新商户可使用额度。 
  19.      * @param amount 
  20.      */  
  21.     boolean update(Long amount);  
  22.   
  23.     /** 
  24.      * 判断对象是否有锁 
  25.      */  
  26.     boolean isLook();  
  27. }  

商户对象

[java] view plain copy

  1. public class Merchant implements IncOrDecInterface{  
  2.     private static final Logger log = LoggerFactory.getLogger(UnsPayMerchant.class);  
  3.       
  4.     private String merchantNo;  
  5.     private String signKey;  
  6.     private volatile long maxTransactionAmount;  
  7.   
  8.     private Lock lock = new ReentrantLock();  
  9.     private boolean isLook = false;  
  10.   
  11.     public String getMerchantNo() {  
  12.         return merchantNo;  
  13.     }  
  14.   
  15.     public void setMerchantNo(String merchantNo) {  
  16.         this.merchantNo = merchantNo;  
  17.     }  
  18.   
  19.     public String getSignKey() {  
  20.         return signKey;  
  21.     }  
  22.   
  23.     public void setSignKey(String signKey) {  
  24.         this.signKey = signKey;  
  25.     }  
  26.   
  27.     public  long getMaxTransactionAmount() {  
  28.         try {  
  29.             lock.lock();  
  30.             return maxTransactionAmount;  
  31.         }finally {  
  32.             lock.unlock();  
  33.         }  
  34.     }  
  35.   
  36.     public void setMaxTransactionAmount(long maxTransactionAmount) {  
  37.         this.maxTransactionAmount = maxTransactionAmount;  
  38.     }  
  39.   
  40.     @Override  
  41.     public boolean inc(Long amount) {  
  42.         try{  
  43.             lock.lock();  
  44.             isLook = true;  
  45.             log.info("商户号[{}] 当前额度[{}] 充值金额[{}]",this.merchantNo,this.maxTransactionAmount,amount);  
  46.             this.maxTransactionAmount =this.maxTransactionAmount + amount;  
  47.             Thread.sleep(1000);  
  48.             return true;  
  49.         } catch (InterruptedException e) {  
  50.             e.printStackTrace();  
  51.         } finally {  
  52.             lock.unlock();  
  53.             isLook = false;  
  54.         }  
  55.         return false;  
  56.     }  
  57.   
  58.     @Override  
  59.     public boolean dec(Long amount) {  
  60.         try{  
  61.             lock.lock();  
  62.             isLook = true;  
  63.             log.info("当前线程[{}] 商户号[{}] 当前额度[{}] 消费金额[{}]",Thread.currentThread().getName(),this.merchantNo ,this.maxTransactionAmount, amount);  
  64.             if(this.maxTransactionAmount >= amount){  
  65.                 this.maxTransactionAmount = this.maxTransactionAmount - amount;  
  66.                 return true;  
  67.             }  
  68.         }  finally {  
  69.             lock.unlock();  
  70.             isLook = false;  
  71.         }  
  72.         return false;  
  73.     }  
  74.   
  75.     @Override  
  76.     public boolean update(Long amount) {  
  77.         try{  
  78.             lock.lock();  
  79.             isLook = true;  
  80.             log.info("商户号[{}] 当前额度[{}] 恢复额度[{}]", this.merchantNo, this.maxTransactionAmount, amount);  
  81.             this.maxTransactionAmount = amount;  
  82.             return true;  
  83.         }finally {  
  84.             lock.unlock();  
  85.             isLook = false;  
  86.         }  
  87.     }  
  88.   
  89.     @Override  
  90.     public boolean isLook() {  
  91.         return isLook;  
  92.     }  
  93. }  

商户容器

[java] view plain copy

  1. public class MerchantContainer {  
  2.     private static final Logger log = LoggerFactory.getLogger(MerchantContainer.class);  
  3.     private static ConcurrentLinkedQueue<UnsPayMerchant> unsPayMerchants = new ConcurrentLinkedQueue<>();  
  4.     /** 
  5.      * 获取商户信息 
  6.      * @param amount 
  7.      * @return 
  8.      */  
  9.     public static UnsPayMerchant getMerchant(Long amount) {  
  10.         //判断是否有可用额度的商户  
  11.         while(unsPayMerchants.size() > 0) {  
  12.             for (UnsPayMerchant unsPayMerchant : unsPayMerchants) {  
  13.                 if(unsPayMerchant.isLook()){  
  14.                     continue;  
  15.                 }else{  
  16.                     if(unsPayMerchant.getMaxTransactionAmount() >= amount){  
  17.                         unsPayMerchant.dec(amount);  
  18.                     }else{  
  19.                         unsPayMerchants.remove(unsPayMerchant);  
  20.                         continue;  
  21.                     }  
  22.                 }  
  23.                 return unsPayMerchant;  
  24.             }  
  25.         }  
  26.         return null;  
  27.     }  
  28.   
  29.     /** 
  30.      * 交易失败恢复商户可用额度 
  31.      * 
  32.      * @param unsPayMerchantFailure 
  33.      * @param amount 
  34.      * @return 
  35.      */  
  36.     public static boolean payFailure(UnsPayMerchant unsPayMerchantFailure, Long amount) {  
  37.         for (UnsPayMerchant unsPayMerchant : unsPayMerchants) {  
  38.             if (unsPayMerchantFailure.getMerchantNo().equals(unsPayMerchant.getMerchantNo())) {  
  39.                 if(unsPayMerchant.isLook()){  
  40.                     continue;  
  41.                 }else {  
  42.                     unsPayMerchant.inc(amount);  
  43.                 }  
  44.                 return true;  
  45.             } else {  
  46.                 unsPayMerchants.add(unsPayMerchantFailure);  
  47.             }  
  48.         }  
  49.         return false;  
  50.     }  
  51.   
  52.     /** 
  53.      * 更新商户额度 
  54.      * 
  55.      * @param merchant 
  56.      */  
  57.     public static void updateMaxAmount(UnsPayMerchant merchant) {  
  58.         for (UnsPayMerchant unsPayMerchant : unsPayMerchants) {  
  59.             if (unsPayMerchant.getMerchantNo().equals(merchant.getMerchantNo())) {  
  60.                 unsPayMerchant.update(merchant.getMaxTransactionAmount());  
  61.                 return;  
  62.             }  
  63.         }  
  64.         log.info("向商户池添加商户[{}] 额度为[{}] 当前商户池有[{}]个商户。", merchant.getMerchantNo(), merchant.getMaxTransactionAmount(), unsPayMerchants.size());  
  65.         unsPayMerchants.add(merchant);  
  66.     }  

测试类


[java] view plain copy

  1. public class UnspayMerchantLoopTest {  
  2.   
  3.     private static Thread[] threads = new Thread[8];  
  4.   
  5.     /** 
  6.      * 前置执行任务,准备可以使用的商户数据。 
  7.      */  
  8.     @Before  
  9.     public void init(){  
  10.         final UnsPayMerchant unsPayMerchant = new UnsPayMerchant();  
  11.         unsPayMerchant.setMerchantNo("101");  
  12.         unsPayMerchant.setMaxTransactionAmount(10000);  
  13.         UnsPayMerchant unsPayMerchant1 = new UnsPayMerchant();  
  14.         unsPayMerchant1.setMerchantNo("102");  
  15.         unsPayMerchant1.setMaxTransactionAmount(10000);  
  16.         UnsPayMerchant unsPayMerchant2 = new UnsPayMerchant();  
  17.         unsPayMerchant2.setMerchantNo("103");  
  18.         unsPayMerchant2.setMaxTransactionAmount(10000);  
  19.   
  20.         MerchantContainer.updateMaxAmount(unsPayMerchant);  
  21.         MerchantContainer.updateMaxAmount(unsPayMerchant1);  
  22.         MerchantContainer.updateMaxAmount(unsPayMerchant2);  
  23.     }  
  24.   
  25.     @Test  
  26.     public void merLoop() {  
  27.         doMerchantPay();  
  28.         doPayFailuer();  
  29.         doReloadMerchant();  
  30.         for (Thread thread : threads) {  
  31.             try {  
  32.                 thread.join();//等待线程执行结束  
  33.             } catch (InterruptedException e) {  
  34.                 e.printStackTrace();  
  35.             }  
  36.         }  
  37.     }  
  38.   
  39.     /** 
  40.      * 模拟商户交易 
  41.      * 启动5个线程,每执行一次休眠6秒。 
  42.      */  
  43.     private void doMerchantPay(){  
  44.         for (int i = 0; i < 5; i++) {  
  45.             Thread upLoadThread = new Thread(new Runnable() {  
  46.                 @Override  
  47.                 public void run() {  
  48.                     Random random = new Random();  
  49.                     int maxAmount = 1000;  
  50.                     int minAmount = 0;  
  51.                     while (true) {  
  52.                         UnsPayMerchant merchant = MerchantContainer.getMerchant((long) random.nextInt(maxAmount) % (maxAmount - minAmount + 1) + minAmount);  
  53.                         if (merchant == null) {  
  54.                             System.out.println(Thread.currentThread().getName() + "额度用完了!");  
  55.                             break;  
  56.                         }  
  57.                         try {  
  58.                             Thread.sleep(6000);  
  59.                         } catch (InterruptedException e) {  
  60.                             e.printStackTrace();  
  61.                         }  
  62.                     }  
  63.   
  64.                 }  
  65.             });  
  66.             upLoadThread.start();  
  67.             threads[i] = upLoadThread;  
  68.         }  
  69.     }  
  70.   
  71.     /** 
  72.      * 模拟交易失败时,返回商户额度。 
  73.      * 启动2个线程,每执行一次休眠5秒。 
  74.      */  
  75.     private void doPayFailuer(){  
  76.         for (int i = 5; i < 7; i++) {  
  77.             Thread upLoadThread = new Thread(new Runnable() {  
  78.                 @Override  
  79.                 public void run() {  
  80.                     String[] strings = new String[]{"101", "102", "103"};  
  81.                     Random random = new Random();  
  82.                     int maxAmount = 2000;  
  83.                     int minAmount = 0;  
  84.                     while (true) {  
  85.                         for (int i = 0; i < 3; i++) {  
  86.                             UnsPayMerchant unsPayMerchant3 = new UnsPayMerchant();  
  87.                             unsPayMerchant3.setMerchantNo(strings[i]);  
  88.                             MerchantContainer.payFailure(unsPayMerchant3, (long) random.nextInt(maxAmount) % (maxAmount - minAmount + 1) + minAmount);  
  89.                         }  
  90.                         try {  
  91.                             Thread.sleep(5000);  
  92.                         } catch (InterruptedException e) {  
  93.                             e.printStackTrace();  
  94.                         }  
  95.                     }  
  96.   
  97.                 }  
  98.             });  
  99.             upLoadThread.start();  
  100.             threads[i] = upLoadThread;  
  101.         }  
  102.     }  
  103.   
  104.     /** 
  105.      * 恢复商户可用额度 
  106.      * 可用于商户每天恢复额度功能。 
  107.      * 单线程,每20秒执行一次。 
  108.      */  
  109.     private void doReloadMerchant(){  
  110.         Thread upLoadThread = new Thread(new Runnable() {  
  111.             @Override  
  112.             public void run() {  
  113.                 String[] strings = new String[]{"101", "102", "103"};  
  114.                 Random random = new Random();  
  115.                 int maxAmount = 2000;  
  116.                 int minAmount = 0;  
  117.                 while (true) {  
  118.                     for (int i = 0; i < 3; i++) {  
  119.                         UnsPayMerchant unsPayMerchant3 = new UnsPayMerchant();  
  120.                         unsPayMerchant3.setMerchantNo(strings[i]);  
  121.                         unsPayMerchant3.setMaxTransactionAmount(1000);  
  122.                         MerchantContainer.updateMaxAmount(unsPayMerchant3);  
  123.                     }  
  124.                     try {  
  125.                         Thread.sleep(20000);  
  126.                     } catch (InterruptedException e) {  
  127.                         e.printStackTrace();  
  128.                     }  
  129.                 }  
  130.   
  131.             }  
  132.         });  
  133.         upLoadThread.start();  
  134.         threads[7] = upLoadThread;  
  135.     }  
  136. }  


如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

¥ 打赏支持
371人赞 举报
分享到
用户评价(0)

暂无评价,你也可以发布评价哦:)

扫码APP

扫描使用APP

扫码使用

扫描使用小程序