Pythonでtwitterクライアント作ってみる - その4

大分空いてしまいましたが前回の続き

今回はpythonでthreadの制御とcursesの制御をどうやって同時に実装するのかが分からなかったので

とりあえずcursesのみで実装してみました


#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
/** twitter cuiクライアントっぽいの
*
* @version 0.0.1-alpha
* @author mochi
*
* Press Key Options
* (l) load
* (u) update
* (i) info
* (v) version
* (q) quit
*
*
* README!!
*
* First, please set your ID, name and password:
* the userID, userName and password attributes of the ApiContext class.
*
*/
"""

import sys, time, os, re
import curses
#import commands
import StringIO
import pycurl
import threading
from xml.parsers import pyexpat
#import sqlite3
import locale

locale.setlocale(locale.LC_ALL, "")

class Product(object):  # Interface
    """Interface for objects that carry attributes and can be executed.

    Subclasses are expected to override the methods. The base
    implementations of the attribute accessors do nothing, and
    ``execute`` always raises ``NotImplementedError``.
    """

    def setAttribute(self, name, value):
        """Store *value* under *name*. The base implementation is a no-op."""
        pass

    def getAttribute(self, name):
        """Return the value stored under *name*. The base returns None."""
        pass

    def execute(self, *args, **kwargs):
        """Run the product's action.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError

class Parser(object):  # Interface
    """Interface for parsers; both methods are no-ops meant to be overridden."""

    def __init__(self):
        """Create the parser. The base implementation holds no state."""
        pass

    def parse(self, str=""):
        """Parse the document *str*. The base implementation does nothing.

        The parameter is named ``str`` to match ``XMLParser.parse``.
        """
        pass

class XMLParser(Parser, threading.Thread):
    """Expat-based parser that draws selected elements on a curses window.

    The character data of ``text`` and ``screen_name`` elements is written
    to the screen; when a ``text`` element ends, `` -> [ `` is appended, and
    when a ``screen_name`` element ends, `` ]`` plus a newline is appended.
    All other elements are ignored.
    """

    p = None         # expat parser; a falsy value means "create one on next parse()"
    str = ""         # last non-empty document handed to parse()
    stdscr = None    # optional curses window; when None the module-level stdscr is used
    dispflg = False  # True while inside an element whose text is displayed
    _at_text_start = False  # True until the current element has produced visible text

    def __init__(self):
        threading.Thread.__init__(self)
        self.p = pyexpat.ParserCreate()

    def createParser(self):
        """Replace the current expat parser with a fresh one."""
        self.p = pyexpat.ParserCreate()

    def _screen(self):
        """Return the window to draw on.

        Prefers the instance/class attribute ``stdscr``; otherwise falls back
        to the module-level ``stdscr`` (published by ``Twitter.__init__``).
        """
        if self.stdscr is not None:
            return self.stdscr
        return stdscr

    def startElement(self, name, nodes):
        """Expat start-tag handler: decide whether this element is displayed."""
        self.dispflg = name in ('text', 'screen_name')
        self._at_text_start = True

    def endElement(self, name):
        """Expat end-tag handler: draw the separator that follows the element."""
        # Stop displaying here so that the whitespace between this element and
        # the next start tag is never drawn.
        self.dispflg = False
        if name == 'text':
            self._screen().addstr(" -> [ ", curses.color_pair(1))
        elif name == 'screen_name':
            screen = self._screen()
            screen.addstr(" ]\n", curses.color_pair(1))
            screen.refresh()

    def charData(self, value):
        """Expat character-data handler: draw text of displayed elements.

        Expat may deliver one element's text in several chunks (for example
        around entities), so leading blanks are stripped only until the
        element has produced visible text; stripping every chunk would
        swallow the space after an entity.
        """
        if not self.dispflg:
            return
        text = value.replace("\n", "")
        if self._at_text_start:
            text = text.lstrip(" ")
        if not text:
            return
        self._at_text_start = False
        if not isinstance(text, str):
            # Python 2: expat hands over unicode, curses wants a byte string.
            text = text.encode('UTF-8', 'replace')
        screen = self._screen()
        screen.addstr(text, curses.color_pair(2))
        screen.refresh()

    def parse(self, str=""):
        """Parse an XML document and draw it.

        Args:
            str: the document. When empty, the document remembered from the
                previous call (``self.str``) is parsed again. The name
                shadows the builtin but is kept for keyword callers.

        Raises:
            Whatever ``Parse`` raises for malformed input, and whatever the
            curses drawing calls raise.
        """
        if not self.p:
            self.createParser()
        if str != "":
            self.str = str

        self.dispflg = False
        parser = self.p
        parser.StartElementHandler = self.startElement
        parser.EndElementHandler = self.endElement
        parser.CharacterDataHandler = self.charData
        try:
            parser.Parse(self.str)
        finally:
            # An expat parser handles a single document only; drop it (also
            # on failure) so the next call builds a fresh one.
            self.p = None

    def run(self):
        """Thread entry point; intentionally does nothing yet."""
        pass

class ApiContext(object):  # Context
    """Account settings, endpoint URLs and the response parser for the API."""

    # The numeric user ID was redacted in the published source (the original
    # line was not valid Python). Set your own ID here.
    userID = None
    userName = "hagaeru3sei"
    password = "***********"
    responseType = "xml"  # one of: xml, json, atom, rss
    url = "http://twitter.com"
    response = ""
    parser = XMLParser()

    def setResponseType(self, type):
        """Select the response format used as the URL suffix."""
        self.responseType = type

    def getResponseType(self):
        """Return the response format currently selected."""
        return self.responseType

    def getUserURL(self):
        """Return the users/show URL for ``userName``."""
        return self.url + "/users/show." + self.responseType + "?screen_name=" + self.userName

    def getTimeLineURL(self):
        """Return the friends-timeline URL."""
        return self.url + "/statuses/friends_timeline." + self.responseType

    def getUpdateURL(self):
        """Return the status-update URL."""
        return self.url + "/statuses/update." + self.responseType

    # factory method
    def Curl(*args):
        """Return a new ``pycurl.Curl`` handle; all arguments are ignored."""
        return pycurl.Curl()

    # set httpResponse
    def setResponse(self, res):
        """Remember the HTTP response body for a later ``parse()``."""
        self.response = res

    # parse the stored response
    def parse(self):
        """Feed the stored response to the parser."""
        self.parser.parse(self.response)

class Twitter(Product, threading.Thread):  # Twitter
    """Curses front end: fetches the timeline, posts updates, handles keys.

    Creating an instance puts the terminal into curses mode; the terminal
    is restored when the instance is finalised (``__del__``).
    """

    context = None
    type, value = "", ""
    prevStr = ""
    version = "0.0.1-alpha"
    auther = "Nobuaki Mochizuki"
    lisence = "copy(c) all rights reserved."

    def __init__(self):
        # XMLParser draws through the module-level ``stdscr``, so the window
        # is published there as well as on the instance.
        global stdscr
        threading.Thread.__init__(self)
        self.context = ApiContext()
        # Initialise terminal control.
        self.stdscr = curses.initscr()
        stdscr = self.stdscr
        curses.noecho()
        curses.cbreak()
        curses.curs_set(0)

        curses.start_color()
        curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLACK)
        curses.init_pair(2, curses.COLOR_CYAN, curses.COLOR_BLACK)

    def setOpt(self, type, value):
        """Select the request kind (``type``) and its payload (``value``)."""
        self.type = type
        self.value = value

    def execute(self, *args, **kwargs):
        """Perform the request selected with ``setOpt`` and show the result.

        For ``type == 'update'`` the status in ``self.value`` is posted and
        nothing is drawn. For every other type the response is parsed and
        drawn, unless it equals the previously drawn response. Positional
        and keyword arguments are accepted but unused.

        Raises:
            pycurl.error: when the transfer fails.
        """
        try:
            from urllib import quote  # Python 2
        except ImportError:
            from urllib.parse import quote  # Python 3

        out = StringIO.StringIO()
        self.buffer = out
        curl = self.context.Curl()
        try:
            curl.setopt(pycurl.URL, self.getURL())
            curl.setopt(pycurl.USERPWD, self.context.userName + ':' + self.context.password)
            # Always capture the body: without a write callback libcurl
            # prints it to stdout, straight over the curses screen.
            curl.setopt(pycurl.WRITEFUNCTION, out.write)
            curl.setopt(pycurl.CONNECTTIMEOUT, 30)
            curl.setopt(pycurl.TIMEOUT, 300)
            curl.setopt(pycurl.NOSIGNAL, 1)

            if self.type == 'update':
                curl.setopt(pycurl.POST, 1)
                # The body is form-encoded, so the status must be escaped.
                curl.setopt(pycurl.POSTFIELDS, "status=" + quote(self.value))
            elif self.type == "follow":
                pass
            elif self.type == "search":
                pass
            else:
                curl.setopt(pycurl.FOLLOWLOCATION, 1)
                curl.setopt(pycurl.MAXREDIRS, 5)

            curl.perform()
            body = out.getvalue().rstrip()
        finally:
            # Release the handle and the buffer on every path.
            curl.close()
            out.close()

        # The reply to a post is not a timeline; there is nothing to draw.
        if self.type == 'update':
            return

        # Do not redraw when nothing changed.
        if body == self.prevStr:
            return
        self.stdscr.clear()

        self.prevStr = body
        self.context.setResponse(body)
        self.context.parse()

    def load(self):
        """Enter the interactive key loop on the calling thread.

        An earlier draft started this object as a thread and showed a
        spinner while it ran; for now the loop runs synchronously.
        """
        self.run()

    def run(self):
        """Load the timeline, then dispatch key presses until ``q``."""
        self.reload()
        while True:
            cmd = self.stdscr.getch()
            if cmd == ord('q'):  # quit
                break
            elif cmd == ord('u'):  # update
                self.update()
            elif cmd == ord("l"):  # load
                self.reload()
            elif cmd == ord('s'):  # search
                self.search(1)
            elif cmd == ord('f'):  # follow
                self.follow(1)
            elif cmd == ord('v'):  # version
                self.version_info()
            elif cmd == ord('i'):  # info
                self.info()
            else:  # any other key
                self.reload()

    # Update
    def update(self):
        """Read a status message from the keyboard and post it on Enter.

        NOTE(review): a space or ``q`` aborts the input, so a message can
        contain neither; this is kept as in the original key handling.
        """
        scr = self.stdscr
        scr.erase()
        scr.addstr("update: ", curses.color_pair(1))
        scr.refresh()
        buf = ""
        while True:
            cmd = scr.getch()
            if cmd in (ord("\b"), 127):
                # Terminals send either BS (8) or DEL (127) for backspace.
                if buf:
                    buf = buf[:-1]
                    y, x = scr.getyx()
                    if x > 0:
                        scr.move(y, x - 1)
                        scr.clrtoeol()
                        scr.refresh()
            elif cmd == ord("\n"):
                self.setOpt("update", buf)
                try:
                    if self.value != "":
                        self.execute()
                finally:
                    # Never leave the 'update' request type selected.
                    self.setOpt("", "")
                scr.addstr("\nupdate message!!\nPress (l) load\t(q) quit\t(u) update", curses.color_pair(1))
                scr.refresh()
                time.sleep(1)
                break
            elif cmd in (ord(" "), ord("q")):
                break
            elif 0 <= cmd < 256:
                # chr() only accepts byte values; other key codes are ignored.
                buf += chr(cmd)
                scr.addstr(chr(cmd), curses.color_pair(1))
                scr.refresh()

    def reload(self):
        """Fetch the timeline and redraw it."""
        self.setOpt("", "")
        # The screen is erased below (and may show something else anyway),
        # so the cached response must not suppress the redraw.
        self.prevStr = ""
        self.stdscr.erase()
        self.execute()

    def follow(self, userId):
        """Follow *userId*. Not implemented yet."""
        pass

    def sendDirectMessage(self, userId):
        """Send a direct message to *userId*. Not implemented yet."""
        pass

    def search(self, keywords):
        """Search for *keywords*. Not implemented yet."""
        pass

    def setMessage(self, message):
        """Store *message* on the instance and return 0."""
        self.message = message
        return 0

    def getURL(self):
        """Return the endpoint URL for the current request type.

        ``follow`` and ``search`` have no endpoint yet and yield ``""``.
        """
        if self.type == "update":
            return self.context.getUpdateURL()
        elif self.type == "follow":
            return ""
        elif self.type == "search":
            return ""
        else:
            return self.context.getTimeLineURL()

    # version info
    def version_info(self):
        """Show the client version."""
        self.stdscr.erase()
        self.stdscr.addstr("mochitter version : " + self.version + "\n", curses.color_pair(1))
        self.stdscr.refresh()

    # info
    def info(self):
        """Show author and licence information."""
        self.stdscr.erase()
        self.stdscr.addstr("auther : " + self.auther + "\n", curses.color_pair(1))
        self.stdscr.addstr("lisence : " + self.lisence + "\n", curses.color_pair(1))
        self.stdscr.refresh()

    # destruct
    def __del__(self):
        """Restore the terminal, if this instance ever initialised it."""
        scr = getattr(self, "stdscr", None)
        if scr is None:
            return
        try:
            curses.nocbreak()
            scr.keypad(0)
            curses.echo()
            curses.endwin()
        except curses.error:
            # Best effort: nothing useful can be done inside a finaliser.
            pass

# main
def main(*args, **kwargs):
    """Start the client.

    Args:
        *args: ``args[0]`` is an argv-style list whose items 1 and 2, when
            present and non-empty, become the initial option type and
            value. ``sys.argv`` is used when no argument is given.
        **kwargs: accepted but unused.

    Returns:
        0 once the key loop has ended.
    """
    # Pad a copy so indexes 1 and 2 always exist; the caller's list (usually
    # sys.argv) is left untouched.
    argv = list(args[0]) if args else list(sys.argv)
    argv += [None, None]
    opt_type = argv[1] or ""
    opt_value = argv[2] or ""

    t = Twitter()
    t.setOpt(opt_type, opt_value)
    t.load()
    return 0

if __name__ == "__main__":
    # Top-level boundary: report any failure and signal it through the exit
    # status instead of always exiting with 0.
    try:
        status = main(sys.argv)
    except Exception as e:
        sys.stderr.write(str(e) + "\n")
        status = 1

    sys.exit(status)

topコマンドみたいにするのはスレッドを実装してやらないと実現できないんじゃないかなと

多分だけどイメージ的には

A. 60秒に一回最新のデータを取得&画面表示

B. キーボードからの入力を受けて押下されたキーに対応した手続きを呼び出し→入力を受付&入力チェック→入力に応じた処理→結果を画面に出力

のA.とB.を同時に走らせないと実現できないと思う

※A.の「60秒に一回」というのは、これより短い間隔で取得するとtwitterが応答してくれなくなるためです

難しいなぁ・・・

また時間あるときに続きやります