//批量插入数据 String sql = "INSERT INTO `r` (`data`,`name`) VALUES (?,?)"; jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() { @Override public void setValues(PreparedStatement preparedStatement, int i) throws SQLException { preparedStatement.setString(1, PAYLOAD); preparedStatement.setString(2, "item" + i); }
@Override public int getBatchSize() { return ROWS; } }); log.info("init mysql finished with count {}", jdbcTemplate.queryForObject("SELECT COUNT(*) FROM `r`", Long.class)); }
@GetMapping("mysql") public void mysql() { //根据随机name来查data,name字段有索引,结果应该等于PAYLOAD Assert.assertTrue(jdbcTemplate.queryForObject("SELECT data FROM `r` WHERE name=?", new Object[]{("item" + (ThreadLocalRandom.current().nextInt(CommonMistakesApplication.ROWS) + 1))}, String.class) .equals(CommonMistakesApplication.PAYLOAD)); }
// Count keys matching the prefix item71* in Redis; the data set is built so exactly 1111 match.
@GetMapping("redis2")
public void redis2() {
    int matched = stringRedisTemplate.keys("item71*").size();
    Assert.assertTrue(matched == 1111);
}

// Same prefix search against MySQL via LIKE on the indexed name column; also expects 1111 rows.
@GetMapping("mysql2")
public void mysql2() {
    int matched = jdbcTemplate.queryForList("SELECT name FROM `r` WHERE name LIKE 'item71%'", String.class).size();
    Assert.assertTrue(matched == 1111);
}
@PostConstruct public void init() { //使用-Dspring.profiles.active=init启动程序进行初始化 if (Arrays.stream(standardEnvironment.getActiveProfiles()).anyMatch(s -> s.equalsIgnoreCase("init"))) { initInfluxDB(); initMySQL(); } }
//初始化MySQL private void initMySQL() { long begin = System.currentTimeMillis(); jdbcTemplate.execute("DROP TABLE IF EXISTS `m`;"); //只有ID、值和时间戳三列 jdbcTemplate.execute("CREATE TABLE `m` (\n" + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" + " `value` bigint NOT NULL,\n" + " `time` timestamp NOT NULL,\n" + " PRIMARY KEY (`id`),\n" + " KEY `time` (`time`) USING BTREE\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;");
String sql = "INSERT INTO `m` (`value`,`time`) VALUES (?,?)"; //批量插入数据 jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() { @Override public void setValues(PreparedStatement preparedStatement, int i) throws SQLException { preparedStatement.setLong(1, ThreadLocalRandom.current().nextInt(10000)); preparedStatement.setTimestamp(2, Timestamp.valueOf(LocalDateTime.now().minusSeconds(5 * i))); }
@Override public int getBatchSize() { return ROWS; } }); log.info("init mysql finished with count {} took {}ms", jdbcTemplate.queryForObject("SELECT COUNT(*) FROM `m`", Long.class), System.currentTimeMillis()-begin); }
//初始化InfluxDB private void initInfluxDB() { long begin = System.currentTimeMillis(); OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient().newBuilder() .connectTimeout(1, TimeUnit.SECONDS) .readTimeout(10, TimeUnit.SECONDS) .writeTimeout(10, TimeUnit.SECONDS); try (InfluxDB influxDB = InfluxDBFactory.connect("http://127.0.0.1:8086", "root", "root", okHttpClientBuilder)) { String db = "performance"; influxDB.query(new Query("DROP DATABASE " + db)); influxDB.query(new Query("CREATE DATABASE " + db)); //设置数据库 influxDB.setDatabase(db); //批量插入,10000条数据刷一次,或1秒刷一次 influxDB.enableBatch(BatchOptions.DEFAULTS.actions(10000).flushDuration(1000)); IntStream.rangeClosed(1, ROWS).mapToObj(i -> Point .measurement("m") .addField("value", ThreadLocalRandom.current().nextInt(10000)) .time(LocalDateTime.now().minusSeconds(5 * i).toInstant(ZoneOffset.UTC).toEpochMilli(), TimeUnit.MILLISECONDS).build()) .forEach(influxDB::write); influxDB.flush(); log.info("init influxdb finished with count {} took {}ms", influxDB.query(new Query("SELECT COUNT(*) FROM m")).getResults().get(0).getSeries().get(0).getValues().get(0).get(1), System.currentTimeMillis()-begin); } } }
启动后,程序输出了如下日志:
1 2
[16:08:25.062] [main] [INFO ] [o.g.t.c.n.i.CommonMistakesApplication:104 ] - init influxdb finished with count 1.0E7 took 54280ms [16:11:50.462] [main] [INFO ] [o.g.t.c.n.i.CommonMistakesApplication:80 ] - init mysql finished with count 10000000 took 205394ms
@Autowired private JdbcTemplate jdbcTemplate; @GetMapping("mysql") public void mysql() { long begin = System.currentTimeMillis(); //使用SQL从MySQL查询,按照小时分组 Object result = jdbcTemplate.queryForList("SELECT date_format(time,'%Y%m%d%H'),max(value),min(value),avg(value) FROM m WHERE time>now()- INTERVAL 60 DAY GROUP BY date_format(time,'%Y%m%d%H')"); log.info("took {} ms result {}", System.currentTimeMillis() - begin, result); }
@GetMapping("influxdb") public void influxdb() { long begin = System.currentTimeMillis(); try (InfluxDB influxDB = InfluxDBFactory.connect("http://127.0.0.1:8086", "root", "root")) { //切换数据库 influxDB.setDatabase("performance"); //InfluxDB的查询语法InfluxQL类似SQL Object result = influxDB.query(new Query("SELECT MEAN(value),MIN(value),MAX(value) FROM m WHERE time > now() - 60d GROUP BY TIME(1h)")); log.info("took {} ms result {}", System.currentTimeMillis() - begin, result); } }
InfluxDB提供的tag功能,可以为每一个指标设置多个标签,并且tag有索引,可以对tag进行条件搜索或分组。但是,tag只能保存有限的、可枚举的标签,不能保存URL等信息,否则可能会出现high series cardinality问题,导致占用大量内存,甚至是OOM。你可以点击这里,查看series和内存占用的关系。对于InfluxDB,我们无法把URL这种原始数据保存到数据库中,只能把数据进行归类,形成有限的tag进行保存。
[22:04:00.951] [http-nio-45678-exec-6] [INFO ] [o.g.t.c.n.esvsmyql.PerformanceController:48 ] - took 48 ms result 2100 Hibernate: select count(news0_.id) as col_0_0_ from news news0_ where news0_.cateid=? and (news0_.content like ? escape ?) and (news0_.content like ? escape ?) [22:04:11.946] [http-nio-45678-exec-7] [INFO ] [o.g.t.c.n.esvsmyql.PerformanceController:39 ] - took 6637 ms result 2100
@GetMapping("mysql2") public void mysql2(@RequestParam(value = "id", defaultValue = "400000") long id) { long begin = System.currentTimeMillis(); //对于MySQL,使用JdbcTemplate+SQL语句,实现直接更新某个category字段,更新1000次 IntStream.rangeClosed(1, 1000).forEach(i -> { jdbcTemplate.update("UPDATE `news` SET category=? WHERE id=?", new Object[]{"test" + i, id}); }); log.info("mysql took {} ms result {}", System.currentTimeMillis() - begin, newsMySQLRepository.findById(id)); }
@GetMapping("es2") public void es(@RequestParam(value = "id", defaultValue = "400000") long id) { long begin = System.currentTimeMillis(); IntStream.rangeClosed(1, 1000).forEach(i -> { //对于ES,通过ElasticsearchTemplate+自定义UpdateQuery,实现文档的部分更新 UpdateQuery updateQuery = null; try { updateQuery = new UpdateQueryBuilder() .withIndexName("news") .withId(String.valueOf(id)) .withType("_doc") .withUpdateRequest(new UpdateRequest().doc( jsonBuilder() .startObject() .field("category", "test" + i) .endObject())) .build(); } catch (IOException e) { e.printStackTrace(); } elasticsearchTemplate.update(updateQuery); }); log.info("es took {} ms result {}", System.currentTimeMillis() - begin, newsESRepository.findById(id).get()); }