a亚洲精品_精品国产91乱码一区二区三区_亚洲精品在线免费观看视频_欧美日韩亚洲国产综合_久久久久久久久久久成人_在线区

首頁 > 數據庫 > MongoDB > 正文

基于Morphia實現MongoDB按小時、按天聚合操作方法

2020-10-29 18:41:30
字體:
來源:轉載
供稿:網友

MongoDB按照天數或小時聚合

需求

最近接到需求,需要對用戶賬戶下的設備狀態,分別按照天以及小時進行聚合,以此為基礎繪制設備狀態趨勢圖.
實現思路是啟動定時任務,對各用戶的設備狀態數據分別按照小時以及天進行聚合,并存儲進數據庫中供用戶后續查詢.
涉及到的技術棧分別為:Spring Boot,MongoDB,Morphia.

數據模型

@Data@Builder@Entity(value = "rawDevStatus", noClassnameStored = true)// 設備狀態索引@Indexes({    // 設置數據超時時間(TTL,MongoDB根據TTL在后臺進行數據刪除操作)    @Index(fields = @Field("time"), options = @IndexOptions(expireAfterSeconds = 3600 * 24 * 72)),    @Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)})})public class RawDevStatus {  @Id  @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)  private ObjectId objectId;  private String userId;  private Instant time;  @Embedded("points")  List<Point> protocolPoints;  @Data  @AllArgsConstructor  public static class Point {    /**     * 協議類型     */    private Protocol protocol;    /**     * 設備總數     */    private Integer total;    /**     * 設備在線數目     */    private Integer onlineNum;    /**     * 處于啟用狀態設備數目     */    private Integer enableNum;  }}

上述代碼是設備狀態實體類,其中設備狀態數據是按照設備所屬協議進行區分的.

@Data@Builder@Entity(value = "aggregationDevStatus", noClassnameStored = true)@Indexes({    @Index(fields = @Field("expireAt"), options = @IndexOptions(expireAfterSeconds = 0)),    @Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)})})public class AggregationDevStatus {  @Id  @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)  private ObjectId objectId;  /**   * 用戶ID   */  private String userId;  /**   * 設備總數   */  private Double total;  /**   * 設備在線數目   */  private Double onlineNum;  /**   * 處于啟用狀態設備數目   */  private Double enableNum;  /**   * 聚合類型(按照小時還是按照天聚合)   */  @Property("aggDuration")  private AggregationDuration aggregationDuration;  private Instant time;  /**   * 動態設置文檔過期時間   */  private Instant expireAt;}

上述代碼是期待的聚合結果,其中構建兩個索引:(1)超時索引;(2)復合索引,程序會根據用戶名以及時間查詢設備狀態聚合結果.

聚合操作符介紹

聚合操作類似于管道,管道中的每一步操作產生的中間結果作為下一步的輸入源,最終輸出聚合結果.

此次聚合主要涉及以下操作:

•$project:指定輸出文檔中的字段.
•$unwind:拆分數據中的數組;
•match:選擇要處理的文檔數據;
•group:根據key分組聚合結果.

原始聚合語句

db.getCollection('raw_dev_status').aggregate([  {$match:    {      time:{$gte: ISODate("2019-06-27T00:00:00Z")},    }  },  {$unwind: "$points"},  {$project:    {      userId:1,points:1,      tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } }    }  },  {$project:    {      userId:1,points:1,      groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } }    }  },  {$group:    {      _id:{user_id:'$userId', cal_time:'$groupTime'},      devTotal:{'$avg':'$points.total'},      onlineTotal:{'$avg':'$points.onlineNum'},      enableTotal:{'$avg':'$points.enableNum'}    }  },])

上述代碼是按小時聚合數據,以下來逐步介紹處理思路:

(1) $match

根據小時聚合數據,因為只需要獲取近24小時的聚合結果,所以對數據進行初步篩選.

(2) $unwind

raw_dev_status中的設備狀態是按照協議區分的數組,因此需要對其進行展開,以便下一步進行篩選;

(3) $project

  {$project:    {      userId:1,points:1,      tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } }    }  }

選擇需要輸出的數據,分別為:userId,points以及tmp.

需要注意,為了按照時間聚合,對$time屬性進行操作,提取%Y:%m:%dT%H時信息至$tmp作為下一步的聚合依據.

如果需要按天聚合,則format數據可修改為:%Y:%m:%dT00:00:00Z即可滿足要求.

(4) $project

  {$project:    {      userId:1,points:1,      groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } }    }  }

因為上一步project操作中,tmp為字符串數據,最終的聚合結果需要時間戳(主要懶,不想在程序中進行轉換操作).
因此,此處對$tmp進行操作,轉換為時間類型數據,即groupTime.

(5) $group

對聚合結果進行分類操作,并生成最終輸出結果.

 {$group:    {      # 根據_id進行分組操作,依據是`user_id`以及`$groupTime`      _id:{user_id:'$userId', cal_time:'$groupTime'},      # 求設備總數平均值      devTotal:{'$avg':'$points.total'},      # 求設備在線數平均值      onlineTotal:{'$avg':'$points.onlineNum'},      # ...      enableTotal:{'$avg':'$points.enableNum'}    }  }

代碼編寫

此處ODM選擇Morphia,亦可以使用MongoTemplate,原理類似.

 /**   * 創建聚合條件   *   * @param pastTime   過去時間段   * @param dateToString 格式化字符串(%Y:%m:%dT%H:00:00Z或%Y:%m:%dT00:00:00Z)   * @return 聚合條件   */  private AggregationPipeline createAggregationPipeline(Instant pastTime, String dateToString, String stringToDate) {    Query<RawDevStatus> query = datastore.createQuery(RawDevStatus.class);    return datastore.createAggregation(RawDevStatus.class)        .match(query.field("time").greaterThanOrEq(pastTime))        .unwind("points", new UnwindOptions().preserveNullAndEmptyArrays(false))        .match(query.field("points.protocol").equal("ALL"))        .project(Projection.projection("userId"),            Projection.projection("points"),            Projection.projection("convertTime",                Projection.expression("$dateToString",                    new BasicDBObject("format", dateToString)                        .append("date", "$time"))            )        )        .project(Projection.projection("userId"),            Projection.projection("points"),            Projection.projection("convertTime",                Projection.expression("$dateFromString",                    new BasicDBObject("format", stringToDate)                        .append("dateString", "$convertTime"))            )        )        .group(            Group.id(Group.grouping("userId"), Group.grouping("convertTime")),            Group.grouping("total", Group.average("points.total")),            Group.grouping("onlineNum", Group.average("points.onlineNum")),            Group.grouping("enableNum", Group.average("points.enableNum"))        );  }  /**   * 獲取聚合結果   *   * @param pipeline 聚合條件   * @return 聚合結果   */  private List<AggregationMidDevStatus> getAggregationResult(AggregationPipeline pipeline) {    List<AggregationMidDevStatus> statuses = new ArrayList<>();    Iterator<AggregationMidDevStatus> resultIterator = pipeline.aggregate(        AggregationMidDevStatus.class, AggregationOptions.builder().allowDiskUse(true).build());    while (resultIterator.hasNext()) {      statuses.add(resultIterator.next());    }    return statuses;  }  //......................................................................................  // 獲取聚合結果(省略若干代碼)  AggregationPipeline pipeline = createAggregationPipeline(pastTime, dateToString, stringToDate);  List<AggregationMidDevStatus> midStatuses = getAggregationResult(pipeline);  if (CollectionUtils.isEmpty(midStatuses)) {    log.warn("Can not get dev status aggregation result.");    return;  }

總結

以上所述是小編給大家介紹的基于Morphia實現MongoDB按小時、按天聚合操作方法,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對武林網網站的支持!
如果你覺得本文對你有幫助,歡迎轉載,煩請注明出處,謝謝!

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 午夜婷婷色 | 国产三级精品三级 | 国产久| 国产成人久久777777 | 国产精品99 | 国产1区在线观看 | 伊人激情影院 | 夜夜操av | 可以看黄的视频 | 久久成人精品 | 狠狠操精品视频 | 国产小视频在线播放 | 亚洲免费观看 | 在线国产一区二区 | 国产成人61精品免费看片 | 超碰av人人| 日韩精品一二区 | 精品国产影院 | 国产成人一区二区三区影院在线 | 日韩高清中文字幕 | 欧美色综合一区二区三区 | 亚洲精品国产综合区久久久久久久 | 久久久久久久国产精品 | 9久久精品| 免费一级淫片aaa片毛片a级 | 国产一级淫片a级aaa | 欧美一区二区三区四区五区 | 久久在线视频 | 神马久久精品 | 91在线免费观看 | 国产精品久久久久久久久免费高清 | 欧美日韩久久久 | 欧美激情欧美激情在线五月 | jjzz18国产 | 超碰一区| 天天操狠狠操 | 亚洲免费三级 | 国产精品对白一区二区三区 | 涩涩视频在线 | 国产成人在线视频 | 成人三级视频 |