学生生活の利便性が低い

不足している情報をクラスメートに絶えず尋ねることを恥ずかしく思う簡単な話で、私たちの生活を少し楽にすることにしました。







画像







私の同僚の多くは、重要な情報が頻繁に点滅する一般的なチャットルームで、VKontakteデータベースにメッセージを常にロードするアクティブな対話者が約30人いる状況に精通していると思います。 このような状況では、誰もがこの重要な情報を見る可能性は低いです。 それは私に起こります。 1年前、この誤解を修正することが決定されました。







ボットに関する次の記事についてinするつもりはない人は、私は猫の下で尋ねます。







私は一流の学生なので、例はこのトピックに関連しています。

そこで、課題があります。校長から生徒への情報の転送を校長と生徒の両方にとって便利にすることです。 Vkontakteの比較的新しい機能(つまり、コミュニティの個人的なメッセージ)のおかげで、すぐに決定に気付きました。 グループに座っているボットは、長老(ストリームに多くのグループがある場合は長老)からメッセージを受信し、関心のある関係者(学生)に送信する必要があります。







タスクが設定されました。続行します。







必要なもの:







  1. vk apiを使用するためのvk_apiライブラリ
  2. データベースを操作するピーウィーオーム
  3. およびpython組み込みモジュール


また、読む前に、「Observer」( HabrWiki )および「Facade」( HabrWiki )のパターンを更新することを提案します







パート1.「はじめまして、同志ボット」







まず、コミュニティとして自分自身を理解するようボットに教える必要があります。 Groupというクラスを作成します。 引数として、セッションオブジェクトとデータベース代表オブジェクト(プロキシ)を受け入れます。







class Group(BaseCommunicateVK): def __init__(self, vksession, storage): super().__init__(vksession) self.storage = storage
      
      





BaseCommunicateVK? 何がありますか?

この機能を別のクラスに配置する決定は、将来、おそらくあなたの誰かがボットに他のVkontakte機能を追加することを決定するという事実によって説明されます。

もちろん、コミュニティの抽象化をアンロードするために。







 class BaseCommunicateVK: longpoll = None def __init__(self, vksession): self.session = vksession self.api = vksession.get_api() if BaseCommunicateVK.longpoll is None: BaseCommunicateVK.longpoll = VkLongPoll(self.session) def get_api(self): return self.api def get_longpoll(self): return self.longpoll def method(self, func, args): return self.api.method(func, args) @staticmethod def create_session(token=None, login=None, password=None, api_v='5.85'): try: if token: session = vk_api.VkApi(token=token, api_version=api_v) elif login and password: session = vk_api.VkApi(login, password, api_version=api_v) else: raise vk_api.AuthError("Define login and password or token.") return session except vk_api.ApiError as error: logging.info(error) def get_last_message(self, user_id): return self.api.messages.getHistory( peer_id=user_id, count=1)["items"][0] @staticmethod def get_attachments(last_message): if not last_message or "attachments" not in last_message: return "" attachments = last_message["attachments"] attach_strings = [] for attach in attachments: attach_type = attach["type"] attach_info = attach[attach_type] attach_id = attach_info["id"] attach_owner_id = attach_info["owner_id"] if "access_key" in attach_info: access_key = attach_info["access_key"] attach_string = "{}{}_{}_{}".format(attach_type, attach_owner_id, attach_id, access_key) else: attach_string = "{}{}_{}".format(attach_type, attach_owner_id, attach_id) attach_strings.append(attach_string) return ",".join(attach_strings) @staticmethod def get_forwards(attachments, last_message): if not attachments or "fwd_count" not in attachments: return "" if len(last_message["fwd_messages"]) == int(attachments["fwd_count"]): return last_message["id"] def send(self, user_id, message, attachments=None, **kwargs): send_to = int(user_id) if "last_message" in kwargs: last_message = kwargs["last_message"] else: last_message = None p_attachments = self.get_attachments(last_message) p_forward = self.get_forwards(attachments, last_message) if message or p_attachments or p_forward: self.api.messages.send( user_id=send_to, message=message, attachment=p_attachments, forward_messages=p_forward) if destroy: accept_msg_id = self.api.messages \ .getHistory(peer_id=user_id, count=1) \ .get('items')[0].get('id') self.delete(accept_msg_id, destroy_type=destroy_type) def delete(self, msg_id, destroy_type=1): self.api.messages.delete(message_id=msg_id, delete_for_all=destroy_type)
      
      





コミュニティメンバーを更新するメソッドを作成します。 それらをすぐに管理者と参加者に分割し、データベースに保存します。









 def update_members(self): fields = 'domain, sex' admins = self.api.groups.getMembers(group_id=self.group_id, fields=fields, filter='managers') self.save_members(self._configure_users(admins)) members = self.api.groups.getMembers(group_id=self.group_id, fields=fields) self.save_members(self._configure_users(members)) return self def save_members(self, members): self.storage.update(members) @staticmethod def _configure_users(items, exclude=None): if exclude is None: exclude = [] users = [] for user in items.get('items'): if user.get('id') not in exclude: member = User() member.configure(**user) users.append(member) return users
      
      





このクラスは受信者にメッセージを送信できるようにする必要があるため、スタジオの次のメソッドです。 パラメーター:メーリングリスト、メッセージテキスト、およびアプリケーション。 ボットが他の参加者からメッセージを受信できるように、この全体が別のスレッドで始まります。

メッセージは同期モードで受信されるため、アクティブなクライアントの数が増えると、応答速度は明らかに低下します。







  def broadcast(self, uids, message, attachments=None, **kwargs): report = BroadcastReport() def send_all(): users_ids = uids if not isinstance(users_ids, list): users_ids = list(users_ids) report.should_be_sent = len(users_ids) for user_id in users_ids: try: self.send(user_id, message, attachments, **kwargs) if message or attachments: report.sent += 1 except vk_api.VkApiError as error: report.errors.append('vk.com/id{}: {}'.format(user_id, error)) except ValueError: continue for uid in self.get_member_ids(admins=True, moders=True): self.send(uid, str(report)) broadcast_thread = Thread(target=send_all) broadcast_thread.start() broadcast_thread.join()
      
      





BroadcastReport-レポートクラス
 class BroadcastReport: def __init__(self): self.should_be_sent = 0 self.sent = 0 self.errors = [] def __str__(self): res = "#  #" res += "\n: {}  ".format(self.should_be_sent) res += "\n: {} ".format(self.sent) if self.errors: res += "\n:" for i in self.errors: res += "\n- {}".format(i) return res
      
      





これで、グループの抽象化は終わったようです。 私たちはコミュニティのすべてのメンバーと会いました。今、私たちはそれらを理解する方法を学ぶ必要があります。







パート2.「Psh ...ようこそ..」







ボットにコミュニティのメンバーからのすべてのメッセージを聞かせてください。

これを行うには、これを行うクラスChatHandlerを作成します

パラメーター内:









 class ChatHandler(Handler): def __init__(self, group_manager, command_observer): super().__init__() self.longpoll = group_manager.get_longpoll() self.group = group_manager self.api = group_manager.get_api() self.command_observer = command_observer
      
      





さらに、実際には、ユーザーからのメッセージを聞き、コマンドを認識します。







 def listen(self): try: for event in self.longpoll.listen(): if event.user_id and event.type == VkEventType.MESSAGE_NEW and event.to_me: self.group.api.messages.markAsRead(peer_id=event.user_id) self.handle(event.user_id, event.text, event.attachments, message_id=event.message_id) except ConnectionError: logging.error("I HAVE BEEN DOWNED AT {}".format(datetime.datetime.today())) self.longpoll.update_longpoll_server() def handle(self, user_id, message, attachments, **kwargs): member = self.group.get_member(user_id) self.group.update_members() self.command_observer.execute(member, message, attachments, self.group, **kwargs) def run(self): self.listen()
      
      





パート3.「私のものについて何を書きましたか..?」







コマンド認識は、Observerパターンを通じて実装された別のサブシステムによって処理されます。

CommandObserverに注意:







 class CommandObserver(AbstractObserver): def execute(self, member, message, attachments, group, **kwargs): for command in self.commands: for trigger in command.triggers: body = command.get_body(trigger, message) if body is not None: group.api.messages.setActivity(user_id=member.id, type="typing") if command.system: kwargs.update({"trigger": trigger, "commands": self.commands}) else: kwargs.update({"trigger": trigger}) return command.proceed(member, body, attachments, group, **kwargs)
      
      





AbstractObserver

繰り返しますが、将来の拡張に備えてレンダリングが行われます。







 class AbstractObserver(metaclass=ABCMeta): def __init__(self): self.commands = [] def add(self, *args): for arg in args: self.commands.append(arg) @abstractmethod def execute(self, *args, **kwargs): pass
      
      





しかし、このオブザーバーは何を認識しますか?

それで、私たちは最も興味深い部分-チームに着きました。

各チームは独立したクラスであり、基本クラスCommandの子孫です。

コマンドに必要なことは、ユーザーのメッセージの先頭にキーワードが見つかった場合、proceed()メソッドを実行することだけです。 コマンドキーワードは、コマンドクラスのトリガー変数で定義されます(行または行のリスト)







 class Command(metaclass=ABCMeta): def __init__(self): self.triggers = [] self.description = "Empty description." self.system = False self.privilege = False self.activate_times = [] self.activate_days = set() self.autostart_func = self.proceed def proceed(self, member, message, attachments, group, **kwargs): raise NotImplementedError() @staticmethod def get_body(kw, message): if not isinstance(kw, list): kw = [kw, ] for i in kw: reg = '^ *(\\{}) *'.format(i) if re.search(reg, message): return re.sub(reg, '', message).strip(' ')
      
      





continue()メソッドのシグネチャからわかるように、各コマンドはグループメンバーのインスタンスへのリンク、そのメッセージ(キーワードなし)、アプリケーション、およびグループインスタンスへのリンクを受け取ります。 つまり、グループメンバーとのやり取りはすべてチームに委ねられます。 これが最も正しい解決策だと思います。このようにして、対話性を高めるためにシェル(シェル)を作成できるからです。

(実際、これを行うには、処理が同期的であるため非同期を追加するか、受信した各メッセージを新しいスレッドで処理する必要がありますが、これはまったく有益ではありません)







コマンド実装の例:







BroadcastCommand
 class BroadcastCommand(Command): def __init__(self): super().__init__() self.triggers = ['.mb'] self.privilege = True self.description = "    ." def proceed(self, member, message, attachments, group, **kwargs): if member.id not in group.get_member_ids(admins=True, editors=True): group.send(member.id, "You cannot do this ^_^") return True last_message = group.get_last_message(member.id) group.broadcast(group.get_member_ids(), message, attachments, last_message=last_message, **kwargs) return True
      
      





HelpCommand
 class HelpCommand(Command): def __init__(self): super().__init__() self.commands = [] self.triggers = ['.h', '.help'] self.system = True self.description = "  ." def proceed(self, member, message, attachments, group, **kwargs): commands = kwargs["commands"] help = "  :\n\n" admins = group.get_member_ids(admins=True, moders=True) i = 0 for command in commands: if command.privilege and member.id not in admins: continue help += "{}) {}\n\n".format(i + 1, command.name()) i += 1 group.send(member.id, help) return True
      
      





パート4.「私たちは1つの大きなチームです。」







ここで、これらすべてのモジュールとハンドラーを組み合わせて構成する必要があります。

別のクラスをお願いします!

ボットをカスタマイズするファサードを作成します。







 class VKManage: def __init__(self, token=None, login=None, password=None): self.session = BaseCommunicateVK.create_session(token, login, password, api_version) self.storage = DBProxy(DatabaseORM) self.group = Group(self.session, self.storage).setup().update_members() self.chat = ChatHandler(self.group, CommandObserver.get_observer()) def start(self): self.chat.run() def get_command(self, command_name): return { " ": BroadcastCommand(), " ": AdminBroadcastCommand(), "": HelpCommand(), " ": SkippedLectionsCommand(), "": TopicTimetableCommand().setup_account(self.bot.api), }.get(command_name) def connect_command(self, command_name): command = self.get_command(str(command_name).lower()) if command: self.chat.command_observer.add(command) return self def connect_commands(self, command_names): for i in command_names.split(','): self.connect_command(i.strip()) return self
      
      





最後の段階は打ち上げです。 常に最も厄介なのは、何らかの驚きが出てくる可能性があるからです。 今回は違います。









それがすべてのようです。







現時点では、このプログラムは少なくとも3つのグループに恩恵をもたらしています。







Herokuに無料でデプロイできますが、それは別の話です。







参照:










All Articles