如何解决使用Java读取Spark Redis保存的数据
我使用spark-redis将数据集保存到Redis。 然后,我使用Spring data redis读取了这些数据:
我保存到redis的对象
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Builder
@RedisHash("collaborative_filtering")
public class RatingResult implements Serializable {
private static final long serialVersionUID = 8755574422193819444L;
@Id
private String id;
@Indexed
private int user;
@Indexed
private String product;
private double productN;
private double rating;
private float prediction;
public static RatingResult convert(Row row) {
int user = row.getAs("user");
String product = row.getAs("product");
double productN = row.getAs("productN");
double rating = row.getAs("rating");
float prediction = row.getAs("prediction");
String id = user + product;
return RatingResult.builder().id(id).user(user).product(product).productN(productN).rating(rating)
.prediction(prediction).build();
}
}
使用spark-redis保存对象
JavaRDD<RatingResult> result = ...
...
sparkSession.createDataFrame(result,RatingResult.class).write().format("org.apache.spark.sql.redis")
.option("table","collaborative_filtering").mode(SaveMode.Overwrite).save();
存储库:
@Repository
public interface RatingResultRepository extends JpaRepository<RatingResult,String> {
}
我无法读取使用Spring数据redis保存在Redis中的数据,因为spark-redis和spring数据redis保存的结构数据不相同(我检查了spark-redis和spring数据redis创建的键的值使用命令不同:redis-cli -p 6379 keys \*
和redis-cli hgetall $key
)
那么如何使用Java或Java中的任何库保存了如何读取此数据?
解决方法
以下对我有用。
从spark-redis写入数据。
我在这里使用Scala,但是本质上与Java中相同。我唯一更改的是添加了.option("key.column","id")
以指定哈希ID。
val ratingResult = new RatingResult("1",1,"product1",2.0,3.0,4)
val result: JavaRDD[RatingResult] = spark.sparkContext.parallelize(Seq(ratingResult)).toJavaRDD()
spark
.createDataFrame(result,classOf[RatingResult])
.write
.format("org.apache.spark.sql.redis")
.option("key.column","id")
.option("table","collaborative_filtering")
.mode(SaveMode.Overwrite)
.save()
在spring-data-redis中,我有以下内容:
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Builder
@RedisHash("collaborative_filtering")
public class RatingResult implements Serializable {
private static final long serialVersionUID = 8755574422193819444L;
@Id
private String id;
@Indexed
private int user;
@Indexed
private String product;
private double productN;
private double rating;
private float prediction;
@Override
public String toString() {
return "RatingResult{" +
"id='" + id + '\'' +
",user=" + user +
",product='" + product + '\'' +
",productN=" + productN +
",rating=" + rating +
",prediction=" + prediction +
'}';
}
}
我使用CrudRepository代替JPA:
@Repository
public interface RatingResultRepository extends CrudRepository<RatingResult,String> {
}
查询:
RatingResult found = ratingResultRepository.findById("1").get();
System.out.println("found = " + found);
输出:
found = RatingResult{id='null',user=1,product='product1',productN=2.0,rating=3.0,prediction=4.0}
您可能会注意到没有填充id
字段,因为存储的spark-redis具有哈希ID,而不是哈希属性。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。