Ultimate Spider (终极蜘蛛王) is a tool for collecting data from the web. This article describes the overall architecture of Ultimate Spider and some of its technical highlights.

Ultimate Spider has two main pages: a dashboard page that presents overview information, and a spider configuration page.

Dashboard page

Spider configuration page

The overall architecture is shown in the figure below:

Overall architecture

# Spider Configuration

The initial URLs and the content to be extracted are configured visually on top of the WebMagic crawler framework, which makes crawl rules more flexible and widens the range of sites the tool can handle; a hand-written equivalent of one rule chain is sketched after the listing below.

  • SpiderProcessor.java: crawls pages according to the configured crawl rules
@Slf4j
@EnableConfigurationProperties({SpiderProperties.class})
public class SpiderProcessor implements PageProcessor {

    private SpiderProperties spiderProperties;

    private String spiderName;

    private Crawler crawler;

    public SpiderProcessor(SpiderProperties spiderProperties, String spiderName, Crawler crawler) {
        this.spiderProperties = spiderProperties;
        this.spiderName = spiderName;
        this.crawler = crawler;
    }

    @Override
    public void process(Page page) {
        if (CollectionUtils.isNotEmpty(crawler.getCrawlConfigs())) {
            page.putField(Constants.SPIDER_NAME, spiderName);
            JSONObject content = new JSONObject();
            for (Crawler.CrawlConfig crawlConfig : crawler.getCrawlConfigs()) {
                if (CollectionUtils.isNotEmpty(crawlConfig.getCrawlRules())) {
                    Selectable selector = page.getHtml();
                    for (Crawler.CrawlRule crawlRule : crawlConfig.getCrawlRules()) {
                        switch (crawlRule.getCrawlType()) {
                            case XPATH:
                                selector = selector.xpath(crawlRule.getRule());
                                break;
                            case CSS:
                                if (StringUtils.isBlank(crawlRule.getAttr())) {
                                    selector = selector.css(crawlRule.getRule());
                                } else {
                                    selector = selector.css(crawlRule.getRule(), crawlRule.getAttr());
                                }
                                break;
                            case LINKS:
                                selector = selector.links();
                                break;
                            case REGEX:
                                selector = selector.regex(crawlRule.getRule());
                                break;
                            case REGEX_WITH_GROUP:
                                selector = selector.regex(crawlRule.getRule(), crawlRule.getGroup());
                                break;
                            case REPLACE:
                                selector = selector.replace(crawlRule.getRule(), crawlRule.getReplacement());
                                break;
                            default:
                                log.warn("not support crawl rule type: {}", crawlRule.getCrawlType());
                        }
                    }
                    if (crawlConfig.isMultiResult()) {
                        List<String> value = selector.all();
                        if (crawlConfig.isNullSkip() && CollectionUtils.isEmpty(value)) {
                            page.setSkip(true);
                            break;
                        }
                        if (crawlConfig.getCrawlResultType() == Crawler.CrawlResultType.TARGET_URL) {
                            page.addTargetRequests(value);
                        } else if (crawlConfig.getCrawlResultType() == Crawler.CrawlResultType.TEXT) {
                            content.put(crawlConfig.getCrawlKey(), value);
                        } else {
                            log.warn("not support crawl result type: {}", crawlConfig.getCrawlResultType());
                        }
                    } else {
                        String value = selector.get();
                        if (crawlConfig.isNullSkip() && StringUtils.isBlank(value)) {
                            page.setSkip(true);
                            break;
                        }
                        if (crawlConfig.getCrawlResultType() == Crawler.CrawlResultType.TARGET_URL) {
                            page.addTargetRequest(value);
                        } else if (crawlConfig.getCrawlResultType() == Crawler.CrawlResultType.TEXT) {
                            content.put(crawlConfig.getCrawlKey(), value);
                        } else {
                            log.warn("not support crawl result type: {}", crawlConfig.getCrawlResultType());
                        }
                    }
                }
            }
            content.put(Constants.CRAWL_AT, new Date());
            page.putField(Constants.SPIDER_CONTENT, content);
        }
    }

    @Override
    public Site getSite() {
        return Site.me()
                .setRetryTimes(spiderProperties.getRetryTimes())
                .setRetrySleepTime(spiderProperties.getRetrySleepTime())
                .setSleepTime(spiderProperties.getSleepTime())
                .setTimeOut(spiderProperties.getTimeout());
    }
}
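
Each rule in a CrawlConfig is applied to the output of the previous one, so a rule chain behaves like a single fluent selector expression. As a hand-written equivalent, a chain of XPATH, LINKS and REGEX rules that collects new target URLs would look roughly like this inside a PageProcessor (the XPath, class name and URL pattern are made up for illustration):

// hand-written equivalent of the rule chain XPATH -> LINKS -> REGEX,
// with crawlResultType = TARGET_URL and multiResult = true
Selectable selection = page.getHtml()
        .xpath("//div[@class='news-list']")   // XPATH rule: narrow to the list container
        .links()                              // LINKS rule: collect the hrefs inside it
        .regex(".*/article/\\d+.*");          // REGEX rule: keep only article detail pages
page.addTargetRequests(selection.all());      // schedule the matched URLs for crawling

SpiderProcessor does exactly this, only driven by the persisted configuration instead of hard-coded selector calls.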

# Spider Execution

Spider tasks are scheduled with the Quartz scheduling framework, so a crawl task can be paused and resumed at any time; a sketch of the job class that the trigger eventually fires follows the listing below.

  • TaskServiceImpl.java: service implementation for spider tasks
@Slf4j
@Service
public class TaskServiceImpl implements TaskService {

    @Autowired
    private Scheduler scheduler;

    @Autowired
    private TaskRepository taskRepository;

    @Autowired
    private UltimateSpiderRepository ultimateSpiderRepository;

    @Override
    public Result getSpiderTask(Integer spiderId) {
        if (!ValidateUtils.validId(spiderId)) {
            return Result.fail(ResultCode.PARAMS_ERROR);
        }
        Task crawlTask = taskRepository.findOneBySpiderIdAndTaskType(spiderId, Task.TaskType.CRAWL);
        Task cleanTask = taskRepository.findOneBySpiderIdAndTaskType(spiderId, Task.TaskType.CLEAN);
        JSONObject spiderTask = new JSONObject();
        spiderTask.fluentPut(Constants.CRAWL_TASK, crawlTask).fluentPut(Constants.CLEAN_TASK, cleanTask);
        return Result.ok(spiderTask);
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public Result saveTask(Task task) throws SchedulerException {
        if (!task.isValid(false)) {
            return Result.fail(ResultCode.PARAMS_ERROR);
        }
        if (task.getJobStatus() == null) {
            task.setJobStatus(Task.JobStatus.RUNNING);
        }
        Task flushedTask = taskRepository.saveAndFlush(task);
        UltimateSpider spider = ultimateSpiderRepository.findOne(task.getSpiderId());
        JobKey jobKey = JobUtils.generateJobKey(spider, task.getTaskType());
        if (task.getJobStatus() == Task.JobStatus.RUNNING) {
            TriggerKey triggerKey = JobUtils.generateTriggerKey(spider, task.getTaskType());
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withSchedule(CronScheduleBuilder.cronSchedule(task.getCronExpression()))
                    .withIdentity(triggerKey)
                    .build();
            if (scheduler.checkExists(jobKey)) {
                scheduler.rescheduleJob(triggerKey, trigger);
            } else {
                //noinspection unchecked
                JobDetail jobDetail = JobBuilder.newJob(task.getTaskType().getJobClass())
                        .withIdentity(jobKey)
                        .usingJobData(Constants.JOB_TASK_ID, flushedTask.getId())
                        .storeDurably()
                        .build();
                scheduler.scheduleJob(jobDetail, trigger);
            }
        } else if (task.getJobStatus() == Task.JobStatus.PAUSE) {
            if (scheduler.checkExists(jobKey)) {
                scheduler.pauseJob(jobKey);
            }
        }
        return Result.ok(flushedTask);
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public Result pauseTask(Integer id) throws SchedulerException {
        if (!ValidateUtils.validId(id)) {
            return Result.fail(ResultCode.PARAMS_ERROR);
        }
        Task task = taskRepository.findOne(id);
        if (task == null || !task.isValid(false)) {
            return Result.fail(ResultCode.DATA_VALID_ERROR);
        }
        UltimateSpider spider = ultimateSpiderRepository.findOne(task.getSpiderId());
        JobKey jobKey = JobUtils.generateJobKey(spider, task.getTaskType());
        if (scheduler.checkExists(jobKey)) {
            scheduler.pauseJob(jobKey);
        }
        task.setJobStatus(Task.JobStatus.PAUSE);
        taskRepository.save(task);
        return Result.ok();
    }

    @Override
    public Result resumeTask(Integer id) throws SchedulerException {
        if (!ValidateUtils.validId(id)) {
            return Result.fail(ResultCode.PARAMS_ERROR);
        }
        Task task = taskRepository.findOne(id);
        if (task == null || !task.isValid(false)) {
            return Result.fail(ResultCode.DATA_VALID_ERROR);
        }
        resumeTask(task);
        task.setJobStatus(Task.JobStatus.RUNNING);
        taskRepository.save(task);
        return Result.ok();
    }

    @Override
    public void resumeTask(Task task) throws SchedulerException {
        UltimateSpider spider = ultimateSpiderRepository.findOne(task.getSpiderId());
        JobKey jobKey = JobUtils.generateJobKey(spider, task.getTaskType());
        if (scheduler.checkExists(jobKey)) {
            scheduler.resumeJob(jobKey);
        } else {
            //noinspection unchecked
            JobDetail jobDetail = JobBuilder.newJob(task.getTaskType().getJobClass())
                    .withIdentity(jobKey)
                    .usingJobData(Constants.JOB_TASK_ID, task.getId())
                    .storeDurably()
                    .build();
            TriggerKey triggerKey = JobUtils.generateTriggerKey(spider, task.getTaskType());
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withSchedule(CronScheduleBuilder.cronSchedule(task.getCronExpression()))
                    .withIdentity(triggerKey)
                    .build();
            scheduler.scheduleJob(jobDetail, trigger);
        }
    }
}
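
The job class scheduled for each task comes from task.getTaskType().getJobClass(). The crawl job itself is not listed in this article; the sketch below shows one way it could look, assuming a CrawlerRepository analogous to the CleanerRepository used in the next section, a getStartUrls() accessor on Crawler, and the RabbitMqPipeline sketched in the persistence section:

@Slf4j
public class CrawlJob implements Job {

    @Autowired
    private TaskRepository taskRepository;

    @Autowired
    private UltimateSpiderRepository ultimateSpiderRepository;

    @Autowired
    private CrawlerRepository crawlerRepository;    // assumed, analogous to CleanerRepository

    @Autowired
    private SpiderProperties spiderProperties;

    @Autowired
    private RabbitMqPipeline rabbitMqPipeline;      // assumed, see the persistence section

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        // the task id was stored in the JobDataMap by TaskServiceImpl.saveTask
        Integer taskId = (Integer) jobExecutionContext.getJobDetail().getJobDataMap().get(Constants.JOB_TASK_ID);
        Task task = taskRepository.findOne(taskId);
        UltimateSpider spider = ultimateSpiderRepository.findOne(task.getSpiderId());
        Crawler crawler = crawlerRepository.findOneBySpiderId(task.getSpiderId()); // assumed query method
        Spider.create(new SpiderProcessor(spiderProperties, spider.getSpiderName(), crawler))
                .addUrl(crawler.getStartUrls().toArray(new String[0]))             // assumed accessor
                .addPipeline(rabbitMqPipeline)
                .run();
    }
}

Because the task id travels in the JobDataMap, one job class can serve every spider.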

# Data Cleaning

Data cleaning tasks are scheduled with Quartz as well, to keep the quality of the crawled data up. Cleaning rules can likewise be configured on the page.

  • CleanerJob.java: the cleaning job
@Slf4j
public class CleanerJob implements Job {

    @Autowired
    private UltimateSpiderRepository ultimateSpiderRepository;

    @Autowired
    private TaskRepository taskRepository;

    @Autowired
    private CleanerRepository cleanerRepository;

    @Autowired
    private MongoTemplate mongoTemplate;

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        Integer taskId = (Integer) jobExecutionContext.getJobDetail().getJobDataMap().get(Constants.JOB_TASK_ID);
        Task task = taskRepository.findOne(taskId);
        UltimateSpider ultimateSpider = ultimateSpiderRepository.findOne(task.getSpiderId());
        Cleaner cleaner = cleanerRepository.findOneBySpiderId(task.getSpiderId());
        if (cleaner == null || !cleaner.isValid(true)) {
            log.warn("[{}] cleaner is invalid", ultimateSpider.getSpiderName());
            return;
        }
        log.info("[{}] cleaner start", ultimateSpider.getSpiderName());
        List<JSONObject> purgeList = Lists.newArrayList();
        List<JSONObject> mergeList = Lists.newArrayList();
        for (Cleaner.CleanerRule cleanerRule : cleaner.getCleanerRules()) {
            if (cleanerRule.getCleanType() == Cleaner.CleanType.MERGE) {
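                // MERGE: group documents that share the same values for every distinctKey,
                // fold their mergeKeys values into JSON arrays on one surviving document,
                // and purge the duplicates afterwards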
                List<JSONObject> queryResult = mongoTemplate.find(new BasicQuery(cleanerRule.getQueryRule()),
                        JSONObject.class, ultimateSpider.getSpiderName());
                Map<String, List<JSONObject>> mergeMap = Maps.newHashMap();
                queryResult.forEach(result -> {
                    List<String> keyItems = Lists.newLinkedList();
                    for (String distinctKey : cleanerRule.getDistinctKeys()) {
                        StringBuilder keyItemBuilder = new StringBuilder();
                        if (!result.containsKey(distinctKey)) {
                            break;
                        }
                        keyItemBuilder.append(distinctKey).append(Constants.MERGE_KEY_DELIMITER).append(result.get(distinctKey));
                        keyItems.add(keyItemBuilder.toString());
                    }
                    if (keyItems.size() == cleanerRule.getDistinctKeys().size()) {
                        String mergeKey = StringUtils.join(keyItems, Constants.MERGE_VALUE_DELIMITER);
                        // computeIfAbsent keeps the list that actually receives the result in the map
                        List<JSONObject> resultMergeList = mergeMap.computeIfAbsent(mergeKey, key -> Lists.newArrayList());
                        resultMergeList.add(result);
                    }
                });
                mergeMap.forEach((mergeKey, resultMergeList) -> {
                    if (resultMergeList.size() > 1) {
                        purgeList.addAll(resultMergeList);
                        JSONObject mergeResult = resultMergeList.get(0);
                        resultMergeList.remove(0);
                        resultMergeList.forEach(result -> {
                            for (String resultMergeKey : cleanerRule.getMergeKeys()) {
                                if (mergeResult.get(resultMergeKey) instanceof JSONArray) {
                                    JSONArray jsonArray = (JSONArray) mergeResult.getOrDefault(resultMergeKey, new JSONArray());
                                    if (!jsonArray.contains(result.get(resultMergeKey))) {
                                        jsonArray.add(result.get(resultMergeKey));
                                        mergeResult.put(resultMergeKey, jsonArray);
                                    }
                                } else {
                                    JSONArray jsonArray = new JSONArray();
                                    jsonArray.add(mergeResult.get(resultMergeKey));
                                    jsonArray.add(result.get(resultMergeKey));
                                    mergeResult.put(resultMergeKey, jsonArray);
                                }
                            }
                        });
                        mergeList.add(mergeResult);
                    }
                });
            } else if (cleanerRule.getCleanType() == Cleaner.CleanType.PURGE) {
                purgeList.addAll(mongoTemplate.find(new BasicQuery(cleanerRule.getQueryRule()),
                        JSONObject.class, ultimateSpider.getSpiderName()));
            }
        }
        purgeList.forEach(purgeObject -> mongoTemplate.remove(purgeObject, ultimateSpider.getSpiderName()));
        mergeList.forEach(mergeObject -> mongoTemplate.save(mergeObject, ultimateSpider.getSpiderName()));
    }
}
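
Quartz instantiates job classes itself, so the @Autowired fields in CleanerJob are only injected if the Scheduler builds job instances through a Spring-aware JobFactory. That wiring is not shown in this article; one common approach, assuming the Scheduler is created by Spring's SchedulerFactoryBean, is a job factory along these lines:

public class AutowiringJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {

    private transient AutowireCapableBeanFactory beanFactory;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.beanFactory = applicationContext.getAutowireCapableBeanFactory();
    }

    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        // let Quartz build the job instance, then inject its @Autowired dependencies
        Object job = super.createJobInstance(bundle);
        beanFactory.autowireBean(job);
        return job;
    }
}

Registering the factory via SchedulerFactoryBean.setJobFactory(...) makes every job instance pass through Spring injection before execute() runs.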

# Data Persistence

Crawl results are saved to the MongoDB document database asynchronously through a RabbitMQ message queue, which gives better write performance, and the document store suits crawl results whose structure varies from spider to spider; the producing pipeline is sketched after the consumer below.

  • The message consumer stores crawl results in MongoDB
@Slf4j
@Component
@RabbitListener(queues = Constants.QUEUE_NAME)
public class Receiver {

    @Autowired
    private MongoTemplate mongoTemplate;

    @RabbitHandler
    public void receive(String spiderResultJson) {
        SpiderResult spiderResult = JSON.parseObject(spiderResultJson, SpiderResult.class);
        log.info("queue: {}, spider_result: {}", Constants.QUEUE_NAME, spiderResult);
        mongoTemplate.save(spiderResult.getContent(), spiderResult.getSpiderName());
    }
}
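
The producing side is a WebMagic Pipeline that turns the fields SpiderProcessor puts into the page into a SpiderResult message and publishes it to the queue. That pipeline is not listed here; a sketch of how it could look, assuming Lombok-style setters on SpiderResult and the queue name doubling as the routing key on the default exchange:

@Slf4j
@Component
public class RabbitMqPipeline implements Pipeline {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void process(ResultItems resultItems, Task task) {
        // Task here is WebMagic's us.codecraft.webmagic.Task, not the scheduling entity above
        SpiderResult spiderResult = new SpiderResult();
        spiderResult.setSpiderName(resultItems.get(Constants.SPIDER_NAME));   // assumed setter
        spiderResult.setContent(resultItems.get(Constants.SPIDER_CONTENT));   // assumed setter
        rabbitTemplate.convertAndSend(Constants.QUEUE_NAME, JSON.toJSONString(spiderResult));
    }
}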

# Data Visualization

The fairly mature Metabase is used to visualize the collected data.

Metabase visualization of the collected data