如何解决Clojure交换!原子并行执行
我正在与clojure一起玩一个脚本,以读取文件中的URI序列作为输入,并为它们的状态代码做报告。
我已经使用clojure.core.async/pipeline-async
实现了此操作,以执行对URI的HTTP调用(使用httpkit异步调用)。
我想监视脚本的执行,所以我需要一个状态的原子:
(let [processing (atom [(System/currentTimeMillis) 0])]
和跟踪进度的功能。
(defn track-progress [total progress]
(swap! progress
(fn [[time count]]
(let [incremented-count (inc count)
now (System/currentTimeMillis)]
(if (= 0 (mod incremented-count (max 1 (int (/ total 20)))))
(do
(println (str "Progress " incremented-count "/" total " | " (- now time) "ms"))
[now incremented-count])
[time incremented-count])))))
在HTTP调用后使用它:
(a/pipeline-async
parallelism
output-chan
(fn [[an-id uri] result]
(http/head uri {:throw-exceptions false
:timeout timeout}
(fn [{:keys [status error]}]
(track-progress total processing)
(a/go
(if (nil? error)
(do (a/>! result [an-id (keyword (str status))])
(a/close! result))
(do (a/>! result [an-id :error])
(a/close! result)))))))
input-chan)
使用该processing
部分在let
表达式中创建pipeline-async
原子。
除了该日志,一切似乎都正常。
我发现有时候记录很奇怪,有这样的东西:
Progress 500/10000 | 11519ms
Progress 500/10000 | 11519msProgress 500/10000 | 11519ms
Progress 1000/10000 | 11446ms
Progress 1000/10000 | 11446ms
Progress 1500/10000 | 9503ms
Progress 2000/10000 | 7802ms
Progress 2500/10000 | 12822ms
Progress 2500/10000 | 12822msProgress 2500/10000 | 12822ms
Progress 2500/10000 | 12822ms
Progress 3000/10000 | 10623ms
Progress 3500/10000 | 9018ms
Progress 4000/10000 | 9618ms
Progress 4500/10000 | 13544ms
Progress 5000/10000 | 10541ms
Progress 5500/10000 | 10817ms
Progress 6000/10000 | 8921ms
Progress 6500/10000 | 9078ms
Progress 6500/10000 | 9078ms
Progress 7000/10000 | 9270ms
Progress 7500/10000 | 11826msProgress 7500/10000 | 11826msProgress 7500/10000 | 11826ms
输出的格式与在shell中写入时一样,似乎有时多次执行相同的println
,或者在以下情况下执行传递给fn
函数的swap!
:在原子中并行(无并发)。
(如果println
我删除了str
以创建要打印的字符串,则多次具有相同进度的行将像ProgressProgress 7500/10000 | 11826ms7500/100007500 | 11826msProgress/10000 | 11826ms
一样完全混淆在一起)
我的代码有问题吗?
还是我误解了atom
,因为我认为它不允许并行执行改变其状态的函数?
解决方法
Clojure原子是专门设计的,以便在多线程程序中,可以有多个线程在单个原子上执行swap!
,如果您的程序这样做,则那些给定的更新函数f
swap!
可以同时运行。 swap!
中唯一同步的部分是有效执行的“比较和交换”操作:
- 锁定原子的状态
- 检查其当前值是否为
identical?
开始执行之前所包含的引用的f
,如果是,则将其替换为f
返回的新对象。 - 解锁原子状态”。
函数f
可能需要很长时间才能从当前值中计算出一个新值,但是上面的关键部分只是一个指针比较,如果相等,则是一个指针分配。
这就是为什么swap!
的文档字符串说“请注意f可能被多次调用,因此应该没有副作用。”
您想要的是序列化一组并发执行线程的输出流。您可以使用代理来序列化对一个可变状态的访问,但是在这里,您有一个简陋的案例,没有状态,只有副作用。对于这种情况,the locking
function就是您所需要的。
一个例子:
(ns tst.demo.core
(:use demo.core tupelo.core tupelo.test))
(defn do-println
[& args]
(apply println args))
(def lock-obj (Object.))
(defn do-println-locking
[& args]
(locking lock-obj
(apply println args)))
(def sleep-millis 500)
(defn wait-and-print
[print-fn id]
(Thread/sleep sleep-millis)
(print-fn (format "wait-and-print %s is complete" id)))
(defn start-threads
[print-fn count]
(println "-----------------------------------------------------------------------------")
(let [futures (forv [i (range count)]
(future (wait-and-print print-fn i)))]
(doseq [future futures]
; block until future is complete
(deref future))))
(dotest
(start-threads do-println 10)
(start-threads do-println-locking 10))
典型结果:
--------------------------------------
Clojure 1.10.2-alpha1 Java 15
--------------------------------------
Testing tst.demo.core
-----------------------------------------------------------------------------
wait-and-print 4 is completewait-and-print 3 is completewait-and-print 2 is complete
wait-and-print 8 is completewait-and-print 9 is complete
wait-and-print 6 is completewait-and-print 1 is complete
wait-and-print 7 is complete
wait-and-print 0 is complete
wait-and-print 5 is complete
-----------------------------------------------------------------------------
wait-and-print 5 is complete
wait-and-print 8 is complete
wait-and-print 7 is complete
wait-and-print 9 is complete
wait-and-print 6 is complete
wait-and-print 3 is complete
wait-and-print 0 is complete
wait-and-print 4 is complete
wait-and-print 2 is complete
wait-and-print 1 is complete
因此,您可以看到locking
的未序列化输出变得混乱,而第二种情况下的每个println
一次只能完成一次(即使顺序仍然是随机的) )。
如果println
一次打印一个字符而不是一次打印一个字符串,那么在不同步的情况下的结果将更加混乱。修改输出函数以分别打印每个字符:
(defn do-println
[& args]
(doseq [ch (str/join args)]
(print ch))
(newline))
(def lock-obj (Object.))
(defn do-println-locking
[& args]
(locking lock-obj
(apply do-println args)))
具有典型结果:
--------------------------------------
Clojure 1.10.2-alpha1 Java 15
--------------------------------------
Testing tst.demo.core
-----------------------------------------------------------------------------
wwwwwaaawwiiiattti--taaa--nnaiddnaa--dwpp-irrptaiir-niiantnttn -dw2ta- ani96ipds trn- i-pcndrota-impn nrpd4itl- n eipt5tr s e7i
incisots mc0cpo olmmieppstll ee
etctteo
e-
amnidps-l pectroeai
intt- a1n di-sip rcsio nmctmpo plm3lew etaiei
spt t-lceeatone
d
m-pplreitnet
8 is complete
-----------------------------------------------------------------------------
wait-and-print 3 is complete
wait-and-print 9 is complete
wait-and-print 8 is complete
wait-and-print 4 is complete
wait-and-print 6 is complete
wait-and-print 7 is complete
wait-and-print 0 is complete
wait-and-print 1 is complete
wait-and-print 5 is complete
wait-and-print 2 is complete
但是我们看到locking
序列化了函数调用,以便活动调用必须在下一个调用开始之前完成。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。