Laravel-通知或调度作业队列工作者laravel会导致带锁定表的双重选择吗?

如何解决Laravel-通知或调度作业队列工作者laravel会导致带锁定表的双重选择吗?

两个Laravel工作者可以使用同一个事务数据库吗?

我有作业流程A,如果表A中的数据带有标志is_processed = 0,它将调用/调度作业流程B。

-- first select data with lock
SELECT *
FROM tableA
WHERE is_proccesed = 0
LIMIT 1000
FOR UPDATE OF tableA SKIP LOCKED

-- insert data to tableB 
INSERT tableB VALUES SELECT values from tableA
    
-- update data 
UPDATE tableA SET is_proccesed = 1 where id = (from any id i have select)

然后触发作业流程B:

ProcessB::dispatch(from any id i have select as string)->onQueue('queueA');

我有作业流程B,它将由作业流程A或cron触发,每分钟工作一次。

--first select data with lock
   
SELECT *
FROM tableB
WHERE is_proccesed = 0 AND id in (parameter get from job A if any)
LIMIT 1000
FOR UPDATE OF tableB SKIP LOCKED
    
-- call API with parameter value is from tableB
-- update data

如果(调用API成功),则:

UPDATE tableB SET is_proccesed = 1 where id = (from any id i have select)

如果(调用API失败),则:

UPDATE tableB SET is_proccesed = 0 where id = (from any id i have select)
  • 如果表A中任何is_processed标志为0,我每分钟都会运行cron来调用/调度作业进程A。
  • 如果表B中的is_processed标志为0,我每分钟都会运行cron来调用/调度作业进程B。
  • 我使用主管实时进行此操作,并对失败3次的作业使用max-retry。

我的问题是:

  • 我有作业流程B中的双重流程调用API,
  • 我已经滚动浏览了日志,并且SELECT键同时从2个不同的进程中获取2个数据。 (在某些情况下需要处理2000或更多数据),
  • 并非总是会处理少量数据。

我的问题是:

  • 选择带锁的数据不能用于队列作业吗?
  • 创建cron手动通知作业以重新处理不成功的数据是否正确,还是应该仅将失败的作业应用于重新处理的作业?

解决方法

我还没有看到许多正确使用数据库锁的Web语言。如果不看Laravel代码,我会猜测它没有为作业正确使用数据库锁。我知道它不使用锁进行迁移。从2个以上的Web节点运行迁移是不安全的。

如果您将Redis或其他技术用于作业而不是SQL DB,则许多并发问题可能会消失。

管理您自己的全局锁定

您可以管理自己的锁,并在自己的进程之间添加同步。

    $results = \DB::select('SELECT GET_LOCK("process-b",120) as obtain_lock');
    if (!$results[0]->obtain_lock) { return 0; }
    //120 is seconds to wait for lock or fail

    //load one record

    //call API

    //update one record

    //free lock
    $results = \DB::select('SELECT RELEASE_LOCK("process-b")');
    if (!$results[0]->obtain_lock) { return -1; } //couldn't release lock,stop process,free mysql connection

在Postgresql中,它们被称为“咨询锁”,但是您不能使用字符,必须使用数字

    $results = \DB::select('SELECT pg_advisory_lock(1337)');
    if (!$results) { return 0; }  // ???

    //load one record

    //call API

    //update one record

    //free lock
    $results = \DB::select('SELECT pg_advisory_unlock(1337)');
    if (!$results) { return -1; } //??? how to check for success?

使用“选择...进行更新”

我不确定您是否尝试使用FOR UPDATE锁并且它不起作用,还是您有意跳过了该锁。

您需要关闭自动提交(set autocommit=0)才能使用FOR UPDATE锁定或启动事务。

\DB::transaction( function () use ($id) {
    $results = \DB::table('table_b')->select('SELECT * from table_b where ID=?',$id)->lockForUpdate()->get();
    \DB::table('table_b')->update('UPDATE table_b set x=y where ID=?',$id);
});

在ProcA将作业发送到ProcB的地方,您可以为每个要处理的ID = 0做1个ProcB作业-或者-只要找到任何已处理的= 0记录,就可以做1个ProcB作业。

因此,如果ProcB仅能使用1个记录ID,则全局锁定解决方案可能不是很好。

您可以通过放置sleep()并创建具有相同记录ID的10-20个ProcB作业来检查更新锁是否正常工作。如果您睡眠3秒钟,并且需要30-60秒钟才能完成所有ProcB作业,那么更新锁将正常工作。如果它们都在3秒内完成,则说明它们没有遵守记录锁定。

奖金

将此添加到您的routes/console.php中以获取并发安全的artisan lockingmigrate命令

$signature = 'lockingmigrate {--database= : The database connection to use}
                {--force : Force the operation to run when in production}
                {--path=* : The path(s) to the migrations files to be executed}
                {--realpath : Indicate any provided migration file paths are pre-resolved absolute paths}
                {--pretend : Dump the SQL queries that would be run}
                {--seed : Indicates if the seed task should be re-run}
                {--step : Force the migrations to be run so they can be rolled back individually}';

Artisan::command($signature,function ($database=false,$seed=false,$step=false,$pretend=false,$force=false) {
    $results = \DB::select('SELECT GET_LOCK("artisan-migrate",120) as migrate');
    if (!$results[0]->migrate) { return -1; }
    $params = [
        '--pretend' => $pretend,'--force'   => $force,'--step'    => $step,'--seed'    => $seed,];
    $retval = Artisan::call('migrate',$params);
    $outputLines = explode("\n",trim(\Artisan::output()));
    dump($outputLines);
    \DB::select('SELECT RELEASE_LOCK("artisan-migrate")');
    return $retval;
})->describe('Concurrent-safe migrate');

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 <select id="xxx"> SELECT di.id, di.name, di.work_type, di.updated... <where> <if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 <property name="dynamic.classpath" value="tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-