如何解决Laravel-通知或调度作业队列工作者laravel会导致带锁定表的双重选择吗?
两个Laravel工作者可以使用同一个事务数据库吗?
我有作业流程A,如果表A中的数据带有标志is_processed
= 0,它将调用/调度作业流程B。
-- first select data with lock
SELECT *
FROM tableA
WHERE is_proccesed = 0
LIMIT 1000
FOR UPDATE OF tableA SKIP LOCKED
-- insert data to tableB
INSERT tableB VALUES SELECT values from tableA
-- update data
UPDATE tableA SET is_proccesed = 1 where id = (from any id i have select)
然后触发作业流程B:
ProcessB::dispatch(from any id i have select as string)->onQueue('queueA');
我有作业流程B,它将由作业流程A或cron触发,每分钟工作一次。
--first select data with lock
SELECT *
FROM tableB
WHERE is_proccesed = 0 AND id in (parameter get from job A if any)
LIMIT 1000
FOR UPDATE OF tableB SKIP LOCKED
-- call API with parameter value is from tableB
-- update data
如果(调用API成功),则:
UPDATE tableB SET is_proccesed = 1 where id = (from any id i have select)
如果(调用API失败),则:
UPDATE tableB SET is_proccesed = 0 where id = (from any id i have select)
- 如果表A中任何is_processed标志为0,我每分钟都会运行cron来调用/调度作业进程A。
- 如果表B中的is_processed标志为0,我每分钟都会运行cron来调用/调度作业进程B。
- 我使用主管实时进行此操作,并对失败3次的作业使用max-retry。
我的问题是:
- 我有作业流程B中的双重流程调用API,
- 我已经滚动浏览了日志,并且SELECT键同时从2个不同的进程中获取2个数据。 (在某些情况下需要处理2000或更多数据),
- 并非总是会处理少量数据。
我的问题是:
- 选择带锁的数据不能用于队列作业吗?
- 创建cron手动通知作业以重新处理不成功的数据是否正确,还是应该仅将失败的作业应用于重新处理的作业?
解决方法
我还没有看到许多正确使用数据库锁的Web语言。如果不看Laravel代码,我会猜测它没有为作业正确使用数据库锁。我知道它不使用锁进行迁移。从2个以上的Web节点运行迁移是不安全的。
如果您将Redis或其他技术用于作业而不是SQL DB,则许多并发问题可能会消失。
管理您自己的全局锁定
您可以管理自己的锁,并在自己的进程之间添加同步。
$results = \DB::select('SELECT GET_LOCK("process-b",120) as obtain_lock');
if (!$results[0]->obtain_lock) { return 0; }
//120 is seconds to wait for lock or fail
//load one record
//call API
//update one record
//free lock
$results = \DB::select('SELECT RELEASE_LOCK("process-b")');
if (!$results[0]->obtain_lock) { return -1; } //couldn't release lock,stop process,free mysql connection
在Postgresql中,它们被称为“咨询锁”,但是您不能使用字符,必须使用数字
$results = \DB::select('SELECT pg_advisory_lock(1337)');
if (!$results) { return 0; } // ???
//load one record
//call API
//update one record
//free lock
$results = \DB::select('SELECT pg_advisory_unlock(1337)');
if (!$results) { return -1; } //??? how to check for success?
使用“选择...进行更新”
我不确定您是否尝试使用FOR UPDATE
锁并且它不起作用,还是您有意跳过了该锁。
您需要关闭自动提交(set autocommit=0
)才能使用FOR UPDATE锁定或启动事务。
\DB::transaction( function () use ($id) {
$results = \DB::table('table_b')->select('SELECT * from table_b where ID=?',$id)->lockForUpdate()->get();
\DB::table('table_b')->update('UPDATE table_b set x=y where ID=?',$id);
});
在ProcA将作业发送到ProcB的地方,您可以为每个要处理的ID = 0做1个ProcB作业-或者-只要找到任何已处理的= 0记录,就可以做1个ProcB作业。
因此,如果ProcB仅能使用1个记录ID,则全局锁定解决方案可能不是很好。
您可以通过放置sleep()
并创建具有相同记录ID的10-20个ProcB作业来检查更新锁是否正常工作。如果您睡眠3秒钟,并且需要30-60秒钟才能完成所有ProcB作业,那么更新锁将正常工作。如果它们都在3秒内完成,则说明它们没有遵守记录锁定。
奖金
将此添加到您的routes/console.php
中以获取并发安全的artisan lockingmigrate
命令
$signature = 'lockingmigrate {--database= : The database connection to use}
{--force : Force the operation to run when in production}
{--path=* : The path(s) to the migrations files to be executed}
{--realpath : Indicate any provided migration file paths are pre-resolved absolute paths}
{--pretend : Dump the SQL queries that would be run}
{--seed : Indicates if the seed task should be re-run}
{--step : Force the migrations to be run so they can be rolled back individually}';
Artisan::command($signature,function ($database=false,$seed=false,$step=false,$pretend=false,$force=false) {
$results = \DB::select('SELECT GET_LOCK("artisan-migrate",120) as migrate');
if (!$results[0]->migrate) { return -1; }
$params = [
'--pretend' => $pretend,'--force' => $force,'--step' => $step,'--seed' => $seed,];
$retval = Artisan::call('migrate',$params);
$outputLines = explode("\n",trim(\Artisan::output()));
dump($outputLines);
\DB::select('SELECT RELEASE_LOCK("artisan-migrate")');
return $retval;
})->describe('Concurrent-safe migrate');
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。