unqlite源码分析之unqlite_kv_store

现在只记录了使用文件系统的解析,后续添加使用mem的解析

int unqlite_kv_store(unqlite *pDb,const void *pKey,int nKeyLen,const void *pData,unqlite_int64 nDataLen)
{
...
         if( nKeyLen < 0 ){
             /* Assume a null terminated string and compute it's length */
             nKeyLen = SyStrlen((const char *)pKey);
         }
         if( !nKeyLen ){
             unqliteGenError(pDb,"Empty key");
             rc = UNQLITE_EMPTY;
         }else{
             /* Perform the requested operation */
             rc = pEngine->pIo->pMethods->xReplace(pEngine,pKey,nKeyLen,pData,nDataLen);
         }
     }
...
}

上述代码重点是pEngine->pIo->pMethods->xReplace(pEngine,pKey,nKeyLen,pData,nDataLen);这行代码会根据是否是mem数据库选择MemHashReplace或者lhash_kv_replace。这里重点记录一下hash中的方法,lhash_kv_replace只是简单的重定向到了lh_record_insert。

/*
 * Insert a record (Either overwrite or append operation) in our database.
 */
static int lh_record_insert(
      unqlite_kv_engine *pKv,         /* KV store */
      const void *pKey,sxu32 nKeyLen, /* Payload: Key */
      const void *pData,unqlite_int64 nDataLen, /* Payload: data */
      int is_append /* True for an append operation */
      )
{
...
    rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,1,0);
    
...
    nHash = pEngine->xHash(pKey,(sxu32)nKeyLen);
retry:
    /* Extract the logical bucket number */
    iBucket = nHash & (pEngine->nmax_split_nucket - 1);
    if( iBucket >= pEngine->split_bucket + pEngine->max_split_bucket ){
        /* Low mask */
        iBucket = nHash & (pEngine->max_split_bucket - 1);
    }
    /* Map the logical bucket number to real page number */
    pRec = lhMapFindBucket(pEngine,iBucket);
    if( pRec == 0 ){
        /* Request a new page */
        rc = lhAcquirePage(pEngine,&pRaw);
        if( rc != UNQLITE_OK ){
            return rc;
        }
        /* Initialize the page */
        pPage = lhNewPage(pEngine,pRaw,0);
        if( pPage == 0 ){
            return UNQLITE_NOMEM;
        }
        /* Mark as an empty page */
        rc = lhSetEmptyPage(pPage);
        if( rc != UNQLITE_OK ){
            pEngine->pIo->xPageUnref(pRaw); /* pPage will be released during this call */
            return rc;
        }
        /* Store the cell */
        rc = lhStoreCell(pPage,pKey,nKeyLen,pData,nDataLen,nHash,1);
        if( rc == UNQLITE_OK ){
            /* Install and write the logical map record */
            rc = lhMapWriteRecord(pEngine,iBucket,pRaw->pgno);
        }
        pEngine->pIo->xPageUnref(pRaw);
        return rc;
    }else{
        /* Load the page */
        rc = lhLoadPage(pEngine,pRec->iReal,0,&pPage,0);
        if( rc != UNQLITE_OK ){
            /* IO error, unlikely scenario */
            return rc;
        }
        /* Do not add this page to the hot dirty list */
        pEngine->pIo->xDontMkHot(pPage->pRaw);
        /* Lookup for the cell */
        pCell = lhFindCell(pPage,pKey,(sxu32)nKeyLen,nHash);
        if( pCell == 0 ){
            /* Create the record */
            rc = lhRecordInstall(pPage,nHash,pKey,nKeyLen,pData,nDataLen);
            if( rc == SXERR_RETRY && iCnt++ < 2 ){
                rc = UNQLITE_OK;
                goto retry;
            }
        }else{
            if( is_append ){
                /* Append operation */
                rc = lhRecordAppend(pCell,pData,nDataLen);
            }else{
                /* Overwrite old value */
                rc = lhRecordOverwrite(pCell,pData,nDataLen);
            }
        }
        pEngine->pIo->xPageUnref(pPage->pRaw);
    }
    return rc;
}

 打开数据这一操作中将会调用pager_shared_lock这个接口,pager_shared_lock这个接口将会打开数据库文件,并设置db的hearder信息,然后读取数据库中包含的bucket信息。

       然后更具key算出hashkey,通过hashkey来查找是否有相应的bucket。如果没有的话将会创建一个新的page并向里面添加一个新的cell,最后将新的page添加到map中;如果有的话就会从disk中夹在page到内存,然后查找相应的cell,如果没有cell的话存储一个新的cell,如果有的话更具参数调用append或者overwrite。

      通过分析这段代码我们知道了,unqlite在打开数据库的时候并没有将所有已有的数据都读到内存中,而是在需要的时候在读取(lhLoadPage)。

猜你喜欢

转载自blog.csdn.net/scdnshijiemengxiang/article/details/83808444