CoreSeek Python数据源的基类

上次写过一篇介绍 Coreseek 安装的文章。我个人建议 Coreseek 最好采用 Python 作为数据源，这样灵活性要大得多。这次就分享一下我写的一个 CoreSeek Python 数据源基类。

这个基类的优势在于：对于“分库分表”的 MySQL，它支持多个进程直接并发读取各个分表，性能很强。而且对于 Python 2.6 以下、没有 multiprocessing 模块的环境，这个基类会自动改用线程来模拟进程，对使用者完全透明！

该库已经在生产环境中使用。

依赖：需要安装 MySQLdb（MySQL-python）包，以及评论中给出的 DBConfig 模块。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Abstract base class for CoreSeek indexer data sources
#
#   By Litrin J. 2011-07
#

from DBConfig import DBConfig
import MySQLdb
import datetime, time, types
try:
    from multiprocessing import Process, Queue
except:
    from threading import Thread as Process
    from Queue import Queue

class CoreSeek(object):
    '''
    Abstract base class for CoreSeek Python data sources.

    Subclasses describe their documents through the class attributes below
    and may override buildIndex() to post-process each row.  Connected()
    queues every table that DBConfig lists for DBName and starts up to
    MaxProcess loader workers (processes, or threads when multiprocessing
    is unavailable); NextDocument() / iteration then hands the loaded rows
    out one at a time.
    '''

    DBName = ""
    # Field list; "as" aliases are honoured, e.g. 'id as uid' exposes the
    # `id` column under the key 'uid'.  Dict keys are lower-cased.
    Field = []
    # Raw SQL condition placed after WHERE.
    WhereCause = "TRUE"
    # Column that is unique within one table.  When set, every row gets
    # id = int(row[UniqId]) * 1000 + <table number> and a 'tablerecord' pair.
    UniqId = None
    Scheme = []
    FieldOrder = []

    # Optional GROUP BY expression.
    SQLGroupBy = ""
    # Page size for long tables; 0 (or less) loads a table with one query.
    SQLLimit = 0
    # Debug switch: prints SQL, rows and documents.
    Debug = False

    # Upper bound on concurrent loader workers.
    MaxProcess = 2
    # Seconds a worker waits for another table before it exits.  A blocking
    # get with a timeout is used because a multiprocessing.Queue can look
    # empty for a moment right after put().
    TableQueueTimeout = 1.0

    # Exception raised by both multiprocessing.Queue and Queue.Queue when no
    # item is available.
    try:
        from Queue import Empty as _QueueEmpty      # Python 2
    except ImportError:
        from queue import Empty as _QueueEmpty      # Python 3

    # Private state.  These class-level values are only defaults; Connected()
    # replaces them with per-instance objects.
    __data = []
    __curItem = []
    __uniqNumber = 0
    __tableList = []

    __processPool = []

    def __init__(self, conf):
        if self.__class__ is CoreSeek:
            raise NotImplementedError("Cannot create object of class CoreSeek!")

        self.conf = conf

    def __del__(self):
        # Nothing to release here: NextDocument() joins the workers and drops
        # the queues once the data is exhausted.
        pass

    def __getattr__(self, key):
        '''
        Expose the fields of the current document as attributes.

        When UniqId is None, ``id`` is the running document counter.  Unknown
        names raise AttributeError (not KeyError/TypeError) so hasattr(),
        copy and pickle behave normally.
        '''
        if self.UniqId is None and 'id' == key:
            return self.__uniqNumber
        try:
            return self.__curItem[key]
        except (KeyError, IndexError, TypeError):
            raise AttributeError(key)

    def __iter__(self):
        '''Yield each remaining document dict (drives NextDocument()).'''
        while self.NextDocument():
            yield self.__curItem

    def __str__(self):
        return str(self.__curItem)

    def GetScheme(self):
        return self.Scheme

    def GetFieldOrder(self):
        return self.FieldOrder

    def Connected(self):
        '''
        Queue every table of DBName and start the loader workers.

        Starts min(MaxProcess, number of tables) workers, each running
        getTableData(); read the results with NextDocument() or iteration.
        '''
        lTables = DBConfig().getDBTableConfig(self.DBName)

        self.__tableList = Queue()
        for dConfig in lTables:
            self.__tableList.put(dConfig)

        self.__data = Queue()
        # Per-instance pool; the original class-level list was shared by every
        # instance.  It is published only after all workers have started.
        self.__processPool = []

        if len(lTables) < self.MaxProcess:
            self.MaxProcess = len(lTables)

        lWorkers = []
        for _ in range(self.MaxProcess):
            worker = Process(target=self.getTableData)
            # Daemon workers are stopped with the parent instead of being
            # joined at exit, so abandoning the iteration cannot hang on a
            # full queue.
            worker.daemon = True
            # start(), not run(): run() executes the target synchronously in
            # the calling process and returns None.
            worker.start()
            lWorkers.append(worker)
        self.__processPool = lWorkers

    def NextDocument(self, err=None):
        '''
        Advance to the next document.

        Returns True when a document is available (through attribute access,
        str() or iteration), and False once every worker has finished and the
        queue is drained.  ``err`` is unused.

        Raises RuntimeError at the end if a worker process exited with an
        error, since the index would then be incomplete.
        '''
        if self.__data is None:
            return False    # already exhausted

        while True:
            try:
                dItem = self.__data.get(True, 0.1)
                break
            except self._QueueEmpty:
                if self.__hasLiveWorker():
                    continue
                # Every worker has exited, so everything it produced is
                # already in the queue; one non-blocking read decides the end.
                try:
                    dItem = self.__data.get_nowait()
                    break
                except self._QueueEmpty:
                    self.__finish()
                    return False

        self.__curItem = dItem
        self.__uniqNumber += 1

        if self.Debug:
            print(self)
        return True

    def __hasLiveWorker(self):
        '''Return True while any loader worker is still running.'''
        for worker in self.__processPool:
            # threading.Thread only provides isAlive() before Python 2.6.
            probe = getattr(worker, "is_alive", None) or worker.isAlive
            if probe():
                return True
        return False

    def __finish(self):
        '''Join the workers, drop the exhausted queues, report failed workers.'''
        iFailed = 0
        for worker in self.__processPool:
            worker.join()
            # multiprocessing.Process reports a non-zero exitcode when its
            # target raised; threads have no exitcode attribute.
            if getattr(worker, "exitcode", 0):
                iFailed += 1

        self.__processPool = []
        self.__tableList = None
        self.__data = None

        if iFailed:
            raise RuntimeError(
                "%d CoreSeek loader worker(s) failed; the index would be incomplete" % iFailed)

    def getTableData(self):
        '''
        Worker loop: load tables from the shared table queue until none remain.

        Returns True if at least one table was loaded, False otherwise.
        '''
        bLoaded = False
        while True:
            try:
                dConfig = self.__tableList.get(True, self.TableQueueTimeout)
            except self._QueueEmpty:
                return bLoaded
            self.__loadTable(dConfig)
            bLoaded = True

    def __loadTable(self, dConfig):
        '''Load one table, page by page when SQLLimit is positive.'''
        sSQL = self.getSQL(dConfig["tableName"])
        iPageSize = self.SQLLimit

        if iPageSize <= 0:
            self.doLoadData(dConfig, sSQL)
            return

        iOffset = 0
        while True:
            # NOTE(review): LIMIT paging without ORDER BY relies on MySQL
            # returning rows in the same order for every page.
            sLimitSQL = "%s LIMIT %d, %d" % (sSQL, iOffset, iPageSize)
            iRecordCount = self.doLoadData(dConfig, sLimitSQL)
            if iRecordCount < iPageSize:
                break
            iOffset += iPageSize

    def doLoadData(self, dConfig, sSQL):
        '''
        Run sSQL on the table described by dConfig and queue every row.

        Returns the number of rows fetched, or 0 when the query fails (the
        error is printed in Debug mode).  The connection is always closed.
        '''
        import sys

        # The last two characters of the table name are parsed as a hex
        # shard number (raises ValueError if they are not hex digits).
        tableNumber = int(dConfig["tableName"][-2:], 16)

        mysqlHandle = MySQLdb.connect(host=dConfig["host"], user=dConfig["user"],
                                      passwd=dConfig["passwd"], db=dConfig["dbName"],
                                      charset="UTF8")
        try:
            mysqlCursor = mysqlHandle.cursor()

            if self.Debug:
                print("SQL: " + sSQL)

            try:
                mysqlCursor.execute(sSQL)
                lResultList = mysqlCursor.fetchall()
            except Exception:
                # Best effort: a failing table/page contributes no rows
                # instead of aborting the whole run.
                if self.Debug:
                    print("SQL failed: %s" % (sys.exc_info()[1],))
                return 0
        finally:
            mysqlHandle.close()

        sUniqKey = None if self.UniqId is None else self.UniqId.lower()
        for lRow in lResultList:
            dRow = self.getNamedDict(lRow)
            if sUniqKey is not None:
                dRow["tablerecord"] = [tableNumber, dRow[sUniqKey]]
                # Unique across shards: <row id> * 1000 + <table number>.
                dRow["id"] = int(dRow[sUniqKey]) * 1000 + tableNumber

            self.__data.put(self.buildIndex(dRow))

        return len(lResultList)

    def getSQL(self, sTableName):
        '''Build the SELECT statement (without LIMIT) for one table.'''
        SQL = "SELECT %s FROM %s WHERE %s " % (", ".join(self.Field), sTableName, self.WhereCause)

        if self.SQLGroupBy != "":
            SQL += "GROUP BY %s " % self.SQLGroupBy
        return SQL

    def getNamedDict(self, lRow):
        '''
        Map one result row onto a dict keyed by the lower-cased Field names.

        "expr AS alias" fields are keyed by alias.  Text (unicode) values are
        encoded to UTF-8 byte strings, datetime values become local-time Unix
        timestamps, and everything else is passed through unchanged.
        '''
        textType = type(u"")
        result = {}
        for sFieldName, value in zip(self.Field, lRow):
            sDictKey = sFieldName.lower()
            if sDictKey.find(" as ") > 0:
                # The alias is whatever follows the last " as ".
                sDictKey = sDictKey.rpartition(" as ")[2].strip()

            if isinstance(value, textType):
                value = value.encode("utf-8")
            elif isinstance(value, datetime.datetime):
                value = int(time.mktime(value.timetuple()))
            result[sDictKey] = value

        return result

    def buildIndex(self, dRow):
        '''Hook for subclasses: post-process one row dict and return it.'''
        if self.Debug:
            print(dRow)

        return dRow

class CoreSeekUtility:
    '''Static helpers for building CoreSeek index text.'''

    @staticmethod
    def literallyCut(string, sChar=" ", CharSet="utf-8"):
        '''
        Insert sChar between consecutive characters of a string.

        ``string`` may be an encoded byte string (decoded with CharSet) or
        text; the joined result is encoded with CharSet.  Inputs of at most
        one character are returned unchanged.
        '''
        # type(u"") is unicode on Python 2 and str on Python 3; text must not
        # be decoded again.
        if isinstance(string, type(u"")):
            uString = string
        else:
            uString = string.decode(CharSet)

        if len(uString) <= 1:
            return string

        return sChar.join(uString).encode(CharSet)

    @staticmethod
    def dictIndex(dRecord):
        '''
        Render a dict as "KEY=value " pairs.

        Keys are upper-cased in the output only (the lookup uses the original
        key).  Values go through str(), and each pair is followed by a space.
        '''
        return "".join([key.upper() + "=" + str(dRecord[key]) + " " for key in dRecord])

数据源示例:

from CoreSeek import CoreSeek, CoreSeekUtility
import time

class Topic(CoreSeek):
    '''
    Example data source for the sharded "Topic" database.

    Each document gets a full-text 'index' built from title and description,
    plus an 'index_uid' copy of the uid column.
    '''

    # (attribute name, options) pairs describing the document schema.
    Scheme = [
        ('id', {'docid': True}),
        ('index', {'type': 'text'}),
        ('index_uid', {'type': 'text'}),

        ('topicid', {'type': 'integer'}),
        ('type', {'type': 'integer'}),
        ('privacy', {'type': 'integer'}),
        ('body', {'type': 'string'}),
        ('title', {'type': 'string'}),
        ('uid', {'type': 'string'}),
        ('description', {'type': 'string'}),
    ]

    FieldOrder = [('index', 'index_uid')]
    Field = ["topicId", "uid", "title", 'body', 'privacy', 'description', 'type']

    DBName = "Topic"

    def buildIndex(self, dRow):
        '''Add the derived full-text fields to the row and return it.'''
        authorUid = dRow['uid']
        dRow['index_uid'] = authorUid
        dRow['index'] = "%s %s" % (dRow['title'], dRow['description'])
        return dRow

class TopicDelta(Topic):
    '''Delta source: only topics whose createTime is within the last hour.'''
    # Evaluated once, when the class body runs at import time.
    # NOTE(review): assumes createTime holds Unix timestamps — confirm.
    WhereCause = "createTime > " + str(time.time() - 3600) + " "

if __name__ == "__main__":
    # Print every document produced by the Topic source.
    source = Topic({})
    source.Connected()
    for document in source:
        print(document)
推荐阅读:
深入读了读python的官方文
正值毕业季,这些天一直忙于面试
尽管现在有了wheel这类更为

“CoreSeek Python数据源的基类”的2个回复

    1. 
      #!/usr/bin/env python
      
      import os, MySQLdb
      
      try:
          import sqlite3
      except:
          from pysqlite2 import dbapi2 as sqlite3
      
      class DBConfig:
          '''
          Registry of MySQL tables, cached in a local SQLite file.

          Each TableList row holds host, user, password, dbName and tableName.
          If ConfigFile does not exist it is first generated by CreateConfig.
          '''
          # NOTE(review): credentials are hard-coded; consider loading them
          # from the environment or a protected config file.
          DBUser="root"
          DBPasswd="1234"
          DBList=["172.18.194.98"]
          Charset="UTF8"

          ConfigFile= "/tableList.sqlite"

          def getDBUser(self):
              return self.DBUser

          def getPasswd(self):
              return self.DBPasswd

          def getDBList(self):
              return self.DBList

          def getConfigFile(self):
              return self.ConfigFile

          def getTableConfig(self, TableName):
              '''Return connection dicts for every table named TableName.'''
              return self._queryTableList("tableName", TableName)

          def getDBTableConfig(self, DBName):
              '''Return connection dicts for every table in the database DBName.'''
              return self._queryTableList("dbName", DBName)

          def getsqliteCursor(self):
              '''
              Return a cursor on a new connection to ConfigFile.

              The connection is not closed here; the caller is responsible for it.
              '''
              return self._openConfig().cursor()

          def _openConfig(self):
              '''Connect to ConfigFile, generating it with CreateConfig if missing.'''
              if not os.path.exists(self.ConfigFile):
                  CreateConfig().doGetConfig()
              return sqlite3.connect(self.ConfigFile)

          def _queryTableList(self, sColumn, sValue):
              '''
              Return TableList rows whose sColumn equals sValue.

              Each row becomes a dict with host/user/passwd/dbName/tableName/
              charset keys.  sColumn is interpolated into the SQL and must be a
              trusted column name; sValue is bound as a query parameter.
              '''
              sqliteHandle = self._openConfig()
              try:
                  sqliteCursor = sqliteHandle.cursor()
                  sqliteCursor.execute(
                      "SELECT * FROM TableList WHERE %s = ?" % sColumn, (sValue,))
                  lReturn = [{
                      "host": lTableInfo[0],
                      "user": lTableInfo[1],
                      "passwd": lTableInfo[2],
                      "dbName": lTableInfo[3],
                      "tableName": lTableInfo[4],
                      "charset": self.Charset,
                  } for lTableInfo in sqliteCursor]
                  sqliteCursor.close()
              finally:
                  sqliteHandle.close()
              return lReturn
      
      class CreateConfig:
          '''
          Regenerate the DBConfig SQLite file from the live MySQL servers.

          Creating an instance deletes any existing ConfigFile and creates an
          empty TableList table; doGetConfig() then records every table found
          on each host in DBConfig's DBList.
          '''

          def __init__(self):
              dbConfig = DBConfig()
              self.ConfigFile = dbConfig.getConfigFile()
              self.DBUser = dbConfig.getDBUser()
              self.DBPasswd = dbConfig.getPasswd()
              self.DBHostList = dbConfig.getDBList()
              self.Charset = dbConfig.Charset

              if os.path.exists(self.ConfigFile):
                  os.remove(self.ConfigFile)

              self.sqliteHandle = sqlite3.connect(self.ConfigFile)
              self.createTable()

          def __del__(self):
              # __init__ can fail before the connection exists (e.g. os.remove
              # raising); only flush and close a handle that was opened.
              sqliteHandle = getattr(self, "sqliteHandle", None)
              if sqliteHandle is not None:
                  sqliteHandle.commit()
                  sqliteHandle.close()

          def doGetConfig(self):
              '''Record the tables of every configured host and commit them.'''
              for sDBHost in self.DBHostList:
                  self.getMysqlData(sDBHost)
              # Commit now instead of relying on __del__ running promptly.
              self.sqliteHandle.commit()

          def getMysqlData(self, sHost):
              '''Store every table on sHost outside the mysql/information_schema schemas.'''
              mysqlHandle=MySQLdb.connect(host=sHost,user=self.DBUser,
                                        passwd=self.DBPasswd,db="mysql", charset=self.Charset)

              sSQL='''SELECT `TABLE_SCHEMA`, `TABLE_NAME`
        FROM `information_schema`.`TABLES`
       WHERE `TABLE_SCHEMA` NOT IN ("mysql", "information_schema")'''

              try:
                  mysqlCursor = mysqlHandle.cursor()
                  mysqlCursor.execute(sSQL)
                  lResultList = mysqlCursor.fetchall()
              finally:
                  # Close the connection even when the query fails.
                  mysqlHandle.close()

              for sSchema, sTable in lResultList:
                  self.setConfig(sHost, self.DBUser, self.DBPasswd, sSchema, sTable)

          def createTable(self):
              '''Create the TableList table and commit it.'''
              sSQL='''CREATE TABLE TableList (
                      host text,
                      user text,
                      password text,
                      dbName text,
                      tableName text
                      )'''
              self.sqliteHandle.execute(sSQL)
              self.sqliteHandle.commit()

          def setConfig(self, sHost, sUser, sPasswd, sDBName, sTableName):
              '''Insert one table's connection details (committed later).'''
              self.sqliteHandle.execute("INSERT INTO TableList VALUES (?,?,?,?,?)",
                                        (sHost, sUser, sPasswd, sDBName, sTableName))
      
      
      if __name__ == '__main__':
          # Running this module directly does nothing; the SQLite config file
          # is generated by DBConfig the first time it is queried and missing.
          pass
      
      

发表评论

电子邮件地址不会被公开。 必填项已用*标注

请补全下列算式: *

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据