using System;
using System.Data.Common;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using Newtonsoft.Json;
using Npgsql;
using StackExchange.Redis;

namespace Worker
{
    /// <summary>
    /// Worker process that pops JSON votes from the Redis list "votes" and
    /// stores them in the PostgreSQL table "votes" (one row per voter id).
    /// </summary>
    public class Program
    {
        private const string DbConnectionString = "Server=db;Username=postgres;Password=postgres;";
        private const string RedisHostname = "redis";

        /// <summary>
        /// Entry point: connects to PostgreSQL and Redis, then polls the
        /// "votes" list forever.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        /// <returns>1 if an unhandled exception ends the loop; otherwise never returns.</returns>
        public static int Main(string[] args)
        {
            try
            {
                var pgsql = OpenDbConnection(DbConnectionString);
                var redisConn = OpenRedisConnection(RedisHostname);
                var redis = redisConn.GetDatabase();

                // Keep alive is not implemented in Npgsql yet. This workaround was recommended:
                // https://github.com/npgsql/npgsql/issues/1214#issuecomment-235828359
                var keepAliveCommand = CreateKeepAliveCommand(pgsql);

                // Shape of the JSON payload expected on the queue.
                var definition = new { vote = "", voter_id = "" };
                while (true)
                {
                    // Slow down to prevent CPU spike, only query each 100ms
                    Thread.Sleep(100);

                    // Reconnect redis if down
                    if (redisConn == null || !redisConn.IsConnected)
                    {
                        Console.WriteLine("Reconnecting Redis");
                        // Release the dead multiplexer instead of leaking it.
                        redisConn?.Dispose();
                        redisConn = OpenRedisConnection(RedisHostname);
                        redis = redisConn.GetDatabase();
                    }

                    // Reconnect DB if down. This happens BEFORE a vote is popped so a
                    // vote is never removed from the queue while there is nowhere to
                    // store it, and so the keep-alive never runs on a closed connection.
                    if (!pgsql.State.Equals(System.Data.ConnectionState.Open))
                    {
                        Console.WriteLine("Reconnecting DB");
                        keepAliveCommand.Dispose();
                        pgsql.Dispose();
                        pgsql = OpenDbConnection(DbConnectionString);
                        // The old command was bound to the old connection; rebuild it.
                        keepAliveCommand = CreateKeepAliveCommand(pgsql);
                    }

                    string json = redis.ListLeftPop("votes");
                    if (json == null)
                    {
                        // Nothing queued: ping the database instead.
                        keepAliveCommand.ExecuteNonQuery();
                        continue;
                    }

                    var vote = DeserializeOrNull(json, definition);
                    if (vote == null || vote.vote == null || vote.voter_id == null)
                    {
                        // One bad payload must not take the whole worker down.
                        Console.Error.WriteLine("Skipping malformed vote payload");
                        continue;
                    }

                    Console.WriteLine($"Processing vote for '{vote.vote}' by '{vote.voter_id}'");
                    // Normal +1 vote requested
                    UpdateVote(pgsql, vote.voter_id, vote.vote);
                }
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine(ex.ToString());
                return 1;
            }
        }

        /// <summary>
        /// Builds the "SELECT 1" command used as a keep-alive on <paramref name="connection"/>.
        /// </summary>
        private static NpgsqlCommand CreateKeepAliveCommand(NpgsqlConnection connection)
        {
            var command = connection.CreateCommand();
            command.CommandText = "SELECT 1";
            return command;
        }

        /// <summary>
        /// Deserializes <paramref name="json"/> into the shape of <paramref name="definition"/>.
        /// </summary>
        /// <returns>The parsed object, or null when the payload is not valid JSON for that shape.</returns>
        private static T DeserializeOrNull<T>(string json, T definition) where T : class
        {
            try
            {
                return JsonConvert.DeserializeAnonymousType(json, definition);
            }
            catch (JsonException ex)
            {
                Console.Error.WriteLine($"Invalid vote JSON: {ex.Message}");
                return null;
            }
        }

        /// <summary>
        /// Opens a PostgreSQL connection, retrying once per second until it succeeds,
        /// then creates the "votes" table if it does not exist.
        /// </summary>
        /// <param name="connectionString">Npgsql connection string.</param>
        /// <returns>An open connection.</returns>
        private static NpgsqlConnection OpenDbConnection(string connectionString)
        {
            NpgsqlConnection connection;
            while (true)
            {
                connection = new NpgsqlConnection(connectionString);
                try
                {
                    connection.Open();
                    break;
                }
                catch (Exception ex) when (ex is SocketException || ex is DbException)
                {
                    // Dispose the failed attempt so retries do not pile up connection objects.
                    connection.Dispose();
                    Console.Error.WriteLine("Waiting for db");
                    Thread.Sleep(1000);
                }
            }

            Console.Error.WriteLine("Connected to db");

            using (var command = connection.CreateCommand())
            {
                command.CommandText = @"CREATE TABLE IF NOT EXISTS votes (
                                            id VARCHAR(255) NOT NULL UNIQUE,
                                            vote VARCHAR(255) NOT NULL
                                        )";
                command.ExecuteNonQuery();
            }

            return connection;
        }

        /// <summary>
        /// Connects to Redis at <paramref name="hostname"/>, retrying once per second
        /// while a <see cref="RedisConnectionException"/> is thrown.
        /// </summary>
        /// <param name="hostname">Host name of the Redis server.</param>
        /// <returns>A connected multiplexer.</returns>
        private static ConnectionMultiplexer OpenRedisConnection(string hostname)
        {
            // Use IP address to workaround https://github.com/StackExchange/StackExchange.Redis/issues/410
            var ipAddress = GetIp(hostname);
            Console.WriteLine($"Found redis at {ipAddress}");

            while (true)
            {
                try
                {
                    Console.Error.WriteLine("Connecting to redis");
                    return ConnectionMultiplexer.Connect(ipAddress);
                }
                catch (RedisConnectionException)
                {
                    Console.Error.WriteLine("Waiting for redis");
                    Thread.Sleep(1000);
                }
            }
        }

        /// <summary>
        /// Resolves <paramref name="hostname"/> to its first IPv4 address.
        /// </summary>
        /// <returns>The address in string form.</returns>
        /// <exception cref="InvalidOperationException">
        /// Thrown by <c>First</c> when the host has no IPv4 address.
        /// </exception>
        private static string GetIp(string hostname)
            => Dns.GetHostEntryAsync(hostname)
                .Result
                .AddressList
                .First(a => a.AddressFamily == AddressFamily.InterNetwork)
                .ToString();

        /// <summary>
        /// Records <paramref name="vote"/> for <paramref name="voterId"/>: tries an INSERT
        /// and, if that raises a <see cref="DbException"/>, falls back to an UPDATE of the
        /// existing row.
        /// </summary>
        /// <param name="connection">Open PostgreSQL connection.</param>
        /// <param name="voterId">Value stored in the "id" column.</param>
        /// <param name="vote">Value stored in the "vote" column.</param>
        private static void UpdateVote(NpgsqlConnection connection, string voterId, string vote)
        {
            using (var command = connection.CreateCommand())
            {
                // Both statements share the same two parameters.
                command.Parameters.AddWithValue("@id", voterId);
                command.Parameters.AddWithValue("@vote", vote);

                try
                {
                    command.CommandText = "INSERT INTO votes (id, vote) VALUES (@id, @vote)";
                    command.ExecuteNonQuery();
                }
                catch (DbException)
                {
                    // Presumably the UNIQUE constraint on id (voter already voted);
                    // overwrite the previous choice.
                    command.CommandText = "UPDATE votes SET vote = @vote WHERE id = @id";
                    command.ExecuteNonQuery();
                }
            }
        }
    }
}