如何解决Mongodb:$ unwind和计算$ avg
我有存储IoT数据的文档。
在MongoDB schema design best practices for IoT之后,我来到了具有以下结构的文档:
"_id" : "AQ106_2020-09-12T09","date" : "2020-09-12T09:00:00.000Z","station" : {
"name" : "AQ106","loc" : {
"type" : "Point","coordinates" : [
14.339263,40.814224
]
},"properties" : {
}
},"samples" : [
{
"t" : ISODate("2020-09-12T11:02:00.000+02:00"),"data" : {
"pm1_mg_m3" : 2.7,"pm2_5_mg_m3" : 4.6,"pm10_mg_m3" : 12,"P0" : 152,"P1" : 16,"P2" : 4.7,"P3" : 0.8,"P4" : 0.86,"P5" : 0.6,"P6" : 0.28,"P7" : 0.152,"P8" : 0.094,"P9" : 0.092,"P10" : 0.019,"P11" : 0,"P12" : 0,"P13" : 0.0188,"P14" : 0,"P15" : 0,"P16" : 0,"P17" : 0,"P18" : 0,"P19" : 0,"P20" : 0,"P21" : 0,"P22" : 0,"P23" : 0,"temp_celsius" : 32.59,"humRelPercent" : 34,"press_mBar" : 1010.79,"CO2mA" : 4,"NO2_WE_mV" : 226.419,"NO2_AE_mV" : 229.553,"OX_WE_mV" : 252.287,"OX_AE_mV" : 220.419,"CO_WE_mV" : 509.077,"AE_WE_mV" : 348.51,"batt_V" : 13.5,"source_V" : 17.6
}
},.... additional arrays
}
现在,我想计算每小时或每天的平均值(或其他指标),以仅使用汇总数据填充新集合。 我按小时的方式编写了以下解决方案的代码:
db.collection.aggregate([{$match: {
'station.name':'AQ104'
}},{$unwind: {
path: "$samples"
}},{$group: {
_id: "$date",P0: {
$avg : "$samples.data.P0"
},temp:{
$avg:"$samples.data.temp_celsius"
}
}}])
这可行,但是我需要为原始文档中的samples.data
中的每个属性手动创建一个字段,这很繁琐。
此外,如何同时按日期和station.name分组?
您可以找到一个有效的示例here。
谢谢。
解决方法
让我们从一个简单的问题开始,即如何对多个字段进行分组?通过简单的语法更改:
{
$group: {
_id: {
date: "$date",station: "$station.name"
}
}
现在,对于第二个问题,这将更加乏味。 Mongo不支持通过其键使用自定义逻辑(在这种情况下为$avg
来“合并”对象)。因此,我们将不得不将对象转换为数组。展开它,计算每个字段的平均值,最后分组以恢复所需的结构,如下所示:
db.collection.aggregate([
{
$match: {
"station.name": "AQ106"
}
},{
$unwind: {
path: "$samples"
}
},{
$addFields: {
objArr: {
"$objectToArray": "$samples.data"
}
}
},{
$unwind: "$objArr"
},{
$group: {
_id: {
date: "$date",station: "$station.name",objKey: "$objArr.k"
},value: {
$avg: "$objArr.v"
}
}
},{
$addFields: {
data: {
"$arrayToObject": [
[
{
k: "$_id.objKey",v: "$value"
}
]
]
}
}
},{
$group: {
_id: {
date: "$_id.date",station: "$_id.station"
},data: {
"$mergeObjects": "$data"
}
}
},{
$replaceRoot: {
newRoot: {
"$mergeObjects": [
"$data","$_id"
]
}
}
}
])
-------编辑---------
对于Mongo v4.4 +,您可以使用$accumulator,它允许您在管道中执行自定义javascript代码。从规模性能方面来看,我不确定这与本地的Mongo渠道会有什么冲突。
要注意的一件事是,我在假设不同的$addFields
可能具有不同的密钥的情况下添加了初始samples
阶段。如果不是这种情况,则不需要。
db.collection.aggregate([
{
$addFields: {
sampleKeys: {
$reduce: {
input: {
$map: {
input: "$samples",as: "sample",in: {
$map: {
input: {
"$objectToArray": "$$sample.data"
},as: "sampleArrItem",in: "$$sampleArrItem.k"
}
}
}
},initialValue: [],in: {
"$setUnion": [
"$$this","$$value"
]
}
}
}
}
},{
$addFields: {
samples: {
$accumulator: {
init: function(keys){
return keys.map(k => {return {k: {v: 0,c: 0}}});
},initArgs: ["$sampleKeys"],accumulateArgs: ["$samples"],accumulate: function(state,sample){
Object.keys(state).forEach((key) => {
if (key in sample.data) {
state[key].v += sample.data[key];
state[key].c++;
};
});
return state;
},merge: function(state1,state2){
Object.keys(state1).forEach((key) => {
state1[key].v += state2[key].v;
state1[key].c += state2[key].c;
});
return state1;
},lang: "js"
}
}
}
},{
$replaceRoot: {
newRoot: {
$mergeObject: [
"$samples",{station: "$station.name",date: "$date"},]
}
}
}
])
,
我已按多个字段分组部分解决了我的问题(我认为MongoDB文档在这方面不太清楚)
db.collection.aggregate([
{
$unwind: {
path: "$samples"
}
},station: "$station.name"
},P0: {
$avg: "$samples.data.P0"
},temp: {
$avg: "$samples.data.temp_celsius"
}
}
}
])
Here更新的工作示例。
感谢汤姆·斯拉伯特(Tom Slabbaert),我通过以下查询解决了我的问题:
db.collection.aggregate([
{
$unwind: {
path: "$samples"
}
},station: "$station",{
"$project": {
_id: "$_id.date",station: "$_id.station",data: 1
}
}
])
我想知道是否可以使用新的$function
运算符来简化上述解决方案。
谢谢。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。