Clojure交换!原子并行执行

如何解决Clojure交换!原子并行执行

我正在与clojure一起玩一个脚本,以读取文件中的URI序列作为输入,并为它们的状态代码做报告。

我已经使用clojure.core.async/pipeline-async实现了此操作,以执行对URI的HTTP调用(使用httpkit异步调用)。

我想监视脚本的执行,所以我需要一个状态的原子:

(let [processing (atom [(System/currentTimeMillis) 0])]

和跟踪进度的功能。

(defn track-progress [total progress]
  (swap! progress 
         (fn [[time count]]
            (let [incremented-count (inc count)
                  now (System/currentTimeMillis)]
              (if (= 0 (mod incremented-count (max 1 (int (/ total 20)))))
                (do
                  (println (str "Progress " incremented-count "/" total " | " (- now time) "ms"))
                  [now incremented-count])
                [time incremented-count])))))

在HTTP调用后使用它:

(a/pipeline-async
      parallelism
      output-chan
      (fn [[an-id uri] result]
        (http/head uri {:throw-exceptions false
                        :timeout timeout}
                   (fn [{:keys [status error]}]
                     (track-progress total processing)
                     (a/go 
                        (if (nil? error)
                            (do (a/>! result [an-id (keyword (str status))])
                                (a/close! result))
                            (do (a/>! result [an-id :error])
                                (a/close! result)))))))
      input-chan)

使用该processing部分在let表达式中创建pipeline-async原子。
除了该日志,一切似乎都正常。 我发现有时候记录很奇怪,有这样的东西:


Progress 500/10000 | 11519ms
Progress 500/10000 | 11519msProgress 500/10000 | 11519ms

Progress 1000/10000 | 11446ms
Progress 1000/10000 | 11446ms
Progress 1500/10000 | 9503ms
Progress 2000/10000 | 7802ms
Progress 2500/10000 | 12822ms
Progress 2500/10000 | 12822msProgress 2500/10000 | 12822ms
Progress 2500/10000 | 12822ms

Progress 3000/10000 | 10623ms
Progress 3500/10000 | 9018ms
Progress 4000/10000 | 9618ms
Progress 4500/10000 | 13544ms
Progress 5000/10000 | 10541ms
Progress 5500/10000 | 10817ms
Progress 6000/10000 | 8921ms
Progress 6500/10000 | 9078ms
Progress 6500/10000 | 9078ms
Progress 7000/10000 | 9270ms
Progress 7500/10000 | 11826msProgress 7500/10000 | 11826msProgress 7500/10000 | 11826ms

输出的格式与在shell中写入时一样,似乎有时多次执行相同的println,或者在以下情况下执行传递给fn函数的swap!:在原子中并行(无并发)。 (如果println我删除了str以创建要打印的字符串,则多次具有相同进度的行将像ProgressProgress 7500/10000 | 11826ms7500/100007500 | 11826msProgress/10000 | 11826ms一样完全混淆在一起)

我的代码有问题吗?
还是我误解了atom,因为我认为它不允许并行执行改变其状态的函数?

解决方法

Clojure原子是专门设计的,以便在多线程程序中,可以有多个线程在单个原子上执行swap!,如果您的程序这样做,则那些给定的更新函数f swap! 可以同时运行。 swap!中唯一同步的部分是有效执行的“比较和交换”操作:

  • 锁定原子的状态
  • 检查其当前值是否为identical?开始执行之前所包含的引用的f,如果是,则将其替换为f返回的新对象。
  • 解锁原子状态”。

函数f可能需要很长时间才能从当前值中计算出一个新值,但是上面的关键部分只是一个指针比较,如果相等,则是一个指针分配。

这就是为什么swap!的文档字符串说“请注意f可能被多次调用,因此应该没有副作用。”

,

您想要的是序列化一组并发执行线程的输出流。您可以使用代理来序列化对一个可变状态的访问,但是在这里,您有一个简陋的案例,没有状态,只有副作用。对于这种情况,the locking function就是您所需要的。

一个例子:

(ns tst.demo.core
  (:use demo.core tupelo.core tupelo.test))

(defn do-println
  [& args]
  (apply println args))

(def lock-obj (Object.))
(defn do-println-locking
  [& args]
  (locking lock-obj
    (apply println args)))

(def sleep-millis 500)
(defn wait-and-print
  [print-fn id]
  (Thread/sleep sleep-millis)
  (print-fn (format "wait-and-print %s is complete" id)))

(defn start-threads
  [print-fn count]
  (println "-----------------------------------------------------------------------------")
  (let [futures (forv [i (range count)]
                  (future (wait-and-print print-fn i)))]
    (doseq [future futures]
      ; block until future is complete
      (deref future))))

(dotest
  (start-threads do-println 10)
  (start-threads do-println-locking 10))

典型结果:

--------------------------------------
   Clojure 1.10.2-alpha1    Java 15
--------------------------------------

Testing tst.demo.core
-----------------------------------------------------------------------------
wait-and-print 4 is completewait-and-print 3 is completewait-and-print 2 is complete
wait-and-print 8 is completewait-and-print 9 is complete
wait-and-print 6 is completewait-and-print 1 is complete

wait-and-print 7 is complete
wait-and-print 0 is complete

wait-and-print 5 is complete


-----------------------------------------------------------------------------
wait-and-print 5 is complete
wait-and-print 8 is complete
wait-and-print 7 is complete
wait-and-print 9 is complete
wait-and-print 6 is complete
wait-and-print 3 is complete
wait-and-print 0 is complete
wait-and-print 4 is complete
wait-and-print 2 is complete
wait-and-print 1 is complete

因此,您可以看到locking的未序列化输出变得混乱,而第二种情况下的每个println一次只能完成一次(即使顺序仍然是随机的) )。

如果println一次打印一个字符而不是一次打印一个字符串,那么在不同步的情况下的结果将更加混乱。修改输出函数以分别打印每个字符:

(defn do-println
  [& args]
  (doseq [ch (str/join args)]
    (print ch))
  (newline))

(def lock-obj (Object.))
(defn do-println-locking
  [& args]
  (locking lock-obj
    (apply do-println args)))

具有典型结果:

--------------------------------------
   Clojure 1.10.2-alpha1    Java 15
--------------------------------------

Testing tst.demo.core
-----------------------------------------------------------------------------
wwwwwaaawwiiiattti--taaa--nnaiddnaa--dwpp-irrptaiir-niiantnttn  -dw2ta-  ani96ipds trn- i-pcndrota-impn nrpd4itl- n eipt5tr s e7i 
 incisots   mc0cpo olmmieppstll ee
etctteo
e-
 amnidps-l pectroeai
intt- a1n di-sip rcsio nmctmpo plm3lew etaiei
spt t-lceeatone
d
m-pplreitnet
 8 is complete
-----------------------------------------------------------------------------
wait-and-print 3 is complete
wait-and-print 9 is complete
wait-and-print 8 is complete
wait-and-print 4 is complete
wait-and-print 6 is complete
wait-and-print 7 is complete
wait-and-print 0 is complete
wait-and-print 1 is complete
wait-and-print 5 is complete
wait-and-print 2 is complete

但是我们看到locking序列化了函数调用,以便活动调用必须在下一个调用开始之前完成。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 <select id="xxx"> SELECT di.id, di.name, di.work_type, di.updated... <where> <if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 <property name="dynamic.classpath" value="tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-