InstagramおよびGAE分析





少し前に、類似のTwitterアカウントの検索に関する記事がHabréで公開されました。 残念ながら、著者はコメントに応答しなかったため、車輪を再発明しなければなりませんでした。 しかし、まったく同じことをしないために、Google App Engineを使用してInstagramで同様のアカウントを探し、誰もがサービスを使用できるようにすることにしました。 そのため、 instalytics.ru *が登場しました。





もちろん、最も難しいことは、すべての人にサービスを実装することです(まあ、Google App Engineの無料の割り当て内にとどまり、 Instagram APIの制限を考慮することです)。



すべては次のように実装されます-

  1. アカウントの分析のユーザーリクエストはInstagramで確認されます。指定されたユーザーが見つかった場合、その分析のリクエストがデータベースに追加されます。 同時に、各リクエストには独自の優先度があります(すべてのリクエストは同じです)。
  2. cronを使用して15分ごとに、キューからデータベースから1つの要求を選択し、新しいタスクを作成するタスクが起動されます-要求からすべてのユーザーサブスクライバーを取得します。 エラーが発生した場合のタスクが再び繰り返されます。

    - name: followers-get rate: 1/m # 1 task per minute bucket_size: 1 max_concurrent_requests: 1 retry_parameters: task_retry_limit: 2 min_backoff_seconds: 30
          
          





    すべてのサブスクライバーが1つの要求で受信されたわけではない場合、各タスクは新しいタスクを作成します。

     if users and users.get('pagination') and users.get('pagination').get('next_cursor'): cursor = users.get('pagination').get('next_cursor') url = '/task/followers-get?user_id='+user_id url += '&cursor=' + cursor taskqueue.add(queue_name='followers-get', url=url, method='GET')
          
          





  3. すべてのサブスクライバーを受信した後、それぞれの分析が開始されます。 このため、膨大な数のタスクが作成され、各サブスクライバーがサブスクライブしているユーザーのリストを受け取ります(上記のサブスクライバーの場合と同様に、各タスクは新しいタスクを作成できます)。 Instagramの1時間あたり5'000リクエストの制限に従うため、タスクキューは次のように構成されています。

     - name: subscriptions-get rate: 5000/h
          
          





    さらに、念のため、各リクエストを完了した後、0.72秒(= 60 * 60/5000)スリープします。

    残念ながら、Google App Engineの無料版では、1日あたりデータベースへの書き込みは50,000回しか実行できません。 なぜなら 各タスクは新しいタスクを作成できます。その後、初期オプション-各タスクの結果をデータベースに書き込む-新しいタスクに置き換える必要がありました-前のタスクの結果は新しいタスクのパラメーターとして渡され、最後のタスクのみが結果をデータベースに書き込みます:

     if users and users.get('pagination') and users.get('pagination').get('next_cursor'): cursor = users.get('pagination').get('next_cursor') params = { 'user_id': user_id, 'f_user_id': f_user_id, 'cursor': cursor } if more_subscriptions: params['subscriptions'] = ','.join(more_subscriptions) taskqueue.add(queue_name='subscriptions-get', url='/task/subscriptions-get', params=params, method='POST')
          
          





    一部のユーザー(たとえば@instagramなど)には数百万人のフォロワーがいます。 すべての加入者を獲得するために貴重なリソースを無駄にしないために、100,000人の加入者を受信した後、タスクは終了します。

  4. データベースへの書き込み操作の数には制限があるため、特定のユーザーのすべてのタスクが完了したかどうかを適切に追跡することはできません。 通常の解決策は、実行中のタスクのIDリストをデータベースに書き込み、各タスクの完了時に(または最後にタスクを完了しようとした場合)、リストからタスクを除外することです。 しかし、膨大な数のタスクにすべてのユーザーを掛け合わせても、これは許可されません。 したがって、タスクのリストはmemcacheに保存されます。

     memcache.set('subscriptions'+str(user_id), ','.join(str(x) for x in followers), 1209600)
          
          





    Memcacheデータはいつでも削除できます。 「ハング」リクエスト(リクエストのすべてのタスクは完了したが、memcacheは削除されたため、それについて知らない)の状況を回避するために、タスクは数時間ごとに起動され、受信サブスクライバーのステータスを受信したリクエストがあるかどうかを確認します2週間前よりも(今のところ、これがすべてのタスクが確実に完了する時間であると考えられています)。 そのような要求が見つかった場合、それらは次の段階に「強制的に」転送されます。

  5. 次の段階では、以前に取得したすべてのデータがデータベースから読み取られます。 結局のところ、非常に多くのデータが存在する可能性があり、それらに十分なGAE RAMが割り当てられていない可能性があります。 データはバッチで読み取られるため、各部分の中間結果が計算され、次の中間結果に追加されます。 このプロセスでは、自動キャッシュを無効にする必要がありました。

     ctx = ndb.get_context() ctx.set_cache_policy(lambda key: False) ctx.set_memcache_policy(lambda key: False)
          
          





    この段階での多数の計算の結果、ユーザーがフォローしている300人の最も人気のあるユーザーが選択されます。

  6. 300人のユーザーごとに、タスク(ユーザーの名前、写真、加入者数など)を取得するタスクが起動されます。 上記のプロセスと同様に、すべてのタスクの完了が期待されるか、しばらくしてから新しいステージが強制的に開始されます。

  7. 最後の段階で、最も類似したユーザーの計算と選択が実行されます(サブスクライバーの数とサブスクライバーの合計を考慮に入れて)。 このようなことが判明し、結果へのリンクが電子メールに送信されます。



上記のアプローチと最適化により、結果の取得にはかなりの時間がかかりますが、GAEによって割り当てられた無料の割り当てのフレームワーク内にとどまることができます。 あなたの助けが必要です-ユーザーをキューに追加し 、分析にかかる時間を確認してください。



将来的には、Instagramの実在の人物/企業の認識をサービスに追加する予定ですが、ここでは機械学習が不可欠であるため、これは別のタスクになります。



*サイトのロシア語はまだ機能しません-django翻訳がGAEで機能しない理由がわかりません。



All Articles