メールピッカー(単純なことを一生懸命行う)

序文として



おそらく、あなたの実践の多くは、多くのメールボックスからメールを収集するタスクに直面しているでしょう。 なぜこれが必要なのでしょうか? これはおそらく、システム間でデータを交換するための普遍的なメカニズムだからです。 SMTP、POP3、IMAP、メッセージスタックを実装するための既製のソリューション(メールボックスと呼んでいたように...)などを実装する言語用のライブラリが多数あります。



多くの統合タスクがメールを介して実装されていることは驚くことではありません。 ここでは、必要なアクションをすばやく取得、分類、および実行する方法を知っている特定のサービスが登場します。



以下のコードで十分な人-彼らはさらに読むことはできません:



foreach (var mailbox in mailboxes) using (var client = new Pop3Client()) { client.Connect(Hostname, Port, false); client.Authenticate(User, Password); var count = client.GetMessageCount(); for (var i = 0; i < count; i++) { Mail = client.GetMessage(i + 1); var cat = SortMail(Mail); DoSomething(Mail, cat); } }
      
      







私たちは何をしますか



すぐにいくつかの仮定を行います。

1)複数のシステムのメールを収集する必要があります。 将来的にはさらにいくつかの可能性があります。 それでも...一般に、解決策は普遍的でなければなりません。

2)たくさんのメールがあるかもしれません-それはパラグラフ1から続きます(そうでなければ私はこの投稿を書きません)。

3)メールは解析する必要があります。

4)すべてのサービスボックス-ユーザーはそこに行きません。



使用するもの



システムは年中無休で動作するはずなので、Windowsサービスの形式で実装します。 これらの目的のために、すぐにTopShelfを使用することをお勧めします



もちろん、すべてを並列化する必要があります。 ここで 、私のお気に入りのTPL DataFlowライブラリが登場します。



POP3経由でメールを受け取ります。 このタスクでのIMAPの「ファッショナブルなもの」はすべて不要です。できるだけ早く手紙のソースを取り上げて、サーバー上で削除する必要があります。 POP3は目にとって十分です。 OpenPop.NETを使用します



すでに述べたように、メールを解析します。 たぶん、正規表現を介して、おそらくカスタムロジックを...あなたは何を知らないのでしょう。 プラグインの助けを借りて柔軟かつ迅速に新しいルールをプッシュする必要があるのはそのためです。 ここでは、 Managed Extensibility Frameworkが役立ちます。



NLogを介してログを書き込みます。



オプションとして、 Zabbixで監視を固定します。 (24時間年中無休で作業し、自慢のスピードを提供します-これに従う必要があります)。



行こう



通常のコンソールアプリケーションを作成します。 NuGetコンソールを開き、必要なすべてのパッケージをインストールします。



 Install-Package Nlog Install-Package OpenPop.NET Install-Package TopShelf Install-Package Microsoft.TPL.DataFlow
      
      





プロジェクトフォルダーに移動し、App.Debug.configとApp.Release.configを作成します。 スタジオからプロジェクトをアンロードし、コードを開きます(以下TopCrawler.csproj)。 構成を含むセクションに追加します。



構成
  <None Include="App.Debug.config"> <DependentUpon>App.config</DependentUpon> </None> <None Include="App.Release.config"> <DependentUpon>App.config</DependentUpon> </None>
      
      







次に、MSBuildのカスタムターゲットを示します。



変換ターゲット
 <UsingTask TaskName="TransformXml" AssemblyFile="$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)\Web\Microsoft.Web.Publishing.Tasks.dll" /> <Target Name="AfterCompile" Condition="Exists('App.$(Configuration).config')"> <!--Generate transformed app config in the intermediate directory--> <TransformXml Source="App.config" Destination="$(IntermediateOutputPath)$(TargetFileName).config" Transform="App.$(Configuration).config" /> <!--Force build process to use the transformed configuration file from now on.--> <ItemGroup> <AppConfigWithTargetPath Remove="App.config" /> <AppConfigWithTargetPath Include="$(IntermediateOutputPath)$(TargetFileName).config"> <TargetPath>$(TargetFileName).config</TargetPath> </AppConfigWithTargetPath> </ItemGroup> </Target>
      
      







個人的には、この方法で-昔ながらの方法で-構成の変換を別々の環境に追加することに慣れました。

便宜上、私は強くタイプされた構成を提供します。 別のクラスが構成を読み取ります。 (コメントでそのようなソリューションの理論的な側面について話すことができます)。 構成、ログ、監視は、シングルトンパターンを実装する優れた理由です。



プロジェクト内に同じ名前のフォルダーを作成します(順序が必要です)。 内部に3つのクラス-Config、Logger、Zabbixを作成します。 ロガー:



ロガー
 static class Logger { public static NLog.Logger Log { get; private set; } public static NLog.Logger Archive { get; private set; } static Logger() { Log = LogManager.GetLogger("Global"); Archive = LogManager.GetLogger("Archivator"); } }
      
      







Zabbixを使用した監視は別の投稿に値するので、エージェントを実装するクラスをここに残します。



ザビックス
 namespace TopCrawler.Singleton { /// <summary> /// Singleton: zabbix sender class /// </summary> static class Zabbix { public static ZabbixSender Sender { get; private set; } static Zabbix() { Sender = new ZabbixSender(Config.ZabbixServer, Config.ZabbixPort); } } struct ZabbixItem { public string Host; public string Key; public string Value; } class ZabbixSender { internal struct SendItem { // ReSharper disable InconsistentNaming - Zabbix is case sensitive public string host; public string key; public string value; public string clock; // ReSharper restore InconsistentNaming } #pragma warning disable 0649 internal struct ZabbixResponse { public string Response; public string Info; } #pragma warning restore 0649 #region --- Constants --- public const string DefaultHeader = "ZBXD\x01"; public const string SendRequest = "sender data"; public const int DefaultTimeout = 10000; #endregion #region --- Fields --- private readonly DateTime _dtUnixMinTime = DateTime.SpecifyKind(new DateTime(1970, 1, 1), DateTimeKind.Utc); private readonly int _timeout; private readonly string _zabbixserver; private readonly int _zabbixport; #endregion #region --- Constructors --- public ZabbixSender(string zabbixserver, int zabbixport) : this(zabbixserver, zabbixport, DefaultTimeout) { } public ZabbixSender(string zabbixserver, int zabbixport, int timeout) { _zabbixserver = zabbixserver; _zabbixport = zabbixport; _timeout = timeout; } #endregion #region --- Methods --- public string SendData(ZabbixItem itm) { return SendData(new List<ZabbixItem>(1) { itm }); } public string SendData(List<ZabbixItem> lstData) { try { var serializer = new JavaScriptSerializer(); var values = new List<SendItem>(lstData.Count); values.AddRange(lstData.Select(itm => new SendItem { host = itm.Host, key = itm.Key, value = itm.Value, clock = Math.Floor((DateTime.Now.ToUniversalTime() - _dtUnixMinTime).TotalSeconds).ToString(CultureInfo.InvariantCulture) })); var json = serializer.Serialize(new { request = SendRequest, data = values.ToArray() }); var header = Encoding.ASCII.GetBytes(DefaultHeader); var length = BitConverter.GetBytes((long)json.Length); var data = Encoding.ASCII.GetBytes(json); var packet = new byte[header.Length + length.Length + data.Length]; Buffer.BlockCopy(header, 0, packet, 0, header.Length); Buffer.BlockCopy(length, 0, packet, header.Length, length.Length); Buffer.BlockCopy(data, 0, packet, header.Length + length.Length, data.Length); using (var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) { socket.Connect(_zabbixserver, _zabbixport); socket.Send(packet); //Header var buffer = new byte[5]; ReceivData(socket, buffer, 0, buffer.Length, _timeout); if (DefaultHeader != Encoding.ASCII.GetString(buffer, 0, buffer.Length)) throw new Exception("Invalid header"); //Message length buffer = new byte[8]; ReceivData(socket, buffer, 0, buffer.Length, _timeout); var dataLength = BitConverter.ToInt32(buffer, 0); if (dataLength == 0) throw new Exception("Invalid data length"); //Message buffer = new byte[dataLength]; ReceivData(socket, buffer, 0, buffer.Length, _timeout); var response = serializer.Deserialize<ZabbixResponse>(Encoding.ASCII.GetString(buffer, 0, buffer.Length)); return string.Format("Response: {0}, Info: {1}", response.Response, response.Info); } } catch (Exception e) { return string.Format("Exception: {0}", e); } } private static void ReceivData(Socket pObjSocket, byte[] buffer, int offset, int size, int timeout) { var startTickCount = Environment.TickCount; var received = 0; do { if (Environment.TickCount > startTickCount + timeout) throw new TimeoutException(); try { received += pObjSocket.Receive(buffer, offset + received, size - received, SocketFlags.None); } catch (SocketException ex) { if (ex.SocketErrorCode == SocketError.WouldBlock || ex.SocketErrorCode == SocketError.IOPending || ex.SocketErrorCode == SocketError.NoBufferSpaceAvailable) Thread.Sleep(30); else throw; } } while (received < size); } #endregion } }
      
      







構成...少なくとも何か面白いことをする時が来ました。 最初に、設定に、ポーリングするボックスを保存します。 第二に、DataFlow設定。 これをお勧めします:



構成
  <CredentialsList> <credentials hostname="8.8.8.8" username="popka@example.com" password="123" port="110" type="fbl" /> <credentials hostname="8.8.8.8" username="kesha@example.com" password="123" port="110" type="bounce" /> </CredentialsList> <DataFlowOptionsList> <datablockoptions name="_sortMailDataBlock" maxdop="4" boundedcapacity="4" /> <datablockoptions name="_spamFilterDataBlock" maxdop="4" boundedcapacity="4" /> <datablockoptions name="_checkBounceDataBlock" maxdop="16" boundedcapacity="16" /> <datablockoptions name="_identifyDataBlock" maxdop="16" boundedcapacity="16" /> <datablockoptions name="_addToCrmDataBlock" maxdop="16" boundedcapacity="16" /> <datablockoptions name="_addToFblDataBlock" maxdop="16" boundedcapacity="16" /> <datablockoptions name="_addToBounceDataBlock" maxdop="16" boundedcapacity="16" /> </DataFlowOptionsList>
      
      







したがって、接続先のホストとポート、ユーザーとパスワード-ここではすべてが明確です。 次はボックスの種類です。 サービスがマーケティング部門(および他の部門)によって使用されているとします。 それらには、メールへの自動返信とFBLスパムレポートがダンプされるメールボックスがあります。 箱自体はすでに手紙を分類しているので、そのような状況ではすぐに箱の種類を設定します。 DataFlow設定を使用すると、オブジェクトの作成を開始するとさらに明確になります。 ここでは、設定に独自のセクションがあります。 多くのマニュアルでこれを行う方法がありますので、結果を表示してください:



タイプを定義する
  #region --- Types --- static class MailboxType { public const string Bo = "bo"; public const string Crm = "crm"; public const string Fbl = "fbl"; public const string Bounce = "bounce"; } class MailboxInfo { public string Type { get; set; } public string Hostname { get; set; } public string User { get; set; } public string Password { get; set; } public int Port { get; set; } } class DataBlockOptions { public int Maxdop { get; set; } public int BoundedCapacity { get; set; } public DataBlockOptions() { Maxdop = 1; BoundedCapacity = 1; } } #endregion
      
      







セクションを作成する
  /// <summary> /// Custom config section /// </summary> public class CustomSettingsConfigSection : ConfigurationSection { [ConfigurationProperty("CredentialsList")] public CredentialsCollection CredentialItems { get { return base["CredentialsList"] as CredentialsCollection; } } [ConfigurationProperty("DataFlowOptionsList")] public DataBlockOptionsCollection DataFlowOptionsItems { get { return base["DataFlowOptionsList"] as DataBlockOptionsCollection; } } }
      
      







これらのセクションから値を読むことを学ぶ
  /// <summary> /// Custom collection - credentials list /// </summary> [ConfigurationCollection(typeof(CredentialsElement), AddItemName = "credentials")] public class CredentialsCollection : ConfigurationElementCollection, IEnumerable<CredentialsElement> { protected override ConfigurationElement CreateNewElement() { return new CredentialsElement(); } protected override object GetElementKey(ConfigurationElement element) { return ((CredentialsElement)element).Username; } public CredentialsElement this[int index] { get { return BaseGet(index) as CredentialsElement; } } public new IEnumerator<CredentialsElement> GetEnumerator() { for (var i = 0; i < Count; i++) { yield return BaseGet(i) as CredentialsElement; } } } /// <summary> /// Custom credentials item /// </summary> public class CredentialsElement : ConfigurationElement { [ConfigurationProperty("hostname", DefaultValue = "")] public string Hostname { get { return base["hostname"] as string; } } [ConfigurationProperty("username", DefaultValue = "", IsKey = true)] public string Username { get { return base["username"] as string; } } [ConfigurationProperty("password", DefaultValue = "")] public string Password { get { return base["password"] as string; } } [ConfigurationProperty("type", DefaultValue = "")] public string Type { get { return base["type"] as string; } } [ConfigurationProperty("port", DefaultValue = "")] public string Port { get { return base["port"] as string; } } } /// <summary> /// Custom collection - DataBlock options list /// </summary> [ConfigurationCollection(typeof(DataBlockOptionsElement), AddItemName = "datablockoptions")] public class DataBlockOptionsCollection : ConfigurationElementCollection, IEnumerable<DataBlockOptionsElement> { protected override ConfigurationElement CreateNewElement() { return new DataBlockOptionsElement(); } protected override object GetElementKey(ConfigurationElement element) { return ((DataBlockOptionsElement)element).Name; } public CredentialsElement this[int index] { get { return BaseGet(index) as CredentialsElement; } } public new IEnumerator<DataBlockOptionsElement> GetEnumerator() { for (var i = 0; i < Count; i++) { yield return BaseGet(i) as DataBlockOptionsElement; } } } /// <summary> /// Custom DataBlock options item /// </summary> public class DataBlockOptionsElement : ConfigurationElement { [ConfigurationProperty("name", DefaultValue = "", IsKey = true)] public string Name { get { return base["name"] as string; } } [ConfigurationProperty("maxdop", DefaultValue = "")] public string Maxdop { get { return base["maxdop"] as string; } } [ConfigurationProperty("boundedcapacity", DefaultValue = "")] public string BoundedCapacity { get { return base["boundedcapacity"] as string; } } }
      
      







configの完全な実装は記述しません。開発プロセス中に必要なパラメーターがそこに追加されることが理解されています。



次のようなカスタム設定を読み取ります。



読む
  public List<MailboxInfo> CredentialsList { get; private set; } public Dictionary<string, DataBlockOptions> DataFlowOptionsList { get; private set; } ... static Config() { try { var customConfig = (CustomSettingsConfigSection)ConfigurationManager.GetSection("CustomSettings"); //Get mailboxes foreach (var item in customConfig.CredentialItems) CredentialsList.Add(new MailboxInfo { Hostname = item.Hostname, Port = Convert.ToInt32(item.Port), User = item.Username, Type = item.Type, Password = item.Password }); //Get DataFlow settings foreach (var item in customConfig.DataFlowOptionsItems) DataFlowOptionsList.Add(item.Name, new DataBlockOptions { Maxdop = Convert.ToInt32(item.Maxdop), BoundedCapacity = Convert.ToInt32(item.BoundedCapacity) }); } catch (Exception ex) { Logger.Log.Fatal("Error at reading config: {0}", ex.Message); throw; } }
      
      







どういうわけか非常に長引くことが判明しましたが、私たちは最も興味深い部分にさえ達していません。



TopShelfからのバインディング、パフォーマンスカウンター、データベースとの通信を省略し、ビジネスに取りかかります! Crawlerクラス(コア)を作成します。 まず、メールを読んでください:



  private volatile bool _stopPipeline; ... public void Start() { do { var getMailsTasks = _config.CredentialsList.Select(credentials => Task.Run(() => GetMails(credentials))).ToList(); foreach (var task in getMailsTasks) task.Wait(); Thread.Sleep(2000); } while (!_stopPipeline); //Stop pipeline - wait for completion of all endpoints //   DataFlow  if (_stopPipeline) Logger.Log.Warn("Pipeline has been stopped by user"); }
      
      





ここで怠が犠牲になり、私は気にしないことに決めました-あなたが約20-30のボックスを持っているなら、あなたはそれぞれの下でタスクを実行でき、スレッドの数を心配しません。 (トマトを振りかける許可。)



読み取り自体に進みます。



  private void GetMails(MailboxInfo info) { try { using (var client = new Pop3Client()) {
      
      





ボックスのアクセスタイミングをすぐに計算します-ネットワーク診断とサーバーの負荷に役立ちます。



  //Get Zabbix metrics var stopwatch = new Stopwatch(); stopwatch.Start(); //Get mail count client.Connect(info.Hostname, info.Port, false); client.Authenticate(info.User, info.Password); stopwatch.Stop();
      
      





Zabbixにデータを送信します。 簡単です-ホスト名(Zabbixの場合)、キー(厳密には、Zabbixの場合も同様)、および文字列値を指定します。



  //Send it to Zabbix Zabbix.Sender.SendData(new ZabbixItem { Host = Config.HostKey, Key = info.Type + Config.TimingKey, Value = stopwatch.ElapsedMilliseconds.ToString() }); Logger.Log.Debug("Send [{0}] timing to Zabbix: connected to '{1}' as '{2}', timing {3}ms", info.Type, info.Hostname, info.User, stopwatch.ElapsedMilliseconds); var count = client.GetMessageCount(); if (count == 0) return; Logger.Log.Debug("We've got new {0} messages in '{1}'", count, info.User); //Send messages to sorting block for (var i = 0; i < count; i++) { try { var mailInfo = new MessageInfo { IsSpam = false, Mail = client.GetMessage(i + 1), Type = MessageType.UNKNOWN, Subtype = null, Recipient = null, Mailbox = info }; Logger.Log.Debug("Download message from '{0}'. Size: {1}b", info.User, mailInfo.Mail.RawMessage.Length);
      
      





DataFlowパイプラインは、Crawlerクラスが作成されるときに作成されます。 私たちの最初のステップは手紙を分類することだと信じています。



  while (!_sortMailDataBlock.Post(mailInfo)) Thread.Sleep(500);
      
      





あなたはそれがどれほど簡単かを見る-コンベヤー自体は一つです。 メールを読むすべてのタスクは、一度に1つずつメッセージを送信します。 ブロックがビジーの場合、Postはfalseを返し、解放されるまで待機します。 この時点で、電流は引き続き機能します。 これは私が心配なく並行性と呼んでいるものです。



メッセージはコンベアに送られ、RAWアーカイブに安全に保存できるようになりました(はい、はい、読み取ったものはすべてファイルアーカイブに保存されます。サポートチームは後で感謝します)。



たとえば、アーカイブのローテーションを構成します。



Nlog.config
  <targets> <!-- add your targets here --> <target name="logfile" xsi:type="File" fileName="${basedir}\logs\${shortdate}-message.log" /> <target name="Archivefile" xsi:type="File" fileName="${basedir}\archive\${shortdate}-archive.dat" /> </targets>
      
      







次に、 logStashを設定できますが、それは別の話です...



  //Save every mail to archive Logger.Log.Debug("Archive message"); Logger.Archive.Info(Functions.MessageToString(mailInfo.Mail)); } catch (Exception ex) { Logger.Log.Error("Parse email error: {0}", ex.Message); Functions.ErrorsCounters[info.Type].Increment(); //Archive mail anyway Logger.Log.Debug("Archive message"); Logger.Archive.Info(Encoding.Default.GetString(client.GetMessageAsBytes(i + 1))); } if (_config.DeleteMail) client.DeleteMessage(i + 1); if (_stopPipeline) break; } Logger.Log.Debug("Done with '{0}'", info.User); } } catch (Exception ex) { Logger.Log.Error("General error - type: {0}, message: {1}", ex, ex.Message); Functions.ErrorsCounters[info.Type].Increment(); } }
      
      





ここでは、ErrorsCountersが次の静的エラーカウンターを使用しました(ボックスの種類別)。



 public static Dictionary<string, Counter> ErrorsCounters = new Dictionary<string, Counter>();
      
      





カウンター自体は次のように実行できます。



Counter.cs
 class Counter { private long _counter; public Counter() { _counter = 0; } public void Increment() { Interlocked.Increment(ref _counter); } public long Read() { return _counter; } public long Refresh() { return Interlocked.Exchange(ref _counter, 0); } public void Add(long value) { Interlocked.Add(ref _counter, value); } public void Set(long value) { Interlocked.Exchange(ref _counter, value); } }
      
      







パイプラインの作成に移りましょう。 留守番電話が注がれるボックスがあるとしましょう。 そのような文字は解析され(どのような種類の自動返信、誰から、どのメールで、など)、結果をリポジトリ(DB)に入れる必要があります。 FBLレポートが該当するボックスがあるとします。 そのような手紙をすぐにデータベースに追加します。 他のすべての文字は「有用」であると見なします。スパムをチェックし、CRMなどの外部システムに送信する必要があります。



すでに理解しているように、この例では主にマーケティングタスクでのコレクターの使用を考慮しています。メール配信に関する統計情報、スパムに関する情報を収集します。



そこで、ワークフローを決定しました。 Crawlerクラスで必要なブロックを宣言します。



 class MessageInfo { public bool IsSpam { get; set; } public Message Mail { get; set; } public string Subtype { get; set; } public string Recipient { get; set; } public MessageType Type { get; set; } public MailboxInfo Mailbox { get; set; } } class Crawler { //Pipeline private TransformBlock<MessageInfo, MessageInfo> _sortMailDataBlock; private TransformBlock<MessageInfo, MessageInfo> _spamFilterDataBlock; private TransformBlock<MessageInfo, MessageInfo> _checkBounceDataBlock; private TransformBlock<MessageInfo, MessageInfo> _identifyDataBlock; private ActionBlock<MessageInfo> _addToCrmDataBlock; private ActionBlock<MessageInfo> _addToFblDataBlock; private ActionBlock<MessageInfo> _addToBounceDataBlock; ...
      
      





初期化メソッドを作成し、パイプラインブロックを作成します(configsの素晴らしいセクションを使用してブロックを初期化します):



  public void Init() { //*** Create pipeline *** //Create TransformBlock to get message type var blockOptions = _config.GetDataBlockOptions("_sortMailDataBlock"); _sortMailDataBlock = new TransformBlock<MessageInfo, MessageInfo>(mail => SortMail(mail), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = blockOptions.Maxdop, BoundedCapacity = blockOptions.BoundedCapacity }); //Create TransformBlock to filter spam blockOptions = _config.GetDataBlockOptions("_spamFilterDataBlock"); _spamFilterDataBlock = new TransformBlock<MessageInfo, MessageInfo>(mail => FilterSpam(mail), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = blockOptions.Maxdop, BoundedCapacity = blockOptions.BoundedCapacity }); //Create TransformBlock to sort bounces blockOptions = _config.GetDataBlockOptions("_checkBounceDataBlock"); _checkBounceDataBlock = new TransformBlock<MessageInfo, MessageInfo>(mail => BounceTypeCheck(mail), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = blockOptions.Maxdop, BoundedCapacity = blockOptions.BoundedCapacity }); //Create TransformBlock to identify bounce owner blockOptions = _config.GetDataBlockOptions("_identifyDataBlock"); _identifyDataBlock = new TransformBlock<MessageInfo, MessageInfo>(mail => GetRecipient(mail), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = blockOptions.Maxdop, BoundedCapacity = blockOptions.BoundedCapacity }); //Create ActionBlock to send mail to CRM blockOptions = _config.GetDataBlockOptions("_addToCrmDataBlock"); _addToCrmDataBlock = new ActionBlock<MessageInfo>(mail => AddToCrm(mail), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = blockOptions.Maxdop, BoundedCapacity = blockOptions.BoundedCapacity }); //Create ActionBlock to send FBL to MailWH blockOptions = _config.GetDataBlockOptions("_addToFblDataBlock"); _addToFblDataBlock = new ActionBlock<MessageInfo>(mail => AddToFbl(mail), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = blockOptions.Maxdop, BoundedCapacity = blockOptions.BoundedCapacity }); //Create ActionBlock to send Bounce to MailWH blockOptions = _config.GetDataBlockOptions("_addToBounceDataBlock"); _addToBounceDataBlock = new ActionBlock<MessageInfo>(mail => AddToBounce(mail), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = blockOptions.Maxdop, BoundedCapacity = blockOptions.BoundedCapacity });
      
      





スキームに従ってコンベアを組み立てます。



  //*** Build pipeline *** _sortMailDataBlock.LinkTo(_spamFilterDataBlock, info => info.Type == MessageType.GENERAL); _sortMailDataBlock.LinkTo(_addToFblDataBlock, info => info.Type == MessageType.FBL); _sortMailDataBlock.LinkTo(_checkBounceDataBlock, info => info.Type == MessageType.BOUNCE); _sortMailDataBlock.LinkTo(DataflowBlock.NullTarget<MessageInfo>(), info => info.Type == MessageType.UNKNOWN); /*STUB*/ _checkBounceDataBlock.LinkTo(_identifyDataBlock); _identifyDataBlock.LinkTo(_addToBounceDataBlock); _spamFilterDataBlock.LinkTo(_addToCrmDataBlock, info => !info.IsSpam); _spamFilterDataBlock.LinkTo(DataflowBlock.NullTarget<MessageInfo>(), info => info.IsSpam); /*STUB*/
      
      





ご覧のように、すべてが非常に単純です-ブロックを以下に関連付けます(通信条件を設定する可能性がある)。 すべてのブロックは並行して実行されます。 各ブロックにはある程度の並列性と容量があります(容量を使用すると、ブロックの前のキューを調整できます。つまり、ブロックはメッセージを受信しましたが、まだ処理していません)。 したがって、レターの内容を解析するなど、「複雑な」長時間の操作に対して高度な並列処理を設定できます。



DataFlowマテリアルについては説明しません。TPLDataFlowソースのすべてを読む方が良いでしょう



次に、ブロックを終了するためのルールを設定します。



  _sortMailDataBlock.Completion.ContinueWith(t => { if (t.IsFaulted) ((IDataflowBlock)_spamFilterDataBlock).Fault(t.Exception); else _spamFilterDataBlock.Complete(); }); _sortMailDataBlock.Completion.ContinueWith(t => { if (t.IsFaulted) ((IDataflowBlock)_addToFblDataBlock).Fault(t.Exception); else _addToFblDataBlock.Complete(); }); _sortMailDataBlock.Completion.ContinueWith(t => { if (t.IsFaulted) ((IDataflowBlock)_checkBounceDataBlock).Fault(t.Exception); else _checkBounceDataBlock.Complete(); }); _spamFilterDataBlock.Completion.ContinueWith(t => { if (t.IsFaulted) ((IDataflowBlock)_addToCrmDataBlock).Fault(t.Exception); else _addToCrmDataBlock.Complete(); }); _checkBounceDataBlock.Completion.ContinueWith(t => { if (t.IsFaulted) ((IDataflowBlock)_identifyDataBlock).Fault(t.Exception); else _identifyDataBlock.Complete(); }); _identifyDataBlock.Completion.ContinueWith(t => { if (t.IsFaulted) ((IDataflowBlock)_addToBounceDataBlock).Fault(t.Exception); else _addToBounceDataBlock.Complete(); }); }
      
      





実際、パイプラインはすべて機能しているので、メッセージを投稿できます。 Startメソッドを追加して停止するだけです。



開始する
  public void Start() { do { var getMailsTasks = _config.CredentialsList.Select(credentials => Task.Run(() => GetMails(credentials))).ToList(); foreach (var task in getMailsTasks) task.Wait(); Thread.Sleep(2000); } while (!_stopPipeline); //Stop pipeline - wait for completion of all endpoints _sortMailDataBlock.Complete(); _addToCrmDataBlock.Completion.Wait(); _addToFblDataBlock.Completion.Wait(); _addToBounceDataBlock.Completion.Wait(); if (_stopPipeline) Logger.Log.Warn("Pipeline has been stopped by user"); }
      
      







デリゲートに渡します。

並べ替え...まあ、すべてが私たちと簡単だとしましょう(私たちは常に複雑にする時間があります)



  private MessageInfo SortMail(MessageInfo mail) { switch (mail.Mailbox.Type) { case MailboxType.Crm: mail.Type = MessageType.GENERAL; break; case MailboxType.Bounce: mail.Type = MessageType.BOUNCE; break; case MailboxType.Fbl: mail.Type = MessageType.FBL; break; } return mail; }
      
      





スパムフィルター。 これは宿題用です-SpamAssassinを使用します。

代理人は次のとおりです。



  private MessageInfo FilterSpam(MessageInfo mail) { //TODO: Add SpamAssassin logic return mail; }
      
      





また、SpamAssassin APIを操作するためのクラス( プロジェクトへのリンク )。

そして、文字の解析に進みます。 Parsimから回答します。 これがMEFの出番です。

プラグインのインターフェイスを使用してプロジェクト(dll)を作成します(インターフェイスを呼び出しましょう)。

インターフェースを追加:



  public interface ICondition { string Check(Message mimeMessage); } public interface IConditionMetadata { Type Type { get; } }
      
      





そして...それだけです。 TopCrawlerはこのプロジェクトに依存しており、プラグインを含むプロジェクトでも使用されます。

新しいプロジェクト(dllも)を作成し、Conditionsを呼び出します。

自動応答タイプを追加します。



  #region --- Types --- static class BounceType { public const string Full = "BounceTypeFull"; public const string Timeout = "BounceTypeTimeout"; public const string Refused = "BounceTypeRefused"; public const string NotFound = "BounceTypeNotFound"; public const string Inactive = "BounceTypeInactive"; public const string OutOfOffice = "BounceTypeOutOfOffice"; public const string HostNotFound = "BounceTypeHostNotFound"; public const string NotAuthorized = "BounceTypeNotAuthorized"; public const string ManyConnections = "BounceTypeManyConnections"; } #endregion
      
      





そして、インターフェースを実装するクラス:



  [Export(typeof(ICondition))] [ExportMetadata("Type", typeof(ConditionNotFound1))] public class ConditionNotFound1 : ICondition { public string Check(Message mimeMessage) { if (!mimeMessage.MessagePart.IsMultiPart) return null; const string pattern = "Diagnostic-Code:.+smtp.+550"; var regexp = new Regex(pattern, RegexOptions.IgnoreCase); return mimeMessage.MessagePart.MessageParts.Any(part => part.ContentType.MediaType == "message/delivery-status" && regexp.IsMatch(part.GetBodyAsText())) ? BounceType.NotFound : null; } } ... [Export(typeof(ICondition))] [ExportMetadata("Type", typeof(ConditionTimeout2))] public class ConditionTimeout2 : ICondition { return BounceType.Timeout; } ...
      
      





お気づきのように、ポイントは属性にあります。 それらを使用して、プラグインがアップロードされます。

プロジェクトに戻り、プラグインをロードします。



  class Crawler { ... //Plugins [ImportMany] public IEnumerable<Lazy<ICondition, IConditionMetadata>> BounceTypeConditions { get; set; } private void LoadPlugins() { try { var container = new CompositionContainer(new DirectoryCatalog(_config.PluginDirectory), true); container.ComposeParts(this); } catch (Exception ex) { Logger.Log.Error("Unable to load plugins: {0}", ex.Message); } } ...
      
      





クラスのコンストラクターでLoadPluginsをプルします。ロードメカニズムについては詳しく説明しません。Googleの方がうまくいきます。



バウンスのようなチェックのデリゲートに渡します。最初の条件が機能するまで条件が順番に適用されます-排他的方法:



  private MessageInfo BounceTypeCheck(MessageInfo mailInfo) { try { foreach (var condition in BounceTypeConditions) { var res = condition.Value.Check(mailInfo.Mail); if (res == null) continue; mailInfo.Subtype = res; Logger.Log.Debug("Bounce type condition [{0}] triggered for message [{1}]", condition.Metadata.Type, mailInfo.Mail.Headers.MessageId); break; } } catch (Exception ex) { Logger.Log.Error("Failed to determine bounce type for message '{0}': {1}", mailInfo.Mail.Headers.MessageId, ex.Message); Logger.ErrorsCounters[MailboxType.Bounce].Increment(); } return mailInfo; }
      
      





したがって、新しいlogicabが表示された場合は、インターフェイスを実装するプラグインを含むプロジェクトに新しいクラスを追加するだけで十分です。レターの送信者の定義によって2番目のプラグインの例を適用しません。既に長い投稿です(サーバー自体が自動応答を生成したため、送信者もメッセージヘッダーから解決する必要があります)



データベースに結果を記録することも珍しいことではありません。たとえば、次のように:



  private void AddToBounce(MessageInfo mail) { try { MailWH.BounceAdd(mail); Functions.ProcessedCounters[MailboxType.Bounce].Increment(); Functions.Log.Debug("Send Bounce to MailWH"); } catch (Exception ex) { Functions.Log.Error("Error saving Bounce message '{0}' to MailWH: {1}", mail.Mail.Headers.MessageId, ex.Message); Functions.ErrorsCounters[MailboxType.Bounce].Increment(); } }
      
      





バウンスアド
  public static long BounceAdd(MessageInfo message) { using (var conn = new SqlConnection(ConnectionString)) using (var cmd = new SqlDataAdapter("BounceAdd", conn)) { var body = message.Mail.FindFirstPlainTextVersion() == null ? message.Mail.FindFirstHtmlVersion().GetBodyAsText() : message.Mail.FindFirstPlainTextVersion().GetBodyAsText(); var outId = new SqlParameter("@ID", SqlDbType.BigInt) { Direction = ParameterDirection.Output }; cmd.SelectCommand.CommandType = CommandType.StoredProcedure; cmd.SelectCommand.Parameters.Add(new SqlParameter("@RawMessage", message.Mail.RawMessage)); cmd.SelectCommand.Parameters.Add(new SqlParameter("@Message", body)); cmd.SelectCommand.Parameters.Add(new SqlParameter("@Subject", message.Mail.Headers.Subject ?? "")); cmd.SelectCommand.Parameters.Add(new SqlParameter("@MessageID", message.Mail.Headers.MessageId ?? "")); cmd.SelectCommand.Parameters.Add(new SqlParameter("@AddressTo", message.Mail.Headers.To[0].Address ?? "")); cmd.SelectCommand.Parameters.Add(new SqlParameter("@AddressFrom", message.Mail.Headers.From.Address ?? "")); cmd.SelectCommand.Parameters.Add(new SqlParameter("@DateRecieved", DateTime.Now)); cmd.SelectCommand.Parameters.Add(new SqlParameter("@BounceTypeSysName", (object)message.Subtype ?? DBNull.Value)); cmd.SelectCommand.Parameters.Add(new SqlParameter("@SourceFrom", (object)message.Recipient ?? DBNull.Value)); // TODO: Add ListId support cmd.SelectCommand.Parameters.Add(new SqlParameter("@ListId", DBNull.Value)); cmd.SelectCommand.Parameters.Add(outId); conn.Open(); cmd.SelectCommand.ExecuteNonQuery(); return outId.Value as long? ?? 0; } }
      
      







TopShelfを表示する時間がないためごめんなさい-投稿はすでに肥大化しすぎています。



結論



このレッスンでは、メールを収集するタスクはそれほど単純ではない可能性があることを学びました。開発されたカーネルにより、既存のロジックに影響を与えることなく、新しいプロセスステップであるDataFlowブロックをすばやく追加できます。プラグインサブシステムを使用すると、スクリプトのような解析ロジックをすばやく構築でき、DataFlow自体がすべての計算を並列化します(特定のマシンに対して柔軟にマルチスレッドを構成できます)。TopShelfは、デバッグを容易にするために、サービスモードとコンソールモードの両方でサービスを実行する機能を提供します。



Fuh ...興味深い場合は、Continious Integrationのレールに配置し、VSリリース管理を使用して自動ビルドとリリースを構成する方法をさらに説明します



All Articles