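The previous post covered how to connect to a ZooKeeper cluster. This post looks at an interesting topic: Replication (index replication). For a detailed introduction to Solr Replication, see http://wiki.apache.org/solr/SolrReplication.
Before getting into it, let me start with a real problem that puzzled me recently when I introduced Solr's master/slave architecture into an application.
The scenario, briefly:
1) The master node first downloads the index shards, then creates its configuration file with the master replication section added, merges the index shards (on merging indexes, see http://wiki.apache.org/solr/MergingSolrIndexes), and finally uses that configuration file and the index data to create a Solr core.
2) The slave node creates its configuration file with the slave replication section added, creates an empty Solr core, and waits to sync index data from the master node.
The problem: the slave node never synced any data from the master node.
Analysis:
1) First, check the master node for the version of its latest replicable index: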
http://master_host:port/solr/replication?command=indexversion
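The returned index version was 0, which means the master node had no index version available for replication; the replication action had never been triggered.
2) To confirm this, look at the replication details on the slave node: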
http://slave_host:port/solr/replication?command=details
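That confirmed it: although the index versions on the master node and the slave node differed, no index data had been synced over. The logs on both nodes also showed that index replication had never started.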
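The two checks above can also be scripted. The following is a minimal sketch, assuming Java 11 or later (for `java.net.http`) and a replication handler mounted at `/solr/replication` as in the URLs above. The class name `ReplicationProbe` and the `host:port` command-line arguments are hypothetical; they are not part of Solr.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical helper (not part of Solr) for querying the replication handler. */
final class ReplicationProbe {
    private ReplicationProbe() {}

    /** Builds e.g. http://localhost:8983/solr/replication?command=indexversion */
    static URI commandUrl(String hostAndPort, String command) {
        return URI.create("http://" + hostAndPort + "/solr/replication?command=" + command);
    }

    /** Sends a plain GET and returns the raw response body. */
    static String get(URI url) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(url).GET().build();
        return HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString())
                .body();
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("usage: ReplicationProbe <master host:port> <slave host:port>");
            return;
        }
        // Step 1: ask the master for its latest replicable index version.
        System.out.println(get(commandUrl(args[0], "indexversion")));
        // Step 2: ask the slave for its replication details.
        System.out.println(get(commandUrl(args[1], "details")));
    }
}
```

An index version of 0 in the first response is the symptom described above.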
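In short, the master node was not triggering index replication. Why? I'll state the cause first and back it up with the source-code analysis later.
Cause: when merging indexes in Solr, whether through the mergeindexes HTTP command or by calling Lucene's IndexWriter directly, always issue a commit at the end. Otherwise the merged index will not be visible to queries and, in a master/slave Solr cluster, the master node's replication will not be triggered, because indexversion never sees a change.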
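To make the cause concrete, here is a toy model of the behavior, not Solr code: the version reported to slaves only moves when a commit happens, so a merge without a commit leaves it at 0. The class `ToyMaster` and all of its methods are hypothetical names chosen for illustration.

```java
/** Toy model (not Solr code): the replicable version changes only on commit. */
final class ToyMaster {
    private long pendingVersion = 0L;     // changes written but not yet committed
    private long replicableVersion = 0L;  // what "indexversion" would report; 0 = nothing to replicate

    /** Merging writes new data but does not publish a commit point. */
    void mergeIndexes() {
        pendingVersion++;
    }

    /** Committing publishes the pending state, which is what replication watches for. */
    void commit() {
        replicableVersion = pendingVersion;
    }

    long indexVersion() {
        return replicableVersion;
    }

    public static void main(String[] args) {
        ToyMaster master = new ToyMaster();
        master.mergeIndexes();
        System.out.println(master.indexVersion()); // prints 0: slaves see nothing to replicate
        master.commit();
        System.out.println(master.indexVersion()); // prints 1: slaves now see a new version
    }
}
```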
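With that out of the way, let's analyze Solr's Replication.
When the Solr container loads a Solr core, it calls back every registered handler that implements the SolrCoreAware interface by invoking its inform method.
For ReplicationHandler, this is where it determines whether it is running on a master node or a slave node. On a slave node it creates a SnapPuller object, which periodically pulls index data from the master node; on a master node it only sets the relevant parameters, such as which events trigger replication.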
public void inform(SolrCore core) {
  this.core = core;
  registerFileStreamResponseWriter();
  registerCloseHook();
  NamedList slave = (NamedList) initArgs.get("slave");
  boolean enableSlave = isEnabled( slave );
  if (enableSlave) {
    tempSnapPuller = snapPuller = new SnapPuller(slave, this, core);
    isSlave = true;
  }
  NamedList master = (NamedList) initArgs.get("master");
  boolean enableMaster = isEnabled( master );

  if (!enableSlave && !enableMaster) {
    enableMaster = true;
    master = new NamedList<Object>();
  }

  if (enableMaster) {
    includeConfFiles = (String) master.get(CONF_FILES);
    if (includeConfFiles != null && includeConfFiles.trim().length() > 0) {
      List<String> files = Arrays.asList(includeConfFiles.split(","));
      for (String file : files) {
        if (file.trim().length() == 0) continue;
        String[] strs = file.split(":");
        // if there is an alias add it or it is null
        confFileNameAlias.add(strs[0], strs.length > 1 ? strs[1] : null);
      }
      LOG.info("Replication enabled for following config files: " + includeConfFiles);
    }
    List backup = master.getAll("backupAfter");
    boolean backupOnCommit = backup.contains("commit");
    boolean backupOnOptimize = !backupOnCommit && backup.contains("optimize");
    List replicateAfter = master.getAll(REPLICATE_AFTER);
    replicateOnCommit = replicateAfter.contains("commit");
    replicateOnOptimize = !replicateOnCommit && replicateAfter.contains("optimize");

    if (!replicateOnCommit && ! replicateOnOptimize) {
      replicateOnCommit = true;
    }

    // if we only want to replicate on optimize, we need the deletion policy to
    // save the last optimized commit point.
    if (replicateOnOptimize) {
      IndexDeletionPolicyWrapper wrapper = core.getDeletionPolicy();
      IndexDeletionPolicy policy = wrapper == null ? null : wrapper.getWrappedDeletionPolicy();
      if (policy instanceof SolrDeletionPolicy) {
        SolrDeletionPolicy solrPolicy = (SolrDeletionPolicy)policy;
        if (solrPolicy.getMaxOptimizedCommitsToKeep() < 1) {
          solrPolicy.setMaxOptimizedCommitsToKeep(1);
        }
      } else {
        LOG.warn("Replication can't call setMaxOptimizedCommitsToKeep on " + policy);
      }
    }

    if (replicateOnOptimize || backupOnOptimize) {
      core.getUpdateHandler().registerOptimizeCallback(getEventListener(backupOnOptimize, replicateOnOptimize));
    }
    if (replicateOnCommit || backupOnCommit) {
      replicateOnCommit = true;
      core.getUpdateHandler().registerCommitCallback(getEventListener(backupOnCommit, replicateOnCommit));
    }
    if (replicateAfter.contains("startup")) {
      replicateOnStart = true;
      RefCounted<SolrIndexSearcher> s = core.getNewestSearcher(false);
      try {
        DirectoryReader reader = s==null ? null : s.get().getIndexReader();
        if (reader!=null && reader.getIndexCommit() != null && reader.getIndexCommit().getGeneration() != 1L) {
          try {
            if(replicateOnOptimize){
              Collection<IndexCommit> commits = DirectoryReader.listCommits(reader.directory());
              for (IndexCommit ic : commits) {
                if(ic.getSegmentCount() == 1){
                  if(indexCommitPoint == null || indexCommitPoint.getGeneration() < ic.getGeneration()) indexCommitPoint = ic;
                }
              }
            } else{
              indexCommitPoint = reader.getIndexCommit();
            }
          } finally {
            // We don't need to save commit points for replication, the SolrDeletionPolicy
            // always saves the last commit point (and the last optimized commit point, if needed)
            /***
            if(indexCommitPoint != null){
              core.getDeletionPolicy().saveCommitPoint(indexCommitPoint.getGeneration());
            }
            ***/
          }
        }

        // reboot the writer on the new index
        core.getUpdateHandler().newIndexWriter();

      } catch (IOException e) {
        LOG.warn("Unable to get IndexCommit on startup", e);
      } finally {
        if (s!=null) s.decref();
      }
    }
    String reserve = (String) master.get(RESERVE);
    if (reserve != null && !reserve.trim().equals("")) {
      reserveCommitDuration = SnapPuller.readInterval(reserve);
    }
    LOG.info("Commits will be reserved for " + reserveCommitDuration);
    isMaster = true;
  }
}
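The role and trigger selection buried in the listing above can be condensed into two small rules. This is a simplified sketch with hypothetical names (`ReplicationRoles`, `actsAsMaster`, `replicationTrigger`); it leaves out the backup triggers and the startup case and is only meant to show the defaults.

```java
import java.util.List;

/** Simplified sketch of the defaults applied in inform(); names are hypothetical. */
final class ReplicationRoles {
    private ReplicationRoles() {}

    /** If neither section is enabled, the handler falls back to acting as a master. */
    static boolean actsAsMaster(boolean slaveEnabled, boolean masterEnabled) {
        return masterEnabled || !slaveEnabled;
    }

    /** "commit" wins over "optimize"; with neither configured, "commit" is the default. */
    static String replicationTrigger(List<String> replicateAfter) {
        if (replicateAfter.contains("commit")) {
            return "commit";
        }
        if (replicateAfter.contains("optimize")) {
            return "optimize";
        }
        return "commit";
    }

    public static void main(String[] args) {
        System.out.println(actsAsMaster(false, false));                // prints true
        System.out.println(replicationTrigger(List.of("startup")));    // prints commit
        System.out.println(replicationTrigger(List.of("optimize")));   // prints optimize
    }
}
```

Because a commit callback is what drives replication in the default case, a merge that is never committed leaves the master with nothing to offer.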
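ReplicationHandler responds to a number of commands:
1) indexversion.
The first concept to understand here is the index commit point (IndexCommit), a Lucene-level concept that you can read up on separately. The handler takes the latest index commit point and reads the index version and generation from it.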
IndexCommit commitPoint = indexCommitPoint;  // make a copy so it won't change
if (commitPoint != null && replicationEnabled.get()) {
  core.getDeletionPolicy().setReserveDuration(commitPoint.getVersion(), reserveCommitDuration);
  rsp.add(CMD_INDEX_VERSION, commitPoint.getVersion());
  // ... (rest of the branch is not shown in this excerpt)
}
2) backup. This command takes a snapshot of the index. It first gets the latest index commit point and then creates a SnapShooter, which performs the actual snapshot:
private void doSnapShoot(SolrParams params, SolrQueryResponse rsp, SolrQueryRequest req) {
  try {
    int numberToKeep = params.getInt(NUMBER_BACKUPS_TO_KEEP, Integer.MAX_VALUE);
    IndexDeletionPolicyWrapper delPolicy = core.getDeletionPolicy();
    IndexCommit indexCommit = delPolicy.getLatestCommit();

    if(indexCommit == null) {
      indexCommit = req.getSearcher().getReader().getIndexCommit();
    }

    // small race here before the commit point is saved
    new SnapShooter(core, params.get("location")).createSnapAsync(indexCommit, numberToKeep, this);

  } catch (Exception e) {
    LOG.warn("Exception during creating a snapshot", e);
    rsp.add("exception", e);
  }
}
The SnapShooter starts a thread that backs up the index asynchronously.
void createSnapAsync(final IndexCommit indexCommit, final int numberToKeep, final ReplicationHandler replicationHandler) {
  replicationHandler.core.getDeletionPolicy().saveCommitPoint(indexCommit.getVersion());

  new Thread() {
    @Override
    public void run() {
      createSnapshot(indexCommit, numberToKeep, replicationHandler);
    }
  }.start();
}

void createSnapshot(final IndexCommit indexCommit, int numberToKeep, ReplicationHandler replicationHandler) {
  NamedList details = new NamedList();
  details.add("startTime", new Date().toString());
  File snapShotDir = null;
  String directoryName = null;
  Lock lock = null;
  try {
    if(numberToKeep<Integer.MAX_VALUE) {
      deleteOldBackups(numberToKeep);
    }
    SimpleDateFormat fmt = new SimpleDateFormat(DATE_FMT, Locale.US);
    directoryName = "snapshot." + fmt.format(new Date());
    lock = lockFactory.makeLock(directoryName + ".lock");
    if (lock.isLocked()) return;
    snapShotDir = new File(snapDir, directoryName);
    if (!snapShotDir.mkdir()) {
      LOG.warn("Unable to create snapshot directory: " + snapShotDir.getAbsolutePath());
      return;
    }
    Collection<String> files = indexCommit.getFileNames();
    FileCopier fileCopier = new FileCopier(solrCore.getDeletionPolicy(), indexCommit);
    fileCopier.copyFiles(files, snapShotDir);

    details.add("fileCount", files.size());
    details.add("status", "success");
    details.add("snapshotCompletedAt", new Date().toString());
  } catch (Exception e) {
    SnapPuller.delTree(snapShotDir);
    LOG.error("Exception while creating snapshot", e);
    details.add("snapShootException", e.getMessage());
  } finally {
    replicationHandler.core.getDeletionPolicy().releaseCommitPoint(indexCommit.getVersion());
    replicationHandler.snapShootDetails = details;
    if (lock != null) {
      try {
        lock.release();
      } catch (IOException e) {
        LOG.error("Unable to release snapshoot lock: " + directoryName + ".lock");
      }
    }
  }
}
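Stripped of the Solr and Lucene specifics, the snapshot step above is "copy the files named by a commit point into a fresh `snapshot.<timestamp>` directory on a background thread". The sketch below shows that shape using only the JDK (Java 11 or later). `ToySnapshooter` is a hypothetical name, the timestamp pattern `yyyyMMddHHmmss` is an assumption standing in for `DATE_FMT`, and the locking and commit-point reservation from the real code are left out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;

/** JDK-only sketch of an asynchronous index snapshot; not Solr's SnapShooter. */
final class ToySnapshooter {
    private ToySnapshooter() {}

    /** Copies the named files from indexDir into a new snapshot.<timestamp> directory. */
    static Path createSnapshot(Path indexDir, Path snapRoot, Collection<String> fileNames)
            throws IOException {
        // Assumed timestamp pattern; the listing above takes it from DATE_FMT.
        String stamp = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date());
        // Fails if the directory already exists, much like the mkdir() check above.
        Path snapshotDir = Files.createDirectory(snapRoot.resolve("snapshot." + stamp));
        for (String name : fileNames) {
            Files.copy(indexDir.resolve(name), snapshotDir.resolve(name));
        }
        return snapshotDir;
    }

    /** Runs the copy on a background thread so the caller returns immediately. */
    static CompletableFuture<Path> createSnapshotAsync(Path indexDir, Path snapRoot,
                                                       Collection<String> fileNames) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return createSnapshot(indexDir, snapRoot, fileNames);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    public static void main(String[] args) throws IOException {
        Path indexDir = Files.createTempDirectory("index");
        Path snapRoot = Files.createTempDirectory("snapshots");
        Files.writeString(indexDir.resolve("segments_1"), "demo");
        Path snapshot = createSnapshotAsync(indexDir, snapRoot, List.of("segments_1")).join();
        System.out.println("Snapshot written to " + snapshot);
    }
}
```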
3) fetchindex. This command runs on the slave node and makes it fetch index files from the master; the handler starts a thread to do the fetching.
String masterUrl = solrParams.get(MASTER_URL);
if (!isSlave && masterUrl == null) {
  rsp.add(STATUS,ERR_STATUS);
  rsp.add("message","No slave configured or no 'masterUrl' Specified");
  return;
}
final SolrParams paramsCopy = new ModifiableSolrParams(solrParams);
new Thread() {
  @Override
  public void run() {
    doFetch(paramsCopy);
  }
}.start();
rsp.add(STATUS, OK_STATUS);
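The actual fetch is done by the SnapPuller object. The handler first tries to acquire the pull lock (snapPullLock). If that fails, an earlier fetch is still in progress and the call simply returns; if it succeeds, the handler calls the SnapPuller's fetchLatestIndex method to fetch the latest index data.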
void doFetch(SolrParams solrParams) {
  String masterUrl = solrParams == null ? null : solrParams.get(MASTER_URL);
  if (!snapPullLock.tryLock())
    return;
  try {
    tempSnapPuller = snapPuller;
    if (masterUrl != null) {
      NamedList<Object> nl = solrParams.toNamedList();
      nl.remove(SnapPuller.POLL_INTERVAL);
      tempSnapPuller = new SnapPuller(nl, this, core);
    }
    tempSnapPuller.fetchLatestIndex(core);
  } catch (Exception e) {
    LOG.error("SnapPull failed ", e);
  } finally {
    tempSnapPuller = snapPuller;
    snapPullLock.unlock();
  }
}
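The tryLock guard in doFetch is a general way to make overlapping requests skip instead of queue. A minimal standalone sketch follows, assuming a `ReentrantLock` plays the role of `snapPullLock`. The class `SingleFlightFetcher` and its method are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Sketch of the "skip if a fetch is already running" guard; names are hypothetical. */
final class SingleFlightFetcher {
    private final ReentrantLock pullLock = new ReentrantLock();

    /** Runs the fetch unless another thread is already fetching; returns whether it ran. */
    boolean fetchIfIdle(Runnable fetch) {
        if (!pullLock.tryLock()) {
            return false; // another fetch is in progress, give up immediately
        }
        try {
            fetch.run();
            return true;
        } finally {
            pullLock.unlock(); // always release, even if the fetch throws
        }
    }

    public static void main(String[] args) {
        SingleFlightFetcher fetcher = new SingleFlightFetcher();
        boolean ran = fetcher.fetchIfIdle(() -> {
            // While this fetch holds the lock, a request from another thread is skipped.
            Thread other = new Thread(() ->
                    System.out.println("concurrent fetch ran: " + fetcher.fetchIfIdle(() -> {})));
            other.start();
            try {
                other.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println("first fetch ran: " + ran);
    }
}
```

Note that a ReentrantLock lets the owning thread acquire it again, so the guard only rejects requests coming from other threads, which is why the demo uses a second thread.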
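Finally, the actual fetch. If the master node's indexversion is 0, the master has no index data available for replication. If the master and the slave have the same indexversion (and generation), the slave is already in sync with the master and nothing needs to be done. Otherwise replication begins: the slave first downloads from the master the list of index files for the given index version, then creates a single-threaded sync service (fsyncService) to carry out the sync work.
One distinction matters here: if the master's generation is older than the slave's (in the code, whenever the slave's generation is greater than or equal to the master's), the two indexes are no longer compatible, so the slave creates a new index directory and does a full copy from the master. Note also that replication can sync configuration files too; if any of them changed, the Solr core is reloaded. And, just as at the start of this post, once the slave has synced the data it must do a commit (doCommit in the code) so that its index commit point is refreshed to the latest state. The sync service is shut down and waited on (terminateAndWaitFsyncService) before the files are installed. The individual index files are fetched by FileFetcher objects.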
boolean fetchLatestIndex(SolrCore core) throws IOException {
  replicationStartTime = System.currentTimeMillis();
  try {
    //get the current 'replicateable' index version in the master
    NamedList response = null;
    try {
      response = getLatestVersion();
    } catch (Exception e) {
      LOG.error("Master at: " + masterUrl + " is not available. Index fetch failed. Exception: " + e.getMessage());
      return false;
    }
    long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
    long latestGeneration = (Long) response.get(GENERATION);
    if (latestVersion == 0L) {
      //there is nothing to be replicated
      return false;
    }
    IndexCommit commit;
    RefCounted<SolrIndexSearcher> searcherRefCounted = null;
    try {
      searcherRefCounted = core.getNewestSearcher(false);
      commit = searcherRefCounted.get().getReader().getIndexCommit();
    } finally {
      if (searcherRefCounted != null)
        searcherRefCounted.decref();
    }
    if (commit.getVersion() == latestVersion && commit.getGeneration() == latestGeneration) {
      //master and slave are alsready in sync just return
      LOG.info("Slave in sync with master.");
      return false;
    }
    LOG.info("Master's version: " + latestVersion + ", generation: " + latestGeneration);
    LOG.info("Slave's version: " + commit.getVersion() + ", generation: " + commit.getGeneration());
    LOG.info("Starting replication process");
    // get the list of files first
    fetchFileList(latestVersion);
    // this can happen if the commit point is deleted before we fetch the file list.
    if(filesToDownload.isEmpty()) return false;
    LOG.info("Number of files in latest index in master: " + filesToDownload.size());

    // Create the sync service
    fsyncService = Executors.newSingleThreadExecutor();
    // use a synchronized list because the list is read by other threads (to show details)
    filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
    // if the generateion of master is older than that of the slave , it means they are not compatible to be copied
    // then a new index direcory to be created and all the files need to be copied
    boolean isFullCopyNeeded = commit.getGeneration() >= latestGeneration;
    File tmpIndexDir = createTempindexDir(core);
    if (isIndexStale())
      isFullCopyNeeded = true;
    successfulInstall = false;
    boolean deleteTmpIdxDir = true;
    File indexDir = null ;
    try {
      indexDir = new File(core.getIndexDir());
      downloadIndexFiles(isFullCopyNeeded, tmpIndexDir, latestVersion);
      LOG.info("Total time taken for download : " + ((System.currentTimeMillis() - replicationStartTime) / 1000) + " secs");
      Collection<Map<String, Object>> modifiedConfFiles = getModifiedConfFiles(confFilesToDownload);
      if (!modifiedConfFiles.isEmpty()) {
        downloadConfFiles(confFilesToDownload, latestVersion);
        if (isFullCopyNeeded) {
          successfulInstall = modifyIndexProps(tmpIndexDir.getName());
          deleteTmpIdxDir = false;
        } else {
          successfulInstall = copyIndexFiles(tmpIndexDir, indexDir);
        }
        if (successfulInstall) {
          LOG.info("Configuration files are modified, core will be reloaded");
          logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);//write to a file time of replication and conf files.
          reloadCore();
        }
      } else {
        terminateAndWaitFsyncService();
        if (isFullCopyNeeded) {
          successfulInstall = modifyIndexProps(tmpIndexDir.getName());
          deleteTmpIdxDir = false;
        } else {
          successfulInstall = copyIndexFiles(tmpIndexDir, indexDir);
        }
        if (successfulInstall) {
          logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);
          doCommit();
        }
      }
      replicationStartTime = 0;
      return successfulInstall;
    } catch (ReplicationHandlerException e) {
      LOG.error("User aborted Replication");
    } catch (SolrException e) {
      throw e;
    } catch (Exception e) {
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Index fetch failed : ", e);
    } finally {
      if (deleteTmpIdxDir) delTree(tmpIndexDir);
      else delTree(indexDir);
    }
    return successfulInstall;
  } finally {
    if (!successfulInstall) {
      logReplicationTimeAndConfFiles(null, successfulInstall);
    }
    filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null;
    replicationStartTime = 0;
    fileFetcher = null;
    if (fsyncService != null && !fsyncService.isShutdown()) fsyncService.shutdownNow();
    fsyncService = null;
    stop = false;
    fsyncException = null;
  }
}
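The version and generation checks at the top of fetchLatestIndex boil down to a four-way decision. The sketch below restates only that decision; `FetchDecision` and its `Action` values are hypothetical names, and the extra isIndexStale() check that can also force a full copy is left out.

```java
/** Sketch of the decision made at the start of fetchLatestIndex; names are hypothetical. */
final class FetchDecision {
    private FetchDecision() {}

    enum Action { NOTHING_TO_REPLICATE, IN_SYNC, INCREMENTAL_COPY, FULL_COPY }

    static Action decide(long masterVersion, long masterGeneration,
                         long slaveVersion, long slaveGeneration) {
        if (masterVersion == 0L) {
            return Action.NOTHING_TO_REPLICATE; // master has no replicable commit point
        }
        if (slaveVersion == masterVersion && slaveGeneration == masterGeneration) {
            return Action.IN_SYNC;
        }
        // A slave at the same or a newer generation than the master is treated as
        // incompatible, so everything is copied into a new index directory.
        return slaveGeneration >= masterGeneration ? Action.FULL_COPY : Action.INCREMENTAL_COPY;
    }

    public static void main(String[] args) {
        System.out.println(decide(0L, 0L, 42L, 3L));   // prints NOTHING_TO_REPLICATE
        System.out.println(decide(42L, 3L, 42L, 3L));  // prints IN_SYNC
        System.out.println(decide(50L, 5L, 42L, 3L));  // prints INCREMENTAL_COPY
        System.out.println(decide(50L, 2L, 42L, 3L));  // prints FULL_COPY
    }
}
```

The first case is exactly the situation from the beginning of this post: a master that merged but never committed reports version 0, so the slave returns without copying anything.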
Author: 洞庭散人
Source: http://phinecos.cnblogs.com/